Skip to content

Commit eeb4ad8

Browse files
committed
Shared worker: Immediate reconnect on connection interrupt
1 parent 1506543 commit eeb4ad8

5 files changed

Lines changed: 55 additions & 16 deletions

File tree

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
'@powersync/common': patch
3+
'@powersync/web': patch
4+
---
5+
6+
Using the Rust sync client on the web, immediately reconnect when the underlying SQLite connection is changed.

packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,7 @@ import {
2828
isStreamingSyncCheckpointPartiallyComplete,
2929
isStreamingSyncData
3030
} from './streaming-sync-types.js';
31-
import {
32-
extractBsonObjects,
33-
extractJsonLines,
34-
injectable,
35-
InjectableIterator,
36-
map,
37-
SimpleAsyncIterator
38-
} from '../../../utils/stream_transform.js';
31+
import { injectable, InjectableIterator, map, SimpleAsyncIterator } from '../../../utils/stream_transform.js';
3932
import type { BSON } from 'bson';
4033

4134
export enum LockType {
@@ -216,6 +209,7 @@ export interface StreamingSyncImplementation
216209
waitForStatus(status: SyncStatusOptions): Promise<void>;
217210
waitUntilStatusMatches(predicate: (status: SyncStatus) => boolean): Promise<void>;
218211
updateSubscriptions(subscriptions: SubscribedStream[]): void;
212+
markConnectionMayHaveChanged(): void;
219213
}
220214

221215
export const DEFAULT_CRUD_UPLOAD_THROTTLE_MS = 1000;
@@ -263,6 +257,7 @@ export abstract class AbstractStreamingSyncImplementation
263257
protected streamingSyncPromise?: Promise<void>;
264258
protected logger: ILogger;
265259
private activeStreams: SubscribedStream[];
260+
private connectionMayHaveChanged = false;
266261

267262
private isUploadingCrud: boolean = false;
268263
private notifyCompletedUploads?: () => void;
@@ -579,6 +574,10 @@ The next upload iteration will be delayed.`);
579574
this.logger.warn(ex);
580575
shouldDelayRetry = false;
581576
// A disconnect was requested, we should not delay since there is no explicit retry
577+
} else if (this.connectionMayHaveChanged && (ex as Error).message?.indexOf('No iteration is active') >= 0) {
578+
this.connectionMayHaveChanged = false;
579+
this.logger.info('Sync error after changed connection, retrying immediately');
580+
shouldDelayRetry = false;
582581
} else {
583582
this.logger.error(ex);
584583
}
@@ -614,6 +613,17 @@ The next upload iteration will be delayed.`);
614613
this.updateSyncStatus({ connected: false, connecting: false });
615614
}
616615

616+
markConnectionMayHaveChanged() {
617+
// By setting this field, we'll immediately retry if the next sync connection causes an error triggered by us not
618+
// having an active sync iteration on the connection in use.
619+
this.connectionMayHaveChanged = true;
620+
621+
// This triggers a `powersync_control` invocation if a sync iteration is currently active. This is a cheap call to
622+
// make when no subscriptions have actually changed, we're mainly interested in this immediately throwing if no
623+
// iteration is active. That allows us to reconnect ASAP, instead of having to wait for the next sync line.
624+
this.handleActiveStreamsChange?.();
625+
}
626+
617627
private async collectLocalBucketState(): Promise<[BucketRequest[], Map<string, BucketDescription | null>]> {
618628
const bucketEntries = await this.options.adapter.getBucketStates();
619629
const req: BucketRequest[] = bucketEntries.map((entry) => ({
@@ -1041,6 +1051,10 @@ The next upload iteration will be delayed.`);
10411051
rawResponse
10421052
);
10431053

1054+
if (op != PowerSyncControlCommand.STOP) {
1055+
// Evidently we have a working connection here, otherwise powersync_control would have failed.
1056+
syncImplementation.connectionMayHaveChanged = false;
1057+
}
10441058
await handleInstructions(JSON.parse(rawResponse));
10451059
}
10461060

packages/node/tests/sync.test.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,24 @@ describe('Sync', () => {
117117
expect.objectContaining({ key: 'lists/3/subkey_3' })
118118
]);
119119
});
120+
121+
mockSyncServiceTest('reconnects immediately after changed connection', async ({ syncService }) => {
122+
let database = await syncService.createDatabase();
123+
database.connect(new TestConnector(), {
124+
clientImplementation: SyncClientImplementation.RUST,
125+
connectionMethod: SyncStreamConnectionMethod.HTTP,
126+
// This large retry delay is to provoke test timeouts if the don't immediately reconnect.
127+
retryDelayMs: 60_000
128+
});
129+
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));
130+
131+
// Replicate what we'd see on the web when switching connections in the shared sync worker: The sync client would
132+
// suddenly see a database without an active sync iteration.
133+
await database.execute('SELECT powersync_control(?, null)', ['stop']);
134+
database.syncStreamImplementation!.markConnectionMayHaveChanged();
135+
await database.waitForStatus((s) => !s.connected);
136+
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));
137+
});
120138
});
121139

122140
function defineSyncTests(impl: SyncClientImplementation, bson: boolean) {

packages/web/src/db/sync/SSRWebStreamingSyncImplementation.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,4 +85,9 @@ export class SSRStreamingSyncImplementation extends BaseObserver implements Stre
8585
* No-op in SSR mode.
8686
*/
8787
updateSubscriptions(): void {}
88+
89+
/**
90+
* No-op in SSR mode.
91+
*/
92+
markConnectionMayHaveChanged(): void {}
8893
}

packages/web/src/worker/sync/SharedSyncImplementation.ts

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -618,14 +618,10 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
618618
const impl = sharedSync.connectionManager.syncStreamImplementation! as WebStreamingSyncImplementation;
619619
impl?.triggerCrudUpload();
620620

621-
/**
622-
* FIXME or IMPROVE ME
623-
* The Rust client implementation stores sync state on the connection level.
624-
* Reopening the database causes a state machine error which should cause the
625-
* StreamingSyncImplementation to reconnect. It would be nicer if we could trigger
626-
* this reconnect earlier.
627-
* This reconnect is not required for IndexedDB.
628-
*/
621+
// The Rust client implementation stores sync state on the connection level. Reopening the database causes a
622+
// disruption of the connection state and forces us to reconnect. We want to do that as soon as possible to
623+
// minimize downtime.
624+
impl?.markConnectionMayHaveChanged();
629625
}
630626
}
631627

0 commit comments

Comments
 (0)