Skip to content

Commit f3ac89b

Browse files
committed
Align crud uploads with Dart implementation
1 parent b009b72 commit f3ac89b

7 files changed

Lines changed: 149 additions & 74 deletions

File tree

.changeset/warm-dots-sort.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/common': patch
3+
---
4+
5+
Attempt a CRUD upload everytime `connect()` is called, even if we're unable to connect.

packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts

Lines changed: 25 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ import Logger, { ILogger } from 'js-logger';
33
import { SyncStatus, SyncStatusOptions } from '../../../db/crud/SyncStatus.js';
44
import { AbortOperation } from '../../../utils/AbortOperation.js';
55
import { BaseListener, BaseObserver, BaseObserverInterface, Disposable } from '../../../utils/BaseObserver.js';
6-
import { throttleLeadingTrailing } from '../../../utils/async.js';
76
import { BucketStorageAdapter, PowerSyncControlCommand } from '../bucket/BucketStorageAdapter.js';
87
import { CrudEntry } from '../bucket/CrudEntry.js';
98
import { AbstractRemote, FetchStrategy, SyncStreamOptions } from './AbstractRemote.js';
@@ -17,7 +16,7 @@ import {
1716
doneResult,
1817
injectable,
1918
InjectableIterator,
20-
map,
19+
notifyIterator,
2120
SimpleAsyncIterator,
2221
valueResult
2322
} from '../../../utils/stream_transform.js';
@@ -221,16 +220,13 @@ export abstract class AbstractStreamingSyncImplementation
221220
{
222221
protected options: AbstractStreamingSyncImplementationOptions;
223222
protected abortController: AbortController | null;
224-
// In rare cases, mostly for tests, uploads can be triggered without being properly connected.
225-
// This allows ensuring that all upload processes can be aborted.
226-
protected uploadAbortController: AbortController | undefined;
227223
protected crudUpdateListener?: () => void;
228224
protected streamingSyncPromise?: Promise<void>;
229225
protected logger: ILogger;
230226
private activeStreams: SubscribedStream[];
231227
private connectionMayHaveChanged = false;
228+
private crudUploadNotifier = notifyIterator();
232229

233-
private isUploadingCrud: boolean = false;
234230
private notifyCompletedUploads?: () => void;
235231
private handleActiveStreamsChange?: () => void;
236232

@@ -254,17 +250,7 @@ export abstract class AbstractStreamingSyncImplementation
254250
});
255251
this.abortController = null;
256252

257-
this.triggerCrudUpload = throttleLeadingTrailing(() => {
258-
if (!this.syncStatus.connected || this.isUploadingCrud) {
259-
return;
260-
}
261-
262-
this.isUploadingCrud = true;
263-
this._uploadAllCrud().finally(() => {
264-
this.notifyCompletedUploads?.();
265-
this.isUploadingCrud = false;
266-
});
267-
}, this.options.crudUploadThrottleMs!);
253+
this.triggerCrudUpload = () => this.crudUploadNotifier.notify();
268254
}
269255

270256
async waitForReady() {}
@@ -320,7 +306,6 @@ export abstract class AbstractStreamingSyncImplementation
320306
super.dispose();
321307
this.crudUpdateListener?.();
322308
this.crudUpdateListener = undefined;
323-
this.uploadAbortController?.abort();
324309
}
325310

326311
abstract obtainLock<T>(lockOptions: LockOptions<T>): Promise<T>;
@@ -334,7 +319,19 @@ export abstract class AbstractStreamingSyncImplementation
334319
return checkpoint;
335320
}
336321

337-
protected async _uploadAllCrud(): Promise<void> {
322+
private async crudUploadLoop(signal: AbortSignal): Promise<void> {
323+
while (!signal.aborted) {
324+
await Promise.all([
325+
// Start the initial CRUD upload on connect. Then, keep polling until we're done.
326+
this._uploadAllCrud(signal),
327+
this.delayRetry(signal, this.options.crudUploadThrottleMs!)
328+
]);
329+
330+
await this.crudUploadNotifier.next();
331+
}
332+
}
333+
334+
private async _uploadAllCrud(signal: AbortSignal): Promise<void> {
338335
return this.obtainLock({
339336
type: LockType.CRUD,
340337
callback: async () => {
@@ -343,17 +340,7 @@ export abstract class AbstractStreamingSyncImplementation
343340
*/
344341
let checkedCrudItem: CrudEntry | undefined;
345342

346-
const controller = new AbortController();
347-
this.uploadAbortController = controller;
348-
this.abortController?.signal.addEventListener(
349-
'abort',
350-
() => {
351-
controller.abort();
352-
},
353-
{ once: true }
354-
);
355-
356-
while (!controller.signal.aborted) {
343+
while (!signal.aborted) {
357344
try {
358345
/**
359346
* This is the first item in the FIFO CRUD queue.
@@ -398,7 +385,7 @@ The next upload iteration will be delayed.`);
398385
uploadError: ex as Error
399386
}
400387
});
401-
await this.delayRetry(controller.signal);
388+
await this.delayRetry(signal);
402389
if (!this.isConnected) {
403390
// Exit the upload loop if the sync stream is no longer connected
404391
break;
@@ -407,14 +394,15 @@ The next upload iteration will be delayed.`);
407394
`Caught exception when uploading. Upload will retry after a delay. Exception: ${(ex as Error).message}`
408395
);
409396
} finally {
397+
this.notifyCompletedUploads?.();
398+
410399
this.updateSyncStatus({
411400
dataFlow: {
412401
uploading: false
413402
}
414403
});
415404
}
416405
}
417-
this.uploadAbortController = undefined;
418406
}
419407
});
420408
}
@@ -469,15 +457,7 @@ The next upload iteration will be delayed.`);
469457
this.updateSyncStatus({ connected: false, connecting: false });
470458
}
471459

472-
/**
473-
* @deprecated use [connect instead]
474-
*/
475-
async streamingSync(signal?: AbortSignal, options?: PowerSyncConnectionOptions): Promise<void> {
476-
if (!signal) {
477-
this.abortController = new AbortController();
478-
signal = this.abortController.signal;
479-
}
480-
460+
private async streamingSync(signal: AbortSignal, options?: PowerSyncConnectionOptions): Promise<void> {
481461
/**
482462
* Listen for CRUD updates and trigger upstream uploads
483463
*/
@@ -509,6 +489,8 @@ The next upload iteration will be delayed.`);
509489
});
510490
});
511491

492+
this.crudUploadLoop(signal);
493+
512494
/**
513495
* This loops runs until [retry] is false or the abort signal is set to aborted.
514496
* Aborting the nestedAbortController will:
@@ -902,14 +884,13 @@ The next upload iteration will be delayed.`);
902884
this.iterateListeners((cb) => cb.statusUpdated?.(options));
903885
}
904886

905-
private async delayRetry(signal?: AbortSignal): Promise<void> {
887+
private async delayRetry(signal?: AbortSignal, delay = this.options.retryDelayMs): Promise<void> {
906888
return new Promise((resolve) => {
907889
if (signal?.aborted) {
908890
// If the signal is already aborted, resolve immediately
909891
resolve();
910892
return;
911893
}
912-
const { retryDelayMs } = this.options;
913894

914895
let timeoutId: ReturnType<typeof setTimeout> | undefined;
915896

@@ -923,7 +904,7 @@ The next upload iteration will be delayed.`);
923904
};
924905

925906
signal?.addEventListener('abort', endDelay, { once: true });
926-
timeoutId = setTimeout(endDelay, retryDelayMs);
907+
timeoutId = setTimeout(endDelay, delay);
927908
});
928909
}
929910

packages/common/src/utils/async.ts

Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -18,33 +18,3 @@ export function throttleTrailing(func: () => void, wait: number) {
1818
}
1919
};
2020
}
21-
22-
/**
23-
* Throttle a function to be called at most once every "wait" milliseconds,
24-
* on the leading and trailing edge.
25-
*
26-
* Roughly equivalent to lodash/throttle with {leading: true, trailing: true}
27-
*/
28-
export function throttleLeadingTrailing(func: () => void, wait: number) {
29-
let timeoutId: ReturnType<typeof setTimeout> | null = null;
30-
let lastCallTime: number = 0;
31-
32-
const invokeFunction = () => {
33-
func();
34-
lastCallTime = Date.now();
35-
timeoutId = null;
36-
};
37-
38-
return function () {
39-
const now = Date.now();
40-
const timeToWait = wait - (now - lastCallTime);
41-
42-
if (timeToWait <= 0) {
43-
// Leading edge: Call the function immediately if enough time has passed
44-
invokeFunction();
45-
} else if (!timeoutId) {
46-
// Set a timeout for the trailing edge if not already set
47-
timeoutId = setTimeout(invokeFunction, timeToWait);
48-
}
49-
};
50-
}

packages/common/src/utils/stream_transform.ts

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,50 @@ export function injectable<T>(source: SimpleAsyncIterator<T>): InjectableIterato
121121
};
122122
}
123123

124+
export interface NotifyIterator extends SimpleAsyncIterator<undefined> {
125+
notify(): void;
126+
}
127+
128+
/**
129+
* An iterator dispatching notifications without associated data.
130+
*/
131+
export function notifyIterator(): NotifyIterator {
132+
// Possible states:
133+
// - Idle (null), no outstanding listener and no outstanding notification.
134+
// - Waiting (function), has an outstanding listener and no notification.
135+
// - Pending (true), no outstanding listener but an undelivered notification.
136+
let state: null | ((complete: IteratorResult<undefined, any>) => void) | true = null;
137+
const item = valueResult(undefined);
138+
139+
return {
140+
notify() {
141+
if (typeof state == 'function') {
142+
// waiting -> idle
143+
state(item);
144+
state = null;
145+
} else {
146+
state = true; // idle -> pending or pending -> pending.
147+
}
148+
},
149+
next() {
150+
return new Promise((resolve) => {
151+
if (typeof state == 'function') {
152+
throw new Error('concurrent call to next');
153+
}
154+
155+
if (state != null) {
156+
// pending -> idle
157+
resolve(item);
158+
state = null;
159+
} else {
160+
// idle -> waiting
161+
state = resolve;
162+
}
163+
});
164+
}
165+
};
166+
}
167+
124168
/**
125169
* Splits a byte stream at line endings, emitting each line as a string.
126170
*/

packages/common/tests/utils/stream_transform.test.ts

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import {
44
extractBsonObjects,
55
extractJsonLines,
66
injectable,
7+
notifyIterator,
78
SimpleAsyncIterator,
89
valueResult
910
} from '../../src/utils/stream_transform';
@@ -120,6 +121,48 @@ describe('injectable', () => {
120121
});
121122
});
122123

124+
describe('notifyIterator', () => {
125+
test('waits for event', async () => {
126+
const iterator = notifyIterator();
127+
let didReceiveEvent = false;
128+
iterator.next().then((e) => {
129+
expect(e.done).toStrictEqual(false);
130+
didReceiveEvent = true;
131+
});
132+
133+
await Promise.resolve();
134+
expect(didReceiveEvent).toBeFalsy();
135+
iterator.notify();
136+
137+
await Promise.resolve();
138+
expect(didReceiveEvent).toBeTruthy();
139+
});
140+
141+
test('merges events', async () => {
142+
const iterator = notifyIterator();
143+
for (let i = 0; i < 1000; i++) {
144+
iterator.notify();
145+
}
146+
147+
let didReceiveEvent = false;
148+
iterator.next().then((e) => {
149+
expect(e.done).toStrictEqual(false);
150+
didReceiveEvent = true;
151+
});
152+
await Promise.resolve();
153+
expect(didReceiveEvent).toBeTruthy();
154+
155+
// The first iterator.next() should have consumed everything.
156+
didReceiveEvent = false;
157+
iterator.next().then((e) => {
158+
expect(e.done).toStrictEqual(false);
159+
didReceiveEvent = true;
160+
});
161+
await Promise.resolve();
162+
expect(didReceiveEvent).toBeFalsy();
163+
});
164+
});
165+
123166
describe('bson objects', () => {
124167
test('empty stream', async () => {
125168
async function* source() {}

packages/node/tests/sync.test.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -544,6 +544,28 @@ function defineSyncTests(bson: boolean) {
544544
expect(rows).toStrictEqual([{ name: 'from server' }]);
545545
});
546546

547+
mockSyncServiceTest('should upload on start of iteration', async ({ syncService }) => {
548+
let database = await syncService.createDatabase();
549+
await database.execute('INSERT INTO lists (id, name) values (uuid(), ?)', ['local write']);
550+
551+
syncService.installRequestInterceptor(async (request) => {
552+
if (request.url.indexOf('/sync/stream') != -1) {
553+
throw new Error('Pretend that the service is unavailable');
554+
}
555+
});
556+
557+
const connector = new TestConnector();
558+
database.connect(connector, { ...options, retryDelayMs: 10_000, crudUploadThrottleMs: 100 });
559+
await database.waitForStatus((s) => s.dataFlowStatus.downloadError != null);
560+
561+
// We'll never connect due to the error, but we should still try to upload once.
562+
expect(connector.uploadDataInvocations).toStrictEqual(1);
563+
564+
// And even though we're still not connected, we should attempt uploads on crud changes.
565+
await database.execute('INSERT INTO lists (id, name) values (uuid(), ?)', ['second local write']);
566+
await vi.waitFor(() => expect(connector.uploadDataInvocations).toStrictEqual(2));
567+
});
568+
547569
mockSyncServiceTest('handles uploads across checkpoints', async ({ syncService }) => {
548570
const logger = createLogger('test', { logLevel: Logger.TRACE });
549571
const logMessages: string[] = [];

0 commit comments

Comments
 (0)