From 0d1492d67fef3238d8490e514da9205400f2a66b Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 13 May 2026 18:11:54 +0200 Subject: [PATCH 1/2] Fix sync client dropping lines --- .changeset/short-gifts-repeat.md | 5 ++++ packages/common/src/utils/stream_transform.ts | 11 +++++++- .../tests/utils/stream_transform.test.ts | 26 +++++++++++++++++++ 3 files changed, 41 insertions(+), 1 deletion(-) create mode 100644 .changeset/short-gifts-repeat.md diff --git a/.changeset/short-gifts-repeat.md b/.changeset/short-gifts-repeat.md new file mode 100644 index 000000000..2ca4a6eb4 --- /dev/null +++ b/.changeset/short-gifts-repeat.md @@ -0,0 +1,5 @@ +--- +'@powersync/common': patch +--- + +Fix internal client skipping sync lines, causing checksum mismatch errors. diff --git a/packages/common/src/utils/stream_transform.ts b/packages/common/src/utils/stream_transform.ts index 4f82cc1c6..846628657 100644 --- a/packages/common/src/utils/stream_transform.ts +++ b/packages/common/src/utils/stream_transform.ts @@ -50,6 +50,7 @@ export function injectable(source: SimpleAsyncIterator): InjectableIterato let waiter: Waiter | undefined = undefined; // An active, waiting next() call. // A pending upstream event that couldn't be dispatched because inject() has been called before it was resolved. let pendingSourceEvent: ((w: Waiter) => void) | null = null; + let sourceFetchInFlight = false; let pendingInjectedEvents: T[] = []; @@ -61,6 +62,8 @@ export function injectable(source: SimpleAsyncIterator): InjectableIterato const fetchFromSource = () => { const resolveWaiter = (propagate: (w: Waiter) => void) => { + sourceFetchInFlight = false; + const active = consumeWaiter(); if (active) { propagate(active); @@ -69,6 +72,7 @@ export function injectable(source: SimpleAsyncIterator): InjectableIterato } }; + sourceFetchInFlight = true; const nextFromSource = source.next(); nextFromSource.then( (value) => { @@ -101,7 +105,9 @@ export function injectable(source: SimpleAsyncIterator): InjectableIterato // Nothing pending? Fetch from source waiter = { resolve, reject }; - return fetchFromSource(); + if (!sourceFetchInFlight) { + fetchFromSource(); + } }); }, inject: (event) => { @@ -130,12 +136,14 @@ export function extractJsonLines( next: async () => { while (true) { if (isFinalEvent) { + console.log('sending line done'); return doneResult; } { const first = pendingLines.shift(); if (first) { + console.log('sending line', first); return { done: false, value: first }; } } @@ -145,6 +153,7 @@ export function extractJsonLines( const remaining = buffer.trim(); if (remaining.length != 0) { isFinalEvent = true; + console.log('sending line', remaining); return { done: false, value: remaining }; } diff --git a/packages/common/tests/utils/stream_transform.test.ts b/packages/common/tests/utils/stream_transform.test.ts index 024aaaf0c..7250efa0f 100644 --- a/packages/common/tests/utils/stream_transform.test.ts +++ b/packages/common/tests/utils/stream_transform.test.ts @@ -92,6 +92,32 @@ describe('injectable', () => { expect(await next).toStrictEqual(doneResult); } }); + + test('does not start a second source fetch while one is already in flight', async () => { + const outstandingEvents: Array<(value: IteratorResult) => void> = []; + + const stream = injectable({ + next() { + expect(outstandingEvents).toHaveLength(0); + return new Promise((r) => outstandingEvents.push(r)); + } + }); + + // First downstream call starts an event + const p1 = stream.next(); + expect(outstandingEvents).toHaveLength(1); + + // inject() steals the waiter; event remains in flight + stream.inject(-1); + expect(await p1).toStrictEqual(valueResult(-1)); + + // Second downstream call while event is still pending. Should wait for it instead of starting another one. + const p2 = stream.next(); + + // Clean up + outstandingEvents[0](valueResult(10)); + expect(await p2).toStrictEqual(valueResult(10)); + }); }); describe('bson objects', () => { From 49255068d3c73213e1f4e5e8ef53f36129773061 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 13 May 2026 18:24:15 +0200 Subject: [PATCH 2/2] Revert debug log changes --- packages/common/src/utils/stream_transform.ts | 3 --- 1 file changed, 3 deletions(-) diff --git a/packages/common/src/utils/stream_transform.ts b/packages/common/src/utils/stream_transform.ts index 846628657..213b9b301 100644 --- a/packages/common/src/utils/stream_transform.ts +++ b/packages/common/src/utils/stream_transform.ts @@ -136,14 +136,12 @@ export function extractJsonLines( next: async () => { while (true) { if (isFinalEvent) { - console.log('sending line done'); return doneResult; } { const first = pendingLines.shift(); if (first) { - console.log('sending line', first); return { done: false, value: first }; } } @@ -153,7 +151,6 @@ export function extractJsonLines( const remaining = buffer.trim(); if (remaining.length != 0) { isFinalEvent = true; - console.log('sending line', remaining); return { done: false, value: remaining }; }