From a6738e433bc9fd67d8d77bf890a029fd0363ca05 Mon Sep 17 00:00:00 2001 From: kjgbot Date: Mon, 8 Jun 2026 08:00:58 +0200 Subject: [PATCH] Fix integration event dedupe release on injection timeout --- .../integration-event-bridge.test.ts | 48 ++++++- src/main/integration-event-bridge.ts | 122 +++++++++++++++--- 2 files changed, 150 insertions(+), 20 deletions(-) diff --git a/src/main/__tests__/integration-event-bridge.test.ts b/src/main/__tests__/integration-event-bridge.test.ts index 38d3627..b8c506e 100644 --- a/src/main/__tests__/integration-event-bridge.test.ts +++ b/src/main/__tests__/integration-event-bridge.test.ts @@ -286,6 +286,7 @@ function makeHarness( beforeEach(() => { resetIntegrationEventTelemetryForTests() delete process.env.PEAR_INTEGRATION_EVENTS_DEBUG + delete process.env.PEAR_INTEGRATION_EVENT_INJECTED_CONFIRMATION_TIMEOUT_MS }) test('relayfile sdk path filters broaden partial-segment Slack DM globs', () => { @@ -909,6 +910,51 @@ test('slack unchanged-content replay re-drives after injected delivery is not co assert.deepEqual(harness.sent.map((message) => message.input.to), ['alice', 'alice']) }) +test('content-present slack replay waits for hung injected delivery then re-drives after timeout release', async () => { + process.env.PEAR_INTEGRATION_EVENT_INJECTED_CONFIRMATION_TIMEOUT_MS = '20' + const options = { waitForInjectedNeverSettles: true } + const harness = makeHarness(['slack-comms'], options) + const warnCalls: unknown[][] = [] + const originalWarn = console.warn + console.warn = (...args: unknown[]) => { + warnCalls.push(args) + } + + try { + await withMockedNow('2026-06-05T14:00:00.000Z', async () => { + await harness.bridge.reconcile('project-1', [ + integration({ + provider: 'slack', + integrationId: 'slack-1', + mountPaths: ['/slack/channels/C123ABC__proj-cloud'], + scope: { notifyAgents: ['slack-comms'] } + }) + ]) + }) + + const path = '/slack/channels/C123ABC__proj-cloud/threads/1780893336_601259/replies/1780893336_601259.json' + await harness.emit(changeEvent(path, 'slack')) + await waitForSent(harness, 1) + + // A replay of the same human message arrives while the original steer has + // been accepted but has not produced delivery_injected. It must not be + // finalized as a duplicate skip; after timeout releases the provisional + // claim, this replay re-drives delivery. + options.waitForInjectedNeverSettles = false + await harness.emit(changeEvent(path, 'slack')) + await waitForSent(harness, 2, 1_500) + await waitUntil(() => warnCalls.some((call) => call[0] === '[integration-events] delivery injected confirmation failed')) + } finally { + console.warn = originalWarn + } + + assert.deepEqual(harness.sent.map((message) => message.input.to), ['slack-comms', 'slack-comms']) + assert.match(harness.sent[0].input.text, /Message:\ntargeted Slack context/u) + assert.match(harness.sent[1].input.text, /Message:\ntargeted Slack context/u) + assert.equal(getIntegrationEventTelemetrySnapshot().projects['project-1']?.eventsDropped || 0, 0) + assert.equal(getIntegrationEventTelemetrySnapshot().projects['project-1']?.eventsInjected, 1) +}) + test('slack unchanged-content replay is suppressed after injected delivery commits', async () => { const harness = makeHarness(['alice']) @@ -1965,7 +2011,7 @@ test('integration event delivery failures use aggregated warn cadence by default assert.equal(telemetry.brokerSendsDeferred >= 0, true) assert.deepEqual({ ...telemetry, brokerSendsDeferred: 0 }, { eventsReceived: 26, - eventsInjected: 26, + eventsInjected: 0, eventsCoalesced: 0, eventsDropped: 0, brokerSends: 26, diff --git a/src/main/integration-event-bridge.ts b/src/main/integration-event-bridge.ts index 7f96e3c..931f6d2 100644 --- a/src/main/integration-event-bridge.ts +++ b/src/main/integration-event-bridge.ts @@ -50,7 +50,7 @@ const MAX_DISPATCH_SUMMARY_GROUPS = 10 const MAX_DISPATCHED_EVENTS_PER_SECOND = 25 const PROJECT_AGENT_RECIPIENT_CACHE_TTL_MS = 2_000 const MAX_BROKER_SENDS_PER_SECOND = 25 -const DELIVERY_INJECTED_CONFIRMATION_TIMEOUT_MS = 5_000 +const DEFAULT_DELIVERY_INJECTED_CONFIRMATION_TIMEOUT_MS = 5_000 const REMOTE_STREAM_ERROR_POLLING_FALLBACK_THRESHOLD = 5 const REMOTE_STREAM_POLL_INTERVAL_MS = 5_000 @@ -118,6 +118,13 @@ type DeliveryDedupeClaim = { contentHash?: string } +type DeliveryDedupeClaimOutcome = 'committed' | 'released' + +type InFlightDedupeClaim = { + promise: Promise + settle: (outcome: DeliveryDedupeClaimOutcome) => void +} + type DispatchItem = { event: ChangeEvent specs: SubscriptionSpec[] @@ -363,6 +370,21 @@ function delay(ms: number): Promise { return new Promise((resolve) => setTimeout(resolve, ms)) } +function deliveryInjectedConfirmationTimeoutMs(): number { + const value = Number.parseInt(process.env.PEAR_INTEGRATION_EVENT_INJECTED_CONFIRMATION_TIMEOUT_MS || '', 10) + return Number.isFinite(value) && value > 0 ? value : DEFAULT_DELIVERY_INJECTED_CONFIRMATION_TIMEOUT_MS +} + +function withTimeout(promise: Promise, timeoutMs: number, message: string): Promise { + let timer: ReturnType | undefined + const timeout = new Promise((_resolve, reject) => { + timer = setTimeout(() => reject(new Error(message)), timeoutMs) + }) + return Promise.race([promise, timeout]).finally(() => { + if (timer) clearTimeout(timer) + }) +} + function isRecord(value: unknown): value is Record { return !!value && typeof value === 'object' && !Array.isArray(value) } @@ -2111,6 +2133,7 @@ export class IntegrationEventBridge { private dispatchers = new Map() private recentInjections = new Map() private slackLogicalInjections = new Map() + private inFlightDedupeClaims = new Map>>() private projectAgentRecipientCache = new Map() private notificationTargetCache = new Map() private brokerSendPacers = new Map() @@ -2473,13 +2496,18 @@ export class IntegrationEventBridge { dedupe = eventDedupeKeyWithFingerprint(duplicateKey, fingerprint) const slackClaim = this.claimSlackLogicalInjection(dedupe.key, contextPreview, dedupe.ttlMs, shouldTrackDedupe) if (!slackClaim.claimed) { + const inFlightOutcome = await this.waitForInFlightDedupeClaims(dedupe.key) + if (inFlightOutcome === 'released') { + logIntegrationEvent('retrying duplicate path after unconfirmed delivery', { + projectId, + eventId: event.id, + path: event.resource.path, + duplicateKey: dedupe.key + }) + return this.injectEvent(projectId, event, matchedSpecs) + } incrementIntegrationEventCounter(projectId, 'eventsDropped') - logIntegrationEvent('skipped duplicate path', { - projectId, - eventId: event.id, - path: event.resource.path, - duplicateKey: dedupe.key - }) + this.reportSkippedDuplicatePath(projectId, event, dedupe.key) return } dedupeClaimed = shouldTrackDedupe @@ -2493,13 +2521,18 @@ export class IntegrationEventBridge { } } else if (shouldTrackDedupe) { if (!this.claimRecentInjection(dedupe.key, dedupe.ttlMs, true)) { + const inFlightOutcome = await this.waitForInFlightDedupeClaims(dedupe.key) + if (inFlightOutcome === 'released') { + logIntegrationEvent('retrying duplicate path after unconfirmed delivery', { + projectId, + eventId: event.id, + path: event.resource.path, + duplicateKey: dedupe.key + }) + return this.injectEvent(projectId, event, matchedSpecs) + } incrementIntegrationEventCounter(projectId, 'eventsDropped') - logIntegrationEvent('skipped duplicate path', { - projectId, - eventId: event.id, - path: event.resource.path, - duplicateKey: dedupe.key - }) + this.reportSkippedDuplicatePath(projectId, event, dedupe.key) return } dedupeClaimed = true @@ -2514,6 +2547,7 @@ export class IntegrationEventBridge { // relay/replay identity rather than pinning a local claim. dedupeClaimed = false } + const inFlightClaim = deliveryClaim ? this.trackInFlightDedupeClaim(deliveryClaim.key) : undefined const contextPreviewData = contextPreview ? eventContextPreviewMetadata(contextPreview) : undefined logIntegrationEvent('injecting', { projectId, @@ -2558,6 +2592,7 @@ export class IntegrationEventBridge { // this event (remote copy of a local change, coalesced update) retries // delivery instead of being dropped as a recent injection. if (dedupeClaimed) this.releaseDedupeKey(dedupe.key, needsSlackContentAwareDedupe) + inFlightClaim?.settle('released') throw sendErrors[0].error } if (sendErrors.length > 0) { @@ -2574,9 +2609,11 @@ export class IntegrationEventBridge { ) } if (deliveryClaim && injectedConfirmations.length > 0) { - Promise.all(injectedConfirmations) + void Promise.all(injectedConfirmations) .then(() => { this.commitDedupeKey(deliveryClaim) + incrementIntegrationEventCounter(projectId, 'eventsInjected') + inFlightClaim?.settle('committed') }) .catch((error) => { this.releaseDedupeKey(deliveryClaim.key, deliveryClaim.isSlackLogicalKey, deliveryClaim.contentHash) @@ -2591,11 +2628,27 @@ export class IntegrationEventBridge { error: toErrorMessage(error) } ) + inFlightClaim?.settle('released') }) } else if (deliveryClaim) { this.releaseDedupeKey(deliveryClaim.key, deliveryClaim.isSlackLogicalKey, deliveryClaim.contentHash) + inFlightClaim?.settle('released') + } else { + incrementIntegrationEventCounter(projectId, 'eventsInjected') } - incrementIntegrationEventCounter(projectId, 'eventsInjected') + } + + private reportSkippedDuplicatePath(projectId: string, event: ChangeEvent, duplicateKey: string): void { + warnIntegrationEventAggregated( + `skipped duplicate path:${projectId}`, + 'skipped duplicate path', + { + projectId, + eventId: event.id, + path: event.resource.path, + duplicateKey + } + ) } private async recipientsForMatchedSpecs( @@ -2703,11 +2756,16 @@ export class IntegrationEventBridge { } let injectedConfirmation: Promise | undefined + const timeoutMs = deliveryInjectedConfirmationTimeoutMs() await pacer.enqueue(input, (message) => { - injectedConfirmation = bridge.sendMessageAndWaitForInjected!( - projectId, - message, - { timeoutMs: DELIVERY_INJECTED_CONFIRMATION_TIMEOUT_MS } + injectedConfirmation = withTimeout( + bridge.sendMessageAndWaitForInjected!( + projectId, + message, + { timeoutMs } + ), + timeoutMs + 250, + `Timed out waiting for broker delivery_injected confirmation for ${message.to}` ) return Promise.resolve() }) @@ -2823,6 +2881,32 @@ export class IntegrationEventBridge { entry.expiresAt = now + claim.ttlMs } + private trackInFlightDedupeClaim(key: string): InFlightDedupeClaim { + let settle: (outcome: DeliveryDedupeClaimOutcome) => void = () => undefined + const promise = new Promise((resolve) => { + settle = resolve + }) + let claims = this.inFlightDedupeClaims.get(key) + if (!claims) { + claims = new Set() + this.inFlightDedupeClaims.set(key, claims) + } + claims.add(promise) + void promise.finally(() => { + const current = this.inFlightDedupeClaims.get(key) + current?.delete(promise) + if (current?.size === 0) this.inFlightDedupeClaims.delete(key) + }) + return { promise, settle } + } + + private async waitForInFlightDedupeClaims(key: string): Promise { + const claims = Array.from(this.inFlightDedupeClaims.get(key) ?? []) + if (claims.length === 0) return null + const outcomes = await Promise.all(claims) + return outcomes.includes('released') ? 'released' : 'committed' + } + private releaseDedupeKey(key: string, isSlackLogicalKey: boolean, contentHash?: string): void { if (isSlackLogicalKey) { const entry = this.slackLogicalInjections.get(key)