RabbitTaskSender.java 1.9 KB
package com.huaheng.pc.task.taskHeader.service;

import com.huaheng.common.constant.Constants;
import com.huaheng.pc.monitor.message.domain.Messages;
import com.huaheng.pc.monitor.message.service.BrokerMessageLogService;
import com.huaheng.pc.task.taskHeader.domain.TaskHeader;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Date;

/**
 * Created by Enzo Cotter on 2020/4/16.
 */
@Component
public class RabbitTaskSender {

    //自动注入RabbitTemplate模板类
    @Resource
    private RabbitTemplate rabbitTemplate;

    @Resource
    private BrokerMessageLogService brokerMessageLogService;

    //回调函数: confirm确认
    final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.err.println("correlationData: " + correlationData);
            String messageId = correlationData.getId();
            if(ack){
                //如果confirm返回成功 则进行更新
                brokerMessageLogService.changeBrokerMessageLogStatus(messageId, Constants.SEND_SUCCESS, new Date());
            } else {
                //失败则进行具体的后续操作:重试 或者补偿等手段
                System.err.println("异常处理...");
            }
        }
    };

    //发送消息方法调用: 构建自定义对象消息
    public void sendTask(Messages messages) {
        rabbitTemplate.setConfirmCallback(confirmCallback);
        //消息唯一ID
        CorrelationData correlationData = new CorrelationData(String.valueOf(messages.getId()));
        rabbitTemplate.convertAndSend("wms_exchange", "task.Issued", messages.getMsgBody(), correlationData);
    }
}