Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fail-started-agent-runs.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@electric-ax/agents-runtime': patch
---

Mark newly-started agent runs as failed when a wake handler errors before ending them, preventing chat UIs from showing "Thinking" indefinitely after runtime failures.
45 changes: 42 additions & 3 deletions packages/agents-runtime/src/process-wake.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,12 +179,14 @@ function withRegisteredManifestEntry(
}
}

// Looks up the key of a run that this wake added to the runs collection.
//
// NOTE(review): this span is a rendered diff hunk without +/- markers, so two
// versions of the same function are interleaved here:
//   - old: the latestNewRunKey signature and the single-line return, which
//     picks the last queried run whose key is not in existingRunKeys;
//   - new: the latestStartedNewRunKey signature and the three-line return,
//     which additionally requires the run status to equal started.
//
// Both versions read every row of db.collections.runs through queryOnce and
// return the key of the last matching row, or undefined when nothing matches.
// Presumably the query result is ordered oldest-to-newest, so that the last
// match is the most recent run; confirm against queryOnce.
async function latestNewRunKey(
async function latestStartedNewRunKey(
db: EntityStreamDBWithActions,
existingRunKeys: ReadonlySet<string>
): Promise<string | undefined> {
const runs = await queryOnce((q) => q.from({ runs: db.collections.runs }))
return runs.filter((run) => !existingRunKeys.has(run.key)).at(-1)?.key
return runs
.filter((run) => !existingRunKeys.has(run.key) && run.status === `started`)
.at(-1)?.key
}

async function resolveHeadersProvider(
Expand Down Expand Up @@ -551,7 +553,29 @@ export async function processWake(
failBackgroundWake(error, `WRITE_FAILED`)
},
})
// Last status written by this wake for each run key.
const producedRunStatuses = new Map<
  string,
  `started` | `completed` | `failed`
>()
// Run keys in the order they were first written by this wake.
const producedRunOrder: Array<string> = []

/**
 * Appends a change event to the producer as JSON.
 *
 * Before appending, run events whose value is an object carrying a status of
 * started, completed or failed are recorded in the two collections above, so
 * that later code can tell which runs this wake wrote and in what state.
 * Events of any other type, or with any other status, are appended untouched
 * and leave the bookkeeping unchanged.
 */
const writeEvent = (event: ChangeEvent): void => {
  // Only run events with an object payload can carry a trackable status.
  const runValue =
    event.type === `run` && event.value && typeof event.value === `object`
      ? (event.value as { status?: unknown })
      : undefined
  const status = runValue?.status

  if (status === `started` || status === `completed` || status === `failed`) {
    // Remember first-seen order once; the status itself is always overwritten.
    const alreadySeen = producedRunStatuses.has(event.key)
    if (!alreadySeen) {
      producedRunOrder.push(event.key)
    }
    producedRunStatuses.set(event.key, status)
  }

  producer.append(JSON.stringify(event))
}

Expand Down Expand Up @@ -2285,7 +2309,22 @@ export async function processWake(
? setupErr.code
: `HANDLER_FAILED`
log.error(`handler failed for ${entityUrl}:`, errMsg)
const failedRunKey = await latestNewRunKey(db, existingRunKeys)
const failedRunKey =
(await latestStartedNewRunKey(db, existingRunKeys)) ??
[...producedRunOrder]
.reverse()
.find((key) => producedRunStatuses.get(key) === `started`)
if (failedRunKey) {
writeEvent(
entityStateSchema.runs.update({
key: failedRunKey,
value: {
status: `failed`,
finish_reason: `error`,
} as never,
}) as ChangeEvent
)
}
writeEvent(
entityStateSchema.errors.insert({
key: `error-${epoch}-${crypto.randomUUID()}`,
Expand Down
70 changes: 69 additions & 1 deletion packages/agents-runtime/test/process-wake.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,9 @@ vi.mock(`../src/entity-stream-db`, () => ({
? errors
: event.type === `signal`
? signals
: undefined
: event.type === `run`
? runs
: undefined
if (!collection) {
return
}
Expand Down Expand Up @@ -505,6 +507,72 @@ describe(`processWake`, () => {
fetchMock.mockRestore()
})

// Regression test: a handler that records a run and then throws must leave
// behind an update event that flips that same run to failed / error.
it(`marks a newly-started run failed when the handler throws before ending it`, async () => {
  defineEntity(`test-agent`, {
    handler: (ctx) => {
      // Presumably recordRun emits the started run insert searched for below.
      ctx.recordRun()
      throw new Error(`boom after run start`)
    },
  })

  // The handler error is expected to propagate out of processWake.
  const wake = processWake(makeNotification(), BASE_CONFIG)
  await expect(wake).rejects.toThrow(`boom after run start`)

  // Decode every payload that was handed to the mocked producer.
  const appended: Array<ChangeEvent> = []
  for (const [payload] of mockProducerAppend.mock.calls) {
    appended.push(JSON.parse(String(payload)) as ChangeEvent)
  }

  const isStartedRunInsert = (event: ChangeEvent): boolean =>
    event.type === `run` &&
    event.headers.operation === `insert` &&
    (event.value as { status?: string }).status === `started`
  const runInsert = appended.find(isStartedRunInsert)

  expect(runInsert).toBeDefined()

  // The failure update has to target the key of the run inserted above.
  const failedUpdate = expect.objectContaining({
    type: `run`,
    key: runInsert!.key,
    headers: expect.objectContaining({ operation: `update` }),
    value: expect.objectContaining({
      status: `failed`,
      finish_reason: `error`,
    }),
  })
  expect(appended).toContainEqual(failedUpdate)
})

// Guard test: a run that is already completed when the handler throws must
// not receive a follow-up update event that marks it failed.
it(`does not mark a completed run failed when later handler cleanup throws`, async () => {
  const completedRunKey = `externally-visible-run`

  defineEntity(`test-agent`, {
    handler: (ctx) => {
      // Insert the run straight into the collection, already completed.
      ctx.db.collections.runs.insert({
        key: completedRunKey,
        status: `completed`,
        finish_reason: `stop`,
      })
      throw new Error(`boom after completed run`)
    },
  })

  // The handler error is expected to propagate out of processWake.
  const wake = processWake(makeNotification(), BASE_CONFIG)
  await expect(wake).rejects.toThrow(`boom after completed run`)

  // Decode every payload that was handed to the mocked producer.
  const appended = mockProducerAppend.mock.calls.map(
    ([payload]) => JSON.parse(String(payload)) as ChangeEvent
  )

  const failedUpdateForCompletedRun = expect.objectContaining({
    type: `run`,
    key: completedRunKey,
    headers: expect.objectContaining({ operation: `update` }),
    value: expect.objectContaining({ status: `failed` }),
  })
  expect(appended).not.toContainEqual(failedUpdateForCompletedRun)
})

it(`returns null without acking for unknown entity types`, async () => {
// No entity type registered — runtime should silently bail
const result = await processWake(makeNotification(), BASE_CONFIG)
Expand Down
Loading