README.md
MassTransit 消息总线使用指南
概述
本项目集成了 MassTransit + RabbitMQ 实现分布式消息总线,用于系统内部的异步通信和事件驱动架构。
配置
appsettings.json 配置
{
"AppSettings": {
"RabbitMq": {
"Host": "localhost",
"Port": 5672,
"VirtualHost": "/",
"Username": "guest",
"Password": "guest",
"UseSSL": false,
"Heartbeat": 60,
"PrefetchCount": 16,
"ConcurrentMessageLimit": 32,
"RetryLimit": 3,
"RetryInterval": 5
}
}
}
Docker Compose
RabbitMQ 服务已在 docker-compose.yml 中配置:
docker-compose up -d rabbitmq
访问管理界面:http://localhost:15672 (默认账号:guest/guest)
消息类型
事件 (Events)
事件表示已经发生的事情,用于通知其他服务。
-
RobotStatusChangedEvent- 机器人状态变更 -
TaskCreatedEvent- 任务创建 -
TaskCompletedEvent- 任务完成
命令 (Commands)
命令表示请求执行某个操作。
-
CreateTaskCommand- 创建任务 -
CancelTaskCommand- 取消任务
使用示例
1. 发布消息(在 Service 或 Controller 中)
using Rcs.Application.MessageBus;
using Rcs.Domain.Models.MessageBus.Events;
public class RobotService
{
private readonly IMessagePublisher _messagePublisher;
public RobotService(IMessagePublisher messagePublisher)
{
_messagePublisher = messagePublisher;
}
public async Task UpdateRobotStatusAsync(Robot robot)
{
// 更新数据库...
// 发布状态变更事件
var statusEvent = new RobotStatusChangedEvent
{
RobotId = robot.Id,
RobotName = robot.Name,
Status = robot.Status,
BatteryLevel = robot.BatteryLevel,
PositionX = robot.PositionX,
PositionY = robot.PositionY,
Theta = robot.Theta,
MapId = robot.MapId
};
await _messagePublisher.PublishAsync(statusEvent);
}
public async Task BatchPublishStatusesAsync(List<Robot> robots)
{
var events = robots.Select(r => new RobotStatusChangedEvent
{
RobotId = r.Id,
RobotName = r.Name,
Status = r.Status,
BatteryLevel = r.BatteryLevel,
PositionX = r.PositionX,
PositionY = r.PositionY,
Theta = r.Theta,
MapId = r.MapId
});
await _messagePublisher.PublishBatchAsync(events);
}
}
2. 在 Controller 中使用
[ApiController]
[Route("api/[controller]")]
public class TaskController : ControllerBase
{
private readonly IMessagePublisher _messagePublisher;
public TaskController(IMessagePublisher messagePublisher)
{
_messagePublisher = messagePublisher;
}
[HttpPost]
public async Task<IActionResult> CreateTask([FromBody] CreateTaskRequest request)
{
// 发布创建任务命令
var command = new CreateTaskCommand
{
TaskTemplateId = request.TemplateId,
RobotId = request.RobotId,
Parameters = request.Parameters,
Priority = request.Priority,
CreatedBy = User.Identity?.Name ?? "System"
};
await _messagePublisher.PublishAsync(command);
return Accepted(new { command.MessageId });
}
[HttpDelete("{taskId}")]
public async Task<IActionResult> CancelTask(Guid taskId, [FromBody] CancelTaskRequest request)
{
var command = new CancelTaskCommand
{
TaskId = taskId,
Reason = request.Reason,
CancelledBy = User.Identity?.Name ?? "System"
};
await _messagePublisher.PublishAsync(command);
return Accepted();
}
}
3. 创建自定义消费者
在 Rcs.Infrastructure/MessageBus/Consumers/ 目录下创建新的消费者:
using MassTransit;
using Microsoft.Extensions.Logging;
using Rcs.Domain.Models.MessageBus.Events;
namespace Rcs.Infrastructure.MessageBus.Consumers;
public class CustomEventConsumer : IConsumer<CustomEvent>
{
private readonly ILogger<CustomEventConsumer> _logger;
private readonly IYourService _yourService;
public CustomEventConsumer(
ILogger<CustomEventConsumer> logger,
IYourService yourService)
{
_logger = logger;
_yourService = yourService;
}
public async Task Consume(ConsumeContext<CustomEvent> context)
{
var message = context.Message;
_logger.LogInformation("Received CustomEvent: {EventData}", message);
try
{
// 处理业务逻辑
await _yourService.HandleEventAsync(message);
_logger.LogInformation("CustomEvent processed successfully");
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing CustomEvent");
throw; // 抛出异常会触发重试机制
}
}
}
4. 注册自定义消费者
在 MassTransitExtensions.cs 中注册:
services.AddMassTransit(x =>
{
// 注册消费者
x.AddConsumer<CustomEventConsumer>();
x.UsingRabbitMq((context, cfg) =>
{
// ... 其他配置 ...
// 配置接收端点
cfg.ReceiveEndpoint("custom-queue", e =>
{
e.ConfigureConsumer<CustomEventConsumer>(context);
});
});
});
消息队列说明
robot-status-queue
- 消费者:
RobotStatusChangedConsumer - 用途:处理机器人状态变更事件
task-events-queue
- 消费者:
TaskCreatedConsumer,TaskCompletedConsumer - 用途:处理任务相关事件
task-commands-queue
- 消费者:
CreateTaskCommandConsumer,CancelTaskCommandConsumer - 用途:处理任务相关命令
高级功能
消息重试
配置了自动重试机制,失败的消息会按配置的间隔重试:
- 重试次数:3 次(可在配置中调整)
- 重试间隔:5 秒(可在配置中调整)
并发控制
-
PrefetchCount: 16 - 每次从队列预取的消息数量 -
ConcurrentMessageLimit: 32 - 同时处理的最大消息数量
消息序列化
默认使用 JSON 序列化,自动处理消息的序列化和反序列化。
死信队列
失败的消息在重试次数用尽后会自动进入死信队列(*_error),可在 RabbitMQ 管理界面查看。
监控和调试
RabbitMQ 管理界面
访问 http://localhost:15672 查看:
- 队列状态和消息数量
- 消费者连接状态
- 消息流量统计
- 死信队列中的失败消息
日志
所有消息的发布和消费都会记录日志,可通过配置日志级别来控制详细程度:
{
"Logging": {
"LogLevel": {
"MassTransit": "Information",
"Default": "Information"
}
}
}
最佳实践
-
事件命名:使用过去式,如
TaskCreatedEvent、RobotStatusChangedEvent -
命令命名:使用祈使句,如
CreateTaskCommand、CancelTaskCommand - 幂等性:消费者应该是幂等的,能够处理重复消息
- 错误处理:在消费者中捕获并记录异常,决定是否重试
- 消息大小:避免发送过大的消息,考虑使用引用而不是完整数据
- 版本控制:为消息添加版本字段,便于未来升级
故障排查
消息未被消费
- 检查 RabbitMQ 服务是否运行
- 检查队列是否创建
- 检查消费者是否注册
- 查看应用日志
消息重复消费
- 确保消费者实现是幂等的
- 检查是否有多个实例运行
- 查看 RabbitMQ 管理界面的消费者列表
性能问题
- 调整
PrefetchCount和ConcurrentMessageLimit - 增加消费者实例数量
- 优化消费者处理逻辑
- 使用消息批处理