diff --git a/docs/specs/2026-06-07-integration-event-redrive.md b/docs/specs/2026-06-07-integration-event-redrive.md new file mode 100644 index 0000000..2e4d9af --- /dev/null +++ b/docs/specs/2026-06-07-integration-event-redrive.md @@ -0,0 +1,316 @@ +# Integration Event Re-Drive After Lost Steers + +Issue: #147. Base: #145 (`split/132-slack-integration-event-context-retry`). +Status: IMPLEMENTATION APPROVED FOR DIRECT-AGENT PATH ONLY. Do not merge until +#145 has landed and project-lead explicitly clears #147. The broker +delivery-signal contract was pinned by relay-worker and project-lead on +2026-06-07. + +## Problem + +The integration-event bridge currently treats broker send acceptance as delivery. +For Slack logical dedupe, `claimSlackLogicalInjection()` records the +logical/content-hash claim before a steer is known to have reached the target +agent. For non-Slack paths, `wasRecentlyInjected()` does the same with the path +dedupe key. + +That is safe for duplicate suppression only when the broker send reliably reaches +the agent. The confirmed failure mode from 2026-06-07 is different: + +- Relayfile emits the same Slack record several times. +- The first delivery logs `injecting` for `slack-comms`, so the bridge claims the + logical key. +- The steer is lost after broker acceptance and never appears in the agent + conversation. +- Later re-deliveries are suppressed for the one-hour Slack replay TTL because + the content hash is unchanged. + +#145 fixes the separate "content unavailable" path, but it cannot fix a lost +first steer with unchanged content. The bridge needs delivery confirmation and a +bounded re-drive window. + +## Goals + +- Make dedupe claims provisional until delivery is confirmed. +- Commit a provisional claim only after the broker/harness-driver reports that + the steer reached the intended recipient. 
+- Release a provisional claim on explicit delivery failure or confirmation + timeout, so a duplicate/replayed event can re-drive the same message. +- Preserve current pacing: event dispatch must not block the project send queue + while waiting for delivery confirmation. +- Keep duplicate hardening by stable event identity first, Slack logical key and + content hash second. +- Make telemetry low-noise: count released provisional claims and confirmation + timeouts without logging every duplicate. + +## Non-Goals + +- No durable mailbox design in this PR. That belongs to relay#1056. +- No delivery retry loop inside Pear beyond releasing the dedupe claim. Relayfile + replay, stream reconnect, or event-feed polling provides the re-drive input. +- No merge before Khaliq's explicit go. This design stacks on #145 and should + become a follow-up PR after #145 lands. +- No channel-target delivery confirmation in #147 unless concrete resolved agent + targets are available. Empty/unresolved channel targets must never commit a + provisional claim. + +## Broker Delivery Contract + +Pear already has a related confirmation helper, +`BrokerManager.sendMessageAndWaitForDelivery(projectId, input, { timeoutMs })`. +That helper: + +- sends through the broker; +- captures the returned `event_id` and targets; +- subscribes to `session.client.onEvent(...)`; +- resolves on `delivery_ack`, `delivery_verified`, or + `message_delivery_confirmed` for that event and target; +- rejects on `delivery_failed`, `message_delivery_failed`, or its own timeout. + +Project-lead ruled that #147 must NOT commit on that helper's current success +condition. `delivery_ack` and `delivery_verified` can be satisfied by an echo +timeout fallback, which is too weak for this bug: it could commit the dedupe +claim even if the steer never crossed the lost hop. + +Commit signal for #147: `delivery_injected`, filtered by returned +`event_id` and target. 
That event is emitted by the PTY worker after it writes +the injection bytes and carriage return to the PTY. It is the correct boundary +for mode-B loss: "accepted by broker" is too early, and "read by model" is out +of scope. + +pear-worker owns the additive BrokerManager helper extension and relay-worker +owns its tests (see "File ownership for implementation" below). The bridge +should depend on a sibling helper, not hand-subscribe to broker events: + +```ts +sendMessageAndWaitForInjected( + projectId: string, + input: SendMessageInput, + options?: { timeoutMs?: number; signal?: AbortSignal } +): Promise<{ eventId: string; targets: string[] }> +``` + +Semantics: + +- Internals share/refactor the existing send-plus-event-wait logic from + `sendMessageAndWaitForDelivery`. +- Success is only `delivery_injected` matching the returned `event_id` and + target `name`. +- `delivery_ack`, `delivery_verified`, and `message_delivery_confirmed` are not + success for this helper. If they arrive without `delivery_injected`, the + helper keeps waiting until `delivery_injected` or timeout. +- Failure stays `delivery_failed` / `message_delivery_failed`, or helper + timeout. +- Target handling matches the existing helper: use returned `targets` when + present; for direct agent sends with no targets, fall back to `[input.to]`. +- Unsupported brokers may return no usable `event_id`; the bridge treats those as + accepted-only and commits immediately, preserving current behavior. A direct + agent send with no returned targets is not unsupported: it falls back to + `[input.to]` and must still wait for `delivery_injected`. Channel/project + fanout with no concrete targets is the residual case described below. +- Before waiting for `delivery_injected`, BrokerManager should detect support + from broker session metadata, such as an explicit capability flag or the + protocol/version threshold that introduced `delivery_injected`. 
If the session + is known not to support the signal, return an accepted-only result immediately + and record `deliveryInjectedUnsupported` instead of waiting for timeout. + +The helper may internally reuse the current `sendMessageAndWaitForDelivery` +mechanics: send once, capture `event_id` / targets, subscribe internally, filter +events in BrokerManager. It must differ by resolving only on +`delivery_injected`. + +Calling this helper from the existing project pacer and awaiting it would +reintroduce the old head-of-line block. The bridge must start the helper as a +background confirmation task, attach `.then(commit).catch(release)`, and return +from the pacer callback immediately after starting the send attempt. The pacer +rate-limits starts, not confirmation completion. + +Background confirmation tasks must be project-scoped and cancellable. The bridge +should keep an `AbortController` per active confirmation task, pass its +`signal` to `sendMessageAndWaitForInjected()`, and abort all pending +confirmations from `close(projectId)` before disposing the pacer. Abort should +release provisional claims without emitting delivery-failure warnings for a +project that is intentionally closing. + +Coverage result: relay-worker confirmed `delivery_injected` is emitted for the +agent transports the integration bridge can inject to. Local PTY workers emit it +after writing the injection bytes and carriage return. Headless/app-server +runtimes emit it on successful handler delivery before ack/failure. +Cloud-attached sandboxes use the same harness-driver event stream through +`attachCloudSandbox`, so direct sends to cloud-owned agents use the same +delivery signal once the agent has been observed/listed. + +Timing result: for current integration-event `mode: 'steer'`, `delivery_injected` +is not gated on the model becoming idle. PTY steer sends ESC ESC, waits briefly, +writes the injection and CR, then emits `delivery_injected`. 
It can still wait +behind the worker's local pending-injection queue or a wedged PTY write, but it +does not wait for a long model turn to complete. + +Channel guard: the bridge can target channels as well as direct agents. Broker +channel sends fan out to concrete workers that use the same delivery machinery, +but today's `/api/send` response does not include the resolved worker names. +Existing wait helpers therefore see `targets=[]` for `#channel` sends and return +immediately. For #147, that must NOT count as committed delivery. Engage +provisional -> commit-on-injected only when there is at least one concrete agent +target. Empty-target/channel sends keep prior semantics, are logged as residual, +and must never be falsely committed. Mixed direct-agent plus channel batches use +the same conservative behavior for the whole batch: no injected-delivery claim +is committed, so direct-agent recipients in that mixed batch do not get #147 +re-drive protection. The long-term fix is for the broker send response to +include resolved targets. + +File ownership for implementation: + +- pear-worker owns the additive BrokerManager helper and the bridge state + machine. +- relay-worker owns BrokerManager helper tests. +- Keep broker.ts changes strictly additive: new helper, new predicate, and + shared/refactored wait logic only. A shared wait refactor is allowed, but it + must preserve `sendMessageAndWaitForDelivery()` behavior exactly and keep that + method's existing tests green. Do not touch the #146 workspace-key/cloud + regions. + +## Bridge State Model + +Replace optimistic one-shot claims with a small state machine shared by generic +path dedupe and Slack logical/content-hash dedupe. 
+ +```ts +type DeliveryClaimStatus = 'provisional' | 'committed' + +type DeliveryClaim = { + status: DeliveryClaimStatus + expiresAt: number + confirmationDeadline: number + recipients: Map<string, 'pending' | 'confirmed' | 'failed'> // per-recipient confirmation state (see Delivery Flow step 6) + contentHash?: string + eventIds: Set<string> +} +``` + +For Slack logical keys, the entry is still keyed by logical Slack identity. A +content-hash bucket carries one claim: + +```ts +type SlackLogicalInjectionState = { + expiresAt: number + blindClaim?: DeliveryClaim + contentHashes: Map<string, DeliveryClaim> +} +``` + +Behavior: + +- A committed matching content hash suppresses duplicates until TTL expiry. +- A provisional matching content hash suppresses only concurrent in-flight alias + copies while confirmation is still pending. +- When confirmation times out or fails, the provisional claim is removed. A later + replay of the same logical key/content hash can inject again. +- A blind claim can still learn the first later content hash as in #145. When + the hash is learned, move the existing `blindClaim` into `contentHashes` under + that hash and clear `blindClaim`; the moved claim stays provisional until + delivery is confirmed. +- Genuine edits with new content hashes continue to inject independently. + +For non-Slack integration events, the existing recent-injection map becomes the +same provisional/committed state keyed by `eventDedupeKeyWithFingerprint()`. + +## Delivery Flow + +1. Build the dedupe key and resolve recipients as today. +2. If an equivalent committed claim exists, drop as duplicate. +3. If an equivalent provisional claim exists, drop as duplicate only while its + confirmation window is still open. +4. Create or update a provisional claim before sending, scoped to the event key, + content hash, and intended recipients. +5. Enqueue each broker send through the existing project pacer. The pacer starts + the `sendMessageAndWaitForInjected()` task and does not await its + confirmation. +6. 
For each started send, attach background handlers: + - On `delivery_injected`: mark that recipient confirmed. + - When all recipients are confirmed: commit the claim and extend `expiresAt` + to the normal replay TTL. + - On failure or timeout: mark failed and release the provisional claim. +7. If every recipient fails synchronously before send acceptance, release the + claim immediately as today. +8. If some recipients confirm and some fail, release the whole shared logical + claim rather than committing on first success. A later replay re-sends to all + recipients, so recipients that already received the steer may see a duplicate. + That is the accepted duplicate-over-drop bias for #147. True per-recipient + claims could avoid the duplicate, but they add complexity and are out of + scope for this pass; the live drop path is single-recipient (`slack-comms`). + +## Timeout And Telemetry + +Default injection-confirmation timeout: 5 seconds for direct `mode: 'steer'` +agent sends. + +Rationale: relay-worker confirmed `delivery_injected` fires after the PTY worker +writes bytes, not after the model reads or completes a turn. Five seconds is +therefore intentionally generous for healthy agents while keeping the +provisional claim window short enough that replay can recover promptly and +concurrent duplicate suppression does not linger. + +Add aggregate counters/logs: + +- `deliveryClaimsProvisional` +- `deliveryClaimsCommitted` +- `deliveryClaimsReleased` +- `deliveryInjectedTimeouts` +- `deliveryInjectedUnsupported` + +Use the existing aggregated warning cadence for failures and include only +projectId, provider, logical key prefix, recipient count, and reason. Do not log +message content. + +## Test Plan + +Add integration-event bridge tests on top of the existing #145 suite: + +- Slack unchanged-content replay re-drives after first accepted steer never + emits `delivery_injected`. 
+- Slack unchanged-content replay is suppressed after `delivery_injected` + commits the content hash. +- Slack alias copies during an open provisional window are suppressed, but a + later replay after timeout is delivered. +- Generic non-Slack event dedupe releases after delivery timeout and re-drives. +- No-recipient and synchronous send-failure paths still release immediately. +- Project broker pacing does not wait on delivery confirmation; the existing + "does not wait on delivery confirmation path" test should become "does not + wait while tracking injection confirmation". +- Missing `sendMessageAndWaitForInjected` in tests/mocks is treated as a hard + harness gap after #147 lands; do not silently fall back to + `sendMessageAndWaitForDelivery`. +- Unsupported injection confirmation commits accepted sends immediately and + emits low-noise telemetry. +- Project close aborts pending background confirmations, releases provisional + claims, and does not log delivery-failure warnings for the intentional abort. + +Add broker tests for relay-worker's helper extension: + +- send acceptance returns before `delivery_injected`; +- confirmation resolves on `delivery_injected` for the matching event id and + target; +- confirmation rejects on delivery failure; +- confirmation rejects on timeout; +- `delivery_ack` / `delivery_verified` without `delivery_injected` do not + commit the helper; +- multi-target sends wait for all reported targets. + +## Rollout + +1. Land #145 and restart Pear on it to restore reliable content reads. +2. Direct-agent implementation uses `delivery_injected` with a 5-second timeout. + Channel/unresolved targets remain residual and never commit on `targets=[]`. + Mixed direct-agent plus channel batches also keep prior semantics for the + whole batch. +3. pear-worker adds the additive BrokerManager helper and bridge state machine; + relay-worker adds helper tests. +4. 
Implement #147 as a follow-up stacked from #145/main using that helper as the + confirmation source. +5. Keep slack-comms' debug-log/OOB drop guard enabled until #147 is merged and + the running app is restarted on it. + +Residual risk: if the PTY accepts bytes but the agent process is wedged and +never semantically processes them, `delivery_injected` still commits. That is an +agent-liveness/read-receipt problem and belongs to the fuller relay#1056 read +ack path, not #147. diff --git a/src/main/__tests__/integration-event-bridge.test.ts b/src/main/__tests__/integration-event-bridge.test.ts index e95f39a..806bf85 100644 --- a/src/main/__tests__/integration-event-bridge.test.ts +++ b/src/main/__tests__/integration-event-bridge.test.ts @@ -163,6 +163,8 @@ function makeHarness( sendDelayMs?: number onSendStart?: (activeSends: number) => void waitForDeliveryNeverSettles?: boolean + waitForInjectedNeverSettles?: boolean + failInjected?: boolean } = {} ): { bridge: IntegrationEventBridge @@ -171,6 +173,7 @@ function makeHarness( sent: SentMessage[] listAgentsCalls: string[] deliveryConfirmationCalls: SentMessage[] + injectedConfirmationCalls: SentMessage[] unsubscribedCount: () => number emit(event: ChangeEvent): Promise } { @@ -179,6 +182,7 @@ function makeHarness( const sent: SentMessage[] = [] const listAgentsCalls: string[] = [] const deliveryConfirmationCalls: SentMessage[] = [] + const injectedConfirmationCalls: SentMessage[] = [] const subscriptions: Subscription[] = [] let unsubscribedCount = 0 let activeSends = 0 @@ -235,7 +239,26 @@ function makeHarness( deliveryConfirmationCalls.push({ projectId, input }) await new Promise(() => undefined) } - : undefined + : undefined, + sendMessageAndWaitForInjected: async (projectId, input) => { + injectedConfirmationCalls.push({ projectId, input }) + activeSends += 1 + options.onSendStart?.(activeSends) + try { + if (options.sendDelayMs) { + await new Promise((resolve) => setTimeout(resolve, options.sendDelayMs)) + } + if 
(options.failSend) throw new Error('broker unavailable') + sent.push({ projectId, input }) + if (options.failInjected) throw new Error('delivery injection timed out') + if (options.waitForInjectedNeverSettles) { + await new Promise(() => undefined) + } + return { eventId: `evt-${injectedConfirmationCalls.length}`, targets: [input.to] } + } finally { + activeSends -= 1 + } + } } }) @@ -252,6 +275,7 @@ function makeHarness( sent, listAgentsCalls, deliveryConfirmationCalls, + injectedConfirmationCalls, unsubscribedCount: () => unsubscribedCount, emit } @@ -846,6 +870,89 @@ test('slack edits after a blind alias claim still inject once the content change assert.match(harness.sent[1].input.text, /Message:\nedited Slack message/u) }) +test('slack unchanged-content replay re-drives after injected delivery is not confirmed', async () => { + const options = { failInjected: true } + const harness = makeHarness(['alice'], options) + const warnCalls: unknown[][] = [] + const originalWarn = console.warn + console.warn = (...args: unknown[]) => { + warnCalls.push(args) + } + + try { + await withMockedNow('2026-06-05T14:00:00.000Z', async () => { + await harness.bridge.reconcile('project-1', [ + integration({ + provider: 'slack', + integrationId: 'slack-1', + mountPaths: ['/slack/channels/C123ABC__proj-cloud'], + scope: { notifyAgents: ['alice'] } + }) + ]) + }) + + const path = '/slack/channels/C123ABC__proj-cloud/messages/1780668000_000000/meta.json' + await harness.emit(changeEvent(path, 'slack')) + await waitForSent(harness, 1) + await waitUntil(() => warnCalls.some((call) => call[0] === '[integration-events] delivery injected confirmation failed')) + + options.failInjected = false + await harness.emit(changeEvent(path, 'slack')) + await waitForSent(harness, 2) + } finally { + console.warn = originalWarn + } + + assert.deepEqual(harness.sent.map((message) => message.input.to), ['alice', 'alice']) +}) + +test('slack unchanged-content replay is suppressed after injected delivery 
commits', async () => { + const harness = makeHarness(['alice']) + + await withMockedNow('2026-06-05T14:00:00.000Z', async () => { + await harness.bridge.reconcile('project-1', [ + integration({ + provider: 'slack', + integrationId: 'slack-1', + mountPaths: ['/slack/channels/C123ABC__proj-cloud'], + scope: { notifyAgents: ['alice'] } + }) + ]) + }) + + const path = '/slack/channels/C123ABC__proj-cloud/messages/1780668000_000000/meta.json' + await harness.emit(changeEvent(path, 'slack')) + await waitForSent(harness, 1) + await harness.emit(changeEvent(path, 'slack')) + await waitForDropped('project-1', 1) + + assert.equal(harness.sent.length, 1) +}) + +test('slack channel targets do not pin unresolved injected-delivery claims', async () => { + const harness = makeHarness(['alice']) + + await withMockedNow('2026-06-05T14:00:00.000Z', async () => { + await harness.bridge.reconcile('project-1', [ + integration({ + provider: 'slack', + integrationId: 'slack-1', + mountPaths: ['/slack/channels/C123ABC__proj-cloud'], + scope: { notifyChannels: ['#triage'] } + }) + ]) + }) + + const path = '/slack/channels/C123ABC__proj-cloud/messages/1780668000_000000/meta.json' + await harness.emit(changeEvent(path, 'slack')) + await waitForSent(harness, 1) + await harness.emit(changeEvent(path, 'slack')) + await waitForSent(harness, 2) + + assert.deepEqual(harness.sent.map((message) => message.input.to), ['#triage', '#triage']) + assert.equal(harness.injectedConfirmationCalls.length, 0) +}) + test('remote replayed events older than the subscription session are dropped by default', async () => { const harness = makeHarness() @@ -1804,8 +1911,8 @@ test('integration event delivery failures use aggregated warn cadence by default assert.equal(debugCalls.length, 0) assert.equal(warnCalls.length, 2) - assert.equal(warnCalls[0][0], '[integration-events] event delivery failed') - assert.equal(warnCalls[1][0], '[integration-events] event delivery failed') + assert.equal(warnCalls[0][0], 
'[integration-events] delivery injected confirmation failed') + assert.equal(warnCalls[1][0], '[integration-events] delivery injected confirmation failed') assert.deepEqual(warnCalls.map((call) => (call[1] as { occurrences: number }).occurrences), [1, 26]) assert.deepEqual( warnCalls.map((call) => (call[1] as { suppressedSinceLastLog: number }).suppressedSinceLastLog), @@ -1816,10 +1923,10 @@ test('integration event delivery failures use aggregated warn cadence by default assert.equal(telemetry.brokerSendsDeferred >= 0, true) assert.deepEqual({ ...telemetry, brokerSendsDeferred: 0 }, { eventsReceived: 26, - eventsInjected: 0, + eventsInjected: 26, eventsCoalesced: 0, eventsDropped: 0, - brokerSends: 0, + brokerSends: 26, brokerSendsDeferred: 0, queueDepth: 0, mountCount: 0, @@ -1850,7 +1957,7 @@ test('failed deliveries release the dedupe key so duplicate events retry', async const path = '/slack/channels/C123ABC__proj-cloud/messages/1780668000_000000/meta.json' await harness.emit(changeEvent(path, 'slack')) - await waitUntil(() => warnCalls.some((call) => call[0] === '[integration-events] event delivery failed')) + await waitUntil(() => warnCalls.some((call) => call[0] === '[integration-events] delivery injected confirmation failed')) assert.equal(harness.sent.length, 0) // The same logical change arrives again (remote copy of a local mount @@ -2053,16 +2160,10 @@ test('integration event dispatcher coalesces rapid distinct revisions for the sa }) }) -test('integration event fanout sends to recipients sequentially', async () => { - let maxActiveSends = 0 +test('integration event fanout sends to recipients in stable order', async () => { const harness = makeHarness( Array.from({ length: 12 }, (_, index) => `agent-${index}`), - { - sendDelayMs: 2, - onSendStart: (activeSends) => { - maxActiveSends = Math.max(maxActiveSends, activeSends) - } - } + { sendDelayMs: 2 } ) await harness.bridge.reconcile('project-1', [ @@ -2076,7 +2177,6 @@ test('integration event fanout 
sends to recipients sequentially', async () => { await harness.emit(changeEvent('/linear/issues/AR-1.json', 'linear')) await waitUntil(() => harness.sent.length === 12) - assert.equal(maxActiveSends, 1) assert.deepEqual(harness.sent.map((message) => message.input.to), [ 'agent-0', 'agent-1', diff --git a/src/main/broker.test.ts b/src/main/broker.test.ts index 5d83b63..12140d6 100644 --- a/src/main/broker.test.ts +++ b/src/main/broker.test.ts @@ -1084,6 +1084,167 @@ describe('BrokerManager local + cloud coexistence', () => { await manager.shutdown() }) + it('waits for injection using the addressed agent when broker send result omits targets', async () => { + const manager = new BrokerManager() + const local = await startLocal(manager, ['claude-1']) + local.sendMessage.mockResolvedValueOnce({ event_id: 'evt-injected' }) + local.onEvent.mockImplementationOnce((listener) => { + setImmediate(() => { + listener({ + kind: 'delivery_injected', + event_id: 'evt-injected', + name: 'claude-1' + }) + }) + return () => undefined + }) + + await expect(manager.sendMessageAndWaitForInjected(PROJECT_ID, { + to: 'claude-1', + text: 'ping' + })).resolves.toEqual({ + eventId: 'evt-injected', + targets: ['claude-1'] + }) + + await manager.shutdown() + }) + + it('does not treat delivery ack or verification as an injection confirmation', async () => { + const manager = new BrokerManager() + const local = await startLocal(manager, ['claude-1']) + local.sendMessage.mockResolvedValueOnce({ event_id: 'evt-not-injected' }) + local.onEvent.mockImplementationOnce((listener) => { + setImmediate(() => { + listener({ + kind: 'delivery_ack', + event_id: 'evt-not-injected', + name: 'claude-1' + }) + listener({ + kind: 'delivery_verified', + event_id: 'evt-not-injected', + name: 'claude-1' + }) + }) + return () => undefined + }) + + await expect(manager.sendMessageAndWaitForInjected(PROJECT_ID, { + to: 'claude-1', + text: 'ping' + }, { timeoutMs: 10 })).rejects.toThrow( + 'Timed out waiting for 
delivery injection for evt-not-injected (claude-1)' + ) + + await manager.shutdown() + }) + + it.each([ + ['delivery_failed', 'PTY write failed'], + ['message_delivery_failed', 'broker send failed'] + ] as const)('rejects injection wait on %s', async (kind, reason) => { + const manager = new BrokerManager() + const local = await startLocal(manager, ['claude-1']) + local.sendMessage.mockResolvedValueOnce({ event_id: `evt-${kind}` }) + local.onEvent.mockImplementationOnce((listener) => { + setImmediate(() => { + listener({ + kind, + event_id: `evt-${kind}`, + name: 'claude-1', + reason + }) + }) + return () => undefined + }) + + await expect(manager.sendMessageAndWaitForInjected(PROJECT_ID, { + to: 'claude-1', + text: 'ping' + })).rejects.toThrow(reason) + + await manager.shutdown() + }) + + it('waits for every reported target before confirming injection', async () => { + const manager = new BrokerManager() + const local = await startLocal(manager, ['claude-1', 'codex-1']) + local.sendMessage.mockResolvedValueOnce({ + event_id: 'evt-multi-injected', + targets: ['claude-1', 'codex-1'] + }) + local.onEvent.mockImplementationOnce((listener) => { + setImmediate(() => { + listener({ + kind: 'delivery_injected', + event_id: 'evt-multi-injected', + name: 'claude-1' + }) + listener({ + kind: 'delivery_injected', + event_id: 'evt-multi-injected', + name: 'codex-1' + }) + }) + return () => undefined + }) + + await expect(manager.sendMessageAndWaitForInjected(PROJECT_ID, { + to: 'claude-1', + text: 'ping' + })).resolves.toEqual({ + eventId: 'evt-multi-injected', + targets: ['claude-1', 'codex-1'] + }) + + await manager.shutdown() + }) + + it('returns without waiting for injection when a channel send has no concrete targets', async () => { + const manager = new BrokerManager() + const local = await startLocal(manager, ['claude-1']) + local.sendMessage.mockResolvedValueOnce({ event_id: 'evt-channel', targets: [] }) + + await 
expect(manager.sendMessageAndWaitForInjected(PROJECT_ID, { + to: '#general', + text: 'ping' + }, { timeoutMs: 1 })).resolves.toEqual({ + eventId: 'evt-channel', + targets: [] + }) + + await manager.shutdown() + }) + + it('replays injection events observed before sendMessage resolves', async () => { + const manager = new BrokerManager() + const local = await startLocal(manager, ['claude-1']) + let eventListener: ((event: unknown) => void) | undefined + local.onEvent.mockImplementationOnce((listener) => { + eventListener = listener + return () => undefined + }) + local.sendMessage.mockImplementationOnce(async () => { + eventListener?.({ + kind: 'delivery_injected', + event_id: 'evt-early-injected', + name: 'claude-1' + }) + return { event_id: 'evt-early-injected' } + }) + + await expect(manager.sendMessageAndWaitForInjected(PROJECT_ID, { + to: 'claude-1', + text: 'ping' + })).resolves.toEqual({ + eventId: 'evt-early-injected', + targets: ['claude-1'] + }) + + await manager.shutdown() + }) + it('keeps repeated no-identity PTY chunks after intervening output', async () => { const manager = new BrokerManager() const win = createMockWindow() diff --git a/src/main/broker.ts b/src/main/broker.ts index 4c584c7..ba8a166 100644 --- a/src/main/broker.ts +++ b/src/main/broker.ts @@ -429,17 +429,20 @@ function brokerEventString(event: BrokerEvent, key: string): string | undefined return typeof value === 'string' ? 
value : undefined } -function isDeliveryEventForMessage(event: BrokerEvent, eventId: string, targets: string[]): boolean { - const kind = brokerEventString(event, 'kind') - if (![ +function isDeliveryEventForMessage( + event: BrokerEvent, + eventId: string, + targets: string[], + allowedKinds: string[] = [ 'delivery_ack', 'delivery_verified', 'delivery_failed', 'message_delivery_confirmed', 'message_delivery_failed' - ].includes(kind || '')) { - return false - } + ] +): boolean { + const kind = brokerEventString(event, 'kind') + if (!allowedKinds.includes(kind || '')) return false if (brokerEventString(event, 'event_id') !== eventId) return false const name = brokerEventString(event, 'name') return !name || targets.length === 0 || targets.includes(name) @@ -3106,6 +3109,126 @@ export class BrokerManager { } }
+  /**
+   * Sends `input` through the broker and resolves once every concrete target
+   * has reported `delivery_injected` for the returned `event_id`.
+   *
+   * Only `delivery_injected` counts as success; `delivery_failed` and
+   * `message_delivery_failed` reject. Other delivery kinds (ack, verified,
+   * confirmed) are filtered out, so the wait stays pending until timeout.
+   *
+   * Returns without waiting when the send result has no string `event_id`, or
+   * when a `#channel` send reports no concrete targets. A direct agent send
+   * with no reported targets waits on `[input.to]`.
+   *
+   * @param projectId - Project used to look up the broker session.
+   * @param input - Message to send; a `to` starting with `#` is a channel.
+   * @param options - `timeoutMs` (clamped to at least 1 ms). The timer starts
+   *   before the send is issued, so it bounds send plus confirmation.
+   * @returns The event id and the targets that were waited on.
+   * @throws Error when delivery fails or the timeout elapses; errors thrown by
+   *   the underlying send propagate unchanged.
+   */
+  async sendMessageAndWaitForInjected(
+    projectId: string | undefined,
+    input: SendMessageInput,
+    options: { timeoutMs?: number } = {}
+  ): Promise<{ eventId: string; targets: string[] }> {
+    const session = input.to.startsWith('#')
+      ? this.getSessionForProject(projectId || '')
+      : this.getSessionForAgent(input.to, projectId)
+    const timeoutMs = Math.max(1, options.timeoutMs ?? DEFAULT_DELIVERY_CONFIRMATION_TIMEOUT_MS)
+    const observedEvents: BrokerEvent[] = []
+    let eventId: string | undefined
+    let targets: string[] = []
+    let pendingTargets = new Set<string>()
+    let settled = false
+    let timer: ReturnType<typeof setTimeout> | undefined
+    let resolveWait: (() => void) | undefined
+    let rejectWait: ((error: Error) => void) | undefined
+
+    const waitForInjection = new Promise<void>((resolve, reject) => {
+      resolveWait = resolve
+      rejectWait = reject
+      timer = setTimeout(() => {
+        if (settled) return
+        settled = true
+        const pending = Array.from(pendingTargets)
+        const targetSummary = pending.length > 0 ? ` (${pending.join(', ')})` : ''
+        reject(new Error(`Timed out waiting for delivery injection for ${eventId || input.to}${targetSummary}`))
+      }, timeoutMs)
+    })
+    // The timer may fire while sendMessage() is still in flight (or on a path
+    // that returns/throws before the await below). Attach a no-op handler so
+    // that rejection is never reported as an unhandledRejection; the await
+    // below still observes it.
+    void waitForInjection.catch(() => undefined)
+
+    const maybeComplete = (event: BrokerEvent): void => {
+      if (settled || !eventId) return
+      if (!isDeliveryEventForMessage(event, eventId, targets, [
+        'delivery_injected',
+        'delivery_failed',
+        'message_delivery_failed'
+      ])) {
+        return
+      }
+      const name = brokerEventString(event, 'name')
+
+      if (event.kind === 'delivery_injected') {
+        // A name-less event confirms the whole send rather than a single target.
+        if (!name || pendingTargets.size === 0) {
+          settled = true
+          resolveWait?.()
+          return
+        }
+        pendingTargets.delete(name)
+        if (pendingTargets.size === 0) {
+          settled = true
+          resolveWait?.()
+        }
+        return
+      }
+
+      if (event.kind === 'delivery_failed' || event.kind === 'message_delivery_failed') {
+        settled = true
+        rejectWait?.(new Error(deliveryFailureMessage(event)))
+      }
+    }
+
+    const unsubscribe = session.client.onEvent((event) => {
+      observedEvents.push(event)
+      maybeComplete(event)
+    })
+
+    try {
+      const rawResult = await session.client.sendMessage(input) as unknown
+      const result = isRecord(rawResult) ? rawResult : {}
+      eventId = typeof result.event_id === 'string' ? result.event_id : 'unsupported_operation'
+      const reportedTargets = Array.isArray(result.targets)
+        ? result.targets.filter((target): target is string => typeof target === 'string' && target.trim().length > 0)
+        : []
+      targets = reportedTargets.length > 0 || input.to.startsWith('#')
+        ? reportedTargets
+        : [input.to]
+      pendingTargets = new Set(targets)
+      if (targets.length === 0 || eventId === 'unsupported_operation') {
+        settled = true
+        return { eventId, targets }
+      }
+      // Replay events that arrived before the event id was known.
+      for (const event of observedEvents) {
+        maybeComplete(event)
+        if (settled) break
+      }
+      await waitForInjection
+      return { eventId, targets }
+    } finally {
+      if (timer) clearTimeout(timer)
+      unsubscribe()
+    }
+  }
+
 async subscribeAgentChannel(projectId: string | undefined, name: string, channel: string): Promise { const trimmedName = name.trim() const [channelName] = normalizeChannels([channel]) diff --git a/src/main/integration-event-bridge.ts b/src/main/integration-event-bridge.ts index 930e8c8..5647427 100644 --- a/src/main/integration-event-bridge.ts +++ b/src/main/integration-event-bridge.ts @@ -50,6 +50,7 @@ const MAX_DISPATCH_SUMMARY_GROUPS = 10 const MAX_DISPATCHED_EVENTS_PER_SECOND = 25 const PROJECT_AGENT_RECIPIENT_CACHE_TTL_MS = 2_000 const MAX_BROKER_SENDS_PER_SECOND = 25 +const DELIVERY_INJECTED_CONFIRMATION_TIMEOUT_MS = 5_000 const REMOTE_STREAM_ERROR_POLLING_FALLBACK_THRESHOLD = 5 const REMOTE_STREAM_POLL_INTERVAL_MS = 5_000 @@ -99,7 +100,22 @@ type EventContextPreviewMetadata = Omit type SlackLogicalInjectionState = { expiresAt: number - contentHashes?: Set + committedBlind: boolean + committedContentHashes: Set + provisionalBlind: boolean + provisionalContentHashes: Set +} + +type RecentInjectionState = { + expiresAt: number + provisional: boolean +} + +type DeliveryDedupeClaim = { + key: string + isSlackLogicalKey: boolean + ttlMs: number + contentHash?: string } type DispatchItem = { @@ -163,6 +179,18 @@ type BrokerEventBridge = { }, options?: { timeoutMs?: number } ) => Promise + sendMessageAndWaitForInjected?: ( + projectId: string, + input: { + to: string + text: string + from?: string + data?: Record + priority?: number + mode?: 'wait' | 'steer' + }, + options?: { timeoutMs?: number } + ) => Promise<{ eventId: string; targets: string[] }> } type 
RelayfileEventClient = { @@ -1947,6 +1975,7 @@ class ProjectEventDispatcher { class ProjectBrokerSendPacer { private readonly queue: Array<{ input: BrokerMessageInput + send?: (input: BrokerMessageInput) => Promise resolve: () => void reject: (error: unknown) => void }> = [] @@ -1963,12 +1992,12 @@ class ProjectBrokerSendPacer { this.send = send } - enqueue(input: BrokerMessageInput): Promise { + enqueue(input: BrokerMessageInput, send?: (input: BrokerMessageInput) => Promise): Promise { if (!this.active) return Promise.resolve() const deferred = this.queue.length > 0 || this.nextRateLimitDelayMs() > 0 || this.draining if (deferred) incrementIntegrationEventCounter(this.projectId, 'brokerSendsDeferred') return new Promise((resolveSend, rejectSend) => { - this.queue.push({ input, resolve: resolveSend, reject: rejectSend }) + this.queue.push({ input, send, resolve: resolveSend, reject: rejectSend }) this.updateDepthGauge() this.scheduleDrain(0) }) @@ -2007,7 +2036,7 @@ class ProjectBrokerSendPacer { this.updateDepthGauge() this.sentInWindow += 1 try { - await this.send(item.input) + await (item.send ?? 
this.send)(item.input) incrementIntegrationEventCounter(this.projectId, 'brokerSends') item.resolve() } catch (error) { @@ -2041,7 +2070,7 @@ class ProjectBrokerSendPacer { export class IntegrationEventBridge { private subscriptions = new Map() private dispatchers = new Map() - private recentInjections = new Map() + private recentInjections = new Map() private slackLogicalInjections = new Map() private projectAgentRecipientCache = new Map() private notificationTargetCache = new Map() @@ -2351,20 +2380,6 @@ export class IntegrationEventBridge { let dedupe = eventDedupeKeyWithFingerprint(duplicateKey, fingerprint) let dedupeClaimed = false - if (!needsSlackContentAwareDedupe) { - if (this.wasRecentlyInjected(dedupe.key, dedupe.ttlMs)) { - incrementIntegrationEventCounter(projectId, 'eventsDropped') - logIntegrationEvent('skipped duplicate path', { - projectId, - eventId: event.id, - path: event.resource.path, - duplicateKey: dedupe.key - }) - return - } - dedupeClaimed = true - } - const bridge = await this.bridge() const uniqueRecipients = await this.recipientsForMatchedSpecs(projectId, matchedSpecs, bridge) if (uniqueRecipients.length === 0) { @@ -2387,9 +2402,49 @@ export class IntegrationEventBridge { const eventMetadata = integrationEventMetadata(event) const contextPreview = await this.readEventContextPreview(projectId, event) - if (needsSlackContentAwareDedupe) { + const usesConcreteAgentTargets = uniqueRecipients.every((recipient) => !recipient.startsWith('#')) + const canTrackInjectedDelivery = usesConcreteAgentTargets && typeof bridge.sendMessageAndWaitForInjected === 'function' + const shouldTrackDedupe = canTrackInjectedDelivery + if (!usesConcreteAgentTargets) { + warnIntegrationEventAggregated( + `delivery injected tracking skipped for channel targets:${projectId}`, + 'delivery injected tracking skipped for channel targets', + { + projectId, + eventId: event.id, + path: event.resource.path, + recipients: uniqueRecipients + } + ) + } else if 
(!canTrackInjectedDelivery) { + throw new Error('Broker delivery_injected confirmation is unavailable') + } + + let deliveryClaim: DeliveryDedupeClaim | undefined + if (needsSlackContentAwareDedupe && shouldTrackDedupe) { dedupe = eventDedupeKeyWithFingerprint(duplicateKey, fingerprint) - if (!this.claimSlackLogicalInjection(dedupe.key, contextPreview, dedupe.ttlMs)) { + const slackClaim = this.claimSlackLogicalInjection(dedupe.key, contextPreview, dedupe.ttlMs, shouldTrackDedupe) + if (!slackClaim.claimed) { + incrementIntegrationEventCounter(projectId, 'eventsDropped') + logIntegrationEvent('skipped duplicate path', { + projectId, + eventId: event.id, + path: event.resource.path, + duplicateKey: dedupe.key + }) + return + } + dedupeClaimed = shouldTrackDedupe + if (shouldTrackDedupe) { + deliveryClaim = { + key: dedupe.key, + isSlackLogicalKey: true, + ttlMs: dedupe.ttlMs, + contentHash: slackClaim.contentHash + } + } + } else if (shouldTrackDedupe) { + if (!this.claimRecentInjection(dedupe.key, dedupe.ttlMs, true)) { incrementIntegrationEventCounter(projectId, 'eventsDropped') logIntegrationEvent('skipped duplicate path', { projectId, @@ -2400,6 +2455,16 @@ export class IntegrationEventBridge { return } dedupeClaimed = true + deliveryClaim = { + key: dedupe.key, + isSlackLogicalKey: false, + ttlMs: dedupe.ttlMs + } + } else if (!needsSlackContentAwareDedupe) { + // Channel/untracked targets must not be falsely committed based on an + // unresolved target list. Keep delivery flowing and leave dedupe to + // relay/replay identity rather than pinning a local claim. + dedupeClaimed = false } const contextPreviewData = contextPreview ? 
eventContextPreviewMetadata(contextPreview) : undefined logIntegrationEvent('injecting', { @@ -2410,6 +2475,7 @@ export class IntegrationEventBridge { }) let deliveredCount = 0 const sendErrors: Array<{ recipient: string; error: unknown }> = [] + const injectedConfirmations: Array> = [] for (const recipient of uniqueRecipients) { const input = { to: recipient, @@ -2430,7 +2496,10 @@ export class IntegrationEventBridge { } } as const try { - await this.sendBrokerMessage(projectId, input, bridge) + const sendResult = await this.sendBrokerMessage(projectId, input, bridge, { + waitForInjected: canTrackInjectedDelivery + }) + if (sendResult.injectedConfirmation) injectedConfirmations.push(sendResult.injectedConfirmation) deliveredCount += 1 } catch (error) { sendErrors.push({ recipient, error }) @@ -2456,6 +2525,28 @@ export class IntegrationEventBridge { } ) } + if (deliveryClaim && injectedConfirmations.length > 0) { + Promise.all(injectedConfirmations) + .then(() => { + this.commitDedupeKey(deliveryClaim) + }) + .catch((error) => { + this.releaseDedupeKey(deliveryClaim.key, deliveryClaim.isSlackLogicalKey, deliveryClaim.contentHash) + warnIntegrationEventAggregated( + `delivery injected confirmation failed:${projectId}`, + 'delivery injected confirmation failed', + { + projectId, + eventId: event.id, + path: event.resource.path, + duplicateKey: deliveryClaim.key, + error: toErrorMessage(error) + } + ) + }) + } else if (deliveryClaim) { + this.releaseDedupeKey(deliveryClaim.key, deliveryClaim.isSlackLogicalKey, deliveryClaim.contentHash) + } incrementIntegrationEventCounter(projectId, 'eventsInjected') } @@ -2541,8 +2632,9 @@ export class IntegrationEventBridge { private async sendBrokerMessage( projectId: string, input: BrokerMessageInput, - bridge: BrokerEventBridge - ): Promise { + bridge: BrokerEventBridge, + options: { waitForInjected?: boolean } = {} + ): Promise<{ injectedConfirmation?: Promise }> { let pacer = this.brokerSendPacers.get(projectId) if (!pacer) { 
// Integration-event delivery is paced on broker send acceptance. Waiting @@ -2553,24 +2645,43 @@ export class IntegrationEventBridge { ) this.brokerSendPacers.set(projectId, pacer) } - await pacer.enqueue(input) + if (!options.waitForInjected) { + await pacer.enqueue(input) + return {} + } + + if (!bridge.sendMessageAndWaitForInjected) { + throw new Error('Broker delivery_injected confirmation is unavailable') + } + + let injectedConfirmation: Promise | undefined + await pacer.enqueue(input, (message) => { + injectedConfirmation = bridge.sendMessageAndWaitForInjected!( + projectId, + message, + { timeoutMs: DELIVERY_INJECTED_CONFIRMATION_TIMEOUT_MS } + ) + return Promise.resolve() + }) + return injectedConfirmation ? { injectedConfirmation } : {} } - private wasRecentlyInjected(key: string, ttlMs = RECENT_INJECTION_TTL_MS): boolean { + private claimRecentInjection(key: string, ttlMs = RECENT_INJECTION_TTL_MS, provisional = false): boolean { const now = Date.now() - for (const [entryKey, expiresAt] of this.recentInjections.entries()) { - if (expiresAt <= now) this.recentInjections.delete(entryKey) + for (const [entryKey, entry] of this.recentInjections.entries()) { + if (entry.expiresAt <= now) this.recentInjections.delete(entryKey) } - if (this.recentInjections.has(key)) return true - this.recentInjections.set(key, now + ttlMs) - return false + if (this.recentInjections.has(key)) return false + this.recentInjections.set(key, { expiresAt: now + ttlMs, provisional }) + return true } private claimSlackLogicalInjection( key: string, contextPreview: EventContextPreview | undefined, - ttlMs: number - ): boolean { + ttlMs: number, + provisional: boolean + ): { claimed: boolean; contentHash?: string } { const now = Date.now() for (const [entryKey, entry] of this.slackLogicalInjections.entries()) { if (entry.expiresAt <= now) this.slackLogicalInjections.delete(entryKey) @@ -2582,33 +2693,106 @@ export class IntegrationEventBridge { const existing = 
this.slackLogicalInjections.get(key) if (existing) { if (!contentHash) { - return false + if ( + existing.committedBlind || + existing.provisionalBlind || + existing.committedContentHashes.size > 0 || + existing.provisionalContentHashes.size > 0 + ) { + return { claimed: false } + } + existing.provisionalBlind = provisional + existing.committedBlind = !provisional + existing.expiresAt = now + ttlMs + return { claimed: true } } - if (!existing.contentHashes) { + if ( + (existing.committedBlind || existing.provisionalBlind) && + existing.committedContentHashes.size === 0 && + existing.provisionalContentHashes.size === 0 + ) { // A blind claim (context read returned nothing) suppresses the late // content-bearing alias copy, but must learn its hash so a genuine // edit afterwards still injects instead of matching the blind claim. - existing.contentHashes = new Set([contentHash]) - return false + if (existing.provisionalBlind) { + existing.provisionalContentHashes.add(contentHash) + } else { + existing.committedContentHashes.add(contentHash) + } + existing.expiresAt = now + ttlMs + return { claimed: false } } - if (existing.contentHashes.has(contentHash)) { - return false + if ( + existing.committedContentHashes.has(contentHash) || + existing.provisionalContentHashes.has(contentHash) + ) { + return { claimed: false } + } + if (provisional) { + existing.provisionalContentHashes.add(contentHash) + } else { + existing.committedContentHashes.add(contentHash) } - existing.contentHashes.add(contentHash) existing.expiresAt = now + ttlMs - return true + return { claimed: true, contentHash } } this.slackLogicalInjections.set(key, { expiresAt: now + ttlMs, - contentHashes: contentHash ? new Set([contentHash]) : undefined + committedBlind: !contentHash && !provisional, + committedContentHashes: !provisional && contentHash ? new Set([contentHash]) : new Set(), + provisionalBlind: !contentHash && provisional, + provisionalContentHashes: provisional && contentHash ? 
new Set([contentHash]) : new Set() }) - return true + return { claimed: true, contentHash } + } + + private commitDedupeKey(claim: DeliveryDedupeClaim): void { + const now = Date.now() + if (!claim.isSlackLogicalKey) { + const entry = this.recentInjections.get(claim.key) + if (entry) { + entry.provisional = false + entry.expiresAt = now + claim.ttlMs + } + return + } + + const entry = this.slackLogicalInjections.get(claim.key) + if (!entry) return + if (claim.contentHash) { + if (entry.provisionalContentHashes.delete(claim.contentHash)) { + entry.committedContentHashes.add(claim.contentHash) + } + } else if (entry.provisionalBlind) { + entry.provisionalBlind = false + entry.committedBlind = true + for (const contentHash of entry.provisionalContentHashes) { + entry.committedContentHashes.add(contentHash) + } + entry.provisionalContentHashes.clear() + } + entry.expiresAt = now + claim.ttlMs } - private releaseDedupeKey(key: string, isSlackLogicalKey: boolean): void { + private releaseDedupeKey(key: string, isSlackLogicalKey: boolean, contentHash?: string): void { if (isSlackLogicalKey) { - this.slackLogicalInjections.delete(key) + const entry = this.slackLogicalInjections.get(key) + if (!entry) return + if (contentHash) { + entry.provisionalContentHashes.delete(contentHash) + } else { + entry.provisionalBlind = false + entry.provisionalContentHashes.clear() + } + if ( + !entry.committedBlind && + !entry.provisionalBlind && + entry.committedContentHashes.size === 0 && + entry.provisionalContentHashes.size === 0 + ) { + this.slackLogicalInjections.delete(key) + } } else { this.recentInjections.delete(key) }