// StompWebSocketServiceImpl.java 3.12 KB
package com.huaheng.pc.stompwebsocket.service;

import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.huaheng.pc.stompwebsocket.WebsocketConstants;
import com.huaheng.pc.stompwebsocket.domain.StompMessage;
import com.huaheng.pc.stompwebsocket.domain.StompPayload;
import com.huaheng.pc.stompwebsocket.mapper.StompMessageMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.core.MessagePostProcessor;
import org.springframework.messaging.simp.SimpMessageSendingOperations;
import org.springframework.stereotype.Service;

import java.util.Date;
import java.util.HashMap;
import java.util.List;

/**
 * STOMP messaging service that broadcasts to a topic or delivers to individual users.
 *
 * <p>Payloads flagged {@code isQos1()} are stored as {@link StompMessage} rows before they are
 * sent, and the stored row id travels with the message in the
 * {@link WebsocketConstants#HEADER_MEG_ID} header. {@link #confirmMsg(String)} deletes a stored
 * row by id, and {@link #sendOfflineMessageToUser(String)} replays whatever rows are still stored
 * for a user.
 */
@Service("stompWebSocketService")
public class StompWebSocketServiceImpl extends ServiceImpl<StompMessageMapper, StompMessage> implements StompWebSocketService{

    @Autowired
    private SimpMessageSendingOperations messaging;

    /**
     * Sends a payload either to everyone subscribed to the topic or to the listed users.
     *
     * <p>When {@code payload.getSendTo()} is {@code null} the payload data is broadcast. Otherwise
     * the message is sent once per listed login name; an empty list therefore sends nothing.
     *
     * @param topic   destination to send to
     * @param payload message wrapper holding the data, the optional recipient list and the QoS flag
     * @param <T>     type of the payload data
     */
    public <T> void send(String topic, StompPayload<T> payload) {
        List<String> sendTo = payload.getSendTo();
        if (sendTo == null) {
            // No explicit recipients: broadcast to the topic.
            sendMessageBroadcast(topic, payload.getData());
            return;
        }
        // Deliver to each named user individually.
        for (String loginName : sendTo) {
            sendMessageToUser(topic, payload, loginName);
        }
    }

    /**
     * Broadcasts data to a destination.
     *
     * @param topic destination to send to
     * @param data  object handed to the messaging template for conversion and sending
     * @param <T>   type of the data
     */
    public <T> void sendMessageBroadcast(String topic, T data) {
        messaging.convertAndSend(topic, data);
    }


    /**
     * Sends the payload data to a single user.
     *
     * <p>For a QoS1 payload the data is first serialized to JSON and saved as a
     * {@link StompMessage} row; the id of that row is attached to the outgoing message in the
     * {@link WebsocketConstants#HEADER_MEG_ID} header.
     *
     * @param topic   destination to send to
     * @param payload message wrapper holding the data and the QoS flag
     * @param user    login name of the recipient
     * @param <T>     type of the payload data
     * @throws IllegalStateException if a QoS1 message has no id after being saved
     */
    public <T> void sendMessageToUser(String topic, StompPayload<T> payload, String user) {
        HashMap<String, Object> headers = new HashMap<>();
        if (payload.isQos1()) {
            // Read the clock once so that created and lastUpdated agree for a new row.
            long nowMillis = System.currentTimeMillis();

            StompMessage msg = new StompMessage();
            msg.setUser(user);
            msg.setTopic(topic);
            msg.setMsg(JSON.toJSONString(payload.getData()));
            msg.setCreated(new Date(nowMillis));
            msg.setLastUpdated(new Date(nowMillis));
            save(msg);

            // Without an id the receiver has nothing to confirm, so fail with a clear message
            // instead of a bare NullPointerException from toString().
            Object msgId = msg.getId();
            if (msgId == null) {
                throw new IllegalStateException(
                        "QoS1 message was saved without an id; user=" + user + ", topic=" + topic);
            }
            headers.put(WebsocketConstants.HEADER_MEG_ID, msgId.toString());
        }
        messaging.convertAndSendToUser(user, topic, payload.getData(), headers);
    }

    /**
     * Deletes the stored message with the given id. A {@code null} or empty id is ignored.
     *
     * @param msg_id id of the stored message, as carried in the
     *               {@link WebsocketConstants#HEADER_MEG_ID} header
     */
    public void confirmMsg(String msg_id){
        if (msg_id == null || msg_id.isEmpty()) {
            // Nothing to confirm; avoid issuing a delete with an empty key.
            return;
        }
        removeById(msg_id);
    }

    /**
     * Replays every message still stored for the user, then deletes each one after sending it.
     *
     * <p>Messages are replayed in ascending id order (presumably the order in which they were
     * stored; confirm against the id strategy of {@link StompMessage}).
     *
     * <p>NOTE(review): the stored row is deleted as soon as the message has been handed to the
     * messaging template, and no {@link WebsocketConstants#HEADER_MEG_ID} header is attached, so a
     * replayed message that never reaches the client cannot be replayed again. Confirm whether
     * that is intended.
     *
     * @param user login name of the recipient; {@code null} is ignored
     */
    public void sendOfflineMessageToUser(String user){
        if (user == null) {
            return;
        }
        LambdaQueryWrapper<StompMessage> queryWrapper = Wrappers.lambdaQuery();
        queryWrapper.eq(StompMessage::getUser, user);
        // Make the replay order deterministic rather than whatever the database returns.
        queryWrapper.orderByAsc(StompMessage::getId);

        List<StompMessage> pending = list(queryWrapper);
        for (StompMessage msg : pending) {
            // The stored body is the JSON string written by sendMessageToUser.
            messaging.convertAndSendToUser(user, msg.getTopic(), msg.getMsg());
            removeById(msg.getId());
        }
    }
}