MqttMessageHandler.cs
17.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
using System.Collections.Concurrent;
using System.Text.Json;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using MQTTnet.Client;
using Rcs.Application.Services;
using Rcs.Domain.Entities;
using Rcs.Domain.Models.VDA5050;
using Rcs.Infrastructure.Mqtt.ParseFactory;
using Task = System.Threading.Tasks.Task;
using TaskStatus = System.Threading.Tasks.TaskStatus;
namespace Rcs.Infrastructure.Mqtt
{
/// <summary>
/// MQTT消息处理器,支持动态注册和动态字符串
/// </summary>
public class MqttMessageHandler : IMqttMessageHandler
{
private readonly ILogger<MqttMessageHandler> _logger;
private readonly IServiceProvider _serviceProvider;
private readonly IStateParserFactory _stateParserFactory;
private readonly IVisualizationParserFactory _visualizationParserFactory;
private readonly IRobotCacheService _robotCacheService;
private readonly ConcurrentDictionary<string, Func<string, string, string, Task<MqttHandleResult>>> _handlers;
private readonly ConcurrentDictionary<string, int> _topicHeaderId = new()
{
["connection"] = 0,
["state"] = 0,
["factsheet"] = 0,
["visualization"] = 0
};
private DateTime _lastLogTime = DateTime.MinValue;
private readonly TimeSpan _logInterval = TimeSpan.FromSeconds(15);
public MqttMessageHandler(
ILogger<MqttMessageHandler> logger,
IServiceProvider serviceProvider,
IStateParserFactory stateParserFactory,
IVisualizationParserFactory visualizationParserFactory,
IRobotCacheService robotCacheService)
{
_logger = logger;
_serviceProvider = serviceProvider;
_stateParserFactory = stateParserFactory;
_visualizationParserFactory = visualizationParserFactory;
_robotCacheService = robotCacheService;
_handlers = new ConcurrentDictionary<string, Func<string, string, string, Task<MqttHandleResult>>>();
// 注册默认处理器
RegisterDefaultHandlers();
}
/// <summary>
/// 处理接收到的MQTT消息
/// </summary>
public async Task HandleMessageAsync(MqttApplicationMessageReceivedEventArgs e)
{
try
{
var topic = e.ApplicationMessage.Topic;
var payload = ExtractPayload(e.ApplicationMessage);
if (!topic.Contains("visualization"))
{
var now = DateTime.Now;
if (now - _lastLogTime >= _logInterval)
{
_logger.LogInformation("[MQTT] Recive | Topic={Topic} | Payload={Payload}", topic, payload);
_lastLogTime = now;
}
}
var result = await ProcessMessageAsync(topic, payload);
if (!result.Success)
{
_logger.LogWarning("[Advanced MQTT Handler] 消息处理失败 | Topic={Topic} | Error={Error}",
topic, result.Message);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "[Advanced MQTT Handler] 消息处理过程中发生严重错误");
}
}
/// <summary>
/// 注册消息类型处理器
/// </summary>
/// <param name="messageType">消息类型</param>
/// <param name="handler">处理器函数</param>
public void RegisterHandler(string messageType, Func<string, string, string, Task<MqttHandleResult>> handler)
{
_handlers.AddOrUpdate(messageType.ToLowerInvariant(), handler, (key, oldValue) => handler);
_logger.LogInformation("[Advanced MQTT Handler] 注册处理器 | MessageType={MessageType}", messageType);
}
/// <summary>
/// 处理消息
/// </summary>
private async Task<MqttHandleResult> ProcessMessageAsync(string topic, string payload)
{
var topicParts = topic.Split('/');
if (topicParts.Length < 5)
{
return MqttHandleResult.CreateFailure($"主题格式不正确: {topic}");
}
var robotSerialNumber = topicParts[3];
var messageType = topicParts[4].ToLowerInvariant();
// robot制造商
var robotManufacturer = topicParts[2];
if (_handlers.TryGetValue(messageType, out var handler))
{
return await handler(robotManufacturer,robotSerialNumber, payload);
}
else
{
return MqttHandleResult.CreateFailure($"未找到消息类型 '{messageType}' 的处理器");
}
}
/// <summary>
/// 注册默认处理器
/// </summary>
private void RegisterDefaultHandlers()
{
RegisterHandler("connection", HandleConnectionMessageAsync);
RegisterHandler("state", HandleStateMessageAsync);
RegisterHandler("factsheet", HandleFactsheetMessageAsync);
RegisterHandler("visualization", HandleVisualizationMessageAsync);
}
/// <summary>
/// 处理连接状态消息
/// </summary>
private async Task<MqttHandleResult> HandleConnectionMessageAsync(string robotManufacturer, string robotSerialNumber, string payload)
{
try
{
var connectionInfo = JsonSerializer.Deserialize<Connection>(payload);
if (connectionInfo == null) throw new Exception();
var robotStatus = connectionInfo.ConnectionState switch
{
"ONLINE" => OnlineStatus.Online,
"OFFLINE" => OnlineStatus.Offline,
"CONNECTIONBROKEN" => OnlineStatus.Connectionbroken,
_ => OnlineStatus.Offline
};
return MqttHandleResult.CreateSuccess($"更新机器人连接状态: {robotSerialNumber} -> {connectionInfo.ConnectionState}");
}
catch (Exception ex)
{
return MqttHandleResult.CreateFailure($"处理MQTT连接消息失败: {ex.Message}", ex);
}
}
/// <summary>
/// 处理状态消息(直接使用制造商+序列号更新缓存)
/// @author zzy
/// </summary>
private async Task<MqttHandleResult> HandleStateMessageAsync(string robotManufacturer, string robotSerialNumber, string payload)
{
try
{
// 使用工厂解析特定制造商的状态数据
var stateInfo = _stateParserFactory.ParseState(robotManufacturer, payload);
if (stateInfo == null) throw new Exception("状态数据解析失败");
// 解析状态
var robotStatus = DetermineRobotStatus(stateInfo);
var operatingMode = ParseOperatingMode(stateInfo.OperatingMode);
var errors = stateInfo.Errors.Any() ? JsonSerializer.Serialize(stateInfo.Errors) : null;
// 直接使用制造商+序列号更新状态到Redis缓存
await _robotCacheService.UpdateStatusAsync(
robotManufacturer,
robotSerialNumber,
robotStatus,
OnlineStatus.Online,
(int?)stateInfo.BatteryState?.BatteryCharge,
stateInfo.Driving,
stateInfo.Paused,
stateInfo.BatteryState?.Charging,
operatingMode,
errors);
// 更新位置到Redis缓存
if (stateInfo.AgvPosition != null)
{
await _robotCacheService.UpdateLocationAsync(
robotManufacturer,
robotSerialNumber,
null,
null,
stateInfo.AgvPosition.X,
stateInfo.AgvPosition.Y,
stateInfo.AgvPosition.Theta);
}
return MqttHandleResult.CreateSuccess($"成功更新机器人状态: {robotManufacturer}:{robotSerialNumber}");
}
catch (Exception ex)
{
_logger.LogError(ex, "处理制造商 '{Manufacturer}' 的状态消息失败", robotManufacturer);
return MqttHandleResult.CreateFailure($"处理状态消息失败: {ex.Message}", ex);
}
}
/// <summary>
/// 处理设备信息消息
/// </summary>
private Task<MqttHandleResult> HandleFactsheetMessageAsync(string robotManufacturer, string robotSerialNumber, string payload)
{
try
{
//var factSheet = JsonSerializer.Deserialize<FactSheet>(payload);
//if (factSheet == null) throw new SharedException(10001);
//if (!IsNewHeaderId("factsheet", factSheet.HeaderId)) throw new SharedException(10002);
//using var scope = _serviceProvider.CreateScope();
//var robotRepository = scope.ServiceProvider.GetRequiredService<IRobotRepository>();
//var robot = await robotRepository.GetBySerialNumberAsync(robotSerialNumber);
//if (robot == null)
//{
// throw new ExtendRobotException(robotSerialNumber, 20003);
//}
//robot.UpdateFactSheet(factSheet);
//await robotRepository.UpdateAsync(robot);
return Task.FromResult(MqttHandleResult.CreateSuccess());
}
catch (Exception ex)
{
return Task.FromResult(MqttHandleResult.CreateFailure($"处理设备信息失败: {ex.Message}", ex));
}
}
/// <summary>
/// 处理可视化消息
/// </summary>
private async Task<MqttHandleResult> HandleVisualizationMessageAsync(string robotManufacturer, string robotSerialNumber, string payload)
{
try
{
var visualization = _visualizationParserFactory.ParseVisualization(robotManufacturer, payload);
if (visualization == null) return MqttHandleResult.CreateFailure("可视化数据解析失败");
// 可选:进行headerId去重判断
if (!await IsNewHeaderIdAsync("visualization", robotSerialNumber, visualization.HeaderId)) throw new Exception();
try
{
// var redisDb = _redisHelper.GetDatabaseWithPrefix();
// var robot = await redisDb.StringGetAsync($"robot:{robotSerialNumber}");
// if (robot.HasValue) {
// var robotPosition = JsonSerializer.Deserialize<Robot>(robot, new JsonSerializerOptions
// {
// PropertyNamingPolicy = JsonNamingPolicy.CamelCase
// });
// if (visualization is Visualization_HikRobot visualization_HikRobot) {
// robotOld.CurrentX = visualization_HikRobot.AgvPosition.X * robotOld.CoordinateScale;
// robotOld.CurrentY = visualization_HikRobot.AgvPosition.Y * robotOld.CoordinateScale;
// robotOld.CurrentTheta = visualization_HikRobot.AgvPosition.Theta;
// }
// var robotNew = JsonSerializer.Serialize(robotOld, new JsonSerializerOptions
// {
// PropertyNamingPolicy = JsonNamingPolicy.CamelCase
// });
// await redisDb.StringSetAsync($"robot:{robotSerialNumber}", robotNew);
// }
//_logger.LogDebug("成功将机器人 {RobotCode} 数据存储到Redis", robot.RobotCode);
}
catch (Exception ex)
{
_logger.LogError(ex, "存储机器人 {RobotCode} 数据到Redis失败", robotSerialNumber);
}
// 目前仅确认解析成功即可
return MqttHandleResult.CreateSuccess();
}
catch (Exception ex)
{
return MqttHandleResult.CreateFailure($"处理可视化消息失败: {ex.Message}", ex);
}
}
/// <summary>
/// 从MQTT消息中提取载荷
/// </summary>
private static string ExtractPayload(MQTTnet.MqttApplicationMessage message)
{
return message.PayloadSegment.Array is { } arr
? System.Text.Encoding.UTF8.GetString(arr, message.PayloadSegment.Offset, message.PayloadSegment.Count)
: string.Empty;
}
private static string BuildHeaderHashKey(string robotCode)
{
return $"revice-headers:robot:{robotCode}";
}
// 基于Redis按机器人维度检查并更新headerId
private async Task<bool> IsNewHeaderIdAsync(string topic, string robotCode, int newHeaderId)
{
if (string.IsNullOrWhiteSpace(robotCode))
{
// 回退到进程内判断(无robotCode场景)
return IsNewHeaderId(topic, newHeaderId);
}
try
{
// var redisDb = _redisHelper.GetDatabaseWithPrefix();
// var key = BuildHeaderHashKey(robotCode);
// var field = topic.ToLowerInvariant();
// var currentValue = await redisDb.HashGetAsync(key, field);
// var currentHeaderId = 0;
// if (currentValue.HasValue && int.TryParse(currentValue.ToString(), out var parsed))
// {
// currentHeaderId = parsed;
// }
//
// if (newHeaderId > currentHeaderId)
// {
// await redisDb.HashSetAsync(key, field, newHeaderId);
// return true;
// }
return false;
}
catch (Exception ex)
{
_logger.LogError(ex, "检查并更新机器人 {RobotCode} 的 {Topic} headerId 失败", robotCode, topic);
return false;
}
}
// 将指定机器人的所有topic headerId重置为0
public async Task ResetHeaderIdsAsync(Robot robot)
{
try
{
var key = BuildHeaderHashKey(robot.RobotCode);
var fields = new (string key, string field, int value)[]
{
(key,"connection", 0),
(key,"state", 0),
(key,"factsheet", 0),
(key,"visualization", 0)
};
// await _robotStatusRedisService.RsetHeaderIdAsync(fields);
}
catch (Exception ex)
{
_logger.LogError(ex, "重置机器人 {RobotCode} 的 headerId 失败", robot.RobotCode);
}
}
// 判断headerId是否大于当前值,如果是则更新并返回true,否则返回false
private bool IsNewHeaderId(string topic, int newHeaderId)
{
if (_topicHeaderId.TryGetValue(topic, out var currentHeaderId))
{
if (newHeaderId > currentHeaderId)
{
_topicHeaderId[topic] = newHeaderId;
return true;
}
else
{
return false;
}
}
else
{
return false;
}
}
/// <summary>
/// 根据状态信息判断Robot状态
/// </summary>
private RobotStatus DetermineRobotStatus(State stateInfo)
{
// 判断是否有致命错误
var hasFatalError = stateInfo.Errors.Any(e =>
e.ErrorLevel?.Equals("FATAL", StringComparison.OrdinalIgnoreCase) == true);
if (hasFatalError)
return RobotStatus.Error;
// 判断是否忙碌(正在行驶或暂停)
if (stateInfo.Driving || stateInfo.Paused == true)
return RobotStatus.Busy;
// 默认空闲
return RobotStatus.Idle;
}
/// <summary>
/// 解析操作模式
/// @author zzy
/// </summary>
private static OperatingMode ParseOperatingMode(string? mode)
{
return mode?.ToUpperInvariant() switch
{
"AUTOMATIC" => OperatingMode.Automatic,
"SEMIAUTOMATIC" => OperatingMode.Semiautomatic,
"MANUAL" => OperatingMode.Manual,
"SERVICE" => OperatingMode.Service,
"TEACHIN" => OperatingMode.Teachin,
_ => OperatingMode.Automatic
};
}
}
}