// LanYinWsClientService.cs
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Rcs.Cyaninetech.Models;
using Rcs.Domain.Settings;

namespace Rcs.Cyaninetech.Services
{
    /// <summary>
    /// LanYin WebSocket client service implementation.
    /// Connects to the URL configured in <c>LanYinSettings.WebSocket</c>, sends topic
    /// subscription requests, and raises a typed event for every recognised "data"
    /// message received from the server.
    /// @author zzy
    /// </summary>
    public class LanYinWsClientService : ILanYinWsClientService, IDisposable
    {
        /// <summary>Size in bytes of the buffer handed to each receive call.</summary>
        private const int ReceiveBufferSize = 8192;

        /// <summary>Maximum number of characters of a bad message echoed into the error log.</summary>
        private const int MaxLoggedMessageLength = 200;

        /// <summary>Upper bound for the graceful close handshake so a silent peer cannot block disconnect forever.</summary>
        private static readonly TimeSpan CloseTimeout = TimeSpan.FromSeconds(5);

        private readonly ILogger<LanYinWsClientService> _logger;
        private readonly LanYinWsSettings _settings;
        private readonly SemaphoreSlim _sendLock = new(1, 1);
        private ClientWebSocket? _webSocket;
        private CancellationTokenSource? _receiveCts;
        private bool _disposed;

        /// <summary>True while the current socket exists and reports <see cref="WebSocketState.Open"/>.</summary>
        public bool IsConnected => _webSocket?.State == WebSocketState.Open;

        /// <summary>Raised with the payload of a message whose name equals the configured robot-status topic.</summary>
        public event EventHandler<List<LanYinRobotStatus>>? OnRobotStatusReceived;

        /// <summary>Raised with the payload of a message whose name equals the configured robot-info topic.</summary>
        public event EventHandler<List<LanYinRobotInfo>>? OnRobotInfoReceived;

        /// <summary>Raised with the payload of a message whose name equals the configured realtime-path topic.</summary>
        public event EventHandler<LanYinRobotRealtimePath>? OnRobotRealtimePathReceived;

        /// <summary>
        /// Creates the service and reads the WebSocket section of the application settings.
        /// </summary>
        /// <param name="logger">Logger used for connection and message diagnostics.</param>
        /// <param name="settings">Application settings; only <c>LanYinSettings.WebSocket</c> is used.</param>
        public LanYinWsClientService(ILogger<LanYinWsClientService> logger, IOptions<AppSettings> settings)
        {
            _logger = logger;
            _settings = settings.Value.LanYinSettings.WebSocket;
        }

        /// <summary>
        /// Connects to the WebSocket server and starts the background receive loop.
        /// Does nothing when a connection is already open.
        /// @author zzy
        /// </summary>
        /// <param name="cancellationToken">Token that cancels the connection attempt.</param>
        /// <exception cref="ObjectDisposedException">The service has been disposed.</exception>
        public async Task ConnectAsync(CancellationToken cancellationToken = default)
        {
            if (_disposed) throw new ObjectDisposedException(nameof(LanYinWsClientService));
            if (IsConnected) return;

            // Stop and dispose whatever is left of a previous session (socket AND its
            // cancellation source) before creating the new one.
            ReleaseConnection();

            var socket = new ClientWebSocket();
            var receiveCts = new CancellationTokenSource();
            _webSocket = socket;
            _receiveCts = receiveCts;

            try
            {
                var uri = new Uri(_settings.WebSocketUrl);
                await socket.ConnectAsync(uri, cancellationToken);
                _logger.LogInformation("[LanYin WS] 已连接到 {Url}", _settings.WebSocketUrl);

                // Start the receive loop (fire-and-forget). The loop is bound to this
                // specific socket so it can never end up reading from a later connection.
                _ = ReceiveMessagesAsync(socket, receiveCts.Token);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "[LanYin WS] 连接失败: {Url}", _settings.WebSocketUrl);

                // Do not keep a dead socket / unused token source around after a failed attempt.
                ReleaseConnection();
                throw;
            }
        }

        /// <summary>
        /// Disconnects from the server: attempts a graceful close handshake, then stops
        /// the receive loop and releases the socket. Failures during the handshake are
        /// logged and do not prevent the resources from being released.
        /// @author zzy
        /// </summary>
        public async Task DisconnectAsync()
        {
            var socket = _webSocket;

            try
            {
                // The handshake must run BEFORE the receive token is cancelled: in the .NET
                // implementation cancelling an in-flight ReceiveAsync aborts the socket,
                // after which it is no longer Open and a graceful close is impossible.
                if (socket?.State == WebSocketState.Open)
                {
                    using var closeCts = new CancellationTokenSource(CloseTimeout);
                    await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", closeCts.Token);
                }
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, "[LanYin WS] 关闭连接时发生异常");
            }
            finally
            {
                ReleaseConnection();
            }

            _logger.LogInformation("[LanYin WS] 已断开连接");
        }

        /// <summary>
        /// Sends a subscription request for the given topics.
        /// @author zzy
        /// </summary>
        /// <param name="topics">Topic names placed in the request's <c>Data</c> field.</param>
        /// <exception cref="ArgumentNullException"><paramref name="topics"/> is null.</exception>
        /// <exception cref="InvalidOperationException">The WebSocket is not connected.</exception>
        public async Task SubscribeAsync(List<string> topics)
        {
            if (topics is null) throw new ArgumentNullException(nameof(topics));
            if (!IsConnected) throw new InvalidOperationException("WebSocket 未连接");

            var request = new LanYinWsSubscribeRequest { Data = topics };
            var json = JsonSerializer.Serialize(request);
            await SendAsync(json);
            _logger.LogInformation("[LanYin WS] 已发送订阅请求: {Topics}", string.Join(", ", topics));
        }

        /// <summary>
        /// Sends one UTF-8 text message; sends are serialised through <c>_sendLock</c>.
        /// @author zzy
        /// </summary>
        /// <param name="message">Text to send as a single complete WebSocket message.</param>
        /// <exception cref="InvalidOperationException">The WebSocket is not connected.</exception>
        private async Task SendAsync(string message)
        {
            await _sendLock.WaitAsync();
            try
            {
                // Read the field once, inside the lock, so the state check and the send
                // operate on the same socket instance.
                var socket = _webSocket;
                if (socket == null || socket.State != WebSocketState.Open)
                {
                    // Fail loudly instead of silently dropping the message; otherwise the
                    // caller would log a successful send that never happened.
                    throw new InvalidOperationException("WebSocket 未连接");
                }

                var bytes = Encoding.UTF8.GetBytes(message);
                await socket.SendAsync(new ArraySegment<byte>(bytes), WebSocketMessageType.Text, true, CancellationToken.None);
            }
            finally
            {
                _sendLock.Release();
            }
        }

        /// <summary>
        /// Receive loop: reads frames from <paramref name="socket"/> until it is cancelled,
        /// the socket leaves the Open state, or a close frame arrives. Fragments are
        /// reassembled into complete messages and passed to <see cref="ProcessMessage"/>.
        /// @author zzy
        /// </summary>
        /// <param name="socket">The socket this loop is bound to for its whole lifetime.</param>
        /// <param name="cancellationToken">Token that stops the loop.</param>
        private async Task ReceiveMessagesAsync(ClientWebSocket socket, CancellationToken cancellationToken)
        {
            var buffer = new byte[ReceiveBufferSize];
            // A stateful decoder keeps the trailing bytes of a multi-byte UTF-8 character
            // that is split across two reads and completes it on the next call. Decoding
            // every fragment independently would turn such characters into U+FFFD.
            var decoder = Encoding.UTF8.GetDecoder();
            var chars = new char[Encoding.UTF8.GetMaxCharCount(ReceiveBufferSize)];
            var messageBuilder = new StringBuilder();

            while (!cancellationToken.IsCancellationRequested && socket.State == WebSocketState.Open)
            {
                try
                {
                    var result = await socket.ReceiveAsync(new ArraySegment<byte>(buffer), cancellationToken);

                    if (result.MessageType == WebSocketMessageType.Close)
                    {
                        // CloseReceived means the peer started the close. When the close
                        // frame is the reply to our own CloseAsync the state is Closed and
                        // there is nothing to warn about.
                        if (socket.State == WebSocketState.CloseReceived)
                        {
                            _logger.LogWarning("[LanYin WS] 服务器关闭连接");
                        }

                        break;
                    }

                    // flush on the last fragment so the decoder starts clean for the next message.
                    var charCount = decoder.GetChars(buffer, 0, result.Count, chars, 0, result.EndOfMessage);
                    messageBuilder.Append(chars, 0, charCount);

                    if (result.EndOfMessage)
                    {
                        var message = messageBuilder.ToString();
                        messageBuilder.Clear();
                        ProcessMessage(message);
                    }
                }
                catch (OperationCanceledException)
                {
                    break;
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "[LanYin WS] 接收消息异常");

                    // Discard any half-assembled message so it cannot be glued onto the next one.
                    messageBuilder.Clear();
                    decoder.Reset();
                }
            }
        }

        /// <summary>
        /// Handles one complete message: ignores anything whose type is not "data", then
        /// dispatches on the message name to the matching handler. Any exception raised
        /// while parsing or while running event subscribers is logged, not propagated.
        /// @author zzy
        /// </summary>
        /// <param name="message">Complete JSON text of the received message.</param>
        private void ProcessMessage(string message)
        {
            try
            {
                var baseMsg = JsonSerializer.Deserialize<LanYinWsMessage>(message);
                if (baseMsg == null || baseMsg.Type != "data") return;

                var topics = _settings.Topics;
                if (baseMsg.Name == topics.RobotStatus)
                    HandleRobotStatus(message);
                else if (baseMsg.Name == topics.RobotInfo)
                    HandleRobotInfo(message);
                else if (baseMsg.Name == topics.RobotRealtimePath)
                    HandleRobotRealtimePath(message);
                else
                    _logger.LogDebug("[LanYin WS] 未知消息类型: {Name}", baseMsg.Name);
            }
            catch (Exception ex)
            {
                // Only an excerpt is logged to keep oversized payloads out of the log.
                var excerpt = message.Length > MaxLoggedMessageLength ? message[..MaxLoggedMessageLength] : message;
                _logger.LogError(ex, "[LanYin WS] 解析消息失败: {Message}", excerpt);
            }
        }

        /// <summary>
        /// Deserialises a robot-status message and raises <see cref="OnRobotStatusReceived"/>
        /// when it carries a non-null payload.
        /// @author zzy
        /// </summary>
        /// <param name="message">Complete JSON text of the message.</param>
        private void HandleRobotStatus(string message)
        {
            var msg = JsonSerializer.Deserialize<LanYinWsMessage<List<LanYinRobotStatus>>>(message);
            if (msg?.Data != null)
            {
                OnRobotStatusReceived?.Invoke(this, msg.Data);
            }
        }

        /// <summary>
        /// Deserialises a robot-info message and raises <see cref="OnRobotInfoReceived"/>
        /// when it carries a non-null payload.
        /// @author zzy
        /// </summary>
        /// <param name="message">Complete JSON text of the message.</param>
        private void HandleRobotInfo(string message)
        {
            var msg = JsonSerializer.Deserialize<LanYinWsMessage<List<LanYinRobotInfo>>>(message);
            if (msg?.Data != null)
            {
                OnRobotInfoReceived?.Invoke(this, msg.Data);
            }
        }

        /// <summary>
        /// Deserialises a realtime-path message and raises
        /// <see cref="OnRobotRealtimePathReceived"/> when it carries a non-null payload.
        /// @author zzy
        /// </summary>
        /// <param name="message">Complete JSON text of the message.</param>
        private void HandleRobotRealtimePath(string message)
        {
            var msg = JsonSerializer.Deserialize<LanYinWsMessage<LanYinRobotRealtimePath>>(message);
            if (msg?.Data != null)
            {
                OnRobotRealtimePathReceived?.Invoke(this, msg.Data);
            }
        }

        /// <summary>
        /// Stops the receive loop and releases the socket and the send lock.
        /// Safe to call more than once.
        /// </summary>
        public void Dispose()
        {
            if (_disposed) return;
            _disposed = true;

            ReleaseConnection();
            _sendLock.Dispose();
        }

        /// <summary>
        /// Cancels and disposes the current receive token source and disposes the current
        /// socket, clearing both fields first so a repeated call is a no-op.
        /// </summary>
        private void ReleaseConnection()
        {
            var cts = _receiveCts;
            var socket = _webSocket;
            _receiveCts = null;
            _webSocket = null;

            try
            {
                cts?.Cancel();
            }
            finally
            {
                cts?.Dispose();
                socket?.Dispose();
            }
        }
    }
}