LanYinWsClientService.cs
7.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Rcs.Cyaninetech.Models;
using Rcs.Domain.Settings;
namespace Rcs.Cyaninetech.Services
{
/// <summary>
/// LanYin WebSocket client service. Connects to the configured endpoint, sends topic
/// subscription requests, and raises a typed event for each recognised pushed message.
/// @author zzy
/// </summary>
public class LanYinWsClientService : ILanYinWsClientService, IDisposable
{
    /// <summary>Size in bytes of the buffer handed to each receive call.</summary>
    private const int ReceiveBufferSize = 8192;

    /// <summary>
    /// Upper bound for the close handshake, so <see cref="DisconnectAsync"/> cannot wait
    /// forever on a peer that never answers the close frame.
    /// </summary>
    private static readonly TimeSpan CloseTimeout = TimeSpan.FromSeconds(5);

    private readonly ILogger<LanYinWsClientService> _logger;
    private readonly LanYinWsSettings _settings;

    /// <summary>Serialises outgoing sends; only one send may be in flight on a socket.</summary>
    private readonly SemaphoreSlim _sendLock = new(1, 1);

    private ClientWebSocket? _webSocket;
    private CancellationTokenSource? _receiveCts;

    /// <summary>True while a socket exists and reports <see cref="WebSocketState.Open"/>.</summary>
    public bool IsConnected => _webSocket?.State == WebSocketState.Open;

    /// <summary>Raised for a "data" message whose name equals the configured RobotStatus topic.</summary>
    public event EventHandler<List<LanYinRobotStatus>>? OnRobotStatusReceived;

    /// <summary>Raised for a "data" message whose name equals the configured RobotInfo topic.</summary>
    public event EventHandler<List<LanYinRobotInfo>>? OnRobotInfoReceived;

    /// <summary>Raised for a "data" message whose name equals the configured RobotRealtimePath topic.</summary>
    public event EventHandler<LanYinRobotRealtimePath>? OnRobotRealtimePathReceived;

    /// <summary>
    /// Creates the service and picks the WebSocket section out of the application settings.
    /// </summary>
    /// <param name="logger">Logger used for connection and message diagnostics.</param>
    /// <param name="settings">Application settings; <c>LanYinSettings.WebSocket</c> is read from it.</param>
    public LanYinWsClientService(ILogger<LanYinWsClientService> logger, IOptions<AppSettings> settings)
    {
        _logger = logger;
        _settings = settings.Value.LanYinSettings.WebSocket;
    }

    /// <summary>
    /// Connects to the WebSocket server and starts the background receive loop.
    /// Does nothing when a connection is already open.
    /// @author zzy
    /// </summary>
    /// <param name="cancellationToken">Cancels the connection attempt.</param>
    /// <exception cref="Exception">Any connection failure is logged and rethrown unchanged.</exception>
    public async Task ConnectAsync(CancellationToken cancellationToken = default)
    {
        if (IsConnected)
        {
            return;
        }

        // Stop and release whatever is left of a previous session; otherwise its
        // cancellation source leaks and its receive loop is never told to stop.
        ReleaseConnection();

        var socket = new ClientWebSocket();
        var receiveCts = new CancellationTokenSource();
        _webSocket = socket;
        _receiveCts = receiveCts;

        try
        {
            var uri = new Uri(_settings.WebSocketUrl);
            await socket.ConnectAsync(uri, cancellationToken);
            _logger.LogInformation("[LanYin WS] 已连接到 {Url}", _settings.WebSocketUrl);

            // Fire-and-forget: the loop handles its own exceptions. It is bound to this
            // specific socket so a later reconnect cannot make it read the new one.
            _ = ReceiveMessagesAsync(socket, receiveCts.Token);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "[LanYin WS] 连接失败: {Url}", _settings.WebSocketUrl);
            // Do not keep a dead socket and an unused cancellation source around.
            ReleaseConnection();
            throw;
        }
    }

    /// <summary>
    /// Closes the connection and releases the socket and the receive cancellation source.
    /// A failing or timed-out close handshake is logged and does not prevent the cleanup.
    /// @author zzy
    /// </summary>
    public async Task DisconnectAsync()
    {
        var socket = _webSocket;
        try
        {
            if (socket?.State == WebSocketState.Open)
            {
                // The handshake must run BEFORE the receive token is cancelled: cancelling a
                // pending ReceiveAsync aborts the socket, after which no close frame can be sent.
                using var closeCts = new CancellationTokenSource(CloseTimeout);
                await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", closeCts.Token);
            }
        }
        catch (Exception ex) when (ex is WebSocketException or OperationCanceledException or ObjectDisposedException)
        {
            _logger.LogWarning(ex, "[LanYin WS] 关闭握手未正常完成");
        }
        finally
        {
            ReleaseConnection();
        }

        _logger.LogInformation("[LanYin WS] 已断开连接");
    }

    /// <summary>
    /// Sends a subscription request for the given topics.
    /// @author zzy
    /// </summary>
    /// <param name="topics">Topic names placed in the request's <c>Data</c> field.</param>
    /// <exception cref="ArgumentNullException"><paramref name="topics"/> is null.</exception>
    /// <exception cref="InvalidOperationException">The WebSocket is not connected.</exception>
    public async Task SubscribeAsync(List<string> topics)
    {
        // Fail before anything is sent; previously a null list only failed in string.Join,
        // after the request had already gone out.
        if (topics is null)
        {
            throw new ArgumentNullException(nameof(topics));
        }

        if (!IsConnected)
        {
            throw new InvalidOperationException("WebSocket 未连接");
        }

        var request = new LanYinWsSubscribeRequest { Data = topics };
        var json = JsonSerializer.Serialize(request);
        await SendAsync(json);
        _logger.LogInformation("[LanYin WS] 已发送订阅请求: {Topics}", string.Join(", ", topics));
    }

    /// <summary>
    /// Sends one UTF-8 text message. Returns without sending when no open socket exists.
    /// @author zzy
    /// </summary>
    /// <param name="message">Text to send as a single, final text frame.</param>
    private async Task SendAsync(string message)
    {
        var payload = Encoding.UTF8.GetBytes(message);

        await _sendLock.WaitAsync();
        try
        {
            // Read the field only once the lock is held, so the state check and the send
            // are guaranteed to use the same socket instance.
            var socket = _webSocket;
            if (socket == null || socket.State != WebSocketState.Open)
            {
                return;
            }

            await socket.SendAsync(new ArraySegment<byte>(payload), WebSocketMessageType.Text, true, CancellationToken.None);
        }
        finally
        {
            _sendLock.Release();
        }
    }

    /// <summary>
    /// Receive loop: reads fragments from <paramref name="socket"/> until it is cancelled,
    /// a close message arrives, or the socket leaves the open state. Every completed
    /// message is passed to <see cref="ProcessMessage"/>.
    /// @author zzy
    /// </summary>
    /// <param name="socket">The socket this loop is bound to for its whole lifetime.</param>
    /// <param name="cancellationToken">Stops the loop.</param>
    private async Task ReceiveMessagesAsync(ClientWebSocket socket, CancellationToken cancellationToken)
    {
        var buffer = new byte[ReceiveBufferSize];
        var messageBuilder = new StringBuilder();

        // A stateful decoder keeps the trailing bytes of a multi-byte character that was
        // split across two reads; decoding each fragment on its own would emit U+FFFD.
        var decoder = Encoding.UTF8.GetDecoder();
        var chars = new char[Encoding.UTF8.GetMaxCharCount(buffer.Length)];

        while (!cancellationToken.IsCancellationRequested && socket.State == WebSocketState.Open)
        {
            try
            {
                var result = await socket.ReceiveAsync(new ArraySegment<byte>(buffer), cancellationToken);
                if (result.MessageType == WebSocketMessageType.Close)
                {
                    // CloseReceived: the peer started the handshake. Any other state here
                    // means this frame answers a close that this client sent itself.
                    if (socket.State == WebSocketState.CloseReceived)
                    {
                        _logger.LogWarning("[LanYin WS] 服务器关闭连接");
                    }

                    break;
                }

                // Flushing on the final fragment resets the decoder for the next message.
                var charCount = decoder.GetChars(buffer, 0, result.Count, chars, 0, result.EndOfMessage);
                messageBuilder.Append(chars, 0, charCount);

                if (result.EndOfMessage)
                {
                    var message = messageBuilder.ToString();
                    messageBuilder.Clear();
                    ProcessMessage(message);
                }
            }
            catch (OperationCanceledException)
            {
                break;
            }
            catch (Exception ex)
            {
                // The loop condition ends the loop once the failure has closed the socket.
                _logger.LogError(ex, "[LanYin WS] 接收消息异常");
            }
        }
    }

    /// <summary>
    /// Parses the message envelope and dispatches by its name to the matching handler.
    /// Messages whose type is not "data" are ignored. Any exception, including one thrown
    /// by an event subscriber, is logged together with at most the first 200 characters
    /// of the message.
    /// @author zzy
    /// </summary>
    /// <param name="message">Complete text of one received message.</param>
    private void ProcessMessage(string message)
    {
        try
        {
            var baseMsg = JsonSerializer.Deserialize<LanYinWsMessage>(message);
            if (baseMsg == null || baseMsg.Type != "data")
            {
                return;
            }

            var topics = _settings.Topics;
            if (baseMsg.Name == topics.RobotStatus)
            {
                HandleRobotStatus(message);
            }
            else if (baseMsg.Name == topics.RobotInfo)
            {
                HandleRobotInfo(message);
            }
            else if (baseMsg.Name == topics.RobotRealtimePath)
            {
                HandleRobotRealtimePath(message);
            }
            else
            {
                _logger.LogDebug("[LanYin WS] 未知消息类型: {Name}", baseMsg.Name);
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "[LanYin WS] 解析消息失败: {Message}", message.Length > 200 ? message[..200] : message);
        }
    }

    /// <summary>
    /// Deserialises a robot status message and raises <see cref="OnRobotStatusReceived"/>
    /// when it carries data.
    /// @author zzy
    /// </summary>
    /// <param name="message">Raw JSON text of the message.</param>
    private void HandleRobotStatus(string message)
    {
        var msg = JsonSerializer.Deserialize<LanYinWsMessage<List<LanYinRobotStatus>>>(message);
        if (msg?.Data != null)
        {
            OnRobotStatusReceived?.Invoke(this, msg.Data);
        }
    }

    /// <summary>
    /// Deserialises a robot info message and raises <see cref="OnRobotInfoReceived"/>
    /// when it carries data.
    /// @author zzy
    /// </summary>
    /// <param name="message">Raw JSON text of the message.</param>
    private void HandleRobotInfo(string message)
    {
        var msg = JsonSerializer.Deserialize<LanYinWsMessage<List<LanYinRobotInfo>>>(message);
        if (msg?.Data != null)
        {
            OnRobotInfoReceived?.Invoke(this, msg.Data);
        }
    }

    /// <summary>
    /// Deserialises a robot realtime path message and raises
    /// <see cref="OnRobotRealtimePathReceived"/> when it carries data.
    /// @author zzy
    /// </summary>
    /// <param name="message">Raw JSON text of the message.</param>
    private void HandleRobotRealtimePath(string message)
    {
        var msg = JsonSerializer.Deserialize<LanYinWsMessage<LanYinRobotRealtimePath>>(message);
        if (msg?.Data != null)
        {
            OnRobotRealtimePathReceived?.Invoke(this, msg.Data);
        }
    }

    /// <summary>
    /// Cancels the receive loop and releases the socket, the cancellation source and the
    /// send lock. Calling it more than once is safe.
    /// </summary>
    public void Dispose()
    {
        ReleaseConnection();
        _sendLock.Dispose();
    }

    /// <summary>
    /// Detaches the current socket and receive cancellation source from the instance,
    /// then cancels and disposes them. Does nothing when neither is set.
    /// </summary>
    private void ReleaseConnection()
    {
        var receiveCts = _receiveCts;
        var socket = _webSocket;

        // Clear the fields first so a repeated call never touches a disposed object.
        _receiveCts = null;
        _webSocket = null;

        if (receiveCts != null)
        {
            receiveCts.Cancel();
            receiveCts.Dispose();
        }

        socket?.Dispose();
    }
}
}