From c92fbf1bf652995455e20880677ab45fe0a81557 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ersin=20KO=C3=87?= Date: Fri, 5 Jun 2026 21:13:11 +0300 Subject: [PATCH] fix(plans): honor abort during parallel wave slot-wait MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In executeStepsInWaves, when all maxConcurrent slots were full of slow steps, a cancelled plan kept spinning the slot-wait poll until a step finished — the wave-top abort check can't fire while the inner scheduling loop holds control. Cancellation was still eventually honored (next wave top), but delayed, and the remaining ready steps were scheduled anyway. The slot-wait and the step-scheduling loop now re-check signal.aborted and break. Crucially they break rather than throw mid-loop: throwing would orphan the already-pushed stepPromises (Promise.all is what attaches their rejection handlers), reintroducing the unhandled-rejection class just fixed in JobQueueService. Scheduled steps are still awaited via Promise.all, then the abort is surfaced as 'cancelled'. Regression test: with maxConcurrent=1 and stepA gated, aborting while stepB waits for the slot leaves stepB unscheduled and resolves the plan 'cancelled' (not failed). Co-Authored-By: Claude Opus 4.8 (1M context) --- CHANGELOG.md | 4 ++ packages/gateway/src/plans/executor.test.ts | 63 +++++++++++++++++++++ packages/gateway/src/plans/executor.ts | 23 +++++++- 3 files changed, 88 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e7b44807..09d2224a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- **Plan cancellation is now prompt during parallel wave execution.** When every concurrency slot was full of slow steps, a cancelled plan kept spinning the `maxConcurrent` slot-wait until a step happened to finish — the wave-top abort check couldn't fire while the inner scheduling loop held control. The slot-wait (and the step-scheduling loop) now re-check `signal.aborted` and stop scheduling new steps immediately. Already-scheduled steps are still awaited via `Promise.all` before the abort is surfaced, so no in-flight promise is orphaned (which would otherwise become an unhandled rejection). + ## [0.7.1] - 2026-06-05 ### Fixed diff --git a/packages/gateway/src/plans/executor.test.ts b/packages/gateway/src/plans/executor.test.ts index e0387d1e..b98bc62f 100644 --- a/packages/gateway/src/plans/executor.test.ts +++ b/packages/gateway/src/plans/executor.test.ts @@ -936,6 +936,69 @@ describe('PlanExecutor', () => { }); }); + describe('wave execution abort', () => { + beforeEach(() => { + vi.useFakeTimers(); + }); + afterEach(() => { + vi.useRealTimers(); + }); + + it('stops scheduling further steps and resolves cancelled when aborted during the slot-wait', async () => { + // Regression: with all concurrency slots full of slow steps, a cancelled + // plan used to spin the maxConcurrent slot-wait until a step finished + // (the wave-top abort check can't fire while the inner loop holds + // control). The slot-wait now re-checks signal.aborted and breaks; the + // scheduled steps are still awaited via Promise.all (no orphaned promise + // / unhandled rejection), then the abort is surfaced as 'cancelled'. + const waveExecutor = new PlanExecutor({ + userId: 'user-1', + enableWaveExecution: true, + maxConcurrent: 1, + }); + const stepA = makeStep({ id: 'step-a', config: { toolName: 'tool_a', toolArgs: {} } }); + const stepB = makeStep({ id: 'step-b', config: { toolName: 'tool_b', toolArgs: {} } }); + + mockPlanService.getPlan.mockResolvedValue(makePlan()); + mockPlanService.getSteps.mockResolvedValue([stepA, stepB]); + mockPlanService.getStepsByStatus.mockResolvedValue([stepA, stepB]); + mockPlanService.areDependenciesMet.mockResolvedValue(true); + (hasTool as ReturnType).mockResolvedValue(true); + + // Gate tool_a so stepA holds the only slot while stepB waits. + let releaseA!: () => void; + const calledTools: string[] = []; + (executeTool as ReturnType).mockImplementation(async (toolName: string) => { + calledTools.push(toolName); + if (toolName === 'tool_a') { + await new Promise((resolve) => (releaseA = resolve)); + } + return { success: true, result: {} }; + }); + + const failedListener = vi.fn(); + waveExecutor.on('plan:failed', failedListener); + + const promise = waveExecutor.execute('plan-1'); + // Schedule stepA (occupying the slot) and enter the slot-wait for stepB. + await vi.advanceTimersByTimeAsync(20); + expect(calledTools).toEqual(['tool_a']); // stepB blocked on the full slot + + // Abort while stepB waits for a slot. + expect(await waveExecutor.abort('plan-1')).toBe(true); + + // Release stepA and let the executor unwind. + releaseA(); + await vi.advanceTimersByTimeAsync(50); + const result = await promise; + + // stepB was never scheduled; the plan is cancelled, not failed. + expect(calledTools).toEqual(['tool_a']); + expect(result.status).toBe('cancelled'); + expect(failedListener).not.toHaveBeenCalled(); + }); + }); + // ======================================================================== // Resume after pause // ======================================================================== diff --git a/packages/gateway/src/plans/executor.ts b/packages/gateway/src/plans/executor.ts index e4f8b708..1e5a9feb 100644 --- a/packages/gateway/src/plans/executor.ts +++ b/packages/gateway/src/plans/executor.ts @@ -463,10 +463,21 @@ export class PlanExecutor extends EventEmitter { const stepPromises: Promise[] = []; for (const step of readySteps) { - // Respect maxConcurrent + // Stop scheduling new steps if the plan was aborted mid-wave. We must + // NOT throw here — that would orphan the already-pushed stepPromises + // (Promise.all below is what attaches their rejection handlers), so a + // later-failing in-flight step would surface as an unhandled rejection. + // Instead break, let Promise.all await what was scheduled, then throw. + if (signal.aborted) break; + + // Respect maxConcurrent. Re-check abort inside the slot-wait so a + // cancelled plan with every slot full of slow steps stops promptly + // rather than spinning this poll until a step happens to finish. while (executingSteps.size >= this.config.maxConcurrent) { + if (signal.aborted) break; await new Promise((r) => setTimeout(r, 10)); } + if (signal.aborted) break; executingSteps.add(step.id); const promise = this.executeStep(planId, step, results, signal) @@ -480,9 +491,17 @@ export class PlanExecutor extends EventEmitter { stepPromises.push(promise); } - // Wait for all steps in this wave to complete + // Wait for all steps in this wave to complete. Always awaited (even on + // the abort break above) so no scheduled promise is left unhandled. await Promise.all(stepPromises); + // Honor a mid-wave abort now that the scheduled steps have settled. The + // wave-top check would also catch it next iteration, but throwing here + // surfaces 'cancelled' immediately instead of looping once more. + if (signal.aborted) { + throw new Error('Plan execution aborted'); + } + // Update progress await this.planService.recalculateProgress(this.config.userId, planId);