// EquipmentController.cs (8.8 KB)
using HHECS.BllModel;
using HHECS.DAQServer.Dto.Equipment;
using HHECS.DAQServer.Services;
using HHECS.DAQShared.Models;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Caching.Distributed;
using System.Text;
using System.Text.Json;

namespace HHECS.DAQServer.Controllers
{
    /// <summary>
    /// 设备数据
    /// </summary>
    [Route("api/[controller]/[action]")]
    [ApiController]
    public class EquipmentController : ControllerBase
    {
        private readonly IFreeSql _freeSql;
        private readonly DataCacheService _dataCacheService;
        private readonly CommonService _commonService;
        private readonly IDistributedCache _cache;

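        /// <summary>
        /// Timestamps earlier than this value are treated as invalid data. The ORM framework
        /// imposes this limit, so the value should stay consistent with the entity class configuration.
        /// </summary>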
        private readonly DateTime minStartTime;

        /// <summary>
        /// Creates the controller with its injected dependencies and initializes the
        /// earliest timestamp that is accepted as valid data.
        /// </summary>
        /// <param name="freeSql">FreeSql entry point used by the actions for database queries.</param>
        /// <param name="dataCacheService">Holds the in-memory record queue and the status dictionaries the actions write to.</param>
        /// <param name="commonService">Supplies the queue size limit and the timestamp conversion.</param>
        /// <param name="cache">Distributed cache that stores the latest sample per equipment SN.</param>
        public EquipmentController(IFreeSql freeSql, DataCacheService dataCacheService, CommonService commonService, IDistributedCache cache)
        {
            _freeSql = freeSql;
            _dataCacheService = dataCacheService;
            _commonService = commonService;
            _cache = cache;

            // Built from components instead of DateTime.Parse("2024-6-11"): parsing depends on the
            // current thread culture (date format and calendar), so the threshold could differ
            // between hosts or fail to parse at all.
            minStartTime = new DateTime(2024, 6, 11);
        }

        /// <summary>
        /// Receives real-time equipment data. The newest sample of each equipment SN is written
        /// to the distributed cache, then every sample with a valid time is added to the record
        /// queue (the queue is drained elsewhere; per the original note it feeds the database).
        /// </summary>
        /// <param name="data">Samples pushed by the client; must contain at least one item.</param>
        /// <returns>
        /// A success result when the samples were queued; an error result when the payload is
        /// null or empty, the record queue has reached its limit, or an exception occurred.
        /// </returns>
        [HttpPost]
        public async Task<BllResult> SendEquipmentData(IEnumerable<EquipmentDataDto> data)
        {
            try
            {
                if (data is null)
                {
                    return BllResultFactory.Error($"数据不能为空!");
                }

                // Materialize once: the payload is walked several times below.
                var items = data as IReadOnlyCollection<EquipmentDataDto> ?? data.ToList();
                if (items.Count == 0)
                {
                    return BllResultFactory.Error($"数据不能为空!");
                }

                // Cache entries expire after 10 minutes without access.
                var options = new DistributedCacheEntryOptions().SetSlidingExpiration(TimeSpan.FromMinutes(10));

                // Refresh the real-time cache. This happens before the queue-limit check, so the
                // cache is updated even when the samples themselves are rejected below.
                foreach (var group in items.GroupBy(x => x.EquipmentSN))
                {
                    // Only the sample with the newest timestamp is a cache candidate.
                    var latest = group.OrderByDescending(x => x.Timestamp).First();

                    var cachedBytes = await _cache.GetAsync(group.Key);
                    if (cachedBytes != null)
                    {
                        // The entry is written with SerializeToUtf8Bytes, so read it back directly
                        // as UTF-8 instead of decoding it through Encoding.Default first.
                        var cached = JsonSerializer.Deserialize<EquipmentDataDto>(cachedBytes);

                        // A cached JSON "null" deserializes to null; treat it like a cache miss.
                        if (cached != null && cached.Timestamp >= latest.Timestamp)
                        {
                            continue;
                        }
                    }

                    await _cache.SetAsync(group.Key, JsonSerializer.SerializeToUtf8Bytes(latest), options);
                }

                var maxCacheCount = _commonService.GetMaxCacheCount();

                // When the queue is at its limit, reject the batch instead of queueing more,
                // to lower the risk of losing data.
                if (_dataCacheService.EquipmentDataRecordQueue.Count >= maxCacheCount)
                {
                    return BllResultFactory.Error($"数据队列缓存超过设定值({maxCacheCount}),待当前队列数据处理完成后才能继续");
                }

                // Samples whose converted time is unset or earlier than minStartTime are dropped.
                var records = items.Select(x => new EquipmentDataRecord
                {
                    EquipmentCode = x.EquipmentSN,
                    Tags = JsonSerializer.Serialize(x.Reported),
                    IsHandle = false,
                    Version = x.Version,
                    CreateTime = _commonService.TimestampConvertToLocalDateTime(x.Timestamp),
                    Timestamp = x.Timestamp,
                }).Where(x => x.CreateTime != default && x.CreateTime >= minStartTime).ToList();

                foreach (var item in records)
                {
                    _dataCacheService.EquipmentDataRecordQueue.Enqueue(item);
                }

                return BllResultFactory.Success();
            }
            catch (Exception ex)
            {
                return BllResultFactory.Error(ex.Message);
            }
        }

        /// <summary>
        /// Receives equipment data that covers a time range. Each item is expanded into samples
        /// spaced 3 seconds apart from its start time, plus one sample at the end time, and
        /// every sample is added to the record queue.
        /// </summary>
        /// <param name="data">Items carrying a start and end timestamp together with the reported values.</param>
        /// <returns>
        /// A success result once all valid items were expanded and queued; an error result when
        /// the payload is null, the record queue has reached its limit, or an exception occurred.
        /// </returns>
        [HttpPost]
        public Task<BllResult> SendEquipmentDataV2(IEnumerable<EquipmentDataV2Dto> data)
        {
            // Spacing, in seconds, between the samples generated for one time range.
            const int intervalSeconds = 3;

            try
            {
                if (data is null)
                {
                    return Task.FromResult<BllResult>(BllResultFactory.Error($"数据不能为空!"));
                }

                var maxCacheCount = _commonService.GetMaxCacheCount();
                if (_dataCacheService.EquipmentDataRecordQueue.Count >= maxCacheCount)
                {
                    return Task.FromResult<BllResult>(BllResultFactory.Error($"数据队列缓存超过设定值[{maxCacheCount}],待当前队列数据[{_dataCacheService.EquipmentDataRecordQueue.Count}]处理完成后才能继续"));
                }

                foreach (var item in data)
                {
                    var startTime = _commonService.TimestampConvertToLocalDateTime(item.TimestampStart);
                    var endTime = _commonService.TimestampConvertToLocalDateTime(item.TimestampEnd);

                    // Skip items with an invalid time. Only values strictly earlier than
                    // minStartTime are invalid, matching the check in SendEquipmentData.
                    if (startTime < minStartTime || startTime == default || endTime == default)
                    {
                        continue;
                    }

                    // The payload is identical for every sample of this item, so serialize it once.
                    var tags = JsonSerializer.Serialize(item.Reported);

                    var sampleTime = startTime;
                    while (true)
                    {
                        // Clamp to the end time so the final sample lands exactly on it.
                        var currentTime = sampleTime < endTime ? sampleTime : endTime;
                        DateTimeOffset localTime = DateTime.SpecifyKind(currentTime, DateTimeKind.Local);
                        var record = new EquipmentDataRecord
                        {
                            EquipmentCode = item.EquipmentSN,
                            Tags = tags,
                            IsHandle = false,
                            Version = item.Version,
                            Timestamp = localTime.ToUnixTimeMilliseconds(),
                            CreateTime = localTime.LocalDateTime,
                        };
                        _dataCacheService.EquipmentDataRecordQueue.Enqueue(record);

                        // Stop as soon as the end time has been emitted. The previous loop
                        // condition allowed one more pass when the range length was an exact
                        // multiple of the interval, which queued the end sample twice.
                        if (sampleTime >= endTime)
                        {
                            break;
                        }

                        sampleTime = sampleTime.AddSeconds(intervalSeconds);
                    }
                }

                // Nothing here is asynchronous, so return a completed task rather than
                // awaiting an artificial delay.
                return Task.FromResult<BllResult>(BllResultFactory.Success());
            }
            catch (Exception ex)
            {
                return Task.FromResult<BllResult>(BllResultFactory.Error(ex.Message));
            }
        }

        /// <summary>
        /// Receives equipment status data. After checking that every equipment SN in the payload
        /// exists in the database, the newest sample per SN is stored in the in-memory status
        /// dictionary, replacing any previous entry for that SN.
        /// </summary>
        /// <param name="data">Status samples pushed by the client; must contain at least one item.</param>
        /// <returns>
        /// A success result when the dictionary was updated; an error result when the payload is
        /// null or empty, an SN is unknown, or an exception occurred.
        /// </returns>
        [HttpPost]
        public async Task<BllResult> SendEquipmentStatusData(IEnumerable<EquipmentDataDto> data)
        {
            try
            {
                if (data is null)
                {
                    return BllResultFactory.Error($"数据不能为空!");
                }

                // Materialize once: the payload is walked several times below.
                var items = data as IReadOnlyCollection<EquipmentDataDto> ?? data.ToList();
                if (items.Count == 0)
                {
                    return BllResultFactory.Error($"数据不能为空!");
                }

                var equipmentSNs = items.Select(x => x.EquipmentSN).Distinct().ToList();
                var dbEquipmentSNs = await _freeSql.Queryable<EquipmentExtend>().Where(x => equipmentSNs.Contains(x.Code)).Distinct().ToListAsync(x => x.Code);

                // A count mismatch means at least one requested SN has no matching equipment row.
                if (equipmentSNs.Count != dbEquipmentSNs.Count)
                {
                    var temps = equipmentSNs.Except(dbEquipmentSNs).ToList();
                    return BllResultFactory.Error($"设备SN为[{string.Join(',', temps)}]的设备信息不存在!");
                }

                foreach (var item in items.GroupBy(x => x.EquipmentSN))
                {
                    // Keep only the sample with the newest timestamp for each SN.
                    var record = item.OrderByDescending(x => x.Timestamp).First();
                    _dataCacheService.EquipmentStatusDictionary.AddOrUpdate(item.Key, record, (key, oldValue) => record);
                }
                return BllResultFactory.Success();
            }
            catch (Exception ex)
            {
                return BllResultFactory.Error(ex.Message);
            }
        }

        /// <summary>
        /// Records that a client is online by storing the current local time under its client id.
        /// The id must match the <c>ClientKeys</c> of an existing <c>ClientStatus</c> row.
        /// </summary>
        /// <param name="data">Payload carrying the client id.</param>
        /// <returns>
        /// A success result when the time was stored; an error result when the client id is
        /// unknown or an exception occurred.
        /// </returns>
        [HttpPost]
        public async Task<BllResult> UpdateClientStatus(ClientStatusDto data)
        {
            try
            {
                // Captured before the database lookup, so the stored time reflects request arrival.
                var heartbeatTime = DateTime.Now;

                var clientExists = await _freeSql.GetRepository<ClientStatus>()
                    .Where(x => x.ClientKeys == data.ClientId)
                    .AnyAsync();

                if (clientExists)
                {
                    // Only the dictionary is touched here; per the original note the entries
                    // are applied in one batch elsewhere.
                    _dataCacheService.ClientStatusDictionary.AddOrUpdate(
                        data.ClientId,
                        heartbeatTime,
                        (clientId, previousTime) => heartbeatTime);
                    return BllResultFactory.Success();
                }

                return BllResultFactory.Error($"客户端标识:{data.ClientId}不存在,请查验后再试!");
            }
            catch (Exception failure)
            {
                return BllResultFactory.Error(failure.Message);
            }
        }
    }
}