Skip to content

Commit 0d1492d

Browse files
committed
Fix sync client dropping lines
1 parent 037eef0 commit 0d1492d

3 files changed

Lines changed: 41 additions & 1 deletion

File tree

.changeset/short-gifts-repeat.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/common': patch
3+
---
4+
5+
Fix internal client skipping sync lines, causing checksum mismatch errors.

packages/common/src/utils/stream_transform.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ export function injectable<T>(source: SimpleAsyncIterator<T>): InjectableIterato
5050
let waiter: Waiter | undefined = undefined; // An active, waiting next() call.
5151
// A pending upstream event that couldn't be dispatched because inject() has been called before it was resolved.
5252
let pendingSourceEvent: ((w: Waiter) => void) | null = null;
53+
let sourceFetchInFlight = false;
5354

5455
let pendingInjectedEvents: T[] = [];
5556

@@ -61,6 +62,8 @@ export function injectable<T>(source: SimpleAsyncIterator<T>): InjectableIterato
6162

6263
const fetchFromSource = () => {
6364
const resolveWaiter = (propagate: (w: Waiter) => void) => {
65+
sourceFetchInFlight = false;
66+
6467
const active = consumeWaiter();
6568
if (active) {
6669
propagate(active);
@@ -69,6 +72,7 @@ export function injectable<T>(source: SimpleAsyncIterator<T>): InjectableIterato
6972
}
7073
};
7174

75+
sourceFetchInFlight = true;
7276
const nextFromSource = source.next();
7377
nextFromSource.then(
7478
(value) => {
@@ -101,7 +105,9 @@ export function injectable<T>(source: SimpleAsyncIterator<T>): InjectableIterato
101105

102106
// Nothing pending? Fetch from source
103107
waiter = { resolve, reject };
104-
return fetchFromSource();
108+
if (!sourceFetchInFlight) {
109+
fetchFromSource();
110+
}
105111
});
106112
},
107113
inject: (event) => {
@@ -130,12 +136,14 @@ export function extractJsonLines(
130136
next: async () => {
131137
while (true) {
132138
if (isFinalEvent) {
139+
console.log('sending line done');
133140
return doneResult;
134141
}
135142

136143
{
137144
const first = pendingLines.shift();
138145
if (first) {
146+
console.log('sending line', first);
139147
return { done: false, value: first };
140148
}
141149
}
@@ -145,6 +153,7 @@ export function extractJsonLines(
145153
const remaining = buffer.trim();
146154
if (remaining.length != 0) {
147155
isFinalEvent = true;
156+
console.log('sending line', remaining);
148157
return { done: false, value: remaining };
149158
}
150159

packages/common/tests/utils/stream_transform.test.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,32 @@ describe('injectable', () => {
9292
expect(await next).toStrictEqual(doneResult);
9393
}
9494
});
95+
96+
test('does not start a second source fetch while one is already in flight', async () => {
97+
const outstandingEvents: Array<(value: IteratorResult<number>) => void> = [];
98+
99+
const stream = injectable<number>({
100+
next() {
101+
expect(outstandingEvents).toHaveLength(0);
102+
return new Promise((r) => outstandingEvents.push(r));
103+
}
104+
});
105+
106+
// First downstream call starts an event
107+
const p1 = stream.next();
108+
expect(outstandingEvents).toHaveLength(1);
109+
110+
// inject() steals the waiter; event remains in flight
111+
stream.inject(-1);
112+
expect(await p1).toStrictEqual(valueResult(-1));
113+
114+
// Second downstream call while event is still pending. Should wait for it instead of starting another one.
115+
const p2 = stream.next();
116+
117+
// Clean up
118+
outstandingEvents[0](valueResult(10));
119+
expect(await p2).toStrictEqual(valueResult(10));
120+
});
95121
});
96122

97123
describe('bson objects', () => {

0 commit comments

Comments
 (0)