Skip to content

Commit af54b3b

Browse files
feat: add task function worker node affinity (#126)
* feat: add task function worker node affinity Port task function worker node affinity from poolifier/poolifier PR #2269. Allows restricting task execution to specific worker nodes by their indices via a workerNodeKeys property on task function objects. * [autofix.ci] apply automated fixes * fix: add workerNodeKeys validation and remove misleading JSDoc * [autofix.ci] apply automated fixes * fix: add missing affinity checks in least-elu strategy * ci: migrate npm publish to OIDC trusted publisher * fix: add missing nextWorkerNodeKey assignment in weighted round robin and align workerNodeKeys JSDoc with upstream --------- Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
1 parent f1f8848 commit af54b3b

26 files changed

Lines changed: 887 additions & 100 deletions

.github/workflows/release-please.yml

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,10 @@ jobs:
8383
needs: build-release
8484
runs-on: ubuntu-latest
8585

86+
permissions:
87+
contents: read
88+
id-token: write
89+
8690
steps:
8791
- name: Checkout
8892
uses: actions/checkout@v6
@@ -108,23 +112,15 @@ jobs:
108112
- name: Publish Release
109113
if: ${{ contains(github.ref_name, '-') == false }}
110114
run: cd dist && npm publish
111-
env:
112-
NODE_AUTH_TOKEN: ${{ secrets.NPM_TOKEN }}
113115

114116
- name: Publish Release Candidate
115117
if: ${{ contains(github.ref_name, '-rc') == true }}
116118
run: cd dist && npm publish --tag next
117-
env:
118-
NODE_AUTH_TOKEN: ${{ secrets.NPM_TOKEN }}
119119

120120
- name: Publish Beta Release
121121
if: ${{ contains(github.ref_name, '-beta') == true }}
122122
run: cd dist && npm publish --tag beta
123-
env:
124-
NODE_AUTH_TOKEN: ${{ secrets.NPM_TOKEN }}
125123

126124
- name: Publish Alpha Release
127125
if: ${{ contains(github.ref_name, '-alpha') == true }}
128126
run: cd dist && npm publish --tag alpha
129-
env:
130-
NODE_AUTH_TOKEN: ${{ secrets.NPM_TOKEN }}

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ Please consult our [general guidelines](#general-guidelines).
5858
- Support for abortable task function ✔
5959
- Support for multiple task functions with per task function queuing priority
6060
and tasks distribution strategy ✔
61+
- Support for worker node affinity per task function ✔
6162
- Support for task functions
6263
[CRUD](https://en.wikipedia.org/wiki/Create,_read,_update_and_delete)
6364
operations at runtime ✔

docs/api.md

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,22 @@ This method is available on both pool implementations and returns a boolean.
9191
`name` (mandatory) The task function name.\
9292
`fn` (mandatory) The task function
9393
`(data?: Data) => Response | Promise<Response>` or task function object
94-
`{ taskFunction: (data?: Data) => Response | Promise<Response>, priority?: number, strategy?: WorkerChoiceStrategy }`.
95-
Priority range is the same as Unix nice levels.
94+
`{ taskFunction: (data?: Data) => Response | Promise<Response>, priority?: number, strategy?: WorkerChoiceStrategy, workerNodeKeys?: number[] }`.
95+
Priority range is the same as Unix nice levels. `workerNodeKeys` is an array of
96+
worker node keys to restrict task execution to specific workers (worker node
97+
affinity).
98+
99+
#### Worker Node Affinity Notes
100+
101+
- Worker node keys are validated at registration time against the pool's maximum
102+
size (`maximumNumberOfWorkers ?? minimumNumberOfWorkers`).
103+
- The number of worker node keys cannot exceed the pool's maximum size
104+
(`maximumNumberOfWorkers ?? minimumNumberOfWorkers`).
105+
- In dynamic pools, you can reference worker node keys up to the maximum pool
106+
size. Workers that don't exist yet are automatically created when a task
107+
targeting them is executed.
108+
- At execution time, if no specified worker is ready, selection retries until
109+
one becomes available or retries are exhausted.
96110

97111
This method is available on both pool implementations and returns a boolean
98112
promise.
@@ -225,9 +239,12 @@ An object with these properties:
225239
### `class YourWorker extends ThreadWorker`
226240

227241
`taskFunctions` (mandatory) The task function or task functions object
228-
`Record<string, (data?: Data) => Response | Promise<Response> | { taskFunction: (data?: Data) => Response | Promise<Response>, priority?: number, strategy?: WorkerChoiceStrategy }>`
242+
`Record<string, (data?: Data) => Response | Promise<Response> | { taskFunction: (data?: Data) => Response | Promise<Response>, priority?: number, strategy?: WorkerChoiceStrategy, workerNodeKeys?: number[] }>`
229243
that you want to execute on the worker. Priority range is the same as Unix nice
230-
levels.\
244+
levels. `workerNodeKeys` is an array of worker node keys to restrict task
245+
execution to specific workers (worker node affinity). See
246+
[Worker Node Affinity Notes](#worker-node-affinity-notes) above for validation
247+
behavior.\
231248
`opts` (optional) An object with these properties:
232249

233250
- `killBehavior` (optional) - Dictates if your worker will be deleted in case a
@@ -270,8 +287,11 @@ This method is available on both worker implementations and returns
270287
`name` (mandatory) The task function name.\
271288
`fn` (mandatory) The task function
272289
`(data?: Data) => Response | Promise<Response>` or task function object
273-
`{ taskFunction: (data?: Data) => Response | Promise<Response>, priority?: number, strategy?: WorkerChoiceStrategy }`.
274-
Priority range is the same as Unix nice levels.
290+
`{ taskFunction: (data?: Data) => Response | Promise<Response>, priority?: number, strategy?: WorkerChoiceStrategy, workerNodeKeys?: number[] }`.
291+
Priority range is the same as Unix nice levels. `workerNodeKeys` is an array of
292+
worker node keys to restrict task execution to specific workers (worker node
293+
affinity). See [Worker Node Affinity Notes](#worker-node-affinity-notes) above
294+
for validation behavior.
275295

276296
This method is available on both worker implementations and returns
277297
`{ status: boolean, error?: Error }`.

src/pools/abstract-pool.ts

Lines changed: 57 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ import {
4545
checkValidPriority,
4646
checkValidTasksQueueOptions,
4747
checkValidWorkerChoiceStrategy,
48+
checkValidWorkerNodeKeys,
4849
getDefaultTasksQueueOptions,
4950
updateEluWorkerUsage,
5051
updateRunTimeWorkerUsage,
@@ -842,8 +843,8 @@ export abstract class AbstractPool<
842843
private async sendTaskFunctionOperationToWorkers(
843844
message: MessageValue<Data>,
844845
): Promise<boolean> {
845-
const targetWorkerNodeKeys = [...this.workerNodes.keys()]
846-
if (targetWorkerNodeKeys.length === 0) {
846+
const targetWorkerNodeCount = this.workerNodes.length
847+
if (targetWorkerNodeCount === 0) {
847848
return true
848849
}
849850
const responsesReceived: MessageValue<Response>[] = []
@@ -853,14 +854,14 @@ export abstract class AbstractPool<
853854
reject: (reason?: unknown) => void,
854855
): void => {
855856
this.checkMessageWorkerId(message)
857+
const workerNodeKey = this.getWorkerNodeKeyByWorkerId(message.workerId)
856858
if (
857859
message.taskFunctionOperationStatus != null &&
858-
targetWorkerNodeKeys.includes(
859-
this.getWorkerNodeKeyByWorkerId(message.workerId),
860-
)
860+
workerNodeKey >= 0 &&
861+
workerNodeKey < targetWorkerNodeCount
861862
) {
862863
responsesReceived.push(message)
863-
if (responsesReceived.length >= targetWorkerNodeKeys.length) {
864+
if (responsesReceived.length >= targetWorkerNodeCount) {
864865
if (
865866
responsesReceived.every(
866867
(msg) => msg.taskFunctionOperationStatus === true,
@@ -883,19 +884,20 @@ export abstract class AbstractPool<
883884
}
884885
}
885886
let listener: ((message: MessageValue<Response>) => void) | undefined
887+
const workerNodeKeys = [...this.workerNodes.keys()]
886888
try {
887889
return await new Promise<boolean>((resolve, reject) => {
888890
listener = (message: MessageValue<Response>) => {
889891
taskFunctionOperationsListener(message, resolve, reject)
890892
}
891-
for (const workerNodeKey of targetWorkerNodeKeys) {
893+
for (const workerNodeKey of workerNodeKeys) {
892894
this.registerWorkerMessageListener(workerNodeKey, listener)
893895
this.sendToWorker(workerNodeKey, message)
894896
}
895897
})
896898
} finally {
897899
if (listener != null) {
898-
for (const workerNodeKey of targetWorkerNodeKeys) {
900+
for (const workerNodeKey of workerNodeKeys) {
899901
this.deregisterWorkerMessageListener(workerNodeKey, listener)
900902
}
901903
}
@@ -928,6 +930,10 @@ export abstract class AbstractPool<
928930
}
929931
checkValidPriority(fn.priority)
930932
checkValidWorkerChoiceStrategy(fn.strategy)
933+
checkValidWorkerNodeKeys(
934+
fn.workerNodeKeys,
935+
this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
936+
)
931937
const opResult = await this.sendTaskFunctionOperationToWorkers({
932938
taskFunctionOperation: 'add',
933939
taskFunctionProperties: buildTaskFunctionProperties(name, fn),
@@ -1057,6 +1063,26 @@ export abstract class AbstractPool<
10571063
)?.strategy
10581064
}
10591065

1066+
/**
1067+
* Gets task function worker node keys affinity set, if any.
1068+
* @param name - The task function name.
1069+
* @returns The task function worker node keys affinity set, or `undefined` if not defined.
1070+
*/
1071+
private readonly getTaskFunctionWorkerNodeKeysSet = (
1072+
name?: string,
1073+
): ReadonlySet<number> | undefined => {
1074+
name = name ?? DEFAULT_TASK_NAME
1075+
const taskFunctionsProperties = this.listTaskFunctionsProperties()
1076+
if (name === DEFAULT_TASK_NAME) {
1077+
name = taskFunctionsProperties[1]?.name
1078+
}
1079+
const workerNodeKeys = taskFunctionsProperties.find(
1080+
(taskFunctionProperties: TaskFunctionProperties) =>
1081+
taskFunctionProperties.name === name,
1082+
)?.workerNodeKeys
1083+
return workerNodeKeys != null ? new Set(workerNodeKeys) : undefined
1084+
}
1085+
10601086
/**
10611087
* Gets worker node task function priority, if any.
10621088
*
@@ -1549,7 +1575,20 @@ export abstract class AbstractPool<
15491575
* @returns The chosen worker node key.
15501576
*/
15511577
private chooseWorkerNode(name?: string): number {
1552-
if (this.shallCreateDynamicWorker()) {
1578+
const workerNodeKeysSet = this.getTaskFunctionWorkerNodeKeysSet(name)
1579+
if (workerNodeKeysSet != null) {
1580+
const maxPoolSize = this.maximumNumberOfWorkers ??
1581+
this.minimumNumberOfWorkers
1582+
const targetSize = max(...workerNodeKeysSet) + 1
1583+
while (
1584+
this.started &&
1585+
!this.destroying &&
1586+
this.workerNodes.length < targetSize &&
1587+
this.workerNodes.length < maxPoolSize
1588+
) {
1589+
this.createAndSetupDynamicWorkerNode()
1590+
}
1591+
} else if (this.shallCreateDynamicWorker()) {
15531592
const workerNodeKey = this.createAndSetupDynamicWorkerNode()
15541593
if (
15551594
this.workerChoiceStrategiesContext?.getPolicy().dynamicWorkerUsage ===
@@ -1560,6 +1599,7 @@ export abstract class AbstractPool<
15601599
}
15611600
return this.workerChoiceStrategiesContext!.execute(
15621601
this.getTaskFunctionWorkerChoiceStrategy(name),
1602+
workerNodeKeysSet,
15631603
)
15641604
}
15651605

@@ -2172,6 +2212,14 @@ export abstract class AbstractPool<
21722212
if (ready == null || !ready) {
21732213
throw new Error(`Worker ${workerId?.toString()} failed to initialize`)
21742214
}
2215+
const maxPoolSize = this.maximumNumberOfWorkers ??
2216+
this.minimumNumberOfWorkers
2217+
for (const taskFunctionProperties of taskFunctionsProperties ?? []) {
2218+
checkValidWorkerNodeKeys(
2219+
taskFunctionProperties.workerNodeKeys,
2220+
maxPoolSize,
2221+
)
2222+
}
21752223
const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
21762224
const workerNode = this.workerNodes[workerNodeKey]
21772225
workerNode.info.ready = ready

src/pools/selection-strategies/abstract-worker-choice-strategy.ts

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,9 @@ export abstract class AbstractWorkerChoiceStrategy<
101101
public abstract update(workerNodeKey: number): boolean
102102

103103
/** @inheritDoc */
104-
public abstract choose(): number | undefined
104+
public abstract choose(
105+
workerNodeKeysSet?: ReadonlySet<number>,
106+
): number | undefined
105107

106108
/** @inheritDoc */
107109
public abstract remove(workerNodeKey: number): boolean
@@ -144,6 +146,47 @@ export abstract class AbstractWorkerChoiceStrategy<
144146
return workerNodeKey
145147
}
146148

149+
/**
150+
* Gets the next worker node key in a round-robin fashion.
151+
*
152+
* @returns The next worker node key.
153+
*/
154+
protected getRoundRobinNextWorkerNodeKey(): number {
155+
return this.nextWorkerNodeKey === this.pool.workerNodes.length - 1
156+
? 0
157+
: (this.nextWorkerNodeKey ?? this.previousWorkerNodeKey) + 1
158+
}
159+
160+
/**
161+
* Gets the worker node key from a single-element affinity set.
162+
*
163+
* @param workerNodeKeysSet - The worker node keys affinity set.
164+
* @returns The worker node key if ready, `undefined` otherwise.
165+
*/
166+
protected getSingleWorkerNodeKey(
167+
workerNodeKeysSet: ReadonlySet<number>,
168+
): number | undefined {
169+
const [workerNodeKey] = workerNodeKeysSet
170+
return this.isWorkerNodeReady(workerNodeKey) ? workerNodeKey : undefined
171+
}
172+
173+
/**
174+
* Whether the worker node is eligible for selection (ready and in affinity set).
175+
*
176+
* @param workerNodeKey - The worker node key.
177+
* @param workerNodeKeysSet - The worker node keys affinity set. If undefined, all workers are eligible.
178+
* @returns Whether the worker node is eligible.
179+
*/
180+
protected isWorkerNodeEligible(
181+
workerNodeKey: number,
182+
workerNodeKeysSet?: ReadonlySet<number>,
183+
): boolean {
184+
return (
185+
this.isWorkerNodeReady(workerNodeKey) &&
186+
(workerNodeKeysSet == null || workerNodeKeysSet.has(workerNodeKey))
187+
)
188+
}
189+
147190
/**
148191
* Gets the worker node task runtime.
149192
* If the task statistics require the average runtime, the average runtime is returned.

src/pools/selection-strategies/fair-share-worker-choice-strategy.ts

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,9 @@ export class FairShareWorkerChoiceStrategy<
7777
}
7878

7979
/** @inheritDoc */
80-
public choose(): number | undefined {
80+
public choose(workerNodeKeysSet?: ReadonlySet<number>): number | undefined {
8181
this.setPreviousWorkerNodeKey(this.nextWorkerNodeKey)
82-
this.nextWorkerNodeKey = this.fairShareNextWorkerNodeKey()
82+
this.nextWorkerNodeKey = this.fairShareNextWorkerNodeKey(workerNodeKeysSet)
8383
return this.nextWorkerNodeKey
8484
}
8585

@@ -96,27 +96,30 @@ export class FairShareWorkerChoiceStrategy<
9696
return true
9797
}
9898

99-
private fairShareNextWorkerNodeKey(): number | undefined {
99+
private fairShareNextWorkerNodeKey(
100+
workerNodeKeysSet?: ReadonlySet<number>,
101+
): number | undefined {
102+
if (workerNodeKeysSet?.size === 0) {
103+
return undefined
104+
}
105+
if (workerNodeKeysSet?.size === 1) {
106+
return this.getSingleWorkerNodeKey(workerNodeKeysSet)
107+
}
100108
const chosenWorkerNodeKey = this.pool.workerNodes.reduce(
101109
(minWorkerNodeKey: number, workerNode, workerNodeKey, workerNodes) => {
102-
if (!this.isWorkerNodeReady(workerNodeKey)) {
110+
if (!this.isWorkerNodeEligible(workerNodeKey, workerNodeKeysSet)) {
103111
return minWorkerNodeKey
104112
}
105-
if (minWorkerNodeKey === -1) {
106-
workerNode.strategyData = {
107-
...workerNode.strategyData,
108-
virtualTaskEndTimestamp: this
109-
.computeWorkerNodeVirtualTaskEndTimestamp(workerNodeKey),
110-
}
111-
return workerNodeKey
112-
}
113113
if (workerNode.strategyData?.virtualTaskEndTimestamp == null) {
114114
workerNode.strategyData = {
115115
...workerNode.strategyData,
116116
virtualTaskEndTimestamp: this
117117
.computeWorkerNodeVirtualTaskEndTimestamp(workerNodeKey),
118118
}
119119
}
120+
if (minWorkerNodeKey === -1) {
121+
return workerNodeKey
122+
}
120123
return workerNode.strategyData.virtualTaskEndTimestamp! <
121124
workerNodes[minWorkerNodeKey].strategyData!.virtualTaskEndTimestamp!
122125
? workerNodeKey

src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,13 @@ export class InterleavedWeightedRoundRobinWorkerChoiceStrategy<
8585
}
8686

8787
/** @inheritDoc */
88-
public choose(): number | undefined {
88+
public choose(workerNodeKeysSet?: ReadonlySet<number>): number | undefined {
89+
if (workerNodeKeysSet?.size === 0) {
90+
return undefined
91+
}
92+
if (workerNodeKeysSet?.size === 1) {
93+
return this.getSingleWorkerNodeKey(workerNodeKeysSet)
94+
}
8995
for (
9096
let roundIndex = this.roundId;
9197
roundIndex < this.roundWeights.length;
@@ -106,7 +112,7 @@ export class InterleavedWeightedRoundRobinWorkerChoiceStrategy<
106112
}
107113
const workerWeight = this.opts!.weights![workerNodeKey]
108114
if (
109-
this.isWorkerNodeReady(workerNodeKey) &&
115+
this.isWorkerNodeEligible(workerNodeKey, workerNodeKeysSet) &&
110116
workerWeight >= this.roundWeights[roundIndex] &&
111117
this.workerNodeVirtualTaskExecutionTime < workerWeight
112118
) {
@@ -118,6 +124,7 @@ export class InterleavedWeightedRoundRobinWorkerChoiceStrategy<
118124
return this.nextWorkerNodeKey
119125
}
120126
}
127+
this.workerNodeId = 0
121128
}
122129
this.interleavedWeightedRoundRobinNextWorkerNodeId()
123130
return undefined

0 commit comments

Comments
 (0)