Skip to content

Commit 91a58db

Browse files
committed
fix: tasks queuing fixes
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
1 parent 6fc2246 commit 91a58db

11 files changed

Lines changed: 242 additions & 184 deletions

src/circular-buffer.ts

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,13 @@ export class CircularBuffer {
5252
* @param number - Number to put into buffer.
5353
*/
5454
public put(number: number): void {
55-
this.items[this.writeIdx] = number
56-
this.writeIdx = this.writeIdx === this.maxArrayIdx ? 0 : this.writeIdx + 1
57-
if (this.size < this.items.length) {
55+
if (this.full()) {
56+
this.readIdx = this.readIdx === this.maxArrayIdx ? 0 : this.readIdx + 1
57+
} else {
5858
++this.size
5959
}
60+
this.items[this.writeIdx] = number
61+
this.writeIdx = this.writeIdx === this.maxArrayIdx ? 0 : this.writeIdx + 1
6062
}
6163

6264
/**
@@ -65,10 +67,10 @@ export class CircularBuffer {
6567
* @returns Number from buffer.
6668
*/
6769
public get(): number | undefined {
68-
const number = this.items[this.readIdx]
69-
if (number === -1) {
70+
if (this.empty()) {
7071
return
7172
}
73+
const number = this.items[this.readIdx]
7274
this.items[this.readIdx] = -1
7375
this.readIdx = this.readIdx === this.maxArrayIdx ? 0 : this.readIdx + 1
7476
--this.size
@@ -81,7 +83,16 @@ export class CircularBuffer {
8183
* @returns Numbers' array.
8284
*/
8385
public toArray(): number[] {
84-
return Array.from(this.items.filter((item) => item !== -1))
86+
const array: number[] = []
87+
if (this.empty()) {
88+
return array
89+
}
90+
let currentIdx = this.readIdx
91+
for (let i = 0; i < this.size; i++) {
92+
array.push(this.items[currentIdx])
93+
currentIdx = currentIdx === this.maxArrayIdx ? 0 : currentIdx + 1
94+
}
95+
return array
8596
}
8697

8798
/**

src/pools/abstract-pool.ts

Lines changed: 48 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -829,50 +829,60 @@ export abstract class AbstractPool<
829829
private async sendTaskFunctionOperationToWorkers(
830830
message: MessageValue<Data>,
831831
): Promise<boolean> {
832-
return await new Promise<boolean>((resolve, reject) => {
833-
const responsesReceived: MessageValue<Response>[] = []
834-
const taskFunctionOperationsListener = (
835-
message: MessageValue<Response>,
836-
): void => {
837-
this.checkMessageWorkerId(message)
838-
if (message.taskFunctionOperationStatus != null) {
839-
responsesReceived.push(message)
840-
if (responsesReceived.length === this.workerNodes.length) {
841-
if (
842-
responsesReceived.every(
843-
(message) => message.taskFunctionOperationStatus === true,
844-
)
845-
) {
846-
resolve(true)
847-
} else if (
848-
responsesReceived.some(
849-
(message) => message.taskFunctionOperationStatus === false,
850-
)
851-
) {
852-
const errorResponse = responsesReceived.find(
853-
(response) => response.taskFunctionOperationStatus === false,
854-
)
855-
reject(
856-
new Error(
857-
`Task function operation '${message.taskFunctionOperation}' failed on worker ${errorResponse?.workerId?.toString()} with error: '${errorResponse?.workerError?.error.message}'`,
858-
),
859-
)
860-
}
861-
this.deregisterWorkerMessageListener(
862-
this.getWorkerNodeKeyByWorkerId(message.workerId),
863-
taskFunctionOperationsListener,
832+
const taskFunctionOperationsListener = (
833+
message: MessageValue<Response>,
834+
resolve: (value: boolean | PromiseLike<boolean>) => void,
835+
reject: (reason?: unknown) => void,
836+
responsesReceived: MessageValue<Response>[],
837+
): void => {
838+
this.checkMessageWorkerId(message)
839+
if (message.taskFunctionOperationStatus != null) {
840+
responsesReceived.push(message)
841+
if (responsesReceived.length === this.workerNodes.length) {
842+
if (
843+
responsesReceived.every(
844+
(msg) => msg.taskFunctionOperationStatus === true,
845+
)
846+
) {
847+
resolve(true)
848+
} else {
849+
const errorResponse = responsesReceived.find(
850+
(msg) => msg.taskFunctionOperationStatus === false,
851+
)
852+
reject(
853+
new Error(
854+
`Task function operation '${message.taskFunctionOperation}' failed on worker ${errorResponse?.workerId?.toString()} with error: '${errorResponse?.workerError?.error.message}'`,
855+
),
864856
)
865857
}
866858
}
867859
}
860+
}
861+
let listener: (message: MessageValue<Response>) => void
862+
try {
863+
return await new Promise<boolean>((resolve, reject) => {
864+
const responsesReceived: MessageValue<Response>[] = []
865+
listener = (message: MessageValue<Response>) => {
866+
taskFunctionOperationsListener(
867+
message,
868+
resolve,
869+
reject,
870+
responsesReceived,
871+
)
872+
}
873+
for (const workerNodeKey of this.workerNodes.keys()) {
874+
this.registerWorkerMessageListener(
875+
workerNodeKey,
876+
listener,
877+
)
878+
this.sendToWorker(workerNodeKey, message)
879+
}
880+
})
881+
} finally {
868882
for (const workerNodeKey of this.workerNodes.keys()) {
869-
this.registerWorkerMessageListener(
870-
workerNodeKey,
871-
taskFunctionOperationsListener,
872-
)
873-
this.sendToWorker(workerNodeKey, message)
883+
this.deregisterWorkerMessageListener(workerNodeKey, listener!)
874884
}
875-
})
885+
}
876886
}
877887

878888
/** @inheritDoc */

src/pools/selection-strategies/fair-share-worker-choice-strategy.ts

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { min } from '../../utils.ts'
12
import type { IPool } from '../pool.ts'
23
import type { IWorker } from '../worker.ts'
34
import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy.ts'
@@ -81,23 +82,29 @@ export class FairShareWorkerChoiceStrategy<
8182
}
8283

8384
private fairShareNextWorkerNodeKey(): number | undefined {
84-
return this.pool.workerNodes.reduce(
85-
(minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
85+
const chosenWorkerNodeKey = this.pool.workerNodes.reduce(
86+
(minWorkerNodeKey: number, workerNode, workerNodeKey, workerNodes) => {
87+
if (!this.isWorkerNodeReady(workerNodeKey)) {
88+
return minWorkerNodeKey
89+
}
90+
if (minWorkerNodeKey === -1) {
91+
return workerNodeKey
92+
}
8693
if (workerNode.strategyData?.virtualTaskEndTimestamp == null) {
8794
workerNode.strategyData = {
8895
virtualTaskEndTimestamp: this
8996
.computeWorkerNodeVirtualTaskEndTimestamp(workerNodeKey),
9097
}
9198
}
92-
return this.isWorkerNodeReady(workerNodeKey) &&
93-
workerNode.strategyData.virtualTaskEndTimestamp! <
94-
workerNodes[minWorkerNodeKey].strategyData!
95-
.virtualTaskEndTimestamp!
99+
return workerNode.strategyData.virtualTaskEndTimestamp! <
100+
workerNodes[minWorkerNodeKey].strategyData!
101+
.virtualTaskEndTimestamp!
96102
? workerNodeKey
97103
: minWorkerNodeKey
98104
},
99-
0,
105+
-1,
100106
)
107+
return chosenWorkerNodeKey === -1 ? undefined : chosenWorkerNodeKey
101108
}
102109

103110
/**

src/pools/selection-strategies/least-busy-worker-choice-strategy.ts

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -69,17 +69,23 @@ export class LeastBusyWorkerChoiceStrategy<
6969
}
7070

7171
private leastBusyNextWorkerNodeKey(): number | undefined {
72-
return this.pool.workerNodes.reduce(
73-
(minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
74-
return this.isWorkerNodeReady(workerNodeKey) &&
75-
(workerNode.usage.waitTime.aggregate ?? 0) +
76-
(workerNode.usage.runTime.aggregate ?? 0) <
77-
(workerNodes[minWorkerNodeKey].usage.waitTime.aggregate ?? 0) +
78-
(workerNodes[minWorkerNodeKey].usage.runTime.aggregate ?? 0)
72+
const chosenWorkerNodeKey = this.pool.workerNodes.reduce(
73+
(minWorkerNodeKey: number, workerNode, workerNodeKey, workerNodes) => {
74+
if (!this.isWorkerNodeReady(workerNodeKey)) {
75+
return minWorkerNodeKey
76+
}
77+
if (minWorkerNodeKey === -1) {
78+
return workerNodeKey
79+
}
80+
return (workerNode.usage.waitTime.aggregate ?? 0) +
81+
(workerNode.usage.runTime.aggregate ?? 0) <
82+
(workerNodes[minWorkerNodeKey].usage.waitTime.aggregate ?? 0) +
83+
(workerNodes[minWorkerNodeKey].usage.runTime.aggregate ?? 0)
7984
? workerNodeKey
8085
: minWorkerNodeKey
8186
},
82-
0,
87+
-1,
8388
)
89+
return chosenWorkerNodeKey === -1 ? undefined : chosenWorkerNodeKey
8490
}
8591
}

src/pools/selection-strategies/least-used-worker-choice-strategy.ts

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,16 +50,23 @@ export class LeastUsedWorkerChoiceStrategy<
5050
}
5151

5252
private leastUsedNextWorkerNodeKey(): number | undefined {
53-
return this.pool.workerNodes.reduce(
53+
const chosenWorkerNodeKey = this.pool.workerNodes.reduce(
5454
(minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
55-
return this.isWorkerNodeReady(workerNodeKey) &&
56-
workerNode.usage.tasks.executing + workerNode.usage.tasks.queued <
57-
workerNodes[minWorkerNodeKey].usage.tasks.executing +
58-
workerNodes[minWorkerNodeKey].usage.tasks.queued
55+
if (!this.isWorkerNodeReady(workerNodeKey)) {
56+
return minWorkerNodeKey
57+
}
58+
if (minWorkerNodeKey === -1) {
59+
return workerNodeKey
60+
}
61+
return workerNode.usage.tasks.executing +
62+
workerNode.usage.tasks.queued <
63+
workerNodes[minWorkerNodeKey].usage.tasks.executing +
64+
workerNodes[minWorkerNodeKey].usage.tasks.queued
5965
? workerNodeKey
6066
: minWorkerNodeKey
6167
},
62-
0,
68+
-1,
6369
)
70+
return chosenWorkerNodeKey === -1 ? undefined : chosenWorkerNodeKey
6471
}
6572
}

src/queues/abstract-fixed-queue.ts

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ export abstract class AbstractFixedQueue<T> implements IFixedQueue<T> {
1717
/** @inheritdoc */
1818
public size!: number
1919
/** @inheritdoc */
20-
public nodeArray: FixedQueueNode<T>[]
20+
public nodeArray: (FixedQueueNode<T> | undefined)[]
2121

2222
/**
2323
* Constructs a fixed queue.
@@ -54,15 +54,41 @@ export abstract class AbstractFixedQueue<T> implements IFixedQueue<T> {
5454
if (index >= this.capacity) {
5555
index -= this.capacity
5656
}
57-
return this.nodeArray[index].data
57+
return this.nodeArray[index]!.data
5858
}
5959

6060
/** @inheritdoc */
6161
public delete(data: T): boolean {
62-
const index = this.nodeArray.findIndex((node) => node?.data === data)
63-
if (index !== -1) {
64-
this.nodeArray.splice(index, 1)
65-
this.nodeArray.length = this.capacity
62+
let currentPhysicalIndex = this.start
63+
let logicalIndex = -1
64+
for (let i = 0; i < this.size; i++) {
65+
if (this.nodeArray[currentPhysicalIndex]?.data === data) {
66+
logicalIndex = i
67+
break
68+
}
69+
currentPhysicalIndex++
70+
if (currentPhysicalIndex === this.capacity) {
71+
currentPhysicalIndex = 0
72+
}
73+
}
74+
if (logicalIndex !== -1) {
75+
let toShiftIndex = this.start + logicalIndex
76+
if (toShiftIndex >= this.capacity) {
77+
toShiftIndex -= this.capacity
78+
}
79+
for (let i = logicalIndex; i < this.size - 1; i++) {
80+
let nextIndex = toShiftIndex + 1
81+
if (nextIndex === this.capacity) {
82+
nextIndex = 0
83+
}
84+
this.nodeArray[toShiftIndex] = this.nodeArray[nextIndex]
85+
toShiftIndex = nextIndex
86+
}
87+
let end = this.start + this.size - 1
88+
if (end >= this.capacity) {
89+
end -= this.capacity
90+
}
91+
this.nodeArray[end] = undefined
6692
--this.size
6793
return true
6894
}
@@ -80,7 +106,7 @@ export abstract class AbstractFixedQueue<T> implements IFixedQueue<T> {
80106
if (this.start === this.capacity) {
81107
this.start = 0
82108
}
83-
return this.nodeArray[index].data
109+
return this.nodeArray[index]!.data
84110
}
85111

86112
/** @inheritdoc */
@@ -101,7 +127,7 @@ export abstract class AbstractFixedQueue<T> implements IFixedQueue<T> {
101127
done: true,
102128
}
103129
}
104-
const value = this.nodeArray[index].data
130+
const value = this.nodeArray[index]!.data
105131
++index
106132
++i
107133
if (index === this.capacity) {

src/queues/fixed-priority-queue.ts

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,27 +15,35 @@ export class FixedPriorityQueue<T> extends AbstractFixedQueue<T>
1515
throw new Error('Fixed priority queue is full')
1616
}
1717
priority = priority ?? 0
18-
let inserted = false
19-
let index = this.start
18+
let insertionPhysicalIndex = -1
19+
let currentPhysicalIndex = this.start
2020
for (let i = 0; i < this.size; i++) {
21-
if (this.nodeArray[index].priority > priority) {
22-
this.nodeArray.splice(index, 0, { data, priority })
23-
this.nodeArray.length = this.capacity
24-
inserted = true
21+
if (this.nodeArray[currentPhysicalIndex]!.priority > priority) {
22+
insertionPhysicalIndex = currentPhysicalIndex
2523
break
2624
}
27-
++index
28-
if (index === this.capacity) {
29-
index = 0
25+
currentPhysicalIndex++
26+
if (currentPhysicalIndex === this.capacity) {
27+
currentPhysicalIndex = 0
3028
}
3129
}
32-
if (!inserted) {
33-
let index = this.start + this.size
34-
if (index >= this.capacity) {
35-
index -= this.capacity
30+
let end = this.start + this.size
31+
if (end >= this.capacity) {
32+
end -= this.capacity
33+
}
34+
if (insertionPhysicalIndex === -1) {
35+
insertionPhysicalIndex = end
36+
} else {
37+
let toShiftIndex = end
38+
while (toShiftIndex !== insertionPhysicalIndex) {
39+
const previousIndex = toShiftIndex === 0
40+
? this.capacity - 1
41+
: toShiftIndex - 1
42+
this.nodeArray[toShiftIndex] = this.nodeArray[previousIndex]
43+
toShiftIndex = previousIndex
3644
}
37-
this.nodeArray[index] = { data, priority }
3845
}
46+
this.nodeArray[insertionPhysicalIndex] = { data, priority }
3947
return ++this.size
4048
}
4149
}

0 commit comments

Comments
 (0)