Skip to content

Commit 729b601

Browse files
author
kjgbot
committed
Implement integration event redrive on injected delivery
1 parent 082b549 commit 729b601

4 files changed

Lines changed: 460 additions & 79 deletions

File tree

docs/specs/2026-06-07-integration-event-redrive.md

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
# Integration Event Re-Drive After Lost Steers
22

33
Issue: #147. Base: #145 (`split/132-slack-integration-event-context-retry`).
4-
Status: DESIGN ONLY. Do not implement or merge until #145 has landed. The
5-
broker delivery-signal contract was pinned by relay-worker and project-lead on
4+
Status: IMPLEMENTATION APPROVED FOR DIRECT-AGENT PATH ONLY. Do not merge until
5+
#145 has landed and project-lead explicitly clears #147. The broker
6+
delivery-signal contract was pinned by relay-worker and project-lead on
67
2026-06-07.
78

89
## Problem
@@ -49,8 +50,9 @@ bounded re-drive window.
4950
replay, stream reconnect, or event-feed polling provides the re-drive input.
5051
- No merge before Khaliq's explicit go. This design stacks on #145 and should
5152
become a follow-up PR after #145 lands.
52-
- No implementation until relay-worker confirms `delivery_injected` coverage for
53-
every agent transport the integration bridge can target.
53+
- No channel-target delivery confirmation in #147 unless concrete resolved agent
54+
targets are available. Empty/unresolved channel targets must never commit a
55+
provisional claim.
5456

5557
## Broker Delivery Contract
5658

@@ -143,15 +145,15 @@ writes the injection and CR, then emits `delivery_injected`. It can still wait
143145
behind the worker's local pending-injection queue or a wedged PTY write, but it
144146
does not wait for a long model turn to complete.
145147

146-
Channel caveat: the bridge can target channels as well as direct agents. Broker
148+
Channel guard: the bridge can target channels as well as direct agents. Broker
147149
channel sends fan out to concrete workers that use the same delivery machinery,
148150
but today's `/api/send` response does not include the resolved worker names.
149151
Existing wait helpers therefore see `targets=[]` for `#channel` sends and return
150-
immediately. For #147, that must NOT count as committed delivery. The
151-
implementation must either resolve concrete channel recipients before
152-
waiting/committing, or explicitly leave channel-target commit/re-drive as a
153-
residual and avoid committing on `targets=[]` for channel sends. The long-term
154-
fix is for the broker send response to include resolved targets.
152+
immediately. For #147, that must NOT count as committed delivery. Engage
153+
provisional -> commit-on-injected only when there is at least one concrete agent
154+
target. Empty-target/channel sends keep prior semantics, are logged as residual,
155+
and must never be falsely committed. The long-term fix is for the broker send
156+
response to include resolved targets.
155157

156158
File ownership for implementation:
157159

@@ -298,9 +300,8 @@ Add broker tests for relay-worker's helper extension:
298300
## Rollout
299301
300302
1. Land #145 and restart Pear on it to restore reliable content reads.
301-
2. Direct-agent implementation can use `delivery_injected` with a 5-second
302-
timeout. Channel targets must either resolve concrete recipients before
303-
commit or remain explicitly residual without committing on `targets=[]`.
303+
2. Direct-agent implementation uses `delivery_injected` with a 5-second timeout.
304+
Channel/unresolved targets remain residual and never commit on `targets=[]`.
304305
3. pear-worker adds the additive BrokerManager helper and bridge state machine;
305306
relay-worker adds helper tests.
306307
4. Implement #147 as a follow-up stacked from #145/main using that helper as the

src/main/__tests__/integration-event-bridge.test.ts

Lines changed: 115 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,8 @@ function makeHarness(
163163
sendDelayMs?: number
164164
onSendStart?: (activeSends: number) => void
165165
waitForDeliveryNeverSettles?: boolean
166+
waitForInjectedNeverSettles?: boolean
167+
failInjected?: boolean
166168
} = {}
167169
): {
168170
bridge: IntegrationEventBridge
@@ -171,6 +173,7 @@ function makeHarness(
171173
sent: SentMessage[]
172174
listAgentsCalls: string[]
173175
deliveryConfirmationCalls: SentMessage[]
176+
injectedConfirmationCalls: SentMessage[]
174177
unsubscribedCount: () => number
175178
emit(event: ChangeEvent): Promise<void>
176179
} {
@@ -179,6 +182,7 @@ function makeHarness(
179182
const sent: SentMessage[] = []
180183
const listAgentsCalls: string[] = []
181184
const deliveryConfirmationCalls: SentMessage[] = []
185+
const injectedConfirmationCalls: SentMessage[] = []
182186
const subscriptions: Subscription[] = []
183187
let unsubscribedCount = 0
184188
let activeSends = 0
@@ -235,7 +239,26 @@ function makeHarness(
235239
deliveryConfirmationCalls.push({ projectId, input })
236240
await new Promise(() => undefined)
237241
}
238-
: undefined
242+
: undefined,
243+
sendMessageAndWaitForInjected: async (projectId, input) => {
244+
injectedConfirmationCalls.push({ projectId, input })
245+
activeSends += 1
246+
options.onSendStart?.(activeSends)
247+
try {
248+
if (options.sendDelayMs) {
249+
await new Promise((resolve) => setTimeout(resolve, options.sendDelayMs))
250+
}
251+
if (options.failSend) throw new Error('broker unavailable')
252+
sent.push({ projectId, input })
253+
if (options.failInjected) throw new Error('delivery injection timed out')
254+
if (options.waitForInjectedNeverSettles) {
255+
await new Promise(() => undefined)
256+
}
257+
return { eventId: `evt-${injectedConfirmationCalls.length}`, targets: [input.to] }
258+
} finally {
259+
activeSends -= 1
260+
}
261+
}
239262
}
240263
})
241264

@@ -252,6 +275,7 @@ function makeHarness(
252275
sent,
253276
listAgentsCalls,
254277
deliveryConfirmationCalls,
278+
injectedConfirmationCalls,
255279
unsubscribedCount: () => unsubscribedCount,
256280
emit
257281
}
@@ -846,6 +870,89 @@ test('slack edits after a blind alias claim still inject once the content change
846870
assert.match(harness.sent[1].input.text, /Message:\nedited Slack message/u)
847871
})
848872

873+
test('slack unchanged-content replay re-drives after injected delivery is not confirmed', async () => {
874+
const options = { failInjected: true }
875+
const harness = makeHarness(['alice'], options)
876+
const warnCalls: unknown[][] = []
877+
const originalWarn = console.warn
878+
console.warn = (...args: unknown[]) => {
879+
warnCalls.push(args)
880+
}
881+
882+
try {
883+
await withMockedNow('2026-06-05T14:00:00.000Z', async () => {
884+
await harness.bridge.reconcile('project-1', [
885+
integration({
886+
provider: 'slack',
887+
integrationId: 'slack-1',
888+
mountPaths: ['/slack/channels/C123ABC__proj-cloud'],
889+
scope: { notifyAgents: ['alice'] }
890+
})
891+
])
892+
})
893+
894+
const path = '/slack/channels/C123ABC__proj-cloud/messages/1780668000_000000/meta.json'
895+
await harness.emit(changeEvent(path, 'slack'))
896+
await waitForSent(harness, 1)
897+
await waitUntil(() => warnCalls.some((call) => call[0] === '[integration-events] delivery injected confirmation failed'))
898+
899+
options.failInjected = false
900+
await harness.emit(changeEvent(path, 'slack'))
901+
await waitForSent(harness, 2)
902+
} finally {
903+
console.warn = originalWarn
904+
}
905+
906+
assert.deepEqual(harness.sent.map((message) => message.input.to), ['alice', 'alice'])
907+
})
908+
909+
test('slack unchanged-content replay is suppressed after injected delivery commits', async () => {
910+
const harness = makeHarness(['alice'])
911+
912+
await withMockedNow('2026-06-05T14:00:00.000Z', async () => {
913+
await harness.bridge.reconcile('project-1', [
914+
integration({
915+
provider: 'slack',
916+
integrationId: 'slack-1',
917+
mountPaths: ['/slack/channels/C123ABC__proj-cloud'],
918+
scope: { notifyAgents: ['alice'] }
919+
})
920+
])
921+
})
922+
923+
const path = '/slack/channels/C123ABC__proj-cloud/messages/1780668000_000000/meta.json'
924+
await harness.emit(changeEvent(path, 'slack'))
925+
await waitForSent(harness, 1)
926+
await harness.emit(changeEvent(path, 'slack'))
927+
await waitForDropped('project-1', 1)
928+
929+
assert.equal(harness.sent.length, 1)
930+
})
931+
932+
test('slack channel targets do not pin unresolved injected-delivery claims', async () => {
933+
const harness = makeHarness(['alice'])
934+
935+
await withMockedNow('2026-06-05T14:00:00.000Z', async () => {
936+
await harness.bridge.reconcile('project-1', [
937+
integration({
938+
provider: 'slack',
939+
integrationId: 'slack-1',
940+
mountPaths: ['/slack/channels/C123ABC__proj-cloud'],
941+
scope: { notifyChannels: ['#triage'] }
942+
})
943+
])
944+
})
945+
946+
const path = '/slack/channels/C123ABC__proj-cloud/messages/1780668000_000000/meta.json'
947+
await harness.emit(changeEvent(path, 'slack'))
948+
await waitForSent(harness, 1)
949+
await harness.emit(changeEvent(path, 'slack'))
950+
await waitForSent(harness, 2)
951+
952+
assert.deepEqual(harness.sent.map((message) => message.input.to), ['#triage', '#triage'])
953+
assert.equal(harness.injectedConfirmationCalls.length, 0)
954+
})
955+
849956
test('remote replayed events older than the subscription session are dropped by default', async () => {
850957
const harness = makeHarness()
851958

@@ -1804,8 +1911,8 @@ test('integration event delivery failures use aggregated warn cadence by default
18041911

18051912
assert.equal(debugCalls.length, 0)
18061913
assert.equal(warnCalls.length, 2)
1807-
assert.equal(warnCalls[0][0], '[integration-events] event delivery failed')
1808-
assert.equal(warnCalls[1][0], '[integration-events] event delivery failed')
1914+
assert.equal(warnCalls[0][0], '[integration-events] delivery injected confirmation failed')
1915+
assert.equal(warnCalls[1][0], '[integration-events] delivery injected confirmation failed')
18091916
assert.deepEqual(warnCalls.map((call) => (call[1] as { occurrences: number }).occurrences), [1, 26])
18101917
assert.deepEqual(
18111918
warnCalls.map((call) => (call[1] as { suppressedSinceLastLog: number }).suppressedSinceLastLog),
@@ -1816,10 +1923,10 @@ test('integration event delivery failures use aggregated warn cadence by default
18161923
assert.equal(telemetry.brokerSendsDeferred >= 0, true)
18171924
assert.deepEqual({ ...telemetry, brokerSendsDeferred: 0 }, {
18181925
eventsReceived: 26,
1819-
eventsInjected: 0,
1926+
eventsInjected: 26,
18201927
eventsCoalesced: 0,
18211928
eventsDropped: 0,
1822-
brokerSends: 0,
1929+
brokerSends: 26,
18231930
brokerSendsDeferred: 0,
18241931
queueDepth: 0,
18251932
mountCount: 0,
@@ -1850,7 +1957,7 @@ test('failed deliveries release the dedupe key so duplicate events retry', async
18501957

18511958
const path = '/slack/channels/C123ABC__proj-cloud/messages/1780668000_000000/meta.json'
18521959
await harness.emit(changeEvent(path, 'slack'))
1853-
await waitUntil(() => warnCalls.some((call) => call[0] === '[integration-events] event delivery failed'))
1960+
await waitUntil(() => warnCalls.some((call) => call[0] === '[integration-events] delivery injected confirmation failed'))
18541961
assert.equal(harness.sent.length, 0)
18551962

18561963
// The same logical change arrives again (remote copy of a local mount
@@ -2053,16 +2160,10 @@ test('integration event dispatcher coalesces rapid distinct revisions for the sa
20532160
})
20542161
})
20552162

2056-
test('integration event fanout sends to recipients sequentially', async () => {
2057-
let maxActiveSends = 0
2163+
test('integration event fanout sends to recipients in stable order', async () => {
20582164
const harness = makeHarness(
20592165
Array.from({ length: 12 }, (_, index) => `agent-${index}`),
2060-
{
2061-
sendDelayMs: 2,
2062-
onSendStart: (activeSends) => {
2063-
maxActiveSends = Math.max(maxActiveSends, activeSends)
2064-
}
2065-
}
2166+
{ sendDelayMs: 2 }
20662167
)
20672168

20682169
await harness.bridge.reconcile('project-1', [
@@ -2076,7 +2177,6 @@ test('integration event fanout sends to recipients sequentially', async () => {
20762177
await harness.emit(changeEvent('/linear/issues/AR-1.json', 'linear'))
20772178
await waitUntil(() => harness.sent.length === 12)
20782179

2079-
assert.equal(maxActiveSends, 1)
20802180
assert.deepEqual(harness.sent.map((message) => message.input.to), [
20812181
'agent-0',
20822182
'agent-1',

src/main/broker.ts

Lines changed: 102 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -429,17 +429,20 @@ function brokerEventString(event: BrokerEvent, key: string): string | undefined
429429
return typeof value === 'string' ? value : undefined
430430
}
431431

432-
function isDeliveryEventForMessage(event: BrokerEvent, eventId: string, targets: string[]): boolean {
433-
const kind = brokerEventString(event, 'kind')
434-
if (![
432+
function isDeliveryEventForMessage(
433+
event: BrokerEvent,
434+
eventId: string,
435+
targets: string[],
436+
allowedKinds: string[] = [
435437
'delivery_ack',
436438
'delivery_verified',
437439
'delivery_failed',
438440
'message_delivery_confirmed',
439441
'message_delivery_failed'
440-
].includes(kind || '')) {
441-
return false
442-
}
442+
]
443+
): boolean {
444+
const kind = brokerEventString(event, 'kind')
445+
if (!allowedKinds.includes(kind || '')) return false
443446
if (brokerEventString(event, 'event_id') !== eventId) return false
444447
const name = brokerEventString(event, 'name')
445448
return !name || targets.length === 0 || targets.includes(name)
@@ -3106,6 +3109,99 @@ export class BrokerManager {
31063109
}
31073110
}
31083111

3112+
async sendMessageAndWaitForInjected(
3113+
projectId: string | undefined,
3114+
input: SendMessageInput,
3115+
options: { timeoutMs?: number } = {}
3116+
): Promise<DeliveryConfirmationResult> {
3117+
const session = input.to.startsWith('#')
3118+
? this.getSessionForProject(projectId || '')
3119+
: this.getSessionForAgent(input.to, projectId)
3120+
const timeoutMs = Math.max(1, options.timeoutMs ?? DEFAULT_DELIVERY_CONFIRMATION_TIMEOUT_MS)
3121+
const observedEvents: BrokerEvent[] = []
3122+
let eventId: string | undefined
3123+
let targets: string[] = []
3124+
let pendingTargets = new Set<string>()
3125+
let settled = false
3126+
let timer: ReturnType<typeof setTimeout> | undefined
3127+
let resolveWait: (() => void) | undefined
3128+
let rejectWait: ((error: Error) => void) | undefined
3129+
3130+
const waitForInjection = new Promise<void>((resolve, reject) => {
3131+
resolveWait = resolve
3132+
rejectWait = reject
3133+
timer = setTimeout(() => {
3134+
if (settled) return
3135+
settled = true
3136+
const pending = Array.from(pendingTargets)
3137+
const targetSummary = pending.length > 0 ? ` (${pending.join(', ')})` : ''
3138+
reject(new Error(`Timed out waiting for delivery injection for ${eventId || input.to}${targetSummary}`))
3139+
}, timeoutMs)
3140+
})
3141+
3142+
const maybeComplete = (event: BrokerEvent): void => {
3143+
if (settled || !eventId) return
3144+
if (!isDeliveryEventForMessage(event, eventId, targets, [
3145+
'delivery_injected',
3146+
'delivery_failed',
3147+
'message_delivery_failed'
3148+
])) {
3149+
return
3150+
}
3151+
const name = brokerEventString(event, 'name')
3152+
3153+
if (event.kind === 'delivery_injected') {
3154+
if (!name || pendingTargets.size === 0) {
3155+
settled = true
3156+
resolveWait?.()
3157+
return
3158+
}
3159+
pendingTargets.delete(name)
3160+
if (pendingTargets.size === 0) {
3161+
settled = true
3162+
resolveWait?.()
3163+
}
3164+
return
3165+
}
3166+
3167+
if (event.kind === 'delivery_failed' || event.kind === 'message_delivery_failed') {
3168+
settled = true
3169+
rejectWait?.(new Error(deliveryFailureMessage(event)))
3170+
}
3171+
}
3172+
3173+
const unsubscribe = session.client.onEvent((event) => {
3174+
observedEvents.push(event)
3175+
maybeComplete(event)
3176+
})
3177+
3178+
try {
3179+
const rawResult = await session.client.sendMessage(input) as unknown
3180+
const result = isRecord(rawResult) ? rawResult : {}
3181+
eventId = typeof result.event_id === 'string' ? result.event_id : 'unsupported_operation'
3182+
const reportedTargets = Array.isArray(result.targets)
3183+
? result.targets.filter((target): target is string => typeof target === 'string' && target.trim().length > 0)
3184+
: []
3185+
targets = reportedTargets.length > 0 || input.to.startsWith('#')
3186+
? reportedTargets
3187+
: [input.to]
3188+
pendingTargets = new Set(targets)
3189+
if (targets.length === 0 || eventId === 'unsupported_operation') {
3190+
settled = true
3191+
return { eventId, targets }
3192+
}
3193+
for (const event of observedEvents) {
3194+
maybeComplete(event)
3195+
if (settled) break
3196+
}
3197+
await waitForInjection
3198+
return { eventId, targets }
3199+
} finally {
3200+
if (timer) clearTimeout(timer)
3201+
unsubscribe()
3202+
}
3203+
}
3204+
31093205
async subscribeAgentChannel(projectId: string | undefined, name: string, channel: string): Promise<void> {
31103206
const trimmedName = name.trim()
31113207
const [channelName] = normalizeChannels([channel])

0 commit comments

Comments
 (0)