Name Last Update
..
Handlers Loading commit data...
MessagePublisher.cs Loading commit data...
README.md Loading commit data...

README.md

MassTransit 消息总线使用指南

概述

本项目集成了 MassTransit + RabbitMQ 实现分布式消息总线,用于系统内部的异步通信和事件驱动架构。

配置

appsettings.json 配置

{
  "AppSettings": {
    "RabbitMq": {
      "Host": "localhost",
      "Port": 5672,
      "VirtualHost": "/",
      "Username": "guest",
      "Password": "guest",
      "UseSSL": false,
      "Heartbeat": 60,
      "PrefetchCount": 16,
      "ConcurrentMessageLimit": 32,
      "RetryLimit": 3,
      "RetryInterval": 5
    }
  }
}

Docker Compose

RabbitMQ 服务已在 docker-compose.yml 中配置:

docker-compose up -d rabbitmq

访问管理界面:http://localhost:15672 (默认账号:guest/guest)

消息类型

事件 (Events)

事件表示已经发生的事情,用于通知其他服务。

  • RobotStatusChangedEvent - 机器人状态变更
  • TaskCreatedEvent - 任务创建
  • TaskCompletedEvent - 任务完成

命令 (Commands)

命令表示请求执行某个操作。

  • CreateTaskCommand - 创建任务
  • CancelTaskCommand - 取消任务

使用示例

1. 发布消息(在 Service 或 Controller 中)

using Rcs.Application.MessageBus;
using Rcs.Domain.Models.MessageBus.Events;

public class RobotService
{
    private readonly IMessagePublisher _messagePublisher;

    public RobotService(IMessagePublisher messagePublisher)
    {
        _messagePublisher = messagePublisher;
    }

    public async Task UpdateRobotStatusAsync(Robot robot)
    {
        // 更新数据库...

        // 发布状态变更事件
        var statusEvent = new RobotStatusChangedEvent
        {
            RobotId = robot.Id,
            RobotName = robot.Name,
            Status = robot.Status,
            BatteryLevel = robot.BatteryLevel,
            PositionX = robot.PositionX,
            PositionY = robot.PositionY,
            Theta = robot.Theta,
            MapId = robot.MapId
        };

        await _messagePublisher.PublishAsync(statusEvent);
    }

    public async Task BatchPublishStatusesAsync(List<Robot> robots)
    {
        var events = robots.Select(r => new RobotStatusChangedEvent
        {
            RobotId = r.Id,
            RobotName = r.Name,
            Status = r.Status,
            BatteryLevel = r.BatteryLevel,
            PositionX = r.PositionX,
            PositionY = r.PositionY,
            Theta = r.Theta,
            MapId = r.MapId
        });

        await _messagePublisher.PublishBatchAsync(events);
    }
}

2. 在 Controller 中使用

[ApiController]
[Route("api/[controller]")]
public class TaskController : ControllerBase
{
    private readonly IMessagePublisher _messagePublisher;

    public TaskController(IMessagePublisher messagePublisher)
    {
        _messagePublisher = messagePublisher;
    }

    [HttpPost]
    public async Task<IActionResult> CreateTask([FromBody] CreateTaskRequest request)
    {
        // 发布创建任务命令
        var command = new CreateTaskCommand
        {
            TaskTemplateId = request.TemplateId,
            RobotId = request.RobotId,
            Parameters = request.Parameters,
            Priority = request.Priority,
            CreatedBy = User.Identity?.Name ?? "System"
        };

        await _messagePublisher.PublishAsync(command);

        return Accepted(new { command.MessageId });
    }

    [HttpDelete("{taskId}")]
    public async Task<IActionResult> CancelTask(Guid taskId, [FromBody] CancelTaskRequest request)
    {
        var command = new CancelTaskCommand
        {
            TaskId = taskId,
            Reason = request.Reason,
            CancelledBy = User.Identity?.Name ?? "System"
        };

        await _messagePublisher.PublishAsync(command);

        return Accepted();
    }
}

3. 创建自定义消费者

Rcs.Infrastructure/MessageBus/Consumers/ 目录下创建新的消费者:

using MassTransit;
using Microsoft.Extensions.Logging;
using Rcs.Domain.Models.MessageBus.Events;

namespace Rcs.Infrastructure.MessageBus.Consumers;

public class CustomEventConsumer : IConsumer<CustomEvent>
{
    private readonly ILogger<CustomEventConsumer> _logger;
    private readonly IYourService _yourService;

    public CustomEventConsumer(
        ILogger<CustomEventConsumer> logger,
        IYourService yourService)
    {
        _logger = logger;
        _yourService = yourService;
    }

    public async Task Consume(ConsumeContext<CustomEvent> context)
    {
        var message = context.Message;

        _logger.LogInformation("Received CustomEvent: {EventData}", message);

        try
        {
            // 处理业务逻辑
            await _yourService.HandleEventAsync(message);

            _logger.LogInformation("CustomEvent processed successfully");
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error processing CustomEvent");
            throw; // 抛出异常会触发重试机制
        }
    }
}

4. 注册自定义消费者

MassTransitExtensions.cs 中注册:

services.AddMassTransit(x =>
{
    // 注册消费者
    x.AddConsumer<CustomEventConsumer>();

    x.UsingRabbitMq((context, cfg) =>
    {
        // ... 其他配置 ...

        // 配置接收端点
        cfg.ReceiveEndpoint("custom-queue", e =>
        {
            e.ConfigureConsumer<CustomEventConsumer>(context);
        });
    });
});

消息队列说明

robot-status-queue

  • 消费者:RobotStatusChangedConsumer
  • 用途:处理机器人状态变更事件

task-events-queue

  • 消费者:TaskCreatedConsumer, TaskCompletedConsumer
  • 用途:处理任务相关事件

task-commands-queue

  • 消费者:CreateTaskCommandConsumer, CancelTaskCommandConsumer
  • 用途:处理任务相关命令

高级功能

消息重试

配置了自动重试机制,失败的消息会按配置的间隔重试:

  • 重试次数:3 次(可在配置中调整)
  • 重试间隔:5 秒(可在配置中调整)

并发控制

  • PrefetchCount: 16 - 每次从队列预取的消息数量
  • ConcurrentMessageLimit: 32 - 同时处理的最大消息数量

消息序列化

默认使用 JSON 序列化,自动处理消息的序列化和反序列化。

死信队列

失败的消息在重试次数用尽后会自动进入死信队列(*_error),可在 RabbitMQ 管理界面查看。

监控和调试

RabbitMQ 管理界面

访问 http://localhost:15672 查看:

  • 队列状态和消息数量
  • 消费者连接状态
  • 消息流量统计
  • 死信队列中的失败消息

日志

所有消息的发布和消费都会记录日志,可通过配置日志级别来控制详细程度:

{
  "Logging": {
    "LogLevel": {
      "MassTransit": "Information",
      "Default": "Information"
    }
  }
}

最佳实践

  1. 事件命名:使用过去式,如 TaskCreatedEventRobotStatusChangedEvent
  2. 命令命名:使用祈使句,如 CreateTaskCommandCancelTaskCommand
  3. 幂等性:消费者应该是幂等的,能够处理重复消息
  4. 错误处理:在消费者中捕获并记录异常,决定是否重试
  5. 消息大小:避免发送过大的消息,考虑使用引用而不是完整数据
  6. 版本控制:为消息添加版本字段,便于未来升级

故障排查

消息未被消费

  1. 检查 RabbitMQ 服务是否运行
  2. 检查队列是否创建
  3. 检查消费者是否注册
  4. 查看应用日志

消息重复消费

  1. 确保消费者实现是幂等的
  2. 检查是否有多个实例运行
  3. 查看 RabbitMQ 管理界面的消费者列表

性能问题

  1. 调整 PrefetchCountConcurrentMessageLimit
  2. 增加消费者实例数量
  3. 优化消费者处理逻辑
  4. 使用消息批处理

参考资料