Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 20 additions & 10 deletions src/circular-buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
export const defaultBufferSize = 2048

/**
* Circular buffer designed for positive numbers.
* Circular buffer designed for numbers.
*
* @internal
*/
Expand All @@ -25,7 +25,7 @@ export class CircularBuffer {
this.writeIdx = 0
this.maxArrayIdx = size - 1
this.size = 0
this.items = new Float32Array(size).fill(-1)
this.items = new Float32Array(size)
}

/**
Expand All @@ -52,11 +52,13 @@ export class CircularBuffer {
* @param number - Number to put into buffer.
*/
public put(number: number): void {
this.items[this.writeIdx] = number
this.writeIdx = this.writeIdx === this.maxArrayIdx ? 0 : this.writeIdx + 1
if (this.size < this.items.length) {
if (this.full()) {
this.readIdx = this.readIdx === this.maxArrayIdx ? 0 : this.readIdx + 1
} else {
++this.size
}
this.items[this.writeIdx] = number
this.writeIdx = this.writeIdx === this.maxArrayIdx ? 0 : this.writeIdx + 1
}

/**
Expand All @@ -65,11 +67,10 @@ export class CircularBuffer {
* @returns Number from buffer.
*/
public get(): number | undefined {
const number = this.items[this.readIdx]
if (number === -1) {
return
if (this.empty()) {
return undefined
}
this.items[this.readIdx] = -1
const number = this.items[this.readIdx]
this.readIdx = this.readIdx === this.maxArrayIdx ? 0 : this.readIdx + 1
--this.size
return number
Expand All @@ -81,7 +82,16 @@ export class CircularBuffer {
* @returns Numbers' array.
*/
public toArray(): number[] {
return Array.from(this.items.filter((item) => item !== -1))
const array: number[] = []
if (this.empty()) {
return array
}
let currentIdx = this.readIdx
for (let i = 0; i < this.size; i++) {
array.push(this.items[currentIdx])
currentIdx = currentIdx === this.maxArrayIdx ? 0 : currentIdx + 1
}
return array
}

/**
Expand Down
85 changes: 47 additions & 38 deletions src/pools/abstract-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -829,50 +829,59 @@ export abstract class AbstractPool<
private async sendTaskFunctionOperationToWorkers(
message: MessageValue<Data>,
): Promise<boolean> {
return await new Promise<boolean>((resolve, reject) => {
const responsesReceived: MessageValue<Response>[] = []
const taskFunctionOperationsListener = (
message: MessageValue<Response>,
): void => {
this.checkMessageWorkerId(message)
if (message.taskFunctionOperationStatus != null) {
responsesReceived.push(message)
if (responsesReceived.length === this.workerNodes.length) {
if (
responsesReceived.every(
(message) => message.taskFunctionOperationStatus === true,
)
) {
resolve(true)
} else if (
responsesReceived.some(
(message) => message.taskFunctionOperationStatus === false,
)
) {
const errorResponse = responsesReceived.find(
(response) => response.taskFunctionOperationStatus === false,
)
reject(
new Error(
`Task function operation '${message.taskFunctionOperation}' failed on worker ${errorResponse?.workerId?.toString()} with error: '${errorResponse?.workerError?.error.message}'`,
),
)
}
this.deregisterWorkerMessageListener(
this.getWorkerNodeKeyByWorkerId(message.workerId),
taskFunctionOperationsListener,
const taskFunctionOperationsListener = (
message: MessageValue<Response>,
resolve: (value: boolean | PromiseLike<boolean>) => void,
reject: (reason?: unknown) => void,
responsesReceived: MessageValue<Response>[],
): void => {
this.checkMessageWorkerId(message)
if (message.taskFunctionOperationStatus != null) {
responsesReceived.push(message)
if (responsesReceived.length >= this.workerNodes.length) {
if (
responsesReceived.every(
(msg) => msg.taskFunctionOperationStatus === true,
)
) {
resolve(true)
} else {
const errorResponse = responsesReceived.find(
(msg) => msg.taskFunctionOperationStatus === false,
)
reject(
new Error(
`Task function operation '${message.taskFunctionOperation}' failed on worker ${errorResponse?.workerId?.toString()} with error: '${
errorResponse?.workerError?.error?.message ?? 'Unknown error'
}'`,
),
)
}
}
}
}
let listener: (message: MessageValue<Response>) => void
try {
return await new Promise<boolean>((resolve, reject) => {
const responsesReceived: MessageValue<Response>[] = []
listener = (message: MessageValue<Response>) => {
taskFunctionOperationsListener(
message,
resolve,
reject,
responsesReceived,
)
}
for (const workerNodeKey of this.workerNodes.keys()) {
this.registerWorkerMessageListener(workerNodeKey, listener)
this.sendToWorker(workerNodeKey, message)
}
})
} finally {
for (const workerNodeKey of this.workerNodes.keys()) {
this.registerWorkerMessageListener(
workerNodeKey,
taskFunctionOperationsListener,
)
this.sendToWorker(workerNodeKey, message)
this.deregisterWorkerMessageListener(workerNodeKey, listener!)
}
})
}
}

/** @inheritDoc */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,16 +116,21 @@ export abstract class AbstractWorkerChoiceStrategy<
}

/**
* Check the next worker node key.
* Check the worker node key.
* @param workerNodeKey - The worker node key to check.
* @returns The worker node key if it is valid, otherwise undefined.
*/
protected checkNextWorkerNodeKey(): void {
protected checkWorkerNodeKey(
workerNodeKey: number | undefined,
): number | undefined {
if (
this.nextWorkerNodeKey != null &&
(this.nextWorkerNodeKey < 0 ||
!this.isWorkerNodeReady(this.nextWorkerNodeKey))
workerNodeKey == null ||
workerNodeKey < 0 ||
workerNodeKey >= this.pool.workerNodes.length
) {
delete this.nextWorkerNodeKey
return undefined
}
return workerNodeKey
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,28 +76,44 @@ export class FairShareWorkerChoiceStrategy<
}

/** @inheritDoc */
public remove(): boolean {
public remove(workerNodeKey: number): boolean {
if (
this.pool.workerNodes[workerNodeKey]?.strategyData
?.virtualTaskEndTimestamp != null
) {
delete this.pool.workerNodes[workerNodeKey].strategyData
.virtualTaskEndTimestamp
}
return true
}

private fairShareNextWorkerNodeKey(): number | undefined {
return this.pool.workerNodes.reduce(
(minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
const chosenWorkerNodeKey = this.pool.workerNodes.reduce(
(minWorkerNodeKey: number, workerNode, workerNodeKey, workerNodes) => {
if (!this.isWorkerNodeReady(workerNodeKey)) {
return minWorkerNodeKey
}
if (minWorkerNodeKey === -1) {
workerNode.strategyData = {
virtualTaskEndTimestamp: this
.computeWorkerNodeVirtualTaskEndTimestamp(workerNodeKey),
}
return workerNodeKey
}
if (workerNode.strategyData?.virtualTaskEndTimestamp == null) {
workerNode.strategyData = {
virtualTaskEndTimestamp: this
.computeWorkerNodeVirtualTaskEndTimestamp(workerNodeKey),
}
}
return this.isWorkerNodeReady(workerNodeKey) &&
workerNode.strategyData.virtualTaskEndTimestamp! <
workerNodes[minWorkerNodeKey].strategyData!
.virtualTaskEndTimestamp!
return workerNode.strategyData.virtualTaskEndTimestamp! <
workerNodes[minWorkerNodeKey].strategyData!.virtualTaskEndTimestamp!
? workerNodeKey
: minWorkerNodeKey
},
0,
-1,
)
return chosenWorkerNodeKey === -1 ? undefined : chosenWorkerNodeKey
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,16 +142,17 @@ export class InterleavedWeightedRoundRobinWorkerChoiceStrategy<
return true
}
if (
this.workerNodeId === workerNodeKey &&
this.workerNodeId > this.pool.workerNodes.length - 1
this.nextWorkerNodeKey != null &&
this.nextWorkerNodeKey >= workerNodeKey
) {
this.workerNodeId = this.pool.workerNodes.length - 1
this.nextWorkerNodeKey =
(this.nextWorkerNodeKey - 1 + this.pool.workerNodes.length) %
this.pool.workerNodes.length
}
if (
this.previousWorkerNodeKey === workerNodeKey &&
this.previousWorkerNodeKey > this.pool.workerNodes.length - 1
) {
this.previousWorkerNodeKey = this.pool.workerNodes.length - 1
if (this.workerNodeId >= workerNodeKey) {
this.workerNodeId =
(this.workerNodeId - 1 + this.pool.workerNodes.length) %
this.pool.workerNodes.length
}
return true
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,17 +69,23 @@ export class LeastBusyWorkerChoiceStrategy<
}

private leastBusyNextWorkerNodeKey(): number | undefined {
return this.pool.workerNodes.reduce(
(minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
return this.isWorkerNodeReady(workerNodeKey) &&
(workerNode.usage.waitTime.aggregate ?? 0) +
(workerNode.usage.runTime.aggregate ?? 0) <
(workerNodes[minWorkerNodeKey].usage.waitTime.aggregate ?? 0) +
(workerNodes[minWorkerNodeKey].usage.runTime.aggregate ?? 0)
const chosenWorkerNodeKey = this.pool.workerNodes.reduce(
(minWorkerNodeKey: number, workerNode, workerNodeKey, workerNodes) => {
if (!this.isWorkerNodeReady(workerNodeKey)) {
return minWorkerNodeKey
}
if (minWorkerNodeKey === -1) {
return workerNodeKey
}
return (workerNode.usage.waitTime.aggregate ?? 0) +
(workerNode.usage.runTime.aggregate ?? 0) <
(workerNodes[minWorkerNodeKey].usage.waitTime.aggregate ?? 0) +
(workerNodes[minWorkerNodeKey].usage.runTime.aggregate ?? 0)
? workerNodeKey
: minWorkerNodeKey
},
0,
-1,
)
return chosenWorkerNodeKey === -1 ? undefined : chosenWorkerNodeKey
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,21 @@ export class LeastEluWorkerChoiceStrategy<
}

private leastEluNextWorkerNodeKey(): number | undefined {
return this.pool.workerNodes.reduce(
(minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
return this.isWorkerNodeReady(workerNodeKey) &&
(workerNode.usage.elu.active.aggregate ?? 0) <
(workerNodes[minWorkerNodeKey].usage.elu.active.aggregate ?? 0)
const chosenWorkerNodeKey = this.pool.workerNodes.reduce(
(minWorkerNodeKey: number, workerNode, workerNodeKey, workerNodes) => {
if (!this.isWorkerNodeReady(workerNodeKey)) {
return minWorkerNodeKey
}
if (minWorkerNodeKey === -1) {
return workerNodeKey
}
return (workerNode.usage.elu.active.aggregate ?? 0) <
(workerNodes[minWorkerNodeKey].usage.elu.active.aggregate ?? 0)
? workerNodeKey
: minWorkerNodeKey
},
0,
-1,
)
return chosenWorkerNodeKey === -1 ? undefined : chosenWorkerNodeKey
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,23 @@ export class LeastUsedWorkerChoiceStrategy<
}

private leastUsedNextWorkerNodeKey(): number | undefined {
return this.pool.workerNodes.reduce(
(minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
return this.isWorkerNodeReady(workerNodeKey) &&
workerNode.usage.tasks.executing + workerNode.usage.tasks.queued <
workerNodes[minWorkerNodeKey].usage.tasks.executing +
workerNodes[minWorkerNodeKey].usage.tasks.queued
const chosenWorkerNodeKey = this.pool.workerNodes.reduce(
(minWorkerNodeKey: number, workerNode, workerNodeKey, workerNodes) => {
if (!this.isWorkerNodeReady(workerNodeKey)) {
return minWorkerNodeKey
}
if (minWorkerNodeKey === -1) {
return workerNodeKey
}
return workerNode.usage.tasks.executing +
workerNode.usage.tasks.queued <
workerNodes[minWorkerNodeKey].usage.tasks.executing +
workerNodes[minWorkerNodeKey].usage.tasks.queued
? workerNodeKey
: minWorkerNodeKey
},
0,
-1,
)
return chosenWorkerNodeKey === -1 ? undefined : chosenWorkerNodeKey
}
}
Loading