diff --git a/CHANGELOG.md b/CHANGELOG.md index e7b44807..09d2224a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- **Plan cancellation is now prompt during parallel wave execution.** When every concurrency slot was full of slow steps, a cancelled plan kept spinning the `maxConcurrent` slot-wait until a step happened to finish — the wave-top abort check couldn't fire while the inner scheduling loop held control. The slot-wait (and the step-scheduling loop) now re-check `signal.aborted` and stop scheduling new steps immediately. Already-scheduled steps are still awaited via `Promise.all` before the abort is surfaced, so no in-flight promise is orphaned (which would otherwise become an unhandled rejection). + ## [0.7.1] - 2026-06-05 ### Fixed diff --git a/packages/gateway/src/plans/executor.test.ts b/packages/gateway/src/plans/executor.test.ts index e0387d1e..b98bc62f 100644 --- a/packages/gateway/src/plans/executor.test.ts +++ b/packages/gateway/src/plans/executor.test.ts @@ -936,6 +936,69 @@ describe('PlanExecutor', () => { }); }); + describe('wave execution abort', () => { + beforeEach(() => { + vi.useFakeTimers(); + }); + afterEach(() => { + vi.useRealTimers(); + }); + + it('stops scheduling further steps and resolves cancelled when aborted during the slot-wait', async () => { + // Regression: with all concurrency slots full of slow steps, a cancelled + // plan used to spin the maxConcurrent slot-wait until a step finished + // (the wave-top abort check can't fire while the inner loop holds + // control). The slot-wait now re-checks signal.aborted and breaks; the + // scheduled steps are still awaited via Promise.all (no orphaned promise + // / unhandled rejection), then the abort is surfaced as 'cancelled'. + const waveExecutor = new PlanExecutor({ + userId: 'user-1', + enableWaveExecution: true, + maxConcurrent: 1, + }); + const stepA = makeStep({ id: 'step-a', config: { toolName: 'tool_a', toolArgs: {} } }); + const stepB = makeStep({ id: 'step-b', config: { toolName: 'tool_b', toolArgs: {} } }); + + mockPlanService.getPlan.mockResolvedValue(makePlan()); + mockPlanService.getSteps.mockResolvedValue([stepA, stepB]); + mockPlanService.getStepsByStatus.mockResolvedValue([stepA, stepB]); + mockPlanService.areDependenciesMet.mockResolvedValue(true); + (hasTool as ReturnType).mockResolvedValue(true); + + // Gate tool_a so stepA holds the only slot while stepB waits. + let releaseA!: () => void; + const calledTools: string[] = []; + (executeTool as ReturnType).mockImplementation(async (toolName: string) => { + calledTools.push(toolName); + if (toolName === 'tool_a') { + await new Promise((resolve) => (releaseA = resolve)); + } + return { success: true, result: {} }; + }); + + const failedListener = vi.fn(); + waveExecutor.on('plan:failed', failedListener); + + const promise = waveExecutor.execute('plan-1'); + // Schedule stepA (occupying the slot) and enter the slot-wait for stepB. + await vi.advanceTimersByTimeAsync(20); + expect(calledTools).toEqual(['tool_a']); // stepB blocked on the full slot + + // Abort while stepB waits for a slot. + expect(await waveExecutor.abort('plan-1')).toBe(true); + + // Release stepA and let the executor unwind. + releaseA(); + await vi.advanceTimersByTimeAsync(50); + const result = await promise; + + // stepB was never scheduled; the plan is cancelled, not failed. + expect(calledTools).toEqual(['tool_a']); + expect(result.status).toBe('cancelled'); + expect(failedListener).not.toHaveBeenCalled(); + }); + }); + // ======================================================================== // Resume after pause // ======================================================================== diff --git a/packages/gateway/src/plans/executor.ts b/packages/gateway/src/plans/executor.ts index e4f8b708..1e5a9feb 100644 --- a/packages/gateway/src/plans/executor.ts +++ b/packages/gateway/src/plans/executor.ts @@ -463,10 +463,21 @@ export class PlanExecutor extends EventEmitter { const stepPromises: Promise[] = []; for (const step of readySteps) { - // Respect maxConcurrent + // Stop scheduling new steps if the plan was aborted mid-wave. We must + // NOT throw here — that would orphan the already-pushed stepPromises + // (Promise.all below is what attaches their rejection handlers), so a + // later-failing in-flight step would surface as an unhandled rejection. + // Instead break, let Promise.all await what was scheduled, then throw. + if (signal.aborted) break; + + // Respect maxConcurrent. Re-check abort inside the slot-wait so a + // cancelled plan with every slot full of slow steps stops promptly + // rather than spinning this poll until a step happens to finish. while (executingSteps.size >= this.config.maxConcurrent) { + if (signal.aborted) break; await new Promise((r) => setTimeout(r, 10)); } + if (signal.aborted) break; executingSteps.add(step.id); const promise = this.executeStep(planId, step, results, signal) @@ -480,9 +491,17 @@ export class PlanExecutor extends EventEmitter { stepPromises.push(promise); } - // Wait for all steps in this wave to complete + // Wait for all steps in this wave to complete. Always awaited (even on + // the abort break above) so no scheduled promise is left unhandled. await Promise.all(stepPromises); + // Honor a mid-wave abort now that the scheduled steps have settled. The + // wave-top check would also catch it next iteration, but throwing here + // surfaces 'cancelled' immediately instead of looping once more. + if (signal.aborted) { + throw new Error('Plan execution aborted'); + } + // Update progress await this.planService.recalculateProgress(this.config.userId, planId);