Skip to content

Commit 8704578

Browse files
authored
Fix sync client dropping lines (#960)
1 parent 037eef0 commit 8704578

3 files changed

Lines changed: 38 additions & 1 deletion

File tree

.changeset/short-gifts-repeat.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/common': patch
3+
---
4+
5+
Fix internal client skipping sync lines, causing checksum mismatch errors.

packages/common/src/utils/stream_transform.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ export function injectable<T>(source: SimpleAsyncIterator<T>): InjectableIterato
5050
let waiter: Waiter | undefined = undefined; // An active, waiting next() call.
5151
// A pending upstream event that couldn't be dispatched because inject() has been called before it was resolved.
5252
let pendingSourceEvent: ((w: Waiter) => void) | null = null;
53+
let sourceFetchInFlight = false;
5354

5455
let pendingInjectedEvents: T[] = [];
5556

@@ -61,6 +62,8 @@ export function injectable<T>(source: SimpleAsyncIterator<T>): InjectableIterato
6162

6263
const fetchFromSource = () => {
6364
const resolveWaiter = (propagate: (w: Waiter) => void) => {
65+
sourceFetchInFlight = false;
66+
6467
const active = consumeWaiter();
6568
if (active) {
6669
propagate(active);
@@ -69,6 +72,7 @@ export function injectable<T>(source: SimpleAsyncIterator<T>): InjectableIterato
6972
}
7073
};
7174

75+
sourceFetchInFlight = true;
7276
const nextFromSource = source.next();
7377
nextFromSource.then(
7478
(value) => {
@@ -101,7 +105,9 @@ export function injectable<T>(source: SimpleAsyncIterator<T>): InjectableIterato
101105

102106
// Nothing pending? Fetch from source
103107
waiter = { resolve, reject };
104-
return fetchFromSource();
108+
if (!sourceFetchInFlight) {
109+
fetchFromSource();
110+
}
105111
});
106112
},
107113
inject: (event) => {

packages/common/tests/utils/stream_transform.test.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,32 @@ describe('injectable', () => {
9292
expect(await next).toStrictEqual(doneResult);
9393
}
9494
});
95+
96+
test('does not start a second source fetch while one is already in flight', async () => {
97+
const outstandingEvents: Array<(value: IteratorResult<number>) => void> = [];
98+
99+
const stream = injectable<number>({
100+
next() {
101+
expect(outstandingEvents).toHaveLength(0);
102+
return new Promise((r) => outstandingEvents.push(r));
103+
}
104+
});
105+
106+
// First downstream call starts an event
107+
const p1 = stream.next();
108+
expect(outstandingEvents).toHaveLength(1);
109+
110+
// inject() steals the waiter; event remains in flight
111+
stream.inject(-1);
112+
expect(await p1).toStrictEqual(valueResult(-1));
113+
114+
// Second downstream call while event is still pending. Should wait for it instead of starting another one.
115+
const p2 = stream.next();
116+
117+
// Clean up
118+
outstandingEvents[0](valueResult(10));
119+
expect(await p2).toStrictEqual(valueResult(10));
120+
});
95121
});
96122

97123
describe('bson objects', () => {

0 commit comments

Comments
 (0)