Skip to content

Commit 5ef22b1

Browse files
simplify logic
1 parent c4eccb8 commit 5ef22b1

5 files changed

Lines changed: 47 additions & 41 deletions

File tree

packages/common/src/client/AbstractPowerSyncDatabase.ts

Lines changed: 12 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -462,8 +462,6 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
462462
protected abstract runExclusive<T>(callback: () => Promise<T>): Promise<T>;
463463

464464
protected async connectInternal() {
465-
await this.disconnectInternal();
466-
467465
let appliedOptions: PowerSyncConnectionOptions | null = null;
468466

469467
/**
@@ -521,42 +519,25 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
521519
* Connects to stream of events from the PowerSync instance.
522520
*/
523521
async connect(connector: PowerSyncBackendConnector, options?: PowerSyncConnectionOptions) {
524-
await this.waitForReady();
525-
526-
// A pending connection should be present if this is true
527-
// The options also have not been used yet if this is true.
528-
// We can update this referrence in order to update the next connection attempt.
529-
const hadPendingConnectionOptions = !!this.pendingConnectionOptions;
530-
531522
// This overrides options if present.
532523
this.pendingConnectionOptions = {
533524
connector,
534525
options: options ?? {}
535526
};
536527

537-
if (hadPendingConnectionOptions) {
538-
// A connection attempt is already queued, but it hasn't used the options yet.
539-
// The params have been updated and will be used when connecting.
540-
if (!this.connectingPromise) {
541-
throw new Error(`Pending connection options were found without a pending connect operation`);
542-
}
543-
return await this.connectingPromise;
544-
} else if (this.connectingPromise) {
545-
// If we didn't have pending options, we are busy with a connect.
546-
// i.e. the pending connect used the options already and is busy proceeding.
547-
// The call which creates the connectingPromise should check if there are pendingConnectionOptions and automatically
548-
// schedule a connect. See below:
549-
} else {
550-
// No pending options or pending operation. Start one
551-
this.connectingPromise = this.connectInternal().finally(() => {
552-
if (this.pendingConnectionOptions) {
553-
return this.connectInternal();
554-
}
528+
await this.waitForReady();
529+
await this.disconnectInternal();
530+
531+
const chain = (result) => {
532+
if (this.pendingConnectionOptions) {
533+
return this.connectInternal().then(chain);
534+
} else {
555535
this.connectingPromise = null;
556-
return;
557-
});
558-
return await this.connectingPromise;
559-
}
536+
return result;
537+
}
538+
};
539+
540+
return this.connectingPromise ?? this.connectInternal().then(chain);
560541
}
561542

562543
/**
@@ -569,7 +550,6 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
569550
// A disconnect is already in progress
570551
return await this.disconnectingPromise;
571552
}
572-
573553
// Wait if a sync stream implementation is being created before closing it
574554
// (it must be assigned before we can properly dispose it)
575555
await this.syncStreamInitPromise;

packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
import Logger, { ILogger } from 'js-logger';
22

3+
import { InternalProgressInformation } from 'src/db/crud/SyncProgress.js';
4+
import { DataStream } from 'src/utils/DataStream.js';
35
import { SyncStatus, SyncStatusOptions } from '../../../db/crud/SyncStatus.js';
46
import { AbortOperation } from '../../../utils/AbortOperation.js';
57
import { BaseListener, BaseObserver, Disposable } from '../../../utils/BaseObserver.js';
68
import { onAbortPromise, throttleLeadingTrailing } from '../../../utils/async.js';
79
import { BucketChecksum, BucketDescription, BucketStorageAdapter, Checkpoint } from '../bucket/BucketStorageAdapter.js';
810
import { CrudEntry } from '../bucket/CrudEntry.js';
911
import { SyncDataBucket } from '../bucket/SyncDataBucket.js';
10-
import { AbstractRemote, SyncStreamOptions, FetchStrategy } from './AbstractRemote.js';
12+
import { AbstractRemote, FetchStrategy, SyncStreamOptions } from './AbstractRemote.js';
1113
import {
1214
BucketRequest,
1315
StreamingSyncLine,
@@ -19,8 +21,6 @@ import {
1921
isStreamingSyncCheckpointPartiallyComplete,
2022
isStreamingSyncData
2123
} from './streaming-sync-types.js';
22-
import { DataStream } from 'src/utils/DataStream.js';
23-
import { InternalProgressInformation } from 'src/db/crud/SyncProgress.js';
2424

2525
export enum LockType {
2626
CRUD = 'crud',
@@ -341,12 +341,22 @@ The next upload iteration will be delayed.`);
341341
await this.disconnect();
342342
}
343343

344-
this.abortController = new AbortController();
344+
const controller = new AbortController();
345+
this.abortController = controller;
345346
this.streamingSyncPromise = this.streamingSync(this.abortController.signal, options);
346347

347348
// Return a promise that resolves when the connection status is updated
348349
return new Promise<void>((resolve) => {
349-
const l = this.registerListener({
350+
let disposer: () => void;
351+
352+
const complete = () => {
353+
disposer?.();
354+
resolve();
355+
};
356+
357+
controller.signal.addEventListener('abort', complete, { once: true });
358+
359+
disposer = this.registerListener({
350360
statusUpdated: (update) => {
351361
// This is triggered as soon as a connection is read from
352362
if (typeof update.connected == 'undefined') {
@@ -361,8 +371,8 @@ The next upload iteration will be delayed.`);
361371
this.logger.warn('Initial connect attempt did not successfully connect to server');
362372
}
363373

364-
resolve();
365-
l();
374+
controller.signal.removeEventListener('abort', complete);
375+
complete();
366376
}
367377
});
368378
});

packages/web/tests/stream.test.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,20 @@ function describeStreamingTests(createConnectedDatabase: () => Promise<Connected
182182
// In this case it should most likely be 1 attempt since all the calls
183183
// are in the same for loop
184184
expect(spy.mock.calls.length).lessThan(connectionAttempts);
185+
186+
// Now with random delays
187+
for (let i = connectionAttempts; i >= 0; i--) {
188+
await new Promise((r) => setTimeout(r, Math.random() * 100));
189+
powersync.connect(new TestConnector(), { params: { count: i } });
190+
}
191+
192+
await vi.waitFor(
193+
() => {
194+
const call = spy.mock.lastCall![1] as PowerSyncConnectionOptions;
195+
expect(call.params!['count']).eq(0);
196+
},
197+
{ timeout: 2000, interval: 100 }
198+
);
185199
});
186200

187201
it('Should trigger upload connector when connected', async () => {

packages/web/tests/utils/MockStreamOpenFactory.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,9 @@ export class MockRemote extends AbstractRemote {
8181
if (this.errorOnStreamStart) {
8282
controller.error(new Error('Mock error on stream start'));
8383
}
84-
84+
if (signal?.aborted) {
85+
controller.close();
86+
}
8587
signal?.addEventListener('abort', () => {
8688
try {
8789
controller.close();

packages/web/vitest.config.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ const config: UserConfigExport = {
5050
*/
5151
isolate: true,
5252
provider: 'playwright',
53-
headless: true,
53+
headless: false,
5454
instances: [
5555
{
5656
browser: 'chromium'

0 commit comments

Comments
 (0)