NetEvolve.Pulse is a high-performance CQRS mediator for ASP.NET Core that wires commands, queries, and events through a scoped, interceptor-enabled pipeline.
- Typed CQRS mediator with single-handler enforcement for commands and queries plus fan-out events
- Minimal DI integration via
services.AddPulse(...)with scoped lifetimes for handlers and interceptors - Configurable interceptor pipeline (logging, metrics, tracing, validation) via
IMediatorBuilder - Distributed query caching — register
ICacheableQuery<TResponse>per query and enable transparentIDistributedCachecaching withAddQueryCaching() - Outbox pattern with background processor for reliable event delivery via
AddOutbox() - Parallel event dispatch for efficient domain event broadcasting
- TimeProvider-aware for deterministic testing and scheduling scenarios
- OpenTelemetry-friendly metrics and tracing through
AddActivityAndMetrics()
Install-Package NetEvolve.Pulsedotnet add package NetEvolve.Pulse<PackageReference Include="NetEvolve.Pulse" Version="x.x.x" />using Microsoft.Extensions.DependencyInjection;
using NetEvolve.Pulse;
using NetEvolve.Pulse.Extensibility;
var services = new ServiceCollection();
// Register Pulse and handlers
services.AddPulse();
services.AddScoped<ICommandHandler<CreateOrderCommand, OrderCreated>, CreateOrderHandler>();
using var provider = services.BuildServiceProvider();
var mediator = provider.GetRequiredService<IMediator>();
var result = await mediator.SendAsync<CreateOrderCommand, OrderCreated>(
new CreateOrderCommand("SKU-123"));
Console.WriteLine($"Created order {result.OrderId}");
public record CreateOrderCommand(string Sku) : ICommand<OrderCreated>;
public record OrderCreated(Guid OrderId);
public sealed class CreateOrderHandler
: ICommandHandler<CreateOrderCommand, OrderCreated>
{
public Task<OrderCreated> HandleAsync(
CreateOrderCommand command,
CancellationToken cancellationToken) =>
Task.FromResult(new OrderCreated(Guid.NewGuid()));
}services.AddPulse();
services.AddScoped<IQueryHandler<GetOrderQuery, Order>, GetOrderHandler>();
services.AddScoped<IEventHandler<OrderCreatedEvent>, OrderCreatedHandler>();
var order = await mediator.QueryAsync<GetOrderQuery, Order>(new GetOrderQuery(orderId));
await mediator.PublishAsync(new OrderCreatedEvent(order.Id));
public record GetOrderQuery(Guid Id) : IQuery<Order>;
public record Order(Guid Id, string Sku);
public record OrderCreatedEvent(Guid Id) : IEvent;
public sealed class GetOrderHandler : IQueryHandler<GetOrderQuery, Order>
{
public Task<Order> HandleAsync(GetOrderQuery query, CancellationToken cancellationToken) =>
Task.FromResult(new Order(query.Id, "SKU-123"));
}
public sealed class OrderCreatedHandler : IEventHandler<OrderCreatedEvent>
{
public Task HandleAsync(OrderCreatedEvent @event, CancellationToken cancellationToken)
{
// React to the event (logging, projections, etc.)
return Task.CompletedTask;
}
}// Enable tracing and metrics and add custom interceptors
services.AddPulse(config =>
{
config.AddActivityAndMetrics();
});
services.AddScoped<ICommandHandler<ShipOrderCommand, Void>, ShipOrderHandler>();
public record ShipOrderCommand(Guid Id) : ICommand;
public sealed class ShipOrderHandler : ICommandHandler<ShipOrderCommand, Void>
{
public Task<Void> HandleAsync(ShipOrderCommand command, CancellationToken cancellationToken)
{
// Shipping workflow here
return Task.FromResult(Void.Completed);
}
}// Configure Pulse during startup
services.AddPulse(config =>
{
// Built-in observability
config.AddActivityAndMetrics();
// Add your own configurator extensions for validation, caching, etc.
// config.AddCustomValidation();
});Enable transparent IDistributedCache caching for queries. Any query that implements ICacheableQuery<TResponse> (from NetEvolve.Pulse.Extensibility) is served from the cache on subsequent invocations; all other queries pass through unchanged.
// 1. Register an IDistributedCache implementation
services.AddDistributedMemoryCache(); // or Redis, SQL Server, etc.
// 2. Enable the caching interceptor (with optional options)
services.AddPulse(config => config.AddQueryCaching(options =>
{
// Choose between absolute (default) and sliding expiration
options.ExpirationMode = CacheExpirationMode.Sliding;
// Supply custom JsonSerializerOptions for cache serialization
options.JsonSerializerOptions = new JsonSerializerOptions
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
};
}));
// 3. Implement ICacheableQuery<TResponse> on the queries you want cached
public record GetProductQuery(Guid Id) : ICacheableQuery<ProductDto>
{
public string? CorrelationId { get; set; }
// Unique per query result — include all discriminating parameters
public string CacheKey => $"product:{Id}";
// null = no explicit expiry (relies on cache defaults); or provide a TimeSpan
public TimeSpan? Expiry => TimeSpan.FromMinutes(5);
}Behavior summary:
| Scenario | Result |
|---|---|
Query implements ICacheableQuery<TResponse> and cache entry exists |
Cached response returned; handler skipped |
Query implements ICacheableQuery<TResponse> and no cache entry |
Handler invoked; response stored in cache |
Query does not implement ICacheableQuery<TResponse> |
Handler always invoked; cache never consulted |
IDistributedCache not registered in DI |
Interceptor falls through; handler invoked without error |
Expiry = null and DefaultExpiry = null |
Entry stored without explicit expiry; cache default eviction policy applies |
Expiry = null and DefaultExpiry is set |
DefaultExpiry value is applied using the configured ExpirationMode |
ExpirationMode = Absolute (default) |
Expiry (or DefaultExpiry) is applied as absolute expiry relative to now |
ExpirationMode = Sliding |
Expiry (or DefaultExpiry) window resets on each cache access |
The outbox pattern ensures reliable event delivery by persisting events before dispatching:
services.AddPulse(config => config
.AddOutbox(
options => options.Schema = "pulse",
processorOptions =>
{
processorOptions.BatchSize = 100; // Messages per batch (default: 100)
processorOptions.PollingInterval = TimeSpan.FromSeconds(5); // Poll delay (default: 5s)
processorOptions.MaxRetryCount = 3; // Max retries before dead letter (default: 3)
processorOptions.ProcessingTimeout = TimeSpan.FromSeconds(30); // Per-message timeout (default: 30s)
processorOptions.EnableBatchSending = false; // Use batch transport (default: false)
})
// Choose a persistence provider:
// .AddEntityFrameworkOutbox<MyDbContext>()
// .AddSqlServerOutbox(connectionString)
);You can tune processing behaviour for individual event types using EventTypeOverrides. The dictionary key matches the EventType field of stored outbox messages. Any null property falls back to the global default:
processorOptions.EventTypeOverrides["MyNamespace.CriticalEvent"] = new OutboxEventTypeOptions
{
MaxRetryCount = 10, // More retries for critical events
ProcessingTimeout = TimeSpan.FromSeconds(10), // Tighter timeout
};
processorOptions.EventTypeOverrides["MyNamespace.BulkEvent"] = new OutboxEventTypeOptions
{
MaxRetryCount = 1, // Fewer retries for low-priority bulk events
ProcessingTimeout = TimeSpan.FromMinutes(2), // Longer timeout for large payloads
};See NetEvolve.Pulse.EntityFramework or NetEvolve.Pulse.SqlServer for persistence provider setup.
Pulse uses IPayloadSerializer (from NetEvolve.Pulse.Extensibility) for all internal serialization needs, including outbox message payloads, distributed cache entries, and audit trail data. A default implementation based on System.Text.Json is registered automatically when you call AddPulse().
No configuration is required — the built-in SystemTextJsonPayloadSerializer uses JsonSerializerOptions.Default:
services.AddPulse();
// SystemTextJsonPayloadSerializer is automatically registeredUse the standard .NET options pattern to customize JSON serialization settings:
services.Configure<JsonSerializerOptions>(options =>
{
options.PropertyNamingPolicy = JsonNamingPolicy.CamelCase;
options.WriteIndented = false;
options.DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull;
});
services.AddPulse();These options will be used for all payload serialization throughout Pulse, including:
- Outbox message payloads
- Distributed cache query results
- Any other internal serialization needs
Replace the default serializer with your own implementation by registering it before calling AddPulse():
using NetEvolve.Pulse.Extensibility;
// Register custom serializer (e.g., using Newtonsoft.Json)
services.AddSingleton<IPayloadSerializer, NewtonsoftJsonPayloadSerializer>();
services.AddPulse();
public sealed class NewtonsoftJsonPayloadSerializer : IPayloadSerializer
{
public string Serialize<T>(T value) =>
JsonConvert.SerializeObject(value);
public string Serialize(object value, Type type) =>
JsonConvert.SerializeObject(value, type, null);
public byte[] SerializeToBytes<T>(T value) =>
Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(value));
public T? Deserialize<T>(string payload) =>
JsonConvert.DeserializeObject<T>(payload);
public T? Deserialize<T>(byte[] payload) =>
JsonConvert.DeserializeObject<T>(Encoding.UTF8.GetString(payload));
}The custom serializer will be used for all payload operations within Pulse. Ensure your implementation is thread-safe, as the same instance may be accessed concurrently from multiple pipeline stages.
- .NET 8.0, .NET 9.0, or .NET 10.0
- ASP.NET Core environment with
Microsoft.Extensions.DependencyInjection - OpenTelemetry packages when using
AddActivityAndMetrics()
- NetEvolve.Pulse.Dapr - Dapr pub/sub integration for event dispatch
- NetEvolve.Pulse.Extensibility - Core contracts and abstractions used by the mediator
- NetEvolve.Pulse.EntityFramework - Entity Framework Core persistence for the outbox pattern
- NetEvolve.Pulse.SqlServer - SQL Server ADO.NET persistence for the outbox pattern
- NetEvolve.Pulse.Polly - Polly v8 resilience policies integration
For complete documentation, please visit the official documentation.
Contributions are welcome! Please read the Contributing Guidelines before submitting a pull request.
- Issues: Report bugs or request features on GitHub Issues
- Documentation: Read the full documentation at https://github.com/dailydevops/pulse
This project is licensed under the MIT License - see the LICENSE file for details.
Note
Made with ❤️ by the NetEvolve Team