OrderedDomainEventDispatcher.cs 11 KB
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Rcs.Application;
using Rcs.Domain;
using Rcs.Domain.Entities;
using Rcs.Infrastructure.MessageBus.Handlers.Events;
using Rcs.Domain.Entities.DomainEvents;
using System.Reflection;

namespace Rcs.Infrastructure;

/// <summary>
/// 有序的领域事件分发器实现(优化版本)
/// </summary>
public class OrderedDomainEventDispatcher : IDomainEventDispatcher
{
    private readonly IServiceProvider _serviceProvider;
    private readonly ILogger<OrderedDomainEventDispatcher> _logger;

    // 缓存事件处理器类型,避免反射带来的性能开销
    private static readonly Dictionary<Type, List<Type>> _eventHandlersCache = new();

    public OrderedDomainEventDispatcher(IServiceProvider serviceProvider, ILogger<OrderedDomainEventDispatcher> logger)
    {
        _serviceProvider = serviceProvider;
        _logger = logger;

        // 初始化事件处理器缓存
        InitializeEventHandlersCache();
    }

    /// <summary>
    /// 分发并处理实体的所有领域事件(顺序执行)
    /// </summary>
    public async Task DispatchEventsAsync(Entity entity, CancellationToken cancellationToken = default)
    {
        if (entity?.DomainEvents == null || !entity.DomainEvents.Any())
        {
            return;
        }

        // 创建事件列表的副本,避免在迭代过程中修改
        var domainEvents = entity.DomainEvents.ToList();

        _logger.LogInformation("开始处理实体 {EntityType} 的 {EventCount} 个领域事件",
            entity.GetType().Name, domainEvents.Count);

        // 简化:直接按事件添加顺序处理(时间戳排序在AppDbContext中完成)
        foreach (var domainEvent in domainEvents)
        {
            await DispatchAsync(domainEvent, cancellationToken);
        }

        // 清除已处理的事件
        entity.ClearDomainEvents();

        _logger.LogInformation("成功处理实体 {EntityType} 的所有领域事件", entity.GetType().Name);
    }

    /// <summary>
    /// 分发单个领域事件(处理器串行执行)
    /// </summary>
    public async Task DispatchAsync<T>(T domainEvent, CancellationToken cancellationToken = default) where T : IDomainEvent
    {
        var eventType = domainEvent.GetType();

        _logger.LogDebug("正在查找事件类型 {EventType} 的处理器,缓存中共有 {CacheCount} 种事件类型",
            eventType.Name, _eventHandlersCache.Count);

        // 输出缓存中的所有事件类型用于调试
        if (_eventHandlersCache.Count == 0)
        {
            _logger.LogWarning("事件处理器缓存为空,可能初始化失败");
        }
        else
        {
            _logger.LogDebug("缓存中的事件类型: {EventTypes}", string.Join(", ", _eventHandlersCache.Keys.Select(k => k.Name)));
        }

        if (!_eventHandlersCache.TryGetValue(eventType, out var handlerTypes) || !handlerTypes.Any())
        {
            _logger.LogWarning("未找到领域事件 {EventType} 的处理器", eventType.Name);

            return;
        }

        _logger.LogInformation("开始处理领域事件 {EventType},找到 {HandlerCount} 个处理器", eventType.Name, handlerTypes.Count);

        // 🔧 修改:串行执行处理器,避免并行交叉问题
        foreach (var handlerType in handlerTypes)
        {
            await HandleEventAsync(handlerType, domainEvent, cancellationToken);
        }

        _logger.LogInformation("成功处理领域事件 {EventType}", eventType.Name);
    }

    /// <summary>
    /// 分发单个领域事件(非泛型版本)
    /// </summary>
    public async Task DispatchAsync(IDomainEvent domainEvent, CancellationToken cancellationToken = default)
    {
        if (domainEvent == null)
        {
            _logger.LogWarning("尝试处理空的领域事件");
            return;
        }

        var eventType = domainEvent.GetType();

        _logger.LogDebug("正在查找事件类型 {EventType} 的处理器,缓存中共有 {CacheCount} 种事件类型",
            eventType.Name, _eventHandlersCache.Count);

        // 输出缓存中的所有事件类型用于调试
        if (_eventHandlersCache.Count == 0)
        {
            _logger.LogWarning("事件处理器缓存为空,可能初始化失败");
        }
        else
        {
            _logger.LogDebug("缓存中的事件类型: {EventTypes}", string.Join(", ", _eventHandlersCache.Keys.Select(k => k.Name)));
        }

        if (!_eventHandlersCache.TryGetValue(eventType, out var handlerTypes) || !handlerTypes.Any())
        {
            _logger.LogWarning("未找到领域事件 {EventType} 的处理器", eventType.Name);
            return;
        }

        _logger.LogInformation("开始处理领域事件 {EventType},找到 {HandlerCount} 个处理器", eventType.Name, handlerTypes.Count);

        // 🔧 修改:串行执行处理器,避免并行交叉问题
        foreach (var handlerType in handlerTypes)
        {
            await HandleEventAsync(handlerType, domainEvent);
        }

        _logger.LogInformation("成功处理领域事件 {EventType}", eventType.Name);
    }

    /// <summary>
    /// 处理单个事件(串行)
    /// </summary>
    private async Task HandleEventAsync<T>(Type handlerType, T domainEvent, CancellationToken cancellationToken) where T : IDomainEvent
    {
        try
        {
            // 创建服务范围以解析作用域服务
            using var scope = _serviceProvider.CreateScope();

            // 从服务容器中获取处理器实例
            var handler = scope.ServiceProvider.GetService(handlerType);

            if (handler == null)
            {
                _logger.LogWarning("无法解析事件处理器类型 {HandlerType}", handlerType.Name);
                return;
            }

            // 调用处理器的 Handle 方法
            var eventType = domainEvent.GetType();
            var handleMethod = handlerType.GetMethod("Handle", new[] { eventType });

            if (handleMethod == null)
            {
                _logger.LogError("事件处理器 {HandlerType} 未找到处理 {EventType} 的 Handle 方法", handlerType.Name, eventType.Name);
                return;
            }

            _logger.LogDebug("使用处理器 {HandlerType} 处理事件 {EventType}", handlerType.Name, eventType.Name);

            var task = (Task)handleMethod.Invoke(handler, new object[] { domainEvent });

            if (task != null)
            {
                await task;
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "事件处理器 {HandlerType} 处理事件 {EventType} 时发生错误",
                handlerType.Name, domainEvent.GetType().Name);
            throw;
        }
    }

    /// <summary>
    /// 处理单个事件(串行)- 非泛型版本
    /// </summary>
    private async Task HandleEventAsync(Type handlerType, IDomainEvent domainEvent)
    {
        try
        {
            // 创建服务范围以解析作用域服务
            using var scope = _serviceProvider.CreateScope();

            // 从服务容器中获取处理器实例
            var handler = scope.ServiceProvider.GetService(handlerType);

            if (handler == null)
            {
                _logger.LogWarning("无法解析事件处理器类型 {HandlerType}", handlerType.Name);
                return;
            }

            var eventType = domainEvent.GetType();

            // 调用处理器的 Handle 方法
            var handleMethod = handlerType.GetMethod("Handle", new[] { eventType });

            if (handleMethod == null)
            {
                _logger.LogError("事件处理器 {HandlerType} 未找到处理 {EventType} 的 Handle 方法", handlerType.Name, eventType.Name);
                return;
            }

            _logger.LogDebug("使用处理器 {HandlerType} 处理事件 {EventType}", handlerType.Name, eventType.Name);

            var task = (Task)handleMethod.Invoke(handler, new object[] { domainEvent });

            if (task != null)
            {
                await task;
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "事件处理器 {HandlerType} 处理事件 {EventType} 时发生错误",
                handlerType.Name, domainEvent.GetType().Name);
            throw;
        }
    }


    /// <summary>
    /// 初始化事件处理器缓存
    /// </summary>
    private static void InitializeEventHandlersCache()
    {
        // 只扫描特定程序集,避免不必要的扫描提高性能
        // 严格排除 MassTransit Command/Query Handlers
        var assembliesToScan = new[]
        {
            typeof(RobotCreatedDomainEventHandler).Assembly, // Infrastructure层
            typeof(RobotCreatedDomainEvent).Assembly        // Domain层
        };

        foreach (var assembly in assembliesToScan)
        {
            try
            {
                var types = assembly.GetTypes()
                    .Where(t => t.IsClass && !t.IsAbstract && !t.IsInterface)
                    .ToList();

                foreach (var type in types)
                {
                    // 查找 Handle 方法
                    var handleMethods = type.GetMethods()
                        .Where(m => m.Name == "Handle" && m.GetParameters().Length == 1)
                        .ToList();

                    foreach (var method in handleMethods)
                    {
                        var parameterType = method.GetParameters()[0].ParameterType;

                        // 严格过滤:只缓存实现了 IDomainEvent 接口的事件处理器
                        // 这会自动排除所有 MassTransit 的 Command/Query/Event Handlers
                        // 因为它们处理的类型不实现 IDomainEvent 接口
                        if (typeof(IDomainEvent).IsAssignableFrom(parameterType))
                        {
                            if (!_eventHandlersCache.ContainsKey(parameterType))
                            {
                                _eventHandlersCache[parameterType] = new List<Type>();
                            }

                            if (!_eventHandlersCache[parameterType].Contains(type))
                            {
                                _eventHandlersCache[parameterType].Add(type);

                                // 静态方法中无法使用Logger,使用Console输出调试信息
                                Console.WriteLine($"[DomainEventCache] 注册处理器: {type.Name} -> {parameterType.Name}");
                            }
                        }
                    }
                }
            }
            catch (ReflectionTypeLoadException ex)
            {
                // 忽略无法加载的程序集
                Console.WriteLine($"[DomainEventCache] 程序集加载失败: {assembly.FullName}, 错误: {ex.Message}");
            }
        }

        Console.WriteLine($"[DomainEventCache] 初始化完成,共注册了 {_eventHandlersCache.Count} 种事件类型的处理器");
    }
}