RabbitTaskSender.java
1.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package com.huaheng.pc.task.taskHeader.service;
import com.huaheng.common.constant.Constants;
import com.huaheng.pc.monitor.message.domain.Messages;
import com.huaheng.pc.monitor.message.service.BrokerMessageLogService;
import com.huaheng.pc.task.taskHeader.domain.TaskHeader;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Date;
/**
 * Publishes task messages to RabbitMQ through {@link RabbitTemplate} and reacts
 * to the publisher confirm that comes back for each message.
 *
 * <p>On a positive confirm the callback calls
 * {@code brokerMessageLogService.changeBrokerMessageLogStatus} with
 * {@code Constants.SEND_SUCCESS} and the current time, using the id carried in
 * the {@link CorrelationData} as the message id.
 *
 * <p>Created by Enzo Cotter on 2020/4/16.
 */
@Component
public class RabbitTaskSender {

    /** Exchange that {@link #sendTask(Messages)} publishes to. */
    private static final String TASK_EXCHANGE = "wms_exchange";

    /** Routing key that {@link #sendTask(Messages)} publishes with. */
    private static final String TASK_ROUTING_KEY = "task.Issued";

    // Injected RabbitTemplate used to publish messages.
    @Resource
    private RabbitTemplate rabbitTemplate;

    @Resource
    private BrokerMessageLogService brokerMessageLogService;

    /**
     * Publisher-confirm callback registered on the template by
     * {@link #sendTask(Messages)}.
     */
    final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        /**
         * Handles one publisher confirm.
         *
         * @param correlationData correlation object supplied at send time; may be
         *                        {@code null} when a message was published without one
         * @param ack             {@code true} for an ack, {@code false} for a nack
         * @param cause           optional failure description supplied on a nack
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.err.println("correlationData: " + correlationData);

            // Without correlation data there is no message id to update, and
            // dereferencing it would throw a NullPointerException on the
            // confirm callback path.
            if (correlationData == null) {
                System.err.println("confirm received without correlationData, ack: " + ack
                        + ", cause: " + cause);
                return;
            }

            String messageId = correlationData.getId();
            if (ack) {
                // Confirm succeeded: mark the logged message as sent.
                brokerMessageLogService.changeBrokerMessageLogStatus(
                        messageId, Constants.SEND_SUCCESS, new Date());
            } else {
                // Confirm failed: follow-up handling (retry, compensation, ...)
                // is still to be implemented. Keep the id and cause visible so
                // the failure can be diagnosed.
                System.err.println("异常处理...");
                System.err.println("messageId: " + messageId + ", cause: " + cause);
            }
        }
    };

    /**
     * Publishes the body of the given message to the task exchange.
     *
     * <p>The message id is converted with {@code String.valueOf} and attached as
     * {@link CorrelationData}, so the confirm callback can identify the message.
     * The confirm callback is (re-)registered on the template on every call.
     * NOTE(review): Spring AMQP documents that a template supports only one
     * ConfirmCallback, so this presumably fails if another component registers
     * a different callback on the same template; verify against the
     * application's template configuration.
     *
     * @param messages message whose id and body are sent; must not be {@code null}
     */
    public void sendTask(Messages messages) {
        rabbitTemplate.setConfirmCallback(confirmCallback);

        // Unique id of the message, used to correlate the broker confirm.
        CorrelationData correlationData = new CorrelationData(String.valueOf(messages.getId()));
        rabbitTemplate.convertAndSend(
                TASK_EXCHANGE, TASK_ROUTING_KEY, messages.getMsgBody(), correlationData);
    }
}