Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
## Table of contents

- [Pool](#pool)
- [`pool = new FixedThreadPool(numberOfThreads, fileURL, opts)`](#pool--new-fixedthreadpoolnumberofthreads-fileurl-opts)
- [`pool = new DynamicThreadPool(min, max, fileURL, opts)`](#pool--new-dynamicthreadpoolmin-max-fileurl-opts)
- [`pool = new FixedThreadPool(numberOfThreads, specifier, opts)`](#pool--new-fixedthreadpoolnumberofthreads-specifier-opts)
- [`pool = new DynamicThreadPool(min, max, specifier, opts)`](#pool--new-dynamicthreadpoolmin-max-specifier-opts)
- [`pool.execute(data, name, abortSignal, transferList)`](#poolexecutedata-name-abortsignal-transferlist)
- [`pool.mapExecute(data, name, abortSignals, transferList)`](#poolmapexecutedata-name-abortsignals-transferlist)
- [`pool.start()`](#poolstart)
Expand All @@ -25,20 +25,20 @@

## Pool

### `pool = new FixedThreadPool(numberOfThreads, fileURL, opts)`
### `pool = new FixedThreadPool(numberOfThreads, specifier, opts)`

`numberOfThreads` (mandatory) Number of workers for this pool.\
`fileURL` (mandatory) URL to a file with a worker implementation.\
`specifier` (mandatory) Specifier to a file with a worker implementation.\
`opts` (optional) An object with the pool options properties described below.

### `pool = new DynamicThreadPool(min, max, fileURL, opts)`
### `pool = new DynamicThreadPool(min, max, specifier, opts)`

`min` (mandatory) Same as _FixedThreadPool_ numberOfThreads, this number of
workers will be always active.\
`max` (mandatory) Max number of workers that this pool can contain, the newly
created workers will die after a threshold (default is 1 minute, you can
override it in your worker implementation).\
`fileURL` (mandatory) URL to a file with a worker implementation.\
`specifier` (mandatory) Specifier to a file with a worker implementation.\
`opts` (optional) An object with the pool options properties described below.

### `pool.execute(data, name, abortSignal, transferList)`
Expand Down
32 changes: 18 additions & 14 deletions src/pools/abstract-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import {
} from './selection-strategies/selection-strategies-types.ts'
import { WorkerChoiceStrategiesContext } from './selection-strategies/worker-choice-strategies-context.ts'
import {
checkFileURL,
checkSpecifier,
checkValidPriority,
checkValidTasksQueueOptions,
checkValidWorkerChoiceStrategy,
Expand Down Expand Up @@ -152,13 +152,13 @@ export abstract class AbstractPool<
* Constructs a new poolifier pool.
*
* @param minimumNumberOfWorkers - Minimum number of workers that this pool manages.
* @param fileURL - URL to the worker file.
* @param specifier - Specifier to the worker file.
* @param opts - Options for the pool.
* @param maximumNumberOfWorkers - Maximum number of workers that this pool manages.
*/
public constructor(
protected readonly minimumNumberOfWorkers: number,
protected readonly fileURL: URL,
protected readonly specifier: URL | string,
protected readonly opts: PoolOptions,
protected readonly maximumNumberOfWorkers?: number,
) {
Expand All @@ -168,7 +168,7 @@ export abstract class AbstractPool<
)
}
this.checkPoolType()
checkFileURL(this.fileURL)
checkSpecifier(this.specifier)
this.checkMinimumNumberOfWorkers(this.minimumNumberOfWorkers)
this.checkPoolOptions(this.opts)

Expand Down Expand Up @@ -208,7 +208,7 @@ export abstract class AbstractPool<
private checkPoolType(): void {
if (this.type === PoolTypes.fixed && this.maximumNumberOfWorkers != null) {
throw new Error(
'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization',
'Cannot instantiate a fixed pool with a maximum number of workers defined at initialization',
)
}
}
Expand Down Expand Up @@ -2292,15 +2292,19 @@ export abstract class AbstractPool<
* @returns The created worker node.
*/
private createWorkerNode(): IWorkerNode<Worker, Data> {
const workerNode = new WorkerNode<Worker, Data>(this.worker, this.fileURL, {
workerOptions: this.opts.workerOptions,
tasksQueueBackPressureSize: this.opts.tasksQueueOptions?.size ??
getDefaultTasksQueueOptions(
this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
).size,
tasksQueueBucketSize: defaultBucketSize,
tasksQueuePriority: this.getTasksQueuePriority(),
})
const workerNode = new WorkerNode<Worker, Data>(
this.worker,
this.specifier,
{
workerOptions: this.opts.workerOptions,
tasksQueueBackPressureSize: this.opts.tasksQueueOptions?.size ??
getDefaultTasksQueueOptions(
this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
).size,
tasksQueueBucketSize: defaultBucketSize,
tasksQueuePriority: this.getTasksQueuePriority(),
},
)
// Flag the worker node as ready at pool startup.
if (this.starting) {
workerNode.info.ready = true
Expand Down
6 changes: 3 additions & 3 deletions src/pools/thread/dynamic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,16 @@ export class DynamicThreadPool<
*
* @param min - Minimum number of threads which are always active.
* @param max - Maximum number of threads that can be created by this pool.
* @param fileURL - URL to an implementation of a `ThreadWorker` file.
* @param specifier - Specifier to an implementation of a `ThreadWorker` file.
* @param opts - Options for this dynamic thread pool.
*/
public constructor(
min: number,
max: number,
fileURL: URL,
specifier: URL | string,
opts: ThreadPoolOptions = {},
) {
super(min, fileURL, opts, max)
super(min, specifier, opts, max)
checkDynamicPoolSize(
this.minimumNumberOfWorkers,
this.maximumNumberOfWorkers!,
Expand Down
6 changes: 3 additions & 3 deletions src/pools/thread/fixed.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,16 @@ export class FixedThreadPool<
* Constructs a new poolifier fixed thread pool.
*
* @param numberOfThreads - Number of threads for this pool.
* @param fileURL - URL to an implementation of a `ThreadWorker` file.
* @param specifier - Specifier to an implementation of a `ThreadWorker` file.
* @param opts - Options for this fixed thread pool.
*/
public constructor(
numberOfThreads: number,
fileURL: URL,
specifier: URL | string,
opts: ThreadPoolOptions = {},
maximumNumberOfThreads?: number,
) {
super(numberOfThreads, fileURL, opts, maximumNumberOfThreads)
super(numberOfThreads, specifier, opts, maximumNumberOfThreads)
}
/** @inheritDoc */
protected isMain(): boolean {
Expand Down
20 changes: 11 additions & 9 deletions src/pools/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,14 @@ export const getDefaultTasksQueueOptions = (
})
}

export const checkFileURL = (fileURL: URL | undefined): void => {
if (fileURL == null) {
throw new TypeError('The worker URL must be specified')
export const checkSpecifier = (specifier: URL | string | undefined): void => {
if (specifier == null) {
throw new TypeError('The worker specifier must be defined')
}
if (fileURL instanceof URL === false) {
throw new TypeError('The worker URL must be an instance of URL')
if (typeof specifier !== 'string' && !(specifier instanceof URL)) {
throw new TypeError(
'The worker specifier must be a string or an instance of URL',
)
}
}

Expand Down Expand Up @@ -178,7 +180,7 @@ export const checkValidTasksQueueOptions = (

export const checkWorkerNodeArguments = (
type: WorkerType | undefined,
fileURL: URL | undefined,
specifier: URL | string | undefined,
opts: WorkerNodeOptions | undefined,
): void => {
if (type == null) {
Expand All @@ -189,7 +191,7 @@ export const checkWorkerNodeArguments = (
`Cannot construct a worker node with an invalid worker type '${type}'`,
)
}
checkFileURL(fileURL)
checkSpecifier(specifier)
if (opts == null) {
throw new TypeError(
'Cannot construct a worker node without worker node options',
Expand Down Expand Up @@ -396,12 +398,12 @@ export const messageListenerToEventListener = <Message = unknown>(

export const createWorker = <Worker extends IWorker>(
type: WorkerType,
fileURL: URL,
specifier: URL | string,
opts: { workerOptions?: WorkerOptions },
): Worker => {
switch (type) {
case WorkerTypes.web:
return new Worker(fileURL, {
return new Worker(specifier, {
...(runtime === JSRuntime.bun && { smol: true }),
...opts.workerOptions,
type: 'module',
Expand Down
12 changes: 8 additions & 4 deletions src/pools/worker-node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,17 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
* Constructs a new worker node.
*
* @param type - The worker type.
* @param fileURL - URL to the worker file.
* @param specifier - Specifier to the worker file.
* @param opts - The worker node options.
*/
constructor(type: WorkerType, fileURL: URL, opts: WorkerNodeOptions) {
constructor(
type: WorkerType,
specifier: URL | string,
opts: WorkerNodeOptions,
) {
super()
checkWorkerNodeArguments(type, fileURL, opts)
this.worker = createWorker<Worker>(type, fileURL, {
checkWorkerNodeArguments(type, specifier, opts)
this.worker = createWorker<Worker>(type, specifier, {
workerOptions: opts.workerOptions,
})
this.info = initWorkerInfo(this.worker)
Expand Down
10 changes: 6 additions & 4 deletions tests/pools/abstract-pool.test.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,14 @@ describe({
expect(pool.destroying).toBe(false)
})

it('Verify that fileURL is checked', () => {
it('Verify that specifier is checked', () => {
expect(() => new FixedThreadPool(numberOfWorkers)).toThrow(
new TypeError('The worker URL must be specified'),
new TypeError('The worker specifier must be defined'),
)
expect(() => new FixedThreadPool(numberOfWorkers, 0)).toThrow(
new TypeError('The worker URL must be an instance of URL'),
new TypeError(
'The worker specifier must be a string or an instance of URL',
),
)
})

Expand Down Expand Up @@ -127,7 +129,7 @@ describe({
),
).toThrow(
new Error(
'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization',
'Cannot instantiate a fixed pool with a maximum number of workers defined at initialization',
),
)
})
Expand Down
2 changes: 1 addition & 1 deletion tests/pools/thread/dynamic.test.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ describe({

it('Validation of inputs test', () => {
expect(() => new DynamicThreadPool(min)).toThrow(
'The worker URL must be specified',
'The worker specifier must be defined',
)
})

Expand Down
Loading