diff --git a/.changeset/short-gifts-repeat.md b/.changeset/short-gifts-repeat.md new file mode 100644 index 000000000..2ca4a6eb4 --- /dev/null +++ b/.changeset/short-gifts-repeat.md @@ -0,0 +1,5 @@ +--- +'@powersync/common': patch +--- + +Fix internal client skipping sync lines, causing checksum mismatch errors. diff --git a/packages/common/src/utils/stream_transform.ts b/packages/common/src/utils/stream_transform.ts index 4f82cc1c6..213b9b301 100644 --- a/packages/common/src/utils/stream_transform.ts +++ b/packages/common/src/utils/stream_transform.ts @@ -50,6 +50,7 @@ export function injectable(source: SimpleAsyncIterator): InjectableIterato let waiter: Waiter | undefined = undefined; // An active, waiting next() call. // A pending upstream event that couldn't be dispatched because inject() has been called before it was resolved. let pendingSourceEvent: ((w: Waiter) => void) | null = null; + let sourceFetchInFlight = false; let pendingInjectedEvents: T[] = []; @@ -61,6 +62,8 @@ export function injectable(source: SimpleAsyncIterator): InjectableIterato const fetchFromSource = () => { const resolveWaiter = (propagate: (w: Waiter) => void) => { + sourceFetchInFlight = false; + const active = consumeWaiter(); if (active) { propagate(active); @@ -69,6 +72,7 @@ export function injectable(source: SimpleAsyncIterator): InjectableIterato } }; + sourceFetchInFlight = true; const nextFromSource = source.next(); nextFromSource.then( (value) => { @@ -101,7 +105,9 @@ export function injectable(source: SimpleAsyncIterator): InjectableIterato // Nothing pending? Fetch from source waiter = { resolve, reject }; - return fetchFromSource(); + if (!sourceFetchInFlight) { + fetchFromSource(); + } }); }, inject: (event) => { diff --git a/packages/common/tests/utils/stream_transform.test.ts b/packages/common/tests/utils/stream_transform.test.ts index 024aaaf0c..7250efa0f 100644 --- a/packages/common/tests/utils/stream_transform.test.ts +++ b/packages/common/tests/utils/stream_transform.test.ts @@ -92,6 +92,32 @@ describe('injectable', () => { expect(await next).toStrictEqual(doneResult); } }); + + test('does not start a second source fetch while one is already in flight', async () => { + const outstandingEvents: Array<(value: IteratorResult) => void> = []; + + const stream = injectable({ + next() { + expect(outstandingEvents).toHaveLength(0); + return new Promise((r) => outstandingEvents.push(r)); + } + }); + + // First downstream call starts an event + const p1 = stream.next(); + expect(outstandingEvents).toHaveLength(1); + + // inject() steals the waiter; event remains in flight + stream.inject(-1); + expect(await p1).toStrictEqual(valueResult(-1)); + + // Second downstream call while event is still pending. Should wait for it instead of starting another one. + const p2 = stream.next(); + + // Clean up + outstandingEvents[0](valueResult(10)); + expect(await p2).toStrictEqual(valueResult(10)); + }); }); describe('bson objects', () => {