diff --git a/docs/design/openclaw-ethos-preprompt-recall.md b/docs/design/openclaw-ethos-preprompt-recall.md index 7953daa36f..e9e41bc056 100644 --- a/docs/design/openclaw-ethos-preprompt-recall.md +++ b/docs/design/openclaw-ethos-preprompt-recall.md @@ -334,6 +334,7 @@ following shape at the bridge boundary: ``` Rules: + - preserve provenance back to the ledger event id - keep the episode concise and semantic - do not dump full task objects or raw debug metadata into Ethos @@ -355,6 +356,7 @@ Durable facts should carry explicit freshness/authority semantics: ``` Retrieval policy should prefer: + 1. `active` 2. `disputed` (visible but clearly marked) 3. `superseded` @@ -385,12 +387,14 @@ structured recall trace record that the UI can display safely: ``` Rules: + - operator-visible, not model-visible - enough detail for debugging - no raw prompt dump theater - no raw Ethos metadata leakage into Mission Control cards Finalized runtime contract in OpenClaw: + - emit these records through the existing task-ledger/event substrate - use `entity: "recall"` + `kind: "trace"` rather than inventing a separate sink - keep Mission Control consumption on ledger snapshot/event reads, not prompt scraping @@ -468,6 +472,7 @@ turning the UI into a prompt dump or leaking raw Ethos metadata. ### Likely files to touch #### OpenClaw source + - `src/hooks/bundled/ethos-context/handler.ts` - emit structured recall trace records/events - `src/hooks/bundled/ethos-context/handler.test.ts` @@ -480,6 +485,7 @@ turning the UI into a prompt dump or leaking raw Ethos metadata. of a dedicated trace channel #### Mission Control + - gateway task-ledger sync surfaces - `src/gateway/server-methods/tasks.ts` - extend `tasks.snapshot` / `tasks.events` consumers if Mission Control needs diff --git a/docs/design/task-ledger-mission-control-automation-roadmap.md b/docs/design/task-ledger-mission-control-automation-roadmap.md index fe9caad185..c16d3f2949 100644 --- a/docs/design/task-ledger-mission-control-automation-roadmap.md +++ b/docs/design/task-ledger-mission-control-automation-roadmap.md @@ -5,6 +5,7 @@ Turn the task-ledger/task-bus stack into a more automatic, trustworthy operational spine for OpenClaw, while documenting the portable pieces we want to carry into Mercury. This roadmap is based on real operator pain observed during Story 4 / Story 5 work: + - task truth lagging behind agent reality - Mission Control accurately reflecting stale ledger state - old blocked tasks dominating current agent status @@ -16,6 +17,7 @@ This roadmap is based on real operator pain observed during Story 4 / Story 5 wo Make the **ledger more authoritative** and Mission Control more of a **live projection** over that authority. That means: + - agents should publish lifecycle changes more automatically - heartbeats should carry stronger current-task context - Mission Control should project the ledger honestly, with freshness/confidence cues @@ -27,6 +29,7 @@ Today the biggest issue is not that Mission Control is "just wrong." The bigger issue is that the upstream task-ledger/task-bus flow is only partially automated. That creates a recurring failure mode: + 1. agent starts real work 2. task state stays `todo` or old blocked work remains attached 3. Mission Control projects the ledger faithfully @@ -41,10 +44,13 @@ So the roadmap focus should be upstream automation first, then projection qualit ## P0.1 Automatic task lifecycle publishing from agent execution ### Goal + Stop depending on humans to manually reconcile the most common task transitions. ### Required behavior + When an agent enters a real scoped implementation lane, the runtime should be able to publish: + - `todo -> in_progress` - `in_progress -> blocked` with reason - `in_progress -> qa` @@ -52,7 +58,9 @@ When an agent enters a real scoped implementation lane, the runtime should be ab - task notes with verification context ### Suggested shape + Introduce a thin standard command surface around the ledger publisher: + - `task.start(...)` - `task.block(...)` - `task.note(...)` @@ -62,9 +70,11 @@ Introduce a thin standard command surface around the ledger publisher: These should route through the canonical ledger write path, not bypass it. ### Why this matters + This removes the biggest source of Mission Control drift: real work starts, but the canonical task state never changes. ### Mercury portability + Portable almost as-is. Mercury should also have a small task command API around its canonical publisher. --- @@ -72,10 +82,13 @@ Portable almost as-is. Mercury should also have a small task command API around ## P0.2 Explicit current-task contract in agent heartbeats ### Goal + Make agent activity reliably attributable to current work. ### Required heartbeat fields + Every meaningful agent heartbeat should carry: + - `agentId` - `status` - `currentTaskId` @@ -87,9 +100,11 @@ Every meaningful agent heartbeat should carry: - `phase` / run metadata where relevant ### Why this matters + Mission Control should not have to guess which task an agent is carrying from a mix of old assignments and latest activity. ### Mercury portability + Portable conceptually. Names can differ, but the same contract should exist. --- @@ -97,9 +112,11 @@ Portable conceptually. Names can differ, but the same contract should exist. ## P0.3 Controlled task-reality reconciliation ### Goal + Catch and repair obvious truth drift between session reality and ledger reality. ### Example drift cases + - agent has an active run, but assigned task is still `todo` - task is `in_progress`, but assigned agent has been idle for hours - agent has old blocked tasks plus one newer active task @@ -107,17 +124,28 @@ Catch and repair obvious truth drift between session reality and ledger reality. - agent heartbeat `currentTaskId` disagrees with latest task assignment reality ### Required behavior + Create a reconciliation layer that can: + - detect drift - emit warnings - suggest safe fixes - optionally apply deterministic safe reconciliations +Escalation and reassignment rules should stay ledger-native: + +- activation misses should have explicit thresholds, not vague "follow up later" residue +- repeated proof-checkpoint misses / status loops should escalate on a fixed threshold and become reassignment-eligible on a later threshold +- stale or superseded ownership should define a ledger takeover path: publish the ownership change in the task ledger first, then let the gaining owner heartbeat the task +- Mission Control actions should remain a control surface over those ledger writes, never a second truth store + ### Important rule + Do not hide this inside silent UI reads. Reconciliation should be an explicit operational behavior with explainable output. ### Mercury portability + Portable and desirable. This is one of the strongest candidates to carry into Mercury early. --- @@ -125,9 +153,11 @@ Portable and desirable. This is one of the strongest candidates to carry into Me ## P0.4 Mission Control freshness + confidence cues ### Goal + Make the operator surface honest about projection quality. ### UI should show + - snapshot age - last ledger event age - per-panel freshness @@ -138,9 +168,11 @@ Make the operator surface honest about projection quality. - whether projection looks stale or reconciled ### Why this matters + If the dashboard may lag, the UI should say so clearly instead of implying unwarranted certainty. ### Mercury portability + Portable. Any operator dashboard benefits from explicit projection-confidence cues. --- @@ -150,9 +182,11 @@ Portable. Any operator dashboard benefits from explicit projection-confidence cu ## P1.1 First-class task relations ### Goal + Let the ledger express structure instead of treating tasks as isolated cards. ### Fields to add or normalize + - `parentTaskId` - `dependsOn` - `blockedBy` @@ -161,10 +195,12 @@ Let the ledger express structure instead of treating tasks as isolated cards. - `workstream` ### Why this matters + This makes rollups, blockers, and stale superseded work far easier to reason about. It also reduces the chance that old blocked tasks visually dominate newer active work. ### Mercury portability + Strongly portable. --- @@ -172,9 +208,11 @@ Strongly portable. ## P1.2 Better lifecycle metadata for task participation ### Goal + Improve rollups and automation by distinguishing active from non-participating children. ### Important lifecycle modes + - `parked` - `superseded` - `cancelled` @@ -182,9 +220,11 @@ Improve rollups and automation by distinguishing active from non-participating c - `blocked` ### Why this matters + Parent rollups and current-task logic become much safer when non-participating work is clearly marked. ### Mercury portability + Portable and important. --- @@ -192,9 +232,11 @@ Portable and important. ## P1.3 Operator-side reconcile actions in Mission Control ### Goal + Let the operator fix obvious state mismatches from the dashboard without inventing dashboard-owned truth. ### Good candidate actions + - mark started - mark blocked/unblocked - add note @@ -204,9 +246,11 @@ Let the operator fix obvious state mismatches from the dashboard without inventi - open active lane/session/worktree ### Rule + Mission Control remains a control surface over the ledger, not an independent truth store. ### Mercury portability + Portable, but should come after publisher discipline and reconciliation rules exist. --- @@ -216,9 +260,11 @@ Portable, but should come after publisher discipline and reconciliation rules ex ## P2.1 Nightly operational hygiene ### Goal + Use a scheduled maintenance pass to improve both memory quality and operational truth hygiene. ### Candidate checks + - stale `in_progress` tasks - orphaned blocked tasks - inactive agent/task mismatches @@ -227,9 +273,11 @@ Use a scheduled maintenance pass to improve both memory quality and operational - memory dedupe / summary regeneration / provenance-preserving compaction ### Why this matters + Nightly maintenance should operate over the real episode/fact/task substrate instead of placeholder schemas. ### Mercury portability + Portable in principle; exact maintenance jobs may differ. --- @@ -237,9 +285,11 @@ Portable in principle; exact maintenance jobs may differ. ## P2.2 Smarter rollup and stuck-lane handling ### Goal + Reduce human cleanup for long-running projects. ### Candidate automation + - parent task rollup suggestions - stuck-lane detection - no-heartbeat-with-open-task warnings @@ -247,6 +297,7 @@ Reduce human cleanup for long-running projects. - QA aging detection ### Mercury portability + Portable, but only after base lifecycle correctness is in place. --- @@ -254,6 +305,7 @@ Portable, but only after base lifecycle correctness is in place. # Recommended implementation order ## OpenClaw / Mission Control + 1. **Automatic lifecycle publishing** 2. **Heartbeat current-task contract hardening** 3. **Drift detection + reconciliation warnings** @@ -263,6 +315,7 @@ Portable, but only after base lifecycle correctness is in place. 7. **Nightly operational hygiene expansion** ## Mercury carryover order + 1. canonical event log + snapshot 2. task command publisher API 3. explicit agent heartbeat contract @@ -277,23 +330,27 @@ Portable, but only after base lifecycle correctness is in place. # Suggested acceptance criteria by layer ## Ledger / publisher layer + - agents can publish start/block/qa/done through a stable API - duplicate publishes remain idempotent - canonical snapshot updates deterministically - provenance is preserved for every meaningful transition ## Agent activity layer + - heartbeats include `currentTaskId`, lane, session, worktree, branch - latest active work is distinguishable from older blocked assignments - agent status no longer depends on ambiguous fallback inference alone ## Mission Control layer + - board reflects canonical task state without manual reconciliation drift - agent lanes show current task truth and freshness/confidence - activity feed remains historical, not mistaken for current truth - reconcile warnings surface when source/projection drift is detected ## Operator workflow layer + - operator can correct safe state mismatches without editing raw data - all control actions publish back through the ledger - UI never becomes the canonical source of truth @@ -320,6 +377,7 @@ If Mercury adopts those principles early, it avoids a huge class of “board say # Bottom line To make the task-ledger/task-bus/Mission Control stack better and more automated: + - automate more lifecycle truth at the source - enrich heartbeats with explicit current-work context - add controlled reconciliation instead of silent guessing diff --git a/docs/design/task-ledger-task-bus.md b/docs/design/task-ledger-task-bus.md index 35ba8c4b55..12cc5a5eb9 100644 --- a/docs/design/task-ledger-task-bus.md +++ b/docs/design/task-ledger-task-bus.md @@ -14,6 +14,7 @@ implementation details. - **Ethos is a searchable memory layer, not the operational truth.** That separation matters: + - ledger = what happened / what is active - Mission Control = what operators see - Ethos = what agents can recall semantically later @@ -21,9 +22,11 @@ That separation matters: ## Current implementation shape Main implementation lives in: + - `src/infra/task-ledger.ts` Primary schemas: + - `openclaw.task-ledger.event.v1` - `openclaw.task-ledger.snapshot.v1` @@ -40,6 +43,7 @@ The ledger materializes into two durable artifacts: - includes recent events for cheap consumers This gives us: + - durable history - cheap reads - replay/debug path @@ -52,6 +56,7 @@ This gives us: A task carries the coordination state for real work. Important fields today: + - `id` - `title` - `description` @@ -70,6 +75,7 @@ Important fields today: - `lastEventAt` Core states: + - `backlog` - `todo` - `in_progress` @@ -83,6 +89,7 @@ Agent activity is tracked separately from tasks so the operator surface can show who is alive/working without conflating that with task truth. Important fields today: + - `id` - `name` - `status` @@ -95,6 +102,7 @@ Important fields today: - `metadata` Core statuses: + - `idle` - `running` - `waiting` @@ -105,6 +113,7 @@ Core statuses: ### Task-side Current task event kinds: + - `created` - `started` - `state_changed` @@ -116,14 +125,17 @@ Current task event kinds: ### Agent-side Current agent event kinds: + - `heartbeat` ## Publishing model The main write API is: + - `publishTaskLedgerEvents(...)` Input forms today are essentially: + - task upsert - task transition - task note @@ -132,6 +144,7 @@ Input forms today are essentially: ### Important design rule The publisher is the authoritative place for: + - validation - id generation - idempotency handling @@ -144,6 +157,7 @@ This is where task-state hygiene belongs. ## Bus semantics The ledger also acts like a task bus through a stable topic field: + - default topic: `shared.task.ledger` That topic is not a full pub/sub system by itself; it is the routing/provenance @@ -153,6 +167,7 @@ operational stream they are consuming. ### Practical meaning The “task bus” in our current design is: + - event publication discipline - stable event schema - bus topic field @@ -164,10 +179,12 @@ It is not yet a standalone broker abstraction. The write path produces a materialized snapshot from the canonical event stream. Consumers should prefer: + - **snapshot for operator/UI reads** - **events for history/replay/integration** Mission Control follows this pattern: + - reads task/agent state from the snapshot/projection - shows board + activity + detail views - never becomes the source of truth itself @@ -182,6 +199,7 @@ If a parent task should roll forward based on child story/QA progress, that happens on the write path, not by mutating state during arbitrary reads. Why: + - read-path mutation is hard to reason about - it creates process-safety issues - it can create duplicate synthetic events @@ -194,6 +212,7 @@ promote it to `qa`/`done` just because children progressed. ### Parked / superseded / cancelled children should not participate Lifecycle metadata matters. Children marked as: + - `parked` - `superseded` - `cancelled` @@ -216,6 +235,7 @@ There is no separate Mission Control-native `/api/v1/activities` bridge in the current OpenClaw stack, and there is no Phoenix route tree in this repository. The current contract is: + - runtime/automation activity is written into the canonical ledger by the gateway-side task-ledger publishers and lifecycle listeners - explicit operator or integration writes go through the gateway RPC method @@ -228,12 +248,14 @@ That keeps the task ledger as the source of truth and keeps Mission Control as the projection/control surface over that truth. ### It should rely on the ledger for: + - board columns / task state - assigned agent and worktree context - blockers and notes - agent heartbeats / current task ### It should not invent its own truth for: + - task completion - stale task cleanup - lane ownership @@ -248,6 +270,7 @@ The ledger is operational truth. Ethos is the searchable memory layer. So the correct bridge pattern is: + - select high-signal ledger events - transform them into concise episodic records - preserve provenance back to ledger event ids / task ids @@ -263,12 +286,14 @@ If we port this into Mercury, the portable ideas are: Do not make the dashboard/UI the source of truth. Keep: + - append-only event history - materialized snapshot for cheap reads ### 2. Stable task schema Mercury should keep the same conceptual fields: + - stable task id - state - priority @@ -285,6 +310,7 @@ actual task state transitions. ### 4. Explicit lifecycle metadata The port should preserve lifecycle semantics for: + - parked - superseded - blocked @@ -295,6 +321,7 @@ This is where many “smart board” bugs come from. ### 5. Event-driven projections Mercury can use a different UI, but the pattern should stay: + - ledger events - snapshot projection - operator-facing surface built on projections @@ -314,6 +341,7 @@ consumer, not merge memory records into the canonical task log itself. ## What is OpenClaw-specific vs portable ### OpenClaw-specific details + - current schema names - current file paths - Mission Control implementation details @@ -321,6 +349,7 @@ consumer, not merge memory records into the canonical task log itself. - Ethos hook plumbing ### Portable concepts + - append-only operational event log - materialized snapshot - stable task states @@ -345,12 +374,14 @@ If Mercury adopts this pattern, start with: ## Follow-on roadmap For the automation / reconciliation / Mission Control hardening roadmap that should also carry into Mercury, see: + - `docs/design/task-ledger-mission-control-automation-roadmap.md` ## Bottom line The task ledger / task bus is not just “a board backend.” It is the operational spine that lets: + - agents coordinate safely - dashboards stay honest - memory systems ingest meaningful episodes diff --git a/src/agents/pi-embedded-runner/run/attempt.test.ts b/src/agents/pi-embedded-runner/run/attempt.test.ts index 2e908ea3d9..34138cf3dd 100644 --- a/src/agents/pi-embedded-runner/run/attempt.test.ts +++ b/src/agents/pi-embedded-runner/run/attempt.test.ts @@ -1,8 +1,8 @@ import { describe, expect, it, vi } from "vitest"; import { resolveHeartbeatPrompt } from "../../../auto-reply/heartbeat.js"; import type { OpenClawConfig } from "../../../config/config.js"; -import { appendBootstrapPromptWarning } from "../../bootstrap-budget.js"; import { clearInternalHooks, registerInternalHook } from "../../../hooks/internal-hooks.js"; +import { appendBootstrapPromptWarning } from "../../bootstrap-budget.js"; import { resolveOllamaBaseUrlForRun } from "../../ollama-stream.js"; import { buildAgentSystemPrompt } from "../../system-prompt.js"; import { buildEmbeddedSystemPrompt } from "../system-prompt.js"; diff --git a/src/agents/pi-embedded-runner/run/params.ts b/src/agents/pi-embedded-runner/run/params.ts index f0051778d5..5cb3dc615b 100644 --- a/src/agents/pi-embedded-runner/run/params.ts +++ b/src/agents/pi-embedded-runner/run/params.ts @@ -51,6 +51,8 @@ export type RunEmbeddedPiAgentParams = { senderE164?: string | null; /** Whether the sender is an owner (required for owner-only tools). */ senderIsOwner?: boolean; + /** Whether this caller may use per-run provider/model overrides. */ + allowModelOverride?: boolean; /** Current channel ID for auto-threading (Slack). */ currentChannelId?: string; /** Current thread timestamp for auto-threading (Slack). */ diff --git a/src/commands/agent.ts b/src/commands/agent.ts index 9f46f4fba8..81745af721 100644 --- a/src/commands/agent.ts +++ b/src/commands/agent.ts @@ -4,9 +4,6 @@ import { getAcpSessionManager } from "../acp/control-plane/manager.js"; import { resolveAcpAgentPolicyError, resolveAcpDispatchPolicyError } from "../acp/policy.js"; import { toAcpRuntimeError } from "../acp/runtime/errors.js"; import { resolveAcpSessionCwd } from "../acp/runtime/session-identifiers.js"; -import { createSubsystemLogger } from "../logging/subsystem.js"; - - import { listAgentIds, resolveAgentDir, @@ -83,6 +80,7 @@ import { import { resolveGitHeadPath } from "../infra/git-root.js"; import { buildOutboundSessionContext } from "../infra/outbound/session-context.js"; import { getRemoteSkillEligibility } from "../infra/skills-remote.js"; +import { createSubsystemLogger } from "../logging/subsystem.js"; import { normalizeAgentId } from "../routing/session-key.js"; import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; import { applyVerboseOverride } from "../sessions/level-overrides.js"; diff --git a/src/commands/agent/types.ts b/src/commands/agent/types.ts index 9786ac71ce..2d1af30449 100644 --- a/src/commands/agent/types.ts +++ b/src/commands/agent/types.ts @@ -1,98 +1,7 @@ -import type { AgentInternalEvent } from "../../agents/internal-events.js"; -import type { ClientToolDefinition } from "../../agents/pi-embedded-runner/run/params.js"; -import type { SpawnedRunMetadata } from "../../agents/spawned-context.js"; -import type { ChannelOutboundTargetMode } from "../../channels/plugins/types.js"; -import type { InputProvenance } from "../../sessions/input-provenance.js"; - -/** Image content block for Claude API multimodal messages. */ -export type ImageContent = { - type: "image"; - data: string; - mimeType: string; -}; - -export type AgentStreamParams = { - /** Provider stream params override (best-effort). */ - temperature?: number; - maxTokens?: number; - /** Provider fast-mode override (best-effort). */ - fastMode?: boolean; -}; - -export type AgentRunContext = { - messageChannel?: string; - accountId?: string; - groupId?: string | null; - groupChannel?: string | null; - groupSpace?: string | null; - currentChannelId?: string; - currentThreadTs?: string; - replyToMode?: "off" | "first" | "all"; - hasRepliedRef?: { value: boolean }; -}; - -export type AgentCommandOpts = { - message: string; - /** Optional image attachments for multimodal messages. */ - images?: ImageContent[]; - /** Optional client-provided tools (OpenResponses hosted tools). */ - clientTools?: ClientToolDefinition[]; - /** Agent id override (must exist in config). */ - agentId?: string; - to?: string; - sessionId?: string; - sessionKey?: string; - thinking?: string; - thinkingOnce?: string; - verbose?: string; - json?: boolean; - timeout?: string; - deliver?: boolean; - /** Override delivery target (separate from session routing). */ - replyTo?: string; - /** Override delivery channel (separate from session routing). */ - replyChannel?: string; - /** Override delivery account id (separate from session routing). */ - replyAccountId?: string; - /** Override delivery thread/topic id (separate from session routing). */ - threadId?: string | number; - /** Message channel context (webchat|voicewake|whatsapp|...). */ - messageChannel?: string; - channel?: string; // delivery channel (whatsapp|telegram|...) - /** Account ID for multi-account channel routing (e.g., WhatsApp account). */ - accountId?: string; - /** Context for embedded run routing (channel/account/thread). */ - runContext?: AgentRunContext; - /** Whether this caller is authorized for owner-only tools (defaults true for local CLI calls). */ - senderIsOwner?: boolean; - /** Whether this caller is authorized to use provider/model per-run overrides. */ - allowModelOverride?: boolean; - /** Group/spawn metadata for subagent policy inheritance and routing context. */ - groupId?: SpawnedRunMetadata["groupId"]; - groupChannel?: SpawnedRunMetadata["groupChannel"]; - groupSpace?: SpawnedRunMetadata["groupSpace"]; - spawnedBy?: SpawnedRunMetadata["spawnedBy"]; - deliveryTargetMode?: ChannelOutboundTargetMode; - bestEffortDeliver?: boolean; - abortSignal?: AbortSignal; - lane?: string; - currentTaskId?: string; - runId?: string; - extraSystemPrompt?: string; - internalEvents?: AgentInternalEvent[]; - inputProvenance?: InputProvenance; - /** Per-call stream param overrides (best-effort). */ - streamParams?: AgentStreamParams; - /** Explicit workspace directory override (for subagents to inherit parent workspace). */ - workspaceDir?: SpawnedRunMetadata["workspaceDir"]; -}; - -export type AgentCommandIngressOpts = Omit< +export type { + AgentCommandIngressOpts, AgentCommandOpts, - "senderIsOwner" | "allowModelOverride" -> & { - /** Ingress callsites must always pass explicit owner authorization state. */ - senderIsOwner: boolean; - /** Ingress callsites must always pass explicit model-override authorization state. */ - allowModelOverride: boolean; -}; + AgentRunContext, + AgentStreamParams, + ImageContent, +} from "../../agents/command/types.js"; diff --git a/src/gateway/server-methods/tasks.test.ts b/src/gateway/server-methods/tasks.test.ts index e2303d7f34..ee91f39c71 100644 --- a/src/gateway/server-methods/tasks.test.ts +++ b/src/gateway/server-methods/tasks.test.ts @@ -214,9 +214,9 @@ describe("tasks gateway handlers", () => { it("returns INVALID_REQUEST for malformed task ledger events", async () => { const publishRespond = vi.fn(); const context = makeContext(); - const spy = vi.spyOn(taskLedger, "publishTaskLedgerEvents").mockRejectedValueOnce( - new taskLedger.TaskLedgerPublishInputError("bad event"), - ); + const spy = vi + .spyOn(taskLedger, "publishTaskLedgerEvents") + .mockRejectedValueOnce(new taskLedger.TaskLedgerPublishInputError("bad event")); await tasksHandlers["tasks.publish"]({ req: {} as never, @@ -238,9 +238,9 @@ describe("tasks gateway handlers", () => { it("returns UNAVAILABLE for internal publish failures", async () => { const publishRespond = vi.fn(); const context = makeContext(); - const spy = vi.spyOn(taskLedger, "publishTaskLedgerEvents").mockRejectedValueOnce( - new Error("disk full"), - ); + const spy = vi + .spyOn(taskLedger, "publishTaskLedgerEvents") + .mockRejectedValueOnce(new Error("disk full")); await tasksHandlers["tasks.publish"]({ req: {} as never, diff --git a/src/hooks/bundled/ethos-context/handler.test.ts b/src/hooks/bundled/ethos-context/handler.test.ts index 966dd177dc..b5ebabbc6f 100644 --- a/src/hooks/bundled/ethos-context/handler.test.ts +++ b/src/hooks/bundled/ethos-context/handler.test.ts @@ -257,7 +257,7 @@ describe("ethos-context hook", () => { created_at: "1710000000000", source: "chat_history", }); - expect(Object.keys(memories[0] ?? {}).sort()).toEqual(["created_at", "source", "text"]); + expect(Object.keys(memories[0] ?? {}).toSorted()).toEqual(["created_at", "source", "text"]); expect(memories[0]?.id).toBeUndefined(); expect(memories[0]?.score).toBeUndefined(); expect(memories[0]?.resource_id).toBeUndefined(); @@ -267,7 +267,7 @@ describe("ethos-context hook", () => { expect(memories[0]?.metadata_scores).toBeUndefined(); expect(JSON.stringify(payload)).not.toContain("should-never-be-exposed"); expect(JSON.stringify(payload)).not.toContain("agent:main:main"); - expect(JSON.stringify(payload)).not.toContain("\"score\":"); + expect(JSON.stringify(payload)).not.toContain('"score":'); expect(publishTaskLedgerEventsMock).toHaveBeenCalledTimes(1); const trace = getLastRecallTraceInput(); @@ -385,9 +385,9 @@ describe("ethos-context hook", () => { expect(prependContext.length).toBeLessThanOrEqual(220); expect(prependContext).toContain(START_DELIMITER); expect(prependContext).toContain(END_DELIMITER); - expect(prependContext).not.toContain("\"thread_id\":"); - expect(prependContext).not.toContain("\"resource_id\":"); - expect(prependContext).not.toContain("\"score\":"); + expect(prependContext).not.toContain('"thread_id":'); + expect(prependContext).not.toContain('"resource_id":'); + expect(prependContext).not.toContain('"score":'); }); it("does not run when canaryAgents is empty", async () => { @@ -668,7 +668,9 @@ describe("ethos-context hook", () => { }); it("records timeout fail-open when the Ethos request aborts", async () => { - fetchMock.mockRejectedValueOnce(Object.assign(new Error("request aborted"), { name: "AbortError" })); + fetchMock.mockRejectedValueOnce( + Object.assign(new Error("request aborted"), { name: "AbortError" }), + ); const cfg = createConfig({ canaryAgents: ["main"] }); const event = createScopedEvent(cfg); (event.context as { prependContext?: unknown }).prependContext = "stale-recall-block"; diff --git a/src/hooks/bundled/ethos-context/handler.ts b/src/hooks/bundled/ethos-context/handler.ts index f021a64589..4c400e7e3e 100644 --- a/src/hooks/bundled/ethos-context/handler.ts +++ b/src/hooks/bundled/ethos-context/handler.ts @@ -1,11 +1,11 @@ import type { OpenClawConfig } from "../../../config/config.js"; import { publishTaskLedgerEvents } from "../../../infra/task-ledger.js"; import { createSubsystemLogger } from "../../../logging/subsystem.js"; -import { deriveSessionChatType } from "../../../sessions/session-key-utils.js"; import { resolveAgentIdFromSessionKey, resolveCanonicalResourceId, } from "../../../routing/session-key.js"; +import { deriveSessionChatType } from "../../../sessions/session-key-utils.js"; import { resolveHookConfig } from "../../config.js"; import { isAgentBeforePromptBuildEvent, type HookHandler } from "../../hooks.js"; @@ -193,6 +193,17 @@ function compactRecord(input: Record): Record return output; } +function compactStringRecord(input: Record): Record { + const output: Record = {}; + for (const [key, value] of Object.entries(input)) { + if (typeof value !== "string") { + continue; + } + output[key] = value; + } + return output; +} + function asObject(value: unknown): Record | null { if (!value || typeof value !== "object" || Array.isArray(value)) { return null; @@ -405,9 +416,9 @@ function buildContextBlockResult(params: { }; } try { - const payload = JSON.parse( - prependContext.slice(jsonStart + 1, jsonEnd), - ) as { memories?: Array<{ text?: string }> }; + const payload = JSON.parse(prependContext.slice(jsonStart + 1, jsonEnd)) as { + memories?: Array<{ text?: string }>; + }; const memories = Array.isArray(payload.memories) ? payload.memories : []; return { prependContext, @@ -430,7 +441,7 @@ function buildContextBlockResult(params: { function recordMatchesScope(params: { record: EthosSearchRecord; - resourceId: string; + resourceId?: string; threadId?: string; }): boolean { const expectedResourceId = normalizeScopeValue(params.resourceId); @@ -497,7 +508,7 @@ async function postSearchWithTimeout(params: { try { const response = await fetch(params.url, { method: "POST", - headers: compactRecord({ + headers: compactStringRecord({ "content-type": "application/json", authorization: params.apiKey ? `Bearer ${params.apiKey}` : undefined, }), @@ -798,8 +809,7 @@ const ethosContextHook: HookHandler = async (event) => { candidatesConsidered: 0, injectedCount: 0, injectedChars: 0, - dependencyStatus: - error instanceof Error && error.name === "AbortError" ? "timeout" : "error", + dependencyStatus: error instanceof Error && error.name === "AbortError" ? "timeout" : "error", }); } }; diff --git a/src/hooks/bundled/ethos-ingest/handler.ts b/src/hooks/bundled/ethos-ingest/handler.ts index 6d53082774..71467ab5e2 100644 --- a/src/hooks/bundled/ethos-ingest/handler.ts +++ b/src/hooks/bundled/ethos-ingest/handler.ts @@ -88,6 +88,17 @@ function compactRecord(input: Record): Record return output; } +function compactStringRecord(input: Record): Record { + const output: Record = {}; + for (const [key, value] of Object.entries(input)) { + if (typeof value !== "string") { + continue; + } + output[key] = value; + } + return output; +} + function resolveSenderId( context: MessageReceivedHookContext | MessageSentHookContext, isInbound: boolean, @@ -115,7 +126,7 @@ async function postWithTimeout(params: { try { const response = await fetch(params.url, { method: "POST", - headers: compactRecord({ + headers: compactStringRecord({ "content-type": "application/json", authorization: params.apiKey ? `Bearer ${params.apiKey}` : undefined, }), diff --git a/src/infra/agent-events.ts b/src/infra/agent-events.ts index 34e1c40513..5d8eb5bc05 100644 --- a/src/infra/agent-events.ts +++ b/src/infra/agent-events.ts @@ -32,18 +32,19 @@ type AgentEventsRuntimeState = { }; const AGENT_EVENTS_STATE_KEY = "__openclaw_agent_events_state__"; -const globalState = - (globalThis as unknown as { [AGENT_EVENTS_STATE_KEY]?: AgentEventsRuntimeState })[ - AGENT_EVENTS_STATE_KEY - ] ?? { - seqByRun: new Map(), - listeners: new Set<(evt: AgentEventPayload) => void>(), - runContextById: new Map(), - }; +const globalState = ( + globalThis as unknown as { [AGENT_EVENTS_STATE_KEY]?: AgentEventsRuntimeState } +)[AGENT_EVENTS_STATE_KEY] ?? { + seqByRun: new Map(), + listeners: new Set<(evt: AgentEventPayload) => void>(), + runContextById: new Map(), +}; -if (!(globalThis as unknown as { [AGENT_EVENTS_STATE_KEY]?: AgentEventsRuntimeState })[ - AGENT_EVENTS_STATE_KEY -]) { +if ( + !(globalThis as unknown as { [AGENT_EVENTS_STATE_KEY]?: AgentEventsRuntimeState })[ + AGENT_EVENTS_STATE_KEY + ] +) { (globalThis as unknown as { [AGENT_EVENTS_STATE_KEY]: AgentEventsRuntimeState })[ AGENT_EVENTS_STATE_KEY ] = globalState; diff --git a/src/infra/outbound/deliver.test.ts b/src/infra/outbound/deliver.test.ts index 56a1dfab52..6d0fc19e25 100644 --- a/src/infra/outbound/deliver.test.ts +++ b/src/infra/outbound/deliver.test.ts @@ -197,6 +197,9 @@ function expectSuccessfulWhatsAppInternalHookPayload( messageId: string; isGroup: boolean; groupId: string; + agentId: string; + cfg: OpenClawConfig; + timestamp: number; }>, ) { return expect.objectContaining({ diff --git a/src/infra/outbound/deliver.ts b/src/infra/outbound/deliver.ts index 2f41a847bc..42ff070a91 100644 --- a/src/infra/outbound/deliver.ts +++ b/src/infra/outbound/deliver.ts @@ -1,8 +1,8 @@ -import { resolveSessionAgentId } from "../../agents/agent-scope.js"; import { resolveSendableOutboundReplyParts, sendMediaWithLeadingCaption, } from "openclaw/plugin-sdk/reply-payload"; +import { resolveSessionAgentId } from "../../agents/agent-scope.js"; import { chunkByParagraph, chunkMarkdownTextWithMode, diff --git a/src/infra/task-ledger.test.ts b/src/infra/task-ledger.test.ts index efcd384365..da14bb0d12 100644 --- a/src/infra/task-ledger.test.ts +++ b/src/infra/task-ledger.test.ts @@ -7,11 +7,14 @@ import { TASK_ACTIVATION_LANE_DEADLINE_MS, TASK_ACTIVATION_START_DEADLINE_MS, TASK_LEDGER_SCHEMA, + TASK_OWNERSHIP_ESCALATION_METADATA_KEY, TASK_PROOF_CHECKPOINT_METADATA_KEY, publishTaskLedgerEvents, readTaskLedgerEvents, readTaskLedgerSnapshot, + type TaskLedgerRecallRecord, type TaskLedgerRecord, + type TaskLedgerTaskRecord, } from "./task-ledger.js"; const stateDirs: string[] = []; @@ -42,6 +45,14 @@ async function appendRawEvents(stateDir: string, events: TaskLedgerRecord[]) { ); } +function isTaskRecord(event: TaskLedgerRecord): event is TaskLedgerTaskRecord { + return event.entity === "task"; +} + +function isRecallRecord(event: TaskLedgerRecord): event is TaskLedgerRecallRecord { + return event.entity === "recall"; +} + afterEach(async () => { vi.useRealTimers(); await Promise.all( @@ -502,7 +513,9 @@ describe("task ledger", () => { ], }); - const taskEvents = await readTaskLedgerEvents({ stateDir, taskId: "task-1" }); + const taskEvents = (await readTaskLedgerEvents({ stateDir, taskId: "task-1" })).filter( + isTaskRecord, + ); const agentEvents = await readTaskLedgerEvents({ stateDir, agentId: "forge" }); expect(firstTransition.accepted).toBe(1); @@ -574,7 +587,9 @@ describe("task ledger", () => { }); const snapshot = await readTaskLedgerSnapshot({ stateDir }); - const taskEvents = await readTaskLedgerEvents({ stateDir, taskId: "task-1" }); + const taskEvents = (await readTaskLedgerEvents({ stateDir, taskId: "task-1" })).filter( + isTaskRecord, + ); expect(started.accepted).toBe(1); expect(qa.accepted).toBe(1); @@ -760,17 +775,85 @@ describe("task ledger", () => { const taskEvents = await readTaskLedgerEvents({ stateDir, taskId: "task-activation-2" }); const activationNotes = taskEvents - .filter((event) => event.entity === "task" && event.kind === "note") + .filter( + (event): event is TaskLedgerTaskRecord => isTaskRecord(event) && event.kind === "note", + ) .map((event) => event.summary) .filter((summary) => summary.startsWith("Activation SLA miss:")); expect(activationNotes).toEqual([ - "Activation SLA miss: assigned agent forge has not acknowledged the task in the ledger within 5 minutes. Expected explicit task context, blocked state, or deferred progress before 2026-03-15T09:05:00.000Z.", - "Activation SLA miss: assigned agent forge has not pinned a lane for this task within 10 minutes. Expected a heartbeat with lane context before 2026-03-15T09:10:00.000Z.", - "Activation SLA miss: assigned agent forge did not show explicit start proof within 15 minutes. Expected a run-start milestone, in-progress transition, or explicit blocked/deferred state before 2026-03-15T09:15:00.000Z.", + "Activation SLA miss: assigned agent forge has not acknowledged the task in the ledger within 5 minutes. Expected explicit task context, blocked state, or deferred progress before 2026-03-15T09:05:00.000Z. This is 1 of 3 missed activation checkpoints for the current assignment cycle. Escalate ownership after 2 missed checkpoints; reassign through the ledger after 3 if the task is still silent.", + "Activation SLA miss: assigned agent forge has not pinned a lane for this task within 10 minutes. Expected a heartbeat with lane context before 2026-03-15T09:10:00.000Z. This is 2 of 3 missed activation checkpoints for the current assignment cycle. Escalate ownership now. If the final start checkpoint is also missed, reassign through the ledger by updating assignedAgent and requiring the gaining owner to heartbeat currentTaskId task-activation-2.", + "Activation SLA miss: assigned agent forge did not show explicit start proof within 15 minutes. Expected a run-start milestone, in-progress transition, or explicit blocked/deferred state before 2026-03-15T09:15:00.000Z. This is 3 of 3 missed activation checkpoints for the current assignment cycle. Reassignment is now appropriate: update assignedAgent through the ledger (or clear stale ownership), then require the gaining owner to heartbeat currentTaskId task-activation-2. Mission Control remains a control surface only.", ]); }); + it("projects structured ownership escalation metadata when activation misses exhaust an assignment cycle", async () => { + const stateDir = await createStateDir(); + + await publishTaskLedgerEvents({ + stateDir, + events: [ + { + entity: "task", + kind: "upsert", + task: { + id: "task-activation-escalation-1", + title: "Silent assigned work", + state: "todo", + assignedAgent: "forge", + }, + ts: "2026-03-15T09:00:00.000Z", + }, + { + entity: "task", + kind: "note", + taskId: "task-activation-escalation-1", + summary: "Operator checked escalation state.", + actor: { type: "operator", id: "mission-control" }, + ts: "2026-03-15T09:20:00.000Z", + }, + ], + }); + + const snapshot = await readTaskLedgerSnapshot({ stateDir }); + const escalation = snapshot.tasks[0]?.metadata[TASK_OWNERSHIP_ESCALATION_METADATA_KEY] as + | Record + | undefined; + + expect(escalation).toMatchObject({ + version: 1, + sourceOfTruth: "task_ledger", + level: "reassignment_ready", + thresholds: { + activationMissesToEscalate: 2, + activationMissesToReassign: 3, + statusOnlyUpdatesToPrompt: 2, + statusOnlyUpdatesToEscalate: 3, + statusOnlyUpdatesToReassign: 4, + }, + triggers: [ + { + code: "activation_sla", + level: "reassignment_ready", + activationMisses: { + checkpoints: ["acknowledge", "lane", "start"], + missCount: 3, + }, + }, + ], + takeover: { + recommended: true, + through: "task_ledger", + path: "publish_task_assignment", + currentAssignedAgent: "forge", + }, + }); + expect(escalation?.takeover).toMatchObject({ + summary: expect.stringContaining("updating assignedAgent"), + }); + }); + it("treats deferred milestone evidence differently from silent missed-start work", async () => { const stateDir = await createStateDir(); @@ -886,7 +969,9 @@ describe("task ledger", () => { const activation = snapshot.tasks[0]?.metadata.activationSla as Record; const taskEvents = await readTaskLedgerEvents({ stateDir, taskId: "task-activation-4" }); const activationNotes = taskEvents - .filter((event) => event.entity === "task" && event.kind === "note") + .filter( + (event): event is TaskLedgerTaskRecord => isTaskRecord(event) && event.kind === "note", + ) .map((event) => event.summary) .filter((summary) => summary.startsWith("Activation SLA miss:")); @@ -894,9 +979,9 @@ describe("task ledger", () => { expect(activation.startedAt).toBeUndefined(); expect(activation.startDisposition).toBeUndefined(); expect(activationNotes).toEqual([ - "Activation SLA miss: assigned agent forge has not acknowledged the task in the ledger within 5 minutes. Expected explicit task context, blocked state, or deferred progress before 2026-03-15T11:05:00.000Z.", - "Activation SLA miss: assigned agent forge has not pinned a lane for this task within 10 minutes. Expected a heartbeat with lane context before 2026-03-15T11:10:00.000Z.", - "Activation SLA miss: assigned agent forge did not show explicit start proof within 15 minutes. Expected a run-start milestone, in-progress transition, or explicit blocked/deferred state before 2026-03-15T11:15:00.000Z.", + "Activation SLA miss: assigned agent forge has not acknowledged the task in the ledger within 5 minutes. Expected explicit task context, blocked state, or deferred progress before 2026-03-15T11:05:00.000Z. This is 1 of 3 missed activation checkpoints for the current assignment cycle. Escalate ownership after 2 missed checkpoints; reassign through the ledger after 3 if the task is still silent.", + "Activation SLA miss: assigned agent forge has not pinned a lane for this task within 10 minutes. Expected a heartbeat with lane context before 2026-03-15T11:10:00.000Z. This is 2 of 3 missed activation checkpoints for the current assignment cycle. Escalate ownership now. If the final start checkpoint is also missed, reassign through the ledger by updating assignedAgent and requiring the gaining owner to heartbeat currentTaskId task-activation-4.", + "Activation SLA miss: assigned agent forge did not show explicit start proof within 15 minutes. Expected a run-start milestone, in-progress transition, or explicit blocked/deferred state before 2026-03-15T11:15:00.000Z. This is 3 of 3 missed activation checkpoints for the current assignment cycle. Reassignment is now appropriate: update assignedAgent through the ledger (or clear stale ownership), then require the gaining owner to heartbeat currentTaskId task-activation-4. Mission Control remains a control surface only.", ]); }); @@ -913,8 +998,8 @@ describe("task ledger", () => { acknowledgedAt: "2026-03-15T12:03:00.000Z", }, expectedMisses: [ - "Activation SLA miss: assigned agent forge has not pinned a lane for this task within 10 minutes. Expected a heartbeat with lane context before 2026-03-15T12:10:00.000Z.", - "Activation SLA miss: assigned agent forge did not show explicit start proof within 15 minutes. Expected a run-start milestone, in-progress transition, or explicit blocked/deferred state before 2026-03-15T12:15:00.000Z.", + "Activation SLA miss: assigned agent forge has not pinned a lane for this task within 10 minutes. Expected a heartbeat with lane context before 2026-03-15T12:10:00.000Z. This is 2 of 3 missed activation checkpoints for the current assignment cycle. Escalate ownership now. If the final start checkpoint is also missed, reassign through the ledger by updating assignedAgent and requiring the gaining owner to heartbeat currentTaskId task-activation-5.", + "Activation SLA miss: assigned agent forge did not show explicit start proof within 15 minutes. Expected a run-start milestone, in-progress transition, or explicit blocked/deferred state before 2026-03-15T12:15:00.000Z. This is 3 of 3 missed activation checkpoints for the current assignment cycle. Reassignment is now appropriate: update assignedAgent through the ledger (or clear stale ownership), then require the gaining owner to heartbeat currentTaskId task-activation-5. Mission Control remains a control surface only.", ], }, { @@ -930,7 +1015,7 @@ describe("task ledger", () => { startedAt: "2026-03-15T13:04:00.000Z", }, expectedMisses: [ - "Activation SLA miss: assigned agent forge has not pinned a lane for this task within 10 minutes. Expected a heartbeat with lane context before 2026-03-15T13:10:00.000Z.", + "Activation SLA miss: assigned agent forge has not pinned a lane for this task within 10 minutes. Expected a heartbeat with lane context before 2026-03-15T13:10:00.000Z. This is 2 of 3 missed activation checkpoints for the current assignment cycle. Escalate ownership now. If the final start checkpoint is also missed, reassign through the ledger by updating assignedAgent and requiring the gaining owner to heartbeat currentTaskId task-activation-6.", ], }, { @@ -1016,7 +1101,9 @@ describe("task ledger", () => { const activation = snapshot.tasks[0]?.metadata.activationSla as Record; const taskEvents = await readTaskLedgerEvents({ stateDir, taskId }); const activationNotes = taskEvents - .filter((event) => event.entity === "task" && event.kind === "note") + .filter( + (event): event is TaskLedgerTaskRecord => isTaskRecord(event) && event.kind === "note", + ) .map((event) => event.summary) .filter((summary) => summary.startsWith("Activation SLA miss:")); @@ -1141,16 +1228,18 @@ describe("task ledger", () => { const promptState = promptSnapshot.tasks[0]?.metadata[TASK_PROOF_CHECKPOINT_METADATA_KEY] as | Record | undefined; - const promptEvents = await readTaskLedgerEvents({ stateDir, taskId: "task-proof-2" }); + const promptEvents = (await readTaskLedgerEvents({ stateDir, taskId: "task-proof-2" })).filter( + isTaskRecord, + ); const reconcilePrompts = promptEvents.filter( (event) => - event.entity === "task" && event.kind === "note" && event.actor.id === "task-ledger-reconciler" && event.summary.startsWith("Proof checkpoint required:"), ); expect(reconcilePrompts).toHaveLength(1); + expect(reconcilePrompts[0]?.summary).toContain("Escalate ownership at 3 consecutive"); expect(promptState).toMatchObject({ version: 1, statusOnlyUpdateCount: 2, @@ -1164,6 +1253,80 @@ describe("task ledger", () => { }); expect(promptState?.lastCheckpointAt).toBeUndefined(); + await publishTaskLedgerEvents({ + stateDir, + events: [ + { + entity: "task", + kind: "note", + taskId: "task-proof-2", + summary: "Still working and do not have proof to share yet.", + actor: { type: "agent", id: "forge" }, + ts: "2026-03-15T17:09:00.000Z", + }, + { + entity: "task", + kind: "note", + taskId: "task-proof-2", + summary: "Continuing without a concrete proof update yet.", + actor: { type: "agent", id: "forge" }, + ts: "2026-03-15T17:10:00.000Z", + }, + ], + }); + + const escalatedSnapshot = await readTaskLedgerSnapshot({ stateDir }); + const escalatedProofState = escalatedSnapshot.tasks[0]?.metadata[ + TASK_PROOF_CHECKPOINT_METADATA_KEY + ] as Record | undefined; + const escalatedOwnershipState = escalatedSnapshot.tasks[0]?.metadata[ + TASK_OWNERSHIP_ESCALATION_METADATA_KEY + ] as Record | undefined; + const escalatedEvents = ( + await readTaskLedgerEvents({ stateDir, taskId: "task-proof-2" }) + ).filter(isTaskRecord); + const ownershipEscalationNote = escalatedEvents.find( + (event) => event.kind === "note" && event.summary.startsWith("Ownership escalation:"), + ); + const ownershipReassignNote = escalatedEvents.find( + (event) => event.kind === "note" && event.summary.startsWith("Ownership reassignment ready:"), + ); + + expect(ownershipEscalationNote?.summary).toContain("4 consecutive status-only updates"); + expect(ownershipReassignNote?.summary).toContain( + "Reassign through the ledger by updating assignedAgent", + ); + expect(escalatedProofState).toMatchObject({ + version: 1, + statusOnlyUpdateCount: 4, + lastStatusNoteAt: "2026-03-15T17:10:00.000Z", + prompt: { + required: true, + reason: "status_loop", + requestedAt: "2026-03-15T17:07:00.000Z", + }, + }); + expect(escalatedOwnershipState).toMatchObject({ + version: 1, + sourceOfTruth: "task_ledger", + level: "reassignment_ready", + takeover: expect.objectContaining({ + recommended: true, + path: "publish_task_assignment", + }), + }); + expect( + (escalatedOwnershipState?.triggers as Array> | undefined)?.some( + (trigger) => + trigger.code === "proof_checkpoint" && + trigger.level === "reassignment_ready" && + (trigger.proofCheckpoint as Record | undefined) + ?.statusOnlyUpdateCount === 4 && + (trigger.proofCheckpoint as Record | undefined)?.promptRequestedAt === + "2026-03-15T17:07:00.000Z", + ), + ).toBe(true); + await publishTaskLedgerEvents({ stateDir, events: [ @@ -1187,6 +1350,9 @@ describe("task ledger", () => { const clearedState = clearedSnapshot.tasks[0]?.metadata[TASK_PROOF_CHECKPOINT_METADATA_KEY] as | Record | undefined; + const clearedOwnershipState = clearedSnapshot.tasks[0]?.metadata[ + TASK_OWNERSHIP_ESCALATION_METADATA_KEY + ] as Record | undefined; expect(clearedState).toMatchObject({ version: 1, @@ -1199,6 +1365,16 @@ describe("task ledger", () => { statusOnlyUpdateCount: 0, }); expect(clearedState?.prompt).toBeUndefined(); + expect(clearedOwnershipState).toMatchObject({ + version: 1, + sourceOfTruth: "task_ledger", + level: "watch", + }); + expect( + (clearedOwnershipState?.triggers as Array> | undefined)?.every( + (trigger) => trigger.code !== "proof_checkpoint", + ), + ).toBe(true); }); it("emits reconcile evidence when an active agent points at a task that is still todo", async () => { @@ -1227,7 +1403,9 @@ describe("task ledger", () => { ], }); - const taskEvents = await readTaskLedgerEvents({ stateDir, taskId: "task-1" }); + const taskEvents = (await readTaskLedgerEvents({ stateDir, taskId: "task-1" })).filter( + isTaskRecord, + ); expect(result.accepted).toBe(3); expect(taskEvents.at(-1)).toMatchObject({ @@ -1271,11 +1449,14 @@ describe("task ledger", () => { ], }); - const taskEvents = await readTaskLedgerEvents({ stateDir, taskId: "task-1" }); + const taskEvents = (await readTaskLedgerEvents({ stateDir, taskId: "task-1" })).filter( + isTaskRecord, + ); expect(result.accepted).toBe(3); expect(taskEvents.at(-1)?.summary).toMatch(/^Reconcile residue:/); expect(taskEvents.at(-1)?.summary).toMatch(/latest heartbeat reports the agent idle/i); + expect(taskEvents.at(-1)?.summary).toContain("immediate ownership escalation"); }); it("does not repeat reconcile residue for unchanged idle drift", async () => { @@ -1326,12 +1507,12 @@ describe("task ledger", () => { ], }); - const taskEvents = await readTaskLedgerEvents({ stateDir, taskId: "task-1" }); + const taskEvents = (await readTaskLedgerEvents({ stateDir, taskId: "task-1" })).filter( + isTaskRecord, + ); const reconcileEvents = taskEvents.filter( - (event) => - event.entity === "task" && - event.kind === "note" && - event.actor.id === "task-ledger-reconciler", + (event): event is TaskLedgerTaskRecord => + event.kind === "note" && event.actor.id === "task-ledger-reconciler", ); expect(replay.accepted).toBe(1); @@ -1370,12 +1551,63 @@ describe("task ledger", () => { ], }); - const taskEvents = await readTaskLedgerEvents({ stateDir, taskId: "task-1" }); + const taskEvents = (await readTaskLedgerEvents({ stateDir, taskId: "task-1" })).filter( + isTaskRecord, + ); expect(result.accepted).toBe(3); expect(taskEvents.at(-1)?.summary).toMatch(/^Reconcile residue:/); expect(taskEvents.at(-1)?.summary).toMatch(/latest heartbeat is stale/i); expect(taskEvents.at(-1)?.summary).toContain("2026-03-15T06:20:00.000Z"); + expect(taskEvents.at(-1)?.summary).toContain("currentTaskId task-1"); + }); + + it("detects stale in-progress ownership against reconciliation time, not only task-local timestamps", async () => { + const stateDir = await createStateDir(); + + const result = await publishTaskLedgerEvents({ + stateDir, + events: [ + { + entity: "task", + kind: "upsert", + task: { + id: "task-1", + title: "Resume task", + state: "in_progress", + assignedAgent: "forge", + }, + ts: "2026-03-15T06:05:00.000Z", + }, + { + entity: "agent", + kind: "heartbeat", + agent: { + id: "forge", + status: "running", + currentTaskId: "task-1", + summary: "Earlier run", + }, + ts: "2026-03-15T06:10:00.000Z", + }, + { + entity: "task", + kind: "upsert", + task: { id: "task-2", title: "Unrelated work", state: "todo" }, + ts: "2026-03-15T06:30:00.000Z", + }, + ], + }); + + const taskEvents = (await readTaskLedgerEvents({ stateDir, taskId: "task-1" })).filter( + isTaskRecord, + ); + + const staleEvent = taskEvents.find((event) => /latest heartbeat is stale/i.test(event.summary)); + + expect(result.accepted).toBeGreaterThanOrEqual(4); + expect(staleEvent?.summary).toMatch(/^Reconcile residue:/); + expect(staleEvent?.summary).toContain("2026-03-15T06:10:00.000Z"); }); it("emits reconcile evidence when older blocked work coexists with newer active work for the same agent", async () => { @@ -1415,11 +1647,71 @@ describe("task ledger", () => { ], }); - const blockedTaskEvents = await readTaskLedgerEvents({ stateDir, taskId: "task-1" }); + const blockedTaskEvents = (await readTaskLedgerEvents({ stateDir, taskId: "task-1" })).filter( + isTaskRecord, + ); expect(result.accepted).toBe(4); expect(blockedTaskEvents.at(-1)?.summary).toMatch(/^Reconcile residue:/); expect(blockedTaskEvents.at(-1)?.summary).toMatch(/newer active work exists on task-2/i); + expect(blockedTaskEvents.at(-1)?.summary).toContain("Reassignment is appropriate"); + }); + + it("does not let later blocked-task notes suppress superseded-ownership detection", async () => { + const stateDir = await createStateDir(); + + const result = await publishTaskLedgerEvents({ + stateDir, + events: [ + { + entity: "task", + kind: "upsert", + task: { id: "task-1", title: "Blocked work", state: "blocked", assignedAgent: "forge" }, + ts: "2026-03-15T06:50:00.000Z", + }, + { + entity: "task", + kind: "upsert", + task: { + id: "task-2", + title: "Active work", + state: "in_progress", + assignedAgent: "forge", + }, + ts: "2026-03-15T07:00:00.000Z", + }, + { + entity: "agent", + kind: "heartbeat", + agent: { + id: "forge", + status: "running", + currentTaskId: "task-2", + summary: "Working task-2", + }, + ts: "2026-03-15T07:10:00.000Z", + }, + { + entity: "task", + kind: "note", + taskId: "task-1", + summary: "Operator left a later comment on the blocked task.", + actor: { type: "operator", id: "mission-control" }, + ts: "2026-03-15T07:15:00.000Z", + }, + ], + }); + + const blockedTaskEvents = (await readTaskLedgerEvents({ stateDir, taskId: "task-1" })).filter( + isTaskRecord, + ); + + const supersededEvent = blockedTaskEvents.find((event) => + /newer active work exists on task-2/i.test(event.summary), + ); + + expect(result.accepted).toBeGreaterThanOrEqual(5); + expect(supersededEvent?.summary).toMatch(/^Reconcile residue:/); }); it("emits reconcile mismatch when heartbeat task context disagrees with task ownership", async () => { @@ -1448,11 +1740,69 @@ describe("task ledger", () => { ], }); - const taskEvents = await readTaskLedgerEvents({ stateDir, taskId: "task-1" }); + const taskEvents = (await readTaskLedgerEvents({ stateDir, taskId: "task-1" })).filter( + isTaskRecord, + ); expect(result.accepted).toBe(3); expect(taskEvents.at(-1)?.summary).toMatch(/^Reconcile mismatch:/); expect(taskEvents.at(-1)?.summary).toMatch(/task is assigned to atlas/i); + expect(taskEvents.at(-1)?.summary).toContain("reassign through the ledger"); + }); + + it("ignores stale heartbeat claims when deriving mismatch escalation state", async () => { + const stateDir = await createStateDir(); + + const result = await publishTaskLedgerEvents({ + stateDir, + events: [ + { + entity: "task", + kind: "upsert", + task: { id: "task-1", title: "Already closed", state: "done", assignedAgent: "atlas" }, + ts: "2026-03-15T07:10:00.000Z", + }, + { + entity: "agent", + kind: "heartbeat", + agent: { + id: "forge", + status: "running", + currentTaskId: "task-1", + summary: "Old task claim", + }, + ts: "2026-03-15T07:11:00.000Z", + }, + { + entity: "task", + kind: "note", + taskId: "task-1", + summary: "Operator checked ownership much later.", + actor: { type: "operator", id: "mission-control" }, + ts: "2026-03-15T07:40:00.000Z", + }, + ], + }); + + const taskEvents = await readTaskLedgerEvents({ stateDir, taskId: "task-1" }); + const snapshot = await readTaskLedgerSnapshot({ stateDir }); + const ownershipEscalation = snapshot.tasks[0]?.metadata[ + TASK_OWNERSHIP_ESCALATION_METADATA_KEY + ] as Record | undefined; + const reconcileMismatchEvents = taskEvents.filter( + (event) => + event.entity === "task" && + event.kind === "note" && + event.summary.startsWith("Reconcile mismatch:"), + ); + + expect(result.accepted).toBe(3); + expect(reconcileMismatchEvents).toHaveLength(0); + expect( + (ownershipEscalation?.triggers as Array> | undefined)?.some( + (trigger) => trigger.code === "heartbeat_claim_mismatch", + ) ?? false, + ).toBe(false); }); it("accepts recall trace events without mutating task/agent projections", async () => { @@ -1585,14 +1935,16 @@ describe("task ledger", () => { }); expect(result.accepted).toBe(1); - expect(result.events[0].scope).toMatchObject({ + const recallEvent = result.events.find(isRecallRecord); + expect(recallEvent).toBeDefined(); + expect(recallEvent?.scope).toMatchObject({ senderId: "8480568759", channelClass: "dm", threadId: "agent:main:thread", resourceId: "agent:main:resource", }); - expect(result.events[0].scope?.ignored).toBeUndefined(); - expect(result.events[0].scope?.dropped).toBeUndefined(); + expect(recallEvent?.scope?.ignored).toBeUndefined(); + expect(recallEvent?.scope?.dropped).toBeUndefined(); }); it("includes recall traces in agent-scoped event reads", async () => { @@ -1670,6 +2022,9 @@ describe("task ledger", () => { agentId: "forge", ran: true, dependencyStatus: "bogus-status" as never, + candidatesConsidered: 0, + injectedCount: 0, + injectedChars: 0, ts: "2026-03-15T07:24:00.000Z", }, ], diff --git a/src/infra/task-ledger.ts b/src/infra/task-ledger.ts index f0d398cf3f..639926c843 100644 --- a/src/infra/task-ledger.ts +++ b/src/infra/task-ledger.ts @@ -11,6 +11,7 @@ export const TASK_PRIORITIES = ["low", "medium", "high", "critical"] as const; export const AGENT_ACTIVITY_STATUSES = ["idle", "running", "waiting", "blocked"] as const; export const TASK_ACTIVATION_SLA_METADATA_KEY = "activationSla" as const; export const TASK_PROOF_CHECKPOINT_METADATA_KEY = "proofCheckpoint" as const; +export const TASK_OWNERSHIP_ESCALATION_METADATA_KEY = "ownershipEscalation" as const; export const TASK_ACTIVATION_ACK_DEADLINE_MS = 5 * 60_000; export const TASK_ACTIVATION_LANE_DEADLINE_MS = 10 * 60_000; export const TASK_ACTIVATION_START_DEADLINE_MS = 15 * 60_000; @@ -24,6 +25,10 @@ const RECONCILE_ACTOR_NAME = "Task ledger reconciler"; const RECONCILE_IDEMPOTENCY_PREFIX = "reconcile"; const RECONCILE_AGENT_STALE_MS = 15 * 60_000; const PROOF_CHECKPOINT_REQUIRED_STATUS_NOTES = 2; +const OWNERSHIP_ACTIVATION_MISSES_TO_ESCALATE = 2; +const OWNERSHIP_ACTIVATION_MISSES_TO_REASSIGN = 3; +const OWNERSHIP_STATUS_ONLY_UPDATES_TO_ESCALATE = PROOF_CHECKPOINT_REQUIRED_STATUS_NOTES + 1; +const OWNERSHIP_STATUS_ONLY_UPDATES_TO_REASSIGN = PROOF_CHECKPOINT_REQUIRED_STATUS_NOTES + 2; export type TaskProofCheckpointSignalType = "files" | "diffSummary" | "tests" | "reviewSignal"; const TASK_PROOF_CHECKPOINT_SIGNAL_TYPES = [ "files", @@ -103,6 +108,84 @@ type TaskProofCheckpointEvidence = TaskProofCheckpointState & { currentCycleStartedAt?: string; }; +type TaskActivationMissedCheckpoint = "acknowledge" | "lane" | "start"; + +type TaskOwnershipEscalationLevel = "watch" | "escalated" | "reassignment_ready"; + +type TaskOwnershipEscalationTriggerCode = + | "activation_sla" + | "proof_checkpoint" + | "assigned_agent_missing" + | "assigned_agent_idle" + | "assigned_agent_stale" + | "heartbeat_claim_mismatch" + | "blocked_superseded"; + +type TaskOwnershipEscalationTrigger = { + code: TaskOwnershipEscalationTriggerCode; + level: TaskOwnershipEscalationLevel; + observedAt: string; + summary: string; + activationMisses?: { + checkpoints: TaskActivationMissedCheckpoint[]; + missCount: number; + }; + proofCheckpoint?: { + statusOnlyUpdateCount: number; + promptRequestedAt?: string; + }; + ownership?: { + assignedAgent?: string; + claimedByAgent?: string; + staleHeartbeatAt?: string; + supersededByTaskId?: string; + }; +}; + +export type TaskOwnershipEscalationState = { + version: 1; + sourceOfTruth: "task_ledger"; + level: TaskOwnershipEscalationLevel; + thresholds: { + activationMissesToEscalate: number; + activationMissesToReassign: number; + statusOnlyUpdatesToPrompt: number; + statusOnlyUpdatesToEscalate: number; + statusOnlyUpdatesToReassign: number; + staleHeartbeatMs: number; + }; + triggers: TaskOwnershipEscalationTrigger[]; + takeover?: { + recommended: true; + through: "task_ledger"; + path: "publish_task_assignment"; + summary: string; + currentAssignedAgent?: string; + suggestedAgent?: string; + }; +}; + +type ActiveTaskWork = { + taskId: string; + tsMs: number; + referenceId: string; + source: "task" | "heartbeat"; +}; + +type TaskOwnershipObservationContext = { + reconciliationTs: string; + reconciliationTsMs: number; + lastSubstantiveTaskRecordById: Map; + lastAgentRecordById: Map; + activationEvidenceByTaskId: Map; + proofCheckpointByTaskId: Map; + activeWorkByAgentId: Map; + heartbeatClaimantsByTaskId: Map< + string, + Array<{ agentId: string; status: AgentActivityStatus; heartbeatAt: string }> + >; +}; + export type TaskState = (typeof TASK_STATES)[number]; export type TaskPriority = (typeof TASK_PRIORITIES)[number]; export type AgentActivityStatus = (typeof AGENT_ACTIVITY_STATUSES)[number]; @@ -216,6 +299,14 @@ export type TaskLedgerRecord = | TaskLedgerAgentRecord | TaskLedgerRecallRecord; +type TaskLedgerReconcileNoteRecord = TaskLedgerTaskRecord & { + kind: "note"; + actor: TaskLedgerActor & { + type: "system"; + id: typeof RECONCILE_ACTOR_ID; + }; +}; + export type TaskLedgerSnapshot = { schema: typeof TASK_LEDGER_SNAPSHOT_SCHEMA; generatedAt: string; @@ -688,6 +779,21 @@ function withTaskProofCheckpointMetadata( return next; } +function withTaskOwnershipEscalationMetadata( + metadata: Record, + escalation: TaskOwnershipEscalationState | undefined, +): Record { + const next = normalizeMetadata(metadata); + if (!escalation) { + if (Object.hasOwn(next, TASK_OWNERSHIP_ESCALATION_METADATA_KEY)) { + delete next[TASK_OWNERSHIP_ESCALATION_METADATA_KEY]; + } + return next; + } + next[TASK_OWNERSHIP_ESCALATION_METADATA_KEY] = escalation; + return next; +} + function resolveTaskLedgerPaths(stateDir = resolveStateDir()) { const rootDir = path.join(stateDir, "shared", "task-ledger"); return { @@ -1032,6 +1138,7 @@ function createSnapshotFromRecords(params: { const materialized = materializeLedgerState(params.records); applyDerivedTaskActivationEvidence(materialized); applyDerivedTaskProofCheckpointEvidence(materialized); + applyDerivedTaskOwnershipEscalationEvidence(materialized); const recentLimit = typeof params.recentEventLimit === "number" && params.recentEventLimit > 0 ? Math.floor(params.recentEventLimit) @@ -1048,7 +1155,13 @@ function createSnapshotFromRecords(params: { }; } -function isReconcileTaskNote(record: TaskLedgerRecord): record is TaskLedgerTaskRecord { +function isTaskLedgerTaskRecord(record: TaskLedgerRecord): record is TaskLedgerTaskRecord { + return record.entity === "task"; +} + +function isReconcileTaskNote( + record: TaskLedgerRecord | TaskLedgerTaskRecord, +): record is TaskLedgerReconcileNoteRecord { return ( record.entity === "task" && record.kind === "note" && @@ -1189,32 +1302,43 @@ function deriveTaskActivationEvidence( continue; } - if (record.entity !== "task" || record.taskId !== task.id || isReconcileTaskNote(record)) { + if ( + !isTaskLedgerTaskRecord(record) || + record.taskId !== task.id || + isReconcileTaskNote(record) + ) { continue; } - const explicitAgentEvidence = resolveExplicitAgentActivationNoteEvidence(record, assignedAgent); + const taskRecord = record; + const explicitAgentEvidence = resolveExplicitAgentActivationNoteEvidence( + taskRecord, + assignedAgent, + ); const disposition = explicitAgentEvidence?.startDisposition !== undefined ? { startDisposition: explicitAgentEvidence.startDisposition, startDispositionReason: explicitAgentEvidence.startDispositionReason, } - : resolveActivationStartDisposition(record); + : resolveActivationStartDisposition(taskRecord); - if (!evidence.startedAt && (isActivationStartProof(record) || explicitAgentEvidence?.started)) { - evidence.startedAt = record.ts; + if ( + !evidence.startedAt && + (isActivationStartProof(taskRecord) || explicitAgentEvidence?.started) + ) { + evidence.startedAt = taskRecord.ts; } if (disposition && !evidence.startDisposition) { evidence.startDisposition = disposition.startDisposition; - evidence.startDispositionAt = record.ts; + evidence.startDispositionAt = taskRecord.ts; evidence.startDispositionReason = disposition.startDispositionReason; } if ( !evidence.acknowledgedAt && (explicitAgentEvidence?.acknowledged || evidence.startedAt || disposition) ) { - evidence.acknowledgedAt = record.ts; + evidence.acknowledgedAt = taskRecord.ts; } } @@ -1288,16 +1412,17 @@ function deriveTaskProofCheckpointEvidence( let prompt: TaskProofCheckpointState["prompt"]; for (const record of records) { - if (record.entity !== "task" || record.taskId !== task.id) { + if (!isTaskLedgerTaskRecord(record) || record.taskId !== task.id) { continue; } - const nextState = resolveTaskRecordState(record, currentState); + const taskRecord = record; + const nextState = resolveTaskRecordState(taskRecord, currentState); const enteredInProgress = nextState === "in_progress" && currentState !== "in_progress"; const leftInProgress = currentState === "in_progress" && nextState !== "in_progress"; if (enteredInProgress) { - currentCycleStartedAt = record.ts; + currentCycleStartedAt = taskRecord.ts; statusOnlyUpdateCount = 0; lastStatusNoteAt = undefined; prompt = undefined; @@ -1310,9 +1435,9 @@ function deriveTaskProofCheckpointEvidence( currentState = nextState; - if (hasConcreteTaskProofCheckpoint(record.proofCheckpoint)) { - lastCheckpointAt = record.ts; - lastCheckpoint = record.proofCheckpoint; + if (hasConcreteTaskProofCheckpoint(taskRecord.proofCheckpoint)) { + lastCheckpointAt = taskRecord.ts; + lastCheckpoint = taskRecord.proofCheckpoint; statusOnlyUpdateCount = 0; lastStatusNoteAt = undefined; prompt = undefined; @@ -1323,17 +1448,17 @@ function deriveTaskProofCheckpointEvidence( continue; } - if (isStatusOnlyAgentTaskNote(record)) { + if (isStatusOnlyAgentTaskNote(taskRecord)) { statusOnlyUpdateCount += 1; - lastStatusNoteAt = record.ts; + lastStatusNoteAt = taskRecord.ts; continue; } - if (isProofCheckpointPromptRecord(record)) { + if (isProofCheckpointPromptRecord(taskRecord)) { prompt = { required: true, reason: "status_loop", - requestedAt: record.ts, + requestedAt: taskRecord.ts, requiredSignals: [...TASK_PROOF_CHECKPOINT_SIGNAL_TYPES], }; } @@ -1366,6 +1491,405 @@ function applyDerivedTaskProofCheckpointEvidence(materialized: MaterializedLedge } } +function ownershipEscalationLevelRank(level: TaskOwnershipEscalationLevel): number { + switch (level) { + case "watch": + return 1; + case "escalated": + return 2; + case "reassignment_ready": + return 3; + } +} + +function maxOwnershipEscalationLevel( + left: TaskOwnershipEscalationLevel, + right: TaskOwnershipEscalationLevel, +): TaskOwnershipEscalationLevel { + return ownershipEscalationLevelRank(left) >= ownershipEscalationLevelRank(right) ? left : right; +} + +function buildTaskOwnershipEscalationThresholds(): TaskOwnershipEscalationState["thresholds"] { + return { + activationMissesToEscalate: OWNERSHIP_ACTIVATION_MISSES_TO_ESCALATE, + activationMissesToReassign: OWNERSHIP_ACTIVATION_MISSES_TO_REASSIGN, + statusOnlyUpdatesToPrompt: PROOF_CHECKPOINT_REQUIRED_STATUS_NOTES, + statusOnlyUpdatesToEscalate: OWNERSHIP_STATUS_ONLY_UPDATES_TO_ESCALATE, + statusOnlyUpdatesToReassign: OWNERSHIP_STATUS_ONLY_UPDATES_TO_REASSIGN, + staleHeartbeatMs: RECONCILE_AGENT_STALE_MS, + }; +} + +function isFreshAgentHeartbeat(heartbeatAtMs: number, reconciliationTsMs: number): boolean { + return ( + Number.isFinite(heartbeatAtMs) && + Number.isFinite(reconciliationTsMs) && + reconciliationTsMs - heartbeatAtMs < RECONCILE_AGENT_STALE_MS + ); +} + +function findLatestTaskRecord( + records: TaskLedgerRecord[], + taskId: string, + predicate?: (record: TaskLedgerTaskRecord) => boolean, +): TaskLedgerTaskRecord | undefined { + for (let index = records.length - 1; index >= 0; index -= 1) { + const record = records[index]; + if (record?.entity !== "task" || record.taskId !== taskId || isReconcileTaskNote(record)) { + continue; + } + if (!predicate || predicate(record)) { + return record; + } + } + return undefined; +} + +function buildTaskOwnershipObservationContext( + materialized: MaterializedLedgerState, +): TaskOwnershipObservationContext { + const lastSubstantiveTaskRecordById = new Map(); + const lastAgentRecordById = new Map(); + const activationEvidenceByTaskId = new Map(); + const proofCheckpointByTaskId = new Map(); + const activeWorkByAgentId = new Map(); + const heartbeatClaimantsByTaskId = new Map< + string, + Array<{ agentId: string; status: AgentActivityStatus; heartbeatAt: string }> + >(); + const reconciliationTs = materialized.appliedRecords.at(-1)?.ts ?? new Date().toISOString(); + const reconciliationTsMs = Date.parse(reconciliationTs); + + const considerActiveWork = (agentId: string, candidate: ActiveTaskWork) => { + if (!Number.isFinite(candidate.tsMs)) { + return; + } + const current = activeWorkByAgentId.get(agentId); + if (!current || candidate.tsMs > current.tsMs) { + activeWorkByAgentId.set(agentId, candidate); + } + }; + + for (const record of materialized.appliedRecords) { + if (record.entity === "task") { + if (!isReconcileTaskNote(record)) { + lastSubstantiveTaskRecordById.set(record.taskId, record); + } + continue; + } + if (record.entity === "agent") { + lastAgentRecordById.set(record.agentId, record); + } + } + + for (const task of materialized.tasks.values()) { + const activationEvidence = deriveTaskActivationEvidence(task, materialized.appliedRecords); + if (activationEvidence) { + activationEvidenceByTaskId.set(task.id, activationEvidence); + } + + const proofCheckpointEvidence = deriveTaskProofCheckpointEvidence( + task, + materialized.appliedRecords, + ); + if (proofCheckpointEvidence) { + proofCheckpointByTaskId.set(task.id, proofCheckpointEvidence); + } + + const assignedAgent = trimToUndefined(task.assignedAgent); + if (task.state !== "in_progress" || !assignedAgent) { + continue; + } + const taskRecord = lastSubstantiveTaskRecordById.get(task.id); + considerActiveWork(assignedAgent, { + taskId: task.id, + tsMs: taskRecord ? Date.parse(taskRecord.ts) : Date.parse(task.lastEventAt), + referenceId: taskRecord?.id ?? task.id, + source: "task", + }); + } + + for (const agent of materialized.agents.values()) { + const currentTaskId = trimToUndefined(agent.currentTaskId); + if (!currentTaskId) { + continue; + } + const task = materialized.tasks.get(currentTaskId); + const agentRecord = lastAgentRecordById.get(agent.id); + if (!task || !agentRecord) { + continue; + } + + const heartbeatAtMs = Date.parse(agentRecord.ts); + const heartbeatIsFresh = isFreshAgentHeartbeat(heartbeatAtMs, reconciliationTsMs); + + if (heartbeatIsFresh && agent.status !== "idle") { + considerActiveWork(agent.id, { + taskId: currentTaskId, + tsMs: heartbeatAtMs, + referenceId: agentRecord.id, + source: "heartbeat", + }); + + const existing = heartbeatClaimantsByTaskId.get(currentTaskId) ?? []; + if (!existing.some((entry) => entry.agentId === agent.id)) { + existing.push({ + agentId: agent.id, + status: agent.status, + heartbeatAt: agent.heartbeatAt, + }); + heartbeatClaimantsByTaskId.set(currentTaskId, existing); + } + } + } + + return { + reconciliationTs, + reconciliationTsMs, + lastSubstantiveTaskRecordById, + lastAgentRecordById, + activationEvidenceByTaskId, + proofCheckpointByTaskId, + activeWorkByAgentId, + heartbeatClaimantsByTaskId, + }; +} + +function deriveTaskOwnershipEscalationState( + task: TaskLedgerTask, + materialized: MaterializedLedgerState, + context: TaskOwnershipObservationContext, +): TaskOwnershipEscalationState | null { + const thresholds = buildTaskOwnershipEscalationThresholds(); + const triggers: TaskOwnershipEscalationTrigger[] = []; + let level: TaskOwnershipEscalationLevel = "watch"; + let takeover: TaskOwnershipEscalationState["takeover"]; + const assignedAgent = trimToUndefined(task.assignedAgent); + const taskRecord = context.lastSubstantiveTaskRecordById.get(task.id); + + const addTrigger = (trigger: TaskOwnershipEscalationTrigger) => { + triggers.push(trigger); + level = maxOwnershipEscalationLevel(level, trigger.level); + }; + + const activationEvidence = context.activationEvidenceByTaskId.get(task.id); + if ( + assignedAgent && + activationEvidence && + task.state !== "blocked" && + Number.isFinite(context.reconciliationTsMs) + ) { + const activation = activationEvidence.activation; + const missedCheckpoints: TaskActivationMissedCheckpoint[] = []; + if ( + !activationEvidence.acknowledgedAt && + Number.isFinite(Date.parse(activation.acknowledgeDeadlineAt)) && + context.reconciliationTsMs >= Date.parse(activation.acknowledgeDeadlineAt) + ) { + missedCheckpoints.push("acknowledge"); + } + if ( + !activationEvidence.lanePinnedAt && + !activationEvidence.startDisposition && + Number.isFinite(Date.parse(activation.laneDeadlineAt)) && + context.reconciliationTsMs >= Date.parse(activation.laneDeadlineAt) + ) { + missedCheckpoints.push("lane"); + } + if ( + !activationEvidence.startedAt && + !activationEvidence.startDisposition && + Number.isFinite(Date.parse(activation.startDeadlineAt)) && + context.reconciliationTsMs >= Date.parse(activation.startDeadlineAt) + ) { + missedCheckpoints.push("start"); + } + + if (missedCheckpoints.length > 0) { + const missCount = missedCheckpoints.length; + addTrigger({ + code: "activation_sla", + level: + missCount >= OWNERSHIP_ACTIVATION_MISSES_TO_REASSIGN + ? "reassignment_ready" + : missCount >= OWNERSHIP_ACTIVATION_MISSES_TO_ESCALATE + ? "escalated" + : "watch", + observedAt: + missedCheckpoints.at(-1) === "start" + ? activation.startDeadlineAt + : missedCheckpoints.at(-1) === "lane" + ? activation.laneDeadlineAt + : activation.acknowledgeDeadlineAt, + summary: `Missed ${missCount} of 3 activation checkpoints in the current assignment cycle.`, + activationMisses: { + checkpoints: missedCheckpoints, + missCount, + }, + }); + } + } + + const proofCheckpointEvidence = context.proofCheckpointByTaskId.get(task.id); + if ( + task.state === "in_progress" && + proofCheckpointEvidence && + proofCheckpointEvidence.statusOnlyUpdateCount >= PROOF_CHECKPOINT_REQUIRED_STATUS_NOTES + ) { + const statusOnlyUpdateCount = proofCheckpointEvidence.statusOnlyUpdateCount; + addTrigger({ + code: "proof_checkpoint", + level: + statusOnlyUpdateCount >= OWNERSHIP_STATUS_ONLY_UPDATES_TO_REASSIGN + ? "reassignment_ready" + : statusOnlyUpdateCount >= OWNERSHIP_STATUS_ONLY_UPDATES_TO_ESCALATE + ? "escalated" + : "watch", + observedAt: + proofCheckpointEvidence.lastStatusNoteAt ?? + proofCheckpointEvidence.prompt?.requestedAt ?? + task.lastEventAt, + summary: `Observed ${statusOnlyUpdateCount} consecutive status-only in-progress updates since the last proof checkpoint.`, + proofCheckpoint: { + statusOnlyUpdateCount, + ...(proofCheckpointEvidence.prompt?.requestedAt + ? { promptRequestedAt: proofCheckpointEvidence.prompt.requestedAt } + : {}), + }, + }); + } + + if (task.state === "in_progress" && assignedAgent) { + const agent = materialized.agents.get(assignedAgent); + const agentRecord = context.lastAgentRecordById.get(assignedAgent); + if (!agent || !agentRecord) { + addTrigger({ + code: "assigned_agent_missing", + level: "escalated", + observedAt: taskRecord?.ts ?? task.lastEventAt, + summary: `Assigned agent ${assignedAgent} has no current heartbeat for this in-progress task.`, + ownership: { assignedAgent }, + }); + } else if (agent.status === "idle") { + addTrigger({ + code: "assigned_agent_idle", + level: "escalated", + observedAt: agent.lastSeenAt, + summary: `Assigned agent ${assignedAgent} is idle while the task is still in progress.`, + ownership: { assignedAgent }, + }); + } else { + const staleDeltaMs = context.reconciliationTsMs - Date.parse(agent.lastSeenAt); + if (Number.isFinite(staleDeltaMs) && staleDeltaMs >= RECONCILE_AGENT_STALE_MS) { + addTrigger({ + code: "assigned_agent_stale", + level: "escalated", + observedAt: agent.lastSeenAt, + summary: `Assigned agent ${assignedAgent} has a stale heartbeat for this in-progress task.`, + ownership: { + assignedAgent, + staleHeartbeatAt: agent.lastSeenAt, + }, + }); + } + } + } + + const claimant = (context.heartbeatClaimantsByTaskId.get(task.id) ?? []) + .filter((entry) => entry.status !== "idle" && entry.agentId !== assignedAgent) + .toSorted((left, right) => Date.parse(right.heartbeatAt) - Date.parse(left.heartbeatAt))[0]; + if (claimant) { + addTrigger({ + code: "heartbeat_claim_mismatch", + level: "reassignment_ready", + observedAt: claimant.heartbeatAt, + summary: `Agent ${claimant.agentId} is actively heartbeating this task while ledger ownership points elsewhere.`, + ownership: { + ...(assignedAgent ? { assignedAgent } : {}), + claimedByAgent: claimant.agentId, + }, + }); + takeover = { + recommended: true, + through: "task_ledger", + path: "publish_task_assignment", + summary: assignedAgent + ? `If ${claimant.agentId} is the real owner, reassign the task in the ledger by updating assignedAgent from ${assignedAgent} to ${claimant.agentId}; otherwise clear ${claimant.agentId}'s heartbeat claim. Mission Control remains a control surface only.` + : `If ${claimant.agentId} is the real owner, assign the task to ${claimant.agentId} through the ledger; otherwise clear ${claimant.agentId}'s heartbeat claim. Mission Control remains a control surface only.`, + ...(assignedAgent ? { currentAssignedAgent: assignedAgent } : {}), + suggestedAgent: claimant.agentId, + }; + } + + if (task.state === "blocked" && assignedAgent) { + const blockedRecord = findLatestTaskRecord( + materialized.appliedRecords, + task.id, + (record) => record.kind !== "note", + ); + const blockedTsMs = blockedRecord ? Date.parse(blockedRecord.ts) : Date.parse(task.lastEventAt); + const activeWork = context.activeWorkByAgentId.get(assignedAgent); + if ( + activeWork && + activeWork.taskId !== task.id && + Number.isFinite(blockedTsMs) && + activeWork.tsMs > blockedTsMs + ) { + addTrigger({ + code: "blocked_superseded", + level: "reassignment_ready", + observedAt: new Date(activeWork.tsMs).toISOString(), + summary: `Blocked ownership is superseded by newer active work on ${activeWork.taskId}.`, + ownership: { + assignedAgent, + supersededByTaskId: activeWork.taskId, + }, + }); + takeover ??= { + recommended: true, + through: "task_ledger", + path: "publish_task_assignment", + summary: `If ${task.id} still needs work, reassign or clear its ownership through the ledger before anyone takes it over, then require the gaining owner to heartbeat currentTaskId ${task.id}. Mission Control remains a control surface only.`, + currentAssignedAgent: assignedAgent, + }; + } + } + + if (triggers.length === 0) { + return null; + } + + const needsTakeover = triggers.some((trigger) => trigger.level === "reassignment_ready"); + if (needsTakeover && !takeover) { + takeover = { + recommended: true, + through: "task_ledger", + path: "publish_task_assignment", + summary: assignedAgent + ? `Reassign the task through the ledger by updating assignedAgent (or clearing stale ownership), then require the gaining owner to heartbeat currentTaskId ${task.id}. Mission Control remains a control surface only.` + : `Assign the task through the ledger before anyone takes it over, then require the gaining owner to heartbeat currentTaskId ${task.id}. Mission Control remains a control surface only.`, + ...(assignedAgent ? { currentAssignedAgent: assignedAgent } : {}), + }; + } + + return { + version: 1, + sourceOfTruth: "task_ledger", + level, + thresholds, + triggers, + ...(takeover ? { takeover } : {}), + }; +} + +function applyDerivedTaskOwnershipEscalationEvidence(materialized: MaterializedLedgerState) { + const context = buildTaskOwnershipObservationContext(materialized); + for (const task of materialized.tasks.values()) { + const escalation = deriveTaskOwnershipEscalationState(task, materialized, context); + task.metadata = withTaskOwnershipEscalationMetadata(task.metadata, escalation ?? undefined); + } +} + function buildReconcileIdempotencyKey(kind: string, parts: Array): string { return `${RECONCILE_IDEMPOTENCY_PREFIX}:${kind}:${parts .map((part) => trimToUndefined(part) ?? "none") @@ -1400,31 +1924,9 @@ function buildReconciliationRecords(materialized: MaterializedLedgerState): Task return []; } - const lastSubstantiveTaskRecordById = new Map(); - const lastAgentRecordById = new Map(); + const context = buildTaskOwnershipObservationContext(materialized); const emittedIdempotencyKeys = new Set(); - - for (const record of materialized.appliedRecords) { - if (record.entity === "task") { - if (!isReconcileTaskNote(record)) { - lastSubstantiveTaskRecordById.set(record.taskId, record); - } - continue; - } - if (record.entity === "agent") { - lastAgentRecordById.set(record.agentId, record); - } - } - const notes: TaskLedgerTaskRecord[] = []; - const activationEvidenceByTaskId = new Map(); - const proofCheckpointByTaskId = new Map(); - const activeWorkByAgentId = new Map< - string, - { taskId: string; tsMs: number; referenceId: string; source: "task" | "heartbeat" } - >(); - const reconciliationTs = materialized.appliedRecords.at(-1)?.ts ?? new Date().toISOString(); - const reconciliationTsMs = Date.parse(reconciliationTs); const queueTaskNote = (taskId: string, summary: string, idempotencyKey: string, ts?: string) => { if (!materialized.tasks.has(taskId) || emittedIdempotencyKeys.has(idempotencyKey)) { @@ -1441,61 +1943,19 @@ function buildReconciliationRecords(materialized: MaterializedLedgerState): Task ); }; - const considerActiveWork = ( - agentId: string, - candidate: { - taskId: string; - tsMs: number; - referenceId: string; - source: "task" | "heartbeat"; - }, - ) => { - if (!Number.isFinite(candidate.tsMs)) { - return; - } - const current = activeWorkByAgentId.get(agentId); - if (!current || candidate.tsMs > current.tsMs) { - activeWorkByAgentId.set(agentId, candidate); - } - }; - - for (const task of materialized.tasks.values()) { - const activationEvidence = deriveTaskActivationEvidence(task, materialized.appliedRecords); - if (activationEvidence) { - activationEvidenceByTaskId.set(task.id, activationEvidence); - } - const proofCheckpointEvidence = deriveTaskProofCheckpointEvidence( - task, - materialized.appliedRecords, - ); - if (proofCheckpointEvidence) { - proofCheckpointByTaskId.set(task.id, proofCheckpointEvidence); - } - } - for (const task of materialized.tasks.values()) { const assignedAgent = trimToUndefined(task.assignedAgent); - const taskRecord = lastSubstantiveTaskRecordById.get(task.id); - const taskRecordId = taskRecord?.id; - const taskTsMs = taskRecord ? Date.parse(taskRecord.ts) : Date.parse(task.lastEventAt); - const activationEvidence = activationEvidenceByTaskId.get(task.id); - const proofCheckpointEvidence = proofCheckpointByTaskId.get(task.id); + const activationEvidence = context.activationEvidenceByTaskId.get(task.id); + const proofCheckpointEvidence = context.proofCheckpointByTaskId.get(task.id); if (task.state === "in_progress" && assignedAgent) { - considerActiveWork(assignedAgent, { - taskId: task.id, - tsMs: taskTsMs, - referenceId: taskRecordId ?? task.id, - source: "task", - }); - const agent = materialized.agents.get(assignedAgent); - const agentRecord = lastAgentRecordById.get(assignedAgent); + const agentRecord = context.lastAgentRecordById.get(assignedAgent); if (!agent || !agentRecord) { queueTaskNote( task.id, - `Reconcile residue: task is still marked in progress for assigned agent ${assignedAgent}, but no agent heartbeat is recorded. This usually means stale residue from earlier work; verify whether the task should remain in progress or be reassigned.`, + `Reconcile residue: task is still marked in progress for assigned agent ${assignedAgent}, but no agent heartbeat is recorded. This is immediate ownership escalation. If ${assignedAgent} is no longer the owner, reassign through the ledger by updating assignedAgent (or clearing it), then require the gaining owner to heartbeat currentTaskId ${task.id}. Mission Control remains a control surface only.`, buildReconcileIdempotencyKey("in-progress-agent-missing", [ task.id, assignedAgent, @@ -1508,7 +1968,7 @@ function buildReconciliationRecords(materialized: MaterializedLedgerState): Task if (agent.status === "idle") { queueTaskNote( task.id, - `Reconcile residue: task is still marked in progress for assigned agent ${assignedAgent}, but the latest heartbeat reports the agent idle. This usually means the task state or agent context was not cleaned up; verify whether the task should pause or the agent context should be updated.`, + `Reconcile residue: task is still marked in progress for assigned agent ${assignedAgent}, but the latest heartbeat reports the agent idle. This is immediate ownership escalation. If ${assignedAgent} does not resume through the ledger, reassign by updating assignedAgent (or clearing stale ownership), then require the gaining owner to heartbeat currentTaskId ${task.id}. Mission Control remains a control surface only.`, buildReconcileIdempotencyKey("in-progress-agent-idle", [ task.id, assignedAgent, @@ -1518,21 +1978,18 @@ function buildReconciliationRecords(materialized: MaterializedLedgerState): Task continue; } - const taskReferenceTsMs = taskRecord - ? Date.parse(taskRecord.ts) - : Date.parse(task.lastEventAt); - const staleDeltaMs = taskReferenceTsMs - Date.parse(agent.lastSeenAt); + const staleDeltaMs = context.reconciliationTsMs - Date.parse(agent.lastSeenAt); if (Number.isFinite(staleDeltaMs) && staleDeltaMs >= RECONCILE_AGENT_STALE_MS) { queueTaskNote( task.id, - `Reconcile residue: task is still marked in progress for assigned agent ${assignedAgent}, but the latest heartbeat is stale (${agent.lastSeenAt}). This looks like stale residue unless work is still active; verify whether the task should remain in progress or refresh the agent task context.`, + `Reconcile residue: task is still marked in progress for assigned agent ${assignedAgent}, but the latest heartbeat is stale (${agent.lastSeenAt}). This is immediate ownership escalation. If ${assignedAgent} does not refresh task ownership through the ledger, reassign by updating assignedAgent (or clearing stale ownership), then require the gaining owner to heartbeat currentTaskId ${task.id}. Mission Control remains a control surface only.`, buildReconcileIdempotencyKey("in-progress-agent-stale", [ task.id, assignedAgent, "stale", ]), - Number.isFinite(taskReferenceTsMs) - ? new Date(taskReferenceTsMs).toISOString() + Number.isFinite(context.reconciliationTsMs) + ? new Date(context.reconciliationTsMs).toISOString() : undefined, ); } @@ -1541,7 +1998,7 @@ function buildReconciliationRecords(materialized: MaterializedLedgerState): Task if ( assignedAgent && activationEvidence && - Number.isFinite(reconciliationTsMs) && + Number.isFinite(context.reconciliationTsMs) && task.state !== "blocked" ) { const activation = activationEvidence.activation; @@ -1550,11 +2007,11 @@ function buildReconciliationRecords(materialized: MaterializedLedgerState): Task if ( !activationEvidence.acknowledgedAt && Number.isFinite(acknowledgeDeadlineMs) && - reconciliationTsMs >= acknowledgeDeadlineMs + context.reconciliationTsMs >= acknowledgeDeadlineMs ) { queueTaskNote( task.id, - `Activation SLA miss: assigned agent ${assignedAgent} has not acknowledged the task in the ledger within ${Math.floor(activation.acknowledgeWithinMs / 60_000)} minutes. Expected explicit task context, blocked state, or deferred progress before ${activation.acknowledgeDeadlineAt}.`, + `Activation SLA miss: assigned agent ${assignedAgent} has not acknowledged the task in the ledger within ${Math.floor(activation.acknowledgeWithinMs / 60_000)} minutes. Expected explicit task context, blocked state, or deferred progress before ${activation.acknowledgeDeadlineAt}. This is 1 of 3 missed activation checkpoints for the current assignment cycle. Escalate ownership after 2 missed checkpoints; reassign through the ledger after 3 if the task is still silent.`, buildReconcileIdempotencyKey("activation-ack-missed", [ activationCycleId, activation.acknowledgeDeadlineAt, @@ -1567,12 +2024,12 @@ function buildReconciliationRecords(materialized: MaterializedLedgerState): Task if ( !activationEvidence.lanePinnedAt && Number.isFinite(laneDeadlineMs) && - reconciliationTsMs >= laneDeadlineMs && + context.reconciliationTsMs >= laneDeadlineMs && !activationEvidence.startDisposition ) { queueTaskNote( task.id, - `Activation SLA miss: assigned agent ${assignedAgent} has not pinned a lane for this task within ${Math.floor(activation.laneWithinMs / 60_000)} minutes. Expected a heartbeat with lane context before ${activation.laneDeadlineAt}.`, + `Activation SLA miss: assigned agent ${assignedAgent} has not pinned a lane for this task within ${Math.floor(activation.laneWithinMs / 60_000)} minutes. Expected a heartbeat with lane context before ${activation.laneDeadlineAt}. This is 2 of 3 missed activation checkpoints for the current assignment cycle. Escalate ownership now. If the final start checkpoint is also missed, reassign through the ledger by updating assignedAgent and requiring the gaining owner to heartbeat currentTaskId ${task.id}.`, buildReconcileIdempotencyKey("activation-lane-missed", [ activationCycleId, activation.laneDeadlineAt, @@ -1586,11 +2043,11 @@ function buildReconciliationRecords(materialized: MaterializedLedgerState): Task !activationEvidence.startedAt && !activationEvidence.startDisposition && Number.isFinite(startDeadlineMs) && - reconciliationTsMs >= startDeadlineMs + context.reconciliationTsMs >= startDeadlineMs ) { queueTaskNote( task.id, - `Activation SLA miss: assigned agent ${assignedAgent} did not show explicit start proof within ${Math.floor(activation.startWithinMs / 60_000)} minutes. Expected a run-start milestone, in-progress transition, or explicit blocked/deferred state before ${activation.startDeadlineAt}.`, + `Activation SLA miss: assigned agent ${assignedAgent} did not show explicit start proof within ${Math.floor(activation.startWithinMs / 60_000)} minutes. Expected a run-start milestone, in-progress transition, or explicit blocked/deferred state before ${activation.startDeadlineAt}. This is 3 of 3 missed activation checkpoints for the current assignment cycle. Reassignment is now appropriate: update assignedAgent through the ledger (or clear stale ownership), then require the gaining owner to heartbeat currentTaskId ${task.id}. Mission Control remains a control surface only.`, buildReconcileIdempotencyKey("activation-start-missed", [ activationCycleId, activation.startDeadlineAt, @@ -1608,7 +2065,7 @@ function buildReconciliationRecords(materialized: MaterializedLedgerState): Task ) { queueTaskNote( task.id, - `Proof checkpoint required: task is still in progress, but the latest ${proofCheckpointEvidence.statusOnlyUpdateCount} agent updates are status-only with no concrete proof of work. Record a proof checkpoint with files touched, diff summary, tests run, or review signal before sending another status-only update.`, + `Proof checkpoint required: task is still in progress, but the latest ${proofCheckpointEvidence.statusOnlyUpdateCount} agent updates are status-only with no concrete proof of work. Record a proof checkpoint with files touched, diff summary, tests run, or review signal before sending another status-only update. Escalate ownership at ${OWNERSHIP_STATUS_ONLY_UPDATES_TO_ESCALATE} consecutive status-only updates and reassign through the ledger at ${OWNERSHIP_STATUS_ONLY_UPDATES_TO_REASSIGN} if no proof is recorded.`, buildReconcileIdempotencyKey("proof-checkpoint-required", [ task.id, trimToUndefined(task.assignedAgent) ?? "unassigned", @@ -1617,6 +2074,40 @@ function buildReconciliationRecords(materialized: MaterializedLedgerState): Task proofCheckpointEvidence.lastStatusNoteAt, ); } + + if ( + task.state === "in_progress" && + proofCheckpointEvidence && + proofCheckpointEvidence.statusOnlyUpdateCount >= OWNERSHIP_STATUS_ONLY_UPDATES_TO_ESCALATE + ) { + queueTaskNote( + task.id, + `Ownership escalation: task is still in progress with ${proofCheckpointEvidence.statusOnlyUpdateCount} consecutive status-only updates and no proof checkpoint. Escalate the current owner now. If one more status-only update lands without proof, reassign through the ledger by updating assignedAgent and requiring the gaining owner to heartbeat currentTaskId ${task.id}.`, + buildReconcileIdempotencyKey("proof-checkpoint-escalated", [ + task.id, + trimToUndefined(task.assignedAgent) ?? "unassigned", + proofCheckpointEvidence.lastCheckpointAt ?? proofCheckpointEvidence.currentCycleStartedAt, + ]), + proofCheckpointEvidence.lastStatusNoteAt, + ); + } + + if ( + task.state === "in_progress" && + proofCheckpointEvidence && + proofCheckpointEvidence.statusOnlyUpdateCount >= OWNERSHIP_STATUS_ONLY_UPDATES_TO_REASSIGN + ) { + queueTaskNote( + task.id, + `Ownership reassignment ready: task is still in progress with ${proofCheckpointEvidence.statusOnlyUpdateCount} consecutive status-only updates and no proof checkpoint. Reassign through the ledger by updating assignedAgent (or clearing stale ownership), then require the gaining owner to heartbeat currentTaskId ${task.id}. Mission Control remains a control surface only.`, + buildReconcileIdempotencyKey("proof-checkpoint-reassign", [ + task.id, + trimToUndefined(task.assignedAgent) ?? "unassigned", + proofCheckpointEvidence.lastCheckpointAt ?? proofCheckpointEvidence.currentCycleStartedAt, + ]), + proofCheckpointEvidence.lastStatusNoteAt, + ); + } } for (const agent of materialized.agents.values()) { @@ -1625,17 +2116,15 @@ function buildReconciliationRecords(materialized: MaterializedLedgerState): Task continue; } const task = materialized.tasks.get(currentTaskId); - const agentRecord = lastAgentRecordById.get(agent.id); + const agentRecord = context.lastAgentRecordById.get(agent.id); if (!task || !agentRecord) { continue; } - considerActiveWork(agent.id, { - taskId: currentTaskId, - tsMs: Date.parse(agentRecord.ts), - referenceId: agentRecord.id, - source: "heartbeat", - }); + const heartbeatAtMs = Date.parse(agentRecord.ts); + if (!isFreshAgentHeartbeat(heartbeatAtMs, context.reconciliationTsMs)) { + continue; + } const assignedAgent = trimToUndefined(task.assignedAgent); @@ -1656,8 +2145,8 @@ function buildReconciliationRecords(materialized: MaterializedLedgerState): Task queueTaskNote( task.id, assignedAgent - ? `Reconcile mismatch: agent ${agent.id} heartbeat claims this task as current work, but the task is assigned to ${assignedAgent}. Fix task ownership or clear the stale heartbeat context.` - : `Reconcile mismatch: agent ${agent.id} heartbeat claims this task as current work, but the task is currently unassigned. Fix task ownership or clear the stale heartbeat context.`, + ? `Reconcile mismatch: agent ${agent.id} heartbeat claims this task as current work, but the task is assigned to ${assignedAgent}. If ${agent.id} is the real owner, reassign through the ledger by updating assignedAgent from ${assignedAgent} to ${agent.id}; otherwise clear ${agent.id}'s heartbeat claim. Mission Control remains a control surface only.` + : `Reconcile mismatch: agent ${agent.id} heartbeat claims this task as current work, but the task is currently unassigned. If ${agent.id} is the real owner, assign it through the ledger before takeover; otherwise clear ${agent.id}'s heartbeat claim. Mission Control remains a control surface only.`, buildReconcileIdempotencyKey("heartbeat-task-assignment-mismatch", [ task.id, agent.id, @@ -1673,9 +2162,13 @@ function buildReconciliationRecords(materialized: MaterializedLedgerState): Task if (task.state !== "blocked" || !assignedAgent) { continue; } - const blockedRecord = lastSubstantiveTaskRecordById.get(task.id); + const blockedRecord = findLatestTaskRecord( + materialized.appliedRecords, + task.id, + (record) => record.kind !== "note", + ); const blockedTsMs = blockedRecord ? Date.parse(blockedRecord.ts) : Date.parse(task.lastEventAt); - const activeWork = activeWorkByAgentId.get(assignedAgent); + const activeWork = context.activeWorkByAgentId.get(assignedAgent); if (!activeWork || activeWork.taskId === task.id || !Number.isFinite(blockedTsMs)) { continue; } @@ -1684,7 +2177,7 @@ function buildReconciliationRecords(materialized: MaterializedLedgerState): Task } queueTaskNote( task.id, - `Reconcile residue: blocked task still belongs to ${assignedAgent}, but newer active work exists on ${activeWork.taskId}. Review whether this task is still blocked, should be reassigned, or can be reopened.`, + `Reconcile residue: blocked task still belongs to ${assignedAgent}, but newer active work exists on ${activeWork.taskId}. Reassignment is appropriate if ${task.id} still needs work: publish the ownership change through the ledger before takeover, then require the gaining owner to heartbeat currentTaskId ${task.id}. Mission Control remains a control surface only.`, buildReconcileIdempotencyKey("blocked-task-superseded", [ task.id, assignedAgent, diff --git a/src/infra/task-lifecycle-publisher.test.ts b/src/infra/task-lifecycle-publisher.test.ts index 44a79d278d..f29883dfb1 100644 --- a/src/infra/task-lifecycle-publisher.test.ts +++ b/src/infra/task-lifecycle-publisher.test.ts @@ -2,7 +2,11 @@ import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; import { afterEach, describe, expect, it } from "vitest"; -import { readTaskLedgerEvents, readTaskLedgerSnapshot } from "./task-ledger.js"; +import { + readTaskLedgerEvents, + readTaskLedgerSnapshot, + type TaskLedgerTaskRecord, +} from "./task-ledger.js"; import { buildTaskLifecyclePublishInput, publishTaskLifecycleEvent, @@ -22,6 +26,10 @@ afterEach(async () => { ); }); +function isTaskNoteEvent(event: { entity: string; kind: string }): event is TaskLedgerTaskRecord { + return event.entity === "task" && event.kind === "note"; +} + describe("task lifecycle publisher", () => { it("maps lifecycle actions onto canonical task-ledger publish inputs", () => { expect( @@ -187,15 +195,15 @@ describe("task lifecycle publisher", () => { ]); const reconcileNotes = events.filter( - (event) => - event.kind == "note" && + (event): event is TaskLedgerTaskRecord => + isTaskNoteEvent(event) && typeof event.idempotencyKey == "string" && event.idempotencyKey.startsWith("reconcile:in-progress-agent-missing:"), ); expect(reconcileNotes).toHaveLength(1); expect(reconcileNotes.map((event) => event.summary)).toEqual([ - "Reconcile residue: task is still marked in progress for assigned agent forge, but no agent heartbeat is recorded. This usually means stale residue from earlier work; verify whether the task should remain in progress or be reassigned.", + "Reconcile residue: task is still marked in progress for assigned agent forge, but no agent heartbeat is recorded. This is immediate ownership escalation. If forge is no longer the owner, reassign through the ledger by updating assignedAgent (or clearing it), then require the gaining owner to heartbeat currentTaskId task-1. Mission Control remains a control surface only.", ]); expect(events.map((event) => event.idempotencyKey)).toEqual([