-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Expand file tree
/
Copy pathcreateTaskQueue.mjs
More file actions
60 lines (46 loc) · 1.88 KB
/
createTaskQueue.mjs
File metadata and controls
60 lines (46 loc) · 1.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import withResolvers from './utils/withResolvers.mjs';
export default function createTaskQueue() {
let queueWithCurrent = [];
const queue = {
cancelAll: () => {
queueWithCurrent.forEach(({ cancel }) => cancel());
},
push: fn => {
const cancelWithResolvers = withResolvers();
const resultWithResolvers = withResolvers();
const entry = { promise: resultWithResolvers.promise };
let abort;
const cancel = (entry.cancel = () => {
// Override the "fn" so we don't call the actual "fn" later.
// In this approach, we can reuse the logic inside "start" to handle post-cancellation.
fn = () => ({ result: Promise.reject(new Error('cancelled before start')) });
// Abort the task if it is currently running.
abort && abort();
cancelWithResolvers.reject(new Error('cancelled in the midway'));
});
const start = async () => {
const { abort: abortFn, result } = fn();
abort = abortFn;
try {
// Either wait for the actual result, or the task is being cancelled.
resultWithResolvers.resolve(await Promise.race([result, cancelWithResolvers.promise]));
} catch (error) {
resultWithResolvers.reject(error);
}
queueWithCurrent = queueWithCurrent.filter(e => e !== entry);
};
const lastEntry = queueWithCurrent[queueWithCurrent.length - 1];
const lastPromise = (lastEntry && lastEntry.promise) || Promise.resolve();
queueWithCurrent.push(entry);
// After the last promise resolved/rejected, we will start this task.
// We will start even if the last promise rejected.
lastPromise.then(start, start);
return {
cancel,
result: resultWithResolvers.promise
};
}
};
Object.defineProperty(queue, 'length', { get: () => queueWithCurrent.length });
return queue;
}