// MessageBusInstaller.cs
using MassTransit;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Rcs.Application.MessageBus;
using Rcs.Domain.Settings;
using Rcs.Infrastructure.Filters.MassTransit;
using Rcs.Infrastructure.MessageBus;
using Rcs.Infrastructure.MessageBus.Handlers;
using Rcs.Infrastructure.MessageBus.Handlers.Commands;
using Rcs.Infrastructure.MessageBus.Handlers.Events;
namespace Rcs.Infrastructure.Installs
{
/// <summary>
/// Message bus installer: registers the message publisher, the in-process
/// MassTransit Mediator and the RabbitMQ-backed MassTransit bus.
/// </summary>
public static class MessageBusInstaller
{
    /// <summary>
    /// Installs the MassTransit Mediator and the RabbitMQ message bus.
    /// </summary>
    /// <param name="builder">
    /// Application builder whose configuration is read and whose service collection is extended.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="builder"/> is <c>null</c>.</exception>
    /// <exception cref="InvalidOperationException">
    /// The <c>AppSettings</c> configuration section, or its <c>RabbitMq</c> settings, could not be bound.
    /// </exception>
    public static void InstallMessageBus(this WebApplicationBuilder builder)
    {
        ArgumentNullException.ThrowIfNull(builder);

        // Read the RabbitMQ configuration. Binding yields null when the section is absent,
        // so fail with an explicit message instead of a NullReferenceException.
        var appSettings = builder.Configuration.GetSection(nameof(AppSettings)).Get<AppSettings>();
        var rabbitMqSettings = appSettings?.RabbitMq
            ?? throw new InvalidOperationException(
                $"Configuration section '{nameof(AppSettings)}' with '{nameof(AppSettings.RabbitMq)}' settings is required to install the message bus.");

        // Register the message publisher.
        builder.Services.AddScoped<IMessagePublisher, MessagePublisher>();

        // Configure the Mediator (in-process messaging, used for CQRS).
        builder.Services.AddMediator(cfg =>
        {
            // Add all consumers (command/query handlers) found in the assembly.
            AddMediatorConsumersFromAssembly(cfg);

            cfg.ConfigureMediator((context, mediatorCfg) =>
            {
                // Configure filters (note: the registration order matters).
                mediatorCfg.UseConsumeFilter(typeof(LoggingFilter<>), context);
                mediatorCfg.UseConsumeFilter(typeof(RedisFilter<>), context);
            });
        });

        // Configure MassTransit (RabbitMQ - inter-process messaging).
        builder.Services.AddMassTransit(x =>
        {
            AddRabbitMQConsumersFromAssembly(x);

            // Configure RabbitMQ.
            x.UsingRabbitMq((context, cfg) =>
            {
                // Build the RabbitMQ host address; the port is appended only when it is not 5672.
                // NOTE(review): confirm this Host overload accepts a "host:port" string;
                // MassTransit also appears to offer an overload taking the port separately.
                var hostAddress = rabbitMqSettings.Port == 5672
                    ? rabbitMqSettings.Host
                    : $"{rabbitMqSettings.Host}:{rabbitMqSettings.Port}";

                cfg.Host(hostAddress, rabbitMqSettings.VirtualHost, h =>
                {
                    h.Username(rabbitMqSettings.Username);
                    h.Password(rabbitMqSettings.Password);

                    if (rabbitMqSettings.UseSSL)
                    {
                        h.UseSsl(s =>
                        {
                            s.Protocol = System.Security.Authentication.SslProtocols.Tls12;
                        });
                    }

                    // The configured heartbeat value is interpreted as seconds.
                    h.Heartbeat(TimeSpan.FromSeconds(rabbitMqSettings.Heartbeat));
                });

                // Configure the global filters (following the HaH.RCS approach).
                // Note: the registration order matters.
                cfg.UseConsumeFilter(typeof(LoggingFilter<>), context);
                cfg.UseConsumeFilter(typeof(RedisFilter<>), context);

                // Configure the message retry policy: a fixed interval (in seconds), RetryLimit times.
                cfg.UseMessageRetry(r =>
                {
                    r.Interval(rabbitMqSettings.RetryLimit, TimeSpan.FromSeconds(rabbitMqSettings.RetryInterval));
                });

                // Configure the prefetch count.
                cfg.PrefetchCount = rabbitMqSettings.PrefetchCount;

                // Configure the concurrent message limit.
                cfg.ConcurrentMessageLimit = rabbitMqSettings.ConcurrentMessageLimit;

                // Configure the message entity naming format.
                cfg.MessageTopology.SetEntityNameFormatter(new KebabCaseEntityNameFormatter());

                // Configure the task command receive endpoint (no consumers are attached here).
                cfg.ReceiveEndpoint("task-commands-queue", e =>
                {
                });

                // Configure the robot command receive endpoint.
                cfg.ReceiveEndpoint("robot-commands-queue", e =>
                {
                    // CreateRobotCommandHandler is now handled through the Mediator only and no longer uses RabbitMQ.
                    // NOTE(review): if that handler lives in the assembly scanned by
                    // AddRabbitMQConsumersFromAssembly, ConfigureEndpoints below may still
                    // create an endpoint for it - verify.
                    e.ConfigureConsumer<DeleteRobotCommandHandler>(context);
                    e.ConfigureConsumer<GetRobotQueryHandler>(context);
                    e.ConfigureConsumer<GetRobotsQueryHandler>(context);
                });

                // Configure all endpoints.
                cfg.ConfigureEndpoints(context);
            });
        });
    }

    /// <summary>
    /// Adds the Mediator consumers from the assembly that contains <c>GetMapQueryHandler</c>.
    /// </summary>
    /// <param name="cfg">Mediator registration configurator that receives the consumers.</param>
    private static void AddMediatorConsumersFromAssembly(IMediatorRegistrationConfigurator cfg)
    {
        // Add all consumers (command/query handlers) from the Infrastructure layer.
        cfg.AddConsumers(typeof(GetMapQueryHandler).Assembly);
    }

    /// <summary>
    /// Adds the RabbitMQ consumers from the assembly that contains <c>RobotStatusChangedDomainEventHandler</c>.
    /// </summary>
    /// <param name="cfg">Bus registration configurator that receives the consumers.</param>
    private static void AddRabbitMQConsumersFromAssembly(IBusRegistrationConfigurator cfg)
    {
        // Add all consumers (command/query handlers) from the Infrastructure layer.
        cfg.AddConsumers(typeof(RobotStatusChangedDomainEventHandler).Assembly);
    }

    /// <summary>
    /// Entity name formatter that produces kebab-case names from the message type name.
    /// </summary>
    private sealed class KebabCaseEntityNameFormatter : IEntityNameFormatter
    {
        /// <summary>
        /// Returns the kebab-case form of the simple name of <typeparamref name="T"/>.
        /// </summary>
        /// <typeparam name="T">Message type whose name is formatted.</typeparam>
        /// <returns>The formatted entity name, e.g. <c>RobotStatusChanged</c> becomes <c>robot-status-changed</c>.</returns>
        public string FormatEntityName<T>()
        {
            return ToKebabCase(typeof(T).Name);
        }

        /// <summary>
        /// Inserts a hyphen before every upper-case character except the first one and
        /// lower-cases the result. Each consecutive capital gets its own hyphen.
        /// </summary>
        /// <param name="value">Name to convert; null or empty input is returned unchanged.</param>
        /// <returns>The kebab-case string.</returns>
        private static string ToKebabCase(string value)
        {
            if (string.IsNullOrEmpty(value))
            {
                return value;
            }

            var hyphenated = string.Concat(
                value.Select((x, i) => i > 0 && char.IsUpper(x) ? "-" + x : x.ToString()));

            // Entity names are machine identifiers: lower-case with the invariant culture so the
            // result does not depend on the host's current culture (e.g. Turkish dotless i).
            return hyphenated.ToLowerInvariant();
        }
    }
}
}