package com.huaheng.pc.task.taskHeader.service; import com.huaheng.common.constant.Constants; import com.huaheng.pc.monitor.message.domain.Messages; import com.huaheng.pc.monitor.message.service.BrokerMessageLogService; import com.huaheng.pc.task.taskHeader.domain.TaskHeader; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.Date; /** * Created by Enzo Cotter on 2020/4/16. */ @Component public class RabbitTaskSender { //自动注入RabbitTemplate模板类 @Resource private RabbitTemplate rabbitTemplate; @Resource private BrokerMessageLogService brokerMessageLogService; //回调函数: confirm确认 final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.err.println("correlationData: " + correlationData); String messageId = correlationData.getId(); if(ack){ //如果confirm返回成功 则进行更新 brokerMessageLogService.changeBrokerMessageLogStatus(messageId, Constants.SEND_SUCCESS, new Date()); } else { //失败则进行具体的后续操作:重试 或者补偿等手段 System.err.println("异常处理..."); } } }; //发送消息方法调用: 构建自定义对象消息 public void sendTask(Messages messages) { rabbitTemplate.setConfirmCallback(confirmCallback); //消息唯一ID CorrelationData correlationData = new CorrelationData(String.valueOf(messages.getId())); rabbitTemplate.convertAndSend("wms_exchange", "task.Issued", messages.getMsgBody(), correlationData); } }