GenericSyncBackgroundService.cs
3.99 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
using System.Collections.Concurrent;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Rcs.Application.Services.Sync;
namespace Rcs.Infrastructure.Services.Sync;
/// <summary>
/// Generic background service that periodically runs automatic sync tasks.
/// On every polling pass it resolves all registered <see cref="ISyncProvider"/>
/// instances from a fresh DI scope, asks each one for its sync tasks, and runs
/// the auto-sync tasks whose interval has elapsed, with a bounded number of
/// syncs executing at the same time.
/// @author zzy
/// </summary>
public class GenericSyncBackgroundService : BackgroundService
{
    private readonly ILogger<GenericSyncBackgroundService> _logger;
    private readonly IServiceProvider _serviceProvider;

    // UTC timestamp of the last successful sync, keyed by task id.
    private readonly ConcurrentDictionary<string, DateTime> _lastSyncTimes = new();

    // Task ids that are currently queued for, or running, a sync.
    private readonly ConcurrentDictionary<string, bool> _syncInProgress = new();

    private readonly SemaphoreSlim _semaphore = new(3); // maximum number of concurrent syncs

    private const int CheckIntervalSeconds = 10; // delay between polling passes
    private const int StartupDelaySeconds = 5; // grace period before the first pass

    /// <summary>
    /// Creates the service.
    /// </summary>
    /// <param name="logger">Logger used for lifecycle and error messages.</param>
    /// <param name="serviceProvider">Root provider used to create one DI scope per polling pass.</param>
    public GenericSyncBackgroundService(
        ILogger<GenericSyncBackgroundService> logger,
        IServiceProvider serviceProvider)
    {
        _logger = logger;
        _serviceProvider = serviceProvider;
    }

    /// <summary>
    /// Main loop: waits for the startup delay, then runs one polling pass every
    /// <see cref="CheckIntervalSeconds"/> seconds until the host requests a stop.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service must stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("[定时同步服务] 启动");

        try
        {
            // Give the application time to finish starting up.
            await Task.Delay(TimeSpan.FromSeconds(StartupDelaySeconds), stoppingToken);

            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    await ProcessSyncTasksAsync(stoppingToken);
                }
                catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                {
                    // Cancellation caused by shutdown is not an error; leave the loop.
                    break;
                }
                catch (Exception ex)
                {
                    // A failed pass must not kill the service; log and try again next interval.
                    _logger.LogError(ex, "[定时同步服务] 处理同步任务时发生错误");
                }

                await Task.Delay(TimeSpan.FromSeconds(CheckIntervalSeconds), stoppingToken);
            }
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // One of the delays was interrupted by shutdown; fall through so the
            // stop message below is still written.
        }

        _logger.LogInformation("[定时同步服务] 停止");
    }

    /// <summary>
    /// Runs a single polling pass: collects the due auto-sync tasks from every
    /// provider, starts them, and waits for all started syncs to finish.
    /// </summary>
    /// <param name="cancellationToken">Token forwarded to the providers and to the sync executions.</param>
    private async Task ProcessSyncTasksAsync(CancellationToken cancellationToken)
    {
        // The scope must outlive every started sync because the providers are resolved from it.
        using var scope = _serviceProvider.CreateScope();
        var providers = scope.ServiceProvider.GetServices<ISyncProvider>();
        var tasks = new List<Task>();

        foreach (var provider in providers)
        {
            try
            {
                var syncTasks = await provider.GetSyncTasksAsync(cancellationToken);
                foreach (var syncTask in syncTasks.Where(t => t.AutoSync))
                {
                    if (ShouldSync(syncTask))
                    {
                        tasks.Add(ExecuteSyncWithSemaphoreAsync(provider, syncTask, cancellationToken));
                    }
                }
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                // Shutting down: stop querying further providers, but still await
                // the syncs that were already started before the scope is disposed.
                break;
            }
            catch (Exception ex)
            {
                // One failing provider must not prevent the others from being processed.
                _logger.LogError(ex, "[定时同步服务] 获取同步任务失败: Provider={Provider}", provider.TaskType);
            }
        }

        if (tasks.Count > 0)
        {
            await Task.WhenAll(tasks);
        }
    }

    /// <summary>
    /// Decides whether a task is due to run now.
    /// </summary>
    /// <param name="task">The task to check.</param>
    /// <returns>
    /// <c>false</c> when the task is already marked in progress or its interval has
    /// not elapsed since the last successful sync; <c>true</c> otherwise, including
    /// when the task has never completed a sync.
    /// </returns>
    private bool ShouldSync(ISyncTask task)
    {
        // Skip tasks that are already queued or running.
        if (_syncInProgress.TryGetValue(task.TaskId, out var inProgress) && inProgress)
        {
            return false;
        }

        // Check whether the sync interval has elapsed. UTC is used so that DST or
        // time-zone changes cannot distort the elapsed time.
        if (_lastSyncTimes.TryGetValue(task.TaskId, out var lastSync))
        {
            var elapsed = DateTime.UtcNow - lastSync;
            return elapsed.TotalSeconds >= task.SyncIntervalSeconds;
        }

        // No successful sync recorded yet.
        return true;
    }

    /// <summary>
    /// Runs one sync under the concurrency limit and records the completion time on success.
    /// Failures are logged and swallowed so that one task cannot fail the whole pass;
    /// cancellation triggered by <paramref name="cancellationToken"/> is propagated to the caller.
    /// </summary>
    /// <param name="provider">Provider that executes the sync.</param>
    /// <param name="task">Task to synchronize.</param>
    /// <param name="cancellationToken">Token used while waiting for a slot and while syncing.</param>
    private async Task ExecuteSyncWithSemaphoreAsync(ISyncProvider provider, ISyncTask task, CancellationToken cancellationToken)
    {
        // Mark as in progress before the first await so the same task cannot be
        // scheduled twice while it is still waiting for a free slot.
        _syncInProgress[task.TaskId] = true;
        try
        {
            // Waiting happens inside the outer try so that a cancelled wait still
            // clears the in-progress flag instead of leaving the task blocked.
            await _semaphore.WaitAsync(cancellationToken);
            try
            {
                await provider.ExecuteSyncAsync(task, cancellationToken);

                // Only a successful sync updates the timestamp; a failed task is retried on the next pass.
                _lastSyncTimes[task.TaskId] = DateTime.UtcNow;
            }
            catch (Exception ex) when (ex is not OperationCanceledException || !cancellationToken.IsCancellationRequested)
            {
                _logger.LogError(ex, "[定时同步服务] 执行同步失败: TaskId={TaskId}, TaskType={TaskType}", task.TaskId, task.TaskType);
            }
            finally
            {
                // Released only when the wait above actually acquired a slot.
                _semaphore.Release();
            }
        }
        finally
        {
            _syncInProgress[task.TaskId] = false;
        }
    }
}