Skip to content

Commit d446b08

Browse files
committed
AI feedback
1 parent 46b3e4d commit d446b08

5 files changed

Lines changed: 117 additions & 96 deletions

File tree

packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@ import {
1616
doneResult,
1717
injectable,
1818
InjectableIterator,
19-
notifyIterator,
2019
SimpleAsyncIterator,
2120
valueResult
2221
} from '../../../utils/stream_transform.js';
22+
import { asyncNotifier } from '../../../utils/async.js';
2323
import { StreamingSyncRequestParameterType } from './JsonValue.js';
2424

2525
export enum LockType {
@@ -219,13 +219,12 @@ export abstract class AbstractStreamingSyncImplementation
219219
protected logger: ILogger;
220220
private activeStreams: SubscribedStream[];
221221
private connectionMayHaveChanged = false;
222-
private crudUploadNotifier = notifyIterator();
222+
private crudUploadNotifier = asyncNotifier();
223223

224224
private notifyCompletedUploads?: () => void;
225225
private handleActiveStreamsChange?: () => void;
226226

227227
syncStatus: SyncStatus;
228-
triggerCrudUpload: () => void;
229228

230229
constructor(options: AbstractStreamingSyncImplementationOptions) {
231230
super();
@@ -243,8 +242,10 @@ export abstract class AbstractStreamingSyncImplementation
243242
}
244243
});
245244
this.abortController = null;
245+
}
246246

247-
this.triggerCrudUpload = () => this.crudUploadNotifier.notify();
247+
triggerCrudUpload() {
248+
this.crudUploadNotifier.notify();
248249
}
249250

250251
async waitForReady() {}
@@ -321,7 +322,7 @@ export abstract class AbstractStreamingSyncImplementation
321322
this.delayRetry(signal, this.options.crudUploadThrottleMs!)
322323
]);
323324

324-
await this.crudUploadNotifier.next();
325+
await this.crudUploadNotifier.waitForNotification(signal);
325326
}
326327
}
327328

@@ -483,7 +484,7 @@ The next upload iteration will be delayed.`);
483484
});
484485
});
485486

486-
this.crudUploadLoop(signal);
487+
this.crudUploadLoop(signal).catch((ex) => this.logger.error('Error in crud upload loop', ex));
487488

488489
/**
489490
* This loops runs until [retry] is false or the abort signal is set to aborted.

packages/common/src/utils/async.ts

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,60 @@ export function throttleTrailing(func: () => void, wait: number) {
1818
}
1919
};
2020
}
21+
22+
export interface AsyncNotifier {
23+
/**
24+
* @param signal Also resolve the promise once this signal completes.
25+
* @returns A promise that resolves once {@link notify} is called after this promise was last resolved.
26+
*/
27+
waitForNotification(signal: AbortSignal): Promise<void>;
28+
29+
/**
30+
* Notifies any pending listener, or makes the next {@link waitForNotification} complete immediately if no listener
31+
* is currently active.
32+
*/
33+
notify(): void;
34+
}
35+
36+
export function asyncNotifier(): AsyncNotifier {
37+
let waitingConsumers: (() => void)[] = [];
38+
let hasPendingNotification = false;
39+
40+
return {
41+
notify() {
42+
if (waitingConsumers.length > 0) {
43+
waitingConsumers.splice(0, 1)[0]();
44+
} else {
45+
hasPendingNotification = true;
46+
}
47+
},
48+
waitForNotification(signal: AbortSignal) {
49+
return new Promise((resolve) => {
50+
if (hasPendingNotification) {
51+
resolve();
52+
hasPendingNotification = false;
53+
} else {
54+
function complete() {
55+
signal.removeEventListener('abort', onAbort);
56+
resolve();
57+
}
58+
59+
function onAbort() {
60+
const i = waitingConsumers.indexOf(complete);
61+
if (i > -1) {
62+
waitingConsumers.splice(i, 1);
63+
}
64+
resolve();
65+
}
66+
67+
waitingConsumers.push(complete);
68+
if (signal.aborted) {
69+
onAbort();
70+
} else {
71+
signal.addEventListener('abort', onAbort);
72+
}
73+
}
74+
});
75+
}
76+
};
77+
}

packages/common/src/utils/stream_transform.ts

Lines changed: 0 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -121,53 +121,6 @@ export function injectable<T>(source: SimpleAsyncIterator<T>): InjectableIterato
121121
};
122122
}
123123

124-
export interface NotifyIterator extends SimpleAsyncIterator<undefined> {
125-
notify(): void;
126-
}
127-
128-
/**
129-
* An iterator dispatching notifications without associated data.
130-
*
131-
* Backpressure is handled by folding calls to `notify()`: The iterator emits an item once there's been at least one
132-
* `notify()` call since the last completed call to `next()`.
133-
*/
134-
export function notifyIterator(): NotifyIterator {
135-
// Possible states:
136-
// - Idle (null), no outstanding listener and no outstanding notification.
137-
// - Waiting (function), has an outstanding listener and no notification.
138-
// - Pending (true), no outstanding listener but an undelivered notification.
139-
let state: null | ((complete: IteratorResult<undefined, any>) => void) | true = null;
140-
const item = valueResult(undefined);
141-
142-
return {
143-
notify() {
144-
if (typeof state == 'function') {
145-
// waiting -> idle
146-
state(item);
147-
state = null;
148-
} else {
149-
state = true; // idle -> pending or pending -> pending.
150-
}
151-
},
152-
next() {
153-
return new Promise((resolve) => {
154-
if (typeof state == 'function') {
155-
throw new Error('concurrent call to next');
156-
}
157-
158-
if (state != null) {
159-
// pending -> idle
160-
resolve(item);
161-
state = null;
162-
} else {
163-
// idle -> waiting
164-
state = resolve;
165-
}
166-
});
167-
}
168-
};
169-
}
170-
171124
/**
172125
* Splits a byte stream at line endings, emitting each line as a string.
173126
*/
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import { describe, expect, test } from 'vitest';
2+
import { asyncNotifier } from '../../src/utils/async';
3+
4+
describe('notifyIterator', () => {
5+
const neverAbort = new AbortController().signal;
6+
7+
test('waits for event', async () => {
8+
const notifier = asyncNotifier();
9+
let didReceiveEvent = false;
10+
notifier.waitForNotification(neverAbort).then((e) => {
11+
didReceiveEvent = true;
12+
});
13+
14+
await Promise.resolve();
15+
expect(didReceiveEvent).toBeFalsy();
16+
notifier.notify();
17+
18+
await Promise.resolve();
19+
expect(didReceiveEvent).toBeTruthy();
20+
});
21+
22+
test('merges events', async () => {
23+
const notifier = asyncNotifier();
24+
for (let i = 0; i < 1000; i++) {
25+
notifier.notify();
26+
}
27+
28+
let didReceiveEvent = false;
29+
notifier.waitForNotification(neverAbort).then((e) => {
30+
didReceiveEvent = true;
31+
});
32+
await Promise.resolve();
33+
expect(didReceiveEvent).toBeTruthy();
34+
35+
// The first iterator.next() should have consumed everything.
36+
didReceiveEvent = false;
37+
notifier.waitForNotification(neverAbort).then((e) => {
38+
didReceiveEvent = true;
39+
});
40+
await Promise.resolve();
41+
expect(didReceiveEvent).toBeFalsy();
42+
});
43+
44+
test('completes on abort', async () => {
45+
const notifier = asyncNotifier();
46+
await notifier.waitForNotification(AbortSignal.abort());
47+
});
48+
49+
test('completes on abort later', async () => {
50+
const notifier = asyncNotifier();
51+
await notifier.waitForNotification(AbortSignal.timeout(10));
52+
});
53+
});

packages/common/tests/utils/stream_transform.test.ts

Lines changed: 0 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import {
44
extractBsonObjects,
55
extractJsonLines,
66
injectable,
7-
notifyIterator,
87
SimpleAsyncIterator,
98
valueResult
109
} from '../../src/utils/stream_transform';
@@ -121,48 +120,6 @@ describe('injectable', () => {
121120
});
122121
});
123122

124-
describe('notifyIterator', () => {
125-
test('waits for event', async () => {
126-
const iterator = notifyIterator();
127-
let didReceiveEvent = false;
128-
iterator.next().then((e) => {
129-
expect(e.done).toStrictEqual(false);
130-
didReceiveEvent = true;
131-
});
132-
133-
await Promise.resolve();
134-
expect(didReceiveEvent).toBeFalsy();
135-
iterator.notify();
136-
137-
await Promise.resolve();
138-
expect(didReceiveEvent).toBeTruthy();
139-
});
140-
141-
test('merges events', async () => {
142-
const iterator = notifyIterator();
143-
for (let i = 0; i < 1000; i++) {
144-
iterator.notify();
145-
}
146-
147-
let didReceiveEvent = false;
148-
iterator.next().then((e) => {
149-
expect(e.done).toStrictEqual(false);
150-
didReceiveEvent = true;
151-
});
152-
await Promise.resolve();
153-
expect(didReceiveEvent).toBeTruthy();
154-
155-
// The first iterator.next() should have consumed everything.
156-
didReceiveEvent = false;
157-
iterator.next().then((e) => {
158-
expect(e.done).toStrictEqual(false);
159-
didReceiveEvent = true;
160-
});
161-
await Promise.resolve();
162-
expect(didReceiveEvent).toBeFalsy();
163-
});
164-
});
165-
166123
describe('bson objects', () => {
167124
test('empty stream', async () => {
168125
async function* source() {}

0 commit comments

Comments
 (0)