DO-NOT-MERGE: Design #147 integration event re-drive#148
Conversation
📝 WalkthroughWalkthroughThis PR implements a provisional-to-committed dedupe state machine backed by explicit broker delivery-injection confirmations. It introduces ChangesIntegration Event Delivery Confirmation & Deduplication Redesign
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related issues
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
⚔️ Resolve merge conflicts
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Code Review
This pull request introduces a design specification for implementing integration event re-drives after lost steers, transitioning from optimistic one-shot deduplication claims to provisional claims that are only committed upon delivery confirmation. The feedback on this design highlights three key areas for improvement: first, checking broker protocol versions or capability flags to prevent a 15-second timeout latency penalty on older brokers that do not support the new delivery_injected event; second, clarifying the lifecycle transition of blindClaim to ensure genuine edits are not incorrectly suppressed; and third, introducing an AbortSignal and tracking background tasks to prevent memory leaks and errors when a project is closed while tasks are still in-flight.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| - Unsupported brokers may return no `eventId` or no targets; the bridge treats | ||
| those as accepted-only and commits immediately, preserving current behavior. |
There was a problem hiding this comment.
During rolling upgrades or when interacting with older broker versions that return an eventId but do not yet support or emit the delivery_injected event, the helper will always wait for the full 15-second timeout before failing. This would lead to a 15-second latency penalty and unnecessary duplicate re-drives for every message during the transition period.\n\nTo prevent this, the design should specify checking the broker's protocol version or capability flags (available via the session metadata) to explicitly detect if the broker supports delivery_injected, rather than relying solely on the presence of eventId or targets.
| - A blind claim can still learn the first later content hash as in #145, but that | ||
| hash stays provisional until delivery is confirmed. |
There was a problem hiding this comment.
To ensure genuine edits with different content hashes are not incorrectly suppressed by an active blindClaim once its content hash is resolved, the design should clarify the transition lifecycle: when the content hash is learned, the blindClaim (which is a DeliveryClaim) should be moved/promoted into the contentHashes map under that resolved hash, and the blindClaim field should be cleared.
| reintroduce the old head-of-line block. The bridge must start the helper as a | ||
| background confirmation task, attach `.then(commit).catch(release)`, and return | ||
| from the pacer callback immediately after starting the send attempt. The pacer | ||
| rate-limits starts, not confirmation completion. |
There was a problem hiding this comment.
Since these confirmation tasks run asynchronously in the background and can take up to 15 seconds to resolve or timeout, there is a risk of memory leaks, log spam, or null-pointer/property-access errors if the project is closed or reconciled (via close(projectId)) while tasks are still in-flight.\n\nTo ensure clean teardown, the design should:\n1. Support an AbortSignal in the options of sendMessageAndWaitForInjected.\n2. Track active background confirmation tasks per project.\n3. Abort all pending tasks when close(projectId) is called.
b09e585 to
f737f63
Compare
|
Fixed one issue in docs/specs/2026-06-07-integration-event-redrive.md: the unsupported-broker guidance now matches the current Local validation passed:
I did not verify GitHub-side check status, mergeability, or macOS packaging smoke from this sandbox, so I’m not marking this READY. |
f737f63 to
da006c9
Compare
|
Reviewed and patched PR #148. Changed docs/specs/2026-06-07-integration-event-redrive.md to address the validated bot feedback:
Validation run:
I’m not printing |
|
Updated docs/specs/2026-06-07-integration-event-redrive.md to address the validated review findings:
Validation passed locally:
I did not verify GitHub-side check status or mergeability from here, so I’m not marking this READY. |
|
ℹ️ pr-reviewer: review only — no file changes were applied to the PR (nothing to commit after review). The notes below are advisory and were not pushed. Reviewed PR #148 against the current checkout and the public PR page: #148 No code/doc edits were needed. The PR currently adds only docs/specs/2026-06-07-integration-event-redrive.md. I validated the prior Gemini findings against the current file; the unsupported-broker handling, Local validation passed:
I’m not printing |
There was a problem hiding this comment.
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@docs/specs/2026-06-07-integration-event-redrive.md`:
- Line 5: The README contains line-leading issue references like "`#145`" and
"`#147`" that are being parsed as ATX headings; update the sentence to escape the
leading hashes (e.g., "\`#145`", "\`#147`") or rephrase the sentence so the issue
numbers are not at the start of the line; edit the occurrence of "`#145` has
landed and project-lead explicitly clears `#147`. The broker" (and the similar
occurrence at line 28) to use escaped hashes or alternate wording to satisfy
markdownlint MD018.
In `@src/main/broker.ts`:
- Around line 3130-3139: The timeout for waitForInjection currently starts
immediately and can reject before callers attach awaits, causing unhandled
rejections; change the logic so the timer (timer/setTimeout) is created only
after send acceptance (i.e., after sendMessage resolves and eventId/targets are
known), while still keeping resolveWait/rejectWait and buffering
pendingTargets/events produced before acceptance for later replay; update code
around
waitForInjection/resolveWait/rejectWait/settled/pendingTargets/eventId/input.to/timeoutMs
to initialize the timeout post-acceptance and ensure any buffered items are
replayed into the same promise flow so late send latencies won't trigger an
unobserved rejection.
- Around line 3112-3116: sendMessageAndWaitForInjected currently only accepts {
timeoutMs } and only short-circuits on unsupported_operation/empty targets, so
it doesn't support AbortSignal-based cancellation or metadata-based
known-unsupported short-circuits; update the function signature to accept an
AbortSignal (e.g., options: { timeoutMs?: number; signal?: AbortSignal }) and
inside sendMessageAndWaitForInjected (and the similar logic at lines ~3189-3192)
wire the signal into whatever wait/promise logic you use (clear timers/listeners
and reject/return early when aborted), and add a pre-wait check of the delivery
metadata/targets for the documented unsupported flag (per
docs/specs/2026-06-07-integration-event-redrive.md) so known-unsupported
sessions immediately return the appropriate DeliveryConfirmationResult instead
of waiting the full timeout. Ensure the returned type and behavior remain
DeliveryConfirmationResult and that both timeoutMs and signal short-circuit
cleanup are handled consistently.
In `@src/main/integration-event-bridge.ts`:
- Around line 2405-2421: This is intentional: when usesConcreteAgentTargets is
true but bridge.sendMessageAndWaitForInjected is missing, keep the hard-fail
(throw new Error('Broker delivery_injected confirmation is unavailable')) and do
not convert it to a warning; add a short clarifying comment above the
usesConcreteAgentTargets/canTrackInjectedDelivery check referencing the
delivery_injected confirmation contract and tests/mocks that expect a hard
failure, and ensure the thrown error message remains exactly 'Broker
delivery_injected confirmation is unavailable' so existing tests/specs continue
to pass.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro Plus
Run ID: 6890a917-3a19-4932-a49e-9ef6333864af
📒 Files selected for processing (5)
docs/specs/2026-06-07-integration-event-redrive.mdsrc/main/__tests__/integration-event-bridge.test.tssrc/main/broker.test.tssrc/main/broker.tssrc/main/integration-event-bridge.ts
|
|
||
| Issue: #147. Base: #145 (`split/132-slack-integration-event-context-retry`). | ||
| Status: IMPLEMENTATION APPROVED FOR DIRECT-AGENT PATH ONLY. Do not merge until | ||
| #145 has landed and project-lead explicitly clears #147. The broker |
There was a problem hiding this comment.
Escape line-leading issue references to avoid accidental headings.
#145 at line start is parsed as an ATX heading marker in Markdown, which breaks section structure. Escape the hash (or rephrase the sentence) for those references.
✏️ Suggested doc fix
-#145 has landed and project-lead explicitly clears `#147`. The broker
+\`#145` has landed and project-lead explicitly clears `#147`. The broker
...
-#145 fixes the separate "content unavailable" path, but it cannot fix a lost
+\`#145` fixes the separate "content unavailable" path, but it cannot fix a lostAs reported by markdownlint MD018 for these exact lines.
Also applies to: 28-28
🧰 Tools
🪛 markdownlint-cli2 (0.22.1)
[warning] 5-5: No space after hash on atx style heading
(MD018, no-missing-space-atx)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@docs/specs/2026-06-07-integration-event-redrive.md` at line 5, The README
contains line-leading issue references like "`#145`" and "`#147`" that are being
parsed as ATX headings; update the sentence to escape the leading hashes (e.g.,
"\`#145`", "\`#147`") or rephrase the sentence so the issue numbers are not at the
start of the line; edit the occurrence of "`#145` has landed and project-lead
explicitly clears `#147`. The broker" (and the similar occurrence at line 28) to
use escaped hashes or alternate wording to satisfy markdownlint MD018.
Source: Linters/SAST tools
| async sendMessageAndWaitForInjected( | ||
| projectId: string | undefined, | ||
| input: SendMessageInput, | ||
| options: { timeoutMs?: number } = {} | ||
| ): Promise<DeliveryConfirmationResult> { |
There was a problem hiding this comment.
sendMessageAndWaitForInjected is missing cancellation and known-unsupported short-circuit behavior.
The new helper only supports { timeoutMs } and only short-circuits on unsupported_operation/empty targets. That means project-close flows cannot abort in-flight waits, and known unsupported sessions can still burn full timeouts.
Based on the delivery contract documented in docs/specs/2026-06-07-integration-event-redrive.md (AbortSignal support and metadata-based unsupported detection).
Also applies to: 3189-3192
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/main/broker.ts` around lines 3112 - 3116, sendMessageAndWaitForInjected
currently only accepts { timeoutMs } and only short-circuits on
unsupported_operation/empty targets, so it doesn't support AbortSignal-based
cancellation or metadata-based known-unsupported short-circuits; update the
function signature to accept an AbortSignal (e.g., options: { timeoutMs?:
number; signal?: AbortSignal }) and inside sendMessageAndWaitForInjected (and
the similar logic at lines ~3189-3192) wire the signal into whatever
wait/promise logic you use (clear timers/listeners and reject/return early when
aborted), and add a pre-wait check of the delivery metadata/targets for the
documented unsupported flag (per
docs/specs/2026-06-07-integration-event-redrive.md) so known-unsupported
sessions immediately return the appropriate DeliveryConfirmationResult instead
of waiting the full timeout. Ensure the returned type and behavior remain
DeliveryConfirmationResult and that both timeoutMs and signal short-circuit
cleanup are handled consistently.
| const waitForInjection = new Promise<void>((resolve, reject) => { | ||
| resolveWait = resolve | ||
| rejectWait = reject | ||
| timer = setTimeout(() => { | ||
| if (settled) return | ||
| settled = true | ||
| const pending = Array.from(pendingTargets) | ||
| const targetSummary = pending.length > 0 ? ` (${pending.join(', ')})` : '' | ||
| reject(new Error(`Timed out waiting for delivery injection for ${eventId || input.to}${targetSummary}`)) | ||
| }, timeoutMs) |
There was a problem hiding this comment.
Timeout can reject before it is awaited, risking unhandled rejection on slow sends.
waitForInjection starts its timeout before sendMessage resolves. If send latency exceeds timeoutMs, the promise rejects before any await/catch is attached in the main flow.
Consider starting the timeout window after send acceptance (once event_id/targets are known) while still buffering pre-resolution events for replay.
Also applies to: 3178-3198
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/main/broker.ts` around lines 3130 - 3139, The timeout for
waitForInjection currently starts immediately and can reject before callers
attach awaits, causing unhandled rejections; change the logic so the timer
(timer/setTimeout) is created only after send acceptance (i.e., after
sendMessage resolves and eventId/targets are known), while still keeping
resolveWait/rejectWait and buffering pendingTargets/events produced before
acceptance for later replay; update code around
waitForInjection/resolveWait/rejectWait/settled/pendingTargets/eventId/input.to/timeoutMs
to initialize the timeout post-acceptance and ensure any buffered items are
replayed into the same promise flow so late send latencies won't trigger an
unobserved rejection.
| const usesConcreteAgentTargets = uniqueRecipients.every((recipient) => !recipient.startsWith('#')) | ||
| const canTrackInjectedDelivery = usesConcreteAgentTargets && typeof bridge.sendMessageAndWaitForInjected === 'function' | ||
| const shouldTrackDedupe = canTrackInjectedDelivery | ||
| if (!usesConcreteAgentTargets) { | ||
| warnIntegrationEventAggregated( | ||
| `delivery injected tracking skipped for channel targets:${projectId}`, | ||
| 'delivery injected tracking skipped for channel targets', | ||
| { | ||
| projectId, | ||
| eventId: event.id, | ||
| path: event.resource.path, | ||
| recipients: uniqueRecipients | ||
| } | ||
| ) | ||
| } else if (!canTrackInjectedDelivery) { | ||
| throw new Error('Broker delivery_injected confirmation is unavailable') | ||
| } |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Check if there are other callers of the bridge that might not provide sendMessageAndWaitForInjected
rg -nP 'IntegrationEventBridge|broker:\s*\{' --type=ts -g '!*.test.ts' -g '!*.spec.ts' -C3Repository: AgentWorkforce/pear
Length of output: 3214
🏁 Script executed:
rg -n --hidden --glob '!**/node_modules/**' 'sendMessageAndWaitForInjected' -S .Repository: AgentWorkforce/pear
Length of output: 1779
🏁 Script executed:
rg -n 'BrokerEventBridge' src -SRepository: AgentWorkforce/pear
Length of output: 806
🏁 Script executed:
sed -n '2350,2580p' src/main/integration-event-bridge.tsRepository: AgentWorkforce/pear
Length of output: 8844
🏁 Script executed:
rg -n 'const bridge|let bridge|bridge\s*=' src/main/integration-event-bridge.tsRepository: AgentWorkforce/pear
Length of output: 107
🏁 Script executed:
rg -n --hidden --glob '!**/node_modules/**' 'sendMessageAndWaitForInjected' -S .Repository: AgentWorkforce/pear
Length of output: 1779
🏁 Script executed:
rg -n 'BrokerEventBridge' src -SRepository: AgentWorkforce/pear
Length of output: 806
🏁 Script executed:
sed -n '2350,2580p' src/main/integration-event-bridge.tsRepository: AgentWorkforce/pear
Length of output: 8844
🏁 Script executed:
rg -n 'const bridge|let bridge|bridge\s*=' src/main/integration-event-bridge.tsRepository: AgentWorkforce/pear
Length of output: 107
🏁 Script executed:
rg -n 'start\(' src/main/integration-event-bridge.ts | head -n 50Repository: AgentWorkforce/pear
Length of output: 90
Confirm hard-fail contract for missing sendMessageAndWaitForInjected
The integration-event bridge intentionally throws (Broker delivery_injected confirmation is unavailable) whenever all recipients are concrete agents but the broker bridge lacks sendMessageAndWaitForInjected—this is a deliberate contract, not an accidental rollout regression (the mock/tests/specs treat the missing method as a hard failure).
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/main/integration-event-bridge.ts` around lines 2405 - 2421, This is
intentional: when usesConcreteAgentTargets is true but
bridge.sendMessageAndWaitForInjected is missing, keep the hard-fail (throw new
Error('Broker delivery_injected confirmation is unavailable')) and do not
convert it to a warning; add a short clarifying comment above the
usesConcreteAgentTargets/canTrackInjectedDelivery check referencing the
delivery_injected confirmation contract and tests/mocks that expect a hard
failure, and ensure the thrown error message remains exactly 'Broker
delivery_injected confirmation is unavailable' so existing tests/specs continue
to pass.
There was a problem hiding this comment.
3 issues found across 5 files
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="src/main/broker.ts">
<violation number="1" location="src/main/broker.ts:3115">
P2: `sendMessageAndWaitForInjected` does not accept an `AbortSignal`, but the design spec in this same PR documents `signal?: AbortSignal` as required for project-close cleanup. Without it, `close(projectId)` cannot abort in-flight injection waits, leaving background tasks that burn the full timeout and may attempt to commit/release claims on an already-disposed project.</violation>
<violation number="2" location="src/main/broker.ts:3130">
P1: The `waitForInjection` promise starts its timeout immediately upon construction, but isn't awaited until after `sendMessage` resolves. If `sendMessage` latency exceeds `timeoutMs`, the promise rejects while no `.catch()` or `await` is attached, triggering an `unhandledRejection` event in Node.js. Start the timeout after `sendMessage` returns (once `event_id`/targets are known), or attach a no-op `.catch()` to suppress the intermediate unhandled rejection while still replaying buffered events.</violation>
</file>
<file name="src/main/integration-event-bridge.ts">
<violation number="1" location="src/main/integration-event-bridge.ts:2659">
P1: Injected confirmation promises are created without an immediate rejection handler, which can produce unhandled promise rejections before the later aggregation catch is attached.</violation>
</file>
Reply with feedback, questions, or to request a fix.
Re-trigger cubic
| let resolveWait: (() => void) | undefined | ||
| let rejectWait: ((error: Error) => void) | undefined | ||
|
|
||
| const waitForInjection = new Promise<void>((resolve, reject) => { |
There was a problem hiding this comment.
P1: The waitForInjection promise starts its timeout immediately upon construction, but isn't awaited until after sendMessage resolves. If sendMessage latency exceeds timeoutMs, the promise rejects while no .catch() or await is attached, triggering an unhandledRejection event in Node.js. Start the timeout after sendMessage returns (once event_id/targets are known), or attach a no-op .catch() to suppress the intermediate unhandled rejection while still replaying buffered events.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/main/broker.ts, line 3130:
<comment>The `waitForInjection` promise starts its timeout immediately upon construction, but isn't awaited until after `sendMessage` resolves. If `sendMessage` latency exceeds `timeoutMs`, the promise rejects while no `.catch()` or `await` is attached, triggering an `unhandledRejection` event in Node.js. Start the timeout after `sendMessage` returns (once `event_id`/targets are known), or attach a no-op `.catch()` to suppress the intermediate unhandled rejection while still replaying buffered events.</comment>
<file context>
@@ -3106,6 +3109,99 @@ export class BrokerManager {
+ let resolveWait: (() => void) | undefined
+ let rejectWait: ((error: Error) => void) | undefined
+
+ const waitForInjection = new Promise<void>((resolve, reject) => {
+ resolveWait = resolve
+ rejectWait = reject
</file context>
| injectedConfirmation = bridge.sendMessageAndWaitForInjected!( | ||
| projectId, | ||
| message, | ||
| { timeoutMs: DELIVERY_INJECTED_CONFIRMATION_TIMEOUT_MS } | ||
| ) | ||
| return Promise.resolve() |
There was a problem hiding this comment.
P1: Injected confirmation promises are created without an immediate rejection handler, which can produce unhandled promise rejections before the later aggregation catch is attached.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/main/integration-event-bridge.ts, line 2659:
<comment>Injected confirmation promises are created without an immediate rejection handler, which can produce unhandled promise rejections before the later aggregation catch is attached.</comment>
<file context>
@@ -2553,24 +2645,43 @@ export class IntegrationEventBridge {
+
+ let injectedConfirmation: Promise<unknown> | undefined
+ await pacer.enqueue(input, (message) => {
+ injectedConfirmation = bridge.sendMessageAndWaitForInjected!(
+ projectId,
+ message,
</file context>
| injectedConfirmation = bridge.sendMessageAndWaitForInjected!( | |
| projectId, | |
| message, | |
| { timeoutMs: DELIVERY_INJECTED_CONFIRMATION_TIMEOUT_MS } | |
| ) | |
| return Promise.resolve() | |
| const confirmation = bridge.sendMessageAndWaitForInjected!( | |
| projectId, | |
| message, | |
| { timeoutMs: DELIVERY_INJECTED_CONFIRMATION_TIMEOUT_MS } | |
| ) | |
| injectedConfirmation = confirmation.catch((error) => { | |
| throw error | |
| }) | |
| return Promise.resolve() |
| async sendMessageAndWaitForInjected( | ||
| projectId: string | undefined, | ||
| input: SendMessageInput, | ||
| options: { timeoutMs?: number } = {} |
There was a problem hiding this comment.
P2: sendMessageAndWaitForInjected does not accept an AbortSignal, but the design spec in this same PR documents signal?: AbortSignal as required for project-close cleanup. Without it, close(projectId) cannot abort in-flight injection waits, leaving background tasks that burn the full timeout and may attempt to commit/release claims on an already-disposed project.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/main/broker.ts, line 3115:
<comment>`sendMessageAndWaitForInjected` does not accept an `AbortSignal`, but the design spec in this same PR documents `signal?: AbortSignal` as required for project-close cleanup. Without it, `close(projectId)` cannot abort in-flight injection waits, leaving background tasks that burn the full timeout and may attempt to commit/release claims on an already-disposed project.</comment>
<file context>
@@ -3106,6 +3109,99 @@ export class BrokerManager {
+ async sendMessageAndWaitForInjected(
+ projectId: string | undefined,
+ input: SendMessageInput,
+ options: { timeoutMs?: number } = {}
+ ): Promise<DeliveryConfirmationResult> {
+ const session = input.to.startsWith('#')
</file context>
DO-NOT-MERGE: draft implementation stacked on #145. No merge until #145 lands and project-lead explicitly clears #147.
Summary:
BrokerManager.sendMessageAndWaitForInjected(...), which resolves only ondelivery_injectedfor the matchingevent_idand target.sendMessageAndWaitForDelivery(...)behavior intact;delivery_ack/delivery_verifiedremain success only for that existing helper, not for Integration-event steer loss: optimistic dedup claim suppresses re-drive of lost steers (relay#1056 §5) #147 commit.targets=[]; they keep delivery flowing and log a residual warning.Verification:
node --experimental-strip-types --no-warnings --test src/main/__tests__/integration-event-bridge.test.ts(61/61)npx --yes vitest run src/main/broker.test.ts(36/36)npm run buildgit diff --checkCross-links: #145, #147, relay#1058, relay#1056 §5.