// GenericSyncBackgroundService.cs
using System.Collections.Concurrent;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Rcs.Application.Services.Sync;

namespace Rcs.Infrastructure.Services.Sync;

/// <summary>
/// Generic background service that periodically runs scheduled synchronization tasks.
/// On every check cycle it resolves all registered <see cref="ISyncProvider"/> instances,
/// asks each one for its sync tasks, and executes the auto-sync tasks whose interval has
/// elapsed, limiting how many syncs run at the same time.
/// @author zzy
/// </summary>
public class GenericSyncBackgroundService : BackgroundService
{
    private readonly ILogger<GenericSyncBackgroundService> _logger;
    private readonly IServiceProvider _serviceProvider;

    // Completion time (UTC) of the last successful sync, keyed by task id.
    private readonly ConcurrentDictionary<string, DateTime> _lastSyncTimes = new();

    // Whether a sync is currently queued or running, keyed by task id.
    private readonly ConcurrentDictionary<string, bool> _syncInProgress = new();

    // Limits the number of syncs that execute concurrently.
    private readonly SemaphoreSlim _semaphore = new(MaxConcurrentSyncs);

    private const int MaxConcurrentSyncs = 3;    // maximum number of concurrent syncs
    private const int StartupDelaySeconds = 5;   // delay before the first check cycle
    private const int CheckIntervalSeconds = 10; // pause between check cycles

    /// <summary>
    /// Creates the service.
    /// </summary>
    /// <param name="logger">Logger used for lifecycle and error messages.</param>
    /// <param name="serviceProvider">Root provider used to create one DI scope per check cycle.</param>
    public GenericSyncBackgroundService(
        ILogger<GenericSyncBackgroundService> logger,
        IServiceProvider serviceProvider)
    {
        _logger = logger;
        _serviceProvider = serviceProvider;
    }

    /// <summary>
    /// Main loop: waits for the startup delay, then runs one check cycle every
    /// <see cref="CheckIntervalSeconds"/> seconds until <paramref name="stoppingToken"/> is cancelled.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service must stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("[定时同步服务] 启动");

        try
        {
            // Give the rest of the application time to finish starting.
            await Task.Delay(TimeSpan.FromSeconds(StartupDelaySeconds), stoppingToken);

            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    await ProcessSyncTasksAsync(stoppingToken);
                }
                catch (Exception ex) when (ex is not OperationCanceledException || !stoppingToken.IsCancellationRequested)
                {
                    // A failed cycle must not kill the loop; cancellation caused by
                    // shutdown is excluded here so it is not reported as an error.
                    _logger.LogError(ex, "[定时同步服务] 处理同步任务时发生错误");
                }

                await Task.Delay(TimeSpan.FromSeconds(CheckIntervalSeconds), stoppingToken);
            }
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Normal shutdown: fall through so the stop message below is logged.
        }

        _logger.LogInformation("[定时同步服务] 停止");
    }

    /// <summary>
    /// Runs a single check cycle: collects the due auto-sync tasks from every provider,
    /// starts them, and waits for all of them to finish before the DI scope is disposed.
    /// </summary>
    /// <param name="cancellationToken">Token passed through to the providers.</param>
    private async Task ProcessSyncTasksAsync(CancellationToken cancellationToken)
    {
        using var scope = _serviceProvider.CreateScope();
        var providers = scope.ServiceProvider.GetServices<ISyncProvider>();

        var tasks = new List<Task>();

        foreach (var provider in providers)
        {
            try
            {
                var syncTasks = await provider.GetSyncTasksAsync(cancellationToken);

                foreach (var syncTask in syncTasks.Where(t => t.AutoSync))
                {
                    if (ShouldSync(syncTask))
                    {
                        tasks.Add(ExecuteSyncWithSemaphoreAsync(provider, syncTask, cancellationToken));
                    }
                }
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                // Shutting down: stop querying further providers, but still await
                // the syncs already started so the scope outlives them.
                break;
            }
            catch (Exception ex)
            {
                // One failing provider must not prevent the others from being processed.
                _logger.LogError(ex, "[定时同步服务] 获取同步任务失败: Provider={Provider}", provider.TaskType);
            }
        }

        if (tasks.Count > 0)
        {
            await Task.WhenAll(tasks);
        }
    }

    /// <summary>
    /// Decides whether <paramref name="task"/> should be started in the current cycle.
    /// </summary>
    /// <param name="task">The task to check.</param>
    /// <returns>
    /// <c>false</c> if the task is already marked as in progress or its interval has not
    /// elapsed since the last successful sync; <c>true</c> otherwise, including when the
    /// task has no recorded successful sync.
    /// </returns>
    private bool ShouldSync(ISyncTask task)
    {
        // Skip tasks that are already queued or running.
        if (_syncInProgress.TryGetValue(task.TaskId, out var inProgress) && inProgress)
        {
            return false;
        }

        // Check whether the sync interval has elapsed. UTC is used so that
        // daylight-saving shifts of local time cannot distort the elapsed time.
        if (_lastSyncTimes.TryGetValue(task.TaskId, out var lastSync))
        {
            var elapsed = DateTime.UtcNow - lastSync;
            return elapsed.TotalSeconds >= task.SyncIntervalSeconds;
        }

        // No successful sync recorded yet.
        return true;
    }

    /// <summary>
    /// Executes one sync task through its provider while holding a concurrency slot.
    /// Failures are logged and not rethrown; the last-sync time is recorded only on success,
    /// so a failed task is eligible again on the next check cycle.
    /// </summary>
    /// <param name="provider">Provider that executes the task.</param>
    /// <param name="task">Task to execute.</param>
    /// <param name="cancellationToken">Token that cancels waiting for a slot and the sync itself.</param>
    private async Task ExecuteSyncWithSemaphoreAsync(ISyncProvider provider, ISyncTask task, CancellationToken cancellationToken)
    {
        // Mark as in progress before the first await so duplicates within the same cycle are skipped.
        _syncInProgress[task.TaskId] = true;

        var slotAcquired = false;
        try
        {
            // Wait inside the try block so that a cancelled wait still clears the in-progress flag.
            await _semaphore.WaitAsync(cancellationToken);
            slotAcquired = true;

            await provider.ExecuteSyncAsync(task, cancellationToken);
            _lastSyncTimes[task.TaskId] = DateTime.UtcNow;
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // Cancelled because the service is stopping; not an error.
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "[定时同步服务] 执行同步失败: TaskId={TaskId}, TaskType={TaskType}", task.TaskId, task.TaskType);
        }
        finally
        {
            // Release only a slot that was actually taken; otherwise the semaphore count would be corrupted.
            if (slotAcquired)
            {
                _semaphore.Release();
            }

            _syncInProgress[task.TaskId] = false;
        }
    }
}