Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/warm-dots-sort.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/common': patch
---

Attempt a CRUD upload everytime `connect()` is called, even if we're unable to connect.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import Logger, { ILogger } from 'js-logger';
import { SyncStatus, SyncStatusOptions } from '../../../db/crud/SyncStatus.js';
import { AbortOperation } from '../../../utils/AbortOperation.js';
import { BaseListener, BaseObserver, BaseObserverInterface, Disposable } from '../../../utils/BaseObserver.js';
import { throttleLeadingTrailing } from '../../../utils/async.js';
import { BucketStorageAdapter, PowerSyncControlCommand } from '../bucket/BucketStorageAdapter.js';
import { CrudEntry } from '../bucket/CrudEntry.js';
import { AbstractRemote, FetchStrategy, SyncStreamOptions } from './AbstractRemote.js';
Expand All @@ -17,10 +16,10 @@ import {
doneResult,
injectable,
InjectableIterator,
map,
SimpleAsyncIterator,
valueResult
} from '../../../utils/stream_transform.js';
import { asyncNotifier } from '../../../utils/async.js';
import { StreamingSyncRequestParameterType } from './JsonValue.js';

export enum LockType {
Expand Down Expand Up @@ -209,33 +208,23 @@ export type SubscribedStream = {
params: Record<string, any> | null;
};

// The priority we assume when we receive checkpoint lines where no priority is set.
// This is the default priority used by the sync service, but can be set to an arbitrary
// value since sync services without priorities also won't send partial sync completion
// messages.
const FALLBACK_PRIORITY = 3;

export abstract class AbstractStreamingSyncImplementation
extends BaseObserver<StreamingSyncImplementationListener>
implements StreamingSyncImplementation
{
protected options: AbstractStreamingSyncImplementationOptions;
protected abortController: AbortController | null;
// In rare cases, mostly for tests, uploads can be triggered without being properly connected.
// This allows ensuring that all upload processes can be aborted.
protected uploadAbortController: AbortController | undefined;
protected crudUpdateListener?: () => void;
protected streamingSyncPromise?: Promise<void>;
protected streamingSyncPromise?: Promise<[void, void]>;
protected logger: ILogger;
private activeStreams: SubscribedStream[];
private connectionMayHaveChanged = false;
private crudUploadNotifier = asyncNotifier();

private isUploadingCrud: boolean = false;
private notifyCompletedUploads?: () => void;
private handleActiveStreamsChange?: () => void;

syncStatus: SyncStatus;
triggerCrudUpload: () => void;

constructor(options: AbstractStreamingSyncImplementationOptions) {
super();
Expand All @@ -253,18 +242,10 @@ export abstract class AbstractStreamingSyncImplementation
}
});
this.abortController = null;
}

this.triggerCrudUpload = throttleLeadingTrailing(() => {
if (!this.syncStatus.connected || this.isUploadingCrud) {
return;
}

this.isUploadingCrud = true;
this._uploadAllCrud().finally(() => {
this.notifyCompletedUploads?.();
this.isUploadingCrud = false;
});
}, this.options.crudUploadThrottleMs!);
triggerCrudUpload() {
this.crudUploadNotifier.notify();
}

async waitForReady() {}
Expand Down Expand Up @@ -320,7 +301,6 @@ export abstract class AbstractStreamingSyncImplementation
super.dispose();
this.crudUpdateListener?.();
this.crudUpdateListener = undefined;
this.uploadAbortController?.abort();
}

abstract obtainLock<T>(lockOptions: LockOptions<T>): Promise<T>;
Expand All @@ -334,7 +314,19 @@ export abstract class AbstractStreamingSyncImplementation
return checkpoint;
}

protected async _uploadAllCrud(): Promise<void> {
private async crudUploadLoop(signal: AbortSignal): Promise<void> {
while (!signal.aborted) {
await Promise.all([
// Start the initial CRUD upload on connect. Then, keep polling until we're done.
this._uploadAllCrud(signal),
this.delayRetry(signal, this.options.crudUploadThrottleMs!)
]);

await this.crudUploadNotifier.waitForNotification(signal);
}
}

private async _uploadAllCrud(signal: AbortSignal): Promise<void> {
return this.obtainLock({
type: LockType.CRUD,
callback: async () => {
Expand All @@ -343,17 +335,7 @@ export abstract class AbstractStreamingSyncImplementation
*/
let checkedCrudItem: CrudEntry | undefined;

const controller = new AbortController();
this.uploadAbortController = controller;
this.abortController?.signal.addEventListener(
'abort',
() => {
controller.abort();
},
{ once: true }
);

while (!controller.signal.aborted) {
while (!signal.aborted) {
try {
/**
* This is the first item in the FIFO CRUD queue.
Expand Down Expand Up @@ -384,7 +366,9 @@ The next upload iteration will be delayed.`);
} else {
// Uploading is completed
const neededUpdate = await this.options.adapter.updateLocalTarget(() => this.getWriteCheckpoint());
if (neededUpdate == false && checkedCrudItem != null) {
if (neededUpdate) {
this.notifyCompletedUploads?.();
} else if (checkedCrudItem != null) {
// Only log this if there was something to upload
this.logger.debug('Upload complete, no write checkpoint needed.');
}
Expand All @@ -398,7 +382,7 @@ The next upload iteration will be delayed.`);
uploadError: ex as Error
}
});
await this.delayRetry(controller.signal);
await this.delayRetry(signal);
if (!this.isConnected) {
// Exit the upload loop if the sync stream is no longer connected
break;
Expand All @@ -414,7 +398,6 @@ The next upload iteration will be delayed.`);
});
}
}
this.uploadAbortController = undefined;
}
});
}
Expand All @@ -426,7 +409,10 @@ The next upload iteration will be delayed.`);

const controller = new AbortController();
this.abortController = controller;
this.streamingSyncPromise = this.streamingSync(this.abortController.signal, options);
this.streamingSyncPromise = Promise.all([
this.crudUploadLoop(controller.signal).catch((ex) => this.logger.error('Error in crud upload loop', ex)),
this.streamingSync(controller.signal, options)
]);

// Return a promise that resolves when the connection status is updated to indicate that we're connected.
return new Promise<void>((resolve) => {
Expand Down Expand Up @@ -469,15 +455,7 @@ The next upload iteration will be delayed.`);
this.updateSyncStatus({ connected: false, connecting: false });
}

/**
* @deprecated use [connect instead]
*/
async streamingSync(signal?: AbortSignal, options?: PowerSyncConnectionOptions): Promise<void> {
if (!signal) {
this.abortController = new AbortController();
signal = this.abortController.signal;
}

private async streamingSync(signal: AbortSignal, options?: PowerSyncConnectionOptions): Promise<void> {
/**
* Listen for CRUD updates and trigger upstream uploads
*/
Expand Down Expand Up @@ -902,14 +880,13 @@ The next upload iteration will be delayed.`);
this.iterateListeners((cb) => cb.statusUpdated?.(options));
}

private async delayRetry(signal?: AbortSignal): Promise<void> {
private async delayRetry(signal?: AbortSignal, delay = this.options.retryDelayMs): Promise<void> {
return new Promise((resolve) => {
if (signal?.aborted) {
// If the signal is already aborted, resolve immediately
resolve();
return;
}
const { retryDelayMs } = this.options;

let timeoutId: ReturnType<typeof setTimeout> | undefined;

Expand All @@ -923,7 +900,7 @@ The next upload iteration will be delayed.`);
};

signal?.addEventListener('abort', endDelay, { once: true });
timeoutId = setTimeout(endDelay, retryDelayMs);
timeoutId = setTimeout(endDelay, delay);
});
}

Expand Down
75 changes: 51 additions & 24 deletions packages/common/src/utils/async.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,32 +19,59 @@ export function throttleTrailing(func: () => void, wait: number) {
};
}

/**
* Throttle a function to be called at most once every "wait" milliseconds,
* on the leading and trailing edge.
*
* Roughly equivalent to lodash/throttle with {leading: true, trailing: true}
*/
export function throttleLeadingTrailing(func: () => void, wait: number) {
let timeoutId: ReturnType<typeof setTimeout> | null = null;
let lastCallTime: number = 0;
export interface AsyncNotifier {
/**
* @param signal Also resolve the promise once this signal completes.
* @returns A promise that resolves once {@link notify} is called after this promise was last resolved.
*/
waitForNotification(signal: AbortSignal): Promise<void>;

const invokeFunction = () => {
func();
lastCallTime = Date.now();
timeoutId = null;
};
/**
* Notifies a pending listener, or makes the next {@link waitForNotification} complete immediately if no listener
* is currently active.
*/
notify(): void;
}

return function () {
const now = Date.now();
const timeToWait = wait - (now - lastCallTime);

if (timeToWait <= 0) {
// Leading edge: Call the function immediately if enough time has passed
invokeFunction();
} else if (!timeoutId) {
// Set a timeout for the trailing edge if not already set
timeoutId = setTimeout(invokeFunction, timeToWait);
export function asyncNotifier(): AsyncNotifier {
let waitingConsumer: (() => void) | null = null;
let hasPendingNotification = false;

return {
notify() {
if (waitingConsumer != null) {
waitingConsumer();
waitingConsumer = null;
} else {
hasPendingNotification = true;
}
},
waitForNotification(signal: AbortSignal) {
return new Promise((resolve) => {
if (waitingConsumer != null) {
throw new Error('Illegal call to waitForNotification, already has a waiter.');
}

if (signal.aborted) {
resolve();
} else if (hasPendingNotification) {
resolve();
hasPendingNotification = false;
} else {
function complete() {
signal.removeEventListener('abort', onAbort);
resolve();
}

function onAbort() {
waitingConsumer = null;
resolve();
}

waitingConsumer = complete;
signal.addEventListener('abort', onAbort);
}
});
}
};
}
53 changes: 53 additions & 0 deletions packages/common/tests/utils/async.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import { describe, expect, test } from 'vitest';
import { asyncNotifier } from '../../src/utils/async';

describe('asyncNotifier', () => {
const neverAbort = new AbortController().signal;

test('waits for event', async () => {
const notifier = asyncNotifier();
let didReceiveEvent = false;
notifier.waitForNotification(neverAbort).then((e) => {
didReceiveEvent = true;
});

await Promise.resolve();
expect(didReceiveEvent).toBeFalsy();
notifier.notify();

await Promise.resolve();
expect(didReceiveEvent).toBeTruthy();
});

test('merges events', async () => {
const notifier = asyncNotifier();
for (let i = 0; i < 1000; i++) {
notifier.notify();
}

let didReceiveEvent = false;
notifier.waitForNotification(neverAbort).then((e) => {
didReceiveEvent = true;
});
await Promise.resolve();
expect(didReceiveEvent).toBeTruthy();

// The first iterator.next() should have consumed everything.
didReceiveEvent = false;
notifier.waitForNotification(neverAbort).then((e) => {
didReceiveEvent = true;
});
await Promise.resolve();
expect(didReceiveEvent).toBeFalsy();
});

test('completes on abort', async () => {
const notifier = asyncNotifier();
await notifier.waitForNotification(AbortSignal.abort());
});

test('completes on abort later', async () => {
const notifier = asyncNotifier();
await notifier.waitForNotification(AbortSignal.timeout(10));
});
});
Loading
Loading