Skip to content

Commit de43787

Browse files
committed
Revert "fix: reject in-flight task promises when worker crashes (#146)"
This reverts commit 74d9529.
1 parent 74d9529 commit de43787

9 files changed

Lines changed: 37 additions & 332 deletions

File tree

src/pools/abstract-pool.ts

Lines changed: 30 additions & 250 deletions
Original file line number | Diff line number | Diff line change
@@ -559,9 +559,6 @@ export abstract class AbstractPool<
559559
* @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
560560
*/
561561
private getWorkerNodeKeyByWorkerId(workerId: string | undefined): number {
562-
if (workerId == null) {
563-
return -1
564-
}
565562
return this.workerNodes.findIndex(
566563
(workerNode) => workerNode.info.id === workerId,
567564
)
@@ -1176,21 +1173,11 @@ export abstract class AbstractPool<
11761173
abortSignal?.addEventListener(
11771174
'abort',
11781175
() => {
1179-
const promiseResponse = this.promiseResponseMap.get(task.taskId!)
1180-
if (promiseResponse == null) {
1181-
return
1182-
}
1183-
const currentWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1184-
promiseResponse.workerId,
1185-
)
1186-
if (currentWorkerNodeKey === -1) {
1187-
return
1188-
}
1189-
this.workerNodes[currentWorkerNodeKey]?.dispatchEvent(
1176+
this.workerNodes[workerNodeKey]?.dispatchEvent(
11901177
new CustomEvent<WorkerNodeEventDetail>('abortTask', {
11911178
detail: {
11921179
taskId: task.taskId,
1193-
workerId: promiseResponse.workerId,
1180+
workerId: this.getWorkerInfo(workerNodeKey)!.id!,
11941181
},
11951182
}),
11961183
)
@@ -1200,7 +1187,7 @@ export abstract class AbstractPool<
12001187
this.promiseResponseMap.set(task.taskId!, {
12011188
reject,
12021189
resolve,
1203-
workerId: this.workerNodes[workerNodeKey].info.id,
1190+
workerNodeKey,
12041191
abortSignal,
12051192
})
12061193
if (
@@ -1685,8 +1672,22 @@ export abstract class AbstractPool<
16851672
)
16861673
}
16871674
workerNode.worker.onerror = (errorEvent) => {
1688-
errorEvent.preventDefault()
1689-
this.handleWorkerNodeCrash(workerNode, errorEvent)
1675+
workerNode.info.ready = false
1676+
this.eventTarget?.dispatchEvent(
1677+
new ErrorEvent(PoolEvents.error, errorEvent),
1678+
)
1679+
if (this.started && !this.destroying) {
1680+
if (this.opts.restartWorkerOnError === true) {
1681+
if (workerNode.info.dynamic) {
1682+
this.createAndSetupDynamicWorkerNode()
1683+
} else if (!this.startingMinimumNumberOfWorkers) {
1684+
this.startMinimumNumberOfWorkers(true)
1685+
}
1686+
}
1687+
if (this.opts.enableTasksQueue === true) {
1688+
this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
1689+
}
1690+
}
16901691
workerNode?.terminate()
16911692
}
16921693
workerNode.worker.addEventListener(
@@ -1700,41 +1701,11 @@ export abstract class AbstractPool<
17001701
workerNode.addEventListener(
17011702
'exit',
17021703
() => {
1703-
const workerNodeKey = this.workerNodes.indexOf(workerNode)
1704-
const exitError = new Error('Worker node exited unexpectedly')
1705-
if (
1706-
workerNode.info.ready &&
1707-
!workerNode.info.crashHandled &&
1708-
workerNodeKey !== -1 &&
1709-
!this.destroying
1710-
) {
1711-
this.handleWorkerNodeCrash(
1712-
workerNode,
1713-
new ErrorEvent('error', {
1714-
message: exitError.message,
1715-
error: exitError,
1716-
}),
1717-
)
1718-
}
1719-
if (
1720-
workerNodeKey !== -1 &&
1721-
!workerNode.info.crashHandled &&
1722-
workerNode.info.ready
1723-
) {
1724-
this.flushWorkerNodePromises(
1725-
workerNode,
1726-
this.destroying
1727-
? new Error('Worker node terminated during pool destroy')
1728-
: exitError,
1729-
)
1730-
}
17311704
this.removeWorkerNode(workerNode)
17321705
if (
17331706
this.started &&
17341707
!this.startingMinimumNumberOfWorkers &&
1735-
!this.destroying &&
1736-
(this.opts.restartWorkerOnError === true ||
1737-
!workerNode.info.crashHandled)
1708+
!this.destroying
17381709
) {
17391710
this.startMinimumNumberOfWorkers(true)
17401711
}
@@ -1945,143 +1916,10 @@ export abstract class AbstractPool<
19451916
if (destinationWorkerNodeKey === -1) {
19461917
break
19471918
}
1948-
const task = this.dequeueTask(sourceWorkerNodeKey)!
1949-
this.updatePromiseResponseWorkerId(task.taskId, destinationWorkerNodeKey)
1950-
this.handleTask(destinationWorkerNodeKey, task)
1951-
}
1952-
}
1953-
1954-
/**
1955-
* Rejects in-flight task promises for the given crashed worker node key.
1956-
* @param workerNodeKey - The worker node key.
1957-
* @param crashError - The crash error to reject promises with.
1958-
*/
1959-
private rejectInFlightTaskPromises(
1960-
workerNodeKey: number,
1961-
crashError: Error,
1962-
): void {
1963-
if (workerNodeKey === -1) {
1964-
return
1965-
}
1966-
const workerNode = this.workerNodes[workerNodeKey]
1967-
const crashedWorkerId = workerNode.info.id
1968-
if (crashedWorkerId == null) {
1969-
return
1970-
}
1971-
const queuedTaskIds = new Set<
1972-
`${string}-${string}-${string}-${string}-${string}`
1973-
>()
1974-
for (const task of workerNode.tasksQueue) {
1975-
queuedTaskIds.add(task.taskId!)
1976-
}
1977-
for (const [taskId, promiseResponse] of this.promiseResponseMap) {
1978-
if (
1979-
promiseResponse.workerId === crashedWorkerId &&
1980-
!queuedTaskIds.has(taskId)
1981-
) {
1982-
this.rejectTaskPromise(taskId, promiseResponse, workerNode, crashError)
1983-
}
1984-
}
1985-
this.checkAndEmitTaskExecutionFinishedEvents()
1986-
}
1987-
1988-
/**
1989-
* Rejects remaining queued task promises for the given crashed worker node key.
1990-
* @param workerNodeKey - The worker node key.
1991-
* @param crashError - The crash error to reject promises with.
1992-
*/
1993-
private rejectRemainingQueuedTaskPromises(
1994-
workerNodeKey: number,
1995-
crashError: Error,
1996-
): void {
1997-
if (workerNodeKey === -1) {
1998-
return
1999-
}
2000-
const workerNode = this.workerNodes[workerNodeKey]
2001-
if (this.tasksQueueSize(workerNodeKey) === 0) {
2002-
return
2003-
}
2004-
while (this.tasksQueueSize(workerNodeKey) > 0) {
2005-
const task = this.dequeueTask(workerNodeKey)
2006-
if (task?.taskId != null) {
2007-
const promiseResponse = this.promiseResponseMap.get(task.taskId)
2008-
if (promiseResponse != null) {
2009-
this.rejectTaskPromise(
2010-
task.taskId,
2011-
promiseResponse,
2012-
workerNode,
2013-
crashError,
2014-
false,
2015-
)
2016-
}
2017-
}
2018-
}
2019-
this.checkAndEmitTaskExecutionFinishedEvents()
2020-
}
2021-
2022-
private rejectTaskPromise(
2023-
taskId: `${string}-${string}-${string}-${string}-${string}`,
2024-
promiseResponse: PromiseResponseWrapper<Response>,
2025-
workerNode: IWorkerNode<Worker, Data>,
2026-
error: Error,
2027-
decrementExecuting = true,
2028-
): void {
2029-
promiseResponse.reject(error)
2030-
this.promiseResponseMap.delete(taskId)
2031-
if (decrementExecuting && workerNode.usage.tasks.executing > 0) {
2032-
--workerNode.usage.tasks.executing
2033-
}
2034-
++workerNode.usage.tasks.failed
2035-
workerNode.dispatchEvent(new Event('taskFinished'))
2036-
}
2037-
2038-
/**
2039-
* Rejects all unsettled promises targeting the given worker node.
2040-
* Called from the exit handler for unexpected exits of ready workers
2041-
* that were not already handled by the onerror crash path.
2042-
* @param workerNode - The worker node whose promises to flush.
2043-
* @param error - The rejection error.
2044-
*/
2045-
private flushWorkerNodePromises(
2046-
workerNode: IWorkerNode<Worker, Data>,
2047-
error: Error,
2048-
): void {
2049-
const workerId = workerNode.info.id
2050-
for (const [taskId, promiseResponse] of this.promiseResponseMap) {
2051-
if (
2052-
promiseResponse.workerId === workerId ||
2053-
(workerId == null && promiseResponse.workerId == null)
2054-
) {
2055-
this.rejectTaskPromise(taskId, promiseResponse, workerNode, error)
2056-
}
2057-
}
2058-
this.checkAndEmitTaskExecutionFinishedEvents()
2059-
}
2060-
2061-
/**
2062-
* Updates the promise response worker id after task steal or redistribute.
2063-
* Ensures crash-time rejection targets the correct worker.
2064-
* @param taskId - The task id.
2065-
* @param workerNodeKey - The destination worker node key.
2066-
*/
2067-
private updatePromiseResponseWorkerId(
2068-
taskId: `${string}-${string}-${string}-${string}-${string}` | undefined,
2069-
workerNodeKey: number,
2070-
): void {
2071-
if (taskId == null || workerNodeKey === -1) {
2072-
return
2073-
}
2074-
const workerNode = this.workerNodes[workerNodeKey]
2075-
if (workerNode.info.id == null) {
2076-
const promiseResponse = this.promiseResponseMap.get(taskId)
2077-
if (promiseResponse != null) {
2078-
promiseResponse.workerId = undefined
2079-
}
2080-
return
2081-
}
2082-
const promiseResponse = this.promiseResponseMap.get(taskId)
2083-
if (promiseResponse != null) {
2084-
promiseResponse.workerId = workerNode.info.id
1919+
this.handleTask(
1920+
destinationWorkerNodeKey,
1921+
this.dequeueTask(sourceWorkerNodeKey)!,
1922+
)
20851923
}
20861924
}
20871925

@@ -2181,10 +2019,6 @@ export abstract class AbstractPool<
21812019
}
21822020
sourceWorkerNode.info.stolen = false
21832021
destinationWorkerNode.info.stealing = false
2184-
this.updatePromiseResponseWorkerId(
2185-
stolenTask.taskId,
2186-
destinationWorkerNodeKey,
2187-
)
21882022
this.handleTask(destinationWorkerNodeKey, stolenTask)
21892023
this.updateTaskStolenStatisticsWorkerUsage(
21902024
destinationWorkerNodeKey,
@@ -2204,43 +2038,6 @@ export abstract class AbstractPool<
22042038
)
22052039
}
22062040

2207-
/**
2208-
* Handles a crashed worker node: emits error, rejects in-flight promises,
2209-
* restarts dynamic workers if configured, and redistributes queued tasks.
2210-
* Static worker restart is handled by the exit event handler.
2211-
* @param workerNode - The crashed worker node.
2212-
* @param errorEvent - The error event that caused the crash.
2213-
*/
2214-
private handleWorkerNodeCrash(
2215-
workerNode: IWorkerNode<Worker, Data>,
2216-
errorEvent: ErrorEvent,
2217-
): void {
2218-
workerNode.info.ready = false
2219-
workerNode.info.crashHandled = true
2220-
this.eventTarget?.dispatchEvent(
2221-
new ErrorEvent(PoolEvents.error, errorEvent),
2222-
)
2223-
const crashedWorkerNodeKey = this.workerNodes.indexOf(workerNode)
2224-
const crashError = new Error(
2225-
`Worker node crashed with error: '${errorEvent.message}'`,
2226-
{ cause: errorEvent.error ?? errorEvent },
2227-
)
2228-
this.rejectInFlightTaskPromises(crashedWorkerNodeKey, crashError)
2229-
if (this.started && !this.destroying) {
2230-
if (this.opts.restartWorkerOnError === true) {
2231-
if (workerNode.info.dynamic) {
2232-
this.createAndSetupDynamicWorkerNode()
2233-
}
2234-
}
2235-
if (this.opts.enableTasksQueue === true) {
2236-
this.redistributeQueuedTasks(crashedWorkerNodeKey)
2237-
}
2238-
}
2239-
if (this.opts.enableTasksQueue === true) {
2240-
this.rejectRemainingQueuedTaskPromises(crashedWorkerNodeKey, crashError)
2241-
}
2242-
}
2243-
22442041
private readonly handleWorkerNodeIdleEvent = (
22452042
event: CustomEvent<WorkerNodeEventDetail>,
22462043
previousStolenTask?: Task<Data>,
@@ -2424,9 +2221,6 @@ export abstract class AbstractPool<
24242221
)
24252222
}
24262223
const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
2427-
if (workerNodeKey === -1) {
2428-
return
2429-
}
24302224
const workerNode = this.workerNodes[workerNodeKey]
24312225
workerNode.info.ready = ready
24322226
workerNode.info.taskFunctionsProperties = taskFunctionsProperties
@@ -2436,19 +2230,11 @@ export abstract class AbstractPool<
24362230
}
24372231

24382232
private handleTaskExecutionResponse(message: MessageValue<Response>): void {
2439-
const { name, taskId, workerError, data, workerId } = message
2233+
const { name, taskId, workerError, data } = message
24402234
const promiseResponse = this.promiseResponseMap.get(taskId!)
24412235
if (promiseResponse != null) {
2442-
const { resolve, reject } = promiseResponse
2443-
let workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
2444-
if (workerNodeKey === -1) {
2445-
workerNodeKey = this.getWorkerNodeKeyByWorkerId(
2446-
promiseResponse.workerId,
2447-
)
2448-
}
2449-
const workerNode = workerNodeKey !== -1
2450-
? this.workerNodes[workerNodeKey]
2451-
: undefined
2236+
const { resolve, reject, workerNodeKey } = promiseResponse
2237+
const workerNode = this.workerNodes[workerNodeKey]
24522238
if (workerError != null) {
24532239
this.eventTarget?.dispatchEvent(
24542240
new ErrorEvent(PoolEvents.taskError, { error: workerError }),
@@ -2462,18 +2248,12 @@ export abstract class AbstractPool<
24622248
} else {
24632249
resolve(data!)
24642250
}
2465-
if (workerNodeKey !== -1) {
2466-
this.afterTaskExecutionHook(workerNodeKey, message)
2467-
}
2251+
this.afterTaskExecutionHook(workerNodeKey, message)
24682252
queueMicrotask(() => {
24692253
workerNode?.dispatchEvent(new Event('taskFinished'))
24702254
this.promiseResponseMap.delete(taskId!)
24712255
this.checkAndEmitTaskExecutionFinishedEvents()
2472-
if (
2473-
workerNodeKey !== -1 &&
2474-
this.opts.enableTasksQueue === true &&
2475-
!this.destroying
2476-
) {
2256+
if (this.opts.enableTasksQueue === true && !this.destroying) {
24772257
if (
24782258
!this.isWorkerNodeBusy(workerNodeKey) &&
24792259
this.tasksQueueSize(workerNodeKey) > 0
@@ -2484,7 +2264,7 @@ export abstract class AbstractPool<
24842264
)
24852265
}
24862266
if (this.isWorkerNodeIdle(workerNodeKey)) {
2487-
workerNode?.dispatchEvent(
2267+
workerNode.dispatchEvent(
24882268
new CustomEvent<WorkerNodeEventDetail>('idle', {
24892269
detail: { workerNodeKey },
24902270
}),

src/pools/utils.ts

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -530,7 +530,6 @@ export const initWorkerInfo = (worker: IWorker): WorkerInfo => {
530530
backPressure: false,
531531
backPressureStealing: false,
532532
continuousStealing: false,
533-
crashHandled: false,
534533
queuedTaskAbortion: false,
535534
dynamic: false,
536535
id: getWorkerId(worker),

src/pools/worker.ts

Lines changed: 0 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -164,11 +164,6 @@ export interface WorkerInfo {
164164
* This flag is set to `true` when worker node is continuously stealing tasks from other worker nodes.
165165
*/
166166
continuousStealing: boolean
167-
/**
168-
* Crash handled flag.
169-
* This flag is set to `true` when worker node crash has been handled.
170-
*/
171-
crashHandled: boolean
172167
/**
173168
* Back pressure stealing flag.
174169
* This flag is set to `true` when worker node is stealing one task from another back pressured worker node.

0 commit comments

Comments
 (0)