StompWebSocketServiceImpl.java
4.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package com.huaheng.pc.stompwebsocket.service;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.huaheng.pc.stompwebsocket.WebsocketConstants;
import com.huaheng.pc.stompwebsocket.domain.Message;
import com.huaheng.pc.stompwebsocket.domain.StompMessage;
import com.huaheng.pc.stompwebsocket.domain.StompPayload;
import com.huaheng.pc.stompwebsocket.mapper.StompMessageMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessageSendingOperations;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
/**
 * STOMP messaging service that sends payloads to topics through
 * {@link SimpMessageSendingOperations} and, for QoS 1 payloads, stores each
 * message as a {@link StompMessage} row before it is sent to a user.
 *
 * <p>Stored rows are removed either by {@link #confirmMsg(String)} or after
 * they are replayed by {@link #sendOfflineMessageToUser(String)}.
 *
 * @author
 */
@Service
public class StompWebSocketServiceImpl extends ServiceImpl<StompMessageMapper, StompMessage> implements StompWebSocketService {

    /**
     * Pause, in milliseconds, before stored messages are replayed to a user.
     * NOTE(review): presumably gives the client time to finish subscribing
     * after connecting - confirm with the caller of
     * {@link #sendOfflineMessageToUser(String)}.
     */
    private static final long OFFLINE_REPLAY_DELAY_MILLIS = 2000L;

    @Autowired
    private SimpMessageSendingOperations messaging;

    /**
     * Quickly pushes a title/content message to a single user on
     * {@link WebsocketConstants#TOPIC_MESSAGE} with QoS 1.
     *
     * @param user    login name of the recipient
     * @param title   message title
     * @param content message content
     */
    public void quickSendToUser(String user, String title, String content) {
        StompPayload<Message> payload = buildMessagePayload(title, content, 1);
        payload.addUser(user);
        send(WebsocketConstants.TOPIC_MESSAGE, payload);
    }

    /**
     * Quickly sends a title/content message on
     * {@link WebsocketConstants#TOPIC_MESSAGE} with QoS 0 and without adding
     * any recipient to the payload.
     *
     * @param title   message title
     * @param content message content
     */
    public void quickBroadcast(String title, String content) {
        send(WebsocketConstants.TOPIC_MESSAGE, buildMessagePayload(title, content, 0));
    }

    /**
     * Sends a payload: broadcast when the payload has no recipient list,
     * otherwise one send per listed user.
     *
     * @param topic   destination topic
     * @param payload payload carrying the data, the QoS and the optional recipient list
     * @param <T>     type of the payload data
     */
    public <T> void send(String topic, StompPayload<T> payload) {
        List<String> recipients = payload.getSendTo();
        if (recipients == null) {
            // No recipient list at all means broadcast.
            sendMessageBroadcast(topic, payload.getData());
            return;
        }
        // A recipient list is present: send to each listed user individually.
        for (String loginName : recipients) {
            sendMessageToUser(topic, payload, loginName);
        }
    }

    /**
     * Broadcasts data to a topic.
     *
     * @param topic destination topic
     * @param data  data to convert and send
     * @param <T>   type of the data
     */
    public <T> void sendMessageBroadcast(String topic, T data) {
        messaging.convertAndSend(topic, data);
    }

    /**
     * Sends a payload's data to one user. When {@code payload.isQos1()} is
     * true the data is first saved as a {@link StompMessage} (serialized as
     * JSON) and the id of the saved row is attached to the outgoing message
     * under the {@link WebsocketConstants#HEADER_MEG_ID} header.
     *
     * @param topic   destination topic
     * @param payload payload whose data is sent
     * @param user    login name of the recipient
     * @param <T>     type of the payload data
     */
    public <T> void sendMessageToUser(String topic, StompPayload<T> payload, String user) {
        HashMap<String, Object> headers = new HashMap<>();
        if (payload.isQos1()) {
            // One timestamp for both columns so a new row has created == lastUpdated.
            Date now = new Date();
            StompMessage msg = new StompMessage();
            msg.setUser(user);
            msg.setTopic(topic);
            msg.setMsg(JSON.toJSONString(payload.getData()));
            msg.setCreated(now);
            msg.setLastUpdated(now);
            save(msg);
            headers.put(WebsocketConstants.HEADER_MEG_ID, msg.getId().toString());
        }
        messaging.convertAndSendToUser(user, topic, payload.getData(), headers);
    }

    /**
     * Removes the stored message with the given id.
     *
     * @param msgId id of the stored {@link StompMessage}
     */
    public void confirmMsg(String msgId) {
        removeById(msgId);
    }

    /**
     * Replays the messages stored for a user: after a short delay, every
     * {@link StompMessage} whose user equals {@code user} is sent to that
     * user on its stored topic and then removed. The stored JSON string is
     * sent as the payload and no extra headers are attached.
     *
     * <p>Failures are reported to standard output and not rethrown. If the
     * thread is interrupted during the delay, the interrupt status is
     * restored and nothing is sent or removed.
     *
     * @param user login name whose stored messages are replayed
     */
    @Async
    public void sendOfflineMessageToUser(String user) {
        try {
            Thread.sleep(OFFLINE_REPLAY_DELAY_MILLIS);
        } catch (InterruptedException e) {
            // Restore the flag so the executor running this task can observe the
            // interruption; the stored messages stay in place for a later replay.
            Thread.currentThread().interrupt();
            return;
        }
        try {
            LambdaQueryWrapper<StompMessage> queryWrapper = Wrappers.lambdaQuery();
            queryWrapper.eq(StompMessage::getUser, user);
            List<StompMessage> msgList = list(queryWrapper);
            for (StompMessage msg : msgList) {
                messaging.convertAndSendToUser(user, msg.getTopic(), msg.getMsg());
                removeById(msg.getId());
            }
        } catch (Exception e) {
            System.out.println("===========================================================【end】===\n\n\n");
            System.out.println(" ======> e.getMessage() = " + e.getMessage());
        }
    }

    /** Builds a payload wrapping a {@link Message} with the given title, content and QoS. */
    private StompPayload<Message> buildMessagePayload(String title, String content, int qos) {
        Message msg = new Message();
        msg.setTitle(title);
        msg.setContent(content);
        StompPayload<Message> payload = new StompPayload<>();
        payload.setData(msg);
        payload.setQos(qos);
        return payload;
    }
}