Name Last Update
..
MassTransit Loading commit data...
README.md Loading commit data...

README.md

MassTransit 过滤器使用指南

概述

本项目参考 HaH.RCS 实现了两个 MassTransit 过滤器:

  1. LoggingFilter - 日志过滤器,记录消息处理的详细信息
  2. RedisFilter - Redis 统计过滤器,记录消息处理统计到 Redis

文件结构

Filters/
└── MassTransit/
    ├── LoggingFilter.cs    # 日志过滤器实现
    └── RedisFilter.cs      # Redis 统计过滤器实现

两种消息传递方式

MassTransit 支持两种消息传递方式,过滤器对两者都有效:

1. Mediator(进程内消息传递)

  • 类似于 MediatR
  • 用于应用内的 CQRS 模式
  • Command/Query 在同一进程内处理
  • 适合:单体应用、进程内通信

2. RabbitMQ(进程间消息传递)

  • 外部消息队列
  • 用于分布式系统、微服务间通信
  • 消息持久化、可靠传递
  • 适合:分布式系统、异步处理

配置方法(参考 HaH.RCS)

配置 Mediator(进程内)

builder.Services.AddMediator(cfg =>
{
    // 自动注册所有 Handler
    AddMediatorConsumersFromAssembly(cfg);

    cfg.ConfigureMediator((context, mediatorCfg) =>
    {
        // 配置过滤器(注意:过滤器注册顺序很重要)
        mediatorCfg.UseConsumeFilter(typeof(LoggingFilter<>), context);
        mediatorCfg.UseConsumeFilter(typeof(RedisFilter<>), context);
    });
});

配置 RabbitMQ(进程间)

builder.Services.AddMassTransit(x =>
{
    // 注册 RabbitMQ 消费者
    x.AddConsumer<SomeIntegrationEventHandler>();

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host(...);

        // 为 RabbitMQ 配置过滤器
        cfg.UseConsumeFilter(typeof(LoggingFilter<>), context);
        cfg.UseConsumeFilter(typeof(RedisFilter<>), context);

        cfg.ConfigureEndpoints(context);
    });
});

完整配置示例

public static void InstallMessageBus(this WebApplicationBuilder builder)
{
    // 1. Mediator(进程内 CQRS)
    builder.Services.AddMediator(cfg =>
    {
        AddMediatorConsumersFromAssembly(cfg);

        cfg.ConfigureMediator((context, mediatorCfg) =>
        {
            mediatorCfg.UseConsumeFilter(typeof(LoggingFilter<>), context);
            mediatorCfg.UseConsumeFilter(typeof(RedisFilter<>), context);
        });
    });

    // 2. RabbitMQ(进程间消息队列)
    builder.Services.AddMassTransit(x =>
    {
        x.AddConsumer<IntegrationEventHandler>();

        x.UsingRabbitMq((context, cfg) =>
        {
            cfg.Host("rabbitmq://localhost");

            cfg.UseConsumeFilter(typeof(LoggingFilter<>), context);
            cfg.UseConsumeFilter(typeof(RedisFilter<>), context);

            cfg.ConfigureEndpoints(context);
        });
    });
}

过滤器说明

LoggingFilter(日志过滤器)

记录所有消息处理的详细信息:

  • 消息开始处理时间
  • 消息完成处理时间和耗时
  • 消息内容(Debug 级别)
  • 异常信息

日志示例:

[Information] 开始处理消息 - 类型: CreateRobotCommand, 消息ID: abc-123
[Information] 完成处理消息 - 类型: CreateRobotCommand, 耗时: 150ms, 状态: 成功

RedisFilter(Redis 统计过滤器)

自动记录消息统计到 Redis:

  • 每日消息总数
  • 按消息类型统计
  • 成功/失败次数
  • 处理耗时记录

Redis 键格式:

message:stats:{日期}:total                    # 总消息数
message:stats:{日期}:type:{消息类型}          # 按类型统计
message:stats:{日期}:success                  # 成功数
message:stats:{日期}:failed                   # 失败数

查询示例:

redis-cli GET "message:stats:2024-01-01:total"
redis-cli GET "message:stats:2024-01-01:type:CreateRobotCommand"

使用场景

Mediator 使用场景

// 在 Controller 或 Service 中使用 Mediator 发送命令
public class RobotController : ControllerBase
{
    private readonly IMediator _mediator;

    public async Task<IActionResult> CreateRobot(CreateRobotCommand command)
    {
        // 进程内处理,过滤器自动应用
        await _mediator.Send(command);
        return Ok();
    }
}

RabbitMQ 使用场景

// 发布到 RabbitMQ,其他服务订阅处理
public class RobotService
{
    private readonly IPublishEndpoint _publishEndpoint;

    public async Task NotifyRobotCreated(RobotCreatedEvent evt)
    {
        // 发布到 RabbitMQ,分布式处理
        await _publishEndpoint.Publish(evt);
    }
}

过滤器执行顺序

过滤器按注册顺序执行:

  1. LoggingFilter - 记录开始日志
  2. RedisFilter - 记录统计
  3. 消息处理器 - 实际业务处理
  4. RedisFilter - 更新统计(成功/失败)
  5. LoggingFilter - 记录完成日志

日志配置

appsettings.json 中配置:

{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Rcs.Infrastructure.Filters.MassTransit.LoggingFilter": "Information",
      "Rcs.Infrastructure.Filters.MassTransit.RedisFilter": "Information"
    }
  }
}

依赖项

  • LoggingFilter: Microsoft.Extensions.Logging
  • RedisFilter: Microsoft.Extensions.Logging, StackExchange.Redis

特性

  1. 全局应用 - 所有消息自动应用过滤器
  2. 双模式支持 - 同时支持 Mediator 和 RabbitMQ
  3. 自动化 - 无需为每个消费者单独配置
  4. 可靠性 - Redis 故障不影响消息处理
  5. 性能优化 - 异步处理,不阻塞主流程

Mediator vs RabbitMQ 选择

特性 Mediator RabbitMQ
通信方式 进程内 进程间
性能 快速 相对较慢
可靠性 依赖进程 消息持久化
使用场景 CQRS、单体应用 微服务、分布式系统
配置复杂度 简单 需要 RabbitMQ 服务器

自定义过滤器

要创建自定义过滤器:

public class CustomFilter<T> : IFilter<ConsumeContext<T>> where T : class
{
    public async Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
    {
        // 前置处理
        await next.Send(context);
        // 后置处理
    }

    public void Probe(ProbeContext context)
    {
        context.CreateFilterScope("customFilter");
    }
}

然后注册:

// Mediator
mediatorCfg.UseConsumeFilter(typeof(CustomFilter<>), context);

// RabbitMQ
cfg.UseConsumeFilter(typeof(CustomFilter<>), context);

故障排查

问题:日志未输出

  • 检查日志级别配置
  • 确认 InstallFilters() 已调用

问题:Mediator 消息未处理

  • 确认 Handler 已注册到 Mediator
  • 检查 AddMediatorConsumersFromAssembly 是否正确

问题:RabbitMQ 消息未处理

  • 检查 RabbitMQ 连接
  • 确认消费者已用 x.AddConsumer<>() 注册
  • 检查队列绑定配置

问题:Redis 统计未记录

  • 检查 Redis 连接
  • 查看日志警告信息
  • Redis 故障不影响消息处理