diff --git a/src/circular-buffer.ts b/src/circular-buffer.ts index cc1081835..d82ce319f 100644 --- a/src/circular-buffer.ts +++ b/src/circular-buffer.ts @@ -4,7 +4,7 @@ export const defaultBufferSize = 2048 /** - * Circular buffer designed for positive numbers. + * Circular buffer designed for numbers. * * @internal */ @@ -25,7 +25,7 @@ export class CircularBuffer { this.writeIdx = 0 this.maxArrayIdx = size - 1 this.size = 0 - this.items = new Float32Array(size).fill(-1) + this.items = new Float32Array(size) } /** @@ -52,11 +52,13 @@ export class CircularBuffer { * @param number - Number to put into buffer. */ public put(number: number): void { - this.items[this.writeIdx] = number - this.writeIdx = this.writeIdx === this.maxArrayIdx ? 0 : this.writeIdx + 1 - if (this.size < this.items.length) { + if (this.full()) { + this.readIdx = this.readIdx === this.maxArrayIdx ? 0 : this.readIdx + 1 + } else { ++this.size } + this.items[this.writeIdx] = number + this.writeIdx = this.writeIdx === this.maxArrayIdx ? 0 : this.writeIdx + 1 } /** @@ -65,11 +67,10 @@ export class CircularBuffer { * @returns Number from buffer. */ public get(): number | undefined { - const number = this.items[this.readIdx] - if (number === -1) { - return + if (this.empty()) { + return undefined } - this.items[this.readIdx] = -1 + const number = this.items[this.readIdx] this.readIdx = this.readIdx === this.maxArrayIdx ? 0 : this.readIdx + 1 --this.size return number @@ -81,7 +82,16 @@ export class CircularBuffer { * @returns Numbers' array. */ public toArray(): number[] { - return Array.from(this.items.filter((item) => item !== -1)) + const array: number[] = [] + if (this.empty()) { + return array + } + let currentIdx = this.readIdx + for (let i = 0; i < this.size; i++) { + array.push(this.items[currentIdx]) + currentIdx = currentIdx === this.maxArrayIdx ? 0 : currentIdx + 1 + } + return array } /** diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 87ca7fa23..569f5089e 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -829,50 +829,59 @@ export abstract class AbstractPool< private async sendTaskFunctionOperationToWorkers( message: MessageValue, ): Promise { - return await new Promise((resolve, reject) => { - const responsesReceived: MessageValue[] = [] - const taskFunctionOperationsListener = ( - message: MessageValue, - ): void => { - this.checkMessageWorkerId(message) - if (message.taskFunctionOperationStatus != null) { - responsesReceived.push(message) - if (responsesReceived.length === this.workerNodes.length) { - if ( - responsesReceived.every( - (message) => message.taskFunctionOperationStatus === true, - ) - ) { - resolve(true) - } else if ( - responsesReceived.some( - (message) => message.taskFunctionOperationStatus === false, - ) - ) { - const errorResponse = responsesReceived.find( - (response) => response.taskFunctionOperationStatus === false, - ) - reject( - new Error( - `Task function operation '${message.taskFunctionOperation}' failed on worker ${errorResponse?.workerId?.toString()} with error: '${errorResponse?.workerError?.error.message}'`, - ), - ) - } - this.deregisterWorkerMessageListener( - this.getWorkerNodeKeyByWorkerId(message.workerId), - taskFunctionOperationsListener, + const taskFunctionOperationsListener = ( + message: MessageValue, + resolve: (value: boolean | PromiseLike) => void, + reject: (reason?: unknown) => void, + responsesReceived: MessageValue[], + ): void => { + this.checkMessageWorkerId(message) + if (message.taskFunctionOperationStatus != null) { + responsesReceived.push(message) + if (responsesReceived.length >= this.workerNodes.length) { + if ( + responsesReceived.every( + (msg) => msg.taskFunctionOperationStatus === true, + ) + ) { + resolve(true) + } else { + const errorResponse = responsesReceived.find( + (msg) => msg.taskFunctionOperationStatus === false, + ) + reject( + new Error( + `Task function operation '${message.taskFunctionOperation}' failed on worker ${errorResponse?.workerId?.toString()} with error: '${ + errorResponse?.workerError?.error?.message ?? 'Unknown error' + }'`, + ), ) } } } + } + let listener: (message: MessageValue) => void + try { + return await new Promise((resolve, reject) => { + const responsesReceived: MessageValue[] = [] + listener = (message: MessageValue) => { + taskFunctionOperationsListener( + message, + resolve, + reject, + responsesReceived, + ) + } + for (const workerNodeKey of this.workerNodes.keys()) { + this.registerWorkerMessageListener(workerNodeKey, listener) + this.sendToWorker(workerNodeKey, message) + } + }) + } finally { for (const workerNodeKey of this.workerNodes.keys()) { - this.registerWorkerMessageListener( - workerNodeKey, - taskFunctionOperationsListener, - ) - this.sendToWorker(workerNodeKey, message) + this.deregisterWorkerMessageListener(workerNodeKey, listener!) } - }) + } } /** @inheritDoc */ diff --git a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts index 29bcd346f..7a41cd252 100644 --- a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts @@ -116,16 +116,21 @@ export abstract class AbstractWorkerChoiceStrategy< } /** - * Check the next worker node key. + * Check the worker node key. + * @param workerNodeKey - The worker node key to check. + * @returns The worker node key if it is valid, otherwise undefined. */ - protected checkNextWorkerNodeKey(): void { + protected checkWorkerNodeKey( + workerNodeKey: number | undefined, + ): number | undefined { if ( - this.nextWorkerNodeKey != null && - (this.nextWorkerNodeKey < 0 || - !this.isWorkerNodeReady(this.nextWorkerNodeKey)) + workerNodeKey == null || + workerNodeKey < 0 || + workerNodeKey >= this.pool.workerNodes.length ) { - delete this.nextWorkerNodeKey + return undefined } + return workerNodeKey } /** diff --git a/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts b/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts index 243107fbf..f6c79f246 100644 --- a/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts @@ -76,28 +76,44 @@ export class FairShareWorkerChoiceStrategy< } /** @inheritDoc */ - public remove(): boolean { + public remove(workerNodeKey: number): boolean { + if ( + this.pool.workerNodes[workerNodeKey]?.strategyData + ?.virtualTaskEndTimestamp != null + ) { + delete this.pool.workerNodes[workerNodeKey].strategyData + .virtualTaskEndTimestamp + } return true } private fairShareNextWorkerNodeKey(): number | undefined { - return this.pool.workerNodes.reduce( - (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => { + const chosenWorkerNodeKey = this.pool.workerNodes.reduce( + (minWorkerNodeKey: number, workerNode, workerNodeKey, workerNodes) => { + if (!this.isWorkerNodeReady(workerNodeKey)) { + return minWorkerNodeKey + } + if (minWorkerNodeKey === -1) { + workerNode.strategyData = { + virtualTaskEndTimestamp: this + .computeWorkerNodeVirtualTaskEndTimestamp(workerNodeKey), + } + return workerNodeKey + } if (workerNode.strategyData?.virtualTaskEndTimestamp == null) { workerNode.strategyData = { virtualTaskEndTimestamp: this .computeWorkerNodeVirtualTaskEndTimestamp(workerNodeKey), } } - return this.isWorkerNodeReady(workerNodeKey) && - workerNode.strategyData.virtualTaskEndTimestamp! < - workerNodes[minWorkerNodeKey].strategyData! - .virtualTaskEndTimestamp! + return workerNode.strategyData.virtualTaskEndTimestamp! < + workerNodes[minWorkerNodeKey].strategyData!.virtualTaskEndTimestamp! ? workerNodeKey : minWorkerNodeKey }, - 0, + -1, ) + return chosenWorkerNodeKey === -1 ? undefined : chosenWorkerNodeKey } /** diff --git a/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts index c9504fcbd..ecb31b843 100644 --- a/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts @@ -142,16 +142,17 @@ export class InterleavedWeightedRoundRobinWorkerChoiceStrategy< return true } if ( - this.workerNodeId === workerNodeKey && - this.workerNodeId > this.pool.workerNodes.length - 1 + this.nextWorkerNodeKey != null && + this.nextWorkerNodeKey >= workerNodeKey ) { - this.workerNodeId = this.pool.workerNodes.length - 1 + this.nextWorkerNodeKey = + (this.nextWorkerNodeKey - 1 + this.pool.workerNodes.length) % + this.pool.workerNodes.length } - if ( - this.previousWorkerNodeKey === workerNodeKey && - this.previousWorkerNodeKey > this.pool.workerNodes.length - 1 - ) { - this.previousWorkerNodeKey = this.pool.workerNodes.length - 1 + if (this.workerNodeId >= workerNodeKey) { + this.workerNodeId = + (this.workerNodeId - 1 + this.pool.workerNodes.length) % + this.pool.workerNodes.length } return true } diff --git a/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts b/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts index d54c07aa6..9e34717d4 100644 --- a/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts @@ -69,17 +69,23 @@ export class LeastBusyWorkerChoiceStrategy< } private leastBusyNextWorkerNodeKey(): number | undefined { - return this.pool.workerNodes.reduce( - (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => { - return this.isWorkerNodeReady(workerNodeKey) && - (workerNode.usage.waitTime.aggregate ?? 0) + - (workerNode.usage.runTime.aggregate ?? 0) < - (workerNodes[minWorkerNodeKey].usage.waitTime.aggregate ?? 0) + - (workerNodes[minWorkerNodeKey].usage.runTime.aggregate ?? 0) + const chosenWorkerNodeKey = this.pool.workerNodes.reduce( + (minWorkerNodeKey: number, workerNode, workerNodeKey, workerNodes) => { + if (!this.isWorkerNodeReady(workerNodeKey)) { + return minWorkerNodeKey + } + if (minWorkerNodeKey === -1) { + return workerNodeKey + } + return (workerNode.usage.waitTime.aggregate ?? 0) + + (workerNode.usage.runTime.aggregate ?? 0) < + (workerNodes[minWorkerNodeKey].usage.waitTime.aggregate ?? 0) + + (workerNodes[minWorkerNodeKey].usage.runTime.aggregate ?? 0) ? workerNodeKey : minWorkerNodeKey }, - 0, + -1, ) + return chosenWorkerNodeKey === -1 ? undefined : chosenWorkerNodeKey } } diff --git a/src/pools/selection-strategies/least-elu-worker-choice-strategy.ts b/src/pools/selection-strategies/least-elu-worker-choice-strategy.ts index f97386d5c..d0ecedd78 100644 --- a/src/pools/selection-strategies/least-elu-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/least-elu-worker-choice-strategy.ts @@ -65,15 +65,21 @@ export class LeastEluWorkerChoiceStrategy< } private leastEluNextWorkerNodeKey(): number | undefined { - return this.pool.workerNodes.reduce( - (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => { - return this.isWorkerNodeReady(workerNodeKey) && - (workerNode.usage.elu.active.aggregate ?? 0) < - (workerNodes[minWorkerNodeKey].usage.elu.active.aggregate ?? 0) + const chosenWorkerNodeKey = this.pool.workerNodes.reduce( + (minWorkerNodeKey: number, workerNode, workerNodeKey, workerNodes) => { + if (!this.isWorkerNodeReady(workerNodeKey)) { + return minWorkerNodeKey + } + if (minWorkerNodeKey === -1) { + return workerNodeKey + } + return (workerNode.usage.elu.active.aggregate ?? 0) < + (workerNodes[minWorkerNodeKey].usage.elu.active.aggregate ?? 0) ? workerNodeKey : minWorkerNodeKey }, - 0, + -1, ) + return chosenWorkerNodeKey === -1 ? undefined : chosenWorkerNodeKey } } diff --git a/src/pools/selection-strategies/least-used-worker-choice-strategy.ts b/src/pools/selection-strategies/least-used-worker-choice-strategy.ts index 22c1cd845..ee13fd250 100644 --- a/src/pools/selection-strategies/least-used-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/least-used-worker-choice-strategy.ts @@ -50,16 +50,23 @@ export class LeastUsedWorkerChoiceStrategy< } private leastUsedNextWorkerNodeKey(): number | undefined { - return this.pool.workerNodes.reduce( - (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => { - return this.isWorkerNodeReady(workerNodeKey) && - workerNode.usage.tasks.executing + workerNode.usage.tasks.queued < - workerNodes[minWorkerNodeKey].usage.tasks.executing + - workerNodes[minWorkerNodeKey].usage.tasks.queued + const chosenWorkerNodeKey = this.pool.workerNodes.reduce( + (minWorkerNodeKey: number, workerNode, workerNodeKey, workerNodes) => { + if (!this.isWorkerNodeReady(workerNodeKey)) { + return minWorkerNodeKey + } + if (minWorkerNodeKey === -1) { + return workerNodeKey + } + return workerNode.usage.tasks.executing + + workerNode.usage.tasks.queued < + workerNodes[minWorkerNodeKey].usage.tasks.executing + + workerNodes[minWorkerNodeKey].usage.tasks.queued ? workerNodeKey : minWorkerNodeKey }, - 0, + -1, ) + return chosenWorkerNodeKey === -1 ? undefined : chosenWorkerNodeKey } } diff --git a/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts index 3358bf62d..528c7a1b4 100644 --- a/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts @@ -40,30 +40,29 @@ export class RoundRobinWorkerChoiceStrategy< /** @inheritDoc */ public choose(): number | undefined { - const chosenWorkerNodeKey = this.nextWorkerNodeKey - this.setPreviousWorkerNodeKey(chosenWorkerNodeKey) + this.setPreviousWorkerNodeKey(this.nextWorkerNodeKey) this.roundRobinNextWorkerNodeKey() - this.checkNextWorkerNodeKey() - return chosenWorkerNodeKey + if (!this.isWorkerNodeReady(this.nextWorkerNodeKey!)) { + return undefined + } + return this.checkWorkerNodeKey(this.nextWorkerNodeKey) } /** @inheritDoc */ public remove(workerNodeKey: number): boolean { if (this.pool.workerNodes.length === 0) { - this.reset() - return true - } - if ( - this.nextWorkerNodeKey === workerNodeKey && - this.nextWorkerNodeKey > this.pool.workerNodes.length - 1 - ) { - this.nextWorkerNodeKey = this.pool.workerNodes.length - 1 + return this.reset() } if ( - this.previousWorkerNodeKey === workerNodeKey && - this.previousWorkerNodeKey > this.pool.workerNodes.length - 1 + this.nextWorkerNodeKey != null && + this.nextWorkerNodeKey >= workerNodeKey ) { - this.previousWorkerNodeKey = this.pool.workerNodes.length - 1 + this.nextWorkerNodeKey = + (this.nextWorkerNodeKey - 1 + this.pool.workerNodes.length) % + this.pool.workerNodes.length + if (this.previousWorkerNodeKey >= workerNodeKey) { + this.previousWorkerNodeKey = this.nextWorkerNodeKey + } } return true } diff --git a/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts index 5304bbdd5..f79cf1b68 100644 --- a/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts @@ -68,46 +68,46 @@ export class WeightedRoundRobinWorkerChoiceStrategy< public choose(): number | undefined { this.setPreviousWorkerNodeKey(this.nextWorkerNodeKey) this.weightedRoundRobinNextWorkerNodeKey() - this.checkNextWorkerNodeKey() - return this.nextWorkerNodeKey + if (!this.isWorkerNodeReady(this.nextWorkerNodeKey!)) { + return undefined + } + return this.checkWorkerNodeKey(this.nextWorkerNodeKey) } /** @inheritDoc */ public remove(workerNodeKey: number): boolean { if (this.pool.workerNodes.length === 0) { - this.reset() - return true + return this.reset() } if (this.nextWorkerNodeKey === workerNodeKey) { this.workerNodeVirtualTaskExecutionTime = 0 - if (this.nextWorkerNodeKey > this.pool.workerNodes.length - 1) { - this.nextWorkerNodeKey = this.pool.workerNodes.length - 1 - } } if ( - this.previousWorkerNodeKey === workerNodeKey && - this.previousWorkerNodeKey > this.pool.workerNodes.length - 1 + this.nextWorkerNodeKey != null && + this.nextWorkerNodeKey >= workerNodeKey ) { - this.previousWorkerNodeKey = this.pool.workerNodes.length - 1 + this.nextWorkerNodeKey = + (this.nextWorkerNodeKey - 1 + this.pool.workerNodes.length) % + this.pool.workerNodes.length + if (this.previousWorkerNodeKey >= workerNodeKey) { + this.previousWorkerNodeKey = this.nextWorkerNodeKey + } } return true } private weightedRoundRobinNextWorkerNodeKey(): number | undefined { - const workerWeight = - this.opts!.weights![this.nextWorkerNodeKey ?? this.previousWorkerNodeKey] + const workerNodeKey = this.nextWorkerNodeKey ?? this.previousWorkerNodeKey + const workerWeight = this.opts!.weights![workerNodeKey] if (this.workerNodeVirtualTaskExecutionTime < workerWeight) { - this.workerNodeVirtualTaskExecutionTime += this.getWorkerNodeTaskWaitTime( - this.nextWorkerNodeKey ?? this.previousWorkerNodeKey, - ) + - this.getWorkerNodeTaskRunTime( - this.nextWorkerNodeKey ?? this.previousWorkerNodeKey, - ) + this.workerNodeVirtualTaskExecutionTime += + this.getWorkerNodeTaskWaitTime(workerNodeKey) + + this.getWorkerNodeTaskRunTime(workerNodeKey) } else { this.nextWorkerNodeKey = this.nextWorkerNodeKey === this.pool.workerNodes.length - 1 ? 0 - : (this.nextWorkerNodeKey ?? this.previousWorkerNodeKey) + 1 + : workerNodeKey + 1 this.workerNodeVirtualTaskExecutionTime = 0 } return this.nextWorkerNodeKey diff --git a/src/pools/selection-strategies/worker-choice-strategies-context.ts b/src/pools/selection-strategies/worker-choice-strategies-context.ts index 89a9ddc1d..48a83502b 100644 --- a/src/pools/selection-strategies/worker-choice-strategies-context.ts +++ b/src/pools/selection-strategies/worker-choice-strategies-context.ts @@ -170,17 +170,13 @@ export class WorkerChoiceStrategiesContext< * @throws {@link https://nodejs.org/api/errors.html#class-error} If after computed retries the worker node key is null or undefined. */ private executeStrategy(workerChoiceStrategy: IWorkerChoiceStrategy): number { - let workerNodeKey: number | undefined - let chooseCount = 0 + let workerNodeKey: number | undefined = workerChoiceStrategy.choose() let retriesCount = 0 - do { + while (workerNodeKey == null && retriesCount < this.retries) { workerNodeKey = workerChoiceStrategy.choose() - if (workerNodeKey == null && chooseCount > 0) { - ++retriesCount - ++this.retriesCount - } - ++chooseCount - } while (workerNodeKey == null && retriesCount < this.retries) + retriesCount++ + this.retriesCount++ + } if (workerNodeKey == null) { throw new Error( `Worker node key chosen is null or undefined after ${retriesCount.toString()} retries`, diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index a2daf3062..19f77e2c6 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -42,7 +42,6 @@ export class WorkerNode public readonly tasksQueue: PriorityQueue> /** @inheritdoc */ public tasksQueueBackPressureSize: number - private setBackPressureFlag: boolean private readonly taskFunctionsUsage: Map /** @@ -84,7 +83,6 @@ export class WorkerNode opts.tasksQueueBucketSize, opts.tasksQueuePriority, ) - this.setBackPressureFlag = false this.taskFunctionsUsage = new Map() } @@ -101,19 +99,13 @@ export class WorkerNode /** @inheritdoc */ public enqueueTask(task: Task): number { const tasksQueueSize = this.tasksQueue.enqueue(task, task.priority) - if ( - !this.setBackPressureFlag && - this.hasBackPressure() && - !this.info.backPressure - ) { - this.setBackPressureFlag = true + if (this.hasBackPressure() && !this.info.backPressure) { this.info.backPressure = true this.dispatchEvent( new CustomEvent('backPressure', { detail: { workerId: this.info.id }, }), ) - this.setBackPressureFlag = false } return tasksQueueSize } @@ -121,14 +113,8 @@ export class WorkerNode /** @inheritdoc */ public dequeueTask(bucket?: number): Task | undefined { const task = this.tasksQueue.dequeue(bucket) - if ( - !this.setBackPressureFlag && - !this.hasBackPressure() && - this.info.backPressure - ) { - this.setBackPressureFlag = true + if (!this.hasBackPressure() && this.info.backPressure) { this.info.backPressure = false - this.setBackPressureFlag = false } return task } diff --git a/src/queues/abstract-fixed-queue.ts b/src/queues/abstract-fixed-queue.ts index 3f9537da7..ba4ff4dfe 100644 --- a/src/queues/abstract-fixed-queue.ts +++ b/src/queues/abstract-fixed-queue.ts @@ -17,7 +17,7 @@ export abstract class AbstractFixedQueue implements IFixedQueue { /** @inheritdoc */ public size!: number /** @inheritdoc */ - public nodeArray: FixedQueueNode[] + public nodeArray: (FixedQueueNode | undefined)[] /** * Constructs a fixed queue. @@ -54,15 +54,39 @@ export abstract class AbstractFixedQueue implements IFixedQueue { if (index >= this.capacity) { index -= this.capacity } - return this.nodeArray[index].data + return this.nodeArray[index]!.data } /** @inheritdoc */ public delete(data: T): boolean { - const index = this.nodeArray.findIndex((node) => node?.data === data) - if (index !== -1) { - this.nodeArray.splice(index, 1) - this.nodeArray.length = this.capacity + let currentPhysicalIndex = this.start + let logicalIndex = -1 + for (let i = 0; i < this.size; i++) { + if (this.nodeArray[currentPhysicalIndex]?.data === data) { + logicalIndex = i + break + } + currentPhysicalIndex++ + if (currentPhysicalIndex === this.capacity) { + currentPhysicalIndex = 0 + } + } + if (logicalIndex !== -1) { + if (logicalIndex === this.size - 1) { + this.nodeArray[currentPhysicalIndex] = undefined + --this.size + return true + } + let physicalShiftIndex = currentPhysicalIndex + for (let i = logicalIndex; i < this.size - 1; i++) { + let nextPhysicalIndex = physicalShiftIndex + 1 + if (nextPhysicalIndex === this.capacity) { + nextPhysicalIndex = 0 + } + this.nodeArray[physicalShiftIndex] = this.nodeArray[nextPhysicalIndex] + physicalShiftIndex = nextPhysicalIndex + } + this.nodeArray[physicalShiftIndex] = undefined --this.size return true } @@ -75,12 +99,12 @@ export abstract class AbstractFixedQueue implements IFixedQueue { return undefined } const index = this.start - --this.size ++this.start if (this.start === this.capacity) { this.start = 0 } - return this.nodeArray[index].data + --this.size + return this.nodeArray[index]!.data } /** @inheritdoc */ @@ -101,7 +125,7 @@ export abstract class AbstractFixedQueue implements IFixedQueue { done: true, } } - const value = this.nodeArray[index].data + const value = this.nodeArray[index]!.data ++index ++i if (index === this.capacity) { diff --git a/src/queues/fixed-priority-queue.ts b/src/queues/fixed-priority-queue.ts index f9c6175bc..859d807f0 100644 --- a/src/queues/fixed-priority-queue.ts +++ b/src/queues/fixed-priority-queue.ts @@ -15,27 +15,35 @@ export class FixedPriorityQueue extends AbstractFixedQueue throw new Error('Fixed priority queue is full') } priority = priority ?? 0 - let inserted = false - let index = this.start + let insertionPhysicalIndex = -1 + let currentPhysicalIndex = this.start for (let i = 0; i < this.size; i++) { - if (this.nodeArray[index].priority > priority) { - this.nodeArray.splice(index, 0, { data, priority }) - this.nodeArray.length = this.capacity - inserted = true + if (this.nodeArray[currentPhysicalIndex]!.priority > priority) { + insertionPhysicalIndex = currentPhysicalIndex break } - ++index - if (index === this.capacity) { - index = 0 + currentPhysicalIndex++ + if (currentPhysicalIndex === this.capacity) { + currentPhysicalIndex = 0 } } - if (!inserted) { - let index = this.start + this.size - if (index >= this.capacity) { - index -= this.capacity + let end = this.start + this.size + if (end >= this.capacity) { + end -= this.capacity + } + if (insertionPhysicalIndex === -1) { + insertionPhysicalIndex = end + } else { + let toShiftIndex = end + while (toShiftIndex !== insertionPhysicalIndex) { + const previousIndex = toShiftIndex === 0 + ? this.capacity - 1 + : toShiftIndex - 1 + this.nodeArray[toShiftIndex] = this.nodeArray[previousIndex] + toShiftIndex = previousIndex } - this.nodeArray[index] = { data, priority } } + this.nodeArray[insertionPhysicalIndex] = { data, priority } return ++this.size } } diff --git a/src/queues/priority-queue.ts b/src/queues/priority-queue.ts index 41ac42ea2..fdaaae358 100644 --- a/src/queues/priority-queue.ts +++ b/src/queues/priority-queue.ts @@ -20,6 +20,8 @@ export class PriorityQueue { private tail!: PriorityQueueNode private readonly bucketSize: number private priorityEnabled: boolean + /** The priority queue size. */ + public size!: number /** The priority queue maximum size. */ public maxSize!: number @@ -47,21 +49,6 @@ export class PriorityQueue { this.clear() } - /** - * The priority queue size. - * - * @returns The priority queue size. - */ - public get size(): number { - let node: PriorityQueueNode | undefined = this.tail - let size = 0 - while (node != null) { - size += node.size - node = node.next - } - return size - } - /** * Whether priority is enabled. * @@ -81,28 +68,11 @@ export class PriorityQueue { return } this.priorityEnabled = enablePriority - let head: PriorityQueueNode - let tail: PriorityQueueNode - let prev: PriorityQueueNode | undefined - let node: PriorityQueueNode | undefined = this.tail - let buckets = 0 - while (node != null) { - const currentNode = this.getPriorityQueueNode(node.nodeArray) - if (buckets === 0) { - tail = currentNode - } - if (prev != null) { - prev.next = currentNode - } - prev = currentNode - if (node.next == null) { - head = currentNode - } - ++buckets - node = node.next + const data: T[] = Array.from(this) + this.clear() + for (const dataItem of data) { + this.enqueue(dataItem) } - this.head = head! - this.tail = tail! } /** @@ -126,11 +96,11 @@ export class PriorityQueue { this.head = this.head.next = this.getPriorityQueueNode() } this.head.enqueue(data, priority) - const size = this.size - if (size > this.maxSize) { - this.maxSize = size + ++this.size + if (this.size > this.maxSize) { + this.maxSize = this.size } - return size + return this.size } /** @@ -143,18 +113,17 @@ export class PriorityQueue { let prev: PriorityQueueNode | undefined while (node != null) { if (node.delete(data)) { - if (node.empty()) { - if (node === this.tail && node.next != null) { - this.tail = node.next - delete node.next - } else if (node.next != null && prev != null) { - prev.next = node.next - delete node.next - } else if (node.next == null && prev != null) { - delete prev.next - this.head = prev + if (node.empty() && this.head !== this.tail) { + if (node === this.tail) { + this.tail = node.next! + } else { + prev!.next = node.next + if (node === this.head) { + this.head = prev! + } } } + --this.size return true } prev = node @@ -170,38 +139,39 @@ export class PriorityQueue { * @returns The dequeued data or `undefined` if the priority queue is empty. */ public dequeue(bucket?: number): T | undefined { - let tail: PriorityQueueNode | undefined = this.tail - let tailChanged = false + if (this.size === 0) { + return undefined + } + let targetNode: PriorityQueueNode | undefined = this.tail + let prev: PriorityQueueNode | undefined if (bucket != null && bucket > 0) { let currentBucket = 1 - while (tail != null) { - if (currentBucket === bucket) { - break - } + while (targetNode.next != null && currentBucket < bucket) { + prev = targetNode + targetNode = targetNode.next ++currentBucket - tail = tail.next } - tailChanged = tail !== this.tail + if (currentBucket < bucket || targetNode.empty() === true) { + return undefined + } + } else { + while (targetNode?.empty() === true && targetNode !== this.head) { + prev = targetNode + targetNode = targetNode.next + } } - const data = tail!.dequeue() - if (tail!.empty()) { - if (!tailChanged && tail!.next != null) { - this.tail = tail!.next - delete tail!.next - } else if (tailChanged) { - let node: PriorityQueueNode | undefined = this.tail - while (node != null) { - if (node.next === tail && tail!.next != null) { - node.next = tail!.next - delete tail!.next - break - } - if (node.next === tail && tail!.next == null) { - delete node.next - this.head = node - break - } - node = node.next + if (targetNode?.empty() === true) { + return undefined + } + const data = targetNode!.dequeue() + --this.size + if (targetNode!.empty() && this.head !== this.tail) { + if (targetNode === this.tail) { + this.tail = this.tail.next! + } else { + prev!.next = targetNode!.next + if (targetNode === this.head) { + this.head = prev! } } } @@ -213,6 +183,7 @@ export class PriorityQueue { */ public clear(): void { this.head = this.tail = this.getPriorityQueueNode() + this.size = 0 this.maxSize = 0 } @@ -223,32 +194,37 @@ export class PriorityQueue { * @see https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols */ public [Symbol.iterator](): Iterator { + let node: PriorityQueueNode | undefined = this.tail let index = 0 - let node = this.tail - return { - next: () => { - const value = node.get(index) as T - if (value == null) { - return { - value: undefined, - done: true, - } + const getNextValue = (): IteratorResult => { + if (node == null) { + return { done: true, value: undefined } + } + + while (index >= node.size) { + node = node.next + index = 0 + if (node == null) { + return { done: true, value: undefined } } + } + + const value = node.get(index) + if (value == null) { ++index - if (index === node.capacity && node.next != null) { - node = node.next - index = 0 - } - return { - value, - done: false, - } - }, + return getNextValue() + } + + ++index + return { done: false, value } + } + return { + next: getNextValue, } } private getPriorityQueueNode( - nodeArray?: FixedQueueNode[], + nodeArray?: (FixedQueueNode | undefined)[], ): PriorityQueueNode { let fixedQueue: IFixedQueue if (this.priorityEnabled) { diff --git a/src/queues/queue-types.ts b/src/queues/queue-types.ts index 2049bc476..60400ce3a 100644 --- a/src/queues/queue-types.ts +++ b/src/queues/queue-types.ts @@ -25,7 +25,7 @@ export interface IFixedQueue { /** The fixed queue size. */ readonly size: number /** The fixed queue node array. */ - nodeArray: FixedQueueNode[] + nodeArray: (FixedQueueNode | undefined)[] /** * Checks if the fixed queue is empty. * @returns `true` if the fixed queue is empty, `false` otherwise. diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts index 9452123c6..ed9cbf1db 100644 --- a/src/worker/abstract-worker.ts +++ b/src/worker/abstract-worker.ts @@ -494,7 +494,7 @@ export abstract class AbstractWorker< this.taskAbortFunctions.set(taskId, () => { reject(new AbortError(`Task '${name}' id '${taskId}' aborted`)) }) - const taskFunction = this.taskFunctions.get(name)?.taskFunction + const taskFunction = this.taskFunctions.get(name)!.taskFunction if (isAsyncFunction(taskFunction)) { ;(taskFunction as TaskAsyncFunction)(data) .then(resolve) diff --git a/src/worker/utils.ts b/src/worker/utils.ts index 30d8d18b9..8d4b8ff2d 100644 --- a/src/worker/utils.ts +++ b/src/worker/utils.ts @@ -22,11 +22,8 @@ export const checkValidWorkerOptions = ( } if ( opts?.maxInactiveTime != null && - !Number.isSafeInteger(opts.maxInactiveTime) + (!Number.isSafeInteger(opts.maxInactiveTime) || opts.maxInactiveTime < 5) ) { - throw new TypeError('maxInactiveTime option is not an integer') - } - if (opts?.maxInactiveTime != null && opts.maxInactiveTime < 5) { throw new TypeError( 'maxInactiveTime option is not a positive integer greater or equal than 5', ) @@ -43,14 +40,7 @@ export const checkValidTaskFunctionObjectEntry = < name: string, fnObj: TaskFunctionObject, ): void => { - if (typeof name !== 'string') { - throw new TypeError('A taskFunctions parameter object key is not a string') - } - if (typeof name === 'string' && name.trim().length === 0) { - throw new TypeError( - 'A taskFunctions parameter object key is an empty string', - ) - } + checkTaskFunctionName(name) if (typeof fnObj.taskFunction !== 'function') { throw new TypeError( `taskFunction object 'taskFunction' property '${fnObj.taskFunction}' is not a function`, @@ -64,7 +54,7 @@ export const checkTaskFunctionName = (name: string): void => { if (typeof name !== 'string') { throw new TypeError('name parameter is not a string') } - if (typeof name === 'string' && name.trim().length === 0) { + if (name.trim().length === 0) { throw new TypeError('name parameter is an empty string') } } diff --git a/tests/circular-buffer.test.mjs b/tests/circular-buffer.test.mjs index 88b201423..205933bfb 100644 --- a/tests/circular-buffer.test.mjs +++ b/tests/circular-buffer.test.mjs @@ -41,29 +41,33 @@ describe('Circular buffer test suite', () => { it('Verify that circular buffer put() works as intended', () => { const circularBuffer = new CircularBuffer(4) circularBuffer.put(1) - expect(circularBuffer.items).toStrictEqual( - new Float32Array([1, -1, -1, -1]), - ) + expect(circularBuffer.items).toStrictEqual(new Float32Array([1, 0, 0, 0])) + expect(circularBuffer.readIdx).toBe(0) expect(circularBuffer.writeIdx).toBe(1) expect(circularBuffer.size).toBe(1) circularBuffer.put(2) - expect(circularBuffer.items).toStrictEqual(new Float32Array([1, 2, -1, -1])) + expect(circularBuffer.items).toStrictEqual(new Float32Array([1, 2, 0, 0])) + expect(circularBuffer.readIdx).toBe(0) expect(circularBuffer.writeIdx).toBe(2) expect(circularBuffer.size).toBe(2) circularBuffer.put(3) - expect(circularBuffer.items).toStrictEqual(new Float32Array([1, 2, 3, -1])) + expect(circularBuffer.items).toStrictEqual(new Float32Array([1, 2, 3, 0])) + expect(circularBuffer.readIdx).toBe(0) expect(circularBuffer.writeIdx).toBe(3) expect(circularBuffer.size).toBe(3) circularBuffer.put(4) expect(circularBuffer.items).toStrictEqual(new Float32Array([1, 2, 3, 4])) + expect(circularBuffer.readIdx).toBe(0) expect(circularBuffer.writeIdx).toBe(0) expect(circularBuffer.size).toBe(4) circularBuffer.put(5) expect(circularBuffer.items).toStrictEqual(new Float32Array([5, 2, 3, 4])) + expect(circularBuffer.readIdx).toBe(1) expect(circularBuffer.writeIdx).toBe(1) expect(circularBuffer.size).toBe(4) circularBuffer.put(6) expect(circularBuffer.items).toStrictEqual(new Float32Array([5, 6, 3, 4])) + expect(circularBuffer.readIdx).toBe(2) expect(circularBuffer.writeIdx).toBe(2) expect(circularBuffer.size).toBe(4) }) @@ -149,11 +153,16 @@ describe('Circular buffer test suite', () => { it('Verify that circular buffer toArray() works as intended', () => { const circularBuffer = new CircularBuffer(4) circularBuffer.put(1) + expect(circularBuffer.toArray()).toStrictEqual([1]) circularBuffer.put(2) + expect(circularBuffer.toArray()).toStrictEqual([1, 2]) circularBuffer.put(3) + expect(circularBuffer.toArray()).toStrictEqual([1, 2, 3]) circularBuffer.put(4) + expect(circularBuffer.toArray()).toStrictEqual([1, 2, 3, 4]) circularBuffer.put(5) + expect(circularBuffer.toArray()).toStrictEqual([2, 3, 4, 5]) circularBuffer.put(6) - expect(circularBuffer.toArray()).toStrictEqual([5, 6, 3, 4]) + expect(circularBuffer.toArray()).toStrictEqual([3, 4, 5, 6]) }) }) diff --git a/tests/pools/selection-strategies/selection-strategies.test.mjs b/tests/pools/selection-strategies/selection-strategies.test.mjs index dac60413e..6cec7a3ec 100644 --- a/tests/pools/selection-strategies/selection-strategies.test.mjs +++ b/tests/pools/selection-strategies/selection-strategies.test.mjs @@ -333,12 +333,12 @@ describe('Selection strategies test suite', () => { pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy, ).nextWorkerNodeKey, - ).toBe(0) + ).toBe(pool.workerNodes.length - 1) expect( pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy, ).previousWorkerNodeKey, - ).toBe(pool.workerNodes.length - 1) + ).toBe(pool.workerNodes.length - 2) // We need to clean up the resources after our test await pool.destroy() }) diff --git a/tests/pools/worker-node.test.mjs b/tests/pools/worker-node.test.mjs index 8b73f9fdc..02c27336e 100644 --- a/tests/pools/worker-node.test.mjs +++ b/tests/pools/worker-node.test.mjs @@ -286,7 +286,6 @@ describe('Worker node test suite', () => { expect(threadWorkerNode.tasksQueueSize()).toBe( threadWorkerNode.tasksQueue.size, ) - expect(threadWorkerNode.setBackPressureFlag).toBe(false) expect(threadWorkerNode.taskFunctionsUsage).toBeInstanceOf(Map) }) diff --git a/tests/worker/abstract-worker.test.mjs b/tests/worker/abstract-worker.test.mjs index b8fee38d2..904555179 100644 --- a/tests/worker/abstract-worker.test.mjs +++ b/tests/worker/abstract-worker.test.mjs @@ -29,10 +29,14 @@ describe('Abstract worker test suite', () => { new TypeError("killBehavior option '0' is not valid"), ) expect(() => new ThreadWorker(() => {}, { maxInactiveTime: '' })).toThrow( - new TypeError('maxInactiveTime option is not an integer'), + new TypeError( + 'maxInactiveTime option is not a positive integer greater or equal than 5', + ), ) expect(() => new ThreadWorker(() => {}, { maxInactiveTime: 0.5 })).toThrow( - new TypeError('maxInactiveTime option is not an integer'), + new TypeError( + 'maxInactiveTime option is not a positive integer greater or equal than 5', + ), ) expect(() => new ThreadWorker(() => {}, { maxInactiveTime: 0 })).toThrow( new TypeError( @@ -143,7 +147,7 @@ describe('Abstract worker test suite', () => { } const fn2 = '' expect(() => new ThreadWorker({ '': fn1 })).toThrow( - new TypeError('A taskFunctions parameter object key is an empty string'), + new TypeError('name parameter is an empty string'), ) expect(() => new ThreadWorker({ fn1, fn2 })).toThrow( new TypeError(