Skip to content

Commit 2051dd1

Browse files
kjgbotkjgbot
andauthored
Fix integration event stream fallback (#124)
Co-authored-by: kjgbot <kjgbot@agentrelay.dev>
1 parent b91ba12 commit 2051dd1

2 files changed

Lines changed: 313 additions & 4 deletions

File tree

src/main/__tests__/integration-event-bridge.test.ts

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import type { ChangeEvent, Subscription } from '@relayfile/sdk'
66
import {
77
getIntegrationEventTelemetrySnapshot,
88
IntegrationEventBridge,
9+
createWorkspaceScopedEventClient,
910
integrationSubscriptionSummaries,
1011
localWatchEventPathsForFilename,
1112
localWatchRootsFor,
@@ -14,6 +15,33 @@ import {
1415
} from '../integration-event-bridge.ts'
1516
import type { ConnectedIntegration } from '../integrations.ts'
1617

18+
type RelayFileSyncHandlerName = 'event' | 'error' | 'state' | 'open' | 'close' | 'pong'
19+
20+
class FakeRelayFileSync {
21+
handlers = new Map<RelayFileSyncHandlerName, Set<(payload: unknown) => void>>()
22+
started = false
23+
stopped = false
24+
25+
on(event: RelayFileSyncHandlerName, handler: (payload: unknown) => void): () => void {
26+
const handlers = this.handlers.get(event) ?? new Set<(payload: unknown) => void>()
27+
handlers.add(handler)
28+
this.handlers.set(event, handlers)
29+
return () => handlers.delete(handler)
30+
}
31+
32+
start(): void {
33+
this.started = true
34+
}
35+
36+
async stop(): Promise<void> {
37+
this.stopped = true
38+
}
39+
40+
emit(event: RelayFileSyncHandlerName, payload: unknown): void {
41+
for (const handler of this.handlers.get(event) ?? []) handler(payload)
42+
}
43+
}
44+
1745
type SentMessage = {
1846
projectId: string
1947
input: {
@@ -243,6 +271,163 @@ test('relayfile sdk path filters broaden partial-segment Slack DM globs', () =>
243271
])
244272
})
245273

274+
test('integration event remote stream falls back to event feed polling after repeated stream errors', async () => {
275+
const syncs: FakeRelayFileSync[] = []
276+
const received: ChangeEvent[] = []
277+
const getEventsCalls: Array<{ cursor?: string; limit?: number }> = []
278+
const pollEvent = {
279+
eventId: 'ws:file.created:/slack/channels/C123/messages/1780735314_000000/meta.json:rev-2:2026-06-06T08:41:00.000Z',
280+
type: 'file.created',
281+
path: '/slack/channels/C123/messages/1780735314_000000/meta.json',
282+
revision: 'rev-2',
283+
timestamp: '2026-06-06T08:41:00.000Z'
284+
} as const
285+
const client = {
286+
async getEvents(_workspaceId: string, options: { cursor?: string; limit?: number }) {
287+
getEventsCalls.push({ cursor: options.cursor, limit: options.limit })
288+
return getEventsCalls.length === 1
289+
? { events: [pollEvent], nextCursor: null }
290+
: { events: [], nextCursor: null }
291+
},
292+
async getResourceAtEvent() {
293+
throw new Error('not used')
294+
}
295+
}
296+
const eventClient = createWorkspaceScopedEventClient(
297+
client as never,
298+
'workspace-id',
299+
async () => 'workspace-token',
300+
'https://relayfile.example',
301+
(options) => {
302+
assert.equal(options.cursor, undefined)
303+
const sync = new FakeRelayFileSync()
304+
syncs.push(sync)
305+
return sync as never
306+
}
307+
)
308+
309+
const subscription = eventClient.subscribe(
310+
['/slack/channels/C123/**'],
311+
(event) => {
312+
received.push(event)
313+
},
314+
{ coalesce: 'none', from: 'legacy', pathScope: ['/slack/channels/C123/**'] }
315+
)
316+
317+
await waitUntil(() => syncs.length === 1)
318+
syncs[0].emit('event', {
319+
eventId: 'ws:file.created:/slack/channels/C123/messages/1780735200_000000/meta.json:rev-1:2026-06-06T08:40:00.000Z',
320+
type: 'file.created',
321+
path: '/slack/channels/C123/messages/1780735200_000000/meta.json',
322+
revision: 'rev-1',
323+
timestamp: '2026-06-06T08:40:00.000Z'
324+
})
325+
await waitUntil(() => received.length === 1)
326+
327+
for (let index = 0; index < 5; index += 1) {
328+
syncs[0].emit('error', new Error('websocket reconnect failed'))
329+
}
330+
331+
await waitUntil(() => received.length === 2)
332+
assert.equal(syncs[0].stopped, true)
333+
assert.deepEqual(getEventsCalls[0], {
334+
cursor: 'ws:file.created:/slack/channels/C123/messages/1780735200_000000/meta.json:rev-1:2026-06-06T08:40:00.000Z',
335+
limit: 1000
336+
})
337+
assert.equal(received[1].resource.path, '/slack/channels/C123/messages/1780735314_000000/meta.json')
338+
339+
await subscription.unsubscribe()
340+
})
341+
342+
test('integration event remote stream fallback replays the outage gap and logs non-empty error details', async () => {
343+
const syncs: FakeRelayFileSync[] = []
344+
const received: ChangeEvent[] = []
345+
const warnCalls: unknown[][] = []
346+
const originalWarn = console.warn
347+
console.warn = (...args: unknown[]) => {
348+
warnCalls.push(args)
349+
}
350+
351+
try {
352+
const client = {
353+
async getEvents(_workspaceId: string, options: { cursor?: string; limit?: number }) {
354+
assert.equal(
355+
options.cursor,
356+
'ws:file.created:/slack/channels/C123/messages/1780735200_000000/meta.json:rev-1:2026-06-06T08:40:00.000Z'
357+
)
358+
return {
359+
events: [
360+
{
361+
eventId: 'ws:file.created:/slack/channels/C123/messages/1780735250_000000/meta.json:rev-gap-1:2026-06-06T08:40:50.000Z',
362+
type: 'file.created',
363+
path: '/slack/channels/C123/messages/1780735250_000000/meta.json',
364+
revision: 'rev-gap-1',
365+
timestamp: '2026-06-06T08:40:50.000Z'
366+
},
367+
{
368+
eventId: 'ws:file.created:/slack/channels/C123/messages/1780735314_000000/meta.json:rev-gap-2:2026-06-06T08:41:00.000Z',
369+
type: 'file.created',
370+
path: '/slack/channels/C123/messages/1780735314_000000/meta.json',
371+
revision: 'rev-gap-2',
372+
timestamp: '2026-06-06T08:41:00.000Z'
373+
}
374+
],
375+
nextCursor: null
376+
}
377+
},
378+
async getResourceAtEvent() {
379+
throw new Error('not used')
380+
}
381+
}
382+
const eventClient = createWorkspaceScopedEventClient(
383+
client as never,
384+
'workspace-id',
385+
async () => 'workspace-token',
386+
'https://relayfile.example',
387+
() => {
388+
const sync = new FakeRelayFileSync()
389+
syncs.push(sync)
390+
return sync as never
391+
}
392+
)
393+
394+
const subscription = eventClient.subscribe(
395+
['/slack/channels/C123/**'],
396+
(event) => {
397+
received.push(event)
398+
},
399+
{ coalesce: 'none', from: 'legacy', pathScope: ['/slack/channels/C123/**'] }
400+
)
401+
402+
await waitUntil(() => syncs.length === 1)
403+
syncs[0].emit('event', {
404+
eventId: 'ws:file.created:/slack/channels/C123/messages/1780735200_000000/meta.json:rev-1:2026-06-06T08:40:00.000Z',
405+
type: 'file.created',
406+
path: '/slack/channels/C123/messages/1780735200_000000/meta.json',
407+
revision: 'rev-1',
408+
timestamp: '2026-06-06T08:40:00.000Z'
409+
})
410+
await waitUntil(() => received.length === 1)
411+
412+
for (let index = 0; index < 5; index += 1) {
413+
syncs[0].emit('error', { type: 'error' })
414+
}
415+
416+
await waitUntil(() => received.length === 3)
417+
assert.deepEqual(received.slice(1).map((event) => event.resource.path), [
418+
'/slack/channels/C123/messages/1780735250_000000/meta.json',
419+
'/slack/channels/C123/messages/1780735314_000000/meta.json'
420+
])
421+
const remoteStreamError = warnCalls.find((call) => call[0] === '[integration-events] remote stream error')
422+
assert.ok(remoteStreamError)
423+
assert.equal((remoteStreamError[1] as { error?: string }).error, 'type=error')
424+
425+
await subscription.unsubscribe()
426+
} finally {
427+
console.warn = originalWarn
428+
}
429+
})
430+
246431
async function waitForSent(harness: { sent: SentMessage[] }, count: number, timeoutMs = 1_000): Promise<void> {
247432
const deadline = Date.now() + timeoutMs
248433
while (harness.sent.length < count && Date.now() < deadline) {

0 commit comments

Comments
 (0)