SQL Server persistence provider for the Pulse outbox pattern and idempotency store using plain ADO.NET. Provides optimized T-SQL operations with proper transaction support and locking strategies for reliable event delivery and at-most-once command execution in high-throughput scenarios.
- Plain ADO.NET: No ORM overhead, direct SQL Server access via
Microsoft.Data.SqlClient - Outbox Pattern: Reliable event delivery with transaction support
- Idempotency Store: At-most-once command execution without requiring EF Core
- Transaction Support: Enlist outbox operations in existing
SqlTransactioninstances - Optimized Queries: Uses stored procedures with ROWLOCK/READPAST hints for concurrent access
- Dead Letter Management: Built-in support for inspecting, replaying, and monitoring dead-letter messages via
IOutboxManagement - Configurable Schema: Customize schema and table names for multi-tenant scenarios
- Schema Interchangeability: Uses canonical schema compatible with Entity Framework provider
Install-Package NetEvolve.Pulse.SqlServerdotnet add package NetEvolve.Pulse.SqlServer<PackageReference Include="NetEvolve.Pulse.SqlServer" Version="x.x.x" />Before using this provider, execute the schema script to create the required database objects.
Important
The script uses SQLCMD variables (:setvar) and must be run in SQLCMD mode.
sqlcmd utility:
sqlcmd -S your-server -d your-database -i OutboxMessage.sqlSQL Server Management Studio (SSMS):
Enable SQLCMD Mode via Query > SQLCMD Mode (Ctrl+Shift+Q), then execute the script.
Azure Data Studio:
Enable SQLCMD via the query toolbar before executing.
The script exposes the following configurable variables at the top of OutboxMessage.sql:
| Variable | Default | Description |
|---|---|---|
SchemaName |
pulse |
Database schema name |
TableName |
OutboxMessage |
Table name |
To use custom names, change the :setvar values before executing:
:setvar SchemaName "myapp"
:setvar TableName "Events"The script creates:
- The configured schema (default:
[pulse]) - The
[OutboxMessage]table with two optimized non-clustered indexes
Core stored procedures:
| Procedure | Purpose |
|---|---|
usp_GetPendingOutboxMessages |
Retrieves and locks pending messages for processing |
usp_GetFailedOutboxMessagesForRetry |
Retrieves failed messages eligible for retry |
usp_MarkOutboxMessageCompleted |
Marks a message as successfully processed |
usp_MarkOutboxMessageFailed |
Marks a message as failed with error details |
usp_MarkOutboxMessageDeadLetter |
Moves a message to dead-letter status |
usp_DeleteCompletedOutboxMessages |
Removes old completed messages older than a given threshold |
Management stored procedures:
| Procedure | Purpose |
|---|---|
usp_GetDeadLetterOutboxMessages |
Returns a paginated list of dead-letter messages |
usp_GetDeadLetterOutboxMessage |
Returns a single dead-letter message by ID |
usp_GetDeadLetterOutboxMessageCount |
Returns the total count of dead-letter messages |
usp_ReplayOutboxMessage |
Resets a single dead-letter message to Pending |
usp_ReplayAllDeadLetterOutboxMessages |
Resets all dead-letter messages to Pending |
usp_GetOutboxStatistics |
Returns message counts grouped by status |
using Microsoft.Extensions.DependencyInjection;
using NetEvolve.Pulse;
var services = new ServiceCollection();
services.AddPulse(config => config
.AddOutbox(
options => options.Schema = "pulse",
processorOptions => processorOptions.BatchSize = 100)
.AddSqlServerOutbox("Server=.;Database=MyDb;Integrated Security=true;TrustServerCertificate=true;")
);The SQL Server idempotency store provides at-most-once command execution for applications using raw ADO.NET or Dapper, without requiring EF Core.
using Microsoft.Extensions.DependencyInjection;
using NetEvolve.Pulse;
var services = new ServiceCollection();
services.AddPulse(config => config
.AddIdempotency()
.AddSqlServerIdempotencyStore("Server=.;Database=MyDb;Integrated Security=true;TrustServerCertificate=true;")
);Execute the IdempotencyKey.sql script from the Scripts folder (same SQLCMD requirements as OutboxMessage.sql):
sqlcmd -S your-server -d your-database -i IdempotencyKey.sqlThe script creates:
- The
[IdempotencyKey]table withIdempotencyKey(PK) andCreatedAtcolumns - Stored procedures:
usp_ExistsIdempotencyKey,usp_InsertIdempotencyKey,usp_DeleteExpiredIdempotencyKeys
using NetEvolve.Pulse.Extensibility.Idempotency;
public sealed record CreateOrderCommand(string OrderId, decimal Amount)
: IIdempotentCommand<OrderCreatedResult>
{
// The IdempotencyKey is checked before the handler executes
public string IdempotencyKey => OrderId;
}
public sealed class CreateOrderCommandHandler
: ICommandHandler<CreateOrderCommand, OrderCreatedResult>
{
public async Task<OrderCreatedResult> HandleAsync(
CreateOrderCommand command,
CancellationToken cancellationToken)
{
// This handler will only execute once per unique OrderId
// Subsequent requests with the same OrderId will throw IdempotencyConflictException
return new OrderCreatedResult(command.OrderId);
}
}services.AddPulse(config => config
.AddIdempotency()
.AddSqlServerIdempotencyStore(
"Server=.;Database=MyDb;Integrated Security=true;",
options =>
{
options.Schema = "pulse";
options.TableName = "IdempotencyKey";
options.TimeToLive = TimeSpan.FromHours(24); // Keys expire after 24 hours
})
);services.AddPulse(config => config
.AddOutbox()
.AddSqlServerOutbox(
sp => sp.GetRequiredService<IConfiguration>().GetConnectionString("Outbox")!,
options =>
{
options.Schema = "messaging";
options.TableName = "OutboxMessage";
})
);AddSqlServerOutbox(...) registers the following services:
| Service | Implementation | Lifetime |
|---|---|---|
IOutboxRepository |
SqlServerOutboxRepository |
Scoped |
IOutboxManagement |
SqlServerOutboxManagement |
Scoped |
TimeProvider |
TimeProvider.System |
Singleton (if not already registered) |
AddSqlServerIdempotencyStore(...) registers the following services:
| Service | Implementation | Lifetime |
|---|---|---|
IIdempotencyKeyRepository |
SqlServerIdempotencyKeyRepository |
Scoped |
IIdempotencyStore |
IdempotencyStore |
Scoped |
TimeProvider |
TimeProvider.System |
Singleton (if not already registered) |
The IOutboxManagement service is automatically registered when calling AddSqlServerOutbox(...). It provides operations for inspecting and recovering dead-letter messages, as well as monitoring outbox health.
public class OutboxMonitorService
{
private readonly IOutboxManagement _management;
public OutboxMonitorService(IOutboxManagement management) =>
_management = management;
public async Task PrintStatisticsAsync(CancellationToken ct)
{
var stats = await _management.GetStatisticsAsync(ct);
Console.WriteLine($"Pending: {stats.Pending}");
Console.WriteLine($"Processing: {stats.Processing}");
Console.WriteLine($"Completed: {stats.Completed}");
Console.WriteLine($"Failed: {stats.Failed}");
Console.WriteLine($"Dead Letter: {stats.DeadLetter}");
Console.WriteLine($"Total: {stats.Total}");
}
public async Task ReplayAllDeadLettersAsync(CancellationToken ct)
{
var replayed = await _management.ReplayAllDeadLetterAsync(ct);
Console.WriteLine($"Replayed {replayed} dead-letter messages.");
}
}| Method | Description |
|---|---|
GetStatisticsAsync() |
Returns message counts grouped by status (OutboxStatistics) |
GetDeadLetterMessagesAsync(pageSize, page) |
Returns a paginated list of dead-letter messages |
GetDeadLetterMessageAsync(messageId) |
Returns a single dead-letter message by ID |
GetDeadLetterCountAsync() |
Returns the total count of dead-letter messages |
ReplayMessageAsync(messageId) |
Resets a single dead-letter message to Pending for reprocessing |
ReplayAllDeadLetterAsync() |
Resets all dead-letter messages to Pending and returns the updated count |
public class OrderService
{
private readonly string _connectionString;
private readonly IServiceProvider _serviceProvider;
public async Task CreateOrderAsync(CreateOrderRequest request, CancellationToken ct)
{
await using var connection = new SqlConnection(_connectionString);
await connection.OpenAsync(ct);
await using var transaction = connection.BeginTransaction();
try
{
// Business operation
await using var cmd = new SqlCommand("INSERT INTO [Order] ...", connection, transaction);
await cmd.ExecuteNonQueryAsync(ct);
// Store event in outbox (same transaction)
var outbox = new SqlServerEventOutbox(
connection,
_serviceProvider.GetRequiredService<IOptions<OutboxOptions>>(),
TimeProvider.System,
transaction);
await outbox.StoreAsync(new OrderCreatedEvent { OrderId = orderId }, ct);
await transaction.CommitAsync(ct);
}
catch
{
await transaction.RollbackAsync(ct);
throw;
}
}
}public class UnitOfWork : IOutboxTransactionScope, IAsyncDisposable
{
private readonly SqlConnection _connection;
private SqlTransaction? _transaction;
public async Task BeginTransactionAsync(CancellationToken ct)
{
await _connection.OpenAsync(ct);
_transaction = _connection.BeginTransaction();
}
public object? GetCurrentTransaction() => _transaction;
public async Task CommitAsync(CancellationToken ct)
{
if (_transaction is not null)
{
await _transaction.CommitAsync(ct);
}
}
}
// Register in DI
services.AddScoped<IOutboxTransactionScope, UnitOfWork>();To use a custom schema or table name, update the :setvar variables at the top of OutboxMessage.sql before executing, then configure the same names in code:
:setvar SchemaName "myapp"
:setvar TableName "Events"services.AddPulse(config => config
.AddOutbox(options =>
{
options.Schema = "myapp"; // Default: "pulse"
options.TableName = "Events"; // Default: "OutboxMessage"
})
.AddSqlServerOutbox(connectionString)
);The default schema includes optimized indexes for:
- Pending message polling (
Status,CreatedAt) - Completed message cleanup (
Status,ProcessedAt)
Operations use stored procedures with:
ROWLOCKfor row-level lockingREADPASTto skip locked rows during pollingSET NOCOUNT ONto reduce network traffic
Configure batch size based on your throughput requirements:
.AddOutbox(processorOptions: options =>
{
options.BatchSize = 500; // Messages per poll
options.PollingInterval = TimeSpan.FromSeconds(1);
options.EnableBatchSending = true; // Use batch transport
})This provider uses the canonical outbox schema, making it fully interchangeable with the Entity Framework provider:
- Development: Start with Entity Framework for rapid iteration
- Production: Switch to ADO.NET for maximum performance
- Mixed: Use both providers against the same database
// Both configurations work with the same database table
.AddSqlServerOutbox(connectionString)
// or
.AddEntityFrameworkOutbox<MyDbContext>()- .NET 8.0, .NET 9.0, or .NET 10.0
- SQL Server 2016 or later (or Azure SQL Database)
Microsoft.Data.SqlClientfor database connectivityMicrosoft.Extensions.Hostingfor the background processor
- NetEvolve.Pulse - Core mediator and outbox abstractions
- NetEvolve.Pulse.Dapr - Dapr pub/sub integration for event dispatch
- NetEvolve.Pulse.Extensibility - Core contracts and abstractions
- NetEvolve.Pulse.EntityFramework - Entity Framework persistence
- NetEvolve.Pulse.Polly - Polly v8 resilience policies integration
For complete documentation, please visit the official documentation.
Contributions are welcome! Please read the Contributing Guidelines before submitting a pull request.
- Issues: Report bugs or request features on GitHub Issues
- Documentation: Read the full documentation at https://github.com/dailydevops/pulse
This project is licensed under the MIT License - see the LICENSE file for details.
Note
Made with ❤️ by the NetEvolve Team