-
Notifications
You must be signed in to change notification settings - Fork 11
Extensibility
This section covers how to extend LiteBus with custom functionality.
LiteBus's modular architecture allows you to create custom modules beyond the built-in command, query, and event modules.
A custom module typically consists of:
- Abstractions - Interfaces that define the module's contracts
- Implementations - Classes that implement the module's functionality
- DI Extensions - Extension methods for registering the module with dependency injection
To create a custom module, implement the following key interfaces:
/// <summary>
/// Marker interface with no members. The notification module builder only accepts
/// types that implement it, both for explicit and for assembly-wide registration.
/// </summary>
public interface IRegistrableNotificationConstruct
{
}
/// <summary>
/// Contract every notification message implements. It adds no members of its own;
/// it only extends the registrable marker so notifications can be registered.
/// </summary>
public interface INotification : IRegistrableNotificationConstruct
{
}
/// <summary>
/// Contract for a notification handler. It combines the registrable marker with
/// <see cref="IAsyncMessageHandler{TNotification}"/> and declares no members itself.
/// </summary>
/// <typeparam name="TNotification">The notification type handled (contravariant).</typeparam>
public interface INotificationHandler<in TNotification>
    : IRegistrableNotificationConstruct, IAsyncMessageHandler<TNotification>
    where TNotification : INotification
{
}
/// <summary>
/// Entry point application code uses to publish notifications.
/// </summary>
public interface INotificationMediator : IRegistrableNotificationConstruct
{
    /// <summary>Publishes the given notification.</summary>
    /// <param name="notification">The notification to publish.</param>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    /// <returns>A task that completes when mediation has finished.</returns>
    Task PublishNotificationAsync(
        INotification notification,
        CancellationToken cancellationToken = default);
}
/// <summary>
/// <see cref="INotificationMediator"/> implementation that forwards every notification
/// to the underlying <see cref="IMessageMediator"/>.
/// </summary>
public class NotificationMediator : INotificationMediator
{
    private readonly IMessageMediator _messageMediator;

    /// <summary>Creates the mediator on top of the given message mediator.</summary>
    /// <param name="messageMediator">The mediator that performs the actual dispatch.</param>
    public NotificationMediator(IMessageMediator messageMediator) => _messageMediator = messageMediator;

    /// <summary>
    /// Publishes <paramref name="notification"/> using a single-async-handler mediation
    /// strategy, the actual-type-or-first-assignable resolve strategy, and no tags.
    /// </summary>
    /// <param name="notification">The notification to publish.</param>
    /// <param name="cancellationToken">Token passed on through the mediation options.</param>
    /// <returns>The task returned by the underlying mediator.</returns>
    public Task PublishNotificationAsync(
        INotification notification,
        CancellationToken cancellationToken = default)
    {
        return _messageMediator.Mediate(notification, new MediateOptions<INotification, Task>
        {
            MessageMediationStrategy = new SingleAsyncHandlerMediationStrategy<INotification>(),
            MessageResolveStrategy = new ActualTypeOrFirstAssignableTypeMessageResolveStrategy(),
            CancellationToken = cancellationToken,
            Tags = Array.Empty<string>()
        });
    }
}
/// <summary>
/// <see cref="IModule"/> implementation that wires the notification feature into LiteBus.
/// </summary>
internal class NotificationModule : IModule
{
    private readonly Action<NotificationModuleBuilder> _builder;

    /// <summary>Stores the callback that will register notification types.</summary>
    /// <param name="builder">Callback invoked with a module builder during <see cref="Build"/>.</param>
    public NotificationModule(Action<NotificationModuleBuilder> builder) => _builder = builder;

    /// <summary>
    /// Runs the registration callback against the configuration's message registry,
    /// then registers <see cref="NotificationMediator"/> as a transient
    /// <see cref="INotificationMediator"/> unless one is already registered.
    /// </summary>
    /// <param name="configuration">The module configuration supplied by LiteBus.</param>
    public void Build(IModuleConfiguration configuration)
    {
        var moduleBuilder = new NotificationModuleBuilder(configuration.MessageRegistry);
        _builder(moduleBuilder);

        configuration.Services.TryAddTransient<INotificationMediator, NotificationMediator>();
    }
}
/// <summary>
/// Fluent builder used to register notification-related types with the message registry.
/// </summary>
public class NotificationModuleBuilder
{
    private readonly IMessageRegistry _messageRegistry;

    /// <summary>Creates a builder that registers into the given registry.</summary>
    /// <param name="messageRegistry">The registry that receives the registered types.</param>
    public NotificationModuleBuilder(IMessageRegistry messageRegistry) => _messageRegistry = messageRegistry;

    /// <summary>Registers a single type.</summary>
    /// <typeparam name="T">The type to register; must carry the notification marker.</typeparam>
    /// <returns>This builder, so calls can be chained.</returns>
    public NotificationModuleBuilder Register<T>() where T : IRegistrableNotificationConstruct
    {
        var registeredType = typeof(T);
        _messageRegistry.Register(registeredType);
        return this;
    }

    /// <summary>
    /// Registers every type in <paramref name="assembly"/> that is assignable to
    /// <see cref="IRegistrableNotificationConstruct"/>.
    /// </summary>
    /// <param name="assembly">The assembly to scan.</param>
    /// <returns>This builder, so calls can be chained.</returns>
    public NotificationModuleBuilder RegisterFromAssembly(Assembly assembly)
    {
        var markerType = typeof(IRegistrableNotificationConstruct);

        foreach (var candidate in assembly.GetTypes())
        {
            if (!candidate.IsAssignableTo(markerType))
            {
                continue;
            }

            _messageRegistry.Register(candidate);
        }

        return this;
    }
}
/// <summary>
/// Extension methods for adding the notification module to a LiteBus module registry.
/// </summary>
public static class ModuleRegistryExtensions
{
    /// <summary>Registers a new notification module with the module registry.</summary>
    /// <param name="moduleRegistry">The registry to add the module to.</param>
    /// <param name="builder">Callback that registers notification types.</param>
    /// <returns>The same registry instance, so calls can be chained.</returns>
    public static IModuleRegistry AddNotificationModule(
        this IModuleRegistry moduleRegistry,
        Action<NotificationModuleBuilder> builder)
    {
        var notificationModule = new NotificationModule(builder);
        moduleRegistry.Register(notificationModule);

        return moduleRegistry;
    }
}
// Register the notification module and scan the application's own assembly
// for notification types.
services.AddLiteBus(liteBus =>
{
    liteBus.AddNotificationModule(
        notifications => notifications.RegisterFromAssembly(typeof(Program).Assembly));
});
// Use the module
/// <summary>
/// Sample controller that publishes an <c>OrderCreatedNotification</c> after an order is created.
/// </summary>
public class OrderController : ControllerBase
{
    private readonly INotificationMediator _notificationMediator;

    /// <summary>Creates the controller with the injected notification mediator.</summary>
    /// <param name="notificationMediator">Mediator used to publish notifications.</param>
    public OrderController(INotificationMediator notificationMediator)
    {
        _notificationMediator = notificationMediator;
    }

    /// <summary>Creates an order and publishes a notification about it.</summary>
    /// <param name="request">The incoming order request.</param>
    /// <param name="cancellationToken">Token passed on to the notification mediator.</param>
    /// <returns>An OK result once the notification has been published.</returns>
    [HttpPost]
    public async Task<IActionResult> CreateOrder(
        CreateOrderRequest request,
        CancellationToken cancellationToken = default)
    {
        // Process order...
        // Placeholder: in a real application this identifier comes from order processing.
        // NOTE(review): assumes OrderCreatedNotification.OrderId is a Guid - adjust to the real type.
        var orderId = Guid.NewGuid();

        // Send notification
        await _notificationMediator.PublishNotificationAsync(
            new OrderCreatedNotification
            {
                OrderId = orderId,
                CustomerId = request.CustomerId
            },
            cancellationToken);

        return Ok();
    }
}
LiteBus allows you to create custom mediation strategies to control how messages are processed.
Create a custom mediation strategy by implementing the `IMessageMediationStrategy<TMessage, TMessageResult>` interface:
/// <summary>
/// Mediation strategy that runs pre-handlers, then the main handlers in parallel with a
/// bounded degree of parallelism, then post-handlers. Exceptions other than
/// <c>LiteBusExecutionAbortedException</c> are passed to the error handlers.
/// </summary>
/// <typeparam name="TMessage">The message type being mediated.</typeparam>
public class ParallelEventMediationStrategy<TMessage> : IMessageMediationStrategy<TMessage, Task>
    where TMessage : notnull
{
    private readonly int _maxDegreeOfParallelism;

    /// <summary>Creates the strategy.</summary>
    /// <param name="maxDegreeOfParallelism">
    /// Maximum number of handlers executed concurrently. Must be positive, or -1 for no limit
    /// (the values <see cref="ParallelOptions.MaxDegreeOfParallelism"/> accepts).
    /// </param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="maxDegreeOfParallelism"/> is 0 or less than -1.
    /// </exception>
    public ParallelEventMediationStrategy(int maxDegreeOfParallelism = 4)
    {
        // Fail fast here. Otherwise the invalid value only surfaces when ParallelOptions is
        // built inside Mediate's try block, after pre-handlers have run, and the resulting
        // exception is handed to the message error handlers as if a handler had failed.
        if (maxDegreeOfParallelism == 0 || maxDegreeOfParallelism < -1)
        {
            throw new ArgumentOutOfRangeException(
                nameof(maxDegreeOfParallelism),
                maxDegreeOfParallelism,
                "The maximum degree of parallelism must be positive, or -1 for no limit.");
        }

        _maxDegreeOfParallelism = maxDegreeOfParallelism;
    }

    /// <summary>Mediates <paramref name="message"/> to its handlers.</summary>
    /// <param name="message">The message to dispatch.</param>
    /// <param name="messageDependencies">The pre-, main, post- and error handlers for the message.</param>
    /// <param name="executionContext">Execution context; its cancellation token bounds the parallel loop.</param>
    /// <returns>A task that completes when mediation (or error handling) has finished.</returns>
    public async Task Mediate(
        TMessage message,
        IMessageDependencies messageDependencies,
        IExecutionContext executionContext)
    {
        try
        {
            // Execute pre-handlers sequentially
            await messageDependencies.RunAsyncPreHandlers(message);

            // Execute main handlers in parallel with limited concurrency
            var handlers = messageDependencies.Handlers.ToList();
            if (handlers.Count == 0)
            {
                // No main handlers: return without running post-handlers.
                return;
            }

            var options = new ParallelOptions
            {
                MaxDegreeOfParallelism = _maxDegreeOfParallelism,
                CancellationToken = executionContext.CancellationToken
            };

            // The per-iteration token is not used because Handle takes only the message.
            await Parallel.ForEachAsync(handlers, options, async (handler, _) =>
            {
                await (Task)handler.Handler.Value.Handle(message);
            });

            // Execute post-handlers sequentially
            await messageDependencies.RunAsyncPostHandlers(message, null);
        }
        catch (Exception e) when (e is not LiteBusExecutionAbortedException)
        {
            await messageDependencies.RunAsyncErrorHandlers(
                message,
                null,
                System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(e));
        }
    }
}
/// <summary>
/// <see cref="IEventMediator"/> implementation that publishes events through
/// <see cref="ParallelEventMediationStrategy{TMessage}"/> with a parallelism limit of 8.
/// </summary>
public class CustomEventMediator : IEventMediator
{
    private readonly IMessageMediator _messageMediator;

    /// <summary>Creates the mediator on top of the given message mediator.</summary>
    /// <param name="messageMediator">The mediator that performs the actual dispatch.</param>
    public CustomEventMediator(IMessageMediator messageMediator) => _messageMediator = messageMediator;

    /// <summary>Publishes an event typed as <see cref="IEvent"/>.</summary>
    /// <param name="event">The event to publish.</param>
    /// <param name="eventMediationSettings">Optional settings; defaults are used when null.</param>
    /// <param name="cancellationToken">Token passed on through the mediation options.</param>
    /// <returns>The task returned by the underlying mediator.</returns>
    public Task PublishAsync(
        IEvent @event,
        EventMediationSettings? eventMediationSettings = null,
        CancellationToken cancellationToken = default)
    {
        return PublishCore<IEvent>(@event, eventMediationSettings, cancellationToken);
    }

    /// <summary>Publishes an event of an arbitrary non-null type.</summary>
    /// <typeparam name="TEvent">The event type.</typeparam>
    /// <param name="event">The event to publish.</param>
    /// <param name="eventMediationSettings">Optional settings; defaults are used when null.</param>
    /// <param name="cancellationToken">Token passed on through the mediation options.</param>
    /// <returns>The task returned by the underlying mediator.</returns>
    public Task PublishAsync<TEvent>(
        TEvent @event,
        EventMediationSettings? eventMediationSettings = null,
        CancellationToken cancellationToken = default) where TEvent : notnull
    {
        return PublishCore(@event, eventMediationSettings, cancellationToken);
    }

    // Shared implementation of both overloads: builds the mediation options with the
    // custom parallel strategy and hands the message to the underlying mediator.
    private Task PublishCore<TMessage>(
        TMessage message,
        EventMediationSettings? settings,
        CancellationToken cancellationToken) where TMessage : notnull
    {
        settings ??= new EventMediationSettings();

        var options = new MediateOptions<TMessage, Task>
        {
            // Use custom parallel strategy
            MessageMediationStrategy = new ParallelEventMediationStrategy<TMessage>(maxDegreeOfParallelism: 8),
            MessageResolveStrategy = new ActualTypeOrFirstAssignableTypeMessageResolveStrategy(),
            CancellationToken = cancellationToken,
            Tags = settings.Filters.Tags,
            RegisterPlainMessagesOnSpot = !settings.ThrowIfNoHandlerFound
        };

        return _messageMediator.Mediate(message, options);
    }
}
Register your custom mediator:
// Register the event module and scan the application's own assembly for event types.
services.AddLiteBus(liteBus =>
{
    liteBus.AddEventModule(events =>
    {
        events.RegisterFromAssembly(typeof(Program).Assembly);
    });
});

// Register the custom implementation after AddLiteBus so it is the IEventMediator in use.
// NOTE(review): this adds a second registration rather than removing the default one;
// presumably the later registration wins on resolution - confirm.
services.AddTransient<IEventMediator, CustomEventMediator>();
LiteBus allows customizing how message types are resolved to their handlers.
Create a custom resolve strategy by implementing the `IMessageResolveStrategy` interface:
/// <summary>
/// Resolve strategy that prefers an exact message-type match and otherwise picks the
/// registered descriptor whose message type is closest to the requested type in the
/// class inheritance chain.
/// </summary>
public class PrioritizedMessageResolveStrategy : IMessageResolveStrategy
{
    /// <summary>Finds the descriptor that should handle <paramref name="messageType"/>.</summary>
    /// <param name="messageType">The runtime type of the message.</param>
    /// <param name="messageRegistry">The registry of known message descriptors.</param>
    /// <returns>
    /// The exact match if one exists; otherwise the closest assignable descriptor;
    /// otherwise <c>null</c>.
    /// </returns>
    public IMessageDescriptor? Find(Type messageType, IMessageRegistry messageRegistry)
    {
        // First try to find an exact match
        var exactMatch = messageRegistry.SingleOrDefault(d => d.MessageType == messageType);
        if (exactMatch is not null)
        {
            return exactMatch;
        }

        // If there is no exact match, take the assignable descriptor with the smallest
        // inheritance depth. MinBy returns the first minimum, so ties keep registry order,
        // and it returns null when nothing is assignable.
        return messageRegistry
            .Where(d => d.MessageType.IsAssignableFrom(messageType))
            .MinBy(d => CalculateInheritanceDepth(messageType, d.MessageType));
    }

    // Counts BaseType steps from derivedType up to baseType. An interface is never reached
    // through BaseType, so the walk runs past System.Object and the result is larger than
    // for any base class: interface matches rank after class matches and tie with each other.
    private static int CalculateInheritanceDepth(Type derivedType, Type baseType)
    {
        var depth = 0;

        for (Type? current = derivedType; current is not null && current != baseType; current = current.BaseType)
        {
            depth++;
        }

        return depth;
    }
}
/// <summary>
/// <see cref="ICommandMediator"/> implementation that resolves handlers with
/// <see cref="PrioritizedMessageResolveStrategy"/>.
/// </summary>
public class CustomCommandMediator : ICommandMediator
{
    private readonly IMessageMediator _messageMediator;

    /// <summary>Creates the mediator on top of the given message mediator.</summary>
    /// <param name="messageMediator">The mediator that performs the actual dispatch.</param>
    public CustomCommandMediator(IMessageMediator messageMediator)
    {
        _messageMediator = messageMediator;
    }

    /// <summary>Sends a command that produces no result.</summary>
    /// <param name="command">The command to send.</param>
    /// <param name="commandMediationSettings">Optional settings; defaults are used when null.</param>
    /// <param name="cancellationToken">Token passed on through the mediation options.</param>
    /// <returns>The task returned by the underlying mediator.</returns>
    public Task SendAsync(
        ICommand command,
        CommandMediationSettings? commandMediationSettings = null,
        CancellationToken cancellationToken = default)
    {
        commandMediationSettings ??= new CommandMediationSettings();

        var options = new MediateOptions<ICommand, Task>
        {
            MessageMediationStrategy = new SingleAsyncHandlerMediationStrategy<ICommand>(),
            // Use custom resolve strategy
            MessageResolveStrategy = new PrioritizedMessageResolveStrategy(),
            CancellationToken = cancellationToken,
            Tags = commandMediationSettings.Filters.Tags
        };

        return _messageMediator.Mediate(command, options);
    }

    /// <summary>Sends a command that produces a result.</summary>
    /// <typeparam name="TCommandResult">The type of the command result.</typeparam>
    /// <param name="command">The command to send.</param>
    /// <param name="commandMediationSettings">Optional settings; defaults are used when null.</param>
    /// <param name="cancellationToken">Token passed on through the mediation options.</param>
    /// <returns>A task producing the command result.</returns>
    public Task<TCommandResult?> SendAsync<TCommandResult>(
        ICommand<TCommandResult> command,
        CommandMediationSettings? commandMediationSettings = null,
        CancellationToken cancellationToken = default)
    {
        commandMediationSettings ??= new CommandMediationSettings();

        // NOTE(review): assumes LiteBus provides the two-type-argument
        // SingleAsyncHandlerMediationStrategy<TMessage, TResult> used for commands with
        // results - confirm against the installed LiteBus version.
        var options = new MediateOptions<ICommand<TCommandResult>, Task<TCommandResult?>>
        {
            MessageMediationStrategy =
                new SingleAsyncHandlerMediationStrategy<ICommand<TCommandResult>, TCommandResult?>(),
            // Use custom resolve strategy
            MessageResolveStrategy = new PrioritizedMessageResolveStrategy(),
            CancellationToken = cancellationToken,
            Tags = commandMediationSettings.Filters.Tags
        };

        return _messageMediator.Mediate(command, options);
    }
}
To add cross-cutting functionality such as logging or timing, wrap a mediator in a decorator class. The example below decorates the command mediator:
/// <summary>
/// <see cref="ICommandMediator"/> decorator that logs the start, the elapsed time and any
/// failure of each command, then delegates to the wrapped mediator.
/// </summary>
public class LoggingCommandMediatorDecorator : ICommandMediator
{
    private readonly ICommandMediator _innerMediator;
    private readonly ILogger<LoggingCommandMediatorDecorator> _logger;

    /// <summary>Creates the decorator.</summary>
    /// <param name="innerMediator">The mediator that actually sends the command.</param>
    /// <param name="logger">Logger that receives the timing and failure entries.</param>
    public LoggingCommandMediatorDecorator(
        ICommandMediator innerMediator,
        ILogger<LoggingCommandMediatorDecorator> logger)
    {
        _logger = logger;
        _innerMediator = innerMediator;
    }

    /// <summary>Sends a command without a result, logging its duration or failure.</summary>
    /// <param name="command">The command to send.</param>
    /// <param name="commandMediationSettings">Settings passed through to the inner mediator.</param>
    /// <param name="cancellationToken">Token passed through to the inner mediator.</param>
    /// <returns>A task that completes when the inner mediator has finished.</returns>
    public async Task SendAsync(
        ICommand command,
        CommandMediationSettings? commandMediationSettings = null,
        CancellationToken cancellationToken = default)
    {
        var commandName = command.GetType().Name;
        _logger.LogInformation("Sending command {CommandType}", commandName);

        var timer = Stopwatch.StartNew();
        try
        {
            await _innerMediator.SendAsync(command, commandMediationSettings, cancellationToken);

            timer.Stop();
            _logger.LogInformation(
                "Command {CommandType} completed in {ElapsedMs}ms",
                commandName,
                timer.ElapsedMilliseconds);
        }
        catch (Exception failure)
        {
            timer.Stop();
            _logger.LogError(
                failure,
                "Command {CommandType} failed after {ElapsedMs}ms",
                commandName,
                timer.ElapsedMilliseconds);

            // Rethrow so callers observe the original exception.
            throw;
        }
    }

    /// <summary>Sends a command with a result, logging its duration or failure.</summary>
    /// <typeparam name="TCommandResult">The type of the command result.</typeparam>
    /// <param name="command">The command to send.</param>
    /// <param name="commandMediationSettings">Settings passed through to the inner mediator.</param>
    /// <param name="cancellationToken">Token passed through to the inner mediator.</param>
    /// <returns>The result produced by the inner mediator.</returns>
    public async Task<TCommandResult?> SendAsync<TCommandResult>(
        ICommand<TCommandResult> command,
        CommandMediationSettings? commandMediationSettings = null,
        CancellationToken cancellationToken = default)
    {
        var commandName = command.GetType().Name;
        _logger.LogInformation("Sending command {CommandType} with result", commandName);

        var timer = Stopwatch.StartNew();
        try
        {
            var outcome = await _innerMediator.SendAsync(command, commandMediationSettings, cancellationToken);

            timer.Stop();
            _logger.LogInformation(
                "Command {CommandType} completed in {ElapsedMs}ms",
                commandName,
                timer.ElapsedMilliseconds);

            return outcome;
        }
        catch (Exception failure)
        {
            timer.Stop();
            _logger.LogError(
                failure,
                "Command {CommandType} failed after {ElapsedMs}ms",
                commandName,
                timer.ElapsedMilliseconds);

            // Rethrow so callers observe the original exception.
            throw;
        }
    }
}
Register the decorator:
// Register the command module and scan the application's own assembly for command types.
services.AddLiteBus(liteBus =>
{
    liteBus.AddCommandModule(commands =>
    {
        commands.RegisterFromAssembly(typeof(Program).Assembly);
    });
});

// Add decorator
// NOTE(review): Decorate is not a built-in IServiceCollection method; presumably it comes
// from a DI decoration package such as Scrutor - confirm the package is referenced.
services.Decorate<ICommandMediator, LoggingCommandMediatorDecorator>();
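To integrate LiteBus with external messaging systems such as RabbitMQ or Kafka, consume the external messages in a background service and publish them through the event mediator: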
// Example RabbitMQ consumer that publishes to LiteBus
/// <summary>
/// Background service that consumes messages from a RabbitMQ queue, deserializes them into
/// events and publishes them through the LiteBus event mediator.
/// </summary>
public class RabbitMQEventConsumer : BackgroundService
{
    private readonly IEventMediator _eventMediator;
    private readonly IConnection _rabbitConnection;
    private readonly string _queueName;

    /// <summary>Creates the consumer and opens the RabbitMQ connection.</summary>
    /// <param name="eventMediator">Mediator used to publish the deserialized events.</param>
    /// <param name="connectionFactory">Factory used to create the RabbitMQ connection.</param>
    /// <param name="queueName">Name of the queue to consume from.</param>
    public RabbitMQEventConsumer(
        IEventMediator eventMediator,
        IConnectionFactory connectionFactory,
        string queueName)
    {
        _eventMediator = eventMediator;
        _rabbitConnection = connectionFactory.CreateConnection();
        _queueName = queueName;
    }

    /// <summary>
    /// Declares the queue, attaches the consumer and keeps the channel open until
    /// <paramref name="stoppingToken"/> is cancelled.
    /// </summary>
    /// <param name="stoppingToken">Token signalled when the host is shutting down.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        using var channel = _rabbitConnection.CreateModel();
        channel.QueueDeclare(_queueName, durable: true, exclusive: false, autoDelete: false);

        // NOTE(review): AsyncEventingBasicConsumer presumably requires
        // DispatchConsumersAsync = true on the connection factory - confirm for the
        // RabbitMQ.Client version in use.
        var consumer = new AsyncEventingBasicConsumer(channel);
        consumer.Received += async (_, ea) =>
        {
            try
            {
                var @event = DeserializeEvent(ea.Body.ToArray());
                if (@event is null)
                {
                    // The message can never be processed; reject it without requeueing so it
                    // is neither left unacknowledged nor redelivered forever.
                    channel.BasicNack(ea.DeliveryTag, multiple: false, requeue: false);
                    return;
                }

                // Publish to LiteBus. The token must be passed by name: the second
                // positional parameter of PublishAsync is the mediation settings.
                await _eventMediator.PublishAsync(@event, cancellationToken: stoppingToken);
                channel.BasicAck(ea.DeliveryTag, multiple: false);
            }
            catch (JsonException)
            {
                // Malformed JSON will not get better on redelivery; do not requeue.
                channel.BasicNack(ea.DeliveryTag, multiple: false, requeue: false);
            }
            catch (Exception)
            {
                // Handler or infrastructure failure: requeue so the message can be retried.
                channel.BasicNack(ea.DeliveryTag, multiple: false, requeue: true);
            }
        };

        channel.BasicConsume(_queueName, autoAck: false, consumer);

        // Keep the service running until cancellation is requested
        await Task.Delay(Timeout.Infinite, stoppingToken);
    }

    /// <summary>Disposes the RabbitMQ connection and then the base service.</summary>
    public override void Dispose()
    {
        _rabbitConnection?.Dispose();
        base.Dispose();
    }

    // Turns a raw message body into an event. Returns null when the envelope is missing
    // data, the type cannot be resolved, or the resolved type is not an IEvent.
    private static IEvent? DeserializeEvent(byte[] body)
    {
        var json = Encoding.UTF8.GetString(body);
        var envelope = JsonSerializer.Deserialize<EventEnvelope>(json);
        if (envelope is null
            || string.IsNullOrWhiteSpace(envelope.EventType)
            || envelope.Payload is null)
        {
            return null;
        }

        // SECURITY: the type name comes from the message itself. Only IEvent implementations
        // are accepted here; consider an explicit allow-list of known event types when the
        // queue can receive untrusted input.
        var eventType = Type.GetType(envelope.EventType);
        if (eventType is null || !typeof(IEvent).IsAssignableFrom(eventType))
        {
            return null;
        }

        return JsonSerializer.Deserialize(envelope.Payload, eventType) as IEvent;
    }
}
// Example envelope for external messages
/// <summary>
/// Wrapper for a message received from an external system: the event type name plus the
/// serialized event payload.
/// </summary>
public class EventEnvelope
{
    /// <summary>
    /// Name of the event type, as passed to <see cref="System.Type.GetType(string)"/>.
    /// Defaults to an empty string so the non-nullable property is never null.
    /// </summary>
    public string EventType { get; set; } = string.Empty;

    /// <summary>
    /// Serialized (JSON) event payload. Defaults to an empty string so the non-nullable
    /// property is never null.
    /// </summary>
    public string Payload { get; set; } = string.Empty;
}
- Maintain Separation of Concerns - Keep custom extensions focused on specific functionality
- Follow Existing Patterns - Model custom modules after the existing command, query, and event modules
- Avoid Deep Coupling - Design extensions to work with LiteBus interfaces rather than implementations
- Test Thoroughly - Custom extensions should have comprehensive test coverage
- Consider Performance - Be mindful of the performance implications of custom implementations
- Document Behavior - Clearly document how your extensions modify the standard behavior
- Core Concepts
- Event Contracts
- Event Handlers
- Event Mediator/Publisher
- Advanced Features
- Best Practices
- Execution Context
- Handler Tags
- Handler Ordering
- Testing with LiteBus
- Performance Considerations
- Best Practices