diff --git a/.gitignore b/.gitignore index 8408edbc49..68b0dae60d 100644 --- a/.gitignore +++ b/.gitignore @@ -341,3 +341,6 @@ tests/certs/ # Ralph loop RALPH_STOP ralph.log + +# Prompt files +PROMPT.md diff --git a/PROMPT.md b/PROMPT.md new file mode 100644 index 0000000000..b73b5b466f --- /dev/null +++ b/PROMPT.md @@ -0,0 +1,119 @@ +# Session Resume — Replay Outbox on Inbox Duplicate + +## Branch +`replay_on_seen` + +## Issue +#2541 — Force Replay of Outbox on seeing duplicate Processed Inbox Message + +## Spec +`specs/0027-replay-matching-outbox-events-when-inbox-has-already-seen/` + +## Status +- [x] Requirements — approved (2026-04-16), updated 2026-04-17 (ConversationId → CausationId rename, AC-8 updated to all persistent stores) +- [x] Design — ADR 0057 accepted, 5 rounds of review (all findings addressed) +- [x] Tasks — approved (2026-04-17), 5 rounds of review (all findings addressed), 23 tasks +- [ ] Implementation — not started + +## Current position +Tasks approved after 5 adversarial review rounds. Ready to begin implementation with `/spec:implement`, starting at Task 1. + +### Task review history +- Round 1: 3 findings ≥60 (75: missing async terminal step, 72: monolithic persistent store tasks, 62: telemetry covers too many behaviors) — all addressed +- Round 2: 1 finding ≥60 (65: missing InstrumentationOptions.Brighter gating) — addressed +- Round 3: 1 finding ≥60 (75: InstrumentationOptions inaccessible from UseInboxHandler) — addressed by adding Part B to Task 1 +- Round 4: 1 finding ≥60 (75: Spanner misclassified as NoSQL) — addressed by moving to relational tasks +- Round 5: PASS — 0 findings ≥60 + +## Task overview (23 tasks) + +### Structural prerequisites (Tasks 1-5) +1. UseInboxHandler uses pipeline's `this.Context` + expose `InstrumentationOptions` as protected property on `RequestHandler` +2. Enrich `PipelineStepDescription` with `Attribute` property +3. 
`Describe()` includes global inbox attributes + `InboxConfiguration` in validation path (depends on 2) +4. Expose `Outbox` from `IAmAnOutboxProducerMediator` +5. New types: `OnceOnlyAction.Replay`, `RequestContextBagNames.CausationId`, `BrighterSemanticConventions.CausationId`, `IAmACausationTrackingInbox`, `IAmACausationTrackingOutbox` + +### Core behavior (Tasks 6-13, test-first) +6. InMemoryInbox stores and retrieves CausationId +7. InMemoryOutbox stores CausationId + `ReplayCausation` +8. Sync UseInboxHandler generates CausationId on first handling +9. Async UseInboxHandlerAsync generates CausationId on first handling +10. Sync UseInboxHandler replays outbox on duplicate (Replay action) +11. Async UseInboxHandlerAsync replays outbox on duplicate (Replay action) +12. Sync UseInboxHandler handles Replay with no outbox (terminal step) +13. Async UseInboxHandlerAsync handles Replay with no outbox (terminal step) + +### Infrastructure (Tasks 14-18, test-first) +14. Pipeline validation: Replay requires causation-tracking support +15. UseInboxHandler Replay telemetry event on pipeline span +16. UseInboxHandler Throw/Warn/Add telemetry events (tidy improvement) +17. DI registration of `IAmACausationTrackingOutbox` +18. Base test classes for persistent store causation tracking + +### Persistent stores (Tasks 19-22, test-first) +19. Relational inbox stores: MsSql, MySql, Postgres, Sqlite, Spanner +20. NoSQL inbox stores: DynamoDB, DynamoDB.V4, Firestore, MongoDb +21. Relational outbox stores: MsSql, MySql, PostgreSql, Sqlite, Spanner +22. NoSQL outbox stores: DynamoDB, DynamoDB.V4, Firestore, MongoDb + +### Verification (Task 23) +23. Build + run all core tests + +## Key design decisions (ADR 0057) + +### Core feature +- **Causation Id**: Links an inbox entry to the outbox messages produced during that handler invocation. Propagated via `RequestContext.Bag` (key: `Brighter-CausationId`). Distinct from CorrelationId (request-reply), JobId, WorkflowId. 
+- **OnceOnlyAction.Replay**: New enum value. When inbox detects duplicate and action is Replay, it clears dispatch state on matching outbox messages so the sweeper resends them. +- **Two new role interfaces** (opt-in, non-breaking): + - `IAmACausationTrackingInbox` — knows the CausationId for an inbox entry + - `IAmACausationTrackingOutbox` — replays a causation's outbox messages; knows if schema supports it (`SupportsCausationTracking()`) +- **UseInboxHandler** gains optional `IAmACausationTrackingOutbox?` constructor param (resolved via standard MSDI `ActivatorUtilities`) + +### Observability +- `UseInboxHandler` currently has no telemetry — adding span events for all paths (Add, Throw, Warn, Replay) +- Events written to `Context.Span` (pipeline's Activity), gated on `InstrumentationOptions.Brighter` +- New `BrighterSemanticConventions.CausationId` constant (`"paramore.brighter.causation_id"`) — distinct from existing `ConversationId` which carries `CorrelationId` +- No new child spans; OutboxSweeper already creates its own trace on sweep + +### Persistent store strategy +- All 18 Brighter-maintained stores (9 inbox, 9 outbox) get CausationId support +- Schema migration is opt-in — users only need to migrate if they use Replay +- Separate migration PR lands on master first, merges into this branch +- `SupportsCausationTracking()` is a permanent runtime schema check (not transitional) + +### Structural prerequisites (tidy-first) +1. **RequestHandler**: Expose `instrumentationOptions` as `protected InstrumentationOptions InstrumentationOptions => instrumentationOptions;` (same for async) +2. **UseInboxHandler**: Switch from private `InitRequestContext()` to pipeline's `this.Context` so Bag data is shared across pipeline +3. **PipelineStepDescription**: Add non-positional `RequestHandlerAttribute? Attribute { get; init; }` property +4. **Describe() global inbox**: Pass `InboxConfiguration` into `ValidatePipelines()` → `PipelineBuilder`. 
`Describe()` injects global inbox attributes using same `MethodInfo` guard checks as `Build()` +5. **IAmAnOutboxProducerMediator**: Add `IAmAnOutbox? Outbox` read-only property +6. **DI registration**: Register outbox as `IAmACausationTrackingOutbox` alongside primary interface when it implements it + +### Pipeline validation +- `HandlerPipelineValidationRules.ReplayRequiresCausationTracking(IAmAnInbox? inbox, IAmAnOutbox? outbox)` — collapsed `Specification` +- Both inbox and outbox captured via closure; checks: inbox implements tracking, inbox schema supports it, outbox present, outbox implements tracking, outbox schema supports it + +### Test strategy +- In-memory stores first (base tests in `Paramore.Brighter.Base.Test`) +- Outbox persistent store tests generated via Liquid templates +- Inbox persistent store tests derived manually from base classes + +### Out of scope +- Schema migrations for persistent stores (separate PR, lands first) +- Saga/workflow orchestration +- Immediate send replay (sweeper only) +- Migration tooling for existing data (columns nullable, existing rows have null CausationId) + +## Design notes for implementation +- Brighter uses `DateTimeOffset` over `DateTime` in APIs +- `BrighterSemanticConventions.ConversationId` (`messaging.message.conversation_id`) already exists and carries `CorrelationId` — do NOT reuse for CausationId +- `UseInboxHandlerAsync.cs` has a duplicate `base.InitializeFromAttributeParams()` call — fix in Task 1 tidy-first pass +- Spanner is relational (implements `IRelationalDatabaseInboxQueries`), NOT NoSQL + +## Files modified +- `docs/adr/0057-replay-outbox-on-inbox-duplicate.md` — the ADR +- `specs/0027-replay-matching-outbox-events-when-inbox-has-already-seen/` — requirements.md, README.md, tasks.md, review-tasks.md, .issue-number, .adr-list, .requirements-approved, .design-approved, .tasks-approved + +## Next step +Begin implementation with `/spec:implement`, starting at Task 1 (structural: UseInboxHandler uses 
pipeline Context + expose InstrumentationOptions). diff --git a/docs/adr/0057-replay-outbox-on-inbox-duplicate.md b/docs/adr/0057-replay-outbox-on-inbox-duplicate.md new file mode 100644 index 0000000000..d6f9035cbe --- /dev/null +++ b/docs/adr/0057-replay-outbox-on-inbox-duplicate.md @@ -0,0 +1,624 @@ +# 57. Replay Outbox Messages on Inbox Duplicate Detection + +Date: 2026-04-16 + +## Status + +Accepted + +## Context + +**Parent Requirement**: [specs/0027-replay-matching-outbox-events-when-inbox-has-already-seen/requirements.md](../../specs/0027-replay-matching-outbox-events-when-inbox-has-already-seen/requirements.md) + +When a handler processes a command, it may produce downstream outbox messages that trigger subsequent workflow steps. If a workflow fails partway through — the handler completed and its outbox messages were dispatched, but a downstream consumer never processed its message — there is no mechanism to replay the workflow. Re-sending the original command to the first handler results in the inbox detecting a duplicate, and either throwing or warning. The downstream messages are never resent. + +We need to allow the inbox to trigger a replay of the outbox messages that were originally produced when handling a command, without re-executing the handler logic itself. The outbox sweeper already handles dispatching outstanding messages; we simply need to mark the relevant outbox messages as undispatched. + +### Forces + +- The inbox handler (`UseInboxHandler`) currently has no knowledge of or access to the outbox. These are separate concerns in the current architecture. +- The outbox is generic over a transaction type (`IAmAnOutboxSync`), making it difficult to inject directly into the inbox handler without knowing the transaction type. +- Not all inbox/outbox implementations will support this feature (e.g., third-party or older implementations). The change must be non-breaking and opt-in. +- Some handlers have an inbox but no outbox (terminal steps). 
The design must handle this gracefully. +- All Brighter-maintained persistent store schemas (inbox and outbox) will add the `CausationId` column. However, users upgrading Brighter are not required to migrate their store schema unless they intend to use `OnceOnlyAction.Replay`. A separate PR provides inbox/outbox schema migration tooling and will be merged into master before this feature, then merged into this branch. `SupportsCausationTracking()` performs a runtime schema check so that users running new Brighter code against an old (un-migrated) schema get a clear validation error at startup rather than a runtime failure. + +## Decision + +### New Concept: Causation Id + +A **Causation Id** links an inbox entry to the outbox messages produced during the same handler invocation. It is distinct from: + +- **CorrelationId** — used for request-reply patterns +- **JobId / WorkflowId** — reserved for future workflow orchestration across multiple steps + +The Causation Id is scoped to a single handler invocation: "when I handled command X, I produced messages M1, M2, M3 — they all share Causation Id C." + +#### Propagation + +The Causation Id is propagated via `RequestContext.Bag` using a well-known key defined in `RequestContextBagNames`: + +``` +RequestContextBagNames.CausationId = "Brighter-CausationId" // new well-known key +``` + +The `UseInboxHandler` is responsible for generating the Causation Id and placing it in the pipeline's `RequestContext.Bag`. On first handling (no duplicate), `UseInboxHandler.Handle()` generates the Causation Id (defaulting to the command's `Id`) and stores it in the Bag before calling `base.Handle()`. This ensures it is available to all downstream pipeline steps, including the outbox `Add` operation in `CommandProcessor.DepositPost`. + +**Prerequisite**: `UseInboxHandler` currently creates a private `RequestContext` via `InitRequestContext()` instead of using the pipeline's `this.Context`. 
This must be changed: the handler must use the pipeline's `IRequestContext` (available via the `Context` property inherited from `RequestHandler`) so that Bag data is shared across the pipeline. This is a structural refactor that affects the existing `Throw` and `Warn` paths as well, and should be done as a tidy-first change before the behavioral changes. + +Both the inbox `Add` and outbox `Add` operations read the Causation Id from `RequestContext.Bag` and store it. + +### New OnceOnlyAction: Replay + +A new enum value is added to `OnceOnlyAction`: + +```csharp +public enum OnceOnlyAction +{ + Throw, + Warn, + Replay // Clear dispatched state on matching outbox messages +} +``` + +When `UseInboxHandler` detects a duplicate and the action is `Replay`, it: + +1. Retrieves the Causation Id from the inbox entry +2. Replays the causation's outbox messages by clearing their `DispatchedAt` +3. Returns the request without executing the handler + +### New Role Interfaces + +Two new interfaces define the **causation tracking** capability as an optional role that inbox/outbox implementations can provide: + +```csharp +/// <summary> +/// Role: An inbox that can track and retrieve Causation Ids. +/// Responsibility: Knowing which causation an inbox entry belongs to. +/// </summary> +public interface IAmACausationTrackingInbox +{ + bool SupportsCausationTracking(); + + Task<bool> SupportsCausationTrackingAsync( + CancellationToken cancellationToken = default); + + string? GetCausationId(string id, string contextKey, + RequestContext? requestContext, int timeoutInMilliseconds = -1); + + Task<string?> GetCausationIdAsync(string id, string contextKey, + RequestContext? requestContext, int timeoutInMilliseconds = -1, + CancellationToken cancellationToken = default); +} + +/// <summary> +/// Role: An outbox that can replay messages for a causation. +/// Responsibility: Knowing if causation tracking is supported, and +/// doing the reset of dispatch state for a causation's outbox messages. 
+/// </summary> +public interface IAmACausationTrackingOutbox +{ + bool SupportsCausationTracking(); + + Task<bool> SupportsCausationTrackingAsync( + CancellationToken cancellationToken = default); + + void ReplayCausation(string causationId, RequestContext? requestContext, + Dictionary<string, object>? args = null); + + Task ReplayCausationAsync(string causationId, RequestContext? requestContext, + Dictionary<string, object>? args = null, + CancellationToken cancellationToken = default); +} +``` + +These follow the existing naming convention (`IAmA*`) and are separate from the core inbox/outbox interfaces, so implementations that don't support causation tracking continue to work. + +### Architecture Overview + +``` +Command arrives (duplicate) + │ + ▼ +┌──────────────────────┐ +│ UseInboxHandler │ +│ │ +│ inbox.Exists() ─►true +│ action == Replay? │ +│ │yes │ +│ ▼ │ +│ inbox.GetCausationId() +│ │ │ +│ ▼ │ +│ outbox.ReplayCausation(causationId) +│ │ │ +│ ▼ │ +│ return (skip handler) +└──────────────────────┘ + │ + ▼ (later) +┌──────────────────────┐ +│ Outbox Sweeper │ +│ │ +│ OutstandingMessages()│ +│ ── finds messages │ +│ with cleared │ +│ DispatchedAt │ +│ ── re-dispatches │ +└──────────────────────┘ +``` + +### Modified UseInboxHandler + +The `UseInboxHandler` gains an optional outbox dependency. Since the outbox is generic over `TTransaction`, we inject the causation-tracking role interface directly, avoiding the generic type parameter: + +```csharp +public class UseInboxHandler<T> : RequestHandler<T> where T : class, IRequest +{ + private readonly IAmAnInboxSync _inbox; + private readonly IAmACausationTrackingOutbox? _outbox; // optional + + public UseInboxHandler(IAmAnInboxSync inbox, + IAmACausationTrackingOutbox? 
outbox = null) + { + _inbox = inbox; + _outbox = outbox; + } +} +``` + +The `Handle` method is extended: + +```csharp +if (exists && _onceOnlyAction is OnceOnlyAction.Replay) +{ + Log.CommandHasAlreadyBeenSeenReplayingOutbox(s_logger, request.Id); + + if (_inbox is IAmACausationTrackingInbox trackingInbox + && _outbox is not null) + { + var causationId = trackingInbox.GetCausationId( + request.Id, _contextKey, requestContext); + + if (causationId is not null) + { + _outbox.ReplayCausation(causationId, requestContext); + } + } + + return request; +} +``` + +The same pattern applies to `UseInboxHandlerAsync`. + +### Causation Id Storage + +#### Inbox Storage + +The inbox `Add` method reads `CausationId` from `RequestContext.Bag` and stores it alongside the request. For `InMemoryInbox`, the `InboxItem` class gains a `CausationId` property: + +```csharp +public class InboxItem(Type requestType, string requestBody, DateTimeOffset writeTime, string contextKey) +{ + // ... existing properties ... + public string? CausationId { get; set; } // new +} +``` + +#### Outbox Storage + +The outbox `Add` method reads `CausationId` from `RequestContext.Bag` and stores it alongside the message. For `InMemoryOutbox`, the `OutboxEntry` class gains a `CausationId` property: + +```csharp +public class OutboxEntry(Message message) : IHaveABoxWriteTime +{ + // ... existing properties (Message, WriteTime as DateTimeOffset, TimeFlushed) ... + public string? CausationId { get; set; } // new +} +``` + +The `ReplayCausation(causationId)` method finds all entries with a matching `CausationId` and resets `TimeFlushed` to `DateTimeOffset.MinValue`, causing the sweeper to re-dispatch them. A persistent outbox instead sets its `Dispatched` column to NULL. 
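As a rough illustration of the in-memory replay described above, the following self-contained sketch resets the dispatch timestamp for every entry that shares a causation id. The types `SketchOutboxEntry` and `SketchInMemoryOutbox`, and the helpers `MarkDispatched` and `OutstandingMessageIds`, are hypothetical stand-ins invented for this example; they are not Brighter's `OutboxEntry` or `InMemoryOutbox`, and the real implementation may differ.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for an in-memory outbox entry (not Brighter's OutboxEntry).
public sealed class SketchOutboxEntry
{
    public SketchOutboxEntry(string messageId, string? causationId)
    {
        MessageId = messageId;
        CausationId = causationId;
    }

    public string MessageId { get; }
    public string? CausationId { get; }

    // Assumption: DateTimeOffset.MinValue means "not yet dispatched".
    public DateTimeOffset TimeFlushed { get; set; } = DateTimeOffset.MinValue;
}

// Hypothetical in-memory store showing only the causation replay behaviour.
public sealed class SketchInMemoryOutbox
{
    private readonly object _gate = new();
    private readonly List<SketchOutboxEntry> _entries = new();

    public void Add(string messageId, string? causationId)
    {
        lock (_gate) _entries.Add(new SketchOutboxEntry(messageId, causationId));
    }

    public void MarkDispatched(string messageId, DateTimeOffset dispatchedAt)
    {
        lock (_gate)
        {
            foreach (var entry in _entries)
                if (entry.MessageId == messageId) entry.TimeFlushed = dispatchedAt;
        }
    }

    // Reset dispatch state for every entry that shares the causation id,
    // so a sweeper-style query sees those entries as outstanding again.
    public void ReplayCausation(string causationId)
    {
        lock (_gate)
        {
            foreach (var entry in _entries)
                if (entry.CausationId == causationId) entry.TimeFlushed = DateTimeOffset.MinValue;
        }
    }

    // What a sweeper-style query would pick up: entries with no dispatch time.
    public IReadOnlyList<string> OutstandingMessageIds()
    {
        lock (_gate)
            return _entries.Where(e => e.TimeFlushed == DateTimeOffset.MinValue)
                           .Select(e => e.MessageId)
                           .ToList();
    }
}
```

Entries with a null `CausationId` never match, which is consistent with the nullable column described for existing rows: they are simply left alone by a replay.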
+ +#### Persistent Store Implementations + +All Brighter-maintained persistent inbox and outbox implementations add `CausationId` support: + +**Inbox stores** (9): DynamoDB, DynamoDB.V4, Firestore, MongoDb, MsSql, MySql, Postgres, Spanner, Sqlite +**Outbox stores** (9): DynamoDB, DynamoDB.V4, Firestore, MongoDb, MsSql, MySql, PostgreSql, Spanner, Sqlite + +Each persistent store: +1. Adds a nullable `CausationId` column/attribute to its schema (existing rows have null — no data migration needed) +2. Reads `CausationId` from `RequestContext.Bag` in its `Add` method and stores it +3. Implements `IAmACausationTrackingInbox` or `IAmACausationTrackingOutbox` as appropriate +4. Indexes `CausationId` in the outbox for efficient replay queries +5. Returns `true` from `SupportsCausationTracking()` once the schema supports it + +#### Test Strategy + +Implementation follows test-first, starting with in-memory stores and propagating to persistent stores via the base test pattern: + +1. **Base tests** in `tests/Paramore.Brighter.Base.Test` define causation tracking test cases against the `IAmACausationTrackingInbox` and `IAmACausationTrackingOutbox` interfaces +2. **In-memory stores** (`InMemoryInbox`, `InMemoryOutbox`) are implemented and validated first +3. **Outbox persistent store tests** are generated from base tests via the Liquid template generator in `tools/Paramore.Brighter.Test.Generator/` — new templates are added for causation tracking scenarios +4. **Inbox persistent store tests** are derived manually from the base test classes (the generator does not yet cover inbox tests) +5. Each persistent store test project inherits the causation tracking tests and provides its store-specific setup/teardown + +### Pipeline Validation + +#### Prerequisite: Enrich PipelineStepDescription (structural change) + +`PipelineBuilder.Describe()` already has access to the full attribute instances — `GetOtherHandlersInPipeline()` returns `IEnumerable` via reflection. 
However, `PipelineStepDescription` currently discards the attribute's properties, keeping only `AttributeType`, `HandlerType`, `Step`, and `Timing`. The `OnceOnlyAction` value on a `UseInboxAttribute` is lost. + +To enable attribute-aware validation rules, `PipelineStepDescription` is enriched with a non-positional `Attribute` property. The existing positional parameters are preserved to avoid breaking the record's constructor contract — tests and production code construct `PipelineStepDescription` using positional syntax (`new PipelineStepDescription(typeof(SomeAttribute), typeof(SomeHandler<>), Step: 0, HandlerTiming.Before)`), and this must continue to compile. + +```csharp +public record PipelineStepDescription( + Type AttributeType, // unchanged — positional contract preserved + Type HandlerType, + int Step, + HandlerTiming Timing) +{ + /// + /// The full attribute instance, when available. Set by PipelineBuilder.Describe() + /// which has access to the reflected attribute objects. May be null in test code + /// that constructs descriptions directly. + /// + public RequestHandlerAttribute? Attribute { get; init; } +} +``` + +The corresponding projection in `PipelineBuilder.Describe()` changes from: + +```csharp +.Select(a => new PipelineStepDescription(a.GetType(), a.GetHandlerType(), a.Step, a.Timing)) +``` + +to: + +```csharp +.Select(a => new PipelineStepDescription(a.GetType(), a.GetHandlerType(), a.Step, a.Timing) + { Attribute = a }) +``` + +This is a non-breaking structural change: +- The four positional parameters are unchanged — all existing constructors, tests, and deconstruction patterns compile as before. +- The `Attribute` property is additive — existing code that doesn't need it simply ignores it. +- Validation rules that need attribute details (like the replay rule) access `step.Attribute` and pattern-match: `step.Attribute is UseInboxAttribute { OnceOnlyAction: OnceOnlyAction.Replay }`. 
+- Rules must handle `Attribute` being `null` (test-constructed descriptions), which the replay rule does naturally — if `Attribute` is null, the pattern match fails and the step is skipped. + +#### Prerequisite: Pass InboxConfiguration into the validation path (structural change) + +`BrighterPipelineValidationExtensions.ValidatePipelines()` currently creates the `PipelineBuilder` without `InboxConfiguration`: + +```csharp +var pipelineBuilder = new PipelineBuilder(subscriberRegistry); // no InboxConfiguration +``` + +The describe-only constructor already accepts `InboxConfiguration?` as an optional parameter, but `ValidatePipelines()` does not resolve and pass it. This means `Describe()` does not include global inbox attributes — only per-handler `[UseInbox]` attributes are visible. + +The fix: resolve `InboxConfiguration` from the service provider and pass it through: + +```csharp +var inboxConfiguration = sp.GetService(); +var pipelineBuilder = new PipelineBuilder(subscriberRegistry, inboxConfiguration); +``` + +Additionally, `Describe()` should include global inbox attributes in its output, mirroring what `AddGlobalInboxAttributes()` does at build time. This ensures `Describe()` and `Build()` agree on what the pipeline looks like, which is already the stated design intent of `Describe()`. + +This change touches shared infrastructure — `Describe()` is the foundation of all pipeline validation — and should be treated as a tidy-first structural change with its own focused test coverage to prevent regression. `AddGlobalInboxAttributes()` (used during `Build()`) performs two guard checks before injecting the global inbox attribute: + +1. `handlerMethod.HasNoInboxAttributesInPipeline()` — calls `MethodInfo.IsDefined(typeof(NoGlobalInboxAttribute), true)` +2. 
`handlerMethod.HasExistingUseInboxAttributesInPipeline()` — calls `MethodInfo.IsDefined(typeof(UseInboxAttribute/UseInboxAsyncAttribute), true)` + +These are `MethodInfo` extension methods (in `ReflectionExtensions`). They do not require a handler instance — they only need the `MethodInfo`. `Describe()` already has the `MethodInfo` from `HandlerMethodDiscovery.FindHandlerMethod(handlerType, requestType)` at line 116. So the same guard checks can be called directly on `handlerMethod` inside `Describe()`: + +```csharp +public IEnumerable Describe(Type requestType) +{ + // ... existing code ... + foreach (var handlerType in handlerTypes) + { + var handlerMethod = HandlerMethodDiscovery.FindHandlerMethod(handlerType, requestType); + var attributes = handlerMethod.GetOtherHandlersInPipeline(); + + // Inject global inbox attribute if applicable (same guards as Build path) + if (_inboxConfiguration != null + && !handlerMethod.HasNoInboxAttributesInPipeline() + && !handlerMethod.HasExistingUseInboxAttributesInPipeline()) + { + var isAsync = HandlerMethodDiscovery.IsAsyncHandler(handlerType); + RequestHandlerAttribute globalInbox = isAsync + ? new UseInboxAsyncAttribute( + step: 0, + contextKey: _inboxConfiguration.Context!(handlerType), + onceOnly: _inboxConfiguration.OnceOnly, + timing: HandlerTiming.Before, + onceOnlyAction: _inboxConfiguration.ActionOnExists) + : new UseInboxAttribute( + step: 0, + contextKey: _inboxConfiguration.Context!(handlerType), + onceOnly: _inboxConfiguration.OnceOnly, + timing: HandlerTiming.Before, + onceOnlyAction: _inboxConfiguration.ActionOnExists); + attributes = attributes.Append(globalInbox); + } + + // ... rest of Describe (filter before/after, build description) ... + } +} +``` + +The only difference from `Build()` is that `Build()` calls `_inboxConfiguration.Context(implicitHandler.GetType())` on the handler instance, while `Describe()` calls `_inboxConfiguration.Context(handlerType)` on the `Type` directly. 
Since `InboxConfiguration.Context` is `Func`, both produce the same result — the handler instance's `GetType()` returns the same `Type` that `Describe()` already has. + +#### Validation rule: ReplayRequiresCausationTracking + +A new collapsed specification is added to `HandlerPipelineValidationRules`, following the same pattern as `BackstopAttributeOrdering()` and `AttributeAsyncConsistency()`. Both the inbox and outbox instances are captured via closure in the factory method: + +```csharp +public static ISpecification ReplayRequiresCausationTracking( + IAmAnInbox? inbox, IAmAnOutbox? outbox) + => new Specification(d => + { + var hasReplay = d.BeforeSteps.Any(s => + s.Attribute is UseInboxAttribute { OnceOnlyAction: OnceOnlyAction.Replay } + || s.Attribute is UseInboxAsyncAttribute { OnceOnlyAction: OnceOnlyAction.Replay }); + + if (!hasReplay) return []; + + var findings = new List(); + + // Check 1: inbox must implement IAmACausationTrackingInbox + if (inbox is not IAmACausationTrackingInbox trackingInbox) + { + findings.Add(ValidationResult.Fail(new ValidationError( + ValidationSeverity.Error, + $"Handler '{d.HandlerType.Name}'", + "Inbox is configured with OnceOnlyAction.Replay but the inbox does not implement " + + "IAmACausationTrackingInbox. Upgrade the inbox implementation."))); + } + else if (!trackingInbox.SupportsCausationTracking()) + { + // Check 2: inbox schema must support CausationId + findings.Add(ValidationResult.Fail(new ValidationError( + ValidationSeverity.Warning, + $"Handler '{d.HandlerType.Name}'", + "Inbox implements IAmACausationTrackingInbox but SupportsCausationTracking() " + + "returned false. The inbox schema may need upgrading to add the CausationId column."))); + } + + // Check 3: outbox must be configured + if (outbox is null) + { + findings.Add(ValidationResult.Fail(new ValidationError( + ValidationSeverity.Warning, + $"Handler '{d.HandlerType.Name}'", + "Inbox is configured with OnceOnlyAction.Replay but no outbox is configured. 
" + + "Replay will be a no-op (terminal step)."))); + return findings; + } + + // Check 4: outbox must implement IAmACausationTrackingOutbox + if (outbox is not IAmACausationTrackingOutbox trackingOutbox) + { + findings.Add(ValidationResult.Fail(new ValidationError( + ValidationSeverity.Error, + $"Handler '{d.HandlerType.Name}'", + "Inbox is configured with OnceOnlyAction.Replay but the outbox does not implement " + + "IAmACausationTrackingOutbox. Upgrade the outbox implementation."))); + return findings; + } + + // Check 5: outbox schema must support CausationId + if (!trackingOutbox.SupportsCausationTracking()) + { + findings.Add(ValidationResult.Fail(new ValidationError( + ValidationSeverity.Warning, + $"Handler '{d.HandlerType.Name}'", + "Outbox implements IAmACausationTrackingOutbox but SupportsCausationTracking() " + + "returned false. The outbox schema may need upgrading to add the CausationId column."))); + } + + return findings; + }); +``` + +This mirrors the collapsed specification form used by `BackstopAttributeOrdering()` and `AttributeAsyncConsistency()` — a result evaluator that returns zero or more `ValidationResult`s per pipeline description. + +#### Wiring: PipelineValidator and ValidatePipelines() + +The specification is added to the existing handler pipeline specs array in `PipelineValidator.ValidateHandlerPipelines()`. The outbox is resolved from DI and passed through: + +In `ValidatePipelines()`: + +```csharp +var inboxConfiguration = sp.GetService(); +var inbox = inboxConfiguration?.Inbox; // IAmAnInbox — already available + +var mediator = sp.GetService(); +var outbox = mediator?.Outbox; // IAmAnOutbox? — new read-only property (see below) + +var pipelineBuilder = new PipelineBuilder(subscriberRegistry, inboxConfiguration); +return new PipelineValidator(pipelineBuilder, publications, subscriptions, consumerSpecList, inbox, outbox); +``` + +The inbox is obtained from `InboxConfiguration.Inbox` which is already `IAmAnInbox`. 
This is the same `InboxConfiguration` that is now passed into `PipelineBuilder` for `Describe()` to inject global inbox attributes. + +For the outbox, `OutboxProducerMediator` stores the outbox instances as private fields (`_outBox`, `_asyncOutbox`). A new read-only property is added to `IAmAnOutboxProducerMediator` to expose the outbox for validation: + +```csharp +// On IAmAnOutboxProducerMediator: +IAmAnOutbox? Outbox { get; } +``` + +Implemented in `OutboxProducerMediator`: + +```csharp +public IAmAnOutbox? Outbox => (IAmAnOutbox?)_outBox ?? _asyncOutbox; +``` + +This is safe because both `IAmAnOutboxSync` and `IAmAnOutboxAsync` inherit from `IAmAnOutbox`. The property returns whichever is available (sync preferred), or null if neither is configured. + +`PipelineValidator` gains optional `IAmAnInbox?` and `IAmAnOutbox?` constructor parameters and includes the new rule in its specs array: + +```csharp +private void ValidateHandlerPipelines(List findings) +{ + var descriptions = _pipelineBuilder.Describe(); + var specs = new ISpecification[] + { + HandlerPipelineValidationRules.HandlerTypeVisibility(), + HandlerPipelineValidationRules.BackstopAttributeOrdering(), + HandlerPipelineValidationRules.AttributeAsyncConsistency(), + HandlerPipelineValidationRules.ReplayRequiresCausationTracking(_inbox, _outbox) // new + }; + + EvaluateSpecs(descriptions, specs, findings); +} +``` + +No new validation method or separate code path is needed — the rule flows through the existing `EvaluateSpecs` infrastructure alongside all other handler pipeline rules. 
+ +This approach works because: + +- The enriched `PipelineStepDescription` gives us access to `UseInboxAttribute.OnceOnlyAction` on each step +- Global inbox configuration is included in `Describe()` output, so both per-handler and global configurations are validated +- The outbox instance is captured by the specification's closure, keeping `PipelineValidator` generic — it does not need to know about replay semantics +- The rule is composable and independently testable, consistent with the Specification pattern established in ADR 0053 + +The `SupportsCausationTracking()` method is a permanent runtime schema check. It allows users to upgrade Brighter without being forced to migrate their store schema — the feature is only available once the schema supports it. A separate PR provides inbox/outbox migration tooling and will be merged before this feature. + +### Observability + +`UseInboxHandler` currently has no telemetry — it writes structured log messages but does not add events to the pipeline's Activity span. This is a gap for all inbox paths, not just Replay. + +The CommandProcessor creates a span via `BrighterTracer.CreateSpan()` and stores it in `RequestContext.Span`. The base `RequestHandler.Handle()` writes handler entry events to this span via `BrighterTracer.WriteHandlerEvent()`. `UseInboxHandler` should follow the same pattern: write events to `Context.Span` (available after the tidy-first prerequisite that switches from `InitRequestContext()` to the pipeline's `this.Context`). 
+ +#### Events added to the pipeline span + +All events are guarded by `Context?.Span != null` and gated on `InstrumentationOptions.Brighter`: + +| Path | Event Name | Tags | +|------|-----------|------| +| First handling (no duplicate) | `"UseInboxHandler Add"` | `request.id`, `context_key` | +| Duplicate + Throw | `"UseInboxHandler Duplicate Throw"` | `request.id`, `context_key` | +| Duplicate + Warn | `"UseInboxHandler Duplicate Warn"` | `request.id`, `context_key` | +| Duplicate + Replay | `"UseInboxHandler Duplicate Replay"` | `request.id`, `context_key`, `causation_id` | + +The Replay event includes the `CausationId` using a new `BrighterSemanticConventions.CausationId` constant (`"paramore.brighter.causation_id"`) — distinct from the existing `ConversationId` constant which carries the `CorrelationId` for request-reply patterns. + +#### No new spans + +`UseInboxHandler` does not create child spans. It adds events to the existing pipeline span, consistent with how all other built-in handler decorators work. The `OutboxSweeper` already creates its own Activity when it runs `SweepAsync`, so the re-dispatched messages get their own independent trace — there is no parent-child link between the replay trigger and the sweep, which is correct because the sweep is asynchronous and may pick up messages from multiple replays. + +#### Implementation sketch + +```csharp +// Inside Handle(), after the Replay branch executes: +if (Context?.Span != null) +{ + var tags = new ActivityTagsCollection + { + { BrighterSemanticConventions.RequestId, request.Id }, + { BrighterSemanticConventions.CausationId /* new constant: "paramore.brighter.causation_id" */, causationId } + }; + Context.Span.AddEvent(new ActivityEvent( + "UseInboxHandler Duplicate Replay", DateTimeOffset.UtcNow, tags)); +} +``` + +The same pattern applies to the Throw, Warn, and Add paths (without the `CausationId` tag). 
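One way to keep the four paths consistent is a small shared helper that builds the tags and only attaches the causation tag when one is supplied. The sketch below is an assumption about shape, not the planned implementation: `InboxSpanEvents`, its `Write` method, and the `enabled` flag (standing in for the `InstrumentationOptions.Brighter` gate) are hypothetical, and the `request.id` and `context_key` tag keys are taken from the table above rather than from existing `BrighterSemanticConventions` constants. Only `System.Diagnostics` types are used.

```csharp
#nullable enable
using System;
using System.Diagnostics;

// Hypothetical helper; names and tag keys are illustrative assumptions.
public static class InboxSpanEvents
{
    public const string RequestIdTag = "request.id";
    public const string ContextKeyTag = "context_key";
    public const string CausationIdTag = "paramore.brighter.causation_id";

    public static void Write(
        Activity? span,
        bool enabled,            // stand-in for the InstrumentationOptions.Brighter gate
        string eventName,
        string requestId,
        string contextKey,
        string? causationId = null)
    {
        // No span, or instrumentation switched off: do nothing.
        if (span is null || !enabled) return;

        var tags = new ActivityTagsCollection
        {
            { RequestIdTag, requestId },
            { ContextKeyTag, contextKey }
        };

        // Only the Replay path is expected to supply a causation id.
        if (causationId is not null) tags.Add(CausationIdTag, causationId);

        span.AddEvent(new ActivityEvent(eventName, DateTimeOffset.UtcNow, tags));
    }
}
```

Under these assumptions the Replay branch would call `InboxSpanEvents.Write(Context?.Span, enabled, "UseInboxHandler Duplicate Replay", request.Id, _contextKey, causationId)`, while the Add, Throw, and Warn branches would omit the last argument.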
+ +### Attribute Changes + +`UseInboxAttribute` and `UseInboxAsyncAttribute` already accept `OnceOnlyAction` as a parameter. Adding `Replay` to the enum is sufficient — no attribute changes needed beyond the enum value. + +### Configuration Changes + +`InboxConfiguration.ActionOnExists` already stores a `OnceOnlyAction`. No changes needed. + +### DI Registration + +`UseInboxHandler` gains an optional `IAmACausationTrackingOutbox?` constructor parameter. For DI resolution to work, the outbox must be registered under this interface in addition to its primary registration. + +Outbox implementations that support causation tracking (starting with `InMemoryOutbox`) implement `IAmACausationTrackingOutbox` directly. The DI registration in `ServiceCollectionExtensions` (or equivalent setup code) must register the same outbox instance as both its primary interface and `IAmACausationTrackingOutbox`: + +```csharp +// Existing registration (unchanged): +services.AddSingleton(outbox); + +// Additional registration for causation tracking: +if (outbox is IAmACausationTrackingOutbox) + services.AddSingleton((IAmACausationTrackingOutbox)outbox); +``` + +When the outbox does not implement `IAmACausationTrackingOutbox`, no registration is made and `UseInboxHandler` receives `null` for its optional parameter — the handler degrades gracefully (pipeline validation catches the mismatch at startup if `Replay` is configured). + +**DI resolution path**: `UseInboxHandler` is not explicitly registered in the container — it is resolved by type via `ServiceProviderHandlerFactory`, which calls `IServiceProvider.GetService(handlerType)`. The container uses `ActivatorUtilities` to construct the handler, which supports optional constructor parameters: it resolves `IAmAnInboxSync` from the container and passes `null` for the optional `IAmACausationTrackingOutbox?` parameter when that service is not registered. 
This is standard `Microsoft.Extensions.DependencyInjection` behavior and requires no special handling. + +### Key Components and Responsibilities + +| Component | Role | Responsibility | +|-----------|------|----------------| +| `UseInboxHandler` | Coordinator | Deciding whether to replay; generating the CausationId; delegating to inbox and outbox | +| `IAmACausationTrackingInbox` | Information Holder | Knowing the Causation Id for an inbox entry | +| `IAmACausationTrackingOutbox` | Service Provider | Knowing if causation tracking is supported by the schema; doing the replay of a causation's outbox messages | +| `RequestContext.Bag` | Structurer | Carrying the Causation Id through the pipeline | +| `PipelineStepDescription` | Information Holder | Knowing the full attribute instance (including `OnceOnlyAction`) for validation | +| `PipelineValidator` | Controller | Deciding if the pipeline is correctly configured for replay | +| Outbox Sweeper | Service Provider | Doing the re-dispatch (existing, unchanged) | + +## Consequences + +### Positive + +- Enables workflow replay without re-executing handler logic — "skip what's done, resend what follows" +- Non-breaking: existing implementations work unchanged; new behavior is opt-in via `OnceOnlyAction.Replay` +- Uses the existing outbox sweeper for re-dispatch — no new dispatch mechanism needed +- Pipeline validation catches misconfiguration at startup, not at runtime +- The structural prerequisites (enriched `PipelineStepDescription`, `InboxConfiguration` in validation path) improve the validation infrastructure generally — future rules can inspect any attribute property +- Fixing `UseInboxHandler` to use the pipeline's `RequestContext` instead of creating its own is a correctness improvement that benefits all inbox paths +- The role interfaces (`IAmACausationTrackingInbox`, `IAmACausationTrackingOutbox`) follow existing patterns and can be adopted incrementally by store implementations + +### Negative + +- 
`UseInboxHandler` gains an optional outbox dependency, adding complexity to its constructor and DI registration +- All 18 Brighter-maintained store implementations (9 inbox, 9 outbox) need schema and code changes — significant breadth of change, though each individual change is mechanical +- Migration of existing data is not provided — new columns are nullable, so existing rows have null `CausationId` and replay is unavailable for historical entries +- `SupportsCausationTracking()` is a permanent runtime schema check on both inbox and outbox — it protects users who upgrade Brighter but have not yet migrated their store schema. Pipeline validation uses it at startup so that misconfiguration (Replay enabled on an un-migrated schema) produces a clear error, not a silent runtime failure +- The `Replay` action silently does nothing if the inbox/outbox don't support causation tracking at runtime (though pipeline validation should catch this at startup) + +### Risks and Mitigations + +| Risk | Mitigation | +|------|------------| +| Outbox has many messages for a causation; clearing all is slow | CausationId should be indexed in persistent stores. For in-memory, the scan is bounded by outbox size and expiry. | +| Handler produces different messages on re-execution vs original | Not applicable — the handler is not re-executed. The *same* outbox messages from the original execution are replayed. | +| Race condition: sweeper dispatches while we're clearing | Acceptable: worst case, a message is dispatched twice. Downstream inbox deduplication handles this. | +| Replay configured but no outbox (terminal step) | Pipeline validation warns. At runtime, `_outbox` is null, so the handler simply returns without replay — safe no-op. | +| Inbox `Add` fails after `base.Handle()` succeeds (including outbox writes) | The outbox messages will have CausationIds but the inbox entry won't exist, making replay impossible for that invocation. 
This is acceptable: since the inbox entry was never written, the message is not marked as "seen" — the transport will redeliver it, the handler will re-execute (inbox `Exists()` returns false), and both inbox and outbox entries will be written correctly on the retry. | + +## Alternatives Considered + +### 1. Use JobId for Correlation + +The existing `JobId` field on `MessageHeader` could correlate inbox and outbox entries. Rejected because `JobId` represents an entire workflow instance — replaying by `JobId` would resend *all* messages for that job across *all* steps, not just the messages from the specific step being replayed. + +### 2. Use CorrelationId for Correlation + +`CorrelationId` is used for request-reply patterns and has different semantics. Overloading it for replay correlation would conflate two distinct concepts. + +### 3. Signal Replay via RequestContext, Handle in Middleware + +Instead of giving the inbox handler an outbox reference, set a flag in `RequestContext.Bag` and have a separate middleware component perform the outbox clearing. This adds a new handler type to the pipeline and complicates configuration for a single-purpose operation. The inbox handler is the natural place for this decision since it already owns the "what to do on duplicate" responsibility. + +### 4. Re-execute the Handler on Duplicate + +Instead of replaying stored outbox messages, re-execute the handler to produce new messages. Rejected because this defeats the purpose of the inbox (preventing non-idempotent re-execution) and could produce *different* messages depending on current state. 
+ +## References + +- Requirements: [specs/0027-replay-matching-outbox-events-when-inbox-has-already-seen/requirements.md](../../specs/0027-replay-matching-outbox-events-when-inbox-has-already-seen/requirements.md) +- GitHub Issue: #2541 +- Existing ADRs: + - [0054 - Roslyn Analyzer Extensions for Pipeline Validation](0054-roslyn-analyzer-extensions-for-pipeline-validation.md) + - [0056 - Timed Outbox Archiver Sync Fallback](0056-timed-outbox-archiver-sync-fallback.md) diff --git a/specs/.current-spec b/specs/.current-spec deleted file mode 100644 index 2bf3855984..0000000000 --- a/specs/.current-spec +++ /dev/null @@ -1 +0,0 @@ -0026-timed-outbox-archiver-sync-fallback diff --git a/specs/.current_spec b/specs/.current_spec index cc611d2e8a..631f1ea138 100644 --- a/specs/.current_spec +++ b/specs/.current_spec @@ -1 +1 @@ -0022-Defer-Message-Action-Backstop-Handler +0027-replay-matching-outbox-events-when-inbox-has-already-seen diff --git a/specs/0022-Defer-Message-Action-Backstop-Handler/review-tasks.md b/specs/0022-Defer-Message-Action-Backstop-Handler/review-tasks.md new file mode 100644 index 0000000000..042a562e75 --- /dev/null +++ b/specs/0022-Defer-Message-Action-Backstop-Handler/review-tasks.md @@ -0,0 +1,71 @@ +# Review: tasks — 0022-Defer-Message-Action-Backstop-Handler + +**Date**: 2026-04-17 +**Threshold**: 60 +**Verdict**: NEEDS WORK + +> **Note**: This phase is already approved. Findings are informational — consider whether any warrant re-opening the phase. + +1 finding at or above threshold 60. Address these before approving. + +## Findings + +### 1. Task 7 only tests Proactor path but implementation modifies both Reactor and Proactor (Score: 70) + +Task 7 specifies a single test file in `tests/Paramore.Brighter.Core.Tests/MessageDispatch/Proactor/` and says the implementation touches both `Proactor.cs` and `Reactor.cs`. However, the task only has one test file covering the Proactor (async) path. 
There is no corresponding test for the Reactor (sync) path with the delay override. + +**Evidence**: Task 7 says: +> "In `Proactor.cs`: Change `catch (DeferMessageAction)` to `catch (DeferMessageAction deferAction)`, extract `deferAction.Delay`, pass to `RequeueMessage`" +> "In `Reactor.cs`: Same change for the sync path" + +But only one test file is listed: `When_a_command_handler_throws_a_defer_message_with_delay_Then_message_is_requeued_with_delay.cs` in the `Proactor/` directory. + +**Recommendation**: Add a separate test task (or expand Task 7) to include a Reactor-path test: `tests/Paramore.Brighter.Core.Tests/MessageDispatch/Reactor/When_a_command_handler_throws_a_defer_message_with_delay_Then_message_is_requeued_with_delay.cs`. + +--- + +### 2. Task 7 test verifies only the "with delay" case; the "null delay fallback" case is described but has no separate test file (Score: 55) + +Task 7 bullet says to verify two behaviors: (1) when `Delay` is set, pump uses it, and (2) when `Delay` is null, pump falls back to subscription `RequeueDelay`. Only one test file is specified, which presumably covers both cases in a single test class. + +**Evidence**: Task 7 lists: +> "When handler throws `DeferMessageAction` with a `Delay` value, the pump passes that delay to `Channel.RequeueAsync`" +> "When handler throws `DeferMessageAction` without a `Delay` (null), the pump falls back to subscription `RequeueDelay`" + +But only one test file is named. + +**Recommendation**: Consider splitting into two test files or at minimum ensure the single test file clearly has two separate test methods. + +--- + +### 3. Task 1 describes work already completed in codebase (Score: 45) + +All tasks appear to be already implemented in the codebase (all four types exist, all test files exist, pump changes are in place), but all checkboxes remain unchecked. + +**Evidence**: `DeferMessageAction.cs` already has `TimeSpan? Delay`, all constructors, and all files referenced in Tasks 2-7 exist. 
All checkboxes are `- [ ]`. + +**Recommendation**: Mark completed tasks with `[x]` to reflect actual state. + +--- + +### 4. Branch name in header is stale (Score: 30) + +The tasks.md header says `**Branch**: \`error_examples\` (or new feature branch)` but the current branch is `replay_on_seen`. + +**Evidence**: Line 6: `**Branch**: \`error_examples\` (or new feature branch)` + +**Recommendation**: Update the branch name or remove it. + +--- + +## Summary + +| Score Range | Count | +|-------------|-------| +| 90-100 (Critical) | 0 | +| 70-89 (High) | 1 | +| 50-69 (Medium) | 1 | +| 0-49 (Low) | 2 | + +**Total findings**: 4 +**Findings at or above threshold (60)**: 1 diff --git a/specs/0026-timed-outbox-archiver-sync-fallback/.tasks-approved b/specs/0026-timed-outbox-archiver-sync-fallback/.tasks-approved new file mode 100644 index 0000000000..e69de29bb2 diff --git a/specs/0026-timed-outbox-archiver-sync-fallback/review-tasks.md b/specs/0026-timed-outbox-archiver-sync-fallback/review-tasks.md new file mode 100644 index 0000000000..2a0841203d --- /dev/null +++ b/specs/0026-timed-outbox-archiver-sync-fallback/review-tasks.md @@ -0,0 +1,81 @@ +# Review: tasks — 0026-timed-outbox-archiver-sync-fallback + +**Date**: 2026-04-15 +**Threshold**: 60 +**Verdict**: NEEDS WORK + +3 findings at or above threshold 60. Address these before approving. + +## Findings + +### 1. Implementation file path for TimedOutboxArchiver not specified (Score: 72) + +Task 2 describes implementing a three-way branch in `TimedOutboxArchiver.Archive()`, adding a `NoOutboxConfigured` log message, and removing a duplicate `OutboxSweeperSleeping` call. However, the task only specifies the file path for the TIDY step (`src/Paramore.Brighter/OutboxArchiver.cs`). The actual implementation changes are in a different project/file: `src/Paramore.Brighter.Outbox.Hosting/TimedOutboxArchiver.cs`. 
+ +**Evidence**: Task 2 implementation bullet points reference `TimedOutboxArchiver.Archive()` and `Log` partial class changes but never state which file to edit. Task 1 explicitly lists `File: src/Paramore.Brighter/OutboxArchiver.cs` but no equivalent line appears for Tasks 2-4. + +**Recommendation**: Add `File: src/Paramore.Brighter.Outbox.Hosting/TimedOutboxArchiver.cs` to the implementation section of Task 2. + +--- + +### 2. FR1 (async path unchanged) has no dedicated test task (Score: 65) + +FR1 states "When an async outbox is available, `TimedOutboxArchiver` calls `ArchiveAsync` (current behavior, unchanged)." Task 3 partially covers this with AC3 (prefers async when both are available), but there is no test for the scenario where ONLY an async outbox is registered (no sync). The three-way branch introduces a regression risk for async-only outboxes. + +**Evidence**: FR1 says "current behavior, unchanged." Task 3 tests "both sync and async" but not "async only via TimedOutboxArchiver." + +**Recommendation**: Either add a test task for async-only outbox through `TimedOutboxArchiver`, or explicitly document why existing coverage is sufficient. + +--- + +### 3. NoOutboxConfigured log message location ambiguous (Score: 62) + +Task 2 says "Add `NoOutboxConfigured` log message to the `Log` partial class" but both `TimedOutboxArchiver` and `OutboxArchiver` have their own `Log` partial class. The call site is in `TimedOutboxArchiver` but this is not stated explicitly. + +**Evidence**: Task 2 implementation says `Log.NoOutboxConfigured(s_logger)` but `s_logger` exists in both classes. + +**Recommendation**: Specify: "Add `NoOutboxConfigured` log message to the `Log` partial class in `TimedOutboxArchiver.cs`." + +--- + +### 4. OutboxArchiver is generic but test setup doesn't mention type parameters (Score: 55) + +`OutboxArchiver` requires two generic type parameters. The test descriptions don't mention what concrete types to use. 
+ +**Evidence**: `public partial class OutboxArchiver where TMessage : Message` at line 40 of `OutboxArchiver.cs`. + +**Recommendation**: Specify the concrete generic types to use in tests based on existing archiving test patterns. + +--- + +### 5. ADR decision to remove duplicate OutboxSweeperSleeping is bundled into behavioral task (Score: 45) + +The duplicate `OutboxSweeperSleeping` log removal is a structural/tidy change bundled into behavioral Task 2, contrary to tidy-first philosophy. + +**Evidence**: Task 2 implementation says "Remove duplicate `Log.OutboxSweeperSleeping` call (keep only the one in `finally`)." + +**Recommendation**: Move to Step 1 as a separate TIDY task. + +--- + +### 6. Task numbering inconsistency in dependency chain (Score: 40) + +The dependency chain references "Task 1 → Task 2 → Task 3 → Task 4" but there are only 3 bullet-pointed tasks in Step 2. + +**Evidence**: `Task 1 (TIDY) → Task 2 (sync fallback) → Task 3 (async preference) → Task 4 (no outbox warning)` but document has Step 1 (1 task) + Step 2 (3 tasks). + +**Recommendation**: Number tasks explicitly or fix the dependency chain. 
+ +--- + +## Summary + +| Score Range | Count | +|-------------|-------| +| 90-100 (Critical) | 0 | +| 70-89 (High) | 1 | +| 50-69 (Medium) | 3 | +| 0-49 (Low) | 2 | + +**Total findings**: 6 +**Findings at or above threshold (60)**: 3 diff --git a/specs/0027-replay-matching-outbox-events-when-inbox-has-already-seen/.adr-list b/specs/0027-replay-matching-outbox-events-when-inbox-has-already-seen/.adr-list new file mode 100644 index 0000000000..2276c56f52 --- /dev/null +++ b/specs/0027-replay-matching-outbox-events-when-inbox-has-already-seen/.adr-list @@ -0,0 +1 @@ +0057-replay-outbox-on-inbox-duplicate.md diff --git a/specs/0027-replay-matching-outbox-events-when-inbox-has-already-seen/.design-approved b/specs/0027-replay-matching-outbox-events-when-inbox-has-already-seen/.design-approved new file mode 100644 index 0000000000..e69de29bb2 diff --git a/specs/0027-replay-matching-outbox-events-when-inbox-has-already-seen/.issue-number b/specs/0027-replay-matching-outbox-events-when-inbox-has-already-seen/.issue-number new file mode 100644 index 0000000000..bcf6e41ede --- /dev/null +++ b/specs/0027-replay-matching-outbox-events-when-inbox-has-already-seen/.issue-number @@ -0,0 +1 @@ +2541 diff --git a/specs/0027-replay-matching-outbox-events-when-inbox-has-already-seen/.requirements-approved b/specs/0027-replay-matching-outbox-events-when-inbox-has-already-seen/.requirements-approved new file mode 100644 index 0000000000..129ad63e39 --- /dev/null +++ b/specs/0027-replay-matching-outbox-events-when-inbox-has-already-seen/.requirements-approved @@ -0,0 +1 @@ +Approved: 2026-04-16 diff --git a/specs/0027-replay-matching-outbox-events-when-inbox-has-already-seen/.tasks-approved b/specs/0027-replay-matching-outbox-events-when-inbox-has-already-seen/.tasks-approved new file mode 100644 index 0000000000..e69de29bb2 diff --git a/specs/0027-replay-matching-outbox-events-when-inbox-has-already-seen/README.md 
b/specs/0027-replay-matching-outbox-events-when-inbox-has-already-seen/README.md new file mode 100644 index 0000000000..972aef8d40 --- /dev/null +++ b/specs/0027-replay-matching-outbox-events-when-inbox-has-already-seen/README.md @@ -0,0 +1,11 @@ +# 0027 - Replay Matching Outbox Events When Inbox Has Already Seen + +**Created:** 2026-04-15 + +## Status + +- [x] Requirements (`requirements.md`) - Approved 2026-04-16 +- [x] Design (`docs/adr/0057-replay-outbox-on-inbox-duplicate.md`) - Ready for review +- [ ] Tasks (`tasks.md`) +- [ ] Implementation +- [ ] Review diff --git a/specs/0027-replay-matching-outbox-events-when-inbox-has-already-seen/requirements.md b/specs/0027-replay-matching-outbox-events-when-inbox-has-already-seen/requirements.md new file mode 100644 index 0000000000..a1209e222d --- /dev/null +++ b/specs/0027-replay-matching-outbox-events-when-inbox-has-already-seen/requirements.md @@ -0,0 +1,77 @@ +# Requirements + +> **Note**: This document captures user requirements and needs. Technical design decisions and implementation details should be documented in an Architecture Decision Record (ADR) in `docs/adr/`. + +**Linked Issue**: #2541 + +## Problem Statement + +As a developer building multi-step workflows with Brighter, I would like to be able to trigger automatic replay of downstream outbox messages when a duplicate command is received in the inbox, so that I can re-trigger a workflow by re-sending a message to a Brighter consumer, have it skip already-completed steps, but send outgoing messages that force the re-execution of any subsequent steps, eventually triggering those that were never actioned. + +Today, when the inbox detects a duplicate message (via `OnceOnly`), it either throws (`OnceOnlyAction.Throw`) or warns and skips (`OnceOnlyAction.Warn`). In neither case does it cause downstream messages that were originally produced to be resent.
This means that if a workflow fails partway through — the handler completed but some downstream consumers never received or processed their messages — there is no mechanism to retry the workflow; the inbox will skip processing and no messages will be raised to trigger downstream consumers. + +We don't want to reprocess the message; if the handler were idempotent there would be no need for the Inbox. Instead, we want to resend the **same** outgoing messages that we sent the last time the handler was executed. This avoids the problem that the inbox does not create true idempotency because the downstream messages raised are not the same as the last time we executed the handler. The outgoing messages are not side-effects, but direct effects. + +In some cases there may be an Inbox, but no Outbox. This is a terminal step, which doesn't raise further messages. It is important to test for the presence of an Outbox before attempting to add a causation id or clear the associated Outbox messages. + +## Proposed Solution + +Introduce a **Causation Id** that links an incoming inbox entry to the outgoing outbox messages produced during that handler's execution. The causation id captures the causal relationship: this incoming request *caused* these outgoing messages. When the inbox detects a duplicate message and a "replay downstream on duplicate" option is enabled, it clears the `DispatchedAt` timestamp on all outbox messages sharing that Causation Id. This causes the outbox sweeper to re-dispatch those messages, effectively replaying the downstream workflow steps. + +From a user perspective: +- A new `CausationId` is propagated through the request context during handler execution and stored in both the inbox and outbox. +- The inbox can be configured (via attribute or `InboxConfiguration`) with a flag to replay downstream messages on duplicate detection instead of throwing or warning.
+- When replay is triggered, the outbox sweeper picks up the "un-dispatched" messages and resends them naturally — no new dispatch mechanism is needed. + +## Requirements + +### Functional Requirements + +1. **Causation Id propagation**: A `CausationId` must be added to the request context when a request enters a handler pipeline (if it does not already exist) and propagated to all outbox messages produced during that handler's execution. +2. **Causation Id storage**: Both the inbox and outbox must store the `CausationId`, allowing correlation between an incoming command and its downstream messages. +3. **Replay on duplicate**: When the inbox detects a duplicate (message already exists) and the replay flag is enabled, it must clear the `DispatchedAt` field on all outbox messages with the matching `CausationId`. +4. **Sweeper re-dispatch**: Outbox messages with a cleared `DispatchedAt` must be picked up by the existing outbox sweeper and re-dispatched — no new dispatch path is required. This is existing functionality. +5. **Causation Id is distinct from Job Id**: The `CausationId` represents a single handler invocation's downstream messages, not an entire job. Re-running a workflow should only replay the messages for the specific step being re-triggered, not all messages for that job. +6. **New OnceOnly action**: A new `OnceOnlyAction` value (e.g., `ReplayOutbox`) or a separate flag should indicate that duplicate detection should trigger downstream replay rather than throwing or warning. + +### Non-functional Requirements + +- **Non-breaking change**: Existing inbox/outbox implementations that do not support `CausationId` must continue to work without modification. The new behavior is opt-in. +- **Pipeline validation**: At startup, if replay-on-duplicate is configured, pipeline validation must verify that both the inbox and outbox implementations support `CausationId`. Unsupported implementations should produce a clear validation error. 
+- **Performance**: Clearing `DispatchedAt` on outbox messages by `CausationId` should be efficient. The outbox store may need an index on `CausationId`. +- **Observability**: Replay events should be traceable — when messages are replayed, this should be visible in logs and telemetry. + +### Constraints and Assumptions + +- Only the outbox sweeper is used for re-dispatch; immediate send is not supported for replay (as noted in the issue: "if you don't have a sweeper it's unlikely you also want this kind of support"). +- The `CausationId` is separate from the existing `CorrelationId` (used for request-reply) and `JobId`/`WorkflowId` (reserved for future workflow orchestration). It specifically represents "the set of outbox messages caused by handling this inbox entry." +- The existing `JobId` and `WorkflowId` fields on `MessageHeader` are reserved for future use and should not be repurposed for this feature. + + +### Out of Scope + +- Saga/workflow orchestration — this feature enables replay of a single step's downstream messages, not orchestration of multi-step workflows. +- Immediate send replay — only sweeper-based re-dispatch is supported. +- Automatic retry of the handler logic itself — the handler is not re-executed, only its previously-produced outbox messages are replayed. +- Migration tooling for existing outbox/inbox data — new columns will be nullable; existing rows will have null `CausationId`. + +## Acceptance Criteria + +1. When a handler produces outbox messages, all messages share the same `CausationId` as the inbox entry for the triggering command. +2. When a duplicate command arrives and replay is configured, the outbox messages for that `CausationId` have their `DispatchedAt` cleared. +3. The outbox sweeper subsequently re-dispatches those messages. +4. When replay is not configured, existing `OnceOnly` behavior (throw or warn) is unchanged. +5. 
When replay is configured but the inbox or outbox implementation does not support `CausationId`, pipeline validation at startup fails with a descriptive error. +6. The `CausationId` is independent of `JobId` — replaying one step does not affect other steps in the same job. +7. In-memory inbox and outbox implementations support `CausationId` for testing. +8. All persistent inbox and outbox implementations support `CausationId`. Base tests in `Paramore.Brighter.Base.Test` verify the causation tracking interfaces; persistent store tests are derived from the base tests (outbox via the Liquid template generator, inbox manually). + +## Additional Context + +The codebase already has infrastructure for workflow-level correlation: +- `MessageHeader.JobId` and `MessageHeader.WorkflowId` — reserved for future use, stored in DB schemas +- `CorrelationId` — used for request-reply patterns +- `RequestContext.Bag` — carries arbitrary key-value data through the pipeline +- `RequestContextBagNames` — defines well-known bag keys + +The `CausationId` fills a gap between per-message correlation (`CorrelationId`) and per-workflow correlation (`JobId`/`WorkflowId`): it represents "all the outbox messages caused by handling a single inbox entry." diff --git a/specs/0027-replay-matching-outbox-events-when-inbox-has-already-seen/review-design.md b/specs/0027-replay-matching-outbox-events-when-inbox-has-already-seen/review-design.md new file mode 100644 index 0000000000..d58df24d25 --- /dev/null +++ b/specs/0027-replay-matching-outbox-events-when-inbox-has-already-seen/review-design.md @@ -0,0 +1,51 @@ +# Review: design — 0027-replay-matching-outbox-events-when-inbox-has-already-seen + +**Date**: 2026-04-17 +**Threshold**: 60 +**Verdict**: PASS + +No findings at or above threshold 60. Consider addressing lower-scored items. + +## Findings + +### 1. 
InboxItem shown with primary constructor syntax but actual class uses traditional constructor (Score: 45) + +The ADR's code example shows `InboxItem` using C# primary constructor syntax, but the actual `InboxItem` class at `src/Paramore.Brighter/InMemoryInbox.cs` uses a traditional class with a regular constructor and explicit property assignments. An implementer following the ADR example might incorrectly refactor the class to use a primary constructor. + +**Evidence**: ADR line 201 vs actual `InMemoryInbox.cs` lines 39-54. + +**Recommendation**: Update the ADR example to show the `CausationId` property addition against the actual class structure, or note that the example is illustrative. + +--- + +### 2. ADR does not explicitly reference FR-N identifiers from requirements (Score: 40) + +The requirements document enumerates functional requirements as numbered items (1-6) and acceptance criteria (1-8). The ADR references the requirements document by path but never cites specific requirement numbers. While coverage is complete on inspection, explicit cross-referencing would improve traceability. + +**Evidence**: The ADR's "Parent Requirement" link is the only reference to the requirements document; no individual FR-N or AC-N citations appear. + +**Recommendation**: Consider adding a brief traceability note mapping key design decisions to specific requirements. + +--- + +### 3. `DescribePipelines()` also needs `InboxConfiguration` for consistency (Score: 35) + +The ADR correctly identifies that `ValidatePipelines()` creates `PipelineBuilder` without `InboxConfiguration` and proposes fixing this. However, `DescribePipelines()` also creates `PipelineBuilder` without `InboxConfiguration`. The ADR only mentions the `ValidatePipelines()` path. This is minor since `DescribePipelines()` is diagnostic only. + +**Evidence**: `BrighterPipelineValidationExtensions.cs` lines 59 and 89 both create `PipelineBuilder` without `InboxConfiguration`. 
+ +**Recommendation**: Note that `DescribePipelines()` should also receive `InboxConfiguration` for consistency. + +--- + +## Summary + +| Score Range | Count | +|-------------|-------| +| 90-100 (Critical) | 0 | +| 70-89 (High) | 0 | +| 50-69 (Medium) | 0 | +| 0-49 (Low) | 3 | + +**Total findings**: 3 +**Findings at or above threshold (60)**: 0 diff --git a/specs/0027-replay-matching-outbox-events-when-inbox-has-already-seen/review-tasks.md b/specs/0027-replay-matching-outbox-events-when-inbox-has-already-seen/review-tasks.md new file mode 100644 index 0000000000..8e608d32e1 --- /dev/null +++ b/specs/0027-replay-matching-outbox-events-when-inbox-has-already-seen/review-tasks.md @@ -0,0 +1,81 @@ +# Review: tasks — 0027-replay-matching-outbox-events-when-inbox-has-already-seen + +**Date**: 2026-04-17 +**Threshold**: 60 +**Verdict**: PASS + +No findings at or above threshold 60. Consider addressing lower-scored items. + +## Findings + +### 1. Task 10/11 constructor injection may conflict with existing DI resolution pattern (Score: 55) + +Tasks 10/11 add optional `IAmACausationTrackingOutbox?` constructor parameter. The ADR explains this works with `ActivatorUtilities` optional parameters, and Task 17 handles DI registration. Task 10's tests use manually-constructed handlers, so Task 17 is not a dependency for unit testing. + +**Evidence**: Task 10 deps: "6, 7, 8"; Task 17 deps: "5, 6, 7". No circular dependency. + +**Recommendation**: No change needed. Task ordering is correct for unit-test-first approach. + +--- + +### 2. Task 14 test verifies 7 scenarios in one test file (Score: 50) + +Task 14 lists 7 verification scenarios for one validation rule. These are different inputs to the same specification. + +**Evidence**: Task 14 test verification list has 7 bullet points. + +**Recommendation**: Implementing agent should use `[Theory]`/`[InlineData]` or separate `[Fact]` methods. No task change needed. + +--- + +### 3. 
Task 8/9 CausationId generation timing relative to inbox Add (Score: 45) + +CausationId is set before `base.Handle()`, then inbox `Add()` is called after. Outbox `Add` (in `DepositPost`) runs during `base.Handle()`. Both correctly read CausationId from the Bag. + +**Evidence**: `UseInboxHandler.Handle()` lines 102-106. + +**Recommendation**: No change needed. The ordering is correct. + +--- + +### 4. FR4 has no dedicated task (Score: 45) + +FR4 (Sweeper re-dispatch) is existing functionality. Tasks 10/11 verify messages are marked for re-dispatch by checking `TimeFlushed` is cleared. + +**Evidence**: FR Coverage table correctly notes existing behavior. + +**Recommendation**: No change needed. + +--- + +### 5. Task 16 depends on Task 15 unnecessarily (Score: 40) + +Both are independent telemetry additions to different code branches. Serial ordering is conservative but not harmful. + +**Evidence**: Task summary: Task 16 depends on "1, 15". + +**Recommendation**: Could remove Task 15 dependency to allow parallelism, but not required. + +--- + +### 6. UseInboxHandlerAsync has duplicate InitializeFromAttributeParams call (Score: 35) + +`UseInboxHandlerAsync.cs` calls `base.InitializeFromAttributeParams()` twice. Task 1 should fix this as part of the tidy-first pass. + +**Evidence**: `UseInboxHandlerAsync.cs` lines 69-71. + +**Recommendation**: Agent implementing Task 1 should notice and fix. Minor omission. 
+ +--- + +## Summary + +| Score Range | Count | +|-------------|-------| +| 90-100 (Critical) | 0 | +| 70-89 (High) | 0 | +| 50-69 (Medium) | 2 | +| 0-49 (Low) | 4 | + +**Total findings**: 6 +**Findings at or above threshold (60)**: 0 diff --git a/specs/0027-replay-matching-outbox-events-when-inbox-has-already-seen/tasks.md b/specs/0027-replay-matching-outbox-events-when-inbox-has-already-seen/tasks.md new file mode 100644 index 0000000000..a3a5065f41 --- /dev/null +++ b/specs/0027-replay-matching-outbox-events-when-inbox-has-already-seen/tasks.md @@ -0,0 +1,484 @@ +# Tasks: Replay Matching Outbox Events When Inbox Has Already Seen + +**Spec**: 0027-replay-matching-outbox-events-when-inbox-has-already-seen +**ADR**: [0057 — Replay Outbox Messages on Inbox Duplicate Detection](../../docs/adr/0057-replay-outbox-on-inbox-duplicate.md) +**Branch**: `replay_on_seen` + +## Prerequisites + +- [x] Requirements approved +- [x] ADR 0057 accepted + +--- + +## Task 1: Structural — UseInboxHandler uses pipeline's `this.Context` instead of `InitRequestContext()`, expose `InstrumentationOptions` + +This is a **tidy-first structural change** with two parts: + +**Part A**: Fix `UseInboxHandler` and `UseInboxHandlerAsync` to use the pipeline's `IRequestContext` (inherited `Context` property) instead of creating a private `RequestContext` via `InitRequestContext()`. This is a prerequisite for CausationId propagation via `RequestContext.Bag`. + +**Part B**: Expose `instrumentationOptions` from `RequestHandler` as a protected property so that subclasses (including `UseInboxHandler`) can gate telemetry events on `InstrumentationOptions.Brighter`. Currently `instrumentationOptions` is a primary constructor parameter on `RequestHandler` (private field, inaccessible to derived classes). 
+ +- **Files**: + - `src/Paramore.Brighter/RequestHandler.cs` — add `protected InstrumentationOptions InstrumentationOptions => instrumentationOptions;` + - `src/Paramore.Brighter/RequestHandlerAsync.cs` — same protected property + - `src/Paramore.Brighter/Inbox/Handlers/UseInboxHandler.cs` + - `src/Paramore.Brighter/Inbox/Handlers/UseInboxHandlerAsync.cs` +- Changes (Part A): + - Remove the private `InitRequestContext()` method from both UseInbox files + - Replace all `var requestContext = InitRequestContext()` calls with `this.Context` + - Pass `this.Context` to `_inbox.Exists()`, `_inbox.ExistsAsync()`, `_inbox.Add()`, `_inbox.AddAsync()` +- Changes (Part B): + - Add `protected InstrumentationOptions InstrumentationOptions => instrumentationOptions;` to `RequestHandler` + - Add same to `RequestHandlerAsync` +- **Verification**: Run existing OnceOnly tests — all must pass: + - `tests/Paramore.Brighter.Core.Tests/OnceOnly/` (all 8 test files) +- Commit separately as a structural (tidy) change + +--- + +## Task 2: Structural — Enrich `PipelineStepDescription` with `Attribute` property + +This is a **tidy-first structural change** — adding a non-positional `Attribute` property to `PipelineStepDescription` so validation rules can inspect attribute properties like `OnceOnlyAction`. + +- **Files**: + - `src/Paramore.Brighter/Validation/PipelineStepDescription.cs` — add `public RequestHandlerAttribute? 
Attribute { get; init; }` (non-positional property) + - `src/Paramore.Brighter/PipelineBuilder.cs` (or wherever `Describe()` projects attributes) — change `.Select(a => new PipelineStepDescription(...))` to include `{ Attribute = a }` +- **Verification**: Run existing validation tests — all must pass: + - `tests/Paramore.Brighter.Core.Tests/Validation/` (all test files) +- Commit separately as a structural (tidy) change + +--- + +## Task 3: Structural — `Describe()` includes global inbox attributes + `InboxConfiguration` passed to validation path + +This is a **tidy-first structural change** — ensuring `PipelineBuilder.Describe()` includes global inbox attributes (matching what `Build()` does) and that `ValidatePipelines()` passes `InboxConfiguration` through. + +**Depends on**: Task 2 (`PipelineStepDescription.Attribute` property must exist for global inbox attributes to be fully useful) + +- **Files**: + - `src/Paramore.Brighter.Extensions.DependencyInjection/BrighterPipelineValidationExtensions.cs` — resolve `InboxConfiguration` from DI, pass to `PipelineBuilder` + - `src/Paramore.Brighter/PipelineBuilder.cs` — in `Describe()`, inject global inbox attribute using same guards as `AddGlobalInboxAttributes()` (`HasNoInboxAttributesInPipeline()`, `HasExistingUseInboxAttributesInPipeline()`) +- **Verification**: Run existing validation + pipeline tests: + - `tests/Paramore.Brighter.Core.Tests/Validation/` + - `tests/Paramore.Brighter.Core.Tests/CommandProcessors/Pipeline/` +- Commit separately as a structural (tidy) change + +--- + +## Task 4: Structural — Expose `Outbox` from `IAmAnOutboxProducerMediator` + +This is a **tidy-first structural change** — adding a read-only `IAmAnOutbox? Outbox` property so pipeline validation can access the outbox instance. + +- **Files**: + - `src/Paramore.Brighter/OutboxProducerMediator.cs` — add `IAmAnOutbox? Outbox` property to interface and implementation +- Changes: + - Add `IAmAnOutbox? 
Outbox { get; }` to `IAmAnOutboxProducerMediator` interface + - Implement in `OutboxProducerMediator`: `public IAmAnOutbox? Outbox => (IAmAnOutbox?)_outBox ?? _asyncOutbox;` +- **Verification**: Project builds without errors +- Commit separately as a structural (tidy) change + +--- + +## Task 5: Structural — New types: `OnceOnlyAction.Replay`, CausationId constants, role interfaces + +This is a **structural change** adding the new types needed before behavioral work. No behavior yet — just type definitions. + +- **Files**: + - `src/Paramore.Brighter/Inbox/OnceOnlyAction.cs` — add `Replay` enum value + - `src/Paramore.Brighter/RequestContextBagNames.cs` — add `public const string CausationId = "Brighter-CausationId";` + - `src/Paramore.Brighter/Observability/BrighterSemanticConventions.cs` — add `public const string CausationId = "paramore.brighter.causation_id";` + - `src/Paramore.Brighter/Inbox/IAmACausationTrackingInbox.cs` — new file with `SupportsCausationTracking()`, `SupportsCausationTrackingAsync()`, `GetCausationId()`, `GetCausationIdAsync()` methods + - `src/Paramore.Brighter/IAmACausationTrackingOutbox.cs` — new file with `SupportsCausationTracking()`, `SupportsCausationTrackingAsync()`, `ReplayCausation()`, `ReplayCausationAsync()` methods +- **Verification**: Project builds without errors +- Commit separately + +--- + +## Task 6: TEST + IMPLEMENT — InMemoryInbox stores CausationId and retrieves it + +- [ ] **TEST + IMPLEMENT: InMemoryInbox reads CausationId from RequestContext.Bag on Add and returns it via GetCausationId** + - **USE COMMAND**: `/test-first when adding to inbox with CausationId in context bag should store and retrieve it` + - Test location: `tests/Paramore.Brighter.Core.Tests/OnceOnly/` + - Test file: `When_adding_to_inbox_with_causation_id_should_store_and_retrieve.cs` + - Test should verify: + - Add a command with `CausationId` in `RequestContext.Bag` + - `GetCausationId()` returns the stored CausationId for that command + - 
`GetCausationIdAsync()` returns the same value + - When no CausationId in Bag, `GetCausationId()` returns null + - **⛔ STOP HERE — WAIT FOR USER APPROVAL in IDE before implementing** + - Implementation: + - `InMemoryInbox` implements `IAmACausationTrackingInbox` + - `InboxItem` gains `string? CausationId` property + - `Add()`/`AddAsync()` reads CausationId from `requestContext?.Bag` using `RequestContextBagNames.CausationId` key + - `GetCausationId()`/`GetCausationIdAsync()` looks up the inbox entry and returns its CausationId + - `SupportsCausationTracking()` returns `true` + +--- + +## Task 7: TEST + IMPLEMENT — InMemoryOutbox stores CausationId and replays by clearing dispatch state + +- [ ] **TEST + IMPLEMENT: InMemoryOutbox reads CausationId from RequestContext.Bag on Add and ReplayCausation clears TimeFlushed** + - **USE COMMAND**: `/test-first when replaying causation on outbox should clear dispatch state for matching messages` + - Test location: `tests/Paramore.Brighter.Core.Tests/OnceOnly/` + - Test file: `When_replaying_causation_on_outbox_should_clear_dispatch_state.cs` + - Test should verify: + - Add multiple messages to outbox with same CausationId in `RequestContext.Bag` + - Mark them as dispatched (set `TimeFlushed`) + - Call `ReplayCausation(causationId)` + - Verify `TimeFlushed` is reset to `DateTimeOffset.MinValue` for matching messages + - Verify messages with different CausationId are not affected + - Verify `ReplayCausationAsync()` produces the same result + - Verify `SupportsCausationTracking()` returns `true` + - **⛔ STOP HERE — WAIT FOR USER APPROVAL in IDE before implementing** + - Implementation: + - `InMemoryOutbox` implements `IAmACausationTrackingOutbox` + - `OutboxEntry` gains `string? 
CausationId` property + - `Add()`/`AddAsync()` reads CausationId from `requestContext?.Bag` using `RequestContextBagNames.CausationId` key + - `ReplayCausation()` finds all entries with matching CausationId and sets `TimeFlushed = DateTimeOffset.MinValue` + - `SupportsCausationTracking()` returns `true` + +--- + +## Task 8: TEST + IMPLEMENT — Sync UseInboxHandler generates CausationId in Bag on first handling + +- [ ] **TEST + IMPLEMENT: Sync UseInboxHandler sets CausationId in RequestContext.Bag when handling a new command** + - **USE COMMAND**: `/test-first when sync inbox handler handles new command should set causation id in context bag` + - Test location: `tests/Paramore.Brighter.Core.Tests/OnceOnly/` + - Test file: `When_handling_new_command_should_set_causation_id_in_context_bag.cs` + - Test doubles needed: + - Reuse or extend existing test double handler from `tests/Paramore.Brighter.Core.Tests/OnceOnly/TestDoubles/` + - Test should verify: + - After Handle(), `RequestContext.Bag` contains `RequestContextBagNames.CausationId` key + - The CausationId value defaults to the command's `Id` + - The inbox entry has the same CausationId stored + - **⛔ STOP HERE — WAIT FOR USER APPROVAL in IDE before implementing** + - Implementation: + - In `UseInboxHandler.Handle()`, before calling `base.Handle()`: + - Generate CausationId (default to `request.Id`) + - Store in `Context.Bag[RequestContextBagNames.CausationId] = causationId` + - The inbox `Add()` already reads from the Bag (Task 6) + +--- + +## Task 9: TEST + IMPLEMENT — Async UseInboxHandlerAsync generates CausationId in Bag on first handling + +- [ ] **TEST + IMPLEMENT: Async UseInboxHandlerAsync sets CausationId in RequestContext.Bag when handling a new command** + - **USE COMMAND**: `/test-first when async inbox handler handles new command should set causation id in context bag` + - Test location: `tests/Paramore.Brighter.Core.Tests/OnceOnly/` + - Test file: 
`When_handling_new_command_async_should_set_causation_id_in_context_bag.cs` + - Test doubles needed: + - Reuse or extend existing async test double handler from `tests/Paramore.Brighter.Core.Tests/OnceOnly/TestDoubles/` + - Test should verify: + - After HandleAsync(), `RequestContext.Bag` contains `RequestContextBagNames.CausationId` key + - The CausationId value defaults to the command's `Id` + - The inbox entry has the same CausationId stored + - **⛔ STOP HERE — WAIT FOR USER APPROVAL in IDE before implementing** + - Implementation: + - In `UseInboxHandlerAsync.HandleAsync()`, same logic as sync: generate CausationId, store in `Context.Bag` + +--- + +## Task 10: TEST + IMPLEMENT — Sync UseInboxHandler replays outbox on duplicate when Replay configured + +- [ ] **TEST + IMPLEMENT: Sync UseInboxHandler replays outbox messages when duplicate detected and OnceOnlyAction is Replay** + - **USE COMMAND**: `/test-first when sync inbox handler detects duplicate with replay configured should clear outbox dispatch state` + - Test location: `tests/Paramore.Brighter.Core.Tests/OnceOnly/` + - Test file: `When_handling_duplicate_command_with_replay_should_clear_outbox_dispatch.cs` + - Test doubles needed: + - Handler decorated with `[UseInbox(step: 1, contextKey: "test", onceOnly: true, onceOnlyAction: OnceOnlyAction.Replay)]` + - InMemoryInbox pre-populated with existing entry (with CausationId) + - InMemoryOutbox with dispatched messages sharing that CausationId + - Test should verify: + - Handler is NOT re-executed (request returned without calling `base.Handle()`) + - Outbox messages with matching CausationId have their dispatch state cleared + - Outbox messages with different CausationId are not affected + - The CausationId is retrieved from the inbox entry via `IAmACausationTrackingInbox.GetCausationId()` + - **⛔ STOP HERE — WAIT FOR USER APPROVAL in IDE before implementing** + - Implementation: + - `UseInboxHandler` constructor gains optional 
`IAmACausationTrackingOutbox? outbox = null` parameter + - In `Handle()`, add new branch: `if (exists && _onceOnlyAction is OnceOnlyAction.Replay)` — retrieve CausationId from inbox, call `_outbox.ReplayCausation()`, return request + - Log replay action via source-generated `Log.CommandHasAlreadyBeenSeenReplayingOutbox` + +--- + +## Task 11: TEST + IMPLEMENT — Async UseInboxHandlerAsync replays outbox on duplicate when Replay configured + +- [ ] **TEST + IMPLEMENT: Async UseInboxHandlerAsync replays outbox messages when duplicate detected and OnceOnlyAction is Replay** + - **USE COMMAND**: `/test-first when async inbox handler detects duplicate with replay configured should clear outbox dispatch state` + - Test location: `tests/Paramore.Brighter.Core.Tests/OnceOnly/` + - Test file: `When_handling_duplicate_command_async_with_replay_should_clear_outbox_dispatch.cs` + - Test doubles needed: + - Async handler decorated with `[UseInboxAsync(step: 1, contextKey: "test", onceOnly: true, onceOnlyAction: OnceOnlyAction.Replay)]` + - InMemoryInbox pre-populated with existing entry (with CausationId) + - InMemoryOutbox with dispatched messages sharing that CausationId + - Test should verify: + - Handler is NOT re-executed + - Outbox messages with matching CausationId have their dispatch state cleared + - CausationId retrieved via `IAmACausationTrackingInbox.GetCausationIdAsync()` + - **⛔ STOP HERE — WAIT FOR USER APPROVAL in IDE before implementing** + - Implementation: + - `UseInboxHandlerAsync` constructor gains optional `IAmACausationTrackingOutbox? 
outbox = null` parameter + - In `HandleAsync()`, add Replay branch matching sync implementation using async methods + +--- + +## Task 12: TEST + IMPLEMENT — Sync UseInboxHandler handles Replay gracefully when no outbox configured (terminal step) + +- [ ] **TEST + IMPLEMENT: Sync UseInboxHandler with Replay configured but no outbox returns without error** + - **USE COMMAND**: `/test-first when sync inbox handler detects duplicate with replay but no outbox should return without error` + - Test location: `tests/Paramore.Brighter.Core.Tests/OnceOnly/` + - Test file: `When_handling_duplicate_with_replay_and_no_outbox_should_return_without_error.cs` + - Test should verify: + - When `_outbox` is null, the handler returns the request without throwing + - Handler is NOT re-executed + - **⛔ STOP HERE — WAIT FOR USER APPROVAL in IDE before implementing** + - Implementation: + - The null check on `_outbox` in the Replay branch already handles this (from Task 10) + - This task validates the no-outbox terminal step scenario + +--- + +## Task 13: TEST + IMPLEMENT — Async UseInboxHandlerAsync handles Replay gracefully when no outbox configured (terminal step) + +- [ ] **TEST + IMPLEMENT: Async UseInboxHandlerAsync with Replay configured but no outbox returns without error** + - **USE COMMAND**: `/test-first when async inbox handler detects duplicate with replay but no outbox should return without error` + - Test location: `tests/Paramore.Brighter.Core.Tests/OnceOnly/` + - Test file: `When_handling_duplicate_async_with_replay_and_no_outbox_should_return_without_error.cs` + - Test should verify: + - When `_outbox` is null, the async handler returns the request without throwing + - Handler is NOT re-executed + - **⛔ STOP HERE — WAIT FOR USER APPROVAL in IDE before implementing** + - Implementation: + - The null check on `_outbox` in the Replay branch already handles this (from Task 11) + - This task validates the no-outbox terminal step scenario for the async path + +--- + +## Task 
14: TEST + IMPLEMENT — Pipeline validation rejects Replay without causation-tracking support + +- [ ] **TEST + IMPLEMENT: Pipeline validation detects OnceOnlyAction.Replay without causation-tracking inbox or outbox** + - **USE COMMAND**: `/test-first when pipeline has replay configured without causation tracking should report validation error` + - Test location: `tests/Paramore.Brighter.Core.Tests/Validation/` + - Test file: `When_replay_configured_without_causation_tracking_should_report_error.cs` + - Test should verify: + - Replay with inbox that does not implement `IAmACausationTrackingInbox` → Error + - Replay with inbox that implements `IAmACausationTrackingInbox` but `SupportsCausationTracking()` returns false → Warning + - Replay with no outbox configured → Warning (terminal step) + - Replay with outbox that does not implement `IAmACausationTrackingOutbox` → Error + - Replay with outbox that implements `IAmACausationTrackingOutbox` but `SupportsCausationTracking()` returns false → Warning + - Replay with both inbox and outbox supporting causation tracking → no findings + - Non-Replay (Throw/Warn) pipelines → no findings regardless of causation tracking support + - **⛔ STOP HERE — WAIT FOR USER APPROVAL in IDE before implementing** + - Implementation: + - Add `ReplayRequiresCausationTracking(IAmAnInbox?, IAmAnOutbox?)` to `HandlerPipelineValidationRules` + - Wire into `PipelineValidator.ValidateHandlerPipelines()` specs array + - In `BrighterPipelineValidationExtensions.ValidatePipelines()`: resolve outbox via `IAmAnOutboxProducerMediator.Outbox`, pass to `PipelineValidator` + +--- + +## Task 15: TEST + IMPLEMENT — UseInboxHandler adds Replay telemetry event to pipeline span + +- [ ] **TEST + IMPLEMENT: UseInboxHandler writes ActivityEvent to the pipeline span when Replay is triggered** + - **USE COMMAND**: `/test-first when inbox handler replays duplicate should add replay telemetry event to span` + - Test location: 
`tests/Paramore.Brighter.Core.Tests/OnceOnly/` + - Test file: `When_replaying_duplicate_should_add_replay_telemetry_event_to_span.cs` + - Test should verify: + - When Replay is triggered, an ActivityEvent named `"UseInboxHandler Duplicate Replay"` is added to `Context.Span` + - Event tags include `request.id` and `causation_id` (using `BrighterSemanticConventions` constants) + - Events are only added when `Context?.Span != null` and `InstrumentationOptions` includes `InstrumentationOptions.Brighter` (no event when either condition is false) + - **⛔ STOP HERE — WAIT FOR USER APPROVAL in IDE before implementing** + - Implementation: + - In `UseInboxHandler.Handle()` and `UseInboxHandlerAsync.HandleAsync()`: add `ActivityEvent` to `Context.Span` in the Replay branch + - Guard with `Context?.Span != null` and gate on `InstrumentationOptions.HasFlag(InstrumentationOptions.Brighter)` (using the protected property exposed in Task 1 Part B) + - Use `BrighterSemanticConventions.CausationId` for the causation tag + +--- + +## Task 16: TEST + IMPLEMENT — UseInboxHandler adds telemetry events for Throw, Warn, and Add paths (tidy improvement) + +- [ ] **TEST + IMPLEMENT: UseInboxHandler writes ActivityEvents for existing Throw, Warn, and Add paths** + - **USE COMMAND**: `/test-first when inbox handler handles command should add telemetry events for all paths` + - Test location: `tests/Paramore.Brighter.Core.Tests/OnceOnly/` + - Test file: `When_inbox_handler_handles_command_should_add_telemetry_events.cs` + - Test should verify: + - When Throw is triggered, an ActivityEvent named `"UseInboxHandler Duplicate Throw"` is added to `Context.Span` + - When Warn is triggered, an ActivityEvent named `"UseInboxHandler Duplicate Warn"` is added + - When first handling (Add), an ActivityEvent named `"UseInboxHandler Add"` is added + - Events are only added when `Context?.Span != null` and `InstrumentationOptions` includes `InstrumentationOptions.Brighter` (no event when either condition 
is false) + - **⛔ STOP HERE — WAIT FOR USER APPROVAL in IDE before implementing** + - Implementation: + - In `UseInboxHandler.Handle()` and `UseInboxHandlerAsync.HandleAsync()`: add `ActivityEvent` to `Context.Span` in the Throw, Warn, and Add branches + - Guard with `Context?.Span != null` and gate on `InstrumentationOptions.HasFlag(InstrumentationOptions.Brighter)` (using the protected property exposed in Task 1 Part B) + - **Note**: This is a tidy improvement to existing paths, not directly required by the Replay feature. Commit separately + +--- + +## Task 17: TEST + IMPLEMENT — DI registration of `IAmACausationTrackingOutbox` + +- [ ] **TEST + IMPLEMENT: ServiceCollection registers IAmACausationTrackingOutbox when outbox supports it** + - **USE COMMAND**: `/test-first when registering outbox that supports causation tracking should register under role interface` + - Test location: `tests/Paramore.Brighter.Core.Tests/OnceOnly/` + - Test file: `When_registering_outbox_with_causation_tracking_should_register_role_interface.cs` + - Test should verify: + - When outbox implements `IAmACausationTrackingOutbox`, it is resolvable from DI as `IAmACausationTrackingOutbox` + - When outbox does not implement `IAmACausationTrackingOutbox`, resolving returns null + - Same outbox instance is returned for both primary interface and `IAmACausationTrackingOutbox` + - **⛔ STOP HERE — WAIT FOR USER APPROVAL in IDE before implementing** + - Implementation: + - In `ServiceCollectionExtensions` (or equivalent DI setup): after registering outbox, check `if (outbox is IAmACausationTrackingOutbox)` and register under that interface + - Note: `IAmACausationTrackingInbox` does NOT need separate DI registration — `UseInboxHandler` already has the inbox instance and pattern-matches it at runtime (`if (_inbox is IAmACausationTrackingInbox trackingInbox)`) + +--- + +## Task 18: Base test classes for causation tracking in Paramore.Brighter.Base.Test + +- [ ] **TEST + IMPLEMENT: Base test 
classes define causation tracking scenarios for persistent store tests** + - **USE COMMAND**: `/test-first when persistent inbox stores causation id should match base test expectations` + - Test location: `tests/Paramore.Brighter.Base.Test/` + - Test should define base test classes: + - `CausationTrackingInboxBaseTests` — abstract tests for `IAmACausationTrackingInbox`: Add with CausationId, GetCausationId retrieval, SupportsCausationTracking + - `CausationTrackingOutboxBaseTests` — abstract tests for `IAmACausationTrackingOutbox`: Add with CausationId, ReplayCausation clearing dispatch state, SupportsCausationTracking + - These are validated against InMemoryInbox/InMemoryOutbox first + - **⛔ STOP HERE — WAIT FOR USER APPROVAL in IDE before implementing** + - Implementation: + - Create abstract base test classes with virtual setup methods for store-specific initialization + - Persistent store test projects will inherit these and provide their store implementations + +--- + +## Task 19: TEST + IMPLEMENT — Relational inbox stores implement `IAmACausationTrackingInbox` (MsSql, MySql, Postgres, Sqlite, Spanner) + +- [ ] **TEST + IMPLEMENT: Relational inbox stores add CausationId column and implement IAmACausationTrackingInbox** + - **USE COMMAND**: `/test-first when relational inbox stores causation id should store and retrieve via base tests` + - Stores: MsSql, MySql, Postgres, Sqlite, Spanner + - Each store: + - Adds nullable `CausationId` column to schema (existing rows have null — no data migration) + - Reads `CausationId` from `RequestContext.Bag` in `Add()`/`AddAsync()` and stores it + - Implements `IAmACausationTrackingInbox` (`SupportsCausationTracking()`, `GetCausationId()`, `GetCausationIdAsync()`) + - `SupportsCausationTracking()` performs runtime schema check (column exists?) 
+ - Tests: Each store's test project inherits from `CausationTrackingInboxBaseTests` (Task 18) + - **⛔ STOP HERE — WAIT FOR USER APPROVAL in IDE before implementing** + - Depends on: Task 18 + +--- + +## Task 20: TEST + IMPLEMENT — NoSQL inbox stores implement `IAmACausationTrackingInbox` (DynamoDB, DynamoDB.V4, Firestore, MongoDb) + +- [ ] **TEST + IMPLEMENT: NoSQL inbox stores add CausationId attribute and implement IAmACausationTrackingInbox** + - **USE COMMAND**: `/test-first when nosql inbox stores causation id should store and retrieve via base tests` + - Stores: DynamoDB, DynamoDB.V4, Firestore, MongoDb + - Each store: + - Adds nullable `CausationId` attribute/field to schema (existing entries have null — no data migration) + - Reads `CausationId` from `RequestContext.Bag` in `Add()`/`AddAsync()` and stores it + - Implements `IAmACausationTrackingInbox` (`SupportsCausationTracking()`, `GetCausationId()`, `GetCausationIdAsync()`) + - `SupportsCausationTracking()` performs runtime schema check (attribute exists?) 
+ - Tests: Each store's test project inherits from `CausationTrackingInboxBaseTests` (Task 18) + - **⛔ STOP HERE — WAIT FOR USER APPROVAL in IDE before implementing** + - Depends on: Task 18 + +--- + +## Task 21: TEST + IMPLEMENT — Relational outbox stores implement `IAmACausationTrackingOutbox` (MsSql, MySql, PostgreSql, Sqlite, Spanner) + +- [ ] **TEST + IMPLEMENT: Relational outbox stores add CausationId column and implement IAmACausationTrackingOutbox** + - **USE COMMAND**: `/test-first when relational outbox stores causation id should store and replay via base tests` + - Stores: MsSql, MySql, PostgreSql, Sqlite, Spanner + - Each store: + - Adds nullable `CausationId` column to schema, indexed for efficient replay queries + - Reads `CausationId` from `RequestContext.Bag` in `Add()`/`AddAsync()` and stores it + - Implements `IAmACausationTrackingOutbox` (`SupportsCausationTracking()`, `ReplayCausation()`, `ReplayCausationAsync()`) + - `SupportsCausationTracking()` performs runtime schema check (column exists?) 
+ - Tests: Update Liquid templates in `tools/Paramore.Brighter.Test.Generator/` to generate causation tracking test cases from `CausationTrackingOutboxBaseTests` + - **⛔ STOP HERE — WAIT FOR USER APPROVAL in IDE before implementing** + - Depends on: Task 18 + +--- + +## Task 22: TEST + IMPLEMENT — NoSQL outbox stores implement `IAmACausationTrackingOutbox` (DynamoDB, DynamoDB.V4, Firestore, MongoDb) + +- [ ] **TEST + IMPLEMENT: NoSQL outbox stores add CausationId attribute and implement IAmACausationTrackingOutbox** + - **USE COMMAND**: `/test-first when nosql outbox stores causation id should store and replay via base tests` + - Stores: DynamoDB, DynamoDB.V4, Firestore, MongoDb + - Each store: + - Adds nullable `CausationId` attribute/field to schema, indexed for efficient replay queries + - Reads `CausationId` from `RequestContext.Bag` in `Add()`/`AddAsync()` and stores it + - Implements `IAmACausationTrackingOutbox` (`SupportsCausationTracking()`, `ReplayCausation()`, `ReplayCausationAsync()`) + - `SupportsCausationTracking()` performs runtime schema check (attribute exists?) 
+ - Tests: Each store's test project inherits from `CausationTrackingOutboxBaseTests` + - **⛔ STOP HERE — WAIT FOR USER APPROVAL in IDE before implementing** + - Depends on: Task 18 + +--- + +## Task 23: Build verification + +- [ ] **Build and run all core tests** + - Run `dotnet build src/Paramore.Brighter/Paramore.Brighter.csproj` — must compile + - Run `dotnet test tests/Paramore.Brighter.Core.Tests/Paramore.Brighter.Core.Tests.csproj` — all tests pass + - Verify existing OnceOnly tests pass (no regressions): + - `When_Handling_A_Command_With_A_Inbox_Enabled` + - `When_Handling_A_Command_Once_Only_With_Throw_Enabled` + - `When_Handling_A_Command_Once_Only_With_Warn_Enabled` + - (and their async equivalents) + - Verify existing Validation tests pass (no regressions) + +--- + +## Task Summary + +| # | Type | Description | Depends On | +|---|------|-------------|------------| +| 1 | Structural (tidy) | UseInboxHandler uses pipeline's `this.Context`, expose `InstrumentationOptions` | — | +| 2 | Structural (tidy) | Enrich `PipelineStepDescription` with `Attribute` property | — | +| 3 | Structural (tidy) | `Describe()` includes global inbox + `InboxConfiguration` in validation | 2 | +| 4 | Structural (tidy) | Expose `Outbox` from `IAmAnOutboxProducerMediator` | — | +| 5 | Structural | New types: `OnceOnlyAction.Replay`, CausationId constants, role interfaces | — | +| 6 | Test + Implement | InMemoryInbox stores and retrieves CausationId | 5 | +| 7 | Test + Implement | InMemoryOutbox stores CausationId + `ReplayCausation` | 5 | +| 8 | Test + Implement | Sync UseInboxHandler generates CausationId on first handling | 1, 6 | +| 9 | Test + Implement | Async UseInboxHandlerAsync generates CausationId on first handling | 1, 6 | +| 10 | Test + Implement | Sync UseInboxHandler replays outbox on duplicate (Replay) | 6, 7, 8 | +| 11 | Test + Implement | Async UseInboxHandlerAsync replays outbox on duplicate (Replay) | 6, 7, 9 | +| 12 | Test + Implement | Sync UseInboxHandler handles Replay with no outbox 
(terminal step) | 10 | +| 13 | Test + Implement | Async UseInboxHandlerAsync handles Replay with no outbox (terminal step) | 11 | +| 14 | Test + Implement | Pipeline validation: Replay requires causation-tracking support | 2, 3, 4, 5 | +| 15 | Test + Implement | UseInboxHandler Replay telemetry event on pipeline span | 1, 10, 11 | +| 16 | Test + Implement | UseInboxHandler Throw/Warn/Add telemetry events (tidy improvement) | 1, 15 | +| 17 | Test + Implement | DI registration of `IAmACausationTrackingOutbox` | 5, 6, 7 | +| 18 | Test + Implement | Base test classes for persistent store causation tracking | 6, 7 | +| 19 | Test + Implement | Relational inbox stores: `IAmACausationTrackingInbox` (MsSql, MySql, Postgres, Sqlite, Spanner) | 18 | +| 20 | Test + Implement | NoSQL inbox stores: `IAmACausationTrackingInbox` (DynamoDB, DynamoDB.V4, Firestore, MongoDb) | 18 | +| 21 | Test + Implement | Relational outbox stores: `IAmACausationTrackingOutbox` (MsSql, MySql, PostgreSql, Sqlite, Spanner) | 18 | +| 22 | Test + Implement | NoSQL outbox stores: `IAmACausationTrackingOutbox` (DynamoDB, DynamoDB.V4, Firestore, MongoDb) | 18 | +| 23 | Verification | Build + run all core tests | 1–22 | + +## FR Coverage + +| FR | Description | Task(s) | +|----|-------------|---------| +| FR1 | CausationId propagation via RequestContext.Bag | 5, 8, 9 | +| FR2 | CausationId stored in inbox and outbox | 6, 7, 19, 20, 21, 22 | +| FR3 | Replay on duplicate clears DispatchedAt | 10, 11 | +| FR4 | Sweeper re-dispatch (existing, unchanged) | — (existing behavior; outbox state assertions in Tasks 10/11 verify messages are marked for re-dispatch) | +| FR5 | CausationId distinct from JobId | 5 (separate constant/concept) | +| FR6 | New OnceOnlyAction.Replay | 5 | + +## ADR Decision Coverage + +| ADR Decision | Task(s) | +|-------------|---------| +| Causation Id concept + propagation via Bag | 5, 8, 9 | +| OnceOnlyAction.Replay | 5 | +| IAmACausationTrackingInbox / IAmACausationTrackingOutbox | 5 | 
+| InMemoryInbox/Outbox support | 6, 7 | +| UseInboxHandler Replay logic | 10, 11, 12, 13 | +| Pipeline validation: ReplayRequiresCausationTracking | 14 | +| PipelineStepDescription enrichment | 2 | +| Describe() global inbox + InboxConfiguration in validation | 3 | +| Outbox property on IAmAnOutboxProducerMediator | 4 | +| Observability (Replay telemetry) | 15 | +| Observability (Throw/Warn/Add telemetry — tidy improvement) | 16 | +| DI registration | 17 | +| Persistent store implementations | 19, 20, 21, 22 | +| UseInboxHandler uses pipeline Context (prerequisite) | 1 |