-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathAsyncWorker.ts
More file actions
149 lines (140 loc) · 5.44 KB
/
Copy pathAsyncWorker.ts
File metadata and controls
149 lines (140 loc) · 5.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import { Queue } from "../misc/Queue.js";
import { nextWorkItemId } from "../misc/nextWorkItemId.js";
import type { IWorker, QueueingOptions, RejectFn, WorkerTasks, WorkItemData } from "../types.js";
import { InternalSharedWorker } from "./InternalSharedWorker.js";
import { InternalWorker } from "./InternalWorker.js";
import { WorkItem } from "./WorkItem.js";
import { WorkItemInternal } from "./WorkItemInternal.js";
/**
 * Determines if a worker-like object is a SharedWorker using duck typing.
 *
 * A value qualifies when it is a non-null object whose `port` property is itself a non-null object exposing
 * `postMessage`, `addEventListener` and `removeEventListener` functions.
 * @param worker The worker object to test
 * @returns `true` if the worker appears to be a SharedWorker, `false` otherwise. The result is always a strict
 * boolean, including for `null`, `undefined` and primitive inputs.
 */
function isSharedWorker(worker: any): worker is SharedWorker {
    // Reject null/undefined and primitives up front so the result is never the falsy input itself.
    if (worker === null || typeof worker !== 'object') {
        return false;
    }
    // SharedWorker has a 'port' property (MessagePort) with the required methods
    const port = worker.port;
    if (port === null || typeof port !== 'object') {
        return false;
    }
    return typeof port.postMessage === 'function' &&
        typeof port.addEventListener === 'function' &&
        typeof port.removeEventListener === 'function';
}
/**
 * Signature of the function used to enqueue a work item for a single task.
 *
 * The payload type is derived from the task function `Fn`: when `Fn` is assignable to a zero-argument function
 * type the payload is `void`; otherwise it is the type of `Fn`'s first parameter. The enqueue function returns a
 * `WorkItem` typed with `Fn`'s return type.
 */
export type EnqueueFn<Fn extends ((...args: any[]) => any) = (() => any)> =
(payload: Fn extends () => any ? void : Parameters<Fn>[0], options?: QueueingOptions) => WorkItem<ReturnType<Fn>>;
/**
 * Maps every key of the tasks record `T` to an `EnqueueFn` typed after the task function stored under that key.
 */
export type Enqueue<T extends Record<string, (...args: any[]) => any>> = {
[K in keyof T]: EnqueueFn<T[K]>;
};
/**
* Defines all possible work item status values. The object is frozen, so its members cannot be reassigned at
* runtime.
*/
export const WorkItemStatus = Object.freeze({
/**
* Initial work item status. This status value will not be seen in work items that immediately get transmitted to
* the worker (such as out-of-order work items).
*/
Enqueued: 0,
/**
* The work item has started. In this context, "started" means that the message that starts the worker's task has
* been transmitted. Whether or not the actual work has started is unknown.
*/
Started: 1,
/**
* The work item has been cancelled using its `cancel()` method.
*/
Cancelled: 2,
/**
* The work item has been terminated because the worker was terminated using the `terminate()` method.
*/
Terminated: 3,
/**
* The work item has completed and its promise has been settled (fulfilled or rejected).
*/
Completed: 4,
});
/**
* Defines the type of the work item status values: the union of the value types of every property of
* `WorkItemStatus`.
*/
export type WorkItemStatusEnum = typeof WorkItemStatus[keyof typeof WorkItemStatus];
/**
 * Abstracts a worker (dedicated or shared) and provides asynchronous syntax to control its functionality.
 *
 * Work items are started one at a time, in the order they were enqueued. Work items enqueued with the
 * `outOfOrder` option bypass the queue and are started immediately.
 */
export class AsyncWorker<Tasks extends Record<string, (...args: any[]) => any>> {
    /** Wrapper around the underlying dedicated or shared worker. */
    #iWorker: IWorker;
    /** Work items waiting for the currently running in-order work item to settle. */
    #queue;
    /** `true` while an in-order work item has been started and its promise has not settled yet. */
    #taskRunning;
    /** Object exposing one enqueue function per task name. */
    #enqueue;

    /**
     * Creates a new instance that wraps the given worker.
     * @param worker The dedicated or shared worker to control.
     * @param tasks Record of the tasks the worker supports. Only its keys are read here; each key becomes an
     * enqueue function in the `enqueue` object.
     */
    constructor(worker: Worker | SharedWorker, tasks: Tasks) {
        this.#iWorker = isSharedWorker(worker) ? new InternalSharedWorker(worker) : new InternalWorker(worker as Worker);
        this.#queue = new Queue<WorkItemInternal>();
        this.#taskRunning = false;
        this.#enqueue = Object.keys(tasks).reduce((prev, curr) => {
            prev[curr as keyof Tasks] = this.#enqueueTask.bind(this, curr);
            return prev;
        }, {} as Enqueue<Tasks>);
    }

    /**
     * Creates a promise together with the functions that settle it.
     * @returns The promise and its `resolve` and `reject` functions.
     */
    #createTaskPromise<T>() {
        let resolve: (data: T | PromiseLike<T>) => void;
        let reject: RejectFn;
        const promise = new Promise<T>((rslv, rjct) => {
            resolve = rslv;
            reject = rjct;
        });
        return {
            // The Promise executor runs synchronously, so both functions are assigned by now.
            resolve: resolve!,
            reject: reject!,
            promise
        };
    }

    /**
     * Returns the object used to enqueue work items.
     */
    get enqueue() {
        return this.#enqueue;
    }

    /**
     * Creates a work item for the given task and either starts it right away or queues it.
     *
     * The work item is queued only when an in-order work item is already running and the `outOfOrder` option is
     * not set. Otherwise it is started immediately.
     * @param task Name of the task to run.
     * @param payload Data handed to the work item.
     * @param options Optional queueing options.
     * @returns A `WorkItem` wrapping the internal work item.
     */
    #enqueueTask<K extends keyof WorkerTasks<Tasks>>(task: K, payload: WorkerTasks<Tasks>[K]['payload'], options?: QueueingOptions) {
        const workItemId = nextWorkItemId();
        const { resolve, reject, promise } = this.#createTaskPromise<WorkerTasks<Tasks>[K]["return"]>();
        const workItemData = {
            id: workItemId,
            task: task as string,
            promise,
            resolve,
            reject,
            payload
        } satisfies WorkItemData<WorkerTasks<Tasks>[K]["return"]>;
        const workItem = new WorkItemInternal(this.#iWorker, workItemData, options);
        if (this.#taskRunning && !options?.outOfOrder) {
            this.#queue.enqueue(workItem);
        }
        else {
            if (!options?.outOfOrder) {
                // Only in-order work items take part in the one-at-a-time sequencing.
                this.#taskRunning = true;
                this.#startNextWhenSettled(promise);
            }
            workItem.start();
        }
        return new WorkItem(workItem);
    }

    /**
     * Arranges for the next queued work item to be started once the given promise settles.
     *
     * `then(next, next)` is used instead of `finally(next)` on purpose: the promise returned by `finally()`
     * rejects whenever the source promise rejects, and since nothing observes that derived promise it would
     * surface as an unhandled rejection even when the caller handles the work item's own promise. The promise
     * derived here fulfils for both outcomes.
     * @param promise Promise of the work item that is about to be started.
     */
    #startNextWhenSettled(promise: PromiseLike<unknown>) {
        const next = () => this.#startNextWorkItem();
        void promise.then(next, next);
    }

    /**
     * Starts the next queued work item, or clears the running flag when the queue is empty.
     */
    #startNextWorkItem() {
        if (!this.#queue.isEmpty) {
            const nextWorkItem = this.#queue.dequeue();
            this.#startNextWhenSettled(nextWorkItem.data.promise);
            this.#taskRunning = true;
            nextWorkItem.start();
        }
        else {
            this.#taskRunning = false;
        }
    }

    /**
     * Terminates the underlying web worker. Note that this is only possible for dedicated workers. If an attempt to
     * terminate a shared worker is tried, an error will be thrown.
     * @returns `true` if the worker is terminated and all pending work items' promises have been rejected, or `false`
     * if the worker had already been terminated and therefore no further action took place.
     */
    terminate() {
        return this.#iWorker.terminate();
    }
}