package com.huaheng.pc.stompwebsocket.service; import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.huaheng.common.utils.Wrappers; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.huaheng.pc.stompwebsocket.WebsocketConstants; import com.huaheng.pc.stompwebsocket.domain.Message; import com.huaheng.pc.stompwebsocket.domain.StompMessage; import com.huaheng.pc.stompwebsocket.domain.StompPayload; import com.huaheng.pc.stompwebsocket.mapper.StompMessageMapper; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.simp.SimpMessageSendingOperations; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import java.util.Date; import java.util.HashMap; import java.util.List; @Service public class StompWebSocketServiceImpl extends ServiceImpl<StompMessageMapper, StompMessage> implements StompWebSocketService { @Autowired private SimpMessageSendingOperations messaging; /** * 快速推荐消息到用户 * @param user * @param title * @param content */ public void quickSendToUser(String user, String title, String content){ StompPayload<Message> p = new StompPayload<>(); p.addUser(user); Message msg = new Message(); msg.setTitle(title); msg.setContent(content); p.setData(msg); p.setQos(1); send(WebsocketConstants.TOPIC_MESSAGE, p); } public void quickBroadcast(String title, String content){ StompPayload<Message> p = new StompPayload<>(); Message msg = new Message(); msg.setTitle(title); msg.setContent(content); p.setData(msg); p.setQos(0); send(WebsocketConstants.TOPIC_MESSAGE, p); } /** * 发送消息 * * @param topic * @param payload * @param <T> */ public <T> void send(String topic, StompPayload<T> payload) { List<String> sendTo = payload.getSendTo(); if (sendTo == null) { //没有指定sendTo则广播发送 sendMessageBroadcast(topic, payload.getData()); } else { //遍历发送消息给指定用户 for (String loginName : payload.getSendTo()) { sendMessageToUser(topic, payload, loginName); } } } /** * 广播发送消息 * * @param topic * @param data */ public <T> void sendMessageBroadcast(String topic, T data) { messaging.convertAndSend(topic, data); } /** * 给指定用户发消息 * * @param topic * @param payload * @param user loginName */ public <T> void sendMessageToUser(String topic, StompPayload<T> payload, String user) { HashMap<String, Object> headers = new HashMap<>(); if (payload.isQos1()) { StompMessage msg = new StompMessage(); msg.setUser(user); msg.setTopic(topic); msg.setMsg(JSON.toJSONString(payload.getData())); msg.setCreated(new Date()); msg.setLastUpdated(new Date()); save(msg); headers.put(WebsocketConstants.HEADER_MEG_ID, msg.getId().toString()); } messaging.convertAndSendToUser(user, topic, payload.getData(), headers); } public void confirmMsg(String msg_id) { removeById(msg_id); } /** * 将离线后的消息发送给用户 * * @param user */ @Async public void sendOfflineMessageToUser(String user) { try { Thread.sleep(2000); LambdaQueryWrapper<StompMessage> queryWrapper = Wrappers.lambdaQuery(); queryWrapper.eq(StompMessage::getUser, user); List<StompMessage> msgList = list(queryWrapper); for (StompMessage msg : msgList) { messaging.convertAndSendToUser(user, msg.getTopic(), msg.getMsg()); removeById(msg.getId()); } } catch (Exception e) { System.out.println("===========================================================【end】===\n\n\n"); System.out.println(" ======> e.getMessage() = " + e.getMessage()); } } }