Skip to content

Commit 6bbaefe

Browse files
committed
Don't support concurrent waiters
1 parent f898a0b commit 6bbaefe

2 files changed

Lines changed: 16 additions & 13 deletions

File tree

packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ export abstract class AbstractStreamingSyncImplementation
215215
protected options: AbstractStreamingSyncImplementationOptions;
216216
protected abortController: AbortController | null;
217217
protected crudUpdateListener?: () => void;
218-
protected streamingSyncPromise?: Promise<void>;
218+
protected streamingSyncPromise?: Promise<[void, void]>;
219219
protected logger: ILogger;
220220
private activeStreams: SubscribedStream[];
221221
private connectionMayHaveChanged = false;
@@ -409,7 +409,10 @@ The next upload iteration will be delayed.`);
409409

410410
const controller = new AbortController();
411411
this.abortController = controller;
412-
this.streamingSyncPromise = this.streamingSync(this.abortController.signal, options);
412+
this.streamingSyncPromise = Promise.all([
413+
this.crudUploadLoop(controller.signal).catch((ex) => this.logger.error('Error in crud upload loop', ex)),
414+
this.streamingSync(controller.signal, options)
415+
]);
413416

414417
// Return a promise that resolves when the connection status is updated to indicate that we're connected.
415418
return new Promise<void>((resolve) => {
@@ -484,8 +487,6 @@ The next upload iteration will be delayed.`);
484487
});
485488
});
486489

487-
this.crudUploadLoop(signal).catch((ex) => this.logger.error('Error in crud upload loop', ex));
488-
489490
/**
490491
* This loops runs until [retry] is false or the abort signal is set to aborted.
491492
* Aborting the nestedAbortController will:

packages/common/src/utils/async.ts

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,26 +27,31 @@ export interface AsyncNotifier {
2727
waitForNotification(signal: AbortSignal): Promise<void>;
2828

2929
/**
30-
* Notifies any pending listener, or makes the next {@link waitForNotification} complete immediately if no listener
30+
* Notifies a pending listener, or makes the next {@link waitForNotification} complete immediately if no listener
3131
* is currently active.
3232
*/
3333
notify(): void;
3434
}
3535

3636
export function asyncNotifier(): AsyncNotifier {
37-
let waitingConsumers: (() => void)[] = [];
37+
let waitingConsumer: (() => void) | null = null;
3838
let hasPendingNotification = false;
3939

4040
return {
4141
notify() {
42-
if (waitingConsumers.length > 0) {
43-
waitingConsumers.splice(0, 1)[0]();
42+
if (waitingConsumer != null) {
43+
waitingConsumer();
44+
waitingConsumer = null;
4445
} else {
4546
hasPendingNotification = true;
4647
}
4748
},
4849
waitForNotification(signal: AbortSignal) {
4950
return new Promise((resolve) => {
51+
if (waitingConsumer != null) {
52+
throw new Error('Illegal call to waitForNotification, already has a waiter.');
53+
}
54+
5055
if (signal.aborted) {
5156
resolve();
5257
} else if (hasPendingNotification) {
@@ -59,14 +64,11 @@ export function asyncNotifier(): AsyncNotifier {
5964
}
6065

6166
function onAbort() {
62-
const i = waitingConsumers.indexOf(complete);
63-
if (i > -1) {
64-
waitingConsumers.splice(i, 1);
65-
}
67+
waitingConsumer = null;
6668
resolve();
6769
}
6870

69-
waitingConsumers.push(complete);
71+
waitingConsumer = complete;
7072
signal.addEventListener('abort', onAbort);
7173
}
7274
});

0 commit comments

Comments
 (0)