UDPServer.java
7.66 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
package com.huaheng.api.Weighing.server;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.huaheng.api.Weighing.service.BusinessService;
import com.huaheng.common.utils.StringUtils;
import com.huaheng.pc.config.weight.domain.KuaidiWeight;
import com.huaheng.pc.config.weight.service.KuaidiWeightService;
import com.huaheng.pc.shipment.kuaidiHeader.domain.KuaidiHeader;
import com.huaheng.pc.shipment.kuaidiHeader.service.KuaidiHeaderService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.Filter;
import org.springframework.integration.annotation.Router;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.ip.dsl.Udp;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;
/**
 * Spring Integration configuration that receives weighing records over UDP and persists them.
 *
 * <p>Datagrams received on {@link #UDP_PORT} travel through the message channels
 * {@code udpChannel -> udpFilter -> udpRouter -> udpHandle1}. The final handler expects a
 * payload of the form {@code trackCode,weight}, stores it as a {@link KuaidiWeight} row and
 * then updates (or creates) the matching {@link KuaidiHeader}.
 *
 * @ClassName UDPServer
 * @Author Administrator
 * @Date 2020/12/215:04
 */
@Configuration
public class UDPServer {

    private static final Logger logger = LoggerFactory.getLogger(UDPServer.class);

    /* Test port: 8016. */

    /** Production port the inbound UDP adapter listens on. */
    private static final int UDP_PORT = 8900;

    /** A usable datagram carries at least a tracking code and a weight. */
    private static final int MIN_FIELD_COUNT = 2;

    @Resource
    private BusinessService businessService;

    @Resource
    KuaidiWeightService kuaidiWeightService;

    @Resource
    KuaidiHeaderService kuaidiHeaderService;

    /*
     * UDP receiver, approach 1 (kept for reference, not active):
     * https://docs.spring.io/spring-integration/reference/html/ip.html#inbound-udp-adapters-java-configuration
     *
     * @Bean
     * public UnicastReceivingChannelAdapter unicastReceivingChannelAdapter() {
     *     // Instantiate a UDP message receiver
     *     UnicastReceivingChannelAdapter unicastReceivingChannelAdapter = new UnicastReceivingChannelAdapter(udpPort);
     *     // unicastReceivingChannelAdapter.setOutputChannel(new DirectChannel());
     *     unicastReceivingChannelAdapter.setOutputChannelName("udpChannel");
     *     logger.info("UDP服务启动成功,端口号为: {}", udpPort);
     *     return unicastReceivingChannelAdapter;
     * }
     */

    /**
     * UDP receiver, approach 2: declares the inbound flow with the Java DSL.
     * https://docs.spring.io/spring-integration/reference/html/ip.html#inbound-udp-adapters-java-dsl-configuration
     *
     * @return a flow that forwards every datagram received on {@link #UDP_PORT} to the
     *         {@code udpChannel} channel
     * @author wliduo[i@dolyw.com]
     * @date 2020/5/20 16:08
     */
    @Bean
    public IntegrationFlow integrationFlow() {
        // Pass the port as an argument so the "{}" placeholder is actually substituted.
        logger.info("UDP服务启动成功,端口号为: {}", UDP_PORT);
        return IntegrationFlows.from(Udp.inboundAdapter(UDP_PORT)).channel("udpChannel").get();
    }

    /**
     * Transformer: decodes the raw datagram bytes into text.
     *
     * @param payload raw datagram bytes
     * @param headers message headers (currently unused)
     * @return the payload decoded as UTF-8
     * @author wliduo[i@dolyw.com]
     * @date 2020/5/20 15:30
     */
    @Transformer(inputChannel = "udpChannel", outputChannel = "udpFilter")
    public String transformer(@Payload byte[] payload, @Headers Map<String, Object> headers) {
        // Decode with an explicit charset so the result does not depend on the JVM's
        // platform default. NOTE(review): assumes the sending device emits ASCII/UTF-8 - confirm.
        return new String(payload, StandardCharsets.UTF_8);
    }

    /**
     * Filter: decides whether a message continues to the router. Every message is accepted.
     *
     * <p>The original code read the {@code id}, {@code ip_address} and {@code ip_port} headers
     * here as a starting point for source-based filtering; read them from {@code headers} if
     * that becomes necessary.
     *
     * @param message decoded datagram text
     * @param headers message headers (currently unused)
     * @return always {@code true}
     * @author wliduo[i@dolyw.com]
     * @date 2020/5/20 15:30
     */
    @Filter(inputChannel = "udpFilter", outputChannel = "udpRouter")
    public boolean filter(String message, @Headers Map<String, Object> headers) {
        return true;
    }

    /**
     * Router: selects the channel of the handler that processes the message.
     *
     * @param message decoded datagram text
     * @param headers message headers (currently unused)
     * @return always {@code "udpHandle1"}; {@code "udpHandle2"} is declared but never chosen
     * @author wliduo[i@dolyw.com]
     * @date 2020/5/20 15:35
     */
    @Router(inputChannel = "udpRouter")
    public String router(String message, @Headers Map<String, Object> headers) {
        return "udpHandle1";
    }

    /**
     * Final handler 1: persists a weighing record and reflects it on the parcel header.
     *
     * <p>The message must look like {@code trackCode,weight}; fields after the second are
     * ignored. A message with fewer than two fields is logged and discarded instead of
     * failing with an {@link IndexOutOfBoundsException}.
     *
     * @param message decoded datagram text
     * @param headers message headers (currently unused)
     * @throws Exception if one of the persistence services fails
     * @author wliduo[i@dolyw.com]
     * @date 2020/5/20 15:12
     */
    @ServiceActivator(inputChannel = "udpHandle1")
    public void udpMessageHandle(String message, @Headers Map<String, Object> headers) throws Exception {
        logger.info("UDP1:{}", message);

        String[] fields = message.split(",");
        if (fields.length < MIN_FIELD_COUNT) {
            logger.warn("Discarding malformed UDP weighing message, expected \"trackCode,weight\": {}", message);
            return;
        }
        String trackCode = fields[0];
        String weight = fields[1];

        saveWeightRecord(trackCode, weight);

        // Most recent header for this tracking code, if any.
        KuaidiHeader latestHeader = kuaidiHeaderService.getOne(
                new LambdaQueryWrapper<KuaidiHeader>()
                        .eq(KuaidiHeader::getTrackCode, trackCode)
                        .orderByDesc(KuaidiHeader::getId)
                        .last("limit 1"));

        if (latestHeader == null) {
            kuaidiHeaderService.save(newHeader(trackCode, weight));
            return;
        }
        applyWeight(latestHeader, weight);
        kuaidiHeaderService.updateById(latestHeader);
    }

    /** Stores the raw weighing event, stamped with the current time. */
    private void saveWeightRecord(String trackCode, String weight) {
        KuaidiWeight record = new KuaidiWeight();
        record.setTrackCode(trackCode);
        record.setWeight(weight);
        record.setCreated(new Date());
        kuaidiWeightService.insertWeight(record);
    }

    /**
     * Merges a new weight into an existing header: the first weight becomes the total weight;
     * afterwards the larger of the stored total and the new weight is kept as open weight,
     * and when the new weight is not larger it also replaces the total weight.
     */
    private static void applyWeight(KuaidiHeader header, String weight) {
        String currentTotal = header.getTotalWeight();
        if (StringUtils.isEmpty(currentTotal)) {
            header.setTotalWeight(weight);
        } else if (compareWeights(currentTotal, weight) < 0) {
            header.setOpenWeight(weight);
        } else {
            header.setOpenWeight(currentTotal);
            header.setTotalWeight(weight);
        }
    }

    /** Builds the header saved when no header exists yet for the tracking code. */
    private static KuaidiHeader newHeader(String trackCode, String weight) {
        KuaidiHeader header = new KuaidiHeader();
        // NOTE(review): warehouse id/code are hard-coded and the row is created with
        // deleted=true - presumably intentional for headers created from the scale; confirm.
        header.setWarehouseId(2);
        header.setWarehouseCode("SS0001");
        header.setTotalWeight(weight);
        header.setTrackCode(trackCode);
        header.setDeleted(true);
        return header;
    }

    /**
     * Compares two weights numerically. Plain {@link String#compareTo} is lexicographic and
     * would rank "9.5" above "10.2"; it is used only as a fallback when either value is not
     * a valid decimal number.
     */
    private static int compareWeights(String left, String right) {
        try {
            return new BigDecimal(left.trim()).compareTo(new BigDecimal(right.trim()));
        } catch (NumberFormatException notNumeric) {
            return left.compareTo(right);
        }
    }

    /**
     * Final handler 2: only logs the message. The router never selects this handler.
     *
     * @param message decoded datagram text
     * @param headers message headers (currently unused)
     * @throws Exception declared for signature compatibility; nothing is thrown here
     * @author wliduo[i@dolyw.com]
     * @date 2020/5/14 11:02
     */
    @ServiceActivator(inputChannel = "udpHandle2")
    public void udpMessageHandle2(String message, @Headers Map<String, Object> headers) throws Exception {
        logger.info("UDP2:{}", message);
    }
}