Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/short-gifts-repeat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/common': patch
---

Fix internal client skipping sync lines, causing checksum mismatch errors.
8 changes: 7 additions & 1 deletion packages/common/src/utils/stream_transform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ export function injectable<T>(source: SimpleAsyncIterator<T>): InjectableIterato
let waiter: Waiter | undefined = undefined; // An active, waiting next() call.
// A pending upstream event that couldn't be dispatched because inject() has been called before it was resolved.
let pendingSourceEvent: ((w: Waiter) => void) | null = null;
let sourceFetchInFlight = false;

let pendingInjectedEvents: T[] = [];

Expand All @@ -61,6 +62,8 @@ export function injectable<T>(source: SimpleAsyncIterator<T>): InjectableIterato

const fetchFromSource = () => {
const resolveWaiter = (propagate: (w: Waiter) => void) => {
sourceFetchInFlight = false;

const active = consumeWaiter();
if (active) {
propagate(active);
Expand All @@ -69,6 +72,7 @@ export function injectable<T>(source: SimpleAsyncIterator<T>): InjectableIterato
}
};

sourceFetchInFlight = true;
const nextFromSource = source.next();
nextFromSource.then(
(value) => {
Expand Down Expand Up @@ -101,7 +105,9 @@ export function injectable<T>(source: SimpleAsyncIterator<T>): InjectableIterato

// Nothing pending? Fetch from source
waiter = { resolve, reject };
return fetchFromSource();
if (!sourceFetchInFlight) {
fetchFromSource();
}
});
},
inject: (event) => {
Expand Down
26 changes: 26 additions & 0 deletions packages/common/tests/utils/stream_transform.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,32 @@ describe('injectable', () => {
expect(await next).toStrictEqual(doneResult);
}
});

test('does not start a second source fetch while one is already in flight', async () => {
const outstandingEvents: Array<(value: IteratorResult<number>) => void> = [];

const stream = injectable<number>({
next() {
expect(outstandingEvents).toHaveLength(0);
return new Promise((r) => outstandingEvents.push(r));
}
});

// First downstream call starts an event
const p1 = stream.next();
expect(outstandingEvents).toHaveLength(1);

// inject() steals the waiter; event remains in flight
stream.inject(-1);
expect(await p1).toStrictEqual(valueResult(-1));

// Second downstream call while event is still pending. Should wait for it instead of starting another one.
const p2 = stream.next();

// Clean up
outstandingEvents[0](valueResult(10));
expect(await p2).toStrictEqual(valueResult(10));
});
});

describe('bson objects', () => {
Expand Down
Loading