StompWebSocketServiceImpl.java 4.23 KB
package com.huaheng.pc.stompwebsocket.service;

import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.huaheng.common.utils.Wrappers;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.huaheng.pc.stompwebsocket.WebsocketConstants;
import com.huaheng.pc.stompwebsocket.domain.Message;
import com.huaheng.pc.stompwebsocket.domain.StompMessage;
import com.huaheng.pc.stompwebsocket.domain.StompPayload;
import com.huaheng.pc.stompwebsocket.mapper.StompMessageMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessageSendingOperations;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.util.Date;
import java.util.HashMap;
import java.util.List;

@Service
public class StompWebSocketServiceImpl extends ServiceImpl<StompMessageMapper, StompMessage> implements StompWebSocketService {

    @Autowired
    private SimpMessageSendingOperations messaging;

    /**
     * 快速推荐消息到用户
     * @param user
     * @param title
     * @param content
     */
    public void quickSendToUser(String user, String title, String content){
        StompPayload<Message> p = new StompPayload<>();
        p.addUser(user);
        Message msg = new Message();
        msg.setTitle(title);
        msg.setContent(content);
        p.setData(msg);
        p.setQos(1);
        send(WebsocketConstants.TOPIC_MESSAGE, p);
    }

    public void quickBroadcast(String title, String content){
        StompPayload<Message> p = new StompPayload<>();
        Message msg = new Message();
        msg.setTitle(title);
        msg.setContent(content);
        p.setData(msg);
        p.setQos(0);
        send(WebsocketConstants.TOPIC_MESSAGE, p);
    }

    /**
     * 发送消息
     *
     * @param topic
     * @param payload
     * @param <T>
     */
    public <T> void send(String topic, StompPayload<T> payload) {
        List<String> sendTo = payload.getSendTo();
        if (sendTo == null) {
            //没有指定sendTo则广播发送
            sendMessageBroadcast(topic, payload.getData());
        } else {
            //遍历发送消息给指定用户
            for (String loginName : payload.getSendTo()) {
                sendMessageToUser(topic, payload, loginName);
            }
        }

    }

    /**
     * 广播发送消息
     *
     * @param topic
     * @param data
     */
    public <T> void sendMessageBroadcast(String topic, T data) {
        messaging.convertAndSend(topic, data);
    }


    /**
     * 给指定用户发消息
     *
     * @param topic
     * @param payload
     * @param user    loginName
     */
    public <T> void sendMessageToUser(String topic, StompPayload<T> payload, String user) {
        HashMap<String, Object> headers = new HashMap<>();
        if (payload.isQos1()) {
            StompMessage msg = new StompMessage();
            msg.setUser(user);
            msg.setTopic(topic);
            msg.setMsg(JSON.toJSONString(payload.getData()));
            msg.setCreated(new Date());
            msg.setLastUpdated(new Date());
            save(msg);
            headers.put(WebsocketConstants.HEADER_MEG_ID, msg.getId().toString());
        }
        messaging.convertAndSendToUser(user, topic, payload.getData(), headers);
    }

    public void confirmMsg(String msg_id) {
        removeById(msg_id);
    }

    /**
     * 将离线后的消息发送给用户
     *
     * @param user
     */
    @Async
    public void sendOfflineMessageToUser(String user) {
        try {
            Thread.sleep(2000);
            LambdaQueryWrapper<StompMessage> queryWrapper = Wrappers.lambdaQuery();
            queryWrapper.eq(StompMessage::getUser, user);
            List<StompMessage> msgList = list(queryWrapper);
            for (StompMessage msg : msgList) {
                messaging.convertAndSendToUser(user, msg.getTopic(), msg.getMsg());
                removeById(msg.getId());
            }
        } catch (Exception e) {
            System.out.println("===========================================================【end】===\n\n\n");
            System.out.println(" ======>  e.getMessage() = " + e.getMessage());
        }
    }
}