// MainVM.cs 10.1 KB
using System.Collections.ObjectModel;
using System.Configuration;
using System.Diagnostics;
using System.Windows;
using System.Windows.Controls;
using CommunityToolkit.Mvvm.ComponentModel;
using CommunityToolkit.Mvvm.Input;
using HHECS.DAQClient.Common;
using HHECS.DAQClient.DataAccess;
using HHECS.DAQClient.Model;
using HHECS.DAQClient.Services;
using HHECS.DAQClient.View.CommunicationView;
using HHECS.DAQClient.View.EquipmentView;
using MessageBox = HandyControl.Controls.MessageBox;

namespace HHECS.DAQClient.ViewModel
{
    internal partial class MainVM : ObservableObject
    {
        // Enabled state of the start button; Start() sets it to false and Stop() sets it back to true.
        [ObservableProperty]
        private bool btnStartEnabled = true;

        // Enabled state of the stop button; Start() sets it to true and Stop() sets it back to false.
        [ObservableProperty]
        private bool btnStopEnabled = false;

        // Log entries shown by the view; filled and trimmed to 50 entries by RefreshLog().
        [ObservableProperty]
        private ObservableCollection<LogModel> logModels = new();

        // Page instances created together with the view model and exposed as observable properties.
        [ObservableProperty]
        private Page communicationPage = new CommunicationPage();

        [ObservableProperty]
        private Page equipmentPage = new EquipmentPage();

        [ObservableProperty]
        private Page equipmentDataQueuePage = new EquipmentDataQueuePage();

        /// <summary>
        /// Whether data is uploaded automatically.
        /// </summary>
        [ObservableProperty]
        private bool autoCommit = true;

        /// <summary>
        /// Number of records handled per commit.
        /// Defaults to 30 and is overwritten in Initial() from the "CommitCount" app setting.
        /// </summary>
        private int _commitCount = 30;
        // Log instance obtained from SystemLog.GetInstance(); read by RefreshLog() and written by the upload loops.
        private readonly SystemLog _log = SystemLog.GetInstance();
        // Collaborators supplied through the constructor.
        private readonly CenterService _centerService;
        private readonly HttpService _httpService;
        // NOTE(review): this single context instance is used by both background upload loops — confirm it is safe for concurrent use.
        private readonly DataContext _context;
        /// <summary>
        /// Creates the view model, stores its collaborators and immediately runs <c>Initial()</c>.
        /// </summary>
        /// <param name="context">Data context used by the upload loops.</param>
        /// <param name="centerService">Service started/stopped by the Start and Stop commands.</param>
        /// <param name="httpService">Service used to send equipment data.</param>
        public MainVM(DataContext context, CenterService centerService, HttpService httpService)
        {
            // Assigned left to right, in the same order as three separate statements would be.
            (_centerService, _httpService, _context) = (centerService, httpService, context);

            Initial();
        }

        /// <summary>
        /// Reads the "CommitCount" app setting and starts the log refresh loop and both upload loops.
        /// </summary>
        private void Initial()
        {
            var rawCommitCount = ConfigurationManager.AppSettings["CommitCount"];

            // A missing, unparsable or non-positive setting falls back to 30.
            _commitCount = int.TryParse(rawCommitCount, out var configuredCount) && configuredCount > 0
                ? configuredCount
                : 30;

            RefreshLog();
            UploadEquipmentDataToCloudByMemoryQueue();
            UploadEquipmentDataToCloudByDatabase();
        }

        /// <summary>
        /// Disables the start button, enables the stop button, then starts the center service.
        /// </summary>
        [RelayCommand]
        public void Start()
        {
            // Tuple assignment writes the targets left to right: start button first, stop button second.
            (BtnStartEnabled, BtnStopEnabled) = (false, true);
            _centerService.Start();
        }

        /// <summary>
        /// Enables the start button, disables the stop button, then stops the center service.
        /// </summary>
        [RelayCommand]
        public void Stop()
        {
            // Tuple assignment writes the targets left to right: start button first, stop button second.
            (BtnStartEnabled, BtnStopEnabled) = (true, false);
            _centerService.Stop();
        }

        /// <summary>
        /// Starts an endless loop on the application dispatcher that drains <c>_log</c>
        /// into <see cref="LogModels"/>.
        /// </summary>
        /// <remarks>
        /// The log is polled every 100 ms. An entry whose message is already listed only has its
        /// timestamp refreshed and the list re-sorted (newest first); otherwise the entry is inserted
        /// at the top. At most 50 entries are kept.
        /// A failure while processing an entry is logged and the loop keeps running, instead of the
        /// loop ending silently on the first exception.
        /// </remarks>
        private void RefreshLog()
        {
            const int pollIntervalMilliseconds = 100;
            const int maxVisibleLogs = 50;

            Task.Run(() =>
            {
                // Invoke returns as soon as the lambda reaches its first await; the loop then
                // continues on the dispatcher through the await continuations.
                Application.Current.Dispatcher.Invoke(async () =>
                {
                    do
                    {
                        try
                        {
                            while (!_log.IsEmpty)
                            {
                                var log = _log.GetLog();
                                if (log == null)
                                {
                                    await Task.Delay(pollIntervalMilliseconds);
                                    continue;
                                }

                                // object.Equals is null-safe, so an entry without a message cannot throw here.
                                var oldItem = LogModels.FirstOrDefault(x => object.Equals(x.Messages, log.Messages));
                                if (oldItem != null)
                                {
                                    // Same message seen again: refresh its time and re-sort newest first.
                                    oldItem.CreateTime = log.CreateTime;
                                    LogModels = new ObservableCollection<LogModel>(LogModels.OrderByDescending(x => x.CreateTime));
                                }
                                else
                                {
                                    LogModels.Insert(0, new LogModel()
                                    {
                                        LogType = log.LogType,
                                        Messages = log.Messages,
                                        CreateTime = log.CreateTime
                                    });
                                }

                                if (LogModels.Count > maxVisibleLogs)
                                {
                                    // Remove by index: Remove(Last()) would search by equality and could
                                    // drop an earlier, equal element instead of the tail.
                                    LogModels.RemoveAt(LogModels.Count - 1);
                                }
                            }
                        }
                        catch (Exception ex)
                        {
                            // Keep the refresh loop alive; the task returned by the lambda is not observed
                            // by anyone, so an escaping exception would stop log display without a trace.
                            _log.LogError($"[RefreshLog]日志刷新线程异常:{ex.Message}");
                        }

                        await Task.Delay(pollIntervalMilliseconds);
                    } while (true);
                });
            });
        }

        /// <summary>
        /// Uploads data to IOTCloud from the in-memory queue.
        /// </summary>
        /// <remarks>
        /// Takes up to <c>_commitCount</c> items from the queue once per interval (1 second by default).
        /// When auto-commit is on and the database table holds no records, each equipment's items are
        /// sent directly; items whose upload fails (or throws) are saved to the database. Otherwise the
        /// whole batch is saved to the database.
        /// The loop never ends, so it runs as a long-running task instead of occupying a thread-pool worker.
        /// </remarks>
        /// <param name="millisecondsTimeout">Pause between two passes, in milliseconds.</param>
        private void UploadEquipmentDataToCloudByMemoryQueue(int millisecondsTimeout = 1000)
        {
            Task.Factory.StartNew(() =>
            {
                while (true)
                {
                    try
                    {
                        Thread.Sleep(millisecondsTimeout);
                        if (_centerService.EquipmentDataQueues.IsEmpty)
                        {
                            // Nothing queued.
                            continue;
                        }

                        // Decide the route BEFORE dequeuing: if this database check throws, nothing has
                        // been taken out of the queue yet, so no data is lost and the next pass retries.
                        // Direct push requires auto-commit on and no records waiting in the database.
                        var pushDirectly = AutoCommit && !_context.EquipmentDataQueue.Where(x => true).Any();

                        var temps = new List<EquipmentDataQueue>();
                        for (int i = 0; i < _commitCount; i++)
                        {
                            if (!_centerService.EquipmentDataQueues.TryDequeue(out var item))
                            {
                                // Queue drained; further attempts in this pass would fail as well.
                                break;
                            }
                            if (item != null)
                            {
                                temps.Add(item);
                            }
                        }
                        if (temps.Count == 0)
                        {
                            continue;
                        }

                        var commitFailureRecord = new List<EquipmentDataQueue>();
                        if (pushDirectly)
                        {
                            Stopwatch stopwatch = new Stopwatch();
                            foreach (var group in temps.GroupBy(x => x.EquipmentCode))
                            {
                                var records = group.ToList();
                                bool success;
                                string message;

                                stopwatch.Restart();
                                try
                                {
                                    var result = _httpService.SendEquipmentData(records);
                                    success = result.Success;
                                    message = $"{result.Msg}";
                                }
                                catch (Exception ex)
                                {
                                    // A throwing send counts as a failed upload for this group only, so
                                    // its records (and the remaining groups) are not dropped.
                                    success = false;
                                    message = ex.Message;
                                }
                                stopwatch.Stop();

                                if (!success)
                                {
                                    commitFailureRecord.AddRange(records);
                                    _log.LogError($"推送设备[{group.Key}]数据失败,{message},耗时:{stopwatch.ElapsedMilliseconds}ms");
                                    continue;
                                }
                                _log.LogSuccess($"成功推送{records.Count}条设备[{group.Key}]数据,耗时:{stopwatch.ElapsedMilliseconds}ms");
                            }
                        }
                        else
                        {
                            // Auto-commit is off, or the database already holds records: store the batch.
                            commitFailureRecord.AddRange(temps);
                        }

                        // Persist everything that was not uploaded.
                        // NOTE(review): if this save itself throws, these records are only reported by the
                        // catch below and are not retried — confirm whether they should be re-queued.
                        if (commitFailureRecord.Count > 0)
                        {
                            _context.EquipmentDataQueue.AddRange(commitFailureRecord);
                            _context.SaveChanges();
                            _log.LogInfo($"新增{commitFailureRecord.Count}条数据记录");
                        }
                    }
                    catch (Exception ex)
                    {
                        _log.LogError($"[LocalQueue]数据上传线程异常:{ex.Message}");
                    }
                }
            }, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
        }

        /// <summary>
        /// Uploads data to IOTCloud from the database.
        /// </summary>
        /// <remarks>
        /// Runs once per interval (5 seconds by default). While auto-commit is on, up to
        /// <c>_commitCount</c> uncommitted records per equipment code are sent, and the records of every
        /// successful send are removed from the database. The removal also runs when a later equipment
        /// code throws, so batches that were already uploaded are not uploaded again on the next pass.
        /// The loop never ends, so it runs as a long-running task instead of occupying a thread-pool worker.
        /// </remarks>
        /// <param name="millisecondsTimeout">Pause between two passes, in milliseconds.</param>
        private void UploadEquipmentDataToCloudByDatabase(int millisecondsTimeout = 5000)
        {
            Task.Factory.StartNew(() =>
            {
                while (true)
                {
                    try
                    {
                        // With auto-commit off nothing is uploaded; the loop just waits for the next pass.
                        if (AutoCommit)
                        {
                            var equipmentCodes = _context.EquipmentDataQueue.Where(x => true).Distinct().ToList(x => x.EquipmentCode);
                            var queues = new List<EquipmentDataQueue>();
                            try
                            {
                                Stopwatch stopwatch = Stopwatch.StartNew();
                                foreach (var equipmentCode in equipmentCodes)
                                {
                                    // The timer covers the query as well as the send.
                                    stopwatch.Restart();
                                    // NOTE(review): two ordering calls are chained; whether the second replaces
                                    // or extends the first depends on the query provider — confirm the intended order.
                                    var temps = _context.EquipmentDataQueue.Where(x => x.EquipmentCode.Equals(equipmentCode) && !x.IsCommit).OrderByDescending(x => x.Created).OrderBy(x => x.SourceTimestamp).Take(_commitCount).ToList();
                                    if (temps.Count == 0)
                                    {
                                        // The code list is not filtered by IsCommit, so a code may have
                                        // nothing to send; skip it instead of posting an empty batch.
                                        continue;
                                    }

                                    var result = _httpService.SendEquipmentData(temps);
                                    stopwatch.Stop();
                                    if (!result.Success)
                                    {
                                        _log.LogError($"推送设备[{equipmentCode}]数据失败,{result.Msg},耗时:{stopwatch.ElapsedMilliseconds}ms");
                                        continue;
                                    }
                                    queues.AddRange(temps);
                                    _log.LogSuccess($"成功推送{temps.Count}条设备[{equipmentCode}]数据,耗时:{stopwatch.ElapsedMilliseconds}ms");
                                }
                            }
                            finally
                            {
                                // Always remove what was uploaded, even if a later equipment code failed with
                                // an exception; otherwise those records would be sent a second time.
                                if (queues.Count > 0)
                                {
                                    _context.EquipmentDataQueue.RemoveRange(queues);
                                    _context.SaveChanges();
                                }
                            }
                        }
                    }
                    catch (Exception ex)
                    {
                        _log.LogError($"[DataBase]数据上传线程异常:{ex.Message}");
                    }
                    Thread.Sleep(millisecondsTimeout);
                }
            }, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
        }
    }
}