-
-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Expand file tree
/
Copy pathpromisebuffer.ts
More file actions
96 lines (83 loc) · 3.1 KB
/
Copy pathpromisebuffer.ts
File metadata and controls
96 lines (83 loc) · 3.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import { rejectedSyncPromise, resolvedSyncPromise } from './syncpromise';
import { safeUnref } from './timer';
/**
 * Contract for a bounded collection of in-flight promises.
 *
 * `makePromiseBuffer` in this file is the factory that produces objects of this shape.
 */
export interface PromiseBuffer<T> {
// Exposes the buffered promises as an array so tests can assert on the buffer's state.
// XXX: this really should not be public API.
$: PromiseLike<T>[];
// Takes a function that creates the promise (rather than the promise itself) and returns a promise for the task.
// `makePromiseBuffer` only invokes `taskProducer` when the buffer still has room.
add(taskProducer: () => PromiseLike<T>): PromiseLike<T>;
// Waits for the buffered promises to settle. `timeout` is in ms and optional; `makePromiseBuffer` resolves to
// `true` when the buffer drained and to `false` when the timeout expired first.
drain(timeout?: number): PromiseLike<boolean>;
}
/**
 * Value that `makePromiseBuffer`'s `add` passes to `rejectedSyncPromise` when the buffer is already at its limit.
 *
 * Created with `Symbol.for`, so it is looked up in the global symbol registry under the key
 * 'SentryBufferFullError' instead of being a fresh, unique symbol.
 */
export const SENTRY_BUFFER_FULL_ERROR = Symbol.for('SentryBufferFullError');
/**
 * Creates a new PromiseBuffer that tracks in-flight promises up to a fixed capacity.
 *
 * @param limit Max number of promises that can be stored in the buffer at the same time. Defaults to 100.
 * @returns An object exposing `add`, `drain` and the `$` getter (a snapshot array of the buffered promises).
 */
export function makePromiseBuffer<T>(limit: number = 100): PromiseBuffer<T> {
  const buffer: Set<PromiseLike<T>> = new Set();

  /** Whether the buffer currently holds fewer than `limit` promises. */
  function isReady(): boolean {
    return buffer.size < limit;
  }

  /**
   * Remove a promise from the queue. Does nothing if the promise is not in the queue.
   *
   * @param task Can be any PromiseLike<T>
   */
  function remove(task: PromiseLike<T>): void {
    buffer.delete(task);
  }

  /**
   * Add a promise (representing an in-flight action) to the queue, and set it to remove itself once it settles.
   *
   * @param taskProducer A function producing any PromiseLike<T>; In previous versions this used to be `task:
   *        PromiseLike<T>`, but under that model, Promises were instantly created on the call-site and their executor
   *        functions therefore ran immediately. Thus, even if the buffer was full, the action still happened. By
   *        requiring the promise to be wrapped in a function, we can defer promise creation until after the buffer
   *        limit check.
   * @returns The promise returned by `taskProducer`, or - when the buffer is full and `taskProducer` is therefore
   *          never called - the result of `rejectedSyncPromise(SENTRY_BUFFER_FULL_ERROR)`.
   */
  function add(taskProducer: () => PromiseLike<T>): PromiseLike<T> {
    if (!isReady()) {
      return rejectedSyncPromise(SENTRY_BUFFER_FULL_ERROR);
    }

    // start the task and add its promise to the queue
    const task = taskProducer();
    buffer.add(task);

    // Free the slot on fulfillment as well as on rejection; handling both also keeps this side chain from
    // producing a rejection of its own.
    void task.then(
      () => remove(task),
      () => remove(task),
    );

    return task;
  }

  /**
   * Wait for all promises that are in the queue at the time of the call to settle, or for the timeout to expire,
   * whichever comes first.
   *
   * @param timeout The time, in ms, after which to resolve to `false` if the queue is still non-empty. Passing `0` (or
   *        not passing anything) will make the promise wait as long as it takes for the queue to drain before
   *        resolving to `true`.
   * @returns A promise which will resolve to `true` if the queue is already empty or drains before the timeout, and
   *          `false` otherwise
   */
  function drain(timeout?: number): PromiseLike<boolean> {
    if (!buffer.size) {
      return resolvedSyncPromise(true);
    }

    // We want to resolve even if one of the promises rejects, hence `allSettled` (which never rejects).
    const settled = Promise.allSettled(Array.from(buffer));

    if (!timeout) {
      return settled.then(() => true);
    }

    let timer: ReturnType<typeof setTimeout> | undefined;
    const timeoutPromise = new Promise<boolean>(resolve => {
      // The executor runs synchronously, so `timer` is assigned before any of the callbacks below can fire.
      timer = setTimeout(() => resolve(false), timeout);
      safeUnref(timer);
    });

    const drainPromise = settled.then(() => {
      // The queue drained, so the timeout is no longer needed. Cancel it instead of leaving the timer (and the
      // closure it holds) pending for the rest of `timeout`. If the timer already fired, this is a no-op.
      clearTimeout(timer);
      return true;
    });

    // Whichever settles first decides the result: `true` from the drain, `false` from the timeout.
    return Promise.race([drainPromise, timeoutPromise]);
  }

  return {
    // Returns a fresh array on every access; mutating it does not affect the buffer.
    get $(): PromiseLike<T>[] {
      return Array.from(buffer);
    },
    add,
    drain,
  };
}