OrderedDomainEventDispatcher.cs
11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Rcs.Application;
using Rcs.Domain;
using Rcs.Domain.Entities;
using Rcs.Infrastructure.MessageBus.Handlers.Events;
using Rcs.Domain.Entities.DomainEvents;
using System.Reflection;
namespace Rcs.Infrastructure;
/// <summary>
/// 有序的领域事件分发器实现(优化版本)
/// </summary>
public class OrderedDomainEventDispatcher : IDomainEventDispatcher
{
private readonly IServiceProvider _serviceProvider;
private readonly ILogger<OrderedDomainEventDispatcher> _logger;
// 缓存事件处理器类型,避免反射带来的性能开销
private static readonly Dictionary<Type, List<Type>> _eventHandlersCache = new();
public OrderedDomainEventDispatcher(IServiceProvider serviceProvider, ILogger<OrderedDomainEventDispatcher> logger)
{
_serviceProvider = serviceProvider;
_logger = logger;
// 初始化事件处理器缓存
InitializeEventHandlersCache();
}
/// <summary>
/// 分发并处理实体的所有领域事件(顺序执行)
/// </summary>
public async Task DispatchEventsAsync(Entity entity, CancellationToken cancellationToken = default)
{
if (entity?.DomainEvents == null || !entity.DomainEvents.Any())
{
return;
}
// 创建事件列表的副本,避免在迭代过程中修改
var domainEvents = entity.DomainEvents.ToList();
_logger.LogInformation("开始处理实体 {EntityType} 的 {EventCount} 个领域事件",
entity.GetType().Name, domainEvents.Count);
// 简化:直接按事件添加顺序处理(时间戳排序在AppDbContext中完成)
foreach (var domainEvent in domainEvents)
{
await DispatchAsync(domainEvent, cancellationToken);
}
// 清除已处理的事件
entity.ClearDomainEvents();
_logger.LogInformation("成功处理实体 {EntityType} 的所有领域事件", entity.GetType().Name);
}
/// <summary>
/// 分发单个领域事件(处理器串行执行)
/// </summary>
public async Task DispatchAsync<T>(T domainEvent, CancellationToken cancellationToken = default) where T : IDomainEvent
{
var eventType = domainEvent.GetType();
_logger.LogDebug("正在查找事件类型 {EventType} 的处理器,缓存中共有 {CacheCount} 种事件类型",
eventType.Name, _eventHandlersCache.Count);
// 输出缓存中的所有事件类型用于调试
if (_eventHandlersCache.Count == 0)
{
_logger.LogWarning("事件处理器缓存为空,可能初始化失败");
}
else
{
_logger.LogDebug("缓存中的事件类型: {EventTypes}", string.Join(", ", _eventHandlersCache.Keys.Select(k => k.Name)));
}
if (!_eventHandlersCache.TryGetValue(eventType, out var handlerTypes) || !handlerTypes.Any())
{
_logger.LogWarning("未找到领域事件 {EventType} 的处理器", eventType.Name);
return;
}
_logger.LogInformation("开始处理领域事件 {EventType},找到 {HandlerCount} 个处理器", eventType.Name, handlerTypes.Count);
// 🔧 修改:串行执行处理器,避免并行交叉问题
foreach (var handlerType in handlerTypes)
{
await HandleEventAsync(handlerType, domainEvent, cancellationToken);
}
_logger.LogInformation("成功处理领域事件 {EventType}", eventType.Name);
}
/// <summary>
/// 分发单个领域事件(非泛型版本)
/// </summary>
public async Task DispatchAsync(IDomainEvent domainEvent, CancellationToken cancellationToken = default)
{
if (domainEvent == null)
{
_logger.LogWarning("尝试处理空的领域事件");
return;
}
var eventType = domainEvent.GetType();
_logger.LogDebug("正在查找事件类型 {EventType} 的处理器,缓存中共有 {CacheCount} 种事件类型",
eventType.Name, _eventHandlersCache.Count);
// 输出缓存中的所有事件类型用于调试
if (_eventHandlersCache.Count == 0)
{
_logger.LogWarning("事件处理器缓存为空,可能初始化失败");
}
else
{
_logger.LogDebug("缓存中的事件类型: {EventTypes}", string.Join(", ", _eventHandlersCache.Keys.Select(k => k.Name)));
}
if (!_eventHandlersCache.TryGetValue(eventType, out var handlerTypes) || !handlerTypes.Any())
{
_logger.LogWarning("未找到领域事件 {EventType} 的处理器", eventType.Name);
return;
}
_logger.LogInformation("开始处理领域事件 {EventType},找到 {HandlerCount} 个处理器", eventType.Name, handlerTypes.Count);
// 🔧 修改:串行执行处理器,避免并行交叉问题
foreach (var handlerType in handlerTypes)
{
await HandleEventAsync(handlerType, domainEvent);
}
_logger.LogInformation("成功处理领域事件 {EventType}", eventType.Name);
}
/// <summary>
/// 处理单个事件(串行)
/// </summary>
private async Task HandleEventAsync<T>(Type handlerType, T domainEvent, CancellationToken cancellationToken) where T : IDomainEvent
{
try
{
// 创建服务范围以解析作用域服务
using var scope = _serviceProvider.CreateScope();
// 从服务容器中获取处理器实例
var handler = scope.ServiceProvider.GetService(handlerType);
if (handler == null)
{
_logger.LogWarning("无法解析事件处理器类型 {HandlerType}", handlerType.Name);
return;
}
// 调用处理器的 Handle 方法
var eventType = domainEvent.GetType();
var handleMethod = handlerType.GetMethod("Handle", new[] { eventType });
if (handleMethod == null)
{
_logger.LogError("事件处理器 {HandlerType} 未找到处理 {EventType} 的 Handle 方法", handlerType.Name, eventType.Name);
return;
}
_logger.LogDebug("使用处理器 {HandlerType} 处理事件 {EventType}", handlerType.Name, eventType.Name);
var task = (Task)handleMethod.Invoke(handler, new object[] { domainEvent });
if (task != null)
{
await task;
}
}
catch (Exception ex)
{
_logger.LogError(ex, "事件处理器 {HandlerType} 处理事件 {EventType} 时发生错误",
handlerType.Name, domainEvent.GetType().Name);
throw;
}
}
/// <summary>
/// 处理单个事件(串行)- 非泛型版本
/// </summary>
private async Task HandleEventAsync(Type handlerType, IDomainEvent domainEvent)
{
try
{
// 创建服务范围以解析作用域服务
using var scope = _serviceProvider.CreateScope();
// 从服务容器中获取处理器实例
var handler = scope.ServiceProvider.GetService(handlerType);
if (handler == null)
{
_logger.LogWarning("无法解析事件处理器类型 {HandlerType}", handlerType.Name);
return;
}
var eventType = domainEvent.GetType();
// 调用处理器的 Handle 方法
var handleMethod = handlerType.GetMethod("Handle", new[] { eventType });
if (handleMethod == null)
{
_logger.LogError("事件处理器 {HandlerType} 未找到处理 {EventType} 的 Handle 方法", handlerType.Name, eventType.Name);
return;
}
_logger.LogDebug("使用处理器 {HandlerType} 处理事件 {EventType}", handlerType.Name, eventType.Name);
var task = (Task)handleMethod.Invoke(handler, new object[] { domainEvent });
if (task != null)
{
await task;
}
}
catch (Exception ex)
{
_logger.LogError(ex, "事件处理器 {HandlerType} 处理事件 {EventType} 时发生错误",
handlerType.Name, domainEvent.GetType().Name);
throw;
}
}
/// <summary>
/// 初始化事件处理器缓存
/// </summary>
private static void InitializeEventHandlersCache()
{
// 只扫描特定程序集,避免不必要的扫描提高性能
// 严格排除 MassTransit Command/Query Handlers
var assembliesToScan = new[]
{
typeof(RobotCreatedDomainEventHandler).Assembly, // Infrastructure层
typeof(RobotCreatedDomainEvent).Assembly // Domain层
};
foreach (var assembly in assembliesToScan)
{
try
{
var types = assembly.GetTypes()
.Where(t => t.IsClass && !t.IsAbstract && !t.IsInterface)
.ToList();
foreach (var type in types)
{
// 查找 Handle 方法
var handleMethods = type.GetMethods()
.Where(m => m.Name == "Handle" && m.GetParameters().Length == 1)
.ToList();
foreach (var method in handleMethods)
{
var parameterType = method.GetParameters()[0].ParameterType;
// 严格过滤:只缓存实现了 IDomainEvent 接口的事件处理器
// 这会自动排除所有 MassTransit 的 Command/Query/Event Handlers
// 因为它们处理的类型不实现 IDomainEvent 接口
if (typeof(IDomainEvent).IsAssignableFrom(parameterType))
{
if (!_eventHandlersCache.ContainsKey(parameterType))
{
_eventHandlersCache[parameterType] = new List<Type>();
}
if (!_eventHandlersCache[parameterType].Contains(type))
{
_eventHandlersCache[parameterType].Add(type);
// 静态方法中无法使用Logger,使用Console输出调试信息
Console.WriteLine($"[DomainEventCache] 注册处理器: {type.Name} -> {parameterType.Name}");
}
}
}
}
}
catch (ReflectionTypeLoadException ex)
{
// 忽略无法加载的程序集
Console.WriteLine($"[DomainEventCache] 程序集加载失败: {assembly.FullName}, 错误: {ex.Message}");
}
}
Console.WriteLine($"[DomainEventCache] 初始化完成,共注册了 {_eventHandlersCache.Count} 种事件类型的处理器");
}
}