Skip to content

Commit ee1e6c8

Browse files
kjgbotkjgbot
andauthored
Fix integration event dedupe release on injection timeout (#157)
Co-authored-by: kjgbot <kjgbot@agentrelay.dev>
1 parent a8a32d6 commit ee1e6c8

2 files changed

Lines changed: 150 additions & 20 deletions

File tree

src/main/__tests__/integration-event-bridge.test.ts

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,7 @@ function makeHarness(
289289
beforeEach(() => {
290290
resetIntegrationEventTelemetryForTests()
291291
delete process.env.PEAR_INTEGRATION_EVENTS_DEBUG
292+
delete process.env.PEAR_INTEGRATION_EVENT_INJECTED_CONFIRMATION_TIMEOUT_MS
292293
})
293294

294295
test('relayfile sdk path filters broaden partial-segment Slack DM globs', () => {
@@ -1088,6 +1089,51 @@ test('slack unchanged-content replay re-drives after injected delivery is not co
10881089
assert.deepEqual(harness.sent.map((message) => message.input.to), ['alice', 'alice'])
10891090
})
10901091

1092+
test('content-present slack replay waits for hung injected delivery then re-drives after timeout release', async () => {
1093+
process.env.PEAR_INTEGRATION_EVENT_INJECTED_CONFIRMATION_TIMEOUT_MS = '20'
1094+
const options = { waitForInjectedNeverSettles: true }
1095+
const harness = makeHarness(['slack-comms'], options)
1096+
const warnCalls: unknown[][] = []
1097+
const originalWarn = console.warn
1098+
console.warn = (...args: unknown[]) => {
1099+
warnCalls.push(args)
1100+
}
1101+
1102+
try {
1103+
await withMockedNow('2026-06-05T14:00:00.000Z', async () => {
1104+
await harness.bridge.reconcile('project-1', [
1105+
integration({
1106+
provider: 'slack',
1107+
integrationId: 'slack-1',
1108+
mountPaths: ['/slack/channels/C123ABC__proj-cloud'],
1109+
scope: { notifyAgents: ['slack-comms'] }
1110+
})
1111+
])
1112+
})
1113+
1114+
const path = '/slack/channels/C123ABC__proj-cloud/threads/1780893336_601259/replies/1780893336_601259.json'
1115+
await harness.emit(changeEvent(path, 'slack'))
1116+
await waitForSent(harness, 1)
1117+
1118+
// A replay of the same human message arrives while the original steer has
1119+
// been accepted but has not produced delivery_injected. It must not be
1120+
// finalized as a duplicate skip; after timeout releases the provisional
1121+
// claim, this replay re-drives delivery.
1122+
options.waitForInjectedNeverSettles = false
1123+
await harness.emit(changeEvent(path, 'slack'))
1124+
await waitForSent(harness, 2, 1_500)
1125+
await waitUntil(() => warnCalls.some((call) => call[0] === '[integration-events] delivery injected confirmation failed'))
1126+
} finally {
1127+
console.warn = originalWarn
1128+
}
1129+
1130+
assert.deepEqual(harness.sent.map((message) => message.input.to), ['slack-comms', 'slack-comms'])
1131+
assert.match(harness.sent[0].input.text, /Message:\ntargeted Slack context/u)
1132+
assert.match(harness.sent[1].input.text, /Message:\ntargeted Slack context/u)
1133+
assert.equal(getIntegrationEventTelemetrySnapshot().projects['project-1']?.eventsDropped || 0, 0)
1134+
assert.equal(getIntegrationEventTelemetrySnapshot().projects['project-1']?.eventsInjected, 1)
1135+
})
1136+
10911137
test('slack unchanged-content replay is suppressed after injected delivery commits', async () => {
10921138
const harness = makeHarness(['alice'])
10931139

@@ -2144,7 +2190,7 @@ test('integration event delivery failures use aggregated warn cadence by default
21442190
assert.equal(telemetry.brokerSendsDeferred >= 0, true)
21452191
assert.deepEqual({ ...telemetry, brokerSendsDeferred: 0 }, {
21462192
eventsReceived: 26,
2147-
eventsInjected: 26,
2193+
eventsInjected: 0,
21482194
eventsCoalesced: 0,
21492195
eventsDropped: 0,
21502196
brokerSends: 26,

src/main/integration-event-bridge.ts

Lines changed: 103 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ const MAX_DISPATCH_SUMMARY_GROUPS = 10
5656
const MAX_DISPATCHED_EVENTS_PER_SECOND = 25
5757
const PROJECT_AGENT_RECIPIENT_CACHE_TTL_MS = 2_000
5858
const MAX_BROKER_SENDS_PER_SECOND = 25
59-
const DELIVERY_INJECTED_CONFIRMATION_TIMEOUT_MS = 5_000
59+
const DEFAULT_DELIVERY_INJECTED_CONFIRMATION_TIMEOUT_MS = 5_000
6060
const REMOTE_STREAM_ERROR_POLLING_FALLBACK_THRESHOLD = 5
6161
const REMOTE_STREAM_POLL_INTERVAL_MS = 5_000
6262

@@ -130,6 +130,13 @@ type DeliveryDedupeClaim = {
130130
contentHash?: string
131131
}
132132

133+
type DeliveryDedupeClaimOutcome = 'committed' | 'released'
134+
135+
type InFlightDedupeClaim = {
136+
promise: Promise<DeliveryDedupeClaimOutcome>
137+
settle: (outcome: DeliveryDedupeClaimOutcome) => void
138+
}
139+
133140
type DispatchItem = {
134141
event: ChangeEvent
135142
specs: SubscriptionSpec[]
@@ -375,6 +382,21 @@ function delay(ms: number): Promise<void> {
375382
return new Promise((resolve) => setTimeout(resolve, ms))
376383
}
377384

385+
function deliveryInjectedConfirmationTimeoutMs(): number {
386+
const value = Number.parseInt(process.env.PEAR_INTEGRATION_EVENT_INJECTED_CONFIRMATION_TIMEOUT_MS || '', 10)
387+
return Number.isFinite(value) && value > 0 ? value : DEFAULT_DELIVERY_INJECTED_CONFIRMATION_TIMEOUT_MS
388+
}
389+
390+
function withTimeout<T>(promise: Promise<T>, timeoutMs: number, message: string): Promise<T> {
391+
let timer: ReturnType<typeof setTimeout> | undefined
392+
const timeout = new Promise<never>((_resolve, reject) => {
393+
timer = setTimeout(() => reject(new Error(message)), timeoutMs)
394+
})
395+
return Promise.race([promise, timeout]).finally(() => {
396+
if (timer) clearTimeout(timer)
397+
})
398+
}
399+
378400
function isRecord(value: unknown): value is Record<string, unknown> {
379401
return !!value && typeof value === 'object' && !Array.isArray(value)
380402
}
@@ -2215,6 +2237,7 @@ export class IntegrationEventBridge {
22152237
private dispatchers = new Map<string, ProjectEventDispatcher>()
22162238
private recentInjections = new Map<string, RecentInjectionState>()
22172239
private slackLogicalInjections = new Map<string, SlackLogicalInjectionState>()
2240+
private inFlightDedupeClaims = new Map<string, Set<Promise<DeliveryDedupeClaimOutcome>>>()
22182241
private projectAgentRecipientCache = new Map<string, ProjectAgentRecipientCacheEntry>()
22192242
private notificationTargetCache = new Map<string, NotificationTargetCacheEntry>()
22202243
private brokerSendPacers = new Map<string, ProjectBrokerSendPacer>()
@@ -2584,13 +2607,18 @@ export class IntegrationEventBridge {
25842607
dedupe = eventDedupeKeyWithFingerprint(duplicateKey, fingerprint)
25852608
const slackClaim = this.claimSlackLogicalInjection(dedupe.key, contextPreview, dedupe.ttlMs, shouldTrackDedupe)
25862609
if (!slackClaim.claimed) {
2610+
const inFlightOutcome = await this.waitForInFlightDedupeClaims(dedupe.key)
2611+
if (inFlightOutcome === 'released') {
2612+
logIntegrationEvent('retrying duplicate path after unconfirmed delivery', {
2613+
projectId,
2614+
eventId: event.id,
2615+
path: event.resource.path,
2616+
duplicateKey: dedupe.key
2617+
})
2618+
return this.injectEvent(projectId, event, matchedSpecs)
2619+
}
25872620
incrementIntegrationEventCounter(projectId, 'eventsDropped')
2588-
logIntegrationEvent('skipped duplicate path', {
2589-
projectId,
2590-
eventId: event.id,
2591-
path: event.resource.path,
2592-
duplicateKey: dedupe.key
2593-
})
2621+
this.reportSkippedDuplicatePath(projectId, event, dedupe.key)
25942622
return
25952623
}
25962624
dedupeClaimed = shouldTrackDedupe
@@ -2604,13 +2632,18 @@ export class IntegrationEventBridge {
26042632
}
26052633
} else if (shouldTrackDedupe) {
26062634
if (!this.claimRecentInjection(dedupe.key, dedupe.ttlMs, true)) {
2635+
const inFlightOutcome = await this.waitForInFlightDedupeClaims(dedupe.key)
2636+
if (inFlightOutcome === 'released') {
2637+
logIntegrationEvent('retrying duplicate path after unconfirmed delivery', {
2638+
projectId,
2639+
eventId: event.id,
2640+
path: event.resource.path,
2641+
duplicateKey: dedupe.key
2642+
})
2643+
return this.injectEvent(projectId, event, matchedSpecs)
2644+
}
26072645
incrementIntegrationEventCounter(projectId, 'eventsDropped')
2608-
logIntegrationEvent('skipped duplicate path', {
2609-
projectId,
2610-
eventId: event.id,
2611-
path: event.resource.path,
2612-
duplicateKey: dedupe.key
2613-
})
2646+
this.reportSkippedDuplicatePath(projectId, event, dedupe.key)
26142647
return
26152648
}
26162649
dedupeClaimed = true
@@ -2625,6 +2658,7 @@ export class IntegrationEventBridge {
26252658
// relay/replay identity rather than pinning a local claim.
26262659
dedupeClaimed = false
26272660
}
2661+
const inFlightClaim = deliveryClaim ? this.trackInFlightDedupeClaim(deliveryClaim.key) : undefined
26282662
const contextPreviewData = contextPreview ? eventContextPreviewMetadata(contextPreview) : undefined
26292663
const resolvedResource = isRecord(event.resource)
26302664
? { ...event.resource, path: resolvedPath }
@@ -2672,6 +2706,7 @@ export class IntegrationEventBridge {
26722706
// this event (remote copy of a local change, coalesced update) retries
26732707
// delivery instead of being dropped as a recent injection.
26742708
if (dedupeClaimed) this.releaseDedupeKey(dedupe.key, needsSlackContentAwareDedupe)
2709+
inFlightClaim?.settle('released')
26752710
throw sendErrors[0].error
26762711
}
26772712
if (sendErrors.length > 0) {
@@ -2688,9 +2723,11 @@ export class IntegrationEventBridge {
26882723
)
26892724
}
26902725
if (deliveryClaim && injectedConfirmations.length > 0) {
2691-
Promise.all(injectedConfirmations)
2726+
void Promise.all(injectedConfirmations)
26922727
.then(() => {
26932728
this.commitDedupeKey(deliveryClaim)
2729+
incrementIntegrationEventCounter(projectId, 'eventsInjected')
2730+
inFlightClaim?.settle('committed')
26942731
})
26952732
.catch((error) => {
26962733
this.releaseDedupeKey(deliveryClaim.key, deliveryClaim.isSlackLogicalKey, deliveryClaim.contentHash)
@@ -2705,11 +2742,27 @@ export class IntegrationEventBridge {
27052742
error: toErrorMessage(error)
27062743
}
27072744
)
2745+
inFlightClaim?.settle('released')
27082746
})
27092747
} else if (deliveryClaim) {
27102748
this.releaseDedupeKey(deliveryClaim.key, deliveryClaim.isSlackLogicalKey, deliveryClaim.contentHash)
2749+
inFlightClaim?.settle('released')
2750+
} else {
2751+
incrementIntegrationEventCounter(projectId, 'eventsInjected')
27112752
}
2712-
incrementIntegrationEventCounter(projectId, 'eventsInjected')
2753+
}
2754+
2755+
private reportSkippedDuplicatePath(projectId: string, event: ChangeEvent, duplicateKey: string): void {
2756+
warnIntegrationEventAggregated(
2757+
`skipped duplicate path:${projectId}`,
2758+
'skipped duplicate path',
2759+
{
2760+
projectId,
2761+
eventId: event.id,
2762+
path: event.resource.path,
2763+
duplicateKey
2764+
}
2765+
)
27132766
}
27142767

27152768
private async recipientsForMatchedSpecs(
@@ -2817,11 +2870,16 @@ export class IntegrationEventBridge {
28172870
}
28182871

28192872
let injectedConfirmation: Promise<unknown> | undefined
2873+
const timeoutMs = deliveryInjectedConfirmationTimeoutMs()
28202874
await pacer.enqueue(input, (message) => {
2821-
injectedConfirmation = bridge.sendMessageAndWaitForInjected!(
2822-
projectId,
2823-
message,
2824-
{ timeoutMs: DELIVERY_INJECTED_CONFIRMATION_TIMEOUT_MS }
2875+
injectedConfirmation = withTimeout(
2876+
bridge.sendMessageAndWaitForInjected!(
2877+
projectId,
2878+
message,
2879+
{ timeoutMs }
2880+
),
2881+
timeoutMs + 250,
2882+
`Timed out waiting for broker delivery_injected confirmation for ${message.to}`
28252883
)
28262884
return Promise.resolve()
28272885
})
@@ -2937,6 +2995,32 @@ export class IntegrationEventBridge {
29372995
entry.expiresAt = now + claim.ttlMs
29382996
}
29392997

2998+
private trackInFlightDedupeClaim(key: string): InFlightDedupeClaim {
2999+
let settle: (outcome: DeliveryDedupeClaimOutcome) => void = () => undefined
3000+
const promise = new Promise<DeliveryDedupeClaimOutcome>((resolve) => {
3001+
settle = resolve
3002+
})
3003+
let claims = this.inFlightDedupeClaims.get(key)
3004+
if (!claims) {
3005+
claims = new Set()
3006+
this.inFlightDedupeClaims.set(key, claims)
3007+
}
3008+
claims.add(promise)
3009+
void promise.finally(() => {
3010+
const current = this.inFlightDedupeClaims.get(key)
3011+
current?.delete(promise)
3012+
if (current?.size === 0) this.inFlightDedupeClaims.delete(key)
3013+
})
3014+
return { promise, settle }
3015+
}
3016+
3017+
private async waitForInFlightDedupeClaims(key: string): Promise<DeliveryDedupeClaimOutcome | null> {
3018+
const claims = Array.from(this.inFlightDedupeClaims.get(key) ?? [])
3019+
if (claims.length === 0) return null
3020+
const outcomes = await Promise.all(claims)
3021+
return outcomes.includes('released') ? 'released' : 'committed'
3022+
}
3023+
29403024
private releaseDedupeKey(key: string, isSlackLogicalKey: boolean, contentHash?: string): void {
29413025
if (isSlackLogicalKey) {
29423026
const entry = this.slackLogicalInjections.get(key)

0 commit comments

Comments
 (0)