Skip to content

Extensibility

A. Shafie edited this page Apr 18, 2025 · 2 revisions

Extensibility

This section covers how to extend LiteBus with custom functionality.

Creating Custom Modules

LiteBus's modular architecture allows you to create custom modules beyond the built-in command, query, and event modules.

Module Components

A custom module typically consists of:

  1. Abstractions - Interfaces that define the module's contracts
  2. Implementations - Classes that implement the module's functionality
  3. DI Extensions - Extension methods for registering the module with dependency injection

Implementing a Custom Module

To create a custom module, work through the following steps:

1. Define Module Interfaces

/// <summary>
/// Marker interface for types that belong to the notification module.
/// NotificationModuleBuilder uses it as the generic constraint of Register and as the
/// filter when scanning an assembly in RegisterFromAssembly.
/// </summary>
public interface IRegistrableNotificationConstruct { }

/// <summary>
/// Marker interface for notification messages; it is the message type accepted by
/// <see cref="INotificationMediator.PublishNotificationAsync"/>.
/// </summary>
public interface INotification : IRegistrableNotificationConstruct { }

/// <summary>
/// Handler contract for a notification of type <typeparamref name="TNotification"/>.
/// Declares no members of its own; the handling members come from
/// IAsyncMessageHandler&lt;TNotification&gt;.
/// </summary>
/// <typeparam name="TNotification">The notification type handled; contravariant.</typeparam>
public interface INotificationHandler<in TNotification> : IRegistrableNotificationConstruct, 
                                                          IAsyncMessageHandler<TNotification>
    where TNotification : INotification { }

/// <summary>
/// Entry point that application code uses to publish notifications.
/// </summary>
public interface INotificationMediator : IRegistrableNotificationConstruct
{
    /// <summary>
    /// Publishes <paramref name="notification"/> through the notification module.
    /// </summary>
    /// <param name="notification">The notification to publish.</param>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    /// <returns>A task representing the publish operation.</returns>
    Task PublishNotificationAsync(INotification notification, 
                                 CancellationToken cancellationToken = default);
}

2. Implement the Module

/// <summary>
/// Default <see cref="INotificationMediator"/>: forwards every notification to the
/// underlying <see cref="IMessageMediator"/> with notification-specific options.
/// </summary>
public class NotificationMediator : INotificationMediator
{
    private readonly IMessageMediator _messageMediator;

    /// <summary>Creates the mediator on top of the core message mediator.</summary>
    /// <param name="messageMediator">The core mediator that performs the dispatch.</param>
    public NotificationMediator(IMessageMediator messageMediator) =>
        _messageMediator = messageMediator;

    /// <summary>
    /// Publishes <paramref name="notification"/> using a single-async-handler mediation
    /// strategy, the "actual type or first assignable type" resolve strategy, and no tags.
    /// </summary>
    /// <param name="notification">The notification to publish.</param>
    /// <param name="cancellationToken">Token passed to the mediation options.</param>
    /// <returns>The task returned by the underlying mediator.</returns>
    public Task PublishNotificationAsync(INotification notification,
                                         CancellationToken cancellationToken = default)
    {
        return _messageMediator.Mediate(notification, new MediateOptions<INotification, Task>
        {
            MessageMediationStrategy = new SingleAsyncHandlerMediationStrategy<INotification>(),
            MessageResolveStrategy = new ActualTypeOrFirstAssignableTypeMessageResolveStrategy(),
            CancellationToken = cancellationToken,
            Tags = Array.Empty<string>()
        });
    }
}

3. Create Integration with Dependency Injection

/// <summary>
/// <see cref="IModule"/> implementation that wires the notification module into LiteBus.
/// </summary>
internal class NotificationModule : IModule
{
    // User-supplied callback that registers notification types on the builder.
    private readonly Action<NotificationModuleBuilder> _builder;

    /// <summary>Stores the registration callback to run when the module is built.</summary>
    /// <param name="builder">Callback that receives the module builder.</param>
    public NotificationModule(Action<NotificationModuleBuilder> builder) => _builder = builder;

    /// <summary>
    /// Runs the registration callback against the configuration's message registry, then
    /// registers <see cref="NotificationMediator"/> as a transient
    /// <see cref="INotificationMediator"/> via TryAddTransient.
    /// </summary>
    /// <param name="configuration">The module configuration supplied by LiteBus.</param>
    public void Build(IModuleConfiguration configuration)
    {
        var moduleBuilder = new NotificationModuleBuilder(configuration.MessageRegistry);
        _builder.Invoke(moduleBuilder);

        configuration.Services.TryAddTransient<INotificationMediator, NotificationMediator>();
    }
}

/// <summary>
/// Fluent builder used inside AddNotificationModule to register notification types with
/// the message registry.
/// </summary>
public class NotificationModuleBuilder
{
    private readonly IMessageRegistry _messageRegistry;

    /// <summary>Creates a builder that registers into <paramref name="messageRegistry"/>.</summary>
    /// <param name="messageRegistry">The registry that receives the registered types.</param>
    public NotificationModuleBuilder(IMessageRegistry messageRegistry) =>
        _messageRegistry = messageRegistry;

    /// <summary>Registers a single notification construct type.</summary>
    /// <typeparam name="T">A type marked with <see cref="IRegistrableNotificationConstruct"/>.</typeparam>
    /// <returns>This builder, to allow chaining.</returns>
    public NotificationModuleBuilder Register<T>() where T : IRegistrableNotificationConstruct
    {
        _messageRegistry.Register(typeof(T));
        return this;
    }

    /// <summary>
    /// Registers every type in <paramref name="assembly"/> that is assignable to
    /// <see cref="IRegistrableNotificationConstruct"/>.
    /// </summary>
    /// <param name="assembly">The assembly to scan.</param>
    /// <returns>This builder, to allow chaining.</returns>
    public NotificationModuleBuilder RegisterFromAssembly(Assembly assembly)
    {
        var marker = typeof(IRegistrableNotificationConstruct);

        foreach (var candidate in assembly.GetTypes())
        {
            if (!candidate.IsAssignableTo(marker))
            {
                continue;
            }

            _messageRegistry.Register(candidate);
        }

        return this;
    }
}

/// <summary>
/// Extension methods that expose the notification module on <see cref="IModuleRegistry"/>.
/// </summary>
public static class ModuleRegistryExtensions
{
    /// <summary>
    /// Registers a notification module configured by <paramref name="builder"/>.
    /// </summary>
    /// <param name="moduleRegistry">The registry to add the module to.</param>
    /// <param name="builder">Callback that registers notification types.</param>
    /// <returns>The same <paramref name="moduleRegistry"/>, to allow chaining.</returns>
    public static IModuleRegistry AddNotificationModule(
        this IModuleRegistry moduleRegistry,
        Action<NotificationModuleBuilder> builder)
    {
        var notificationModule = new NotificationModule(builder);
        moduleRegistry.Register(notificationModule);

        return moduleRegistry;
    }
}

4. Usage Example

// Register the notification module and scan the application's assembly for
// notification constructs.
var notificationAssembly = typeof(Program).Assembly;

services.AddLiteBus(liteBus =>
{
    liteBus.AddNotificationModule(notifications =>
    {
        notifications.RegisterFromAssembly(notificationAssembly);
    });
});

/// <summary>
/// Example controller that publishes a notification through the custom module.
/// </summary>
public class OrderController : ControllerBase
{
    private readonly INotificationMediator _notificationMediator;

    /// <summary>Receives the notification mediator from dependency injection.</summary>
    public OrderController(INotificationMediator notificationMediator) =>
        _notificationMediator = notificationMediator;

    /// <summary>
    /// Creates an order and then publishes an <c>OrderCreatedNotification</c>.
    /// </summary>
    /// <param name="request">The incoming order request.</param>
    /// <returns>An OK result after the notification has been published.</returns>
    [HttpPost]
    public async Task<IActionResult> CreateOrder(CreateOrderRequest request)
    {
        // Process order...
        // NOTE(review): `orderId` is not declared in this snippet; it is presumably
        // produced by the elided order-processing step above.

        // Send notification
        var notification = new OrderCreatedNotification
        {
            OrderId = orderId,
            CustomerId = request.CustomerId
        };
        await _notificationMediator.PublishNotificationAsync(notification);

        return Ok();
    }
}

Custom Mediation Strategies

LiteBus allows you to create custom mediation strategies to control how messages are processed.

Implementing a Custom Mediation Strategy

Create a custom mediation strategy by implementing the IMessageMediationStrategy<TMessage, TMessageResult> interface:

/// <summary>
/// Mediation strategy that runs pre-handlers, then all main handlers in parallel with a
/// bounded degree of parallelism, then post-handlers.
/// </summary>
/// <typeparam name="TMessage">The message type being mediated.</typeparam>
public class ParallelEventMediationStrategy<TMessage> : IMessageMediationStrategy<TMessage, Task>
    where TMessage : notnull
{
    private readonly int _maxDegreeOfParallelism;

    /// <summary>
    /// Creates the strategy.
    /// </summary>
    /// <param name="maxDegreeOfParallelism">
    /// Maximum number of handlers executed concurrently. Must be positive, or -1 for no limit.
    /// </param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="maxDegreeOfParallelism"/> is 0 or less than -1.
    /// </exception>
    public ParallelEventMediationStrategy(int maxDegreeOfParallelism = 4)
    {
        // ParallelOptions.MaxDegreeOfParallelism rejects these values; fail at construction
        // instead of on the first Mediate call, where the failure would be routed to the
        // message's error handlers.
        if (maxDegreeOfParallelism is 0 or < -1)
        {
            throw new ArgumentOutOfRangeException(
                nameof(maxDegreeOfParallelism),
                maxDegreeOfParallelism,
                "The degree of parallelism must be positive, or -1 for unlimited.");
        }

        _maxDegreeOfParallelism = maxDegreeOfParallelism;
    }

    /// <summary>
    /// Mediates <paramref name="message"/>: pre-handlers, parallel main handlers, post-handlers.
    /// </summary>
    /// <param name="message">The message to dispatch.</param>
    /// <param name="messageDependencies">The pre-, main, post- and error handlers for the message.</param>
    /// <param name="executionContext">Supplies the cancellation token for the parallel loop.</param>
    /// <returns>A task that completes when mediation has finished.</returns>
    /// <remarks>
    /// Any exception other than <see cref="LiteBusExecutionAbortedException"/> is passed to the
    /// error handlers; <see cref="LiteBusExecutionAbortedException"/> is not caught here and
    /// propagates to the caller.
    /// </remarks>
    public async Task Mediate(
        TMessage message,
        IMessageDependencies messageDependencies,
        IExecutionContext executionContext)
    {
        try
        {
            // Execute pre-handlers sequentially
            await messageDependencies.RunAsyncPreHandlers(message);

            var options = new ParallelOptions
            {
                MaxDegreeOfParallelism = _maxDegreeOfParallelism,
                CancellationToken = executionContext.CancellationToken
            };

            // Execute main handlers in parallel with limited concurrency. An empty handler
            // set is not special-cased: the loop simply does nothing, and post-handlers
            // still run so that every pre-handler run is paired with a post-handler run.
            await Parallel.ForEachAsync(messageDependencies.Handlers, options, async (handler, _) =>
            {
                await (Task)handler.Handler.Value.Handle(message);
            });

            // Execute post-handlers sequentially
            await messageDependencies.RunAsyncPostHandlers(message, null);
        }
        catch (Exception e) when (e is not LiteBusExecutionAbortedException)
        {
            // Capture preserves the original stack trace for the error handlers.
            await messageDependencies.RunAsyncErrorHandlers(
                message,
                null,
                System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(e));
        }
    }
}

Using a Custom Mediation Strategy

/// <summary>
/// <see cref="IEventMediator"/> that publishes events with
/// <see cref="ParallelEventMediationStrategy{TMessage}"/> (max degree of parallelism 8).
/// </summary>
public class CustomEventMediator : IEventMediator
{
    private readonly IMessageMediator _messageMediator;

    /// <summary>Creates the mediator on top of the core message mediator.</summary>
    public CustomEventMediator(IMessageMediator messageMediator) =>
        _messageMediator = messageMediator;

    /// <summary>
    /// Publishes an <see cref="IEvent"/>; equivalent to the generic overload with
    /// <c>TEvent</c> fixed to <see cref="IEvent"/>.
    /// </summary>
    public Task PublishAsync(
        IEvent @event,
        EventMediationSettings? eventMediationSettings = null,
        CancellationToken cancellationToken = default)
        => PublishAsync<IEvent>(@event, eventMediationSettings, cancellationToken);

    /// <summary>
    /// Publishes <paramref name="event"/> using the custom parallel strategy.
    /// </summary>
    /// <typeparam name="TEvent">The event type.</typeparam>
    /// <param name="event">The event to publish.</param>
    /// <param name="eventMediationSettings">Optional settings; defaults are used when null.</param>
    /// <param name="cancellationToken">Token passed to the mediation options.</param>
    /// <returns>The task returned by the underlying mediator.</returns>
    public Task PublishAsync<TEvent>(
        TEvent @event,
        EventMediationSettings? eventMediationSettings = null,
        CancellationToken cancellationToken = default) where TEvent : notnull
    {
        var settings = eventMediationSettings ?? new EventMediationSettings();

        var options = new MediateOptions<TEvent, Task>
        {
            // Use custom parallel strategy
            MessageMediationStrategy = new ParallelEventMediationStrategy<TEvent>(maxDegreeOfParallelism: 8),
            MessageResolveStrategy = new ActualTypeOrFirstAssignableTypeMessageResolveStrategy(),
            CancellationToken = cancellationToken,
            Tags = settings.Filters.Tags,
            RegisterPlainMessagesOnSpot = !settings.ThrowIfNoHandlerFound
        };

        return _messageMediator.Mediate(@event, options);
    }
}

Register your custom mediator:

// Register the event module and scan the application's assembly for event constructs.
var eventAssembly = typeof(Program).Assembly;

services.AddLiteBus(liteBus =>
{
    liteBus.AddEventModule(events =>
    {
        events.RegisterFromAssembly(eventAssembly);
    });
});

// Replace the default event mediator with custom implementation
services.AddTransient<IEventMediator, CustomEventMediator>();

Custom Message Resolution

LiteBus allows customizing how message types are resolved to their handlers.

Implementing a Custom Message Resolve Strategy

Create a custom resolve strategy by implementing the IMessageResolveStrategy interface:

/// <summary>
/// Resolve strategy that prefers an exact message-type match and otherwise picks the
/// registered descriptor whose message type is closest in the class hierarchy.
/// </summary>
public class PrioritizedMessageResolveStrategy : IMessageResolveStrategy
{
    /// <summary>
    /// Finds the descriptor to use for <paramref name="messageType"/>.
    /// </summary>
    /// <param name="messageType">The runtime type of the message being mediated.</param>
    /// <param name="messageRegistry">The registry of known message descriptors.</param>
    /// <returns>
    /// The exactly matching descriptor if there is one; otherwise the assignable descriptor
    /// with the smallest inheritance depth (the first one on ties); otherwise <c>null</c>.
    /// </returns>
    public IMessageDescriptor? Find(Type messageType, IMessageRegistry messageRegistry)
    {
        // First try to find an exact match
        var exactMatch = messageRegistry.SingleOrDefault(d => d.MessageType == messageType);
        if (exactMatch is not null)
        {
            return exactMatch;
        }

        // No exact match: choose the closest assignable type. MinBy returns the first
        // minimal element and null for an empty sequence, so no sort or list is needed.
        return messageRegistry
            .Where(d => d.MessageType.IsAssignableFrom(messageType))
            .MinBy(d => CalculateInheritanceDepth(messageType, d.MessageType));
    }

    /// <summary>
    /// Counts the BaseType steps from <paramref name="derivedType"/> up to
    /// <paramref name="baseType"/>.
    /// </summary>
    /// <remarks>
    /// Interfaces never appear in the BaseType chain, so for an interface
    /// <paramref name="baseType"/> the walk runs to the end of the chain and returns its
    /// full length. Interface matches therefore rank after every class match.
    /// </remarks>
    private static int CalculateInheritanceDepth(Type derivedType, Type baseType)
    {
        var depth = 0;

        // BaseType is null once the walk passes System.Object, hence the nullable local.
        for (Type? current = derivedType; current is not null && current != baseType; current = current.BaseType)
        {
            depth++;
        }

        return depth;
    }
}

Using a Custom Message Resolve Strategy

/// <summary>
/// <see cref="ICommandMediator"/> that resolves handlers with
/// <see cref="PrioritizedMessageResolveStrategy"/> instead of the default strategy.
/// </summary>
public class CustomCommandMediator : ICommandMediator
{
    private readonly IMessageMediator _messageMediator;

    /// <summary>Creates the mediator on top of the core message mediator.</summary>
    public CustomCommandMediator(IMessageMediator messageMediator) =>
        _messageMediator = messageMediator;

    /// <summary>
    /// Sends a command that produces no result.
    /// </summary>
    /// <param name="command">The command to send.</param>
    /// <param name="commandMediationSettings">Optional settings; defaults are used when null.</param>
    /// <param name="cancellationToken">Token passed to the mediation options.</param>
    /// <returns>The task returned by the underlying mediator.</returns>
    public Task SendAsync(
        ICommand command,
        CommandMediationSettings? commandMediationSettings = null,
        CancellationToken cancellationToken = default)
    {
        var settings = commandMediationSettings ?? new CommandMediationSettings();

        return _messageMediator.Mediate(command, new MediateOptions<ICommand, Task>
        {
            MessageMediationStrategy = new SingleAsyncHandlerMediationStrategy<ICommand>(),
            // Use custom resolve strategy
            MessageResolveStrategy = new PrioritizedMessageResolveStrategy(),
            CancellationToken = cancellationToken,
            Tags = settings.Filters.Tags
        });
    }

    // Implement the generic method similarly.
    // NOTE(review): this overload is a placeholder; it has no return statement, so it
    // must be completed before the class will compile.
    public Task<TCommandResult?> SendAsync<TCommandResult>(
        ICommand<TCommandResult> command,
        CommandMediationSettings? commandMediationSettings = null,
        CancellationToken cancellationToken = default)
    {
        // Similar implementation with custom resolve strategy
        // ...
    }
}

Integration Patterns

Message Decoration

To add cross-cutting functionality to all message handling, you can create decorator classes:

/// <summary>
/// Decorator around an <see cref="ICommandMediator"/> that logs the start, duration and
/// failure of every command it forwards.
/// </summary>
public class LoggingCommandMediatorDecorator : ICommandMediator
{
    private readonly ICommandMediator _innerMediator;
    private readonly ILogger<LoggingCommandMediatorDecorator> _logger;

    /// <summary>Wraps <paramref name="innerMediator"/> with logging.</summary>
    /// <param name="innerMediator">The mediator that actually sends the command.</param>
    /// <param name="logger">The logger that receives the log entries.</param>
    public LoggingCommandMediatorDecorator(
        ICommandMediator innerMediator,
        ILogger<LoggingCommandMediatorDecorator> logger)
    {
        _logger = logger;
        _innerMediator = innerMediator;
    }

    /// <summary>
    /// Sends a command without a result, logging before the call and after it completes
    /// or fails. Exceptions are logged and rethrown unchanged.
    /// </summary>
    public async Task SendAsync(
        ICommand command,
        CommandMediationSettings? commandMediationSettings = null,
        CancellationToken cancellationToken = default)
    {
        var commandName = command.GetType().Name;
        _logger.LogInformation("Sending command {CommandType}", commandName);

        var timer = Stopwatch.StartNew();
        try
        {
            await _innerMediator.SendAsync(command, commandMediationSettings, cancellationToken);
            timer.Stop();

            _logger.LogInformation(
                "Command {CommandType} completed in {ElapsedMs}ms",
                commandName,
                timer.ElapsedMilliseconds);
        }
        catch (Exception failure)
        {
            timer.Stop();
            _logger.LogError(
                failure,
                "Command {CommandType} failed after {ElapsedMs}ms",
                commandName,
                timer.ElapsedMilliseconds);

            // Rethrow with the original stack trace.
            throw;
        }
    }

    /// <summary>
    /// Sends a command that produces a result, with the same logging as the result-less
    /// overload, and returns the inner mediator's result.
    /// </summary>
    public async Task<TCommandResult?> SendAsync<TCommandResult>(
        ICommand<TCommandResult> command,
        CommandMediationSettings? commandMediationSettings = null,
        CancellationToken cancellationToken = default)
    {
        var commandName = command.GetType().Name;
        _logger.LogInformation("Sending command {CommandType} with result", commandName);

        var timer = Stopwatch.StartNew();
        try
        {
            var outcome = await _innerMediator.SendAsync(command, commandMediationSettings, cancellationToken);
            timer.Stop();

            _logger.LogInformation(
                "Command {CommandType} completed in {ElapsedMs}ms",
                commandName,
                timer.ElapsedMilliseconds);

            return outcome;
        }
        catch (Exception failure)
        {
            timer.Stop();
            _logger.LogError(
                failure,
                "Command {CommandType} failed after {ElapsedMs}ms",
                commandName,
                timer.ElapsedMilliseconds);

            // Rethrow with the original stack trace.
            throw;
        }
    }
}

Register the decorator:

// Register the command module and scan the application's assembly for command constructs.
var commandAssembly = typeof(Program).Assembly;

services.AddLiteBus(liteBus =>
{
    liteBus.AddCommandModule(commands =>
    {
        commands.RegisterFromAssembly(commandAssembly);
    });
});

// Add decorator
services.Decorate<ICommandMediator, LoggingCommandMediatorDecorator>();

External Message Integration

To integrate LiteBus with external messaging systems such as RabbitMQ or Kafka, consume messages from the broker and republish them through a LiteBus mediator:

/// <summary>
/// Example RabbitMQ consumer that republishes queue messages to LiteBus as events.
/// </summary>
/// <remarks>
/// Each message body is read as a UTF-8 JSON <see cref="EventEnvelope"/>. Messages that can
/// never be turned into an <see cref="IEvent"/> are rejected without requeue; messages whose
/// publication fails are requeued.
/// NOTE(review): <c>AsyncEventingBasicConsumer</c> presumably needs async consumer dispatch
/// enabled on the connection factory; confirm for the RabbitMQ.Client version in use.
/// </remarks>
public class RabbitMQEventConsumer : BackgroundService
{
    private readonly IEventMediator _eventMediator;
    private readonly IConnection _rabbitConnection;
    private readonly string _queueName;

    /// <summary>Opens a connection and remembers the queue to consume from.</summary>
    /// <param name="eventMediator">Mediator used to publish the deserialized events.</param>
    /// <param name="connectionFactory">Factory used to create the RabbitMQ connection.</param>
    /// <param name="queueName">Name of the queue to declare and consume.</param>
    public RabbitMQEventConsumer(
        IEventMediator eventMediator,
        IConnectionFactory connectionFactory,
        string queueName)
    {
        _eventMediator = eventMediator;
        _rabbitConnection = connectionFactory.CreateConnection();
        _queueName = queueName;
    }

    /// <summary>
    /// Declares the queue, attaches the consumer and keeps the channel open until
    /// <paramref name="stoppingToken"/> is cancelled.
    /// </summary>
    /// <param name="stoppingToken">Signals that the host is shutting down.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        using var channel = _rabbitConnection.CreateModel();
        channel.QueueDeclare(_queueName, durable: true, exclusive: false, autoDelete: false);

        var consumer = new AsyncEventingBasicConsumer(channel);
        consumer.Received += async (_, ea) =>
        {
            try
            {
                var @event = TryReadEvent(ea.Body.ToArray());
                if (@event is null)
                {
                    // Retrying cannot make an unreadable message readable. Requeueing would
                    // redeliver it forever, and leaving it unacknowledged would strand it.
                    channel.BasicNack(ea.DeliveryTag, multiple: false, requeue: false);
                    return;
                }

                // Publish to LiteBus. The token must be passed by name: the second
                // positional parameter of PublishAsync is the mediation settings.
                await _eventMediator.PublishAsync(@event, cancellationToken: stoppingToken);
                channel.BasicAck(ea.DeliveryTag, multiple: false);
            }
            catch (Exception)
            {
                // Publishing (or something unexpected) failed; requeue so it can be retried.
                channel.BasicNack(ea.DeliveryTag, multiple: false, requeue: true);
            }
        };

        channel.BasicConsume(_queueName, autoAck: false, consumer);

        // Keep the service running until cancellation is requested
        await Task.Delay(Timeout.Infinite, stoppingToken);
    }

    /// <summary>Disposes the RabbitMQ connection together with the service.</summary>
    public override void Dispose()
    {
        _rabbitConnection?.Dispose();
        base.Dispose();
    }

    /// <summary>
    /// Converts a raw message body into an event.
    /// </summary>
    /// <param name="body">The raw message bytes.</param>
    /// <returns>
    /// The deserialized event, or <c>null</c> when the body is not a valid envelope, names an
    /// unknown type, names a type that is not an <see cref="IEvent"/>, or has an unreadable payload.
    /// </returns>
    private static IEvent? TryReadEvent(byte[] body)
    {
        try
        {
            var envelope = JsonSerializer.Deserialize<EventEnvelope>(Encoding.UTF8.GetString(body));
            if (envelope is null
                || string.IsNullOrWhiteSpace(envelope.EventType)
                || string.IsNullOrWhiteSpace(envelope.Payload))
            {
                return null;
            }

            // SECURITY: the type name comes from the broker and is untrusted input. Only
            // IEvent implementations are deserialized; production code should go further
            // and map names through an explicit allow-list of known event types.
            var eventType = Type.GetType(envelope.EventType);
            if (eventType is null || !typeof(IEvent).IsAssignableFrom(eventType))
            {
                return null;
            }

            return JsonSerializer.Deserialize(envelope.Payload, eventType) as IEvent;
        }
        catch (Exception ex) when (ex is JsonException
                                      or ArgumentException
                                      or NotSupportedException
                                      or TypeLoadException)
        {
            // Malformed JSON, an invalid type name, or a type the serializer cannot handle.
            return null;
        }
    }
}

/// <summary>
/// Example envelope for external messages: a type name plus the serialized event.
/// </summary>
public class EventEnvelope
{
    /// <summary>
    /// Name of the event type; the RabbitMQ consumer example passes it to Type.GetType.
    /// </summary>
    public string EventType { get; set; }

    /// <summary>
    /// JSON text of the event; the RabbitMQ consumer example deserializes it into the type
    /// named by <see cref="EventType"/>.
    /// </summary>
    public string Payload { get; set; }
}

Best Practices for Extensibility

  1. Maintain Separation of Concerns - Keep custom extensions focused on specific functionality
  2. Follow Existing Patterns - Model custom modules after the existing command, query, and event modules
  3. Avoid Deep Coupling - Design extensions to work with LiteBus interfaces rather than implementations
  4. Test Thoroughly - Custom extensions should have comprehensive test coverage
  5. Consider Performance - Be mindful of the performance implications of custom implementations
  6. Document Behavior - Clearly document how your extensions modify the standard behavior
Clone this wiki locally