Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 47 additions & 1 deletion src/main/__tests__/integration-event-bridge.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ function makeHarness(
beforeEach(() => {
resetIntegrationEventTelemetryForTests()
delete process.env.PEAR_INTEGRATION_EVENTS_DEBUG
delete process.env.PEAR_INTEGRATION_EVENT_INJECTED_CONFIRMATION_TIMEOUT_MS
})

test('relayfile sdk path filters broaden partial-segment Slack DM globs', () => {
Expand Down Expand Up @@ -909,6 +910,51 @@ test('slack unchanged-content replay re-drives after injected delivery is not co
assert.deepEqual(harness.sent.map((message) => message.input.to), ['alice', 'alice'])
})

test('content-present slack replay waits for hung injected delivery then re-drives after timeout release', async () => {
process.env.PEAR_INTEGRATION_EVENT_INJECTED_CONFIRMATION_TIMEOUT_MS = '20'
const options = { waitForInjectedNeverSettles: true }
const harness = makeHarness(['slack-comms'], options)
const warnCalls: unknown[][] = []
const originalWarn = console.warn
console.warn = (...args: unknown[]) => {
warnCalls.push(args)
}

try {
await withMockedNow('2026-06-05T14:00:00.000Z', async () => {
await harness.bridge.reconcile('project-1', [
integration({
provider: 'slack',
integrationId: 'slack-1',
mountPaths: ['/slack/channels/C123ABC__proj-cloud'],
scope: { notifyAgents: ['slack-comms'] }
})
])
})

const path = '/slack/channels/C123ABC__proj-cloud/threads/1780893336_601259/replies/1780893336_601259.json'
await harness.emit(changeEvent(path, 'slack'))
await waitForSent(harness, 1)

// A replay of the same human message arrives while the original steer has
// been accepted but has not produced delivery_injected. It must not be
// finalized as a duplicate skip; after timeout releases the provisional
// claim, this replay re-drives delivery.
options.waitForInjectedNeverSettles = false
await harness.emit(changeEvent(path, 'slack'))
await waitForSent(harness, 2, 1_500)
await waitUntil(() => warnCalls.some((call) => call[0] === '[integration-events] delivery injected confirmation failed'))
} finally {
console.warn = originalWarn
}

assert.deepEqual(harness.sent.map((message) => message.input.to), ['slack-comms', 'slack-comms'])
assert.match(harness.sent[0].input.text, /Message:\ntargeted Slack context/u)
assert.match(harness.sent[1].input.text, /Message:\ntargeted Slack context/u)
assert.equal(getIntegrationEventTelemetrySnapshot().projects['project-1']?.eventsDropped || 0, 0)
assert.equal(getIntegrationEventTelemetrySnapshot().projects['project-1']?.eventsInjected, 1)
})

test('slack unchanged-content replay is suppressed after injected delivery commits', async () => {
const harness = makeHarness(['alice'])

Expand Down Expand Up @@ -1965,7 +2011,7 @@ test('integration event delivery failures use aggregated warn cadence by default
assert.equal(telemetry.brokerSendsDeferred >= 0, true)
assert.deepEqual({ ...telemetry, brokerSendsDeferred: 0 }, {
eventsReceived: 26,
eventsInjected: 26,
eventsInjected: 0,
eventsCoalesced: 0,
eventsDropped: 0,
brokerSends: 26,
Expand Down
122 changes: 103 additions & 19 deletions src/main/integration-event-bridge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ const MAX_DISPATCH_SUMMARY_GROUPS = 10
const MAX_DISPATCHED_EVENTS_PER_SECOND = 25
const PROJECT_AGENT_RECIPIENT_CACHE_TTL_MS = 2_000
const MAX_BROKER_SENDS_PER_SECOND = 25
const DELIVERY_INJECTED_CONFIRMATION_TIMEOUT_MS = 5_000
const DEFAULT_DELIVERY_INJECTED_CONFIRMATION_TIMEOUT_MS = 5_000
const REMOTE_STREAM_ERROR_POLLING_FALLBACK_THRESHOLD = 5
const REMOTE_STREAM_POLL_INTERVAL_MS = 5_000

Expand Down Expand Up @@ -118,6 +118,13 @@ type DeliveryDedupeClaim = {
contentHash?: string
}

type DeliveryDedupeClaimOutcome = 'committed' | 'released'

type InFlightDedupeClaim = {
promise: Promise<DeliveryDedupeClaimOutcome>
settle: (outcome: DeliveryDedupeClaimOutcome) => void
}

type DispatchItem = {
event: ChangeEvent
specs: SubscriptionSpec[]
Expand Down Expand Up @@ -363,6 +370,21 @@ function delay(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms))
}

function deliveryInjectedConfirmationTimeoutMs(): number {
const value = Number.parseInt(process.env.PEAR_INTEGRATION_EVENT_INJECTED_CONFIRMATION_TIMEOUT_MS || '', 10)
return Number.isFinite(value) && value > 0 ? value : DEFAULT_DELIVERY_INJECTED_CONFIRMATION_TIMEOUT_MS
}

function withTimeout<T>(promise: Promise<T>, timeoutMs: number, message: string): Promise<T> {
let timer: ReturnType<typeof setTimeout> | undefined
const timeout = new Promise<never>((_resolve, reject) => {
timer = setTimeout(() => reject(new Error(message)), timeoutMs)
})
return Promise.race([promise, timeout]).finally(() => {
if (timer) clearTimeout(timer)
})
}

function isRecord(value: unknown): value is Record<string, unknown> {
return !!value && typeof value === 'object' && !Array.isArray(value)
}
Expand Down Expand Up @@ -2111,6 +2133,7 @@ export class IntegrationEventBridge {
private dispatchers = new Map<string, ProjectEventDispatcher>()
private recentInjections = new Map<string, RecentInjectionState>()
private slackLogicalInjections = new Map<string, SlackLogicalInjectionState>()
private inFlightDedupeClaims = new Map<string, Set<Promise<DeliveryDedupeClaimOutcome>>>()
private projectAgentRecipientCache = new Map<string, ProjectAgentRecipientCacheEntry>()
private notificationTargetCache = new Map<string, NotificationTargetCacheEntry>()
private brokerSendPacers = new Map<string, ProjectBrokerSendPacer>()
Expand Down Expand Up @@ -2473,13 +2496,18 @@ export class IntegrationEventBridge {
dedupe = eventDedupeKeyWithFingerprint(duplicateKey, fingerprint)
const slackClaim = this.claimSlackLogicalInjection(dedupe.key, contextPreview, dedupe.ttlMs, shouldTrackDedupe)
if (!slackClaim.claimed) {
const inFlightOutcome = await this.waitForInFlightDedupeClaims(dedupe.key)
if (inFlightOutcome === 'released') {
logIntegrationEvent('retrying duplicate path after unconfirmed delivery', {
projectId,
eventId: event.id,
path: event.resource.path,
duplicateKey: dedupe.key
})
return this.injectEvent(projectId, event, matchedSpecs)
}
incrementIntegrationEventCounter(projectId, 'eventsDropped')
logIntegrationEvent('skipped duplicate path', {
projectId,
eventId: event.id,
path: event.resource.path,
duplicateKey: dedupe.key
})
this.reportSkippedDuplicatePath(projectId, event, dedupe.key)
return
}
dedupeClaimed = shouldTrackDedupe
Expand All @@ -2493,13 +2521,18 @@ export class IntegrationEventBridge {
}
} else if (shouldTrackDedupe) {
if (!this.claimRecentInjection(dedupe.key, dedupe.ttlMs, true)) {
const inFlightOutcome = await this.waitForInFlightDedupeClaims(dedupe.key)
if (inFlightOutcome === 'released') {
logIntegrationEvent('retrying duplicate path after unconfirmed delivery', {
projectId,
eventId: event.id,
path: event.resource.path,
duplicateKey: dedupe.key
})
return this.injectEvent(projectId, event, matchedSpecs)
}
incrementIntegrationEventCounter(projectId, 'eventsDropped')
logIntegrationEvent('skipped duplicate path', {
projectId,
eventId: event.id,
path: event.resource.path,
duplicateKey: dedupe.key
})
this.reportSkippedDuplicatePath(projectId, event, dedupe.key)
return
}
dedupeClaimed = true
Expand All @@ -2514,6 +2547,7 @@ export class IntegrationEventBridge {
// relay/replay identity rather than pinning a local claim.
dedupeClaimed = false
}
const inFlightClaim = deliveryClaim ? this.trackInFlightDedupeClaim(deliveryClaim.key) : undefined
const contextPreviewData = contextPreview ? eventContextPreviewMetadata(contextPreview) : undefined
logIntegrationEvent('injecting', {
projectId,
Expand Down Expand Up @@ -2558,6 +2592,7 @@ export class IntegrationEventBridge {
// this event (remote copy of a local change, coalesced update) retries
// delivery instead of being dropped as a recent injection.
if (dedupeClaimed) this.releaseDedupeKey(dedupe.key, needsSlackContentAwareDedupe)
inFlightClaim?.settle('released')
throw sendErrors[0].error
}
if (sendErrors.length > 0) {
Expand All @@ -2574,9 +2609,11 @@ export class IntegrationEventBridge {
)
}
if (deliveryClaim && injectedConfirmations.length > 0) {
Promise.all(injectedConfirmations)
void Promise.all(injectedConfirmations)
.then(() => {
this.commitDedupeKey(deliveryClaim)
incrementIntegrationEventCounter(projectId, 'eventsInjected')
inFlightClaim?.settle('committed')
})
.catch((error) => {
this.releaseDedupeKey(deliveryClaim.key, deliveryClaim.isSlackLogicalKey, deliveryClaim.contentHash)
Expand All @@ -2591,11 +2628,27 @@ export class IntegrationEventBridge {
error: toErrorMessage(error)
}
)
inFlightClaim?.settle('released')
})
} else if (deliveryClaim) {
this.releaseDedupeKey(deliveryClaim.key, deliveryClaim.isSlackLogicalKey, deliveryClaim.contentHash)
inFlightClaim?.settle('released')
} else {
incrementIntegrationEventCounter(projectId, 'eventsInjected')
}
incrementIntegrationEventCounter(projectId, 'eventsInjected')
}

private reportSkippedDuplicatePath(projectId: string, event: ChangeEvent, duplicateKey: string): void {
warnIntegrationEventAggregated(
`skipped duplicate path:${projectId}`,
'skipped duplicate path',
{
projectId,
eventId: event.id,
path: event.resource.path,
duplicateKey
}
)
}

private async recipientsForMatchedSpecs(
Expand Down Expand Up @@ -2703,11 +2756,16 @@ export class IntegrationEventBridge {
}

let injectedConfirmation: Promise<unknown> | undefined
const timeoutMs = deliveryInjectedConfirmationTimeoutMs()
await pacer.enqueue(input, (message) => {
injectedConfirmation = bridge.sendMessageAndWaitForInjected!(
projectId,
message,
{ timeoutMs: DELIVERY_INJECTED_CONFIRMATION_TIMEOUT_MS }
injectedConfirmation = withTimeout(
bridge.sendMessageAndWaitForInjected!(
projectId,
message,
{ timeoutMs }
),
timeoutMs + 250,
`Timed out waiting for broker delivery_injected confirmation for ${message.to}`
)
return Promise.resolve()
})
Expand Down Expand Up @@ -2823,6 +2881,32 @@ export class IntegrationEventBridge {
entry.expiresAt = now + claim.ttlMs
}

private trackInFlightDedupeClaim(key: string): InFlightDedupeClaim {
let settle: (outcome: DeliveryDedupeClaimOutcome) => void = () => undefined
const promise = new Promise<DeliveryDedupeClaimOutcome>((resolve) => {
settle = resolve
})
let claims = this.inFlightDedupeClaims.get(key)
if (!claims) {
claims = new Set()
this.inFlightDedupeClaims.set(key, claims)
}
claims.add(promise)
void promise.finally(() => {
const current = this.inFlightDedupeClaims.get(key)
current?.delete(promise)
if (current?.size === 0) this.inFlightDedupeClaims.delete(key)
})
return { promise, settle }
}

private async waitForInFlightDedupeClaims(key: string): Promise<DeliveryDedupeClaimOutcome | null> {
const claims = Array.from(this.inFlightDedupeClaims.get(key) ?? [])
if (claims.length === 0) return null
const outcomes = await Promise.all(claims)
return outcomes.includes('released') ? 'released' : 'committed'
}

private releaseDedupeKey(key: string, isSlackLogicalKey: boolean, contentHash?: string): void {
if (isSlackLogicalKey) {
const entry = this.slackLogicalInjections.get(key)
Expand Down
Loading