From 89292f7d4188edc4bea5da51bc8ba11207d2fc5a Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Fri, 19 Jun 2026 14:35:37 -0600 Subject: [PATCH 1/2] Fail started runs on handler errors --- packages/agents-runtime/src/process-wake.ts | 45 +++++++++++- .../agents-runtime/test/process-wake.test.ts | 70 ++++++++++++++++++- 2 files changed, 111 insertions(+), 4 deletions(-) diff --git a/packages/agents-runtime/src/process-wake.ts b/packages/agents-runtime/src/process-wake.ts index 8ed0484dad..abbec6acbd 100644 --- a/packages/agents-runtime/src/process-wake.ts +++ b/packages/agents-runtime/src/process-wake.ts @@ -179,12 +179,14 @@ function withRegisteredManifestEntry( } } -async function latestNewRunKey( +async function latestStartedNewRunKey( db: EntityStreamDBWithActions, existingRunKeys: ReadonlySet ): Promise { const runs = await queryOnce((q) => q.from({ runs: db.collections.runs })) - return runs.filter((run) => !existingRunKeys.has(run.key)).at(-1)?.key + return runs + .filter((run) => !existingRunKeys.has(run.key) && run.status === `started`) + .at(-1)?.key } async function resolveHeadersProvider( @@ -551,7 +553,29 @@ export async function processWake( failBackgroundWake(error, `WRITE_FAILED`) }, }) + const producedRunStatuses = new Map< + string, + `started` | `completed` | `failed` + >() + const producedRunOrder: Array = [] const writeEvent = (event: ChangeEvent): void => { + if ( + event.type === `run` && + event.value && + typeof event.value === `object` + ) { + const status = (event.value as { status?: unknown }).status + if ( + status === `started` || + status === `completed` || + status === `failed` + ) { + if (!producedRunStatuses.has(event.key)) { + producedRunOrder.push(event.key) + } + producedRunStatuses.set(event.key, status) + } + } producer.append(JSON.stringify(event)) } @@ -2285,7 +2309,22 @@ export async function processWake( ? setupErr.code : `HANDLER_FAILED` log.error(`handler failed for ${entityUrl}:`, errMsg) - const failedRunKey = await latestNewRunKey(db, existingRunKeys) + const failedRunKey = + (await latestStartedNewRunKey(db, existingRunKeys)) ?? + [...producedRunOrder] + .reverse() + .find((key) => producedRunStatuses.get(key) === `started`) + if (failedRunKey) { + writeEvent( + entityStateSchema.runs.update({ + key: failedRunKey, + value: { + status: `failed`, + finish_reason: `error`, + } as never, + }) as ChangeEvent + ) + } writeEvent( entityStateSchema.errors.insert({ key: `error-${epoch}-${crypto.randomUUID()}`, diff --git a/packages/agents-runtime/test/process-wake.test.ts b/packages/agents-runtime/test/process-wake.test.ts index 49c3a5460f..2b29ef346b 100644 --- a/packages/agents-runtime/test/process-wake.test.ts +++ b/packages/agents-runtime/test/process-wake.test.ts @@ -208,7 +208,9 @@ vi.mock(`../src/entity-stream-db`, () => ({ ? errors : event.type === `signal` ? signals - : undefined + : event.type === `run` + ? runs + : undefined if (!collection) { return } @@ -505,6 +507,72 @@ describe(`processWake`, () => { fetchMock.mockRestore() }) + it(`marks a newly-started run failed when the handler throws before ending it`, async () => { + defineEntity(`test-agent`, { + handler: (ctx) => { + ctx.recordRun() + throw new Error(`boom after run start`) + }, + }) + + await expect(processWake(makeNotification(), BASE_CONFIG)).rejects.toThrow( + `boom after run start` + ) + + const events = mockProducerAppend.mock.calls.map(([body]) => + JSON.parse(String(body)) + ) as Array + const startedRun = events.find( + (event) => + event.type === `run` && + event.headers.operation === `insert` && + (event.value as { status?: string }).status === `started` + ) + + expect(startedRun).toBeDefined() + expect(events).toContainEqual( + expect.objectContaining({ + type: `run`, + key: startedRun!.key, + headers: expect.objectContaining({ operation: `update` }), + value: expect.objectContaining({ + status: `failed`, + finish_reason: `error`, + }), + }) + ) + }) + + it(`does not mark a completed run failed when later handler cleanup throws`, async () => { + defineEntity(`test-agent`, { + handler: (ctx) => { + ctx.db.collections.runs.insert({ + key: `externally-visible-run`, + status: `completed`, + finish_reason: `stop`, + }) + throw new Error(`boom after completed run`) + }, + }) + + await expect(processWake(makeNotification(), BASE_CONFIG)).rejects.toThrow( + `boom after completed run` + ) + + const events = mockProducerAppend.mock.calls.map(([body]) => + JSON.parse(String(body)) + ) as Array + + expect(events).not.toContainEqual( + expect.objectContaining({ + type: `run`, + key: `externally-visible-run`, + headers: expect.objectContaining({ operation: `update` }), + value: expect.objectContaining({ status: `failed` }), + }) + ) + }) + it(`returns null without acking for unknown entity types`, async () => { // No entity type registered — runtime should silently bail const result = await processWake(makeNotification(), BASE_CONFIG) From e6b9c582f9d7802ece6debd08b468c31a7e8444a Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Fri, 19 Jun 2026 14:36:29 -0600 Subject: [PATCH 2/2] Add changeset for started run failures --- .changeset/fail-started-agent-runs.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/fail-started-agent-runs.md diff --git a/.changeset/fail-started-agent-runs.md b/.changeset/fail-started-agent-runs.md new file mode 100644 index 0000000000..d0a86f8ff7 --- /dev/null +++ b/.changeset/fail-started-agent-runs.md @@ -0,0 +1,5 @@ +--- +'@electric-ax/agents-runtime': patch +--- + +Mark newly-started agent runs as failed when a wake handler errors before ending them, preventing chat UIs from showing "Thinking" indefinitely after runtime failures.