// CenterService.cs 12 KB
using HHECS.DAQClient.Common;
using HHECS.DAQClient.Model;
using HHECS.DAQClient.Communications;
using HHECS.DAQClient.DataAccess;
using HslCommunication.Profinet.Siemens;
using HHECS.DAQClient.Dto;
using HHECS.EquipmentModel;
using System.Text.Json;
using HHECS.BllModel;
using System.Configuration;

namespace HHECS.DAQClient.Services
{
    internal class CenterService
    {
        private readonly SystemLog _log = SystemLog.GetInstance();
        private readonly DataContext _context;
        private readonly HttpService _httpService;
        private List<Equipment> _equipments = new List<Equipment>();
        private List<ICommunication> communications = new List<ICommunication>();
        private CancellationTokenSource cts = new CancellationTokenSource();
        private readonly object _lock = new object();

        /// <summary>
        /// 当前区域
        /// </summary>
        /// <remarks>倘若指定了区域,则只加载该区域的数据</remarks>
        private readonly string currentArea = string.Empty;

        /// <summary>
        /// Creates the service and reads the optional "Area" application setting.
        /// </summary>
        /// <param name="dataContext">Database context used for all reads and writes of this service.</param>
        /// <param name="httpService">HTTP service used to push collected data.</param>
        public CenterService(DataContext dataContext, HttpService httpService)
        {
            (_context, _httpService) = (dataContext, httpService);

            // A missing or blank "Area" setting means "do not filter by area" (see the checks in Start).
            currentArea = ConfigurationManager.AppSettings["Area"];
        }

        /// <summary>
        /// Reloads the equipment and communication configuration and starts data collection.
        /// </summary>
        /// <remarks>
        /// Starts one background polling task per communication channel plus one task that pushes queued
        /// records to the cloud. All tasks are bound to a fresh cancellation source; the source of a previous
        /// run is cancelled first so that its tasks wind down instead of polling in parallel with the new ones.
        /// Configuration problems and initialization errors are logged, not thrown.
        /// </remarks>
        public void Start()
        {
            // Cancel whatever a previous Start() left running before replacing the source.
            cts.Cancel();
            var source = new CancellationTokenSource();
            cts = source;
            // Workers get the token of THIS run; reading the field later could observe a newer source.
            var token = source.Token;
            try
            {
                List<Equipment> equipments;
                if (string.IsNullOrWhiteSpace(currentArea))
                {
                    equipments = _context.Equipment.Where(x => !x.Disable).ToList();
                }
                else
                {
                    equipments = _context.Equipment.Where(x => !x.Disable && x.DestinationArea == currentArea).ToList();
                }
                _equipments = equipments;
                if (equipments.Count == 0)
                {
                    _log.LogWarning($"设备数据为空,请配置数据后操作!");
                    source.Cancel();
                    return;
                }

                var equipmentIds = equipments.Select(x => x.Id).ToList();
                var equipmentTypeIds = equipments.Select(x => x.EquipmentTypeId).Distinct().ToList();

                var equipmentProps = _context.EquipmentProp.Where(x => equipmentIds.Contains(x.EquipmentId)).ToList();
                var equipmentTypes = _context.EquipmentType.Where(x => equipmentTypeIds.Contains(x.Id)).ToList();
                var equipmentTypePropTemplates = _context.EquipmentTypePropTemplate.Where(x => equipmentTypeIds.Contains(x.EquipmentTypeId)).ToList();

                // Wire up the logical foreign keys in memory.
                foreach (var equipment in equipments)
                {
                    equipment.EquipmentType = equipmentTypes.FirstOrDefault(i => i.Id == equipment.EquipmentTypeId);
                    equipment.EquipmentProps = equipmentProps.Where(i => i.EquipmentId == equipment.Id).ToList();
                }
                foreach (var prop in equipmentProps)
                {
                    prop.Equipment = equipments.FirstOrDefault(i => i.Id == prop.EquipmentId);
                    prop.EquipmentTypePropTemplate = equipmentTypePropTemplates.FirstOrDefault(i => i.Id == prop.EquipmentTypePropTemplateId);
                    if (prop.EquipmentTypePropTemplate == null)
                    {
                        // Report the misconfiguration once here; the poll loop skips such props.
                        _log.LogWarning($"设备属性[{prop.Id}]未找到对应的属性模板,采集时将跳过该属性!");
                    }
                }

                List<CommunicationConfig> communicationConfigs;
                if (string.IsNullOrWhiteSpace(currentArea))
                {
                    communicationConfigs = _context.CommunicationConfigs.Where(x => !x.Disable).ToList();
                }
                else
                {
                    communicationConfigs = _context.CommunicationConfigs.Where(x => !x.Disable && x.Area == currentArea).ToList();
                }

                if (communicationConfigs.Count == 0)
                {
                    _log.LogWarning($"通讯配置数据为空,请配置数据后操作!");
                    source.Cancel();
                    return;
                }

                var channels = InitialCommunication(communicationConfigs);
                communications = channels;

                // One polling task per channel.
                foreach (var channel in channels)
                {
                    _ = Task.Run(() => CollectAsync(channel, equipments, token), token);
                }

                // One uploader task for the whole service.
                _ = Task.Run(() => SendEquipmentDataToCloud(source), token);
            }
            catch (Exception ex)
            {
                // Do not leave a half-started run behind.
                source.Cancel();
                _log.LogException($"数据初始化失败:{ex.Message}");
            }
        }

        /// <summary>
        /// Polls one communication channel about once per second until <paramref name="token"/> is cancelled.
        /// </summary>
        /// <param name="communication">Channel to read from.</param>
        /// <param name="equipments">Equipment loaded by the current run.</param>
        /// <param name="token">Cancellation token of the current run.</param>
        private async Task CollectAsync(ICommunication communication, List<Equipment> equipments, CancellationToken token)
        {
            while (!token.IsCancellationRequested)
            {
                try
                {
                    PollOnce(communication, equipments);
                }
                catch (Exception ex)
                {
                    _log.LogError($"读取IP:{communication.IpAddress}设备出现异常:{ex.Message}");
                }

                try
                {
                    // Single wait per iteration on every path; ends immediately when the run is stopped.
                    await Task.Delay(1000, token);
                }
                catch (OperationCanceledException)
                {
                    break;
                }
            }
        }

        /// <summary>
        /// Reads all non-self properties of the equipment behind one channel, stores the values on the
        /// in-memory props and queues one record per equipment for upload.
        /// </summary>
        /// <param name="communication">Channel to read from; equipment is matched by IP address.</param>
        /// <param name="equipments">Equipment loaded by the current run.</param>
        private void PollOnce(ICommunication communication, List<Equipment> equipments)
        {
            var equipmentTemps = equipments.Where(x => x.IP == communication.IpAddress).ToList();
            var props = equipmentTemps
                .SelectMany(x => x.EquipmentProps)
                .Where(x => x.EquipmentTypePropTemplate != null && x.EquipmentTypePropTemplate.PropType != EquipmentPropType.Self)
                .ToList();
            if (props.Count == 0)
            {
                return;
            }

            var temps = props.Select(x => new DataItem
            {
                Id = x.Id,
                Code = x.EquipmentTypePropTemplateCode,
                DataAddress = x.Address,
                DataType = x.EquipmentTypePropTemplate.DataType,
                Value = string.Empty
            }).ToList();

            // Read the values from the device.
            var result = communication.Read(temps);
            if (!result.Success)
            {
                _log.LogError(result.Msg);
                return;
            }

            // Sample the clock once so all fields of this poll carry the same instant.
            var now = DateTime.Now;

            // Copy the read values back onto the props.
            foreach (var prop in props)
            {
                prop.Value = temps.Find(x => x.Id == prop.Id).Value;
                prop.Updated = now;
            }

            var records = equipmentTemps.Select(x => new EquipmentDataQueue
            {
                EquipmentCode = x.Code,
                EquipmentTypeCode = x.EquipmentType.Code,
                IsCommit = false,
                Reported = JsonSerializer.Serialize(x.EquipmentProps.Select(p => new TagItem
                {
                    Tag = p.Address,
                    Value = p.Value
                }).ToList()),
                Version = 1,
                SourceTimestamp = ConvertToTimestamp(now),
                Created = now,
            }).ToList();

            var addResult = SaveEquipmentDataQueues(records);
            if (!addResult.Success)
            {
                _log.LogError($"新增设备数据记录失败,{addResult.Msg}");
            }
        }

        /// <summary>
        /// Stops data collection: logs the stop message, then cancels the current cancellation source
        /// so the background tasks bound to it end.
        /// </summary>
        public void Stop()
        {
            const string stoppedMessage = "已停止采集!";

            _log.Log(stoppedMessage);
            // Signal the background tasks to finish.
            cts.Cancel();
        }

        /// <summary>
        /// Builds one communication channel per supported configuration entry.
        /// </summary>
        /// <param name="communicationConfigs">Configuration entries to turn into channels.</param>
        /// <returns>
        /// The channels that could be created. Entries of type <c>None</c> or of an unknown type are skipped;
        /// an entry whose construction throws is logged and skipped without affecting the remaining entries.
        /// </returns>
        private List<ICommunication> InitialCommunication(IEnumerable<CommunicationConfig> communicationConfigs)
        {
            var result = new List<ICommunication>();
            foreach (var item in communicationConfigs)
            {
                // Guard each entry separately so one bad configuration does not drop all following ones.
                try
                {
                    switch (item.CommunicationType)
                    {
                        case CommunicationTypeConst.KukaVarProxy:
                            result.Add(new KukaAvarProxyCommunication(item.Id, item.IpAddress, item.Port));
                            break;
                        case CommunicationTypeConst.Siemens_S1200:
                            result.Add(new SiemensS7Communication(item.Id, SiemensPLCS.S1200, item.IpAddress));
                            break;
                        case CommunicationTypeConst.Siemens_S1500:
                            result.Add(new SiemensS7Communication(item.Id, SiemensPLCS.S1500, item.IpAddress));
                            break;
                        case CommunicationTypeConst.TcpClient:
                            // Previously constructed but never added, so TCP devices were never polled.
                            result.Add(new TcpClientCommunication(item.Id, item.IpAddress, item.Port));
                            break;
                        case CommunicationTypeConst.None:
                        default:
                            break;
                    }
                }
                catch (Exception ex)
                {
                    _log.LogError($"设备通讯初始化异常:{ex.Message}");
                }
            }
            return result;
        }

        /// <summary>
        /// Adds the given queue records to the database context and saves them.
        /// </summary>
        /// <param name="equipments">Records to persist.</param>
        /// <returns>A success result, or an error result carrying the exception message.</returns>
        private BllResult SaveEquipmentDataQueues(IEnumerable<EquipmentDataQueue> equipments)
        {
            try
            {
                // Access to the shared context is serialized through _lock.
                lock (_lock)
                {
                    var queue = _context.EquipmentDataQueue;
                    queue.AddRange(equipments);
                    _context.SaveChanges();
                }

                return BllResultFactory.Success();
            }
            catch (Exception failure)
            {
                return BllResultFactory.Error(failure.Message);
            }
        }

        /// <summary>
        /// Repeatedly pushes uncommitted queue records to the cloud, grouped by equipment code, and deletes
        /// the records of every group that was pushed successfully. Runs until cancellation is requested.
        /// </summary>
        /// <param name="cancellationToken">Cancellation source that ends the loop.</param>
        /// <remarks>
        /// Database access takes <c>_lock</c>, the same lock SaveEquipmentDataQueues uses, because this loop
        /// runs concurrently with the polling tasks on the same context. The lock is not held during the HTTP
        /// call. Any exception in one pass is logged and the loop carries on with the next pass.
        /// </remarks>
        private void SendEquipmentDataToCloud(CancellationTokenSource cancellationToken)
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                try
                {
                    List<EquipmentDataQueue> records;
                    lock (_lock)
                    {
                        records = _context.EquipmentDataQueue.Where(x => !x.IsCommit).OrderByDescending(x => x.Created).ToList();
                    }

                    foreach (var item in records.GroupBy(x => x.EquipmentCode))
                    {
                        var data = item.Select(x => new EquipmentDataDto
                        {
                            Plmeid = x.Id,
                            EquipmentSN = x.EquipmentCode,
                            Reported = JsonSerializer.Deserialize<List<TagItem>>(x.Reported),
                            Version = x.Version,
                            Timestamp = x.SourceTimestamp,
                        }).ToList();

                        var result = _httpService.SendEquipmentData(data);
                        if (!result.Success)
                        {
                            _log.LogWarning($"推送[{item.Key}]设备数据失败,{result.Msg}");
                            continue;
                        }
                        _log.LogSuccess($"成功推送{data.Count}条设备数据");

                        // Pushed records are no longer needed locally.
                        lock (_lock)
                        {
                            _context.EquipmentDataQueue.RemoveRange(item.ToList());
                            _context.SaveChanges();
                        }
                    }
                }
                catch (Exception ex)
                {
                    // Without this, a single failure would silently end the fire-and-forget uploader task.
                    _log.LogError($"推送设备数据出现异常:{ex.Message}");
                }
                Task.Delay(1000).Wait();
            }
        }

        /// <summary>
        /// Converts a date/time to seconds elapsed since 1970-01-01 00:00:00 UTC.
        /// </summary>
        /// <param name="dateTime">Value to convert; <c>null</c> yields 0.</param>
        /// <returns>
        /// Seconds since the epoch, rounded to a whole number by <see cref="Convert.ToInt64(double)"/>;
        /// 0 when <paramref name="dateTime"/> is <c>null</c>.
        /// </returns>
        /// <remarks>
        /// Values of kind <see cref="DateTimeKind.Local"/> (e.g. <c>DateTime.Now</c>) are converted to UTC first,
        /// so the result no longer depends on the machine's time zone. Values of kind Utc or Unspecified are
        /// used as they are.
        /// </remarks>
        private static long ConvertToTimestamp(DateTime? dateTime)
        {
            if (dateTime == null)
            {
                return 0;
            }

            var value = dateTime.Value;
            if (value.Kind == DateTimeKind.Local)
            {
                value = value.ToUniversalTime();
            }

            var epoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
            return Convert.ToInt64((value - epoch).TotalSeconds);
        }
    }
}