// RabbitMQHandleBackgroundService.cs (6.61 KB)
using HHECS.BllModel;
using HHECS.DAQShared.Dto;
using HHECS.DAQShared.Models;
using RabbitMQ.Client;
using System.Text.Json;

namespace HHECS.DAQServer.Services
{
    /// <summary>
    /// Background worker that drains <c>DataCacheService.EquipmentDataRecordQueue</c> and
    /// publishes every record to a RabbitMQ topic exchange named after the equipment type code.
    /// </summary>
    public class RabbitMQHandleBackgroundService : BackgroundService
    {
        private readonly IFreeSql _freeSql;
        private readonly DataCacheService _dataCacheService;
        private readonly ILogger<RabbitMQHandleBackgroundService> _logger;

        /// <summary>
        /// UTC time of the last attempt to reload the equipment lookup lists.
        /// </summary>
        private DateTime _lastReloadTime = DateTime.MinValue;

        /// <summary>
        /// Cached equipment rows, used to map an equipment SN to its equipment type id.
        /// </summary>
        private List<EquipmentExtend> _equipments = new List<EquipmentExtend>();

        /// <summary>
        /// Cached equipment type rows, used to map an equipment type id to its code.
        /// </summary>
        private List<EquipmentTypeExtend> _equipmentTypes = new List<EquipmentTypeExtend>();

        /// <summary>
        /// Whether the service runs in the production environment (selects the virtual host).
        /// </summary>
        private readonly bool _isProductionEnvironment;

        /// <summary>
        /// Interval between reloads of the lookup lists; also the maximum lifetime of one publish batch.
        /// </summary>
        private readonly TimeSpan _reloadTimeSpan = TimeSpan.FromMinutes(1);

        /// <summary>
        /// Pause applied when there is nothing to publish and after a failed pass.
        /// </summary>
        private readonly TimeSpan _idleDelay = TimeSpan.FromSeconds(1);

        /// <summary>
        /// Creates the service.
        /// </summary>
        /// <param name="freeSql">ORM used to load the equipment and equipment type lists.</param>
        /// <param name="dataCacheService">Owner of the queue of records waiting to be published.</param>
        /// <param name="configuration">Configuration; the boolean key <c>IsProductionEnvironment</c> is read.</param>
        /// <param name="logger">Logger for this service.</param>
        public RabbitMQHandleBackgroundService(IFreeSql freeSql, DataCacheService dataCacheService, IConfiguration configuration, ILogger<RabbitMQHandleBackgroundService> logger)
        {
            _freeSql = freeSql;
            _dataCacheService = dataCacheService;
            _logger = logger;
            _isProductionEnvironment = configuration.GetValue<bool>("IsProductionEnvironment");
        }

        /// <summary>
        /// Main loop: waits for queued records, refreshes the lookup lists once per
        /// reload interval, then publishes records until the queue is empty or the interval elapses.
        /// </summary>
        /// <param name="stoppingToken">Signalled when the host is shutting down.</param>
        /// <returns>A task that completes when the loop has stopped.</returns>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            // Record whose publish failed; it is retried before anything new is dequeued.
            EquipmentDataDto failTemp = null;

            // NOTE(review): broker address and credentials are hard-coded in source;
            // consider moving them to configuration or a secret store.
            var factory = new ConnectionFactory()
            {
                HostName = "172.16.29.90",
                UserName = "producer",
                Password = "Aa123456",
                VirtualHost = _isProductionEnvironment ? "hhecs.daq" : "hhecs.daq.development"
            };

            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    // Idle only when there is neither queued data nor a record waiting for a retry.
                    if (failTemp == null && _dataCacheService.EquipmentDataRecordQueue.IsEmpty)
                    {
                        await Task.Delay(_idleDelay, stoppingToken);
                        continue;
                    }

                    // Refresh the lookup lists once per interval. The timestamp is updated even
                    // when the reload fails, so a failing database is retried once per interval.
                    if ((DateTime.UtcNow - _lastReloadTime) > _reloadTimeSpan)
                    {
                        var result = ReloadData();
                        if (!result.Success)
                        {
                            _logger.LogError($"[{nameof(RabbitMQHandleBackgroundService)}]线程,数据刷新失败:{result.Msg}");
                        }
                        _lastReloadTime = DateTime.UtcNow;
                    }

                    // Without lookup data no record can be routed; leave the queue untouched.
                    if (_equipments.Count == 0 || _equipmentTypes.Count == 0)
                    {
                        await Task.Delay(_idleDelay, stoppingToken);
                        continue;
                    }

                    // One connection and one channel per batch instead of a channel per message.
                    using var connection = factory.CreateConnection();
                    using var channel = connection.CreateModel();

                    // Exchanges already declared on this connection. The set is discarded with
                    // the connection, so a failed declare is attempted again on the next batch.
                    var declaredExchanges = new HashSet<string>();
                    var total = 0;
                    do
                    {
                        // A previously failed record takes priority over the queue.
                        var currentData = failTemp;
                        failTemp = null;
                        if (currentData == null)
                        {
                            // Queue drained: end the batch.
                            if (!_dataCacheService.EquipmentDataRecordQueue.TryDequeue(out currentData))
                            {
                                break;
                            }

                            if (currentData == null)
                            {
                                continue;
                            }
                        }

                        // Records for unknown equipment cannot be routed and are discarded.
                        var equipmentTypeId = _equipments.Where(x => x.Code == currentData.EquipmentSN).Select(x => x.EquipmentTypeId).FirstOrDefault();
                        if (equipmentTypeId == default)
                        {
                            continue;
                        }

                        var equipmentTypeCode = _equipmentTypes.Where(x => x.Id == equipmentTypeId).Select(x => x.Code).FirstOrDefault();
                        if (string.IsNullOrWhiteSpace(equipmentTypeCode))
                        {
                            continue;
                        }

                        // Serialize before marking the record as pending, so a record that
                        // cannot be serialized is dropped instead of being retried forever.
                        var body = JsonSerializer.SerializeToUtf8Bytes(currentData);

                        // From here on a failure is a broker problem: keep the record so the
                        // next pass retries it instead of losing it.
                        failTemp = currentData;

                        var exchangeName = equipmentTypeCode;
                        if (declaredExchanges.Add(exchangeName))
                        {
                            channel.ExchangeDeclare(exchangeName, ExchangeType.Topic, true);
                        }

                        var properties = channel.CreateBasicProperties();
                        // Persistent = true is the same setting as DeliveryMode = 2.
                        properties.Persistent = true;
                        channel.BasicPublish(exchangeName, $"{equipmentTypeCode}.{currentData.EquipmentSN}", properties, body);

                        // Published without an exception: nothing left to retry.
                        failTemp = null;
                        total++;

                    } while (DateTime.UtcNow - _lastReloadTime <= _reloadTimeSpan);

                    if (total > 0)
                    {
                        _logger.LogInformation($"成功写入{total}条数据");
                    }
                }
                catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                {
                    // Host shutdown interrupted a delay; this is not an error.
                    break;
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, $"[{nameof(RabbitMQHandleBackgroundService)}]线程异常:{ex.Message}");

                    // Back off so an unreachable broker does not turn this loop into a busy spin.
                    try
                    {
                        await Task.Delay(_idleDelay, stoppingToken);
                    }
                    catch (OperationCanceledException)
                    {
                        break;
                    }
                }
            }
        }

        /// <summary>
        /// Reloads the equipment and equipment type lists from the database.
        /// Both cached lists are replaced only after both queries succeeded, so a
        /// failure leaves the previous data in place.
        /// </summary>
        /// <returns>A success result, or an error result carrying the exception message.</returns>
        private BllResult ReloadData()
        {
            try
            {
                var equipments = _freeSql.Queryable<EquipmentExtend>().ToList(x => new EquipmentExtend
                {
                    Id = x.Id,
                    Code = x.Code,
                    Name = x.Name,
                    EquipmentTypeId = x.EquipmentTypeId,
                });
                var equipmentTypes = _freeSql.Queryable<EquipmentTypeExtend>().ToList(x => new EquipmentTypeExtend
                {
                    Id = x.Id,
                    Code = x.Code,
                    Name = x.Name,
                });

                _equipments = equipments;
                _equipmentTypes = equipmentTypes;
                return BllResultFactory.Success();
            }
            catch (Exception ex)
            {
                return BllResultFactory.Error(ex.Message);
            }
        }
    }
}