MessageBusInstaller.cs 5.97 KB
using MassTransit;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Rcs.Application.MessageBus;
using Rcs.Domain.Settings;
using Rcs.Infrastructure.Filters.MassTransit;
using Rcs.Infrastructure.MessageBus;
using Rcs.Infrastructure.MessageBus.Handlers;
using Rcs.Infrastructure.MessageBus.Handlers.Commands;
using Rcs.Infrastructure.MessageBus.Handlers.Events;

namespace Rcs.Infrastructure.Installs
{
    /// <summary>
    /// 消息总线安装器
    /// </summary>
    public static class MessageBusInstaller
    {
        /// <summary>
        /// 安装 MassTransit 和 RabbitMQ 消息总线
        /// </summary>
        public static void InstallMessageBus(this WebApplicationBuilder builder)
        {
            // 读取 RabbitMQ 配置
            var rabbitMqSettings = builder.Configuration.GetSection(nameof(AppSettings)).Get<AppSettings>()!.RabbitMq;

            // 注册消息发布者
            builder.Services.AddScoped<IMessagePublisher, MessagePublisher>();

            // 配置 Mediator(进程内消息传递,用于 CQRS)
            builder.Services.AddMediator(cfg =>
            {
                // 从程序集中添加所有消费者(Command/Query Handlers)
                AddMediatorConsumersFromAssembly(cfg);

                cfg.ConfigureMediator((context, mediatorCfg) =>
                {
                    // 配置过滤器(注意:过滤器注册顺序很重要)
                    mediatorCfg.UseConsumeFilter(typeof(LoggingFilter<>), context);
                    mediatorCfg.UseConsumeFilter(typeof(RedisFilter<>), context);

                });
            });

            // 配置 MassTransit(RabbitMQ - 进程间消息传递)
            builder.Services.AddMassTransit(x =>
            {
                AddRabbitMQConsumersFromAssembly(x);
                // 配置 RabbitMQ
                x.UsingRabbitMq((context, cfg) =>
                {
                    // 构建 RabbitMQ 连接字符串
                    var hostAddress = rabbitMqSettings.Port == 5672
                        ? rabbitMqSettings.Host
                        : $"{rabbitMqSettings.Host}:{rabbitMqSettings.Port}";

                    cfg.Host(hostAddress, rabbitMqSettings.VirtualHost, h =>
                    {
                        h.Username(rabbitMqSettings.Username);
                        h.Password(rabbitMqSettings.Password);

                        if (rabbitMqSettings.UseSSL)
                        {
                            h.UseSsl(s =>
                            {
                                s.Protocol = System.Security.Authentication.SslProtocols.Tls12;
                            });
                        }

                        h.Heartbeat(TimeSpan.FromSeconds(rabbitMqSettings.Heartbeat));
                    });

                    // 配置全局过滤器(按照 HaH.RCS 的方式)
                    // 注意:过滤器注册顺序很重要
                    cfg.UseConsumeFilter(typeof(LoggingFilter<>), context);
                    cfg.UseConsumeFilter(typeof(RedisFilter<>), context);

                    // 配置消息重试策略
                    cfg.UseMessageRetry(r =>
                    {
                        r.Interval(rabbitMqSettings.RetryLimit, TimeSpan.FromSeconds(rabbitMqSettings.RetryInterval));
                    });

                    // 配置预取数量
                    cfg.PrefetchCount = rabbitMqSettings.PrefetchCount;

                    // 配置并发消息限制
                    cfg.ConcurrentMessageLimit = rabbitMqSettings.ConcurrentMessageLimit;

                    // 配置消息命名格式
                    cfg.MessageTopology.SetEntityNameFormatter(new KebabCaseEntityNameFormatter());

                    // 配置任务命令接收端点
                    cfg.ReceiveEndpoint("task-commands-queue", e =>
                    {
                        
                    });

                    // 配置机器人命令接收端点
                    cfg.ReceiveEndpoint("robot-commands-queue", e =>
                    {
                        // CreateRobotCommandHandler 现在仅通过 Mediator 处理,不再使用 RabbitMQ
                        e.ConfigureConsumer<DeleteRobotCommandHandler>(context);
                        e.ConfigureConsumer<GetRobotQueryHandler>(context);
                        e.ConfigureConsumer<GetRobotsQueryHandler>(context);
                    });

                    // 配置所有端点
                    cfg.ConfigureEndpoints(context);
                });
            });
        }

        /// <summary>
        /// 从程序集中添加 Mediator 消费者
        /// </summary>
        private static void AddMediatorConsumersFromAssembly(IMediatorRegistrationConfigurator cfg)
        {
            // 从 Infrastructure 层添加所有消费者(Command/Query Handlers)
            cfg.AddConsumers(typeof(GetMapQueryHandler).Assembly);
        }
        /// <summary>
        /// 从程序集中添加 RabbitMQ 消费者
        /// </summary>
        private static void AddRabbitMQConsumersFromAssembly(IBusRegistrationConfigurator cfg)
        {
            // 从 Infrastructure 层添加所有消费者(Command/Query Handlers)
            cfg.AddConsumers(typeof(RobotStatusChangedDomainEventHandler).Assembly);
        }

        /// <summary>
        /// Kebab-case 实体名称格式化器
        /// </summary>
        private class KebabCaseEntityNameFormatter : IEntityNameFormatter
        {
            public string FormatEntityName<T>()
            {
                return ToKebabCase(typeof(T).Name);
            }

            private static string ToKebabCase(string value)
            {
                if (string.IsNullOrEmpty(value))
                    return value;

                return string.Concat(
                    value.Select((x, i) => i > 0 && char.IsUpper(x) ? "-" + x : x.ToString())
                ).ToLower();
            }
        }
    }
}