// MessagePublisher.cs 2.12 KB
using MassTransit;
using Microsoft.Extensions.Logging;
using Rcs.Application.MessageBus;

namespace Rcs.Infrastructure.MessageBus;

/// <summary>
/// Message publisher implementation: forwards messages to an
/// <see cref="IPublishEndpoint"/> and logs the start, success and failure of
/// every publish attempt.
/// </summary>
public class MessagePublisher : IMessagePublisher
{
    private readonly IPublishEndpoint _publishEndpoint;
    private readonly ILogger<MessagePublisher> _logger;

    /// <summary>
    /// Creates a publisher that sends through <paramref name="publishEndpoint"/>
    /// and writes diagnostics to <paramref name="logger"/>.
    /// </summary>
    /// <param name="publishEndpoint">Endpoint used to publish messages.</param>
    /// <param name="logger">Logger used for publish diagnostics.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="publishEndpoint"/> or <paramref name="logger"/> is <c>null</c>.
    /// </exception>
    public MessagePublisher(IPublishEndpoint publishEndpoint, ILogger<MessagePublisher> logger)
    {
        // Fail at construction time instead of with a NullReferenceException on first publish.
        _publishEndpoint = publishEndpoint ?? throw new ArgumentNullException(nameof(publishEndpoint));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Publishes a single message.
    /// </summary>
    /// <typeparam name="T">Concrete message type.</typeparam>
    /// <param name="message">The message to publish.</param>
    /// <param name="cancellationToken">Token passed through to the publish call.</param>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is <c>null</c>.</exception>
    /// <remarks>
    /// Any exception thrown by the publish call is logged as an error and rethrown unchanged.
    /// </remarks>
    public async Task PublishAsync<T>(T message, CancellationToken cancellationToken = default) where T : class, IMessage
    {
        // Guard outside the try block: the catch handler reads message.MessageType /
        // message.MessageId, so a null message would otherwise throw a second
        // NullReferenceException from inside the handler.
        ArgumentNullException.ThrowIfNull(message);

        try
        {
            _logger.LogInformation("正在发布消息: {MessageType}, 消息ID: {MessageId}",
                message.MessageType, message.MessageId);

            await _publishEndpoint.Publish(message, cancellationToken);

            _logger.LogInformation("消息发布成功: {MessageType}, 消息ID: {MessageId}",
                message.MessageType, message.MessageId);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "消息发布失败: {MessageType}, 消息ID: {MessageId}",
                message.MessageType, message.MessageId);
            throw; // preserve the original stack trace for the caller
        }
    }

    /// <summary>
    /// Publishes a batch of messages of the same type.
    /// </summary>
    /// <typeparam name="T">Concrete message type.</typeparam>
    /// <param name="messages">The messages to publish.</param>
    /// <param name="cancellationToken">Token passed through to the publish call.</param>
    /// <exception cref="ArgumentNullException"><paramref name="messages"/> is <c>null</c>.</exception>
    /// <remarks>
    /// Any exception thrown by the batch publish call is logged as an error and rethrown unchanged.
    /// </remarks>
    public async Task PublishBatchAsync<T>(IEnumerable<T> messages, CancellationToken cancellationToken = default) where T : class, IMessage
    {
        // Explicit guard so the exception names "messages" rather than ToList's "source".
        ArgumentNullException.ThrowIfNull(messages);

        // Materialize once: the sequence is enumerated a single time and the logged
        // count is the count of what is actually handed to the endpoint.
        var messageList = messages.ToList();

        try
        {
            _logger.LogInformation("正在批量发布消息: 数量 {Count}, 类型: {MessageType}",
                messageList.Count, typeof(T).Name);

            await _publishEndpoint.PublishBatch(messageList, cancellationToken);

            _logger.LogInformation("批量消息发布成功: {Count} 条消息", messageList.Count);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "批量消息发布失败: {Count} 条消息", messageList.Count);
            throw; // preserve the original stack trace for the caller
        }
    }
}