The Event Outbox Pattern is implemented in the worker application to ensure reliable event publishing in a distributed system. This pattern guarantees at-least-once delivery of events to RabbitMQ while maintaining transactional consistency with the database operations.
Events are stored in MongoDB with the following status progression:
- `NEW` → Initial state when the event is created
- `PROCESSING` → Event is claimed by a worker
- `PUBLISHED` → Event has been successfully published to RabbitMQ
- `FAILED` → Event processing failed (optional state for manual intervention)
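One way to encode this progression is as an enum plus a transition table; the names `OutboxStatus` and `canTransition` below are illustrative assumptions, not the codebase's actual types:

```typescript
// Sketch of the outbox status lifecycle. The PROCESSING -> NEW edge models
// stale-event recovery resetting a stuck event (described later in this doc).
enum OutboxStatus {
  New = 'NEW',
  Processing = 'PROCESSING',
  Published = 'PUBLISHED',
  Failed = 'FAILED',
}

const transitions: Record<OutboxStatus, OutboxStatus[]> = {
  [OutboxStatus.New]: [OutboxStatus.Processing],
  [OutboxStatus.Processing]: [
    OutboxStatus.Published,
    OutboxStatus.Failed,
    OutboxStatus.New, // stale recovery returns the event to the pool
  ],
  [OutboxStatus.Published]: [], // terminal
  [OutboxStatus.Failed]: [],    // terminal (manual intervention)
};

function canTransition(from: OutboxStatus, to: OutboxStatus): boolean {
  return transitions[from].includes(to);
}
```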
Located in `apps/worker/src/modules/events/services/events.service.ts`
- **Event Claiming (Every Second)**

  ```typescript
  @Cron(CronExpression.EVERY_SECOND)
  async publishNewEvents()
  ```

  - Atomically claims a batch of `NEW` events (batch size: 500)
  - Updates status to `PROCESSING`
  - Assigns a unique `workerId` (format: `hostname-processId`)
  - Sets the `processingAt` timestamp
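As a rough sketch, the claim step amounts to an update whose filter only matches unclaimed events, so concurrent workers can never grab the same one. `buildClaimUpdate` and the field names are illustrative assumptions, not the service's real API:

```typescript
// Sketch of the filter/update documents a worker might pass to a MongoDB
// update when atomically flipping a batch of NEW events to PROCESSING.
const BATCH_SIZE = 500;

function buildClaimUpdate(workerId: string, now: Date) {
  return {
    // Only unclaimed events match; the atomic update is what prevents
    // two workers from claiming the same event.
    filter: { status: 'NEW' },
    update: {
      $set: { status: 'PROCESSING', workerId, processingAt: now },
    },
    batchSize: BATCH_SIZE, // cap on how many events one cycle claims
  };
}

// workerId follows the hostname-processId format described above.
const claim = buildClaimUpdate('worker-host-1234', new Date());
```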
- **Transactional Processing**

  For each claimed event:

  - Starts a MongoDB transaction
  - Publishes the event to RabbitMQ (`Exchange.EVENTS`)
  - Updates the event status to `PUBLISHED`
  - Commits the transaction

  This ensures that events are only marked as `PUBLISHED` if they are successfully sent to RabbitMQ.
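The rollback behavior can be sketched with the MongoDB session and RabbitMQ channel replaced by in-memory stand-ins; `processOne`, `Broker`, and the event shape are assumptions for illustration, not the service's actual code:

```typescript
// Sketch: publish inside the "transaction", mark PUBLISHED only on success,
// and roll back the status change if the broker publish throws.
type OutboxEvent = { id: string; status: string };

interface Broker {
  publish(event: OutboxEvent): void; // throws on publish failure
}

function processOne(event: OutboxEvent, broker: Broker): void {
  const previousStatus = event.status; // stand-in for "start transaction"
  try {
    broker.publish(event);             // publish to the broker first
    event.status = 'PUBLISHED';        // then mark PUBLISHED and "commit"
  } catch {
    event.status = previousStatus;     // "rollback": event stays PROCESSING
  }
}
```

A failed publish therefore leaves the event in `PROCESSING`, where the stale-event recovery described below will eventually pick it up.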
- **Concurrency Control**

  - Multiple worker instances can run simultaneously
  - Each worker has a unique `workerId`
  - Atomic claiming prevents duplicate processing
  - Status updates check `workerId` to ensure only the claiming worker can update
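The `workerId` guard behaves like a compare-and-set: the update only applies when the caller still owns the claim. `markPublished` below simulates that check over an in-memory record; the names are illustrative assumptions:

```typescript
// Sketch of a guarded status update, mirroring a filter like
// { _id, status: 'PROCESSING', workerId: callerWorkerId } in MongoDB.
type ClaimedEvent = { id: string; status: string; workerId: string | null };

function markPublished(event: ClaimedEvent, callerWorkerId: string): boolean {
  if (event.status !== 'PROCESSING' || event.workerId !== callerWorkerId) {
    return false; // another worker owns (or has re-claimed) this event
  }
  event.status = 'PUBLISHED';
  return true;
}
```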
```typescript
@Cron('*/2 * * * *') // Every 2 minutes
async recoverStaleEvents()
```

Handles scenarios where events get stuck in the `PROCESSING` state:
- Worker crashed
- Network issues
- Process terminated
- Identifies events in the `PROCESSING` state for more than 60 seconds
- Resets them to `NEW` status
- Clears `workerId` and `processingAt`
- Makes them available for reprocessing
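The recovery pass can be sketched as a scan over events, resetting any `PROCESSING` entry older than the 60-second threshold; `recoverStale` and the event shape are illustrative assumptions, not the service's real query:

```typescript
// Sketch of stale recovery: events stuck in PROCESSING past the threshold
// are reset to NEW with their claim fields cleared, so any worker can
// re-claim them on a later cycle.
const STALE_THRESHOLD_MS = 60_000; // 60-second stale threshold

type StaleCheckEvent = {
  status: string;
  workerId: string | null;
  processingAt: Date | null;
};

function recoverStale(events: StaleCheckEvent[], now: Date): number {
  let recovered = 0;
  for (const e of events) {
    const stale =
      e.status === 'PROCESSING' &&
      e.processingAt !== null &&
      now.getTime() - e.processingAt.getTime() > STALE_THRESHOLD_MS;
    if (stale) {
      e.status = 'NEW';       // back into the claimable pool
      e.workerId = null;      // clear ownership
      e.processingAt = null;
      recovered++;
    }
  }
  return recovered;
}
```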
```mermaid
sequenceDiagram
    participant API
    participant MongoDB
    participant Worker
    participant RabbitMQ
    API->>MongoDB: Create event (NEW)
    Worker->>MongoDB: Claim batch (PROCESSING)
    Worker->>RabbitMQ: Publish event
    Worker->>MongoDB: Update status (PUBLISHED)
    alt Stale Event
        Note over Worker: Event stuck in PROCESSING
        Worker->>MongoDB: Reset to NEW after 60s
        Note over Worker: Available for reprocessing
    end
```
- Batch Size: 500 events per processing cycle
- Processing Interval: Every 1 second
- Stale Threshold: 60 seconds
- Recovery Check: Every 2 minutes
- **Transaction Failures**

  - If the RabbitMQ publish fails, the MongoDB transaction rolls back
  - The event remains in the `PROCESSING` state
  - It will be recovered by stale event recovery
- **Duplicate Processing**

  - Prevented by atomic claiming
  - Double-checked by `workerId` verification
  - At-least-once delivery semantics
Monitor these aspects for system health:

- Events stuck in the `PROCESSING` state
- High stale event recovery counts
- Processing latency (time from `NEW` to `PUBLISHED`)
- Worker instance health (process/host metrics)
- **Event Creation**

  - Always create events within the same transaction as the business operation
  - Set the initial status to `NEW`
- **Event Consumers**

  - Implement idempotency
  - Handle duplicate events gracefully
  - Use the event ID for deduplication if needed
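Because delivery is at-least-once, a consumer may see the same event twice; deduplicating by event ID is one common approach. `IdempotentConsumer` below is an illustrative in-memory example, not part of the codebase (a real service would persist seen IDs):

```typescript
// Sketch of an idempotent consumer: duplicate event IDs are skipped,
// so redelivered events are processed at most once.
class IdempotentConsumer {
  private seen = new Set<string>();
  handled = 0;

  handle(eventId: string): void {
    if (this.seen.has(eventId)) return; // duplicate delivery: skip
    this.seen.add(eventId);
    this.handled++;                     // process the event once
  }
}
```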
- **Scaling**

  - Multiple worker instances can run safely
  - Monitor processing metrics when scaling
  - Adjust the batch size and intervals if needed
- **High Stale Event Count**

  - Check worker instance logs
  - Verify MongoDB connection health
  - Monitor RabbitMQ connection status
- **Processing Delays**

  - Review the batch size configuration
  - Check MongoDB query performance
  - Monitor RabbitMQ queue metrics
- **Duplicate Processing**

  - Verify `workerId` uniqueness
  - Check the stale threshold configuration
  - Review the consumer idempotency implementation