Commit 74d9529

fix: reject in-flight task promises when worker crashes (#146)
* fix: reject in-flight task promises when worker crashes

  When a worker crashes (onerror event), tasks already dispatched to it had their promises stored in promiseResponseMap but were never settled. The error handler only restarted the worker and redistributed queued tasks; in-flight tasks were silently dropped, causing pool.execute() callers to hang indefinitely.

  The fix iterates promiseResponseMap on worker error, finds all entries targeting the crashed worker by stable workerId, and rejects them with a descriptive error before termination.

  Changes:
  - Replace workerNodeKey (array index) with stable workerId in PromiseResponseWrapper (indices become stale after removeWorkerNode)
  - Add handleWorkerNodeCrash() to unify crash recovery logic
  - Add rejectInFlightTaskPromises() and rejectRemainingQueuedTaskPromises()
  - Add updatePromiseResponseWorkerId() for task steal/redistribute tracking
  - Resolve workerNodeKey dynamically from message.workerId at response time
  - Gate exit handler worker restart on restartWorkerOnError option
  - Add crash worker test and regression test for promise rejection

  Port of poolifier/poolifier#3211

* [autofix.ci] apply automated fixes

* fix: add crashHandled flag, improve rejection logic and harden guards

  - Add crashHandled flag to WorkerInfo to prevent double crash handling between onerror and exit handlers
  - Move rejectRemainingQueuedTaskPromises outside started/destroying guard (queued promises must always be settled regardless of pool state)
  - Handle workerId == null case in rejectInFlightTaskPromises (crash before worker ID assignment)
  - Add { cause } to crash Error constructors for error chain traceability
  - Update task statistics (executing/failed) on crash rejection
  - Add workerNodeKey === -1 guards in updatePromiseResponseWorkerId and handleWorkerReadyResponse
  - Exit handler: detect unexpected exit via crashHandled flag, condition restart on restartWorkerOnError or normal exit
  - Tighten test bounds for stolen/sequentiallyStolen task counts

  Aligns with poolifier/poolifier#3211 latest changes

* fix: harden crash-handling guards and eliminate DRY violations

  - Guard getWorkerNodeKeyByWorkerId against undefined workerId to prevent erroneous matching of uninitialized worker nodes
  - Call updatePromiseResponseWorkerId BEFORE handleTask in redistribute and stealTask for correctness-by-construction (prevents stale workerId if task response arrives synchronously)
  - Construct crashError once in handleWorkerNodeCrash and pass as param to rejectInFlightTaskPromises/rejectRemainingQueuedTaskPromises (DRY)
  - Add executing--/failed++ stats in rejectInFlightTaskPromises null-path for consistency with the non-null path
  - Fix handleWorkerNodeCrash JSDoc to accurately describe behavior

* fix: add flushWorkerNodePromises catch-all for unsettled promises on exit

  Add flushWorkerNodePromises() method as a catch-all to reject any unsettled promises when a worker node exits without crash handling (e.g., unexpected exit without onerror, or termination during pool destroy).

  The exit handler now has three distinct blocks:
  1. Crash detection: handleWorkerNodeCrash if ready && !crashHandled
  2. Promise flush: flushWorkerNodePromises for remaining unsettled promises (guarded by ready && !crashHandled to skip intentional exits)
  3. Cleanup: removeWorkerNode + conditional restart

  Aligns with poolifier/poolifier#3211 latest changes (flushWorkerNodePromises catch-all pattern)

* fix: add crashHandled to WorkerInfo test expectations

* [autofix.ci] apply automated fixes

* refactor: address review findings (DRY helper, fallback workerId, simplify null-path)

  - Extract rejectTaskPromise() helper to eliminate duplicated reject + delete + stats + taskFinished pattern across 3 methods
  - Add fallback to promiseResponse.workerId in handleTaskExecutionResponse when message.workerId lookup fails (worker already removed)
  - Simplify rejectInFlightTaskPromises null-ID path to early-return
  - Keep flush guard as workerNode.info.ready (not this.destroying) since synchronous terminate in web workers causes unhandled rejections during pool.destroy(); destroyWorkerNode already handles graceful shutdown

* [autofix.ci] apply automated fixes

* chore: remove unrelated config files added by mistake

* docs: fix flushWorkerNodePromises JSDoc to match actual behavior

  The JSDoc was copied from upstream where exitCode !== 0 allows flush during pool destroy. In our web worker adaptation, synchronous terminate causes unhandled rejections if flush runs during destroy, so the guard uses workerNode.info.ready instead. Updated doc to reflect this.

* fix: resolve abort target dynamically via promiseResponseMap

  The abort signal handler captured workerNodeKey at task submission time. After steal/redistribute, the stale index dispatches abortTask to the wrong worker node. Resolve the current worker dynamically from the stored workerId (kept up-to-date by updatePromiseResponseWorkerId).

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
1 parent fa8b814 commit 74d9529
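
The core idea in the commit message (settle every promise that still targets the crashed worker, matched by a stable worker id) can be sketched in isolation. This is a simplified model, not the pool's actual code: `PendingTask`, `pending`, `dispatch` and `rejectInFlight` are hypothetical names standing in for `PromiseResponseWrapper`, `promiseResponseMap` and the crash-handling methods.

```typescript
// Hypothetical, simplified stand-in for a PromiseResponseWrapper entry.
interface PendingTask {
  workerId: string | undefined
  resolve: (value: unknown) => void
  reject: (reason: Error) => void
}

// Hypothetical stand-in for promiseResponseMap, keyed by task id.
const pending = new Map<string, PendingTask>()

// Record a task as dispatched to a worker and return the promise the caller awaits.
function dispatch(taskId: string, workerId: string): Promise<unknown> {
  return new Promise((resolve, reject) => {
    pending.set(taskId, { workerId, resolve, reject })
  })
}

// On crash, reject and forget every entry that still targets the crashed worker.
function rejectInFlight(crashedWorkerId: string, error: Error): number {
  let rejected = 0
  for (const [taskId, task] of pending) {
    if (task.workerId === crashedWorkerId) {
      task.reject(error)
      pending.delete(taskId)
      ++rejected
    }
  }
  return rejected
}

const outcomes: string[] = []
dispatch('t1', 'w1').catch((e: Error) => outcomes.push(`t1: ${e.message}`))
dispatch('t2', 'w2').catch((e: Error) => outcomes.push(`t2: ${e.message}`))

// Worker 'w1' crashes: only 't1' is settled, 't2' stays pending on 'w2'.
const rejectedCount = rejectInFlight('w1', new Error('Worker node crashed'))
```

Without the rejection step, the promise for `t1` would never settle, which is the hang the commit describes for `pool.execute()` callers.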

9 files changed

Lines changed: 332 additions & 37 deletions

src/pools/abstract-pool.ts

Lines changed: 250 additions & 30 deletions
@@ -559,6 +559,9 @@ export abstract class AbstractPool<
    * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
    */
   private getWorkerNodeKeyByWorkerId(workerId: string | undefined): number {
+    if (workerId == null) {
+      return -1
+    }
     return this.workerNodes.findIndex(
       (workerNode) => workerNode.info.id === workerId,
     )
@@ -1173,11 +1176,21 @@ export abstract class AbstractPool<
       abortSignal?.addEventListener(
         'abort',
         () => {
-          this.workerNodes[workerNodeKey]?.dispatchEvent(
+          const promiseResponse = this.promiseResponseMap.get(task.taskId!)
+          if (promiseResponse == null) {
+            return
+          }
+          const currentWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
+            promiseResponse.workerId,
+          )
+          if (currentWorkerNodeKey === -1) {
+            return
+          }
+          this.workerNodes[currentWorkerNodeKey]?.dispatchEvent(
             new CustomEvent<WorkerNodeEventDetail>('abortTask', {
               detail: {
                 taskId: task.taskId,
-                workerId: this.getWorkerInfo(workerNodeKey)!.id!,
+                workerId: promiseResponse.workerId,
               },
             }),
           )
@@ -1187,7 +1200,7 @@ export abstract class AbstractPool<
       this.promiseResponseMap.set(task.taskId!, {
         reject,
         resolve,
-        workerNodeKey,
+        workerId: this.workerNodes[workerNodeKey].info.id,
         abortSignal,
       })
       if (
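
Storing `workerId` instead of `workerNodeKey` matters because the key is an array index, and indices shift when a node is removed. A small illustration of the failure mode, using a hypothetical plain array of `{ id }` objects in place of the pool's worker nodes:

```typescript
// Hypothetical stand-in for this.workerNodes; only the id matters here.
const workerNodes = [{ id: 'w1' }, { id: 'w2' }, { id: 'w3' }]

// Captured at submission time for a task dispatched to 'w2'.
const capturedIndex = 1
const capturedWorkerId = workerNodes[capturedIndex].id

// 'w1' crashes and is removed, so every later node shifts down by one.
workerNodes.splice(0, 1)

// The stale index now silently points at a different worker.
const staleLookupId = workerNodes[capturedIndex].id

// Resolving by id at use time still finds the right node.
const currentIndex = workerNodes.findIndex((node) => node.id === capturedWorkerId)
```

After the removal, `staleLookupId` is `'w3'` while the task actually lives on `'w2'`, now at index `0`. This is the same hazard the abort handler hunk above addresses by looking the worker up from the stored id.
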
@@ -1672,22 +1685,8 @@ export abstract class AbstractPool<
       )
     }
     workerNode.worker.onerror = (errorEvent) => {
-      workerNode.info.ready = false
-      this.eventTarget?.dispatchEvent(
-        new ErrorEvent(PoolEvents.error, errorEvent),
-      )
-      if (this.started && !this.destroying) {
-        if (this.opts.restartWorkerOnError === true) {
-          if (workerNode.info.dynamic) {
-            this.createAndSetupDynamicWorkerNode()
-          } else if (!this.startingMinimumNumberOfWorkers) {
-            this.startMinimumNumberOfWorkers(true)
-          }
-        }
-        if (this.opts.enableTasksQueue === true) {
-          this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
-        }
-      }
+      errorEvent.preventDefault()
+      this.handleWorkerNodeCrash(workerNode, errorEvent)
       workerNode?.terminate()
     }
     workerNode.worker.addEventListener(
@@ -1701,11 +1700,41 @@ export abstract class AbstractPool<
     workerNode.addEventListener(
       'exit',
       () => {
+        const workerNodeKey = this.workerNodes.indexOf(workerNode)
+        const exitError = new Error('Worker node exited unexpectedly')
+        if (
+          workerNode.info.ready &&
+          !workerNode.info.crashHandled &&
+          workerNodeKey !== -1 &&
+          !this.destroying
+        ) {
+          this.handleWorkerNodeCrash(
+            workerNode,
+            new ErrorEvent('error', {
+              message: exitError.message,
+              error: exitError,
+            }),
+          )
+        }
+        if (
+          workerNodeKey !== -1 &&
+          !workerNode.info.crashHandled &&
+          workerNode.info.ready
+        ) {
+          this.flushWorkerNodePromises(
+            workerNode,
+            this.destroying
+              ? new Error('Worker node terminated during pool destroy')
+              : exitError,
+          )
+        }
         this.removeWorkerNode(workerNode)
         if (
           this.started &&
           !this.startingMinimumNumberOfWorkers &&
-          !this.destroying
+          !this.destroying &&
+          (this.opts.restartWorkerOnError === true ||
+            !workerNode.info.crashHandled)
         ) {
           this.startMinimumNumberOfWorkers(true)
         }
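
The `crashHandled` flag makes crash handling run once even though a crash can surface through both `onerror` and `exit`. A reduced sketch of that guard, assuming a hypothetical `WorkerInfoLike` shape and a `log` array in place of the real side effects:

```typescript
// Hypothetical subset of WorkerInfo used by the guard.
interface WorkerInfoLike {
  ready: boolean
  crashHandled: boolean
}

const log: string[] = []

// Stand-in for handleWorkerNodeCrash: marks the node and records the call.
function handleCrash(info: WorkerInfoLike, reason: string): void {
  info.ready = false
  info.crashHandled = true
  log.push(`crash handled: ${reason}`)
}

// Stand-in for the exit handler: only treats the exit as a crash
// when the node was ready and no crash was handled yet.
function onExit(info: WorkerInfoLike, destroying: boolean): void {
  if (info.ready && !info.crashHandled && !destroying) {
    handleCrash(info, 'unexpected exit')
  }
  log.push('worker node removed')
}

// Path 1: onerror fires first, then exit. The crash is handled once.
const crashed: WorkerInfoLike = { ready: true, crashHandled: false }
handleCrash(crashed, 'onerror')
onExit(crashed, false)

// Path 2: the worker exits without an onerror event. The exit handler takes over.
const vanished: WorkerInfoLike = { ready: true, crashHandled: false }
onExit(vanished, false)
```

In this sketch `ready = false` alone would already block the second call; the separate flag additionally lets the exit handler tell a handled crash apart from a normal exit when deciding whether to restart.
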
@@ -1916,10 +1945,143 @@ export abstract class AbstractPool<
       if (destinationWorkerNodeKey === -1) {
         break
       }
-      this.handleTask(
-        destinationWorkerNodeKey,
-        this.dequeueTask(sourceWorkerNodeKey)!,
-      )
+      const task = this.dequeueTask(sourceWorkerNodeKey)!
+      this.updatePromiseResponseWorkerId(task.taskId, destinationWorkerNodeKey)
+      this.handleTask(destinationWorkerNodeKey, task)
+    }
+  }
+
+  /**
+   * Rejects in-flight task promises for the given crashed worker node key.
+   * @param workerNodeKey - The worker node key.
+   * @param crashError - The crash error to reject promises with.
+   */
+  private rejectInFlightTaskPromises(
+    workerNodeKey: number,
+    crashError: Error,
+  ): void {
+    if (workerNodeKey === -1) {
+      return
+    }
+    const workerNode = this.workerNodes[workerNodeKey]
+    const crashedWorkerId = workerNode.info.id
+    if (crashedWorkerId == null) {
+      return
+    }
+    const queuedTaskIds = new Set<
+      `${string}-${string}-${string}-${string}-${string}`
+    >()
+    for (const task of workerNode.tasksQueue) {
+      queuedTaskIds.add(task.taskId!)
+    }
+    for (const [taskId, promiseResponse] of this.promiseResponseMap) {
+      if (
+        promiseResponse.workerId === crashedWorkerId &&
+        !queuedTaskIds.has(taskId)
+      ) {
+        this.rejectTaskPromise(taskId, promiseResponse, workerNode, crashError)
+      }
+    }
+    this.checkAndEmitTaskExecutionFinishedEvents()
+  }
+
+  /**
+   * Rejects remaining queued task promises for the given crashed worker node key.
+   * @param workerNodeKey - The worker node key.
+   * @param crashError - The crash error to reject promises with.
+   */
+  private rejectRemainingQueuedTaskPromises(
+    workerNodeKey: number,
+    crashError: Error,
+  ): void {
+    if (workerNodeKey === -1) {
+      return
+    }
+    const workerNode = this.workerNodes[workerNodeKey]
+    if (this.tasksQueueSize(workerNodeKey) === 0) {
+      return
+    }
+    while (this.tasksQueueSize(workerNodeKey) > 0) {
+      const task = this.dequeueTask(workerNodeKey)
+      if (task?.taskId != null) {
+        const promiseResponse = this.promiseResponseMap.get(task.taskId)
+        if (promiseResponse != null) {
+          this.rejectTaskPromise(
+            task.taskId,
+            promiseResponse,
+            workerNode,
+            crashError,
+            false,
+          )
+        }
+      }
+    }
+    this.checkAndEmitTaskExecutionFinishedEvents()
+  }
+
+  private rejectTaskPromise(
+    taskId: `${string}-${string}-${string}-${string}-${string}`,
+    promiseResponse: PromiseResponseWrapper<Response>,
+    workerNode: IWorkerNode<Worker, Data>,
+    error: Error,
+    decrementExecuting = true,
+  ): void {
+    promiseResponse.reject(error)
+    this.promiseResponseMap.delete(taskId)
+    if (decrementExecuting && workerNode.usage.tasks.executing > 0) {
+      --workerNode.usage.tasks.executing
+    }
+    ++workerNode.usage.tasks.failed
+    workerNode.dispatchEvent(new Event('taskFinished'))
+  }
+
+  /**
+   * Rejects all unsettled promises targeting the given worker node.
+   * Called from the exit handler for unexpected exits of ready workers
+   * that were not already handled by the onerror crash path.
+   * @param workerNode - The worker node whose promises to flush.
+   * @param error - The rejection error.
+   */
+  private flushWorkerNodePromises(
+    workerNode: IWorkerNode<Worker, Data>,
+    error: Error,
+  ): void {
+    const workerId = workerNode.info.id
+    for (const [taskId, promiseResponse] of this.promiseResponseMap) {
+      if (
+        promiseResponse.workerId === workerId ||
+        (workerId == null && promiseResponse.workerId == null)
+      ) {
+        this.rejectTaskPromise(taskId, promiseResponse, workerNode, error)
+      }
+    }
+    this.checkAndEmitTaskExecutionFinishedEvents()
+  }
+
+  /**
+   * Updates the promise response worker id after task steal or redistribute.
+   * Ensures crash-time rejection targets the correct worker.
+   * @param taskId - The task id.
+   * @param workerNodeKey - The destination worker node key.
+   */
+  private updatePromiseResponseWorkerId(
+    taskId: `${string}-${string}-${string}-${string}-${string}` | undefined,
+    workerNodeKey: number,
+  ): void {
+    if (taskId == null || workerNodeKey === -1) {
+      return
+    }
+    const workerNode = this.workerNodes[workerNodeKey]
+    if (workerNode.info.id == null) {
+      const promiseResponse = this.promiseResponseMap.get(taskId)
+      if (promiseResponse != null) {
+        promiseResponse.workerId = undefined
+      }
+      return
+    }
+    const promiseResponse = this.promiseResponseMap.get(taskId)
+    if (promiseResponse != null) {
+      promiseResponse.workerId = workerNode.info.id
     }
   }
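
The `decrementExecuting` parameter of `rejectTaskPromise` separates two cases: an in-flight task was counted as executing, while a queued task never started. A minimal sketch of just the statistics update, with a hypothetical `TaskStats` shape and `recordRejection` helper in place of `workerNode.usage.tasks`:

```typescript
// Hypothetical subset of the per-worker task usage counters.
interface TaskStats {
  executing: number
  failed: number
}

// Mirrors the counter logic of the helper: always count a failure,
// and only decrement `executing` when asked to and when it is positive.
function recordRejection(stats: TaskStats, decrementExecuting = true): void {
  if (decrementExecuting && stats.executing > 0) {
    --stats.executing
  }
  ++stats.failed
}

const stats: TaskStats = { executing: 2, failed: 0 }
recordRejection(stats) // in-flight task: it was executing
recordRejection(stats, false) // queued task: it never started executing
```

Starting from two executing tasks, the sketch ends with `executing` at 1 and `failed` at 2. The `> 0` check keeps the counter from going negative if the bookkeeping is already out of step.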

@@ -2019,6 +2181,10 @@ export abstract class AbstractPool<
     }
     sourceWorkerNode.info.stolen = false
     destinationWorkerNode.info.stealing = false
+    this.updatePromiseResponseWorkerId(
+      stolenTask.taskId,
+      destinationWorkerNodeKey,
+    )
     this.handleTask(destinationWorkerNodeKey, stolenTask)
     this.updateTaskStolenStatisticsWorkerUsage(
       destinationWorkerNodeKey,
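
The added call retargets the stored promise entry before the stolen task is handed to its new worker, so a later crash of either worker rejects the right promises. A minimal sketch of that bookkeeping, where `pending`, `retarget` and `tasksTargeting` are hypothetical stand-ins for `promiseResponseMap` and `updatePromiseResponseWorkerId`:

```typescript
// Hypothetical stand-in for a promiseResponseMap entry.
interface PendingEntry {
  workerId: string | undefined
}

const pending = new Map<string, PendingEntry>([['t1', { workerId: 'w1' }]])

// Point the entry at the destination worker; missing task ids are ignored.
function retarget(
  taskId: string | undefined,
  destinationWorkerId: string | undefined,
): void {
  if (taskId == null) {
    return
  }
  const entry = pending.get(taskId)
  if (entry != null) {
    entry.workerId = destinationWorkerId
  }
}

// Ids of the tasks whose promises currently target the given worker.
function tasksTargeting(workerId: string): string[] {
  return Array.from(pending)
    .filter(([, entry]) => entry.workerId === workerId)
    .map(([taskId]) => taskId)
}

retarget('t1', 'w2') // task 't1' is stolen from 'w1' by 'w2'
const onSource = tasksTargeting('w1')
const onDestination = tasksTargeting('w2')
```

After the retarget, a crash of `'w1'` would find no entries to reject, and a crash of `'w2'` would find `'t1'`.
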
@@ -2038,6 +2204,43 @@ export abstract class AbstractPool<
     )
   }

+  /**
+   * Handles a crashed worker node: emits error, rejects in-flight promises,
+   * restarts dynamic workers if configured, and redistributes queued tasks.
+   * Static worker restart is handled by the exit event handler.
+   * @param workerNode - The crashed worker node.
+   * @param errorEvent - The error event that caused the crash.
+   */
+  private handleWorkerNodeCrash(
+    workerNode: IWorkerNode<Worker, Data>,
+    errorEvent: ErrorEvent,
+  ): void {
+    workerNode.info.ready = false
+    workerNode.info.crashHandled = true
+    this.eventTarget?.dispatchEvent(
+      new ErrorEvent(PoolEvents.error, errorEvent),
+    )
+    const crashedWorkerNodeKey = this.workerNodes.indexOf(workerNode)
+    const crashError = new Error(
+      `Worker node crashed with error: '${errorEvent.message}'`,
+      { cause: errorEvent.error ?? errorEvent },
+    )
+    this.rejectInFlightTaskPromises(crashedWorkerNodeKey, crashError)
+    if (this.started && !this.destroying) {
+      if (this.opts.restartWorkerOnError === true) {
+        if (workerNode.info.dynamic) {
+          this.createAndSetupDynamicWorkerNode()
+        }
+      }
+      if (this.opts.enableTasksQueue === true) {
+        this.redistributeQueuedTasks(crashedWorkerNodeKey)
+      }
+    }
+    if (this.opts.enableTasksQueue === true) {
+      this.rejectRemainingQueuedTaskPromises(crashedWorkerNodeKey, crashError)
+    }
+  }
+
   private readonly handleWorkerNodeIdleEvent = (
     event: CustomEvent<WorkerNodeEventDetail>,
     previousStolenTask?: Task<Data>,
@@ -2221,6 +2424,9 @@ export abstract class AbstractPool<
       )
     }
     const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
+    if (workerNodeKey === -1) {
+      return
+    }
     const workerNode = this.workerNodes[workerNodeKey]
     workerNode.info.ready = ready
     workerNode.info.taskFunctionsProperties = taskFunctionsProperties
@@ -2230,11 +2436,19 @@ export abstract class AbstractPool<
   }

   private handleTaskExecutionResponse(message: MessageValue<Response>): void {
-    const { name, taskId, workerError, data } = message
+    const { name, taskId, workerError, data, workerId } = message
     const promiseResponse = this.promiseResponseMap.get(taskId!)
     if (promiseResponse != null) {
-      const { resolve, reject, workerNodeKey } = promiseResponse
-      const workerNode = this.workerNodes[workerNodeKey]
+      const { resolve, reject } = promiseResponse
+      let workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
+      if (workerNodeKey === -1) {
+        workerNodeKey = this.getWorkerNodeKeyByWorkerId(
+          promiseResponse.workerId,
+        )
+      }
+      const workerNode = workerNodeKey !== -1
+        ? this.workerNodes[workerNodeKey]
+        : undefined
       if (workerError != null) {
         this.eventTarget?.dispatchEvent(
           new ErrorEvent(PoolEvents.taskError, { error: workerError }),
@@ -2248,12 +2462,18 @@ export abstract class AbstractPool<
       } else {
         resolve(data!)
       }
-      this.afterTaskExecutionHook(workerNodeKey, message)
+      if (workerNodeKey !== -1) {
+        this.afterTaskExecutionHook(workerNodeKey, message)
+      }
       queueMicrotask(() => {
         workerNode?.dispatchEvent(new Event('taskFinished'))
         this.promiseResponseMap.delete(taskId!)
         this.checkAndEmitTaskExecutionFinishedEvents()
-        if (this.opts.enableTasksQueue === true && !this.destroying) {
+        if (
+          workerNodeKey !== -1 &&
+          this.opts.enableTasksQueue === true &&
+          !this.destroying
+        ) {
           if (
             !this.isWorkerNodeBusy(workerNodeKey) &&
             this.tasksQueueSize(workerNodeKey) > 0
@@ -2264,7 +2484,7 @@ export abstract class AbstractPool<
             )
           }
           if (this.isWorkerNodeIdle(workerNodeKey)) {
-            workerNode.dispatchEvent(
+            workerNode?.dispatchEvent(
               new CustomEvent<WorkerNodeEventDetail>('idle', {
                 detail: { workerNodeKey },
               }),
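
The response path now resolves the worker node key at response time: first from the id carried by the message, then from the id stored with the promise, and it tolerates finding neither. A sketch of that lookup order, assuming a hypothetical list of worker ids in place of `this.workerNodes`; `findKey` and `resolveKey` are illustrative names, not pool methods:

```typescript
// Index of the worker with the given id, or -1. An undefined id never matches,
// even if some node in the list has no id assigned yet.
function findKey(
  workerIds: Array<string | undefined>,
  workerId: string | undefined,
): number {
  if (workerId == null) {
    return -1
  }
  return workerIds.indexOf(workerId)
}

// Prefer the id carried by the response message, then fall back to the
// id stored alongside the pending promise.
function resolveKey(
  workerIds: Array<string | undefined>,
  messageWorkerId: string | undefined,
  storedWorkerId: string | undefined,
): number {
  const key = findKey(workerIds, messageWorkerId)
  return key !== -1 ? key : findKey(workerIds, storedWorkerId)
}

const direct = resolveKey(['w1', 'w2'], 'w2', 'w1') // message id wins
const viaFallback = resolveKey(['w1'], 'gone', 'w1') // message id unknown
const unresolved = resolveKey([undefined, 'w1'], undefined, undefined)
```

When the result is `-1`, the caller skips the per-worker follow-up work, as the guarded `afterTaskExecutionHook` call and the optional `workerNode?.dispatchEvent` in the hunks above do.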

src/pools/utils.ts

Lines changed: 1 addition & 0 deletions
@@ -530,6 +530,7 @@ export const initWorkerInfo = (worker: IWorker): WorkerInfo => {
     backPressure: false,
     backPressureStealing: false,
     continuousStealing: false,
+    crashHandled: false,
     queuedTaskAbortion: false,
     dynamic: false,
     id: getWorkerId(worker),

src/pools/worker.ts

Lines changed: 5 additions & 0 deletions
@@ -164,6 +164,11 @@ export interface WorkerInfo {
    * This flag is set to `true` when worker node is continuously stealing tasks from other worker nodes.
    */
   continuousStealing: boolean
+  /**
+   * Crash handled flag.
+   * This flag is set to `true` when worker node crash has been handled.
+   */
+  crashHandled: boolean
   /**
    * Back pressure stealing flag.
    * This flag is set to `true` when worker node is stealing one task from another back pressured worker node.
