NetEvolve.Pulse

NetEvolve.Pulse is a high-performance CQRS mediator for ASP.NET Core that wires commands, queries, and events through a scoped, interceptor-enabled pipeline.

Features

  • Typed CQRS mediator with single-handler enforcement for commands and queries plus fan-out events
  • Minimal DI integration via services.AddPulse(...) with scoped lifetimes for handlers and interceptors
  • Configurable interceptor pipeline (logging, metrics, tracing, validation) via IMediatorBuilder
  • Distributed query caching — register ICacheableQuery<TResponse> per query and enable transparent IDistributedCache caching with AddQueryCaching()
  • Outbox pattern with background processor for reliable event delivery via AddOutbox()
  • Parallel event dispatch for efficient domain event broadcasting
  • TimeProvider-aware for deterministic testing and scheduling scenarios
  • OpenTelemetry-friendly metrics and tracing through AddActivityAndMetrics()

Installation

NuGet Package Manager

Install-Package NetEvolve.Pulse

.NET CLI

dotnet add package NetEvolve.Pulse

PackageReference

<PackageReference Include="NetEvolve.Pulse" Version="x.x.x" />

Quick Start

using Microsoft.Extensions.DependencyInjection;
using NetEvolve.Pulse;
using NetEvolve.Pulse.Extensibility;

var services = new ServiceCollection();

// Register Pulse and handlers
services.AddPulse();
services.AddScoped<ICommandHandler<CreateOrderCommand, OrderCreated>, CreateOrderHandler>();

using var provider = services.BuildServiceProvider();
var mediator = provider.GetRequiredService<IMediator>();

var result = await mediator.SendAsync<CreateOrderCommand, OrderCreated>(
    new CreateOrderCommand("SKU-123"));

Console.WriteLine($"Created order {result.OrderId}");

public record CreateOrderCommand(string Sku) : ICommand<OrderCreated>;
public record OrderCreated(Guid OrderId);

public sealed class CreateOrderHandler
    : ICommandHandler<CreateOrderCommand, OrderCreated>
{
    public Task<OrderCreated> HandleAsync(
        CreateOrderCommand command,
        CancellationToken cancellationToken) =>
        Task.FromResult(new OrderCreated(Guid.NewGuid()));
}

Usage

Basic Example

services.AddPulse();
services.AddScoped<IQueryHandler<GetOrderQuery, Order>, GetOrderHandler>();
services.AddScoped<IEventHandler<OrderCreatedEvent>, OrderCreatedHandler>();

var order = await mediator.QueryAsync<GetOrderQuery, Order>(new GetOrderQuery(orderId));
await mediator.PublishAsync(new OrderCreatedEvent(order.Id));

public record GetOrderQuery(Guid Id) : IQuery<Order>;
public record Order(Guid Id, string Sku);
public record OrderCreatedEvent(Guid Id) : IEvent;

public sealed class GetOrderHandler : IQueryHandler<GetOrderQuery, Order>
{
    public Task<Order> HandleAsync(GetOrderQuery query, CancellationToken cancellationToken) =>
        Task.FromResult(new Order(query.Id, "SKU-123"));
}

public sealed class OrderCreatedHandler : IEventHandler<OrderCreatedEvent>
{
    public Task HandleAsync(OrderCreatedEvent @event, CancellationToken cancellationToken)
    {
        // React to the event (logging, projections, etc.)
        return Task.CompletedTask;
    }
}

Advanced Example

// Enable tracing and metrics; custom interceptors are registered on the same builder
services.AddPulse(config =>
{
    config.AddActivityAndMetrics();
});

services.AddScoped<ICommandHandler<ShipOrderCommand, Void>, ShipOrderHandler>();

public record ShipOrderCommand(Guid Id) : ICommand;

public sealed class ShipOrderHandler : ICommandHandler<ShipOrderCommand, Void>
{
    public Task<Void> HandleAsync(ShipOrderCommand command, CancellationToken cancellationToken)
    {
        // Shipping workflow here
        return Task.FromResult(Void.Completed);
    }
}

Configuration

// Configure Pulse during startup
services.AddPulse(config =>
{
    // Built-in observability
    config.AddActivityAndMetrics();

    // Add your own configurator extensions for validation, caching, etc.
    // config.AddCustomValidation();
});

Distributed Query Caching

Enable transparent IDistributedCache caching for queries. Any query that implements ICacheableQuery<TResponse> (from NetEvolve.Pulse.Extensibility) is served from the cache on subsequent invocations; all other queries pass through unchanged.

// 1. Register an IDistributedCache implementation
services.AddDistributedMemoryCache(); // or Redis, SQL Server, etc.

// 2. Enable the caching interceptor (with optional options)
services.AddPulse(config => config.AddQueryCaching(options =>
{
    // Choose between absolute (default) and sliding expiration
    options.ExpirationMode = CacheExpirationMode.Sliding;

    // Supply custom JsonSerializerOptions for cache serialization
    options.JsonSerializerOptions = new JsonSerializerOptions
    {
        PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
    };
}));

// 3. Implement ICacheableQuery<TResponse> on the queries you want cached
public record GetProductQuery(Guid Id) : ICacheableQuery<ProductDto>
{
    public string? CorrelationId { get; set; }

    // Unique per query result — include all discriminating parameters
    public string CacheKey => $"product:{Id}";

    // null = fall back to DefaultExpiry if set, otherwise to the cache's own eviction policy
    public TimeSpan? Expiry => TimeSpan.FromMinutes(5);
}

Behavior summary:

| Scenario | Result |
|---|---|
| Query implements ICacheableQuery<TResponse> and cache entry exists | Cached response returned; handler skipped |
| Query implements ICacheableQuery<TResponse> and no cache entry | Handler invoked; response stored in cache |
| Query does not implement ICacheableQuery<TResponse> | Handler always invoked; cache never consulted |
| IDistributedCache not registered in DI | Interceptor falls through; handler invoked without error |
| Expiry = null and DefaultExpiry = null | Entry stored without explicit expiry; cache default eviction policy applies |
| Expiry = null and DefaultExpiry is set | DefaultExpiry value is applied using the configured ExpirationMode |
| ExpirationMode = Absolute (default) | Expiry (or DefaultExpiry) is applied as absolute expiry relative to now |
| ExpirationMode = Sliding | Expiry (or DefaultExpiry) window resets on each cache access |
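
The behavior summary above can be read as a cache-aside flow. The following is a minimal sketch of that flow written against the standard IDistributedCache abstraction. It is not Pulse's actual interceptor: the CacheAsideSketch class, its method names, and the sliding flag are hypothetical, and the use of System.Text.Json here only assumes the default serializer described elsewhere in this README.

```csharp
using System;
using System.Text.Json;
using System.Threading.Tasks;
using Microsoft.Extensions.Caching.Distributed;

// Hypothetical helper for illustration only; not part of the Pulse API.
public static class CacheAsideSketch
{
    // Picks the effective expiry (per-query value first, then the default)
    // and applies it as either a sliding or an absolute expiration.
    public static DistributedCacheEntryOptions BuildEntryOptions(
        TimeSpan? expiry, TimeSpan? defaultExpiry, bool sliding)
    {
        var options = new DistributedCacheEntryOptions();
        var effective = expiry ?? defaultExpiry;

        if (effective is null)
        {
            // No explicit expiry: the cache's own eviction policy applies.
            return options;
        }

        if (sliding)
        {
            options.SlidingExpiration = effective;
        }
        else
        {
            options.AbsoluteExpirationRelativeToNow = effective;
        }

        return options;
    }

    // Returns the cached response when present; otherwise invokes the handler
    // and stores its response. A missing cache falls through to the handler.
    public static async Task<T> GetOrInvokeAsync<T>(
        IDistributedCache? cache,
        string cacheKey,
        Func<Task<T>> handler,
        DistributedCacheEntryOptions entryOptions)
    {
        if (cache is null)
        {
            return await handler();
        }

        var cached = await cache.GetStringAsync(cacheKey);
        if (cached is not null)
        {
            return JsonSerializer.Deserialize<T>(cached)!;
        }

        var response = await handler();
        await cache.SetStringAsync(cacheKey, JsonSerializer.Serialize(response), entryOptions);
        return response;
    }
}
```

In this sketch, calling GetOrInvokeAsync twice with the same key against an in-memory distributed cache runs the handler delegate once; passing null for the cache runs it on every call.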

Outbox Pattern Configuration

The outbox pattern ensures reliable event delivery by persisting events before dispatching:

services.AddPulse(config => config
    .AddOutbox(
        options => options.Schema = "pulse",
        processorOptions =>
        {
            processorOptions.BatchSize = 100;              // Messages per batch (default: 100)
            processorOptions.PollingInterval = TimeSpan.FromSeconds(5);  // Poll delay (default: 5s)
            processorOptions.MaxRetryCount = 3;            // Max retries before dead letter (default: 3)
            processorOptions.ProcessingTimeout = TimeSpan.FromSeconds(30); // Per-message timeout (default: 30s)
            processorOptions.EnableBatchSending = false;   // Use batch transport (default: false)
        })
    // Choose a persistence provider:
    // .AddEntityFrameworkOutbox<MyDbContext>()
    // .AddSqlServerOutbox(connectionString)
);

Per-Event-Type Overrides

You can tune processing behaviour for individual event types using EventTypeOverrides. The dictionary key matches the EventType field of stored outbox messages. Any null property falls back to the global default:

processorOptions.EventTypeOverrides["MyNamespace.CriticalEvent"] = new OutboxEventTypeOptions
{
    MaxRetryCount = 10,                         // More retries for critical events
    ProcessingTimeout = TimeSpan.FromSeconds(10), // Tighter timeout
};

processorOptions.EventTypeOverrides["MyNamespace.BulkEvent"] = new OutboxEventTypeOptions
{
    MaxRetryCount = 1,                          // Fewer retries for low-priority bulk events
    ProcessingTimeout = TimeSpan.FromMinutes(2), // Longer timeout for large payloads
};
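
The fallback rule ("any null property falls back to the global default") can be sketched as a null-coalescing lookup. The types below are hypothetical stand-ins, not the Pulse option classes; they only mirror the two properties shown in the examples above.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-ins for the global processor options and a per-type override.
public sealed record GlobalSettings(int MaxRetryCount, TimeSpan ProcessingTimeout);
public sealed record TypeOverride(int? MaxRetryCount = null, TimeSpan? ProcessingTimeout = null);

public static class OverrideResolution
{
    // Resolves the effective settings for one event type:
    // an override value wins when set, otherwise the global default is used.
    public static GlobalSettings Resolve(
        GlobalSettings global,
        IReadOnlyDictionary<string, TypeOverride> overrides,
        string eventType)
    {
        if (!overrides.TryGetValue(eventType, out var typeOverride))
        {
            return global; // no entry for this event type
        }

        return new GlobalSettings(
            typeOverride.MaxRetryCount ?? global.MaxRetryCount,
            typeOverride.ProcessingTimeout ?? global.ProcessingTimeout);
    }
}
```

With global values of 3 retries and a 30 second timeout, an override that sets only MaxRetryCount = 10 resolves to 10 retries and keeps the 30 second timeout.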

See NetEvolve.Pulse.EntityFramework or NetEvolve.Pulse.SqlServer for persistence provider setup.

Payload Serialization

Pulse uses IPayloadSerializer (from NetEvolve.Pulse.Extensibility) for all internal serialization needs, including outbox message payloads, distributed cache entries, and audit trail data. A default implementation based on System.Text.Json is registered automatically when you call AddPulse().

Default Behavior

No configuration is required — the built-in SystemTextJsonPayloadSerializer uses JsonSerializerOptions.Default:

services.AddPulse();
// SystemTextJsonPayloadSerializer is automatically registered

Configure JSON Serialization Options

Use the standard .NET options pattern to customize JSON serialization settings:

services.Configure<JsonSerializerOptions>(options =>
{
    options.PropertyNamingPolicy = JsonNamingPolicy.CamelCase;
    options.WriteIndented = false;
    options.DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull;
});
services.AddPulse();

These options will be used for all payload serialization throughout Pulse, including:

  • Outbox message payloads
  • Distributed cache query results
  • Any other internal serialization needs

Custom Serializer Implementation

Replace the default serializer with your own implementation by registering it before calling AddPulse():

using System.Text;
using NetEvolve.Pulse.Extensibility;
using Newtonsoft.Json;

// Register custom serializer (e.g., using Newtonsoft.Json)
services.AddSingleton<IPayloadSerializer, NewtonsoftJsonPayloadSerializer>();
services.AddPulse();

public sealed class NewtonsoftJsonPayloadSerializer : IPayloadSerializer
{
    public string Serialize<T>(T value) => 
        JsonConvert.SerializeObject(value);

    public string Serialize(object value, Type type) => 
        JsonConvert.SerializeObject(value, type, null);

    public byte[] SerializeToBytes<T>(T value) => 
        Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(value));

    public T? Deserialize<T>(string payload) => 
        JsonConvert.DeserializeObject<T>(payload);

    public T? Deserialize<T>(byte[] payload) => 
        JsonConvert.DeserializeObject<T>(Encoding.UTF8.GetString(payload));
}

The custom serializer will be used for all payload operations within Pulse. Ensure your implementation is thread-safe, as the same instance may be accessed concurrently from multiple pipeline stages.

Requirements

  • .NET 8.0, .NET 9.0, or .NET 10.0
  • ASP.NET Core environment with Microsoft.Extensions.DependencyInjection
  • OpenTelemetry packages when using AddActivityAndMetrics()

Related Packages

Documentation

For complete documentation, please visit the official documentation.

Contributing

Contributions are welcome! Please read the Contributing Guidelines before submitting a pull request.

Support

License

This project is licensed under the MIT License - see the LICENSE file for details.


Note

Made with ❤️ by the NetEvolve Team