README.md
MassTransit 过滤器使用指南
概述
本项目参考 HaH.RCS 实现了两个 MassTransit 过滤器:
- LoggingFilter - 日志过滤器,记录消息处理的详细信息
- RedisFilter - Redis 统计过滤器,记录消息处理统计到 Redis
文件结构
Filters/
└── MassTransit/
├── LoggingFilter.cs # 日志过滤器实现
└── RedisFilter.cs # Redis 统计过滤器实现
两种消息传递方式
MassTransit 支持两种消息传递方式,过滤器对两者都有效:
1. Mediator(进程内消息传递)
- 类似于 MediatR
- 用于应用内的 CQRS 模式
- Command/Query 在同一进程内处理
- 适合:单体应用、进程内通信
2. RabbitMQ(进程间消息传递)
- 外部消息队列
- 用于分布式系统、微服务间通信
- 消息持久化、可靠传递
- 适合:分布式系统、异步处理
配置方法(参考 HaH.RCS)
配置 Mediator(进程内)
builder.Services.AddMediator(cfg =>
{
// 自动注册所有 Handler
AddMediatorConsumersFromAssembly(cfg);
cfg.ConfigureMediator((context, mediatorCfg) =>
{
// 配置过滤器(注意:过滤器注册顺序很重要)
mediatorCfg.UseConsumeFilter(typeof(LoggingFilter<>), context);
mediatorCfg.UseConsumeFilter(typeof(RedisFilter<>), context);
});
});
配置 RabbitMQ(进程间)
builder.Services.AddMassTransit(x =>
{
// 注册 RabbitMQ 消费者
x.AddConsumer<SomeIntegrationEventHandler>();
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host(...);
// 为 RabbitMQ 配置过滤器
cfg.UseConsumeFilter(typeof(LoggingFilter<>), context);
cfg.UseConsumeFilter(typeof(RedisFilter<>), context);
cfg.ConfigureEndpoints(context);
});
});
完整配置示例
public static void InstallMessageBus(this WebApplicationBuilder builder)
{
// 1. Mediator(进程内 CQRS)
builder.Services.AddMediator(cfg =>
{
AddMediatorConsumersFromAssembly(cfg);
cfg.ConfigureMediator((context, mediatorCfg) =>
{
mediatorCfg.UseConsumeFilter(typeof(LoggingFilter<>), context);
mediatorCfg.UseConsumeFilter(typeof(RedisFilter<>), context);
});
});
// 2. RabbitMQ(进程间消息队列)
builder.Services.AddMassTransit(x =>
{
x.AddConsumer<IntegrationEventHandler>();
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("rabbitmq://localhost");
cfg.UseConsumeFilter(typeof(LoggingFilter<>), context);
cfg.UseConsumeFilter(typeof(RedisFilter<>), context);
cfg.ConfigureEndpoints(context);
});
});
}
过滤器说明
LoggingFilter(日志过滤器)
记录所有消息处理的详细信息:
- 消息开始处理时间
- 消息完成处理时间和耗时
- 消息内容(Debug 级别)
- 异常信息
日志示例:
[Information] 开始处理消息 - 类型: CreateRobotCommand, 消息ID: abc-123
[Information] 完成处理消息 - 类型: CreateRobotCommand, 耗时: 150ms, 状态: 成功
RedisFilter(Redis 统计过滤器)
自动记录消息统计到 Redis:
- 每日消息总数
- 按消息类型统计
- 成功/失败次数
- 处理耗时记录
Redis 键格式:
message:stats:{日期}:total # 总消息数
message:stats:{日期}:type:{消息类型} # 按类型统计
message:stats:{日期}:success # 成功数
message:stats:{日期}:failed # 失败数
查询示例:
redis-cli GET "message:stats:2024-01-01:total"
redis-cli GET "message:stats:2024-01-01:type:CreateRobotCommand"
使用场景
Mediator 使用场景
// 在 Controller 或 Service 中使用 Mediator 发送命令
public class RobotController : ControllerBase
{
private readonly IMediator _mediator;
public async Task<IActionResult> CreateRobot(CreateRobotCommand command)
{
// 进程内处理,过滤器自动应用
await _mediator.Send(command);
return Ok();
}
}
RabbitMQ 使用场景
// 发布到 RabbitMQ,其他服务订阅处理
public class RobotService
{
private readonly IPublishEndpoint _publishEndpoint;
public async Task NotifyRobotCreated(RobotCreatedEvent evt)
{
// 发布到 RabbitMQ,分布式处理
await _publishEndpoint.Publish(evt);
}
}
过滤器执行顺序
过滤器按注册顺序执行:
- LoggingFilter - 记录开始日志
- RedisFilter - 记录统计
- 消息处理器 - 实际业务处理
- RedisFilter - 更新统计(成功/失败)
- LoggingFilter - 记录完成日志
日志配置
在 appsettings.json 中配置:
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Rcs.Infrastructure.Filters.MassTransit.LoggingFilter": "Information",
"Rcs.Infrastructure.Filters.MassTransit.RedisFilter": "Information"
}
}
}
依赖项
-
LoggingFilter:
Microsoft.Extensions.Logging -
RedisFilter:
Microsoft.Extensions.Logging,StackExchange.Redis
特性
- 全局应用 - 所有消息自动应用过滤器
- 双模式支持 - 同时支持 Mediator 和 RabbitMQ
- 自动化 - 无需为每个消费者单独配置
- 可靠性 - Redis 故障不影响消息处理
- 性能优化 - 异步处理,不阻塞主流程
Mediator vs RabbitMQ 选择
| 特性 | Mediator | RabbitMQ |
|---|---|---|
| 通信方式 | 进程内 | 进程间 |
| 性能 | 快速 | 相对较慢 |
| 可靠性 | 依赖进程 | 消息持久化 |
| 使用场景 | CQRS、单体应用 | 微服务、分布式系统 |
| 配置复杂度 | 简单 | 需要 RabbitMQ 服务器 |
自定义过滤器
要创建自定义过滤器:
public class CustomFilter<T> : IFilter<ConsumeContext<T>> where T : class
{
public async Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
{
// 前置处理
await next.Send(context);
// 后置处理
}
public void Probe(ProbeContext context)
{
context.CreateFilterScope("customFilter");
}
}
然后注册:
// Mediator
mediatorCfg.UseConsumeFilter(typeof(CustomFilter<>), context);
// RabbitMQ
cfg.UseConsumeFilter(typeof(CustomFilter<>), context);
故障排查
问题:日志未输出
- 检查日志级别配置
- 确认
InstallFilters()已调用
问题:Mediator 消息未处理
- 确认 Handler 已注册到 Mediator
- 检查
AddMediatorConsumersFromAssembly是否正确
问题:RabbitMQ 消息未处理
- 检查 RabbitMQ 连接
- 确认消费者已用
x.AddConsumer<>()注册 - 检查队列绑定配置
问题:Redis 统计未记录
- 检查 Redis 连接
- 查看日志警告信息
- Redis 故障不影响消息处理