Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed

- **Plan cancellation is now prompt during parallel wave execution.** When every concurrency slot was full of slow steps, a cancelled plan kept spinning the `maxConcurrent` slot-wait until a step happened to finish — the wave-top abort check couldn't fire while the inner scheduling loop held control. The slot-wait (and the step-scheduling loop) now re-check `signal.aborted` and stop scheduling new steps immediately. Already-scheduled steps are still awaited via `Promise.all` before the abort is surfaced, so no in-flight promise is orphaned (which would otherwise become an unhandled rejection).

## [0.7.1] - 2026-06-05

### Fixed
Expand Down
63 changes: 63 additions & 0 deletions packages/gateway/src/plans/executor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -936,6 +936,69 @@ describe('PlanExecutor', () => {
});
});

describe('wave execution abort', () => {
beforeEach(() => {
vi.useFakeTimers();
});
afterEach(() => {
vi.useRealTimers();
});

it('stops scheduling further steps and resolves cancelled when aborted during the slot-wait', async () => {
// Regression: with all concurrency slots full of slow steps, a cancelled
// plan used to spin the maxConcurrent slot-wait until a step finished
// (the wave-top abort check can't fire while the inner loop holds
// control). The slot-wait now re-checks signal.aborted and breaks; the
// scheduled steps are still awaited via Promise.all (no orphaned promise
// / unhandled rejection), then the abort is surfaced as 'cancelled'.
const waveExecutor = new PlanExecutor({
userId: 'user-1',
enableWaveExecution: true,
maxConcurrent: 1,
});
const stepA = makeStep({ id: 'step-a', config: { toolName: 'tool_a', toolArgs: {} } });
const stepB = makeStep({ id: 'step-b', config: { toolName: 'tool_b', toolArgs: {} } });

mockPlanService.getPlan.mockResolvedValue(makePlan());
mockPlanService.getSteps.mockResolvedValue([stepA, stepB]);
mockPlanService.getStepsByStatus.mockResolvedValue([stepA, stepB]);
mockPlanService.areDependenciesMet.mockResolvedValue(true);
(hasTool as ReturnType<typeof vi.fn>).mockResolvedValue(true);

// Gate tool_a so stepA holds the only slot while stepB waits.
let releaseA!: () => void;
const calledTools: string[] = [];
(executeTool as ReturnType<typeof vi.fn>).mockImplementation(async (toolName: string) => {
calledTools.push(toolName);
if (toolName === 'tool_a') {
await new Promise<void>((resolve) => (releaseA = resolve));
}
return { success: true, result: {} };
});

const failedListener = vi.fn();
waveExecutor.on('plan:failed', failedListener);

const promise = waveExecutor.execute('plan-1');
// Schedule stepA (occupying the slot) and enter the slot-wait for stepB.
await vi.advanceTimersByTimeAsync(20);
expect(calledTools).toEqual(['tool_a']); // stepB blocked on the full slot

// Abort while stepB waits for a slot.
expect(await waveExecutor.abort('plan-1')).toBe(true);

// Release stepA and let the executor unwind.
releaseA();
await vi.advanceTimersByTimeAsync(50);
const result = await promise;

// stepB was never scheduled; the plan is cancelled, not failed.
expect(calledTools).toEqual(['tool_a']);
expect(result.status).toBe('cancelled');
expect(failedListener).not.toHaveBeenCalled();
});
});

// ========================================================================
// Resume after pause
// ========================================================================
Expand Down
23 changes: 21 additions & 2 deletions packages/gateway/src/plans/executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -463,10 +463,21 @@ export class PlanExecutor extends EventEmitter {
const stepPromises: Promise<void>[] = [];

for (const step of readySteps) {
// Respect maxConcurrent
// Stop scheduling new steps if the plan was aborted mid-wave. We must
// NOT throw here — that would orphan the already-pushed stepPromises
// (Promise.all below is what attaches their rejection handlers), so a
// later-failing in-flight step would surface as an unhandled rejection.
// Instead break, let Promise.all await what was scheduled, then throw.
if (signal.aborted) break;

// Respect maxConcurrent. Re-check abort inside the slot-wait so a
// cancelled plan with every slot full of slow steps stops promptly
// rather than spinning this poll until a step happens to finish.
while (executingSteps.size >= this.config.maxConcurrent) {
if (signal.aborted) break;
await new Promise((r) => setTimeout(r, 10));
}
if (signal.aborted) break;

executingSteps.add(step.id);
const promise = this.executeStep(planId, step, results, signal)
Expand All @@ -480,9 +491,17 @@ export class PlanExecutor extends EventEmitter {
stepPromises.push(promise);
}

// Wait for all steps in this wave to complete
// Wait for all steps in this wave to complete. Always awaited (even on
// the abort break above) so no scheduled promise is left unhandled.
await Promise.all(stepPromises);

// Honor a mid-wave abort now that the scheduled steps have settled. The
// wave-top check would also catch it next iteration, but throwing here
// surfaces 'cancelled' immediately instead of looping once more.
if (signal.aborted) {
throw new Error('Plan execution aborted');
}

// Update progress
await this.planService.recalculateProgress(this.config.userId, planId);

Expand Down
Loading