Skip to content

Commit db741bb

Browse files
authored
Merge pull request #85 from ownpilot/fix/plan-wave-abort-responsiveness
fix(plans): honor abort during parallel wave slot-wait
2 parents 66840f7 + c92fbf1 commit db741bb

3 files changed

Lines changed: 88 additions & 2 deletions

File tree

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
### Fixed
11+
12+
- **Plan cancellation is now prompt during parallel wave execution.** When every concurrency slot was full of slow steps, a cancelled plan kept spinning the `maxConcurrent` slot-wait until a step happened to finish — the wave-top abort check couldn't fire while the inner scheduling loop held control. The slot-wait (and the step-scheduling loop) now re-check `signal.aborted` and stop scheduling new steps immediately. Already-scheduled steps are still awaited via `Promise.all` before the abort is surfaced, so no in-flight promise is orphaned (which would otherwise become an unhandled rejection).
13+
1014
## [0.7.1] - 2026-06-05
1115

1216
### Fixed

packages/gateway/src/plans/executor.test.ts

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -936,6 +936,69 @@ describe('PlanExecutor', () => {
936936
});
937937
});
938938

939+
describe('wave execution abort', () => {
940+
beforeEach(() => {
941+
vi.useFakeTimers();
942+
});
943+
afterEach(() => {
944+
vi.useRealTimers();
945+
});
946+
947+
it('stops scheduling further steps and resolves cancelled when aborted during the slot-wait', async () => {
948+
// Regression: with all concurrency slots full of slow steps, a cancelled
949+
// plan used to spin the maxConcurrent slot-wait until a step finished
950+
// (the wave-top abort check can't fire while the inner loop holds
951+
// control). The slot-wait now re-checks signal.aborted and breaks; the
952+
// scheduled steps are still awaited via Promise.all (no orphaned promise
953+
// / unhandled rejection), then the abort is surfaced as 'cancelled'.
954+
const waveExecutor = new PlanExecutor({
955+
userId: 'user-1',
956+
enableWaveExecution: true,
957+
maxConcurrent: 1,
958+
});
959+
const stepA = makeStep({ id: 'step-a', config: { toolName: 'tool_a', toolArgs: {} } });
960+
const stepB = makeStep({ id: 'step-b', config: { toolName: 'tool_b', toolArgs: {} } });
961+
962+
mockPlanService.getPlan.mockResolvedValue(makePlan());
963+
mockPlanService.getSteps.mockResolvedValue([stepA, stepB]);
964+
mockPlanService.getStepsByStatus.mockResolvedValue([stepA, stepB]);
965+
mockPlanService.areDependenciesMet.mockResolvedValue(true);
966+
(hasTool as ReturnType<typeof vi.fn>).mockResolvedValue(true);
967+
968+
// Gate tool_a so stepA holds the only slot while stepB waits.
969+
let releaseA!: () => void;
970+
const calledTools: string[] = [];
971+
(executeTool as ReturnType<typeof vi.fn>).mockImplementation(async (toolName: string) => {
972+
calledTools.push(toolName);
973+
if (toolName === 'tool_a') {
974+
await new Promise<void>((resolve) => (releaseA = resolve));
975+
}
976+
return { success: true, result: {} };
977+
});
978+
979+
const failedListener = vi.fn();
980+
waveExecutor.on('plan:failed', failedListener);
981+
982+
const promise = waveExecutor.execute('plan-1');
983+
// Schedule stepA (occupying the slot) and enter the slot-wait for stepB.
984+
await vi.advanceTimersByTimeAsync(20);
985+
expect(calledTools).toEqual(['tool_a']); // stepB blocked on the full slot
986+
987+
// Abort while stepB waits for a slot.
988+
expect(await waveExecutor.abort('plan-1')).toBe(true);
989+
990+
// Release stepA and let the executor unwind.
991+
releaseA();
992+
await vi.advanceTimersByTimeAsync(50);
993+
const result = await promise;
994+
995+
// stepB was never scheduled; the plan is cancelled, not failed.
996+
expect(calledTools).toEqual(['tool_a']);
997+
expect(result.status).toBe('cancelled');
998+
expect(failedListener).not.toHaveBeenCalled();
999+
});
1000+
});
1001+
9391002
// ========================================================================
9401003
// Resume after pause
9411004
// ========================================================================

packages/gateway/src/plans/executor.ts

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -463,10 +463,21 @@ export class PlanExecutor extends EventEmitter {
463463
const stepPromises: Promise<void>[] = [];
464464

465465
for (const step of readySteps) {
466-
// Respect maxConcurrent
466+
// Stop scheduling new steps if the plan was aborted mid-wave. We must
467+
// NOT throw here — that would orphan the already-pushed stepPromises
468+
// (Promise.all below is what attaches their rejection handlers), so a
469+
// later-failing in-flight step would surface as an unhandled rejection.
470+
// Instead break, let Promise.all await what was scheduled, then throw.
471+
if (signal.aborted) break;
472+
473+
// Respect maxConcurrent. Re-check abort inside the slot-wait so a
474+
// cancelled plan with every slot full of slow steps stops promptly
475+
// rather than spinning this poll until a step happens to finish.
467476
while (executingSteps.size >= this.config.maxConcurrent) {
477+
if (signal.aborted) break;
468478
await new Promise((r) => setTimeout(r, 10));
469479
}
480+
if (signal.aborted) break;
470481

471482
executingSteps.add(step.id);
472483
const promise = this.executeStep(planId, step, results, signal)
@@ -480,9 +491,17 @@ export class PlanExecutor extends EventEmitter {
480491
stepPromises.push(promise);
481492
}
482493

483-
// Wait for all steps in this wave to complete
494+
// Wait for all steps in this wave to complete. Always awaited (even on
495+
// the abort break above) so no scheduled promise is left unhandled.
484496
await Promise.all(stepPromises);
485497

498+
// Honor a mid-wave abort now that the scheduled steps have settled. The
499+
// wave-top check would also catch it next iteration, but throwing here
500+
// surfaces 'cancelled' immediately instead of looping once more.
501+
if (signal.aborted) {
502+
throw new Error('Plan execution aborted');
503+
}
504+
486505
// Update progress
487506
await this.planService.recalculateProgress(this.config.userId, planId);
488507

0 commit comments

Comments
 (0)