Skip to content

Commit 381910b

Browse files
committed
Create task queue
1 parent 3ec3a11 commit 381910b

11 files changed

Lines changed: 599 additions & 0 deletions

File tree

package-lock.json

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
"ts-appconfig": "^1.2.0",
4343
"ts-node": "^10.4.0",
4444
"ts-packager": "^1.1.0",
45+
"ts-tiny-log": "^1.0.3",
4546
"typedoc": "^0.22.11",
4647
"typescript": "^4.5.4"
4748
},

src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
export { Queue, QueueOptions, QueueCallback } from './queue';

src/parent.ts

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
import { parentPort } from 'worker_threads';
2+
3+
/**
4+
* Messages sent from worker
5+
*/
6+
export enum WorkerMessageTypes {
7+
STARTED = 'STARTED',
8+
TASK_FINISHED = 'TASK_FINISHED',
9+
TASK_FAILED = 'TASK_FAILED',
10+
}
11+
12+
/**
13+
* Message interface from worker
14+
*/
15+
export interface WorkerMessage {
16+
message: WorkerMessageTypes;
17+
data?: any;
18+
}
19+
20+
export class ParentThread {
21+
/**
22+
* Post that the worker started up successfully to the main thread
23+
*
24+
* Runs on: Worker
25+
*/
26+
public workerStarted() {
27+
this.post({ message: WorkerMessageTypes.STARTED });
28+
}
29+
30+
/**
31+
* Post that the worker finished a task successfully
32+
*
33+
* Runs on: Worker
34+
*
35+
* @param response Task response
36+
*/
37+
public taskFinished(response?: any) {
38+
this.post({
39+
message: WorkerMessageTypes.TASK_FINISHED,
40+
data: response
41+
});
42+
}
43+
44+
/**
45+
* Post that the worker failed to complete a task
46+
*
47+
* Runs on: Worker
48+
*
49+
* @param response Task response
50+
*/
51+
public taskFailed(error?: Error) {
52+
this.post({
53+
message: WorkerMessageTypes.TASK_FAILED,
54+
data: error,
55+
});
56+
}
57+
58+
/**
59+
* Post a message to the main thread
60+
*
61+
* Runs on: Worker
62+
*
63+
* @param msg
64+
*/
65+
protected post(msg: WorkerMessage) {
66+
parentPort.postMessage(msg);
67+
}
68+
}

src/queue.ts

Lines changed: 244 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,244 @@
1+
import { Log } from 'ts-tiny-log';
2+
import { parentPort, isMainThread, workerData } from 'worker_threads';
3+
4+
import { Task } from './task';
5+
import {
6+
ParentMessage,
7+
ParentMessageTypes,
8+
Worker,
9+
WorkerSpawnData,
10+
WorkerState
11+
} from './worker';
12+
import { ParentThread } from './parent';
13+
14+
export type QueueCallback<TIn, TOut> = (data: TIn) => Promise<TOut>;
15+
export type ErrorHandler = (err: Error) => void | Promise<void>;
16+
17+
export interface QueueOptions<TIn, TOut> {
18+
/**
19+
* Queue name. Must be unique
20+
*/
21+
name: string;
22+
23+
/**
24+
* Worker entry file. Must be a relative/absolute path/file
25+
*/
26+
workerEntry?: string;
27+
28+
/**
29+
* Number of workers. Default is 4
30+
*/
31+
nWorkers?: number;
32+
33+
/**
34+
* Function to run on worker startup
35+
*/
36+
startup?: (data: WorkerSpawnData) => Promise<void>;
37+
38+
/**
39+
* Function to run for the queue task
40+
*/
41+
callback: QueueCallback<TIn, TOut>;
42+
43+
/**
44+
* Function to run on error
45+
*/
46+
error?: ErrorHandler,
47+
48+
/**
49+
* Function to run on fatal error
50+
*/
51+
fatal?: ErrorHandler,
52+
53+
/**
54+
* Class for communicating Worker -> Parent
55+
*/
56+
parentType?: typeof ParentThread,
57+
58+
/**
59+
* Class for communicating Parent -> Worker
60+
*/
61+
workerType?: typeof Worker,
62+
}
63+
64+
/**
65+
* Queue class
66+
*
67+
* @typeParam TIn Queue task input type
68+
* @typeParam TOut Queue task output type
69+
*/
70+
export class Queue<TIn, TOut> {
71+
/* eslint-disable-next-line no-console */
72+
protected log: Log = <Log><unknown>console;
73+
74+
public defaultOptions: Partial<QueueOptions<TIn, TOut>> = {
75+
workerEntry: process.cwd(),
76+
nWorkers: 4,
77+
startup: async () => {},
78+
error: error => this.log.error('Queue Error', error),
79+
fatal: error => {
80+
this.log.error('Queue Fatal Error', error);
81+
process.exit();
82+
},
83+
parentType: ParentThread,
84+
workerType: Worker,
85+
};
86+
87+
protected tasks: Task<TIn, TOut>[] = [];
88+
protected workers: Worker<TIn, TOut>[] = [];
89+
protected parent: ParentThread;
90+
91+
protected options: QueueOptions<TIn, TOut>;
92+
93+
/**
94+
* Runs on: Main, Worker
95+
*
96+
* @param options Queue options
97+
*/
98+
public constructor(options: QueueOptions<TIn, TOut>) {
99+
this.options = options = { ...this.defaultOptions, ...options };
100+
101+
if (isMainThread) {
102+
this.buildPool()
103+
.catch(options.fatal);
104+
105+
this.start();
106+
}
107+
else if (workerData.queueName === this.options.name) {
108+
this.parent = new options.parentType();
109+
110+
options.startup(workerData)
111+
.then(() => {
112+
this.listenForWork();
113+
this.parent.workerStarted();
114+
})
115+
.catch(options.error);
116+
}
117+
}
118+
119+
/**
120+
* Push a new task to the queue to be run in parallel. This task will
121+
* not be joined back into the main thread on completion.
122+
*
123+
* Runs on: Main
124+
*
125+
* @param task Input data for the task
126+
*/
127+
public push(task: TIn): void {
128+
this.tasks.push({
129+
request: task,
130+
});
131+
}
132+
133+
/**
134+
* Push a new task to the queue. This method will return a promise that
135+
* will resolve when/if the task runs and completes
136+
*
137+
* Runs on: Main
138+
*
139+
* @param task Input data for the task
140+
* @return Returns the task result
141+
*/
142+
public async await(task: TIn): Promise<TOut> {
143+
return new Promise<TOut>((accept, reject) => {
144+
this.tasks.push({
145+
request: task,
146+
accept,
147+
reject,
148+
});
149+
});
150+
}
151+
152+
/**
153+
* Spawns a new Worker
154+
*
155+
* Runs on: Main
156+
*
157+
* @returns Returns the worker
158+
*/
159+
protected async spawnWorker(): Promise<Worker<TIn, TOut>> {
160+
return await (new this.options.workerType<TIn, TOut>()).spawn({
161+
queueName: this.options.name,
162+
entry: this.options.workerEntry,
163+
});
164+
}
165+
166+
/**
167+
* Build the worker pool
168+
*
169+
* Runs on: Main
170+
*/
171+
protected async buildPool(): Promise<void> {
172+
const promises = [];
173+
174+
for (let i = 0; i < this.options.nWorkers; i++) {
175+
promises.push(this.spawnWorker());
176+
}
177+
178+
this.workers = await Promise.all(promises);
179+
}
180+
181+
/**
182+
* Start working the queue
183+
*
184+
* Runs on: Main
185+
*/
186+
protected start(): void {
187+
setInterval(async () => {
188+
if (this.tasks.length) {
189+
const worker = await this.reserveWorker();
190+
191+
if (worker) {
192+
const nextTask = this.tasks.splice(0, 1)[0];
193+
194+
worker.startTask(nextTask);
195+
}
196+
}
197+
}, 250);
198+
}
199+
200+
/**
201+
* Reserves & returns a free worker
202+
*
203+
* Runs on: Main
204+
*
205+
* @return Returns the worker, or null if none are available
206+
*/
207+
protected async reserveWorker(): Promise<null | Worker<TIn, TOut>> {
208+
for (let i = 0; i < this.workers.length; i++) {
209+
const worker = this.workers[i];
210+
211+
if (worker.state === WorkerState.FREE) {
212+
worker.state = WorkerState.RESERVED;
213+
214+
return worker;
215+
}
216+
else if (worker.state === WorkerState.EXHAUSTED) {
217+
const worker = await this.spawnWorker();
218+
worker.state = WorkerState.RESERVED;
219+
this.workers[i] = worker;
220+
221+
return worker;
222+
}
223+
}
224+
225+
return null;
226+
}
227+
228+
/**
229+
* Start listening for work
230+
*
231+
* Runs on: Worker
232+
*
233+
* @param callback Callback to run on the worker
234+
*/
235+
protected listenForWork() {
236+
parentPort.on('message', (message: ParentMessage) => {
237+
if (message.type === ParentMessageTypes.START_TASK) {
238+
this.options.callback(message.data)
239+
.then((response?) => this.parent.taskFinished(response))
240+
.catch((err?) => this.parent.taskFailed(err));
241+
}
242+
});
243+
}
244+
}

src/task.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
/**
2+
* Interface for a task
3+
*/
4+
export interface Task<TIn, TOut> {
5+
request: TIn;
6+
accept?: (response?: TOut) => void,
7+
reject?: (error?: Error) => void,
8+
}

0 commit comments

Comments
 (0)