From 839d0cda5c807ac5a419ef8cdb529d52fa5f3467 Mon Sep 17 00:00:00 2001 From: Will Washburn Date: Wed, 20 May 2026 17:44:39 -0400 Subject: [PATCH 1/4] Add structured agent result callbacks --- .../completed/2026-05/traj_ceo5q9bh2od3.json | 53 ++++ .../completed/2026-05/traj_ceo5q9bh2od3.md | 33 +++ .trajectories/index.json | 9 +- crates/broker/src/listen_api.rs | 188 +++++++++++++ crates/broker/src/protocol.rs | 9 + crates/broker/src/relaycast/mod.rs | 5 +- crates/broker/src/runtime/api.rs | 53 ++++ crates/broker/src/runtime/event_loop.rs | 1 + crates/broker/src/runtime/init.rs | 6 + crates/broker/src/runtime/maintenance.rs | 4 + crates/broker/src/runtime/mod.rs | 4 +- crates/broker/src/runtime/relaycast_events.rs | 4 + crates/broker/src/snippets.rs | 248 ++++++++++++++-- crates/broker/src/types.rs | 25 ++ crates/broker/src/worker.rs | 17 +- package-lock.json | 4 +- packages/sdk/package.json | 4 +- .../__tests__/orchestration-upgrades.test.ts | 72 +++++ packages/sdk/src/__tests__/unit.test.ts | 3 + packages/sdk/src/client.ts | 2 + packages/sdk/src/protocol.ts | 8 + packages/sdk/src/relay.ts | 266 ++++++++++++++++-- packages/sdk/src/types.ts | 5 + src/cli/relaycast-mcp.startup.test.ts | 37 +++ src/cli/relaycast-mcp.ts | 97 +++++++ web/content/docs/spawning-an-agent.mdx | 24 +- web/content/docs/typescript-sdk.mdx | 34 +++ 27 files changed, 1170 insertions(+), 45 deletions(-) create mode 100644 .trajectories/completed/2026-05/traj_ceo5q9bh2od3.json create mode 100644 .trajectories/completed/2026-05/traj_ceo5q9bh2od3.md diff --git a/.trajectories/completed/2026-05/traj_ceo5q9bh2od3.json b/.trajectories/completed/2026-05/traj_ceo5q9bh2od3.json new file mode 100644 index 000000000..5e2feddee --- /dev/null +++ b/.trajectories/completed/2026-05/traj_ceo5q9bh2od3.json @@ -0,0 +1,53 @@ +{ + "id": "traj_ceo5q9bh2od3", + "version": 1, + "task": { + "title": "Add structured spawned-agent results" + }, + "status": "completed", + "startedAt": "2026-05-20T21:24:17.929Z", + "completedAt": "2026-05-20T21:43:51.936Z", + "agents": [ + { + "name": "default", + "role": "lead", + "joinedAt": "2026-05-20T21:43:48.187Z" + } + ], + "chapters": [ + { + "id": "chap_7lq3uodpwrbr", + "title": "Work", + "agentName": "default", + "startedAt": "2026-05-20T21:43:48.187Z", + "endedAt": "2026-05-20T21:43:51.936Z", + "events": [ + { + "ts": 1779313428188, + "type": "decision", + "content": "Implemented structured agent results as a broker-mediated callback token plus MCP tool: Implemented structured agent results as a broker-mediated callback token plus MCP tool", + "raw": { + "question": "Implemented structured agent results as a broker-mediated callback token plus MCP tool", + "chosen": "Implemented structured agent results as a broker-mediated callback token plus MCP tool", + "alternatives": [], + "reasoning": "The SDK can declare a result contract at spawn time, the broker mints a per-agent callback token, and the injected Relaycast MCP server exposes submit_result without requiring the spawned agent to know broker credentials." + }, + "significance": "high" + } + ] + } + ], + "retrospective": { + "summary": "Added structured spawned-agent result contracts across the broker, MCP tool, and TypeScript SDK, with focused Rust and Vitest coverage plus docs.", + "approach": "Standard approach", + "confidence": 0.82 + }, + "commits": [], + "filesChanged": [], + "projectId": "/private/tmp/relay-structured-agent-results", + "tags": [], + "_trace": { + "startRef": "db037b0557e7353e169e92293f4adde06e48a6c6", + "endRef": "db037b0557e7353e169e92293f4adde06e48a6c6" + } +} diff --git a/.trajectories/completed/2026-05/traj_ceo5q9bh2od3.md b/.trajectories/completed/2026-05/traj_ceo5q9bh2od3.md new file mode 100644 index 000000000..6e04b27ae --- /dev/null +++ b/.trajectories/completed/2026-05/traj_ceo5q9bh2od3.md @@ -0,0 +1,33 @@ +# Trajectory: Add structured spawned-agent results + +> **Status:** ✅ Completed +> **Confidence:** 82% +> **Started:** May 20, 2026 at 05:24 PM +> **Completed:** May 20, 2026 at 05:43 PM + +--- + +## Summary + +Added structured spawned-agent result contracts across the broker, MCP tool, and TypeScript SDK, with focused Rust and Vitest coverage plus docs. + +**Approach:** Standard approach + +--- + +## Key Decisions + +### Implemented structured agent results as a broker-mediated callback token plus MCP tool + +- **Chose:** Implemented structured agent results as a broker-mediated callback token plus MCP tool +- **Reasoning:** The SDK can declare a result contract at spawn time, the broker mints a per-agent callback token, and the injected Relaycast MCP server exposes submit_result without requiring the spawned agent to know broker credentials. + +--- + +## Chapters + +### 1. Work + +_Agent: default_ + +- Implemented structured agent results as a broker-mediated callback token plus MCP tool: Implemented structured agent results as a broker-mediated callback token plus MCP tool diff --git a/.trajectories/index.json b/.trajectories/index.json index 37a2e0e82..30e171d27 100644 --- a/.trajectories/index.json +++ b/.trajectories/index.json @@ -1,6 +1,6 @@ { "version": 1, - "lastUpdated": "2026-05-20T11:46:17.517Z", + "lastUpdated": "2026-05-20T21:43:52.084Z", "trajectories": { "traj_05xg7j388bc4": { "title": "Add browser workflow step integration", @@ -1124,6 +1124,13 @@ "startedAt": "2026-05-20T11:36:34.306Z", "completedAt": "2026-05-20T11:46:17.506Z", "path": "/Users/khaliqgant/Projects/AgentWorkforce/.msd-autofix-1bdf6c0b/.trajectories/completed/2026-05/traj_af7iew24eiip.json" + }, + "traj_ceo5q9bh2od3": { + "title": "Add structured spawned-agent results", + "status": "completed", + "startedAt": "2026-05-20T21:24:17.929Z", + "completedAt": "2026-05-20T21:43:51.936Z", + "path": "/private/tmp/relay-structured-agent-results/.trajectories/completed/2026-05/traj_ceo5q9bh2od3.json" } } } diff --git a/crates/broker/src/listen_api.rs b/crates/broker/src/listen_api.rs index d8562fc93..1ba38e51d 100644 --- a/crates/broker/src/listen_api.rs +++ b/crates/broker/src/listen_api.rs @@ -50,6 +50,7 @@ pub enum ListenApiRequest { skip_relay_prompt: bool, restart_policy: Box>, agent_token: Option, + agent_result_schema: Option, reply: tokio::sync::oneshot::Sender>, }, SetModel { @@ -175,6 +176,16 @@ pub enum ListenApiRequest { name: String, reply: tokio::sync::oneshot::Sender>, }, + /// `POST /api/agent-result` — accepts structured result payloads from the + /// per-agent MCP tool using a callback token minted at spawn time. + SubmitAgentResult { + token: String, + name: Option, + data: Value, + final_result: bool, + metadata: Option, + reply: tokio::sync::oneshot::Sender>, + }, } /// Typed errors for the inbound-delivery-mode HTTP routes. Keeps the broker arm's @@ -200,6 +211,21 @@ impl std::fmt::Display for DeliveryRouteError { impl std::error::Error for DeliveryRouteError {} +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum AgentResultRouteError { + InvalidToken, +} + +impl std::fmt::Display for AgentResultRouteError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + AgentResultRouteError::InvalidToken => write!(f, "invalid_result_token"), + } + } +} + +impl std::error::Error for AgentResultRouteError {} + /// Reply payload for [`ListenApiRequest::SetInboundDeliveryMode`]. `flushed` /// is the number of pending messages drained during the transition /// (always `0` unless we transitioned `manual_flush → auto_inject`). @@ -393,6 +419,7 @@ fn listen_api_router_with_auth( Router::new() .route("/health", routing::get(listen_api_health)) + .route("/api/agent-result", routing::post(listen_api_agent_result)) .merge(protected) .with_state(state.clone()) } @@ -616,6 +643,11 @@ async fn listen_api_spawn( .or_else(|| body.get("agentToken")) .and_then(Value::as_str) .map(String::from); + let agent_result_schema = body + .get("agent_result_schema") + .or_else(|| body.get("agentResultSchema")) + .or_else(|| body.get("resultSchema")) + .cloned(); if name.is_empty() { return ( @@ -644,6 +676,7 @@ async fn listen_api_spawn( skip_relay_prompt, restart_policy, agent_token, + agent_result_schema, reply: reply_tx, }) .await @@ -755,6 +788,84 @@ async fn listen_api_threads( } } +async fn listen_api_agent_result( + axum::extract::State(state): axum::extract::State, + headers: axum::http::HeaderMap, + axum::Json(body): axum::Json, +) -> (axum::http::StatusCode, axum::Json) { + let token = headers + .get("x-agent-result-token") + .and_then(|value| value.to_str().ok()) + .map(str::trim) + .filter(|value| !value.is_empty()) + .map(String::from) + .or_else(|| { + headers + .get("authorization") + .and_then(|value| value.to_str().ok()) + .and_then(|value| value.strip_prefix("Bearer ")) + .map(str::trim) + .filter(|value| !value.is_empty()) + .map(String::from) + }); + let Some(token) = token else { + return ( + axum::http::StatusCode::UNAUTHORIZED, + axum::Json(json!({ "success": false, "error": "missing_result_token" })), + ); + }; + + let Some(data) = body.get("data").or_else(|| body.get("result")).cloned() else { + return ( + axum::http::StatusCode::BAD_REQUEST, + axum::Json(json!({ "success": false, "error": "Missing required field: data" })), + ); + }; + let name = body + .get("name") + .or_else(|| body.get("agent")) + .and_then(Value::as_str) + .map(String::from); + let final_result = body + .get("final") + .or_else(|| body.get("final_result")) + .and_then(Value::as_bool) + .unwrap_or(true); + let metadata = body.get("metadata").cloned(); + + let (reply_tx, reply_rx) = tokio::sync::oneshot::channel(); + if state + .tx + .send(ListenApiRequest::SubmitAgentResult { + token, + name, + data, + final_result, + metadata, + reply: reply_tx, + }) + .await + .is_err() + { + return ( + axum::http::StatusCode::INTERNAL_SERVER_ERROR, + axum::Json(json!({ "success": false, "error": "internal channel closed" })), + ); + } + + match reply_rx.await { + Ok(Ok(value)) => (axum::http::StatusCode::OK, axum::Json(value)), + Ok(Err(AgentResultRouteError::InvalidToken)) => ( + axum::http::StatusCode::UNAUTHORIZED, + axum::Json(json!({ "success": false, "error": "invalid_result_token" })), + ), + Err(_) => ( + axum::http::StatusCode::INTERNAL_SERVER_ERROR, + axum::Json(json!({ "success": false, "error": "internal reply dropped" })), + ), + } +} + async fn listen_api_release( axum::extract::State(state): axum::extract::State, axum::extract::Path(name): axum::extract::Path, @@ -2277,6 +2388,7 @@ mod auth_tests { skip_relay_prompt: _, restart_policy: _, agent_token: _, + agent_result_schema, reply, }) => { assert_eq!(name, "worker-a"); @@ -2295,6 +2407,10 @@ mod auth_tests { assert_eq!(shadow_mode.as_deref(), Some("subagent")); assert_eq!(continue_from.as_deref(), Some("worker-prev")); assert_eq!(idle_threshold_secs, Some(30)); + assert_eq!( + agent_result_schema, + Some(json!({"type": "object", "properties": {"ok": {"type": "boolean"}}})) + ); let _ = reply.send(Ok( json!({ "success": true, "name": "worker-a", "pid": 42 }), )); @@ -2325,6 +2441,7 @@ mod auth_tests { "shadowMode": "subagent", "continueFrom": "worker-prev", "idleThresholdSecs": 30, + "resultSchema": {"type": "object", "properties": {"ok": {"type": "boolean"}}}, }) .to_string(), )) @@ -2340,6 +2457,77 @@ mod auth_tests { spawn_replier.await.expect("spawn replier should complete"); } + #[tokio::test] + async fn agent_result_route_accepts_callback_token_without_broker_auth() { + let (router, mut rx) = test_router(Some("secret")); + + let replier = tokio::spawn(async move { + match rx.recv().await { + Some(ListenApiRequest::SubmitAgentResult { + token, + name, + data, + final_result, + metadata, + reply, + }) => { + assert_eq!(token, "arr_test"); + assert_eq!(name.as_deref(), Some("worker-a")); + assert_eq!(data, json!({"ok": true})); + assert!(final_result); + assert_eq!(metadata, Some(json!({"source": "test"}))); + let _ = reply.send(Ok(json!({"success": true, "result_id": "ar_1"}))); + } + other => panic!("unexpected request: {:?}", other.map(|_| "other")), + } + }); + + let response = router + .oneshot( + Request::builder() + .uri("/api/agent-result") + .method("POST") + .header("authorization", "Bearer arr_test") + .header("content-type", "application/json") + .body(Body::from( + json!({ + "agent": "worker-a", + "data": {"ok": true}, + "metadata": {"source": "test"} + }) + .to_string(), + )) + .expect("request should build"), + ) + .await + .expect("request should succeed"); + + assert_eq!(response.status(), StatusCode::OK); + let body = response_json(response).await; + assert_eq!(body["result_id"], json!("ar_1")); + + replier.await.expect("result replier should complete"); + } + + #[tokio::test] + async fn agent_result_route_rejects_missing_callback_token() { + let (router, _rx) = test_router(Some("secret")); + + let response = router + .oneshot( + Request::builder() + .uri("/api/agent-result") + .method("POST") + .header("content-type", "application/json") + .body(Body::from(json!({"data": {"ok": true}}).to_string())) + .expect("request should build"), + ) + .await + .expect("request should succeed"); + + assert_eq!(response.status(), StatusCode::UNAUTHORIZED); + } + #[tokio::test] async fn set_model_route_forwards_request() { let (router, mut rx) = test_router(Some("secret")); diff --git a/crates/broker/src/protocol.rs b/crates/broker/src/protocol.rs index 7ad80fcf8..e59a5127c 100644 --- a/crates/broker/src/protocol.rs +++ b/crates/broker/src/protocol.rs @@ -268,6 +268,15 @@ pub enum BrokerEvent { #[serde(default)] since: Option, }, + AgentResult { + name: String, + result_id: String, + data: Value, + #[serde(rename = "final")] + final_result: bool, + #[serde(default)] + metadata: Option, + }, AgentBlockedOnSend { name: String, blocked_secs: u64, diff --git a/crates/broker/src/relaycast/mod.rs b/crates/broker/src/relaycast/mod.rs index 42c460224..b198f0ae3 100644 --- a/crates/broker/src/relaycast/mod.rs +++ b/crates/broker/src/relaycast/mod.rs @@ -4,7 +4,10 @@ pub(crate) mod dm_participants; pub(crate) mod workspace; pub(crate) mod ws; -pub(crate) use crate::snippets::{configure_relaycast_mcp_with_token, ensure_relaycast_mcp_config}; +pub(crate) use crate::snippets::{ + configure_relaycast_mcp_with_result, configure_relaycast_mcp_with_token, + ensure_relaycast_mcp_config, +}; pub(crate) use auth::AuthClient; pub(crate) use bridge::{map_ws_broker_command, map_ws_event}; pub(crate) use dm_participants::{resolve_dm_participants_cached, DmParticipantsCache}; diff --git a/crates/broker/src/runtime/api.rs b/crates/broker/src/runtime/api.rs index 945885971..1f812d341 100644 --- a/crates/broker/src/runtime/api.rs +++ b/crates/broker/src/runtime/api.rs @@ -18,6 +18,7 @@ impl BrokerRuntime { let pending_deliveries = &mut self.pending_deliveries; let pending_requests = &mut self.pending_requests; let delivery_states = &mut self.delivery_states; + let agent_result_tokens = &mut self.agent_result_tokens; let dedup = &mut self.dedup; let recent_thread_messages = &mut self.recent_thread_messages; let delivery_retry_interval = self.delivery_retry_interval; @@ -45,6 +46,7 @@ impl BrokerRuntime { skip_relay_prompt, restart_policy, agent_token, + agent_result_schema, reply, } => { let effective_channels = if channels.is_empty() { @@ -190,6 +192,17 @@ impl BrokerRuntime { .first() .map(|workspace| workspace.workspace_id.clone()) }); + let agent_result = agent_result_schema.map(|schema| AgentResultMcpConfig { + callback_url: workers + .env_value("AGENT_RELAY_RESULT_URL") + .unwrap_or("http://127.0.0.1:3889/api/agent-result") + .to_string(), + token: format!("arr_{}", Uuid::new_v4().simple()), + schema: Some(schema), + }); + if let Some(config) = &agent_result { + agent_result_tokens.insert(config.token.clone(), name.clone()); + } match workers .spawn( spec, @@ -198,6 +211,7 @@ impl BrokerRuntime { worker_relay_key.clone(), skip_relay_prompt, spawn_workspace_id.clone(), + agent_result.clone(), ) .await { @@ -278,11 +292,49 @@ impl BrokerRuntime { }))); } Err(e) => { + if let Some(config) = &agent_result { + agent_result_tokens.remove(&config.token); + } eprintln!("[agent-relay] HTTP API: failed to spawn '{}': {}", name, e); let _ = reply.send(Err(e.to_string())); } } } + ListenApiRequest::SubmitAgentResult { + token, + name, + data, + final_result, + metadata, + reply, + } => { + let Some(agent_name) = agent_result_tokens.get(&token).cloned() else { + let _ = reply.send(Err(listen_api::AgentResultRouteError::InvalidToken)); + return; + }; + if let Some(requested_name) = name.as_deref() { + if requested_name != agent_name { + let _ = reply.send(Err(listen_api::AgentResultRouteError::InvalidToken)); + return; + } + } + + let result_id = format!("ar_{}", Uuid::new_v4().simple()); + let payload = json!({ + "kind": "agent_result", + "name": agent_name, + "result_id": result_id, + "data": data, + "final": final_result, + "metadata": metadata, + }); + let _ = send_event(sdk_out_tx, payload).await; + let _ = reply.send(Ok(json!({ + "success": true, + "name": agent_name, + "result_id": result_id, + }))); + } ListenApiRequest::SetModel { name, model, @@ -368,6 +420,7 @@ impl BrokerRuntime { } fail_pending_requests_for_worker(pending_requests, &name, "agent_released"); delivery_states.remove(&name); + agent_result_tokens.retain(|_, agent| agent != &name); state.agents.remove(&name); if paths.persist { let _ = state.save(&paths.state); diff --git a/crates/broker/src/runtime/event_loop.rs b/crates/broker/src/runtime/event_loop.rs index 1aa715523..568941bc1 100644 --- a/crates/broker/src/runtime/event_loop.rs +++ b/crates/broker/src/runtime/event_loop.rs @@ -32,6 +32,7 @@ pub(crate) struct BrokerRuntime { pub(super) terminal_failed_deliveries: HashSet, pub(super) pending_requests: HashMap, pub(super) delivery_states: HashMap, + pub(super) agent_result_tokens: HashMap, pub(super) dm_participants_cache: DmParticipantsCache, pub(super) recent_thread_messages: VecDeque, pub(super) shutdown: bool, diff --git a/crates/broker/src/runtime/init.rs b/crates/broker/src/runtime/init.rs index f8b679b67..8115bd5a9 100644 --- a/crates/broker/src/runtime/init.rs +++ b/crates/broker/src/runtime/init.rs @@ -338,6 +338,10 @@ pub(crate) async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Re let mut worker_env = vec![ ("RELAY_BASE_URL".to_string(), http_base.clone()), ("RELAY_API_KEY".to_string(), relay_workspace_key.clone()), + ( + "AGENT_RELAY_RESULT_URL".to_string(), + format!("http://{}:{}/api/agent-result", cmd.api_bind, actual_port), + ), ( "RELAY_WORKSPACES_JSON".to_string(), relay_workspaces_json.clone(), @@ -416,6 +420,7 @@ pub(crate) async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Re // are created lazily on first lookup and removed wherever workers // exit (`Release` arm or `reap_exited` sweep). let delivery_states: HashMap = HashMap::new(); + let agent_result_tokens: HashMap = HashMap::new(); let dm_participants_cache = DmParticipantsCache::new(); let recent_thread_messages: VecDeque = VecDeque::new(); if !pending_deliveries.is_empty() { @@ -480,6 +485,7 @@ pub(crate) async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Re terminal_failed_deliveries, pending_requests, delivery_states, + agent_result_tokens, dm_participants_cache, recent_thread_messages, shutdown, diff --git a/crates/broker/src/runtime/maintenance.rs b/crates/broker/src/runtime/maintenance.rs index 64fef4aa6..3999ea8b4 100644 --- a/crates/broker/src/runtime/maintenance.rs +++ b/crates/broker/src/runtime/maintenance.rs @@ -13,6 +13,7 @@ impl BrokerRuntime { let pending_deliveries = &mut self.pending_deliveries; let pending_requests = &mut self.pending_requests; let delivery_states = &mut self.delivery_states; + let agent_result_tokens = &mut self.agent_result_tokens; let delivery_retry_interval = self.delivery_retry_interval; let shutdown = &self.shutdown; @@ -169,6 +170,7 @@ impl BrokerRuntime { "worker_permanently_dead", ); delivery_states.remove(name); + agent_result_tokens.retain(|_, agent| agent != name); let _ = send_event( sdk_out_tx, json!({"kind":"agent_permanently_dead","name":name,"reason":reason}), @@ -215,6 +217,7 @@ impl BrokerRuntime { } fail_pending_requests_for_worker(pending_requests, name, "worker_exited"); delivery_states.remove(name); + agent_result_tokens.retain(|_, agent| agent != name); let _ = send_event( sdk_out_tx, json!({ @@ -302,6 +305,7 @@ impl BrokerRuntime { worker_relay_key, rst.skip_relay_prompt, None, + None, ) .await { diff --git a/crates/broker/src/runtime/mod.rs b/crates/broker/src/runtime/mod.rs index a39655365..343e40c3c 100644 --- a/crates/broker/src/runtime/mod.rs +++ b/crates/broker/src/runtime/mod.rs @@ -40,8 +40,8 @@ use crate::{ replay_buffer::{ReplayBuffer, DEFAULT_REPLAY_CAPACITY}, telemetry::{ActionSource, TelemetryClient, TelemetryEvent}, types::{ - BrokerCommandEvent, InboundDeliveryDispatch, InboundDeliveryMode, InboundDeliveryState, - InboundKind, PendingRelayMessage, + AgentResultMcpConfig, BrokerCommandEvent, InboundDeliveryDispatch, InboundDeliveryMode, + InboundDeliveryState, InboundKind, PendingRelayMessage, }, }; diff --git a/crates/broker/src/runtime/relaycast_events.rs b/crates/broker/src/runtime/relaycast_events.rs index d11f75afa..9775fe0f1 100644 --- a/crates/broker/src/runtime/relaycast_events.rs +++ b/crates/broker/src/runtime/relaycast_events.rs @@ -14,6 +14,7 @@ impl BrokerRuntime { let pending_deliveries = &mut self.pending_deliveries; let pending_requests = &mut self.pending_requests; let delivery_states = &mut self.delivery_states; + let agent_result_tokens = &mut self.agent_result_tokens; let dm_participants_cache = &mut self.dm_participants_cache; let recent_thread_messages = &mut self.recent_thread_messages; let delivery_retry_interval = self.delivery_retry_interval; @@ -107,6 +108,7 @@ impl BrokerRuntime { "relaycast_release", ); delivery_states.remove(&name); + agent_result_tokens.retain(|_, agent| agent != &name); telemetry.track(TelemetryEvent::AgentRelease { cli: String::new(), release_reason: "relaycast_release".to_string(), @@ -285,6 +287,7 @@ impl BrokerRuntime { worker_relay_key.clone(), false, Some(workspace_id.clone()), + None, ) .await { @@ -478,6 +481,7 @@ impl BrokerRuntime { worker_relay_key.clone(), false, Some(workspace_id.clone()), + None, ) .await { diff --git a/crates/broker/src/snippets.rs b/crates/broker/src/snippets.rs index 9baaf92a3..26117a16f 100644 --- a/crates/broker/src/snippets.rs +++ b/crates/broker/src/snippets.rs @@ -9,6 +9,8 @@ use anyhow::{Context, Result}; use serde_json::{Map, Value}; use tokio::process::Command; +use crate::types::AgentResultMcpConfig; + const RELAYCAST_MCP_PACKAGE: &str = "@relaycast/mcp"; const MCP_FILE: &str = ".mcp.json"; @@ -36,6 +38,7 @@ pub fn ensure_relaycast_mcp_config( None, None, None, + None, ); if !path.exists() { @@ -120,6 +123,26 @@ pub fn relaycast_mcp_config_json_with_token( relay_agent_token: Option<&str>, workspaces_json: Option<&str>, default_workspace: Option<&str>, +) -> String { + relaycast_mcp_config_json_with_result( + relay_api_key, + relay_base_url, + relay_agent_name, + relay_agent_token, + workspaces_json, + default_workspace, + None, + ) +} + +pub fn relaycast_mcp_config_json_with_result( + relay_api_key: Option<&str>, + relay_base_url: Option<&str>, + relay_agent_name: Option<&str>, + relay_agent_token: Option<&str>, + workspaces_json: Option<&str>, + default_workspace: Option<&str>, + agent_result: Option<&AgentResultMcpConfig>, ) -> String { let server = relaycast_server_config( relay_api_key, @@ -128,6 +151,7 @@ pub fn relaycast_mcp_config_json_with_token( relay_agent_token, workspaces_json, default_workspace, + agent_result, ); let mut servers = Map::new(); servers.insert(RELAYCAST_SERVER.to_string(), server); @@ -153,6 +177,7 @@ fn merge_relaycast_with_project_mcp( cwd: &Path, workspaces_json: Option<&str>, default_workspace: Option<&str>, + agent_result: Option<&AgentResultMcpConfig>, ) -> String { merge_relaycast_with_project_mcp_inner( relay_api_key, @@ -163,6 +188,7 @@ fn merge_relaycast_with_project_mcp( dirs::home_dir(), workspaces_json, default_workspace, + agent_result, ) } @@ -178,6 +204,7 @@ fn merge_relaycast_with_project_mcp_inner( home: Option, workspaces_json: Option<&str>, default_workspace: Option<&str>, + agent_result: Option<&AgentResultMcpConfig>, ) -> String { let relaycast_server = relaycast_server_config( relay_api_key, @@ -186,6 +213,7 @@ fn merge_relaycast_with_project_mcp_inner( relay_agent_token, workspaces_json, default_workspace, + agent_result, ); let mut servers = Map::new(); @@ -236,6 +264,7 @@ fn relaycast_server_config( relay_agent_token: Option<&str>, workspaces_json: Option<&str>, default_workspace: Option<&str>, + agent_result: Option<&AgentResultMcpConfig>, ) -> Value { let mut server = Map::new(); // Allow overriding the MCP command for local development/testing. @@ -308,6 +337,7 @@ fn relaycast_server_config( Value::String(dw.to_string()), ); } + apply_agent_result_env(&mut env, agent_result); if !env.is_empty() { server.insert("env".into(), Value::Object(env)); } @@ -315,6 +345,21 @@ fn relaycast_server_config( Value::Object(server) } +fn apply_agent_result_env( + env: &mut Map, + agent_result: Option<&AgentResultMcpConfig>, +) { + if let Some(config) = agent_result { + for (key, value) in config.env_pairs() { + env.insert(key.into(), Value::String(value)); + } + } +} + +fn escape_toml_basic_string(value: &str) -> String { + value.replace('\\', "\\\\").replace('"', "\\\"") +} + /// Inject RELAY_API_KEY into the relaycast server's env block within a merged /// MCP config JSON string. The shared `relaycast_server_config()` omits it /// (codex strips API keys from .mcp.json env), but for Claude's inline @@ -359,6 +404,28 @@ pub fn ensure_opencode_config( relay_agent_token: Option<&str>, workspaces_json: Option<&str>, default_workspace: Option<&str>, +) -> io::Result { + ensure_opencode_config_with_result( + root, + relay_api_key, + relay_base_url, + relay_agent_name, + relay_agent_token, + workspaces_json, + default_workspace, + None, + ) +} + +pub fn ensure_opencode_config_with_result( + root: &Path, + relay_api_key: Option<&str>, + relay_base_url: Option<&str>, + relay_agent_name: Option<&str>, + relay_agent_token: Option<&str>, + workspaces_json: Option<&str>, + default_workspace: Option<&str>, + agent_result: Option<&AgentResultMcpConfig>, ) -> io::Result { let path = root.join(OPENCODE_CONFIG); @@ -413,6 +480,7 @@ pub fn ensure_opencode_config( Value::String(dw.to_string()), ); } + apply_agent_result_env(&mut env, agent_result); if !env.is_empty() { mcp_server.insert("environment".into(), Value::Object(env)); } @@ -500,18 +568,20 @@ pub fn ensure_cursor_mcp_config( relay_agent_token: Option<&str>, workspaces_json: Option<&str>, default_workspace: Option<&str>, + agent_result: Option<&AgentResultMcpConfig>, ) -> io::Result { let cursor_dir = root.join(".cursor"); fs::create_dir_all(&cursor_dir)?; let path = cursor_dir.join("mcp.json"); - let mcp_json = relaycast_mcp_config_json_with_token( + let mcp_json = relaycast_mcp_config_json_with_result( relay_api_key, relay_base_url, relay_agent_name, relay_agent_token, workspaces_json, default_workspace, + agent_result, ); let mut new_value: Value = serde_json::from_str(&mcp_json).map_err(|e| { io::Error::new( @@ -614,6 +684,34 @@ pub async fn configure_relaycast_mcp_with_token( agent_token: Option<&str>, workspaces_json: Option<&str>, default_workspace: Option<&str>, +) -> Result> { + configure_relaycast_mcp_with_result( + cli, + agent_name, + api_key, + base_url, + existing_args, + cwd, + agent_token, + workspaces_json, + default_workspace, + None, + ) + .await +} + +#[allow(clippy::too_many_arguments)] +pub async fn configure_relaycast_mcp_with_result( + cli: &str, + agent_name: &str, + api_key: Option<&str>, + base_url: Option<&str>, + existing_args: &[String], + cwd: &Path, + agent_token: Option<&str>, + workspaces_json: Option<&str>, + default_workspace: Option<&str>, + agent_result: Option<&AgentResultMcpConfig>, ) -> Result> { let cli_lower = detect_cli_name(cli).to_lowercase(); let is_claude = cli_lower == "claude" || cli_lower.starts_with("claude:"); @@ -638,13 +736,14 @@ pub async fn configure_relaycast_mcp_with_token( // MCP servers (filesystem, database, etc.). // We do NOT pass --strict-mcp-config — that would block .mcp.json loading // and prevent child agents from inheriting MCP servers. - let mcp_json = relaycast_mcp_config_json_with_token( + let mcp_json = relaycast_mcp_config_json_with_result( api_key, base_url, Some(agent_name), agent_token, workspaces_json, default_workspace, + agent_result, ); // Claude Code does not reliably pass parent process env vars to MCP server // subprocesses, so RELAY_API_KEY must be injected directly into the config. @@ -685,18 +784,27 @@ pub async fn configure_relaycast_mcp_with_token( if let Some(key) = api_key { args.extend([ "--config".to_string(), - format!("mcp_servers.relaycast.env.RELAY_API_KEY=\"{key}\""), + format!( + "mcp_servers.relaycast.env.RELAY_API_KEY=\"{}\"", + escape_toml_basic_string(key) + ), ]); } if let Some(url) = base_url { args.extend([ "--config".to_string(), - format!("mcp_servers.relaycast.env.RELAY_BASE_URL=\"{url}\""), + format!( + "mcp_servers.relaycast.env.RELAY_BASE_URL=\"{}\"", + escape_toml_basic_string(url) + ), ]); } args.extend([ "--config".to_string(), - format!("mcp_servers.relaycast.env.RELAY_AGENT_NAME=\"{agent_name}\""), + format!( + "mcp_servers.relaycast.env.RELAY_AGENT_NAME=\"{}\"", + escape_toml_basic_string(agent_name) + ), ]); args.extend([ "--config".to_string(), @@ -709,7 +817,10 @@ pub async fn configure_relaycast_mcp_with_token( if let Some(token) = agent_token.map(str::trim).filter(|s| !s.is_empty()) { args.extend([ "--config".to_string(), - format!("mcp_servers.relaycast.env.RELAY_AGENT_TOKEN=\"{token}\""), + format!( + "mcp_servers.relaycast.env.RELAY_AGENT_TOKEN=\"{}\"", + escape_toml_basic_string(token) + ), ]); // Skip bootstrap when the broker has already pre-registered the agent. args.extend([ @@ -720,19 +831,34 @@ pub async fn configure_relaycast_mcp_with_token( // Forward multi-workspace context to codex child agents. // JSON values must have inner quotes escaped for TOML basic-string parsing. if let Some(wj) = workspaces_json.map(str::trim).filter(|s| !s.is_empty()) { - let escaped = wj.replace('\\', "\\\\").replace('"', "\\\""); args.extend([ "--config".to_string(), - format!("mcp_servers.relaycast.env.RELAY_WORKSPACES_JSON=\"{escaped}\""), + format!( + "mcp_servers.relaycast.env.RELAY_WORKSPACES_JSON=\"{}\"", + escape_toml_basic_string(wj) + ), ]); } if let Some(dw) = default_workspace.map(str::trim).filter(|s| !s.is_empty()) { - let escaped = dw.replace('\\', "\\\\").replace('"', "\\\""); args.extend([ "--config".to_string(), - format!("mcp_servers.relaycast.env.RELAY_DEFAULT_WORKSPACE=\"{escaped}\""), + format!( + "mcp_servers.relaycast.env.RELAY_DEFAULT_WORKSPACE=\"{}\"", + escape_toml_basic_string(dw) + ), ]); } + if let Some(config) = agent_result { + for (key, value) in config.env_pairs() { + args.extend([ + "--config".to_string(), + format!( + "mcp_servers.relaycast.env.{key}=\"{}\"", + escape_toml_basic_string(&value) + ), + ]); + } + } } else if is_gemini || is_droid { if is_gemini { ensure_gemini_folder_trusted(cwd); @@ -746,10 +872,11 @@ pub async fn configure_relaycast_mcp_with_token( is_gemini, workspaces_json, default_workspace, + agent_result, ) .await?; } else if is_opencode && !existing_args.iter().any(|a| a == "--agent") { - ensure_opencode_config( + ensure_opencode_config_with_result( cwd, api_key, base_url, @@ -757,6 +884,7 @@ pub async fn configure_relaycast_mcp_with_token( agent_token, workspaces_json, default_workspace, + agent_result, ) .with_context(|| { "failed to write opencode.json for relaycast MCP. \ @@ -773,6 +901,7 @@ pub async fn configure_relaycast_mcp_with_token( agent_token, workspaces_json, default_workspace, + agent_result, ) .with_context(|| { "failed to write .cursor/mcp.json for relaycast MCP. \ @@ -852,6 +981,7 @@ fn gemini_droid_manual_mcp_add_cmd(cli: &str, is_gemini: bool) -> String { ) } +#[cfg(test)] fn gemini_droid_mcp_add_args( api_key: Option<&str>, base_url: Option<&str>, @@ -860,6 +990,28 @@ fn gemini_droid_mcp_add_args( is_gemini: bool, workspaces_json: Option<&str>, default_workspace: Option<&str>, +) -> Vec { + gemini_droid_mcp_add_args_with_result( + api_key, + base_url, + agent_name, + agent_token, + is_gemini, + workspaces_json, + default_workspace, + None, + ) +} + +fn gemini_droid_mcp_add_args_with_result( + api_key: Option<&str>, + base_url: Option<&str>, + agent_name: Option<&str>, + agent_token: Option<&str>, + is_gemini: bool, + workspaces_json: Option<&str>, + default_workspace: Option<&str>, + agent_result: Option<&AgentResultMcpConfig>, ) -> Vec { let env_flag = gemini_droid_mcp_env_flag(is_gemini); let mut args = vec!["mcp".to_string(), "add".to_string()]; @@ -895,6 +1047,12 @@ fn gemini_droid_mcp_add_args( args.push(env_flag.to_string()); args.push(format!("RELAY_DEFAULT_WORKSPACE={dw}")); } + if let Some(config) = agent_result { + for (key, value) in config.env_pairs() { + args.push(env_flag.to_string()); + args.push(format!("{key}={value}")); + } + } args.push("relaycast".to_string()); // Droid's CLI parser continues parsing options after positional args. // Insert `--` so `-y` is treated as an argument to `npx`. @@ -917,6 +1075,7 @@ async fn configure_gemini_droid_mcp( is_gemini: bool, workspaces_json: Option<&str>, default_workspace: Option<&str>, + agent_result: Option<&AgentResultMcpConfig>, ) -> Result<()> { // Extract the executable from cli which may contain inline args // (e.g. "gemini --model foo"). Command::new needs just the binary. @@ -935,7 +1094,7 @@ async fn configure_gemini_droid_mcp( .and_then(|mut c| c.wait()); let mut mcp_cmd = Command::new(&exe); - mcp_cmd.args(gemini_droid_mcp_add_args( + mcp_cmd.args(gemini_droid_mcp_add_args_with_result( api_key, base_url, agent_name, @@ -943,6 +1102,7 @@ async fn configure_gemini_droid_mcp( is_gemini, workspaces_json, default_workspace, + agent_result, )); mcp_cmd .stdin(Stdio::null()) @@ -996,7 +1156,7 @@ fn write_pretty_json(path: &Path, value: &Value) -> io::Result<()> { mod tests { use std::fs; - use serde_json::{Map, Value}; + use serde_json::{json, Map, Value}; use tempfile::tempdir; use super::ensure_relaycast_mcp_config; @@ -2072,6 +2232,7 @@ mod tests { temp.path(), None, None, + None, ); let parsed: Value = serde_json::from_str(&merged).expect("valid JSON"); let servers = parsed["mcpServers"].as_object().expect("mcpServers"); @@ -2109,6 +2270,7 @@ mod tests { temp.path(), None, None, + None, ); let parsed: Value = serde_json::from_str(&merged).expect("valid JSON"); let servers = parsed["mcpServers"].as_object().expect("mcpServers"); @@ -2131,6 +2293,7 @@ mod tests { temp.path(), None, None, + None, ); let parsed: Value = serde_json::from_str(&merged).expect("valid JSON"); let servers = parsed["mcpServers"].as_object().expect("mcpServers"); @@ -2180,6 +2343,7 @@ mod tests { &project, None, None, + None, ); let parsed: Value = serde_json::from_str(&merged).expect("valid JSON"); let servers = parsed["mcpServers"].as_object().expect("mcpServers"); @@ -2356,6 +2520,7 @@ mod tests { Some(fake_home), None, None, + None, ); let parsed: Value = serde_json::from_str(&merged).expect("valid JSON"); let servers = parsed["mcpServers"].as_object().expect("mcpServers"); @@ -2403,6 +2568,7 @@ mod tests { Some(fake_home), None, None, + None, ); let parsed: Value = serde_json::from_str(&merged).expect("valid JSON"); let servers = parsed["mcpServers"].as_object().expect("mcpServers"); @@ -2447,6 +2613,7 @@ mod tests { Some(fake_home), None, None, + None, ); let parsed: Value = serde_json::from_str(&merged).expect("valid JSON"); let servers = parsed["mcpServers"].as_object().expect("mcpServers"); @@ -2494,6 +2661,7 @@ mod tests { None, None, None, + None, ); let parsed: Value = serde_json::from_str(&merged).expect("valid JSON"); let servers = parsed["mcpServers"].as_object().expect("mcpServers"); @@ -2553,6 +2721,7 @@ mod tests { None, None, None, + None, ); let parsed: Value = serde_json::from_str(&merged).expect("valid JSON"); let servers = parsed["mcpServers"].as_object().expect("mcpServers"); @@ -2595,8 +2764,15 @@ mod tests { #[test] fn relaycast_server_config_includes_workspace_vars() { - let server = - super::relaycast_server_config(None, None, None, None, Some("wj-json"), Some("ws-id")); + let server = super::relaycast_server_config( + None, + None, + None, + None, + Some("wj-json"), + Some("ws-id"), + None, + ); let env = &server["env"]; assert_eq!( env["RELAY_WORKSPACES_JSON"].as_str(), @@ -2610,9 +2786,48 @@ mod tests { ); } + #[test] + fn relaycast_server_config_includes_agent_result_vars() { + let schema = json!({ + "type": "object", + "properties": { + "ok": { "type": "boolean" } + }, + "required": ["ok"] + }); + let schema_json = schema.to_string(); + let config = crate::types::AgentResultMcpConfig { + callback_url: "http://127.0.0.1:3889/api/agent-result".to_string(), + token: "arr_test".to_string(), + schema: Some(schema.clone()), + }; + + let server = super::relaycast_server_config( + None, + None, + Some("agent-1"), + None, + None, + None, + Some(&config), + ); + let env = &server["env"]; + + assert_eq!( + env["AGENT_RELAY_RESULT_URL"].as_str(), + Some("http://127.0.0.1:3889/api/agent-result") + ); + assert_eq!(env["AGENT_RELAY_RESULT_TOKEN"].as_str(), Some("arr_test")); + assert_eq!( + env["AGENT_RELAY_RESULT_SCHEMA"].as_str(), + Some(schema_json.as_str()) + ); + } + #[test] fn relaycast_server_config_empty_workspace_vars_omitted() { - let server = super::relaycast_server_config(None, None, None, None, Some(""), Some(" ")); + let server = + super::relaycast_server_config(None, None, None, None, Some(""), Some(" "), None); // env may be absent (Value::Null) or present but without workspace keys let env = &server["env"]; assert!( @@ -2637,6 +2852,7 @@ mod tests { None, Some("wj"), Some("dw"), + None, ); let parsed: Value = serde_json::from_str(&merged).expect("valid JSON"); assert_eq!( diff --git a/crates/broker/src/types.rs b/crates/broker/src/types.rs index 4ee20451e..bfe1099fa 100644 --- a/crates/broker/src/types.rs +++ b/crates/broker/src/types.rs @@ -1,4 +1,5 @@ use serde::{Deserialize, Serialize}; +use serde_json::Value; use crate::protocol::MessageInjectionMode; @@ -214,6 +215,30 @@ pub struct InboundDeliveryState { pub pending: std::collections::VecDeque, } +/// Per-spawn structured result callback configuration. The broker generates a +/// token for agents spawned with a result contract and injects this into that +/// agent's MCP server environment. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct AgentResultMcpConfig { + pub callback_url: String, + pub token: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub schema: Option, +} + +impl AgentResultMcpConfig { + pub fn env_pairs(&self) -> Vec<(&'static str, String)> { + let mut pairs = vec![ + ("AGENT_RELAY_RESULT_URL", self.callback_url.clone()), + ("AGENT_RELAY_RESULT_TOKEN", self.token.clone()), + ]; + if let Some(schema) = &self.schema { + pairs.push(("AGENT_RELAY_RESULT_SCHEMA", schema.to_string())); + } + pairs + } +} + /// Per-worker cap on the pending queue. Prevents unbounded growth when a /// `manual_flush` delivery mode is left open for hours; oldest message is evicted /// with a `tracing::warn!` (see [`InboundDeliveryState::push_pending`]). diff --git a/crates/broker/src/worker.rs b/crates/broker/src/worker.rs index 059011061..c2cc77fec 100644 --- a/crates/broker/src/worker.rs +++ b/crates/broker/src/worker.rs @@ -8,8 +8,9 @@ use std::{ use crate::{ metrics::MetricsCollector, protocol::{AgentRuntime, AgentSpec, ProtocolEnvelope, RelayDelivery, PROTOCOL_VERSION}, - relaycast::configure_relaycast_mcp_with_token, + relaycast::configure_relaycast_mcp_with_result, supervisor::Supervisor, + types::AgentResultMcpConfig, }; use anyhow::{Context, Result}; use serde::{Deserialize, Serialize}; @@ -161,11 +162,12 @@ impl WorkerRegistry { cwd: &Path, worker_relay_api_key: Option<&str>, skip_relay_prompt: bool, + agent_result: Option<&AgentResultMcpConfig>, ) -> Result> { - if skip_relay_prompt { + if skip_relay_prompt && agent_result.is_none() { return Ok(Vec::new()); } - configure_relaycast_mcp_with_token( + configure_relaycast_mcp_with_result( cli_name, agent_name, self.env_value("RELAY_API_KEY"), @@ -175,6 +177,7 @@ impl WorkerRegistry { worker_relay_api_key, self.env_value("RELAY_WORKSPACES_JSON"), self.env_value("RELAY_DEFAULT_WORKSPACE"), + agent_result, ) .await } @@ -195,6 +198,7 @@ impl WorkerRegistry { worker_relay_api_key: Option, skip_relay_prompt: bool, workspace_id: Option, + agent_result: Option, ) -> Result { let mut spec = spec; if self.workers.contains_key(&spec.name) { @@ -281,6 +285,7 @@ impl WorkerRegistry { Path::new(spec.cwd.as_deref().unwrap_or(".")), worker_relay_api_key.as_deref(), skip_relay_prompt, + agent_result.as_ref(), ) .await?; @@ -345,6 +350,7 @@ impl WorkerRegistry { Path::new(spec.cwd.as_deref().unwrap_or(".")), worker_relay_api_key.as_deref(), skip_relay_prompt, + agent_result.as_ref(), ) .await?; @@ -383,6 +389,11 @@ impl WorkerRegistry { for (key, value) in &self.worker_env { command.env(key, value); } + if let Some(config) = &agent_result { + for (key, value) in config.env_pairs() { + command.env(key, value); + } + } if !skip_relay_prompt && !matches!(spec.runtime, AgentRuntime::Headless) { if let Some(relay_key) = worker_relay_api_key { command.env("RELAY_AGENT_TOKEN", relay_key); diff --git a/package-lock.json b/package-lock.json index 8347839e0..e9ee51ae0 100644 --- a/package-lock.json +++ b/package-lock.json @@ -17309,7 +17309,9 @@ "listr2": "^10.2.1", "tar": "^7.5.10", "ws": "^8.18.3", - "yaml": "^2.7.0" + "yaml": "^2.7.0", + "zod": "^3.23.8", + "zod-to-json-schema": "^3.23.1" }, "devDependencies": { "@types/node": "^22.13.10", diff --git a/packages/sdk/package.json b/packages/sdk/package.json index 682d56cba..274594c68 100644 --- a/packages/sdk/package.json +++ b/packages/sdk/package.json @@ -181,7 +181,9 @@ "listr2": "^10.2.1", "tar": "^7.5.10", "ws": "^8.18.3", - "yaml": "^2.7.0" + "yaml": "^2.7.0", + "zod": "^3.23.8", + "zod-to-json-schema": "^3.23.1" }, "optionalDependencies": { "@agent-relay/broker-darwin-arm64": "6.3.2", diff --git a/packages/sdk/src/__tests__/orchestration-upgrades.test.ts b/packages/sdk/src/__tests__/orchestration-upgrades.test.ts index 90ce9a17d..6c1808ed3 100644 --- a/packages/sdk/src/__tests__/orchestration-upgrades.test.ts +++ b/packages/sdk/src/__tests__/orchestration-upgrades.test.ts @@ -130,6 +130,25 @@ describe('AgentRelayClient orchestration payloads', () => { }); }); + it('spawnPty forwards structured result JSON schema', async () => { + const client = createProtocolClient(); + const request = vi + .spyOn((client as any).transport, 'request') + .mockResolvedValue({ name: 'agent-result', runtime: 'pty' }); + + await client.spawnPty({ + name: 'agent-result', + cli: 'claude', + agentResultSchema: { type: 'object', properties: { ok: { type: 'boolean' } } }, + }); + + const body = JSON.parse(request.mock.calls[0]?.[1]?.body ?? '{}'); + expect(body.agentResultSchema).toEqual({ + type: 'object', + properties: { ok: { type: 'boolean' } }, + }); + }); + it('spawnPty forwards model and args in the broker payload', async () => { const client = createProtocolClient(); const request = vi @@ -547,6 +566,59 @@ describe('AgentRelay orchestration handles', () => { } }); + it('agent.waitForResult resolves typed structured results and invokes hooks', async () => { + const { client, emit, mock } = createMockFacadeClient(); + + const relay = createWiredRelay(client); + const globalResults: unknown[] = []; + const callbackResults: unknown[] = []; + relay.onAgentResult = (result) => globalResults.push(result); + + try { + const agent = await relay.spawnPty<{ ok: boolean }>({ + name: 'result-agent', + cli: 'claude', + channels: ['general'], + result: { + jsonSchema: { type: 'object', properties: { ok: { type: 'boolean' } }, required: ['ok'] }, + schema(value) { + if (!value || typeof value !== 'object' || typeof (value as { ok?: unknown }).ok !== 'boolean') { + throw new Error('invalid result'); + } + return value as { ok: boolean }; + }, + onResult: (data) => callbackResults.push(data), + }, + }); + + expect(mock.spawnPty).toHaveBeenCalledWith( + expect.objectContaining({ + agentResultSchema: { type: 'object', properties: { ok: { type: 'boolean' } }, required: ['ok'] }, + }) + ); + + const waitPromise = agent.waitForResult(1_000); + emit({ + kind: 'agent_result', + name: 'result-agent', + result_id: 'ar_1', + data: { ok: true }, + final: true, + }); + + await expect(waitPromise).resolves.toMatchObject({ + name: 'result-agent', + resultId: 'ar_1', + data: { ok: true }, + final: true, + }); + expect(globalResults).toHaveLength(1); + expect(callbackResults).toEqual([{ ok: true }]); + } finally { + await relay.shutdown(); + } + }); + it('waitForAgentMessage waits for relay_inbound from the agent', async () => { const { client, emit } = createMockFacadeClient(); vi.spyOn(AgentRelayClient, 'start').mockResolvedValue(client); diff --git a/packages/sdk/src/__tests__/unit.test.ts b/packages/sdk/src/__tests__/unit.test.ts index 5ca78af16..4445c9d54 100644 --- a/packages/sdk/src/__tests__/unit.test.ts +++ b/packages/sdk/src/__tests__/unit.test.ts @@ -82,6 +82,9 @@ function makeFakeAgentWithControls(name: string, exitAfterMs?: number): FakeAgen } return idlePromise!; }, + waitForResult() { + return Promise.reject(new Error(`No structured result for '${name}'`)); + }, async sendMessage() { return { eventId: 'fake', from: name, to: '', text: '' }; }, diff --git a/packages/sdk/src/client.ts b/packages/sdk/src/client.ts index c504f1cb9..b6f983a2f 100644 --- a/packages/sdk/src/client.ts +++ b/packages/sdk/src/client.ts @@ -416,6 +416,7 @@ export class AgentRelayClient { ...(input.idleThresholdSecs !== undefined ? { idleThresholdSecs: input.idleThresholdSecs } : {}), ...(input.restartPolicy !== undefined ? { restartPolicy: input.restartPolicy } : {}), ...(input.skipRelayPrompt !== undefined ? { skipRelayPrompt: input.skipRelayPrompt } : {}), + ...(input.agentResultSchema !== undefined ? { agentResultSchema: input.agentResultSchema } : {}), }), }); } @@ -446,6 +447,7 @@ export class AgentRelayClient { ...(input.idleThresholdSecs !== undefined ? { idleThresholdSecs: input.idleThresholdSecs } : {}), ...(input.restartPolicy !== undefined ? { restartPolicy: input.restartPolicy } : {}), ...(input.skipRelayPrompt !== undefined ? { skipRelayPrompt: input.skipRelayPrompt } : {}), + ...(input.agentResultSchema !== undefined ? { agentResultSchema: input.agentResultSchema } : {}), transport, }), }); diff --git a/packages/sdk/src/protocol.ts b/packages/sdk/src/protocol.ts index edbace5b7..f22f72cec 100644 --- a/packages/sdk/src/protocol.ts +++ b/packages/sdk/src/protocol.ts @@ -428,6 +428,14 @@ export type BrokerEvent = idle_secs: number; since?: string; } + | { + kind: 'agent_result'; + name: string; + result_id: string; + data: unknown; + final: boolean; + metadata?: Record | null; + } | { kind: 'agent_blocked_on_send'; name: string; diff --git a/packages/sdk/src/relay.ts b/packages/sdk/src/relay.ts index 93004dfbc..7ad883e24 100644 --- a/packages/sdk/src/relay.ts +++ b/packages/sdk/src/relay.ts @@ -28,6 +28,8 @@ import { existsSync, mkdirSync, readFileSync, writeFileSync } from 'node:fs'; import path from 'node:path'; import { RelayCast } from '@relaycast/sdk'; +import { zodToJsonSchema } from 'zod-to-json-schema'; +import type { ZodTypeAny } from 'zod'; import { AgentRelayClient, type AgentRelayBrokerInitArgs, type AgentRelaySpawnOptions } from './client.js'; import { @@ -41,7 +43,7 @@ import { type ResolvedPersona, } from './personas.js'; import { AgentRelayProtocolError } from './transport.js'; -import type { SendMessageInput, SpawnPtyInput } from './types.js'; +import type { JsonSchema, SendMessageInput, SpawnPtyInput } from './types.js'; import type { AgentRuntime, BrokerEvent, @@ -161,6 +163,36 @@ export interface Message { mode?: MessageInjectionMode; } +export interface AgentResultMeta { + name: string; + resultId: string; + final: boolean; + metadata?: Record; +} + +export interface AgentResult extends AgentResultMeta { + data: T; +} + +export type AgentResultParser = (value: unknown) => T; + +export type AgentResultSchema = + | ZodTypeAny + | { + parse?: (value: unknown) => T; + safeParse?: (value: unknown) => { success: true; data: T } | { success: false; error: unknown }; + } + | AgentResultParser; + +export interface AgentResultOptions { + /** Runtime validator/parser for submitted data. Zod schemas are supported. */ + schema?: AgentResultSchema; + /** JSON Schema exposed to the spawned agent's MCP result tool. Defaults to any JSON. */ + jsonSchema?: JsonSchema; + /** Invoked after a result arrives and passes local schema validation. */ + onResult?: (data: T, meta: AgentResultMeta) => void | Promise; +} + export type AgentStatus = 'spawning' | 'ready' | 'idle' | 'exited'; export type DeliveryWaitStatus = 'ack' | 'failed' | 'timeout'; export type DeliveryStateStatus = 'queued' | 'injected' | 'active' | 'verified' | 'failed'; @@ -237,7 +269,7 @@ export interface ReleaseOptions extends ReleaseLifecycleHooks { reason?: string; } -export interface SpawnOptions extends SpawnLifecycleHooks { +export interface SpawnOptions extends SpawnLifecycleHooks { args?: string[]; channels?: string[]; model?: string; @@ -258,14 +290,19 @@ export interface SpawnOptions extends SpawnLifecycleHooks { /** When true, skip injecting the relay MCP configuration and protocol prompt into the spawned agent. * Useful for minor tasks where relay messaging is not needed, saving tokens. */ skipRelayPrompt?: boolean; + /** + * Enables a structured-result MCP tool for the spawned agent and validates + * submissions on the SDK side. + */ + result?: AgentResultOptions; } -export interface SpawnAndWaitOptions extends SpawnOptions { +export interface SpawnAndWaitOptions extends SpawnOptions { timeoutMs?: number; waitForMessage?: boolean; } -export interface SpawnPersonaOptions extends SpawnOptions { +export interface SpawnPersonaOptions extends SpawnOptions { /** Override the spawned agent's name. Defaults to the persona id. */ name?: string; /** Initial task / user prompt for the agent. */ @@ -296,7 +333,7 @@ export interface SpawnPersonaOptions extends SpawnOptions { type AgentOutputPayload = { stream: string; chunk: string }; type AgentOutputCallback = ((chunk: string) => void) | ((data: AgentOutputPayload) => void); -export interface Agent { +export interface Agent { readonly name: string; readonly runtime: AgentRuntime; readonly channels: string[]; @@ -318,6 +355,8 @@ export interface Agent { * @param timeoutMs — optional timeout in ms. Resolves with `"idle"` when first idle event fires, * `"timeout"` if timeoutMs elapses first, or `"exited"` if the agent exits. */ waitForIdle(timeoutMs?: number): Promise<'idle' | 'timeout' | 'exited'>; + /** Wait for the structured result submitted through the spawned agent's result MCP tool. */ + waitForResult(timeoutMs?: number): Promise>; sendMessage(input: { to: string; text: string; @@ -351,10 +390,10 @@ export interface HumanHandle { } export interface AgentSpawner { - spawn(options?: SpawnerSpawnOptions): Promise; + spawn(options?: SpawnerSpawnOptions): Promise>; } -export interface SpawnerSpawnOptions extends SpawnLifecycleHooks { +export interface SpawnerSpawnOptions extends SpawnLifecycleHooks { name?: string; args?: string[]; channels?: string[]; @@ -373,6 +412,7 @@ export interface SpawnerSpawnOptions extends SpawnLifecycleHooks { /** When true, skip injecting the relay MCP configuration and protocol prompt into the spawned agent. * Useful for minor tasks where relay messaging is not needed, saving tokens. */ skipRelayPrompt?: boolean; + result?: AgentResultOptions; } export type EventHook = ((value: T) => void) | null; @@ -424,10 +464,22 @@ type OutputListener = { stream?: string; }; -type InternalAgent = Agent & { +type InternalAgent = Agent & { _setChannels: (channels: string[]) => void; }; +type InternalAgentResultContract = { + schema?: AgentResultSchema; + jsonSchema: JsonSchema; + onResult?: (data: T, meta: AgentResultMeta) => void | Promise; +}; + +type AgentResultResolver = { + resolve: (result: AgentResult) => void; + reject: (error: Error) => void; + token: number; +}; + interface AgentActivityState { active: boolean; pendingDeliveries: Map; @@ -445,6 +497,7 @@ export class AgentRelay { onAgentReady: EventHook = null; onWorkerOutput: EventHook<{ name: string; stream: string; chunk: string }> = null; onDeliveryUpdate: EventHook = null; + onAgentResult: EventHook = null; onAgentExitRequested: EventHook<{ name: string; reason: string }> = null; onAgentIdle: EventHook<{ name: string; idleSecs: number }> = null; onAgentActivityChanged: EventHook = null; @@ -490,6 +543,10 @@ export class AgentRelay { private readonly deliveryStates = new Map(); private readonly agentActivityStates = new Map(); private readonly outputListeners = new Map>(); + private readonly resultContracts = new Map(); + private readonly lastAgentResults = new Map>(); + private readonly resultResolvers = new Map(); + private resultResolverSeq = 0; private readonly exitResolvers = new Map< string, { resolve: (reason: 'exited' | 'released') => void; token: number } @@ -661,7 +718,9 @@ export class AgentRelay { // ── Spawning ──────────────────────────────────────────────────────────── - async spawnPty(input: SpawnPtyInput & SpawnLifecycleHooks): Promise { + async spawnPty( + input: SpawnPtyInput & SpawnLifecycleHooks & { result?: AgentResultOptions } + ): Promise> { const client = await this.ensureStarted(); if (!input.channels || input.channels.length === 0) { console.warn( @@ -678,6 +737,10 @@ export class AgentRelay { }; await this.invokeLifecycleHook(input.onStart, lifecycleContext, `spawnPty("${input.name}") onStart`); let result: { name: string; runtime: AgentRuntime }; + const resultContract = this.prepareAgentResultContract(input.result); + if (resultContract) { + this.resultContracts.set(input.name, resultContract as InternalAgentResultContract); + } try { result = await client.spawnPty({ name: input.name, @@ -694,8 +757,12 @@ export class AgentRelay { idleThresholdSecs: input.idleThresholdSecs, restartPolicy: input.restartPolicy, skipRelayPrompt: input.skipRelayPrompt, + agentResultSchema: resultContract?.jsonSchema, }); } catch (error) { + if (resultContract) { + this.resultContracts.delete(input.name); + } await this.invokeLifecycleHook( input.onError, { @@ -707,7 +774,11 @@ export class AgentRelay { throw error; } this.resetAgentLifecycleState(result.name); - const agent = this.makeAgent(result.name, result.runtime, channels); + if (result.name !== input.name && resultContract) { + this.resultContracts.delete(input.name); + this.resultContracts.set(result.name, resultContract as InternalAgentResultContract); + } + const agent = this.makeAgent(result.name, result.runtime, channels) as Agent; this.knownAgents.set(agent.name, agent); await this.invokeLifecycleHook( input.onSuccess, @@ -721,7 +792,12 @@ export class AgentRelay { return agent; } - async spawn(name: string, cli: string, task?: string, options?: SpawnOptions): Promise { + async spawn( + name: string, + cli: string, + task?: string, + options?: SpawnOptions + ): Promise> { return this.spawnPty({ name, cli, @@ -737,19 +813,25 @@ export class AgentRelay { idleThresholdSecs: options?.idleThresholdSecs, restartPolicy: options?.restartPolicy, skipRelayPrompt: options?.skipRelayPrompt, + result: options?.result, onStart: options?.onStart, onSuccess: options?.onSuccess, onError: options?.onError, }); } - async spawnAndWait(name: string, cli: string, task: string, options?: SpawnAndWaitOptions): Promise { + async spawnAndWait( + name: string, + cli: string, + task: string, + options?: SpawnAndWaitOptions + ): Promise> { const { timeoutMs, waitForMessage, ...spawnOptions } = options ?? {}; await this.spawn(name, cli, task, spawnOptions); if (waitForMessage) { - return this.waitForAgentMessage(name, timeoutMs ?? 60_000); + return this.waitForAgentMessage(name, timeoutMs ?? 60_000) as Promise>; } - return this.waitForAgentReady(name, timeoutMs ?? 60_000); + return this.waitForAgentReady(name, timeoutMs ?? 60_000) as Promise>; } /** @@ -770,7 +852,10 @@ export class AgentRelay { * @param options — overrides for tier, search dirs, name, task, and the * underlying spawn options */ - async spawnPersona(personaId: string, options: SpawnPersonaOptions = {}): Promise { + async spawnPersona( + personaId: string, + options: SpawnPersonaOptions = {} + ): Promise> { const personaCwd = options.personaCwd ?? options.cwd ?? process.cwd(); const searchDirs = options.searchDirs ?? this.defaultPersonaDirs; const loadOpts: PersonaLoadOptions = { @@ -795,7 +880,7 @@ export class AgentRelay { const task = composePersonaTask(spec, options.task); const spawnName = options.name ?? persona.id; - let agent: Agent; + let agent: Agent; try { agent = await this.spawnPty({ name: spawnName, @@ -812,6 +897,7 @@ export class AgentRelay { idleThresholdSecs: options.idleThresholdSecs, restartPolicy: options.restartPolicy, skipRelayPrompt: options.skipRelayPrompt, + result: options.result, onStart: options.onStart, onSuccess: options.onSuccess, onError: options.onError, @@ -1542,10 +1628,15 @@ export class AgentRelay { this.onAgentReleased?.(agent); this.knownAgents.delete(event.name); this.outputListeners.delete(event.name); + this.resultContracts.delete(event.name); this.exitResolvers.get(event.name)?.resolve('released'); this.exitResolvers.delete(event.name); this.idleResolvers.get(event.name)?.resolve('exited'); this.idleResolvers.delete(event.name); + this.resultResolvers + .get(event.name) + ?.reject(new Error(`Agent '${event.name}' was released before submitting a result`)); + this.resultResolvers.delete(event.name); break; } case 'agent_exited': { @@ -1564,10 +1655,15 @@ export class AgentRelay { this.onAgentExited?.(agent); this.knownAgents.delete(event.name); this.outputListeners.delete(event.name); + this.resultContracts.delete(event.name); this.exitResolvers.get(event.name)?.resolve('exited'); this.exitResolvers.delete(event.name); this.idleResolvers.get(event.name)?.resolve('exited'); this.idleResolvers.delete(event.name); + this.resultResolvers + .get(event.name) + ?.reject(new Error(`Agent '${event.name}' exited before submitting a result`)); + this.resultResolvers.delete(event.name); break; } case 'agent_exit': { @@ -1677,6 +1773,16 @@ export class AgentRelay { this.idleResolvers.delete(event.name); break; } + case 'agent_result': { + this.dispatchAgentResult(event.name, { + name: event.name, + resultId: event.result_id, + data: event.data, + final: event.final, + metadata: event.metadata ?? undefined, + }); + break; + } } if (event.kind.startsWith('delivery_') || event.kind.startsWith('message_delivery_')) { this.onDeliveryUpdate?.(event); @@ -1684,7 +1790,111 @@ export class AgentRelay { }); } - private makeAgent(name: string, runtime: AgentRuntime, channels: string[]): Agent { + private prepareAgentResultContract( + options: AgentResultOptions | undefined + ): InternalAgentResultContract | undefined { + if (!options) { + return undefined; + } + return { + schema: options.schema, + jsonSchema: options.jsonSchema ?? this.schemaToJsonSchema(options.schema), + onResult: options.onResult, + }; + } + + private schemaToJsonSchema(schema: AgentResultSchema | undefined): JsonSchema { + if (schema && typeof schema === 'object' && this.isZodSchema(schema)) { + return zodToJsonSchema(schema as ZodTypeAny, { target: 'jsonSchema7' }) as JsonSchema; + } + return true; + } + + private isZodSchema(schema: object): boolean { + return '_def' in schema && typeof (schema as { safeParse?: unknown }).safeParse === 'function'; + } + + private validateAgentResult(contract: InternalAgentResultContract | undefined, value: unknown): T { + const schema = contract?.schema; + if (!schema) { + return value as T; + } + if (typeof schema === 'function') { + return schema(value) as T; + } + if (typeof schema.safeParse === 'function') { + const parsed = schema.safeParse(value); + if (parsed.success) { + return parsed.data; + } + throw new Error(`Agent result failed schema validation: ${String(parsed.error)}`); + } + if (typeof schema.parse === 'function') { + return schema.parse(value); + } + return value as T; + } + + private dispatchAgentResult(name: string, raw: AgentResult): void { + const contract = this.resultContracts.get(name); + let result: AgentResult; + try { + const data = this.validateAgentResult(contract, raw.data); + result = { ...raw, data }; + } catch (error) { + const err = error instanceof Error ? error : new Error(String(error)); + this.resultResolvers.get(name)?.reject(err); + this.resultResolvers.delete(name); + console.warn(`[AgentRelay] structured result from "${name}" failed validation`, err); + return; + } + + this.lastAgentResults.set(name, result); + this.onAgentResult?.(result); + if (contract?.onResult) { + Promise.resolve(contract.onResult(result.data, result)).catch((error) => { + console.warn(`[AgentRelay] result("${name}") onResult hook threw`, error); + }); + } + this.resultResolvers.get(name)?.resolve(result); + this.resultResolvers.delete(name); + } + + private waitForAgentResult(name: string, timeoutMs?: number): Promise> { + const existing = this.lastAgentResults.get(name); + if (existing) { + return Promise.resolve(existing); + } + if (timeoutMs === 0) { + return Promise.reject(new Error(`Timed out waiting for structured result from '${name}'`)); + } + return new Promise>((resolve, reject) => { + let timer: ReturnType | undefined; + const token = ++this.resultResolverSeq; + this.resultResolvers.set(name, { + resolve(result) { + if (timer) clearTimeout(timer); + resolve(result); + }, + reject(error) { + if (timer) clearTimeout(timer); + reject(error); + }, + token, + }); + if (timeoutMs !== undefined) { + timer = setTimeout(() => { + const current = this.resultResolvers.get(name); + if (current?.token === token) { + this.resultResolvers.delete(name); + } + reject(new Error(`Timed out waiting for structured result from '${name}' after ${timeoutMs}ms`)); + }, timeoutMs); + } + }); + } + + private makeAgent(name: string, runtime: AgentRuntime, channels: string[]): Agent { // eslint-disable-next-line @typescript-eslint/no-this-alias const relay = this; let agentChannels = [...channels]; @@ -1826,6 +2036,9 @@ export class AgentRelay { } }); }, + waitForResult(timeoutMs?: number) { + return relay.waitForAgentResult(name, timeoutMs); + }, async sendMessage(input) { const client = await relay.ensureStarted(); let result: Awaited>; @@ -1898,7 +2111,7 @@ export class AgentRelay { private createSpawner(cli: string, defaultName: string, runtime: AgentRuntime): AgentSpawner { return { - spawn: async (options?) => { + spawn: async (options?: SpawnerSpawnOptions) => { const name = options?.name ?? defaultName; const channels = options?.channels ?? ['general']; const args = options?.args ?? []; @@ -1916,6 +2129,7 @@ export class AgentRelay { idleThresholdSecs: options?.idleThresholdSecs, agentToken: options?.agentToken, skipRelayPrompt: options?.skipRelayPrompt, + result: options?.result, onStart: options?.onStart, onSuccess: options?.onSuccess, onError: options?.onError, @@ -1931,6 +2145,10 @@ export class AgentRelay { }; await this.invokeLifecycleHook(options?.onStart, lifecycleContext, `spawn("${name}") onStart`); let result: { name: string; runtime: AgentRuntime }; + const resultContract = this.prepareAgentResultContract(options?.result); + if (resultContract) { + this.resultContracts.set(name, resultContract as InternalAgentResultContract); + } try { result = await client.spawnProvider({ name, @@ -1944,8 +2162,12 @@ export class AgentRelay { idleThresholdSecs: options?.idleThresholdSecs, agentToken: options?.agentToken, skipRelayPrompt: options?.skipRelayPrompt, + agentResultSchema: resultContract?.jsonSchema, }); } catch (error) { + if (resultContract) { + this.resultContracts.delete(name); + } await this.invokeLifecycleHook( options?.onError, { @@ -1958,7 +2180,11 @@ export class AgentRelay { } this.resetAgentLifecycleState(result.name); - const agent = this.makeAgent(result.name, result.runtime, channels); + if (result.name !== name && resultContract) { + this.resultContracts.delete(name); + this.resultContracts.set(result.name, resultContract as InternalAgentResultContract); + } + const agent = this.makeAgent(result.name, result.runtime, channels) as Agent; this.knownAgents.set(agent.name, agent); await this.invokeLifecycleHook( options?.onSuccess, @@ -1995,6 +2221,8 @@ export class AgentRelay { this.exitedAgents.delete(name); this.idleAgents.delete(name); this.agentActivityStates.delete(name); + this.lastAgentResults.delete(name); + this.resultResolvers.delete(name); } private normalizeReleaseOptions(reasonOrOptions?: string | ReleaseOptions): ReleaseOptions { diff --git a/packages/sdk/src/types.ts b/packages/sdk/src/types.ts index 32f23bdb3..232103bfe 100644 --- a/packages/sdk/src/types.ts +++ b/packages/sdk/src/types.ts @@ -10,6 +10,8 @@ import type { RestartPolicy, } from './protocol.js'; +export type JsonSchema = Record | boolean; + export interface SpawnPtyInput { name: string; cli: string; @@ -25,6 +27,7 @@ export interface SpawnPtyInput { restartPolicy?: RestartPolicy; continueFrom?: string; skipRelayPrompt?: boolean; + agentResultSchema?: JsonSchema; /** Optional pre-minted relaycast agent token (`at_live_`, from * `registerAgent(workspaceKey, name)` in `@agent-relay/sdk/http`). The * broker plumbs this as `RELAY_AGENT_TOKEN`, which the relaycast MCP @@ -42,6 +45,7 @@ export interface SpawnHeadlessInput { channels?: string[]; task?: string; skipRelayPrompt?: boolean; + agentResultSchema?: JsonSchema; /** Optional pre-minted relaycast agent token (`at_live_`, from * `registerAgent(workspaceKey, name)` in `@agent-relay/sdk/http`). The * broker plumbs this as `RELAY_AGENT_TOKEN`, which the relaycast MCP @@ -70,6 +74,7 @@ export interface SpawnProviderInput { restartPolicy?: RestartPolicy; continueFrom?: string; skipRelayPrompt?: boolean; + agentResultSchema?: JsonSchema; /** Optional pre-minted relaycast agent token (`at_live_`, from * `registerAgent(workspaceKey, name)` in `@agent-relay/sdk/http`). The * broker plumbs this as `RELAY_AGENT_TOKEN`, which the relaycast MCP diff --git a/src/cli/relaycast-mcp.startup.test.ts b/src/cli/relaycast-mcp.startup.test.ts index 573bc3c99..84efaab6b 100644 --- a/src/cli/relaycast-mcp.startup.test.ts +++ b/src/cli/relaycast-mcp.startup.test.ts @@ -335,6 +335,43 @@ describe('createPatchedRelayMcpServer', () => { }); }); + it('registers submit_result when a spawned-agent result callback is configured', async () => { + vi.stubEnv('AGENT_RELAY_RESULT_URL', 'http://127.0.0.1:3889/api/agent-result'); + vi.stubEnv('AGENT_RELAY_RESULT_TOKEN', 'arr_test'); + vi.stubEnv('AGENT_RELAY_RESULT_SCHEMA', '{"type":"object","properties":{"ok":{"type":"boolean"}}}'); + const { mod, mocks } = await loadRelaycastMcpModule(); + vi.stubGlobal( + 'fetch', + vi.fn(async () => ({ + ok: true, + status: 200, + text: async () => JSON.stringify({ success: true, result_id: 'ar_1' }), + })) + ); + + mod.createPatchedRelayMcpServer({ agentName: 'WorkerA' }); + const server = mocks.serverInstances[0]; + const submitResult = server.tools.get('submit_result'); + + expect(submitResult).toBeDefined(); + const result = await submitResult?.handler({ data: { ok: true }, metadata: { source: 'test' } }); + + expect(fetch).toHaveBeenCalledWith( + 'http://127.0.0.1:3889/api/agent-result', + expect.objectContaining({ + method: 'POST', + headers: expect.objectContaining({ Authorization: 'Bearer arr_test' }), + body: JSON.stringify({ + agent: 'WorkerA', + data: { ok: true }, + final: true, + metadata: { source: 'test' }, + }), + }) + ); + expect(result?.structuredContent).toEqual({ success: true, result_id: 'ar_1' }); + }); + it('reinitializes the websocket bridge when the workspace changes', async () => { const { mod, mocks } = await loadRelaycastMcpModule(); mod.createPatchedRelayMcpServer({ diff --git a/src/cli/relaycast-mcp.ts b/src/cli/relaycast-mcp.ts index bee1d40a1..7dd604b5d 100644 --- a/src/cli/relaycast-mcp.ts +++ b/src/cli/relaycast-mcp.ts @@ -62,6 +62,12 @@ export interface PatchedMcpServerOptions { type RegistrationSession = Pick; type SessionSetter = (partial: Partial) => void; +type AgentResultCallbackConfig = { + url: string; + token: string; + schema?: unknown; + agentName?: string; +}; type RegisterAgentWithRebindArgs = { session: RegistrationSession; @@ -114,6 +120,96 @@ export function normalizeAgentType(value: string | undefined): AgentType | undef return undefined; } +function readAgentResultCallbackConfig(agentName?: string): AgentResultCallbackConfig | undefined { + const url = resolveEnv('AGENT_RELAY_RESULT_URL'); + const token = resolveEnv('AGENT_RELAY_RESULT_TOKEN'); + if (!url || !token) { + return undefined; + } + + const rawSchema = resolveEnv('AGENT_RELAY_RESULT_SCHEMA'); + let schema: unknown; + if (rawSchema) { + try { + schema = JSON.parse(rawSchema); + } catch { + schema = rawSchema; + } + } + + return { url, token, schema, agentName }; +} + +function registerAgentResultTool(server: McpServer, config: AgentResultCallbackConfig | undefined): void { + if (!config) { + return; + } + + const schemaText = + config.schema === undefined + ? '' + : ` Expected JSON schema: ${JSON.stringify(config.schema).slice(0, 4000)}`; + + server.registerTool( + 'submit_result', + { + title: 'Submit Result', + description: + 'Submit the structured result for this spawned Agent Relay task. Call this when the requested work is complete and the result object is ready.' + + schemaText, + inputSchema: { + data: z.unknown().describe('The JSON result payload requested by the spawning SDK caller.'), + final: z + .boolean() + .optional() + .describe('Whether this is the final result for the task. Defaults to true.'), + metadata: z + .record(z.string(), z.unknown()) + .optional() + .describe('Optional diagnostic metadata about the result.'), + }, + outputSchema: jsonResult, + annotations: { + readOnlyHint: false, + destructiveHint: false, + idempotentHint: false, + openWorldHint: false, + }, + }, + async ({ data, final, metadata }) => { + const response = await fetch(config.url, { + method: 'POST', + headers: { + Authorization: `Bearer ${config.token}`, + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + agent: config.agentName, + data, + final: final ?? true, + metadata, + }), + }); + const responseText = await response.text(); + let payload: Record; + try { + payload = responseText ? (JSON.parse(responseText) as Record) : {}; + } catch { + payload = { success: false, error: responseText }; + } + if (!response.ok) { + throw new Error( + `Agent Relay result submission failed (${response.status}): ${String(payload.error ?? responseText)}` + ); + } + return { + content: [{ type: 'text', text: JSON.stringify(payload, null, 2) }], + structuredContent: payload, + }; + } + ); +} + async function createWorkspace(name: string, baseUrl?: string): Promise> { const response = await fetch(`${normalizeBaseUrl(baseUrl)}/v1/workspaces`, { method: 'POST', @@ -477,6 +573,7 @@ export function createPatchedRelayMcpServer(options: PatchedMcpServerOptions): M registerMessagingTools(mcpServer, getAgentClient as never); registerFeatureTools(mcpServer, getAgentClient as never); registerProgrammabilityTools(mcpServer, getRelay as never, getAgentClient as never); + registerAgentResultTool(mcpServer, readAgentResultCallbackConfig(options.agentName)); mcpServer.registerPrompt( 'system', diff --git a/web/content/docs/spawning-an-agent.mdx b/web/content/docs/spawning-an-agent.mdx index 6c4f771e6..886bf4922 100644 --- a/web/content/docs/spawning-an-agent.mdx +++ b/web/content/docs/spawning-an-agent.mdx @@ -107,4 +107,26 @@ These options control how the local broker/client is started before any agents a These options are available on the shorthand helpers and on `relay.spawn(...)`: - \ No newline at end of file + + +### Structured results + +TypeScript spawns can include a `result` contract. Agent Relay exposes a `submit_result` MCP tool to the spawned agent, then routes the submitted JSON back through `agent.waitForResult(...)`, the per-spawn `onResult` callback, and `relay.onAgentResult`. + +```typescript TypeScript file="spawn-with-result.ts" +import { z } from 'zod'; + +const Result = z.object({ + decision: z.enum(['approve', 'request_changes']), + notes: z.array(z.string()), +}); + +const reviewer = await relay.claude.spawn({ + name: 'Reviewer', + task: 'Review the change and submit your decision as structured JSON.', + result: { schema: Result }, +}); + +const { data } = await reviewer.waitForResult(120_000); +console.log(data.decision); +``` diff --git a/web/content/docs/typescript-sdk.mdx b/web/content/docs/typescript-sdk.mdx index eb32e389d..d49f33e1b 100644 --- a/web/content/docs/typescript-sdk.mdx +++ b/web/content/docs/typescript-sdk.mdx @@ -110,10 +110,43 @@ interface Agent { waitForReady(timeoutMs?: number): Promise; waitForExit(timeoutMs?: number): Promise<'exited' | 'timeout' | 'released'>; waitForIdle(timeoutMs?: number): Promise<'idle' | 'timeout' | 'exited'>; + waitForResult(timeoutMs?: number): Promise; onOutput(callback: (chunk: string) => void): () => void; // returns unsubscribe } ``` +### Structured Agent Results + +Provide `result` when spawning an agent to expose a `submit_result` MCP tool inside that agent. The tool posts JSON back to the local broker, and the SDK validates it before resolving waiters or hooks. + +```typescript file="structured-result.ts" +import { AgentRelay } from '@agent-relay/sdk'; +import { z } from 'zod'; + +const relay = new AgentRelay(); + +const Summary = z.object({ + status: z.enum(['pass', 'fail']), + findings: z.array(z.string()), +}); + +const reviewer = await relay.claude.spawn({ + name: 'Reviewer', + task: 'Review the branch and submit the final JSON result.', + result: { + schema: Summary, + onResult: (summary) => { + console.log(summary.status, summary.findings.length); + }, + }, +}); + +const result = await reviewer.waitForResult(120_000); +console.log(result.data.status); +``` + +You can also pass `jsonSchema` directly when you only want to describe the expected payload to the spawned agent. Use `relay.onAgentResult` to observe all structured results globally. + ### `ReleaseOptions` `agent.release(...)` accepts either a reason string or a `ReleaseOptions` object: @@ -275,6 +308,7 @@ relay.onAgentActivityChanged = ({ name, active, pendingDeliveries, reason }) => relay.onAgentExitRequested = ({ name, reason }) => { ... } relay.onWorkerOutput = ({ name, stream, chunk }) => { ... } relay.onDeliveryUpdate = (event: BrokerEvent) => { ... } +relay.onAgentResult = (result: AgentResult) => { ... } ``` **Message type:** From c8299b7a4f954a196d2394aa7e8ce77059cbde57 Mon Sep 17 00:00:00 2001 From: Will Washburn Date: Thu, 21 May 2026 00:20:00 -0400 Subject: [PATCH 2/4] Address PR #932 review feedback - broker: preserve agent_result config across supervised restarts (plumb through Supervisor::register, RestartState, PendingRestart) - broker: respect skip_relay_prompt even when agent_result is set; the callback env vars still reach the worker via worker.rs:392 - broker: derive callback URL from a connectable host (fall back to loopback when api_bind is 0.0.0.0/::) - broker: add serde round-trip tests for AgentResult event covering the final field rename and optional metadata - sdk: widen agent_result.metadata to unknown to match the wire contract (in both BrokerEvent and the user-facing AgentResultMeta) - sdk: support multiple concurrent waitForResult() callers per agent - sdk: reject waitForResult() immediately when the agent is unknown and no cached result exists - sdk: clear structured-result state on shutdown() and on agent_not_found release, rejecting any pending waiters - relaycast-mcp: add a configurable AbortController timeout (default 10s) to the submit_result callback POST --- crates/broker/src/protocol.rs | 30 ++++++++++ crates/broker/src/runtime/init.rs | 6 +- crates/broker/src/runtime/maintenance.rs | 2 +- crates/broker/src/supervisor.rs | 24 ++++++-- crates/broker/src/worker.rs | 7 ++- packages/sdk/src/protocol.ts | 2 +- packages/sdk/src/relay.ts | 73 +++++++++++++++++------- src/cli/relaycast-mcp.ts | 40 ++++++++----- 8 files changed, 140 insertions(+), 44 deletions(-) diff --git a/crates/broker/src/protocol.rs b/crates/broker/src/protocol.rs index e59a5127c..5e18f649d 100644 --- a/crates/broker/src/protocol.rs +++ b/crates/broker/src/protocol.rs @@ -497,6 +497,36 @@ mod tests { assert_eq!(decoded, event); } + #[test] + fn agent_result_event_round_trip_with_metadata() { + let event = BrokerToSdk::Event(BrokerEvent::AgentResult { + name: "Worker1".into(), + result_id: "res_42".into(), + data: json!({"answer": 42}), + final_result: true, + metadata: Some(json!({"latency_ms": 123})), + }); + let encoded = serde_json::to_string(&event).unwrap(); + // The `final_result` field MUST serialize as `final` per the SDK wire contract. + assert!(encoded.contains("\"final\":true")); + let decoded: BrokerToSdk = serde_json::from_str(&encoded).unwrap(); + assert_eq!(decoded, event); + } + + #[test] + fn agent_result_event_round_trip_without_metadata() { + let event = BrokerToSdk::Event(BrokerEvent::AgentResult { + name: "Worker2".into(), + result_id: "res_7".into(), + data: json!("partial"), + final_result: false, + metadata: None, + }); + let encoded = serde_json::to_string(&event).unwrap(); + let decoded: BrokerToSdk = serde_json::from_str(&encoded).unwrap(); + assert_eq!(decoded, event); + } + #[test] fn broker_event_delivery_failed_round_trip() { let event = BrokerToSdk::Event(BrokerEvent::DeliveryFailed { diff --git a/crates/broker/src/runtime/init.rs b/crates/broker/src/runtime/init.rs index 8115bd5a9..0876741ee 100644 --- a/crates/broker/src/runtime/init.rs +++ b/crates/broker/src/runtime/init.rs @@ -335,12 +335,16 @@ pub(crate) async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Re ); } + let callback_host = match cmd.api_bind.as_str() { + "0.0.0.0" | "::" | "[::]" | "" => "127.0.0.1", + other => other, + }; let mut worker_env = vec![ ("RELAY_BASE_URL".to_string(), http_base.clone()), ("RELAY_API_KEY".to_string(), relay_workspace_key.clone()), ( "AGENT_RELAY_RESULT_URL".to_string(), - format!("http://{}:{}/api/agent-result", cmd.api_bind, actual_port), + format!("http://{}:{}/api/agent-result", callback_host, actual_port), ), ( "RELAY_WORKSPACES_JSON".to_string(), diff --git a/crates/broker/src/runtime/maintenance.rs b/crates/broker/src/runtime/maintenance.rs index 3999ea8b4..7ea853c05 100644 --- a/crates/broker/src/runtime/maintenance.rs +++ b/crates/broker/src/runtime/maintenance.rs @@ -305,7 +305,7 @@ impl BrokerRuntime { worker_relay_key, rst.skip_relay_prompt, None, - None, + rst.agent_result.clone(), ) .await { diff --git a/crates/broker/src/supervisor.rs b/crates/broker/src/supervisor.rs index 20a2c2cff..d1c00f28e 100644 --- a/crates/broker/src/supervisor.rs +++ b/crates/broker/src/supervisor.rs @@ -10,6 +10,7 @@ use std::time::{Duration, Instant}; use serde::{Deserialize, Serialize}; use crate::protocol::AgentSpec; +use crate::types::AgentResultMcpConfig; /// Configurable restart policy attached to an agent at spawn time. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -58,6 +59,7 @@ struct RestartState { pub initial_task: Option, pub parent: Option, pub skip_relay_prompt: bool, + pub agent_result: Option, } /// Decision returned by the supervisor after an agent exits. @@ -74,6 +76,7 @@ pub struct PendingRestart { pub initial_task: Option, pub restart_count: u32, pub skip_relay_prompt: bool, + pub agent_result: Option, } /// Manages restart state for all supervised agents. @@ -95,6 +98,7 @@ impl Supervisor { } /// Register an agent for supervision. Called at spawn time. + #[allow(clippy::too_many_arguments)] pub fn register( &mut self, name: &str, @@ -103,6 +107,7 @@ impl Supervisor { initial_task: Option, skip_relay_prompt: bool, policy: RestartPolicy, + agent_result: Option, ) { self.states.insert( name.to_string(), @@ -115,6 +120,7 @@ impl Supervisor { initial_task, parent, skip_relay_prompt, + agent_result, }, ); } @@ -192,6 +198,7 @@ impl Supervisor { initial_task: state.initial_task.clone(), restart_count: state.total_restarts + 1, skip_relay_prompt: state.skip_relay_prompt, + agent_result: state.agent_result.clone(), }, )) } else { @@ -269,6 +276,7 @@ mod tests { None, false, RestartPolicy::default(), + None, ); assert!(sup.is_supervised("w1")); @@ -292,6 +300,7 @@ mod tests { Some("do stuff".into()), true, RestartPolicy::default(), + None, ); let decision = sup.on_exit("w1", Some(1), None).unwrap(); @@ -311,7 +320,7 @@ mod tests { max_consecutive_failures: 10, // high so this doesn't trigger ..Default::default() }; - sup.register("w1", test_spec("w1"), None, None, false, policy); + sup.register("w1", test_spec("w1"), None, None, false, policy, None); // First crash -> restart assert!(matches!( @@ -340,7 +349,7 @@ mod tests { max_restarts: 10, // high so this doesn't trigger ..Default::default() }; - sup.register("w1", test_spec("w1"), None, None, false, policy); + sup.register("w1", test_spec("w1"), None, None, false, policy, None); // Crash 1 -> consecutive=1, restart assert!(matches!( @@ -368,7 +377,7 @@ mod tests { max_restarts: 10, ..Default::default() }; - sup.register("w1", test_spec("w1"), None, None, false, policy); + sup.register("w1", test_spec("w1"), None, None, false, policy, None); // Two crashes sup.on_exit("w1", Some(1), None); @@ -391,7 +400,7 @@ mod tests { enabled: false, ..Default::default() }; - sup.register("w1", test_spec("w1"), None, None, false, policy); + sup.register("w1", test_spec("w1"), None, None, false, policy, None); let decision = sup.on_exit("w1", Some(1), None).unwrap(); assert!(matches!(decision, RestartDecision::PermanentlyDead { .. })); @@ -407,6 +416,7 @@ mod tests { None, false, RestartPolicy::default(), + None, ); sup.unregister("w1"); @@ -428,6 +438,7 @@ mod tests { Some("task".into()), true, policy, + None, ); sup.on_exit("w1", Some(1), None); @@ -449,7 +460,7 @@ mod tests { cooldown_ms: 60_000, // 60 seconds ..Default::default() }; - sup.register("w1", test_spec("w1"), None, None, false, policy); + sup.register("w1", test_spec("w1"), None, None, false, policy, None); sup.on_exit("w1", Some(1), None); @@ -468,6 +479,7 @@ mod tests { None, false, RestartPolicy::default(), + None, ); assert_eq!(sup.restart_count("w1"), 0); @@ -488,7 +500,7 @@ mod tests { cooldown_ms: 0, ..Default::default() }; - sup.register("w1", test_spec("w1"), None, None, true, policy); + sup.register("w1", test_spec("w1"), None, None, true, policy, None); sup.on_exit("w1", Some(1), None); diff --git a/crates/broker/src/worker.rs b/crates/broker/src/worker.rs index c2cc77fec..2a34b9a32 100644 --- a/crates/broker/src/worker.rs +++ b/crates/broker/src/worker.rs @@ -164,7 +164,12 @@ impl WorkerRegistry { skip_relay_prompt: bool, agent_result: Option<&AgentResultMcpConfig>, ) -> Result> { - if skip_relay_prompt && agent_result.is_none() { + // `skip_relay_prompt` is an explicit opt-out: the caller does not want the + // relaycast MCP server (messaging/channel/etc. tools) injected, e.g. to + // save tokens. We honor that even when `agent_result` is configured — + // `AGENT_RELAY_RESULT_*` env vars are still set on the worker process + // below, so a separately-configured relaycast MCP can pick them up. + if skip_relay_prompt { return Ok(Vec::new()); } configure_relaycast_mcp_with_result( diff --git a/packages/sdk/src/protocol.ts b/packages/sdk/src/protocol.ts index f22f72cec..62cdc1e8a 100644 --- a/packages/sdk/src/protocol.ts +++ b/packages/sdk/src/protocol.ts @@ -434,7 +434,7 @@ export type BrokerEvent = result_id: string; data: unknown; final: boolean; - metadata?: Record | null; + metadata?: unknown; } | { kind: 'agent_blocked_on_send'; diff --git a/packages/sdk/src/relay.ts b/packages/sdk/src/relay.ts index 7ad883e24..e0616735c 100644 --- a/packages/sdk/src/relay.ts +++ b/packages/sdk/src/relay.ts @@ -167,7 +167,8 @@ export interface AgentResultMeta { name: string; resultId: string; final: boolean; - metadata?: Record; + /** Optional diagnostic metadata about the result. Any JSON-compatible value. */ + metadata?: unknown; } export interface AgentResult extends AgentResultMeta { @@ -545,7 +546,7 @@ export class AgentRelay { private readonly outputListeners = new Map>(); private readonly resultContracts = new Map(); private readonly lastAgentResults = new Map>(); - private readonly resultResolvers = new Map(); + private readonly resultResolvers = new Map(); private resultResolverSeq = 0; private readonly exitResolvers = new Map< string, @@ -1319,6 +1320,15 @@ export class AgentRelay { entry.resolve('exited'); } this.idleResolvers.clear(); + const shutdownError = new Error('AgentRelay shutdown before structured result was submitted'); + for (const waiters of this.resultResolvers.values()) { + for (const waiter of waiters) { + waiter.reject(shutdownError); + } + } + this.resultResolvers.clear(); + this.resultContracts.clear(); + this.lastAgentResults.clear(); } // ── Private helpers ───────────────────────────────────────────────────── @@ -1633,10 +1643,9 @@ export class AgentRelay { this.exitResolvers.delete(event.name); this.idleResolvers.get(event.name)?.resolve('exited'); this.idleResolvers.delete(event.name); - this.resultResolvers - .get(event.name) - ?.reject(new Error(`Agent '${event.name}' was released before submitting a result`)); - this.resultResolvers.delete(event.name); + for (const waiter of this.takeResultResolvers(event.name)) { + waiter.reject(new Error(`Agent '${event.name}' was released before submitting a result`)); + } break; } case 'agent_exited': { @@ -1660,10 +1669,9 @@ export class AgentRelay { this.exitResolvers.delete(event.name); this.idleResolvers.get(event.name)?.resolve('exited'); this.idleResolvers.delete(event.name); - this.resultResolvers - .get(event.name) - ?.reject(new Error(`Agent '${event.name}' exited before submitting a result`)); - this.resultResolvers.delete(event.name); + for (const waiter of this.takeResultResolvers(event.name)) { + waiter.reject(new Error(`Agent '${event.name}' exited before submitting a result`)); + } break; } case 'agent_exit': { @@ -1835,6 +1843,13 @@ export class AgentRelay { return value as T; } + private takeResultResolvers(name: string): AgentResultResolver[] { + const waiters = this.resultResolvers.get(name); + if (!waiters || waiters.length === 0) return []; + this.resultResolvers.delete(name); + return waiters; + } + private dispatchAgentResult(name: string, raw: AgentResult): void { const contract = this.resultContracts.get(name); let result: AgentResult; @@ -1843,8 +1858,7 @@ export class AgentRelay { result = { ...raw, data }; } catch (error) { const err = error instanceof Error ? error : new Error(String(error)); - this.resultResolvers.get(name)?.reject(err); - this.resultResolvers.delete(name); + for (const waiter of this.takeResultResolvers(name)) waiter.reject(err); console.warn(`[AgentRelay] structured result from "${name}" failed validation`, err); return; } @@ -1856,8 +1870,7 @@ export class AgentRelay { console.warn(`[AgentRelay] result("${name}") onResult hook threw`, error); }); } - this.resultResolvers.get(name)?.resolve(result); - this.resultResolvers.delete(name); + for (const waiter of this.takeResultResolvers(name)) waiter.resolve(result); } private waitForAgentResult(name: string, timeoutMs?: number): Promise> { @@ -1865,28 +1878,41 @@ export class AgentRelay { if (existing) { return Promise.resolve(existing); } + // Don't register a waiter for an agent we don't know about and haven't + // observed a result for — the resolver would never settle. + if (!this.knownAgents.has(name) && !this.resultContracts.has(name)) { + return Promise.reject(new Error(`Agent '${name}' is not running and has no structured result`)); + } if (timeoutMs === 0) { return Promise.reject(new Error(`Timed out waiting for structured result from '${name}'`)); } return new Promise>((resolve, reject) => { let timer: ReturnType | undefined; const token = ++this.resultResolverSeq; - this.resultResolvers.set(name, { - resolve(result) { + const waiter: AgentResultResolver = { + resolve: (result) => { if (timer) clearTimeout(timer); resolve(result); }, - reject(error) { + reject: (error) => { if (timer) clearTimeout(timer); reject(error); }, token, - }); + }; + const existingWaiters = this.resultResolvers.get(name); + if (existingWaiters) { + existingWaiters.push(waiter); + } else { + this.resultResolvers.set(name, [waiter]); + } if (timeoutMs !== undefined) { timer = setTimeout(() => { - const current = this.resultResolvers.get(name); - if (current?.token === token) { - this.resultResolvers.delete(name); + const list = this.resultResolvers.get(name); + if (list) { + const idx = list.findIndex((w) => w.token === token); + if (idx >= 0) list.splice(idx, 1); + if (list.length === 0) this.resultResolvers.delete(name); } reject(new Error(`Timed out waiting for structured result from '${name}' after ${timeoutMs}ms`)); }, timeoutMs); @@ -1952,6 +1978,11 @@ export class AgentRelay { relay.exitResolvers.delete(name); relay.idleResolvers.get(name)?.resolve('exited'); relay.idleResolvers.delete(name); + relay.resultContracts.delete(name); + relay.lastAgentResults.delete(name); + for (const waiter of relay.takeResultResolvers(name)) { + waiter.reject(new Error(`Agent '${name}' was released before submitting a result`)); + } await relay.invokeLifecycleHook( releaseOptions.onSuccess, releaseContext, diff --git a/src/cli/relaycast-mcp.ts b/src/cli/relaycast-mcp.ts index 7dd604b5d..09eb741f8 100644 --- a/src/cli/relaycast-mcp.ts +++ b/src/cli/relaycast-mcp.ts @@ -177,19 +177,33 @@ function registerAgentResultTool(server: McpServer, config: AgentResultCallbackC }, }, async ({ data, final, metadata }) => { - const response = await fetch(config.url, { - method: 'POST', - headers: { - Authorization: `Bearer ${config.token}`, - 'Content-Type': 'application/json', - }, - body: JSON.stringify({ - agent: config.agentName, - data, - final: final ?? true, - metadata, - }), - }); + const timeoutMs = Number(resolveEnv('AGENT_RELAY_RESULT_TIMEOUT_MS') ?? 10_000); + const controller = new AbortController(); + const timer = setTimeout(() => controller.abort(), timeoutMs); + let response: Response; + try { + response = await fetch(config.url, { + method: 'POST', + signal: controller.signal, + headers: { + Authorization: `Bearer ${config.token}`, + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + agent: config.agentName, + data, + final: final ?? true, + metadata, + }), + }); + } catch (err) { + if ((err as { name?: string }).name === 'AbortError') { + throw new Error(`Agent Relay result submission timed out after ${timeoutMs}ms`); + } + throw err; + } finally { + clearTimeout(timer); + } const responseText = await response.text(); let payload: Record; try { From 5b4005efb83351fd6aca2cebf2910b80d1fe92b5 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 21 May 2026 16:21:08 +0000 Subject: [PATCH 3/4] fix: resolve clippy regressions for structured result callbacks Agent-Logs-Url: https://github.com/AgentWorkforce/relay/sessions/9789558d-1820-4dd0-a204-fcd6dfbec3f6 Co-authored-by: willwashburn <957608+willwashburn@users.noreply.github.com> --- crates/broker/src/listen_api.rs | 1 + crates/broker/src/snippets.rs | 4 ++++ crates/broker/src/worker.rs | 2 ++ 3 files changed, 7 insertions(+) diff --git a/crates/broker/src/listen_api.rs b/crates/broker/src/listen_api.rs index 1ba38e51d..c0a18670e 100644 --- a/crates/broker/src/listen_api.rs +++ b/crates/broker/src/listen_api.rs @@ -32,6 +32,7 @@ type PtyInputSerializers = Arc, relay_base_url: Option<&str>, @@ -417,6 +418,7 @@ pub fn ensure_opencode_config( ) } +#[allow(clippy::too_many_arguments)] pub fn ensure_opencode_config_with_result( root: &Path, relay_api_key: Option<&str>, @@ -560,6 +562,7 @@ pub fn ensure_opencode_config_with_result( /// Write `.cursor/mcp.json` in the given directory with the Relaycast MCP server /// configured with per-agent credentials (name + token). /// Returns `true` if the config was created or updated. +#[allow(clippy::too_many_arguments)] pub fn ensure_cursor_mcp_config( root: &Path, relay_api_key: Option<&str>, @@ -1003,6 +1006,7 @@ fn gemini_droid_mcp_add_args( ) } +#[allow(clippy::too_many_arguments)] fn gemini_droid_mcp_add_args_with_result( api_key: Option<&str>, base_url: Option<&str>, diff --git a/crates/broker/src/worker.rs b/crates/broker/src/worker.rs index 2a34b9a32..2b952c17e 100644 --- a/crates/broker/src/worker.rs +++ b/crates/broker/src/worker.rs @@ -154,6 +154,7 @@ impl WorkerRegistry { .map(|(_, v)| v.as_str()) } + #[allow(clippy::too_many_arguments)] async fn build_mcp_args( &self, cli_name: &str, @@ -195,6 +196,7 @@ impl WorkerRegistry { self.workers.get(name).and_then(|h| h.child.id()) } + #[allow(clippy::too_many_arguments)] pub(crate) async fn spawn( &mut self, spec: AgentSpec, From 4be45b44c01e89da0ef20dd068a7edbf49adb8b6 Mon Sep 17 00:00:00 2001 From: Will Washburn Date: Fri, 22 May 2026 12:06:04 -0400 Subject: [PATCH 4/4] Avoid persisting result callback tokens --- .../completed/2026-05/traj_78ytpicts778.json | 53 +++++++ .../completed/2026-05/traj_78ytpicts778.md | 31 ++++ .trajectories/index.json | 11 +- crates/broker/src/snippets.rs | 139 ++++++++++++++++-- 4 files changed, 218 insertions(+), 16 deletions(-) create mode 100644 .trajectories/completed/2026-05/traj_78ytpicts778.json create mode 100644 .trajectories/completed/2026-05/traj_78ytpicts778.md diff --git a/.trajectories/completed/2026-05/traj_78ytpicts778.json b/.trajectories/completed/2026-05/traj_78ytpicts778.json new file mode 100644 index 000000000..3317c1639 --- /dev/null +++ b/.trajectories/completed/2026-05/traj_78ytpicts778.json @@ -0,0 +1,53 @@ +{ + "id": "traj_78ytpicts778", + "version": 1, + "task": { + "title": "Address PR 932 result callback review findings" + }, + "status": "completed", + "startedAt": "2026-05-22T15:59:25.187Z", + "completedAt": "2026-05-22T16:05:12.320Z", + "agents": [ + { + "name": "default", + "role": "lead", + "joinedAt": "2026-05-22T16:04:55.453Z" + } + ], + "chapters": [ + { + "id": "chap_fjkgwh3kotyj", + "title": "Work", + "agentName": "default", + "startedAt": "2026-05-22T16:04:55.453Z", + "endedAt": "2026-05-22T16:05:12.320Z", + "events": [ + { + "ts": 1779465895454, + "type": "decision", + "content": "Do not persist result callback tokens in shared MCP config: Do not persist result callback tokens in shared MCP config", + "raw": { + "question": "Do not persist result callback tokens in shared MCP config", + "chosen": "Do not persist result callback tokens in shared MCP config", + "alternatives": [], + "reasoning": "The result callback token is per spawn. Gemini/Droid mcp add, opencode.json, and .cursor/mcp.json are shared or persistent surfaces, so those paths now omit AGENT_RELAY_RESULT_* and rely on inline configs or worker process env instead." + }, + "significance": "high" + } + ] + } + ], + "retrospective": { + "summary": "Verified PR 932 review findings. Fixed the valid shared-config token persistence issue by omitting AGENT_RELAY_RESULT_* from Gemini/Droid mcp add, opencode.json, and .cursor/mcp.json while retaining inline Codex result env. Skipped the skip_relay_prompt finding because current worker spawn already sets AGENT_RELAY_RESULT_* on the worker process even when MCP prompt injection is skipped.", + "approach": "Standard approach", + "confidence": 0.88 + }, + "commits": [], + "filesChanged": [], + "projectId": "/private/tmp/relay-pr-932-review", + "tags": [], + "_trace": { + "startRef": "3332e83756f0e905b77ae7f6a111e9787a66cf99", + "endRef": "3332e83756f0e905b77ae7f6a111e9787a66cf99" + } +} \ No newline at end of file diff --git a/.trajectories/completed/2026-05/traj_78ytpicts778.md b/.trajectories/completed/2026-05/traj_78ytpicts778.md new file mode 100644 index 000000000..7113000a6 --- /dev/null +++ b/.trajectories/completed/2026-05/traj_78ytpicts778.md @@ -0,0 +1,31 @@ +# Trajectory: Address PR 932 result callback review findings + +> **Status:** ✅ Completed +> **Confidence:** 88% +> **Started:** May 22, 2026 at 11:59 AM +> **Completed:** May 22, 2026 at 12:05 PM + +--- + +## Summary + +Verified PR 932 review findings. Fixed the valid shared-config token persistence issue by omitting AGENT_RELAY_RESULT_* from Gemini/Droid mcp add, opencode.json, and .cursor/mcp.json while retaining inline Codex result env. Skipped the skip_relay_prompt finding because current worker spawn already sets AGENT_RELAY_RESULT_* on the worker process even when MCP prompt injection is skipped. + +**Approach:** Standard approach + +--- + +## Key Decisions + +### Do not persist result callback tokens in shared MCP config +- **Chose:** Do not persist result callback tokens in shared MCP config +- **Reasoning:** The result callback token is per spawn. Gemini/Droid mcp add, opencode.json, and .cursor/mcp.json are shared or persistent surfaces, so those paths now omit AGENT_RELAY_RESULT_* and rely on inline configs or worker process env instead. + +--- + +## Chapters + +### 1. Work +*Agent: default* + +- Do not persist result callback tokens in shared MCP config: Do not persist result callback tokens in shared MCP config diff --git a/.trajectories/index.json b/.trajectories/index.json index f63dfcf61..983b63043 100644 --- a/.trajectories/index.json +++ b/.trajectories/index.json @@ -1,6 +1,6 @@ { "version": 1, - "lastUpdated": "2026-05-21T04:14:45.258Z", + "lastUpdated": "2026-05-22T16:05:12.489Z", "trajectories": { "traj_05xg7j388bc4": { "title": "Add browser workflow step integration", @@ -1138,6 +1138,13 @@ "startedAt": "2026-05-21T04:14:44.815Z", "completedAt": "2026-05-21T04:14:45.063Z", "path": "/private/tmp/relay-quiet-broker-logs/.trajectories/completed/2026-05/traj_u3loicehnwb4.json" + }, + "traj_78ytpicts778": { + "title": "Address PR 932 result callback review findings", + "status": "completed", + "startedAt": "2026-05-22T15:59:25.187Z", + "completedAt": "2026-05-22T16:05:12.320Z", + "path": "/private/tmp/relay-pr-932-review/.trajectories/completed/2026-05/traj_78ytpicts778.json" } } -} +} \ No newline at end of file diff --git a/crates/broker/src/snippets.rs b/crates/broker/src/snippets.rs index cf2ec4c54..b73da9528 100644 --- a/crates/broker/src/snippets.rs +++ b/crates/broker/src/snippets.rs @@ -427,7 +427,7 @@ pub fn ensure_opencode_config_with_result( relay_agent_token: Option<&str>, workspaces_json: Option<&str>, default_workspace: Option<&str>, - agent_result: Option<&AgentResultMcpConfig>, + _agent_result: Option<&AgentResultMcpConfig>, ) -> io::Result { let path = root.join(OPENCODE_CONFIG); @@ -482,7 +482,6 @@ pub fn ensure_opencode_config_with_result( Value::String(dw.to_string()), ); } - apply_agent_result_env(&mut env, agent_result); if !env.is_empty() { mcp_server.insert("environment".into(), Value::Object(env)); } @@ -571,7 +570,7 @@ pub fn ensure_cursor_mcp_config( relay_agent_token: Option<&str>, workspaces_json: Option<&str>, default_workspace: Option<&str>, - agent_result: Option<&AgentResultMcpConfig>, + _agent_result: Option<&AgentResultMcpConfig>, ) -> io::Result { let cursor_dir = root.join(".cursor"); fs::create_dir_all(&cursor_dir)?; @@ -584,7 +583,7 @@ pub fn ensure_cursor_mcp_config( relay_agent_token, workspaces_json, default_workspace, - agent_result, + None, ); let mut new_value: Value = serde_json::from_str(&mcp_json).map_err(|e| { io::Error::new( @@ -863,6 +862,9 @@ pub async fn configure_relaycast_mcp_with_result( } } } else if is_gemini || is_droid { + // Result callback tokens are per-spawn secrets. Gemini/Droid, OpenCode, + // and Cursor write shared config surfaces, so keep result callback env + // limited to the worker process env instead of persisting it in those files. if is_gemini { ensure_gemini_folder_trusted(cwd); } @@ -875,7 +877,7 @@ pub async fn configure_relaycast_mcp_with_result( is_gemini, workspaces_json, default_workspace, - agent_result, + None, ) .await?; } else if is_opencode && !existing_args.iter().any(|a| a == "--agent") { @@ -887,7 +889,7 @@ pub async fn configure_relaycast_mcp_with_result( agent_token, workspaces_json, default_workspace, - agent_result, + None, ) .with_context(|| { "failed to write opencode.json for relaycast MCP. \ @@ -904,7 +906,7 @@ pub async fn configure_relaycast_mcp_with_result( agent_token, workspaces_json, default_workspace, - agent_result, + None, ) .with_context(|| { "failed to write .cursor/mcp.json for relaycast MCP. \ @@ -1015,7 +1017,7 @@ fn gemini_droid_mcp_add_args_with_result( is_gemini: bool, workspaces_json: Option<&str>, default_workspace: Option<&str>, - agent_result: Option<&AgentResultMcpConfig>, + _agent_result: Option<&AgentResultMcpConfig>, ) -> Vec { let env_flag = gemini_droid_mcp_env_flag(is_gemini); let mut args = vec!["mcp".to_string(), "add".to_string()]; @@ -1051,12 +1053,6 @@ fn gemini_droid_mcp_add_args_with_result( args.push(env_flag.to_string()); args.push(format!("RELAY_DEFAULT_WORKSPACE={dw}")); } - if let Some(config) = agent_result { - for (key, value) in config.env_pairs() { - args.push(env_flag.to_string()); - args.push(format!("{key}={value}")); - } - } args.push("relaycast".to_string()); // Droid's CLI parser continues parsing options after positional args. // Insert `--` so `-y` is treated as an argument to `npx`. @@ -1173,6 +1169,20 @@ mod tests { ); } + fn test_agent_result_config() -> crate::types::AgentResultMcpConfig { + crate::types::AgentResultMcpConfig { + callback_url: "http://127.0.0.1:3889/api/agent-result".to_string(), + token: "arr_test".to_string(), + schema: Some(json!({"type": "object"})), + } + } + + fn assert_agent_result_env_absent(env: &Value) { + assert!(env["AGENT_RELAY_RESULT_URL"].is_null()); + assert!(env["AGENT_RELAY_RESULT_TOKEN"].is_null()); + assert!(env["AGENT_RELAY_RESULT_SCHEMA"].is_null()); + } + #[test] fn creates_reaycast_mcp_config_when_missing() { let temp = tempdir().expect("tempdir"); @@ -1586,6 +1596,26 @@ mod tests { assert!(args.contains(&"RELAY_AGENT_TOKEN=tok_droid_123".to_string())); } + #[test] + fn gemini_droid_mcp_add_args_omit_agent_result_env() { + let config = test_agent_result_config(); + let args = super::gemini_droid_mcp_add_args_with_result( + Some("rk_live_xyz"), + Some("https://api.relaycast.dev"), + Some("GeminiWorker"), + Some("tok_gem_123"), + true, + None, + None, + Some(&config), + ); + + assert!( + !args.iter().any(|arg| arg.contains("AGENT_RELAY_RESULT")), + "Gemini/Droid mcp add writes shared config and must not persist per-agent result tokens" + ); + } + // ----------------------------------------------------------------------- // Codex provider tests // ----------------------------------------------------------------------- @@ -1677,6 +1707,32 @@ mod tests { ); } + #[tokio::test] + async fn codex_includes_agent_result_env_in_inline_config() { + let temp = tempdir().expect("tempdir"); + let config = test_agent_result_config(); + let args = super::configure_relaycast_mcp_with_result( + "codex", + "Agent", + None, + None, + &[], + temp.path(), + None, + None, + None, + Some(&config), + ) + .await + .expect("configure codex mcp with result"); + + assert!( + args.iter() + .any(|a| a == "mcp_servers.relaycast.env.AGENT_RELAY_RESULT_TOKEN=\"arr_test\""), + "Codex inline config can carry per-agent result callback env" + ); + } + #[tokio::test] async fn codex_omits_optional_fields_when_none() { let temp = tempdir().expect("tempdir"); @@ -1784,6 +1840,32 @@ mod tests { assert_eq!(agent["tools"]["relaycast_*"].as_bool(), Some(true)); } + #[tokio::test] + async fn opencode_result_contract_does_not_persist_callback_env() { + let temp = tempdir().expect("tempdir"); + let config = test_agent_result_config(); + let args = super::configure_relaycast_mcp_with_result( + "opencode", + "OcAgent", + Some("rk_live_oc"), + Some("https://api.relaycast.dev"), + &[], + temp.path(), + None, + None, + None, + Some(&config), + ) + .await + .expect("configure opencode mcp with result"); + + assert_eq!(args, vec!["--agent", "relaycast"]); + let contents = + fs::read_to_string(temp.path().join("opencode.json")).expect("read opencode.json"); + let json: Value = serde_json::from_str(&contents).expect("parse opencode.json"); + assert_agent_result_env_absent(&json["mcp"]["relaycast"]["environment"]); + } + #[tokio::test] async fn opencode_upserts_into_existing_config_preserving_other_keys() { let temp = tempdir().expect("tempdir"); @@ -1899,6 +1981,35 @@ mod tests { ); } + #[tokio::test] + async fn cursor_result_contract_does_not_persist_callback_env() { + let temp = tempdir().expect("tempdir"); + let config = test_agent_result_config(); + let args = super::configure_relaycast_mcp_with_result( + "cursor", + "CursorAgent", + Some("rk_live_cursor"), + Some("https://api.relaycast.dev"), + &[], + temp.path(), + None, + None, + None, + Some(&config), + ) + .await + .expect("configure cursor mcp with result"); + + assert!( + args.is_empty(), + "cursor should configure MCP via file, not CLI args" + ); + let contents = fs::read_to_string(temp.path().join(".cursor").join("mcp.json")) + .expect("read cursor mcp config"); + let json: Value = serde_json::from_str(&contents).expect("parse cursor mcp config"); + assert_agent_result_env_absent(&json["mcpServers"]["relaycast"]["env"]); + } + #[tokio::test] async fn cursor_agent_alias_writes_mcp_json_with_token() { let temp = tempdir().expect("tempdir");