// MqttClientService.cs
using System.Collections.Concurrent;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Formatter;
using MQTTnet.Protocol;
using Rcs.Application.Shared;
using Rcs.Domain.Repositories;
using Rcs.Domain.Settings;
using Task = System.Threading.Tasks.Task;

namespace Rcs.Infrastructure.Mqtt
{
    public class MqttClientService : BackgroundService, IMqttClientService
    {
        private readonly ILogger<MqttClientService> _logger;
        private readonly IOptions<AppSettings> _options;
        private readonly IMqttClient _client;
        private readonly IServiceProvider _serviceProvider;
        private readonly IMqttMessageHandler _messageHandler;
        private readonly List<Protocol> ? _protocols;

        // Tracks the subscribed topics together with their QoS levels.
        private readonly ConcurrentDictionary<string, MqttQualityOfServiceLevel> _subscriptions = new(StringComparer.OrdinalIgnoreCase);

        /// <summary>
        /// Creates the MQTT client and wires up its message-received and disconnected handlers.
        /// </summary>
        /// <param name="logger">Logger used for connection and message diagnostics.</param>
        /// <param name="options">Application settings; the <c>Mqtt</c> section is read from here.</param>
        /// <param name="serviceProvider">Provider used to create scopes for resolving scoped services.</param>
        /// <param name="messageHandler">Handler that every received application message is forwarded to.</param>
        public MqttClientService(ILogger<MqttClientService> logger, IOptions<AppSettings> options, IServiceProvider serviceProvider, IMqttMessageHandler messageHandler)
        {
            _logger = logger;
            _options = options;
            _serviceProvider = serviceProvider;
            _messageHandler = messageHandler;
            _protocols = _options.Value.Mqtt.Protocols;
            var factory = new MqttFactory();
            _client = factory.CreateMqttClient();

            _client.ApplicationMessageReceivedAsync += async e =>
            {
                try
                {
                    // Forward the received message to the dedicated message handler.
                    await _messageHandler.HandleMessageAsync(e);
                }
                catch (Exception ex)
                {
                    // Handler failures are logged here instead of propagating to the client.
                    _logger.LogError(ex, "MQTT消息处理发生错误");
                }
            };

            // 0 = no reconnect loop running, 1 = a loop is running. Captured by the handler below so that
            // overlapping DisconnectedAsync invocations share a single reconnect loop.
            // NOTE(review): MQTTnet appears to raise DisconnectedAsync for failed connect attempts as well,
            // which would otherwise start one additional loop per failed attempt - confirm against the
            // MQTTnet version in use.
            var reconnecting = 0;

            _client.DisconnectedAsync += async e =>
            {
                _logger.LogWarning("[MQTT] 连接断开: {ReasonString}", e.ReasonString);

                // The outer loop re-checks the connection after the flag has been released, so a disconnect
                // that raced with the release is still picked up by this invocation.
                while (!_client.IsConnected)
                {
                    if (Interlocked.CompareExchange(ref reconnecting, 1, 0) != 0)
                    {
                        // Another invocation already owns the reconnect loop.
                        return;
                    }

                    try
                    {
                        // Simple reconnect strategy: wait, connect, restore subscriptions; repeat until connected.
                        while (!_client.IsConnected)
                        {
                            try
                            {
                                await Task.Delay(TimeSpan.FromSeconds(_options.Value.Mqtt.ReconnectDelaySeconds));
                                await ConnectAsync(CancellationToken.None);
                                await ResubscribeAllAsync(CancellationToken.None);
                            }
                            catch (Exception ex)
                            {
                                // Include the exception so the cause of the failed attempt is not lost.
                                _logger.LogError(ex, "[MQTT] 连接失败,正在重连");
                            }
                        }
                    }
                    finally
                    {
                        Interlocked.Exchange(ref reconnecting, 0);
                    }
                }
            };
        }

        /// <summary>
        /// Connects to the broker, then subscribes to the configured topics of every active robot.
        /// </summary>
        /// <remarks>
        /// Topics have the form
        /// <c>{ProtocolName}/{ProtocolVersion}/{RobotManufacturer}/{RobotSerialNumber}/{topic}</c>.
        /// The <c>connection</c> topic is subscribed with QoS AtLeastOnce, all others with AtMostOnce.
        /// Failures are logged and not rethrown.
        /// </remarks>
        /// <param name="stoppingToken">Token passed to the connect and subscribe calls.</param>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            try
            {
                await ConnectAsync(stoppingToken);

                using var scope = _serviceProvider.CreateScope();
                var robotRepo = scope.ServiceProvider.GetRequiredService<IRobotRepository>();

                var robots = await robotRepo.GetActiveRobotsAsync();
                // Default subscriptions could be added here.
                if (!robots.Any())
                {
                    return;
                }

                List<(string, MqttQualityOfServiceLevel)> mqttParameters = new();
                foreach (var robot in robots)
                {
                    // The real-time-scheduling filter is currently disabled, so every active robot is subscribed:
                    // if (!robot.RealtimeScheduling) continue;
                    await _messageHandler.ResetHeaderIdsAsync(robot);

                    // _protocols is declared nullable; a missing protocol list means nothing to subscribe to.
                    var protocol = _protocols?.FirstOrDefault(p => p.ProtocolName == robot.ProtocolName);
                    if (protocol is null || protocol.Topics.Count <= 0)
                    {
                        continue;
                    }

                    foreach (var topic in protocol.Topics)
                    {
                        mqttParameters.Add((
                            $"{robot.ProtocolName}/{robot.ProtocolVersion}/{robot.RobotManufacturer}/{robot.RobotSerialNumber}/{topic}",
                            topic.Equals("connection") ? MqttQualityOfServiceLevel.AtLeastOnce : MqttQualityOfServiceLevel.AtMostOnce));
                    }
                }

                await AddOrUpdateSubscriptionsAsync(mqttParameters, stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Cancellation requested through stoppingToken is an expected stop, not a connection failure.
            }
            catch (Exception ex)
            {
                // Pass the exception itself so the stack trace is kept in the log.
                _logger.LogError(ex, "MQTT连接失败,{msg}", ex.Message);
            }
        }

        /// <summary>
        /// Builds the client options from the configured MQTT settings and connects to the broker.
        /// </summary>
        /// <remarks>
        /// Credentials are applied only when a non-blank username is configured, and TLS only when
        /// <c>UseTls</c> is set. The protocol version is fixed to MQTT 3.1.1.
        /// </remarks>
        /// <param name="ct">Token passed to the underlying connect call.</param>
        private async Task ConnectAsync(CancellationToken ct)
        {
            var mqtt = _options.Value.Mqtt;

            var optionsBuilder = new MqttClientOptionsBuilder()
                .WithClientId(mqtt.ClientId)
                .WithTcpServer(mqtt.Broker, mqtt.Port)
                .WithCleanSession(mqtt.CleanSession)
                .WithKeepAlivePeriod(TimeSpan.FromSeconds(mqtt.KeepAlivePeriodSeconds))
                .WithProtocolVersion(MqttProtocolVersion.V311);

            if (!string.IsNullOrWhiteSpace(mqtt.Username))
            {
                optionsBuilder = optionsBuilder.WithCredentials(mqtt.Username, mqtt.Password);
            }

            if (mqtt.UseTls)
            {
                optionsBuilder = optionsBuilder.WithTls();
            }

            await _client.ConnectAsync(optionsBuilder.Build(), ct);

            _logger.LogInformation(
                "[MQTT] 连接到 {Broker}:{Port}, 客户端ID {ClientId}, 连接状态 {status}",
                mqtt.Broker,
                mqtt.Port,
                mqtt.ClientId,
                _client.IsConnected);
        }

        /// <summary>
        /// Subscribes again to every tracked topic, one at a time, using its recorded QoS.
        /// Does nothing when no topics are tracked or the client is not connected.
        /// </summary>
        /// <param name="ct">Token passed to each subscribe call.</param>
        private async Task ResubscribeAllAsync(CancellationToken ct)
        {
            if (_subscriptions.IsEmpty)
            {
                return;
            }

            if (!_client.IsConnected)
            {
                return;
            }

            foreach (var (trackedTopic, trackedQos) in _subscriptions)
            {
                var filter = new MqttTopicFilterBuilder()
                    .WithTopic(trackedTopic)
                    .WithQualityOfServiceLevel(trackedQos)
                    .Build();

                await _client.SubscribeAsync(filter, ct);
                _logger.LogInformation("[MQTT] 重新订阅: {Topic} (QoS {QoS})", trackedTopic, trackedQos);
            }
        }

        /// <summary>
        /// Publishes a text payload to the given topic, connecting and restoring subscriptions first
        /// when the client is not connected.
        /// </summary>
        /// <param name="topic">Topic to publish to.</param>
        /// <param name="payload">Message payload.</param>
        /// <param name="qos">Quality-of-service level for the message.</param>
        /// <param name="retain">Value of the message's retain flag.</param>
        /// <param name="ct">Token passed to the connect, subscribe and publish calls.</param>
        public async Task PublishAsync(string topic, string payload, MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtMostOnce, bool retain = false, CancellationToken ct = default)
        {
            // The message is built before the connection check, so building it never depends on connection state.
            var message = new MqttApplicationMessageBuilder()
                .WithTopic(topic)
                .WithPayload(payload)
                .WithQualityOfServiceLevel(qos)
                .WithRetainFlag(retain)
                .Build();

            await EnsureConnectedAsync();

            await _client.PublishAsync(message, ct);
            _logger.LogInformation("[MQTT] Send | Topic={Topic} | QoS={QoS} | Payload={Payload}", topic, qos, payload);

            // Connects and re-subscribes only when the client reports it is not connected.
            async Task EnsureConnectedAsync()
            {
                if (_client.IsConnected)
                {
                    return;
                }

                _logger.LogWarning("[MQTT] 未连接到Broker,请重新连接后再试");
                await ConnectAsync(ct);
                await ResubscribeAllAsync(ct);
            }
        }

        /// <summary>
        /// Subscribes to each requested topic that is not yet tracked, and re-subscribes to tracked
        /// topics whose requested QoS differs from the recorded one.
        /// </summary>
        /// <remarks>
        /// When the client is not connected a warning is logged and nothing is subscribed or recorded.
        /// Topics are subscribed one at a time; each is recorded only after its subscribe call returns.
        /// </remarks>
        /// <param name="subscriptions">Topic and QoS pairs to subscribe to.</param>
        /// <param name="ct">Token passed to each subscribe call.</param>
        public async Task AddOrUpdateSubscriptionsAsync(IEnumerable<(string topic, MqttQualityOfServiceLevel qos)> subscriptions, CancellationToken ct = default)
        {
            if (!_client.IsConnected)
            {
                _logger.LogWarning("[MQTT] 未连接");
                return;
            }

            // Build every filter up front so the input is enumerated once, before any subscribe call is awaited.
            var topicFilters = subscriptions
                .Select(sub => new MqttTopicFilterBuilder()
                    .WithTopic(sub.topic)
                    .WithQualityOfServiceLevel(sub.qos)
                    .Build())
                .ToList();

            foreach (var filter in topicFilters)
            {
                // Skip only when the topic is already tracked with the same QoS. A changed QoS must be sent
                // again; per MQTT 3.1.1 (section 3.8.4) a SUBSCRIBE with an identical topic filter replaces
                // the existing subscription.
                if (_subscriptions.TryGetValue(filter.Topic, out var trackedQos) && trackedQos == filter.QualityOfServiceLevel)
                {
                    continue;
                }

                await _client.SubscribeAsync(filter, ct);
                _subscriptions[filter.Topic] = filter.QualityOfServiceLevel;
                _logger.LogInformation("[MQTT] 订阅主题: {Topic} (QoS {QoS})", filter.Topic, filter.QualityOfServiceLevel);
            }
        }

        /// <summary>
        /// Stops tracking the topic and, when the client is connected, unsubscribes from it.
        /// </summary>
        /// <param name="topic">Topic to remove.</param>
        /// <param name="ct">Token passed to the unsubscribe call.</param>
        public async Task RemoveSubscriptionAsync(string topic, CancellationToken ct = default)
        {
            // The tracked entry is dropped first, regardless of connection state.
            _subscriptions.TryRemove(topic, out MqttQualityOfServiceLevel _);

            if (!_client.IsConnected)
            {
                return;
            }

            await _client.UnsubscribeAsync(topic, ct);
            _logger.LogInformation("[MQTT] Unsubscribed: {Topic}", topic);
        }

        /// <summary>
        /// Returns a copy of the tracked subscriptions as an array of topic and QoS pairs.
        /// </summary>
        /// <returns>A new array built from the entries of the subscription dictionary.</returns>
        public IReadOnlyCollection<(string topic, MqttQualityOfServiceLevel qos)> GetSubscriptions()
        {
            var snapshot = new List<(string, MqttQualityOfServiceLevel)>();
            foreach (var entry in _subscriptions)
            {
                snapshot.Add((entry.Key, entry.Value));
            }

            return snapshot.ToArray();
        }

        /// <summary>
        /// Publishes a payload to the robot's <c>instantActions</c> topic:
        /// <c>{protocolName}/{protocolVersion}/{RobotManufacturer}/{RobotSerialNumber}/instantActions</c>.
        /// </summary>
        /// <param name="protocolName">First segment of the topic.</param>
        /// <param name="protocolVersion">Second segment of the topic.</param>
        /// <param name="RobotManufacturer">Third segment of the topic.</param>
        /// <param name="RobotSerialNumber">Fourth segment of the topic.</param>
        /// <param name="payload">Message payload.</param>
        /// <param name="qos">Quality-of-service level for the message.</param>
        /// <param name="retain">Value of the message's retain flag.</param>
        /// <param name="ct">Token passed through to the publish call.</param>
        public async Task PublishInstantActionsAsync(string protocolName, string protocolVersion, string RobotManufacturer, string RobotSerialNumber, string payload, MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtMostOnce, bool retain = false, CancellationToken ct = default)
        {
            var topic = $"{protocolName}/{protocolVersion}/{RobotManufacturer}/{RobotSerialNumber}/instantActions";

            // Delegate to PublishAsync rather than duplicating its build/connect/publish/log sequence.
            await PublishAsync(topic, payload, qos, retain, ct);
        }

        /// <summary>
        /// Publishes a payload to the robot's <c>order</c> topic:
        /// <c>{protocolName}/{protocolVersion}/{RobotManufacturer}/{RobotSerialNumber}/order</c>.
        /// </summary>
        /// <param name="protocolName">First segment of the topic.</param>
        /// <param name="protocolVersion">Second segment of the topic.</param>
        /// <param name="RobotManufacturer">Third segment of the topic.</param>
        /// <param name="RobotSerialNumber">Fourth segment of the topic.</param>
        /// <param name="payload">Message payload.</param>
        /// <param name="qos">Quality-of-service level for the message.</param>
        /// <param name="retain">Value of the message's retain flag.</param>
        /// <param name="ct">Token passed through to the publish call.</param>
        public async Task PublishOrderAsync(string protocolName, string protocolVersion, string RobotManufacturer, string RobotSerialNumber, string payload, MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtMostOnce, bool retain = false, CancellationToken ct = default)
        {
            var topic = $"{protocolName}/{protocolVersion}/{RobotManufacturer}/{RobotSerialNumber}/order";

            // Delegate to PublishAsync rather than duplicating its build/connect/publish/log sequence.
            await PublishAsync(topic, payload, qos, retain, ct);
        }
    }
}