EquipmentController.cs
4.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
using HHECS.BllModel;
using HHECS.DAQServer.Dto.Equipment;
using HHECS.DAQServer.Services;
using HHECS.DAQShared.Models;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Caching.Distributed;
using System.Text;
using System.Text.Json;
namespace HHECS.DAQServer.Controllers
{
/// <summary>
/// 设备数据
/// </summary>
[Route("api/[controller]/[action]")]
[ApiController]
public class EquipmentController : ControllerBase
{
private readonly IFreeSql _freeSql;
private readonly DataCacheService _dataCacheService;
private readonly CommonService _commonService;
private readonly IDistributedCache _cache;
public EquipmentController(IFreeSql freeSql, DataCacheService dataCacheService, CommonService commonService, IDistributedCache cache)
{
_freeSql = freeSql;
_dataCacheService = dataCacheService;
_commonService = commonService;
_cache = cache;
}
/// <summary>
/// 推送设备实时数据
/// </summary>
/// <param name="data"></param>
/// <returns></returns>
[HttpPost]
public async Task<BllResult> SendEquipmentData(IEnumerable<EquipmentDataDto> data)
{
try
{
if (!data.Any())
{
return BllResultFactory.Error($"数据不能为空!");
}
//小于此时间,认为是无效数据,ORM框架有限制,应与实体类配置保持一致
var minStartTime = DateTime.Parse("2024-6-11");
//缓存配置
var options = new DistributedCacheEntryOptions().SetSlidingExpiration(TimeSpan.FromMinutes(10));
//缓存设备实时数据
foreach (var record in data.GroupBy(x => x.EquipmentSN))
{
//获取最新时间戳的数据
var lastItem = record.OrderByDescending(x => x.Timestamp).First();
//获取当前缓存数据
var cacheDataBytes = await _cache.GetAsync(record.Key);
if (cacheDataBytes != null)
{
var cacheData = JsonSerializer.Deserialize<EquipmentDataDto>(Encoding.Default.GetString(cacheDataBytes));
if (cacheData.Timestamp >= lastItem.Timestamp)
{
continue;
}
}
var encodedCurrentData = JsonSerializer.SerializeToUtf8Bytes(lastItem);
await _cache.SetAsync(record.Key, encodedCurrentData, options);
}
const int maxCount = 1000 * 5;
//缓存队列超过设定值,暂时不记录到队列,降低数据丢失的风险
if (_dataCacheService.EquipmentDataRecordQueue.Count >= maxCount)
{
return BllResultFactory.Error($"数据队列缓存超过设定值({maxCount}),待当前队列处理完成后才能继续");
}
var records = data.Select(x => new EquipmentDataRecord
{
EquipmentCode = x.EquipmentSN,
Tags = JsonSerializer.Serialize(x.Reported),
IsHandle = false,
Version = x.Version,
CreateTime = _commonService.TimestampConvertToLocalDateTime(x.Timestamp),
Timestamp = x.Timestamp,
}).Where(x => x.CreateTime != default && x.CreateTime >= minStartTime).ToList();
//加入缓存队列,存入数据库
foreach (var item in records)
{
_dataCacheService.EquipmentDataRecordQueue.Enqueue(item);
}
return BllResultFactory.Success();
}
catch (Exception ex)
{
return BllResultFactory.Error(ex.Message);
}
}
/// <summary>
/// 更新客户端在线状态
/// </summary>
/// <param name="data"></param>
/// <returns></returns>
[HttpPost]
public async Task<BllResult> UpdateClientStatus(ClientStatusDto data)
{
try
{
var newValueToAdd = DateTime.Now;
var clientStatusRepository = _freeSql.GetRepository<ClientStatus>();
if (!await clientStatusRepository.Where(x => x.ClientKeys == data.ClientId).AnyAsync())
{
return BllResultFactory.Error($"客户端标识:{data.ClientId}不存在,请查验后再试!");
}
//添加到队列,统一更新
_dataCacheService.ClientStatusQueue.AddOrUpdate(data.ClientId, newValueToAdd, (key, oldValue) => newValueToAdd);
return BllResultFactory.Success();
}
catch (Exception ex)
{
return BllResultFactory.Error(ex.Message);
}
}
}
}