Skip to content

Distributed mediator: queues, pub/sub, and worker infrastructure#149

Open
ejsmith wants to merge 29 commits into
mainfrom
queues
Open

Distributed mediator: queues, pub/sub, and worker infrastructure#149
ejsmith wants to merge 29 commits into
mainfrom
queues

Conversation

@ejsmith
Copy link
Copy Markdown
Contributor

@ejsmith ejsmith commented Apr 4, 2026

Summary

Adds distributed messaging infrastructure to Foundatio.Mediator — durable queues, pub/sub, and background workers — all integrated with the existing mediator pipeline.

New Packages

Foundatio.Mediator.Distributed

Core distributed abstractions and in-memory implementations:

  • IQueueClient — Durable message queue abstraction with acknowledge/reject/dead-letter semantics
  • IPubSubClient — Publish/subscribe abstraction for fan-out notifications
  • IQueueJobStateStore — Job state tracking for long-running queue work items
  • QueueWorker — Background hosted service that dequeues messages and dispatches through the mediator pipeline
  • QueueMiddleware — Middleware that intercepts mediator calls and routes [Queue]-attributed messages to queues
  • DistributedNotificationWorker — Bridges pub/sub messages into the local mediator for cross-process notifications
  • QueueContext — Rich context for queue handlers (acknowledge, reject, defer, progress reporting)
  • [Queue] attribute — Declarative queue routing with configurable concurrency, retry policies, and visibility timeouts
  • QueueRetryDelay — Static helper for computing retry delays with exponential backoff, jitter, and configurable cap
  • InMemoryQueueClient / InMemoryPubSubClient / InMemoryQueueJobStateStore — In-memory implementations for development and testing
  • Queue dashboard handlers — Built-in handlers for queue monitoring and management APIs

Foundatio.Mediator.Distributed.Aws

AWS implementations:

  • SqsQueueClient — Amazon SQS queue client with FIFO support, dead-letter queues, and automatic infrastructure provisioning
  • SqsPubSubClient — SNS fan-out publishing with per-node SQS subscription queues for true pub/sub across nodes

Foundatio.Mediator.Distributed.Redis

Redis implementations:

  • RedisQueueJobStateStore — Redis-backed job state tracking with atomic MULTI/EXEC transactions, TTL-based cleanup, and pub/sub change notifications

Key Features

  • Declarative queue routing — Apply [Queue] to message types to automatically route through queues instead of in-process dispatch
  • Retry policies — Configurable exponential backoff with jitter, max attempts, and dead-letter routing
  • Job tracking — First-class support for long-running jobs with progress reporting and state management
  • Queue dashboard — Built-in handlers for listing queues, viewing job state, counters, and managing queue entries
  • Distributed notifications — Publish events that are received across process boundaries via pub/sub
  • Aspire integration — AppHost and ServiceDefaults for the CleanArchitectureSample

Sample Updates

  • CleanArchitectureSample — Full distributed example with Redis job state, queue dashboard UI, live events via SSE (global event buffering), Aspire orchestration, and real-time queue monitoring

Tests

  • Foundatio.Mediator.Distributed.Tests — In-memory queue client, pub/sub, job state store, queue worker integration, retry delay computation
  • Foundatio.Mediator.Distributed.Aws.Tests — SQS queue client and SQS pub/sub tests against LocalStack
  • Foundatio.Mediator.Distributed.Redis.Tests — Redis job state store tests

Files Changed

156 files changed, ~12,900 insertions, ~1,300 deletions

Comment thread src/Foundatio.Mediator.Distributed/InMemoryQueueJobStateStore.cs Fixed
Comment thread src/Foundatio.Mediator.Distributed/InMemoryPubSubClient.cs Fixed
Comment thread src/Foundatio.Mediator.Distributed/QueueWorker.cs Fixed
Comment thread samples/ConsoleSample/ServiceConfiguration.cs Fixed
- Security: Add MessageTypeResolver allowlist for type deserialization,
  replacing unsafe Type.GetType() in DistributedNotificationWorker
- Add PubSubEntry type to align IPubSubClient.PublishAsync with QueueEntry pattern
- Add IAsyncDisposable to IQueueClient and IPubSubClient interfaces
- Make IQueueClient.DeadLetterAsync a required method (no silent default)
- Replace ContinueWith with async/await in IQueueJobStateStore defaults
- Cache queue metadata in QueueMiddleware via ConcurrentDictionary keyed by
  DescriptorId, eliminating per-call reflection
- Thread CancellationToken through QueueMiddleware to SendAsync/SetJobStateAsync
- Replace lock(Random) with Random.Shared in QueueWorker retry delay
- Rename DistributedOptions to DistributedQueueOptions for clarity
- Rename AddSnsSqsPubSubClient to AddMediatorSnsSqsPubSub for naming consistency
- Update all test files for new PubSubEntry API
Comment thread src/Foundatio.Mediator.Distributed/QueueWorker.cs Fixed
Comment thread src/Foundatio.Mediator.Distributed/QueueWorker.cs Fixed
Comment thread src/Foundatio.Mediator.Distributed/QueueWorker.cs Fixed
Comment thread src/Foundatio.Mediator.Distributed/QueueWorker.cs Fixed
Comment thread src/Foundatio.Mediator.Distributed/QueueWorker.cs Fixed
Comment thread src/Foundatio.Mediator.Distributed/QueueWorker.cs Fixed
Comment thread src/Foundatio.Mediator.Distributed/QueueWorker.cs Fixed
Comment thread src/Foundatio.Mediator.Distributed/QueueWorker.cs Fixed
Comment thread src/Foundatio.Mediator.Distributed/QueueWorker.cs Fixed
Comment thread src/Foundatio.Mediator.Distributed/QueueWorker.cs Fixed
Comment thread src/Foundatio.Mediator.Distributed/QueueWorker.cs Fixed
Comment thread src/Foundatio.Mediator.Distributed.Aws/SqsPubSubClient.cs Fixed
@ejsmith ejsmith marked this pull request as ready for review April 7, 2026 19:32
Comment thread src/Foundatio.Mediator.Distributed.Aws/SqsPubSubClient.cs Fixed
Comment thread src/Foundatio.Mediator.Distributed/InMemoryQueueJobStateStore.cs Fixed
Comment thread src/Foundatio.Mediator.Distributed.Aws/SqsPubSubClient.cs Fixed
- Fix potential null dereference in InMemoryQueueJobStateStore.GetJobStateAsync by separating TryGetValue and IsExpired checks into distinct branches

- Fix double-checked locking false positive in SqsPubSubClient.EnsureSharedQueueAsync by reading _sharedQueue into a local variable before each null check

- Add using declarations to all SemaphoreSlim locals in SqsPubSubClientTests and InMemoryPubSubClientTests for proper deterministic disposal
try
{
current = _sharedQueue;
if (current is not null)
{
// Filter out SQS long-polling (ReceiveMessage) to reduce trace noise
o.FilterHttpRequestMessage = req =>
req.Headers.TryGetValues("X-Amz-Target", out var values) != true
var queueNames = workers.Select(w => w.QueueName).ToList();
IReadOnlyList<QueueStats> allStats = [];
try { allStats = await _queueClient.GetQueueStatsAsync(queueNames, ct).ConfigureAwait(false); }
catch { /* Transport may not support stats */ }
var statsList = await _queueClient.GetQueueStatsAsync([query.QueueName], ct).ConfigureAwait(false);
stats = statsList.FirstOrDefault();
}
catch { /* Transport may not support stats */ }
}).ToList()
};
}
catch { /* State store may not support counters */ }
if (_stateStore is not null)
{
try { counterStats = await _stateStore.GetCounterStatsAsync(worker.QueueName, TimeSpan.FromHours(24), ct).ConfigureAwait(false); }
catch { /* State store may not be available */ }
if (worker.TrackProgress)
{
try { processingCount = await _stateStore.GetJobCountByStatusAsync(worker.QueueName, QueueJobStatus.Processing, ct).ConfigureAwait(false); }
catch { /* State store may not be available */ }
Comment on lines +76 to +80
catch (Exception ex)
{
logger.LogError(ex, "Failed to initialize distributed infrastructure");
ready.SetFailed(ex);
}
Comment on lines +158 to +162
catch (Exception ex)
{
_logger.LogError(ex, "Failed to publish distributed notification {MessageType} to bus",
notification.GetType().Name);
}
// Cancel all active subscription consumer tasks
foreach (var cts in _activeCts)
{
try { cts.Cancel(); cts.Dispose(); } catch { }
@niemyjski niemyjski self-requested a review April 11, 2026 00:31
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants