EquipmentDataRecordHandleBackgroundService.cs 3.22 KB
using HHECS.DAQShared.Models;
using System.Text.Json;

namespace HHECS.DAQServer.Services
{
    public class EquipmentDataRecordHandleBackgroundService : BackgroundService
    {
        private readonly IFreeSql _freeSql;
        private readonly DataCacheService _dataCacheService;
        private readonly ILogger<EquipmentDataRecordHandleBackgroundService> _logger;
        private readonly int _limit = 1000;

        public EquipmentDataRecordHandleBackgroundService(IFreeSql freeSql, DataCacheService dataCacheService, ILogger<EquipmentDataRecordHandleBackgroundService> logger)
        {
            _freeSql = freeSql;
            _dataCacheService = dataCacheService;
            _logger = logger;
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            var equipmentDataRecordTemps = new List<EquipmentDataRecord>();
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    //队列数量小于设定值,则等待
                    if (_dataCacheService.EquipmentDataRecordQueue.Count < _limit)
                    {
                        //每两秒执行一次
                        await Task.Delay(2000, stoppingToken);
                    }

                    if (equipmentDataRecordTemps.Count < _limit)
                    {
                        for (int i = 0; i < _limit; i++)
                        {
                            if (_dataCacheService.EquipmentDataRecordQueue.IsEmpty)
                            {
                                break;
                            }

                            if (equipmentDataRecordTemps.Count == _limit)
                            {
                                break;
                            }
                            _dataCacheService.EquipmentDataRecordQueue.TryDequeue(out var queueItem);
                            var data = new EquipmentDataRecord
                            {
                                EquipmentCode = queueItem.EquipmentSN,
                                Tags = JsonSerializer.Serialize(queueItem.Reported),
                                Timestamp = queueItem.Timestamp,
                                Version = queueItem.Version,
                                IsHandle = false,
                                CreateTime = DateTime.Now
                            };
                            equipmentDataRecordTemps.Add(data);
                        }
                    }

                    if (equipmentDataRecordTemps.Count > 0)
                    {
                        using var equipmentDataRecordRepository = _freeSql.GetRepository<EquipmentDataRecord>();
                        equipmentDataRecordRepository.Insert(equipmentDataRecordTemps);
                        _logger.LogInformation($"[设备记录数据表]成功新增{equipmentDataRecordTemps.Count}条数据!");
                        equipmentDataRecordTemps.Clear();
                    }
                }
                catch (Exception ex)
                {
                    _logger.LogError($"[{nameof(EquipmentDataRecordHandleBackgroundService)}]线程异常:{ex.Message}");
                }
            }
        }
    }
}