Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion packages/electric-db-collection/src/electric.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1422,7 +1422,11 @@ function createElectricSync<T extends Row<unknown>>(
// Note that Electric sends a 409 error on a `must-refetch` message, but the
// ShapeStream handled this and it will not reach this handler, therefor
// this markReady will not be triggers by a `must-refetch`.
markReady()
// Guard against the race condition where collection.cleanup() aborts
// the stream but onError fires before the ShapeStream observes the
// abort signal. Calling markReady() on a cleaned-up collection throws
// CollectionStateError ("cleaned-up" → "ready" is invalid).
if (!abortController.signal.aborted) markReady()

if (shapeOptions.onError) {
return shapeOptions.onError(errorParams)
Expand Down
40 changes: 40 additions & 0 deletions packages/electric-db-collection/tests/electric.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1917,6 +1917,46 @@ describe(`Electric Integration`, () => {
expect(mockAbortController.abort).toHaveBeenCalledTimes(1)
})

it(`should not throw CollectionStateError when onError fires after cleanup`, async () => {
// Regression test: cleanup() aborts the stream, but onError can still fire
// before the ShapeStream observes the abort signal. Without the guard,
// calling markReady() on a cleaned-up collection throws CollectionStateError
// ("cleaned-up" → "ready" is an invalid lifecycle transition).
const realAbortController = new AbortController()
mockAbortController = {
abort: vi.fn(() => realAbortController.abort()),
signal: realAbortController.signal,
}
global.AbortController = vi.fn().mockImplementation(() => mockAbortController)

let capturedOnError: ((params: unknown) => void) | undefined
const { ShapeStream } = await import(`@electric-sql/client`)
vi.mocked(ShapeStream).mockImplementation((options: any) => {
capturedOnError = options.onError
return mockStream as any
})

const config = {
id: `on-error-after-cleanup-test`,
shapeOptions: {
url: `http://test-url`,
params: { table: `test_table` },
},
getKey: (item: Row) => item.id as number,
startSync: true,
}

const testCollection = createCollection(electricCollectionOptions(config))
expect(capturedOnError).toBeDefined()

// Simulate cleanup (this aborts the controller)
await testCollection.cleanup()
expect(testCollection.status).toBe(`cleaned-up`)

// Simulate onError firing after cleanup — should not throw CollectionStateError
expect(() => capturedOnError!({ status: 500, json: {} })).not.toThrow()
})

it(`should restart stream when collection is accessed after cleanup`, async () => {
const config = {
id: `restart-stream-test`,
Expand Down