Skip to content

Commit e39359d

Browse files
Fix: DataStream Race Condition (and flaky unit tests) (#819)
1 parent 25ece59 commit e39359d

6 files changed

Lines changed: 81 additions & 50 deletions

File tree

.changeset/nice-pillows-juggle.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/common': patch
3+
---
4+
5+
Fixed delayed streaming processing, due to a race condition, when connecting via the HTTP connection method (could potentially also affect WebSockets).

packages/common/src/client/sync/stream/AbstractRemote.ts

Lines changed: 47 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -573,7 +573,11 @@ export abstract class AbstractRemote {
573573

574574
const stream = new DataStream<T, string>({
575575
logger: this.logger,
576-
mapLine: mapLine
576+
mapLine: mapLine,
577+
pressure: {
578+
highWaterMark: 20,
579+
lowWaterMark: 10
580+
}
577581
});
578582

579583
abortSignal?.addEventListener('abort', () => {
@@ -585,46 +589,54 @@ export abstract class AbstractRemote {
585589
let buffer = '';
586590

587591

592+
const consumeStream = async () => {
593+
while (!stream.closed && !abortSignal?.aborted && !readerReleased) {
594+
const { done, value } = await reader.read();
595+
if (done) {
596+
const remaining = buffer.trim();
597+
if (remaining.length != 0) {
598+
stream.enqueueData(remaining);
599+
}
588600

589-
const l = stream.registerListener({
590-
lowWater: async () => {
591-
if (stream.closed || abortSignal?.aborted || readerReleased) {
592-
return
593-
}
594-
try {
595-
let didCompleteLine = false;
596-
while (!didCompleteLine) {
597-
const { done, value } = await reader.read();
598-
if (done) {
599-
const remaining = buffer.trim();
600-
if (remaining.length != 0) {
601-
stream.enqueueData(remaining);
602-
}
601+
stream.close();
602+
await closeReader();
603+
return;
604+
}
603605

604-
stream.close();
605-
await closeReader();
606-
return;
607-
}
606+
const data = decoder.decode(value, { stream: true });
607+
buffer += data;
608608

609-
const data = decoder.decode(value, { stream: true });
610-
buffer += data;
609+
const lines = buffer.split('\n');
610+
for (var i = 0; i < lines.length - 1; i++) {
611+
var l = lines[i].trim();
612+
if (l.length > 0) {
613+
stream.enqueueData(l);
614+
}
615+
}
611616

612-
const lines = buffer.split('\n');
613-
for (var i = 0; i < lines.length - 1; i++) {
614-
var l = lines[i].trim();
615-
if (l.length > 0) {
616-
stream.enqueueData(l);
617-
didCompleteLine = true;
617+
buffer = lines[lines.length - 1];
618+
619+
// Implement backpressure by waiting for the low water mark to be reached
620+
if (stream.dataQueue.length > stream.highWatermark) {
621+
await new Promise<void>((resolve) => {
622+
const dispose = stream.registerListener({
623+
lowWater: async () => {
624+
resolve();
625+
dispose();
626+
},
627+
closed: () => {
628+
resolve();
629+
dispose();
618630
}
619-
}
620-
621-
buffer = lines[lines.length - 1];
622-
}
623-
} catch (ex) {
624-
stream.close();
625-
throw ex;
631+
})
632+
})
626633
}
627-
},
634+
}
635+
}
636+
637+
consumeStream().catch(ex => this.logger.error('Error consuming stream', ex));
638+
639+
const l = stream.registerListener({
628640
closed: () => {
629641
closeReader();
630642
l?.();

packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -194,8 +194,7 @@ export interface RequiredAdditionalConnectionOptions extends Required<Additional
194194
}
195195

196196
export interface StreamingSyncImplementation
197-
extends BaseObserverInterface<StreamingSyncImplementationListener>,
198-
Disposable {
197+
extends BaseObserverInterface<StreamingSyncImplementationListener>, Disposable {
199198
/**
200199
* Connects to the sync service
201200
*/

packages/common/src/utils/DataStream.ts

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,18 @@ export class DataStream<ParsedData, SourceData = any> extends BaseObserver<DataS
110110
return null;
111111
}
112112

113+
// Wait for any pending processing to complete first.
114+
// This ensures we register our listener before calling processQueue(),
115+
// avoiding a race where processQueue() sees no reader and returns early.
116+
if (this.processingPromise) {
117+
await this.processingPromise;
118+
}
119+
120+
// Re-check after await - stream may have closed while we were waiting
121+
if (this.closed) {
122+
return null;
123+
}
124+
113125
return new Promise((resolve, reject) => {
114126
const l = this.registerListener({
115127
data: async (data) => {
@@ -151,7 +163,7 @@ export class DataStream<ParsedData, SourceData = any> extends BaseObserver<DataS
151163

152164
const promise = (this.processingPromise = this._processQueue());
153165
promise.finally(() => {
154-
return (this.processingPromise = null);
166+
this.processingPromise = null;
155167
});
156168
return promise;
157169
}
@@ -190,7 +202,6 @@ export class DataStream<ParsedData, SourceData = any> extends BaseObserver<DataS
190202
}
191203

192204
if (this.dataQueue.length > 0) {
193-
// Next tick
194205
setTimeout(() => this.processQueue());
195206
}
196207
}

packages/node/tests/sync.test.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,7 @@ function defineSyncTests(impl: SyncClientImplementation) {
257257
await waitForProgress(database, [0, 2]);
258258
pushDataLine(syncService, 'a', 2);
259259
await waitForProgress(database, [2, 2]);
260+
260261
pushCheckpointComplete(syncService);
261262
await waitForSyncStatus(database, (s) => s.downloadProgress == null);
262263
},
@@ -973,8 +974,6 @@ async function waitForProgress(
973974
return false;
974975
}
975976

976-
//console.log('checking', progress);
977-
978977
const check = (expected: [number, number], actual: ProgressWithOperations): boolean => {
979978
return actual.downloadedOperations == expected[0] && actual.totalOperations == expected[1];
980979
};
@@ -985,7 +984,6 @@ async function waitForProgress(
985984

986985
for (const [priority, expected] of forPriorities) {
987986
if (!check(expected, progress.untilPriority(priority))) {
988-
//console.log('failed for', priority, expected, progress);
989987
return false;
990988
}
991989
}

packages/web/tests/mockSyncServiceExample.test.ts

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -63,17 +63,23 @@ describe('Mock Sync Service Example', { timeout: 100000 }, () => {
6363
}
6464
});
6565

66+
// Wait for sync to complete (hasSynced becomes true) before closing the response
67+
await vi.waitFor(
68+
() => {
69+
expect(database.currentStatus.hasSynced).toBe(true);
70+
},
71+
{ timeout: 5000 }
72+
);
73+
6674
// Complete the response
6775
await mockService.completeResponse(syncRequestId);
6876

69-
// Wait for sync to complete and verify the data was saved
70-
await vi.waitFor(async () => {
71-
const rows = await database.getAll('SELECT * FROM lists WHERE id = ?', ['1']);
72-
expect(rows).toHaveLength(1);
73-
expect(rows[0]).toMatchObject({
74-
id: '1',
75-
name: 'from server'
76-
});
77+
// Verify the data was saved
78+
const rows = await database.getAll('SELECT * FROM lists WHERE id = ?', ['1']);
79+
expect(rows).toHaveLength(1);
80+
expect(rows[0]).toMatchObject({
81+
id: '1',
82+
name: 'from server'
7783
});
7884

7985
// Verify the data by querying the database

0 commit comments

Comments
 (0)