diff --git a/.trajectories/completed/2026-05/traj_5k0jtc1g5l33.json b/.trajectories/completed/2026-05/traj_5k0jtc1g5l33.json new file mode 100644 index 000000000..8e0c3df43 --- /dev/null +++ b/.trajectories/completed/2026-05/traj_5k0jtc1g5l33.json @@ -0,0 +1,53 @@ +{ + "id": "traj_5k0jtc1g5l33", + "version": 1, + "task": { + "title": "Resolve PR 932 conflicts and review comments" + }, + "status": "completed", + "startedAt": "2026-05-22T19:13:09.359Z", + "completedAt": "2026-05-22T19:13:16.971Z", + "agents": [ + { + "name": "default", + "role": "lead", + "joinedAt": "2026-05-22T19:13:13.303Z" + } + ], + "chapters": [ + { + "id": "chap_2zrrn7ann1rr", + "title": "Work", + "agentName": "default", + "startedAt": "2026-05-22T19:13:13.303Z", + "endedAt": "2026-05-22T19:13:16.971Z", + "events": [ + { + "ts": 1779477193304, + "type": "decision", + "content": "Kept PR-specific structured-result changelog entry while accepting current release sections", + "raw": { + "question": "Kept PR-specific structured-result changelog entry while accepting current release sections", + "chosen": "Kept PR-specific structured-result changelog entry while accepting current release sections", + "alternatives": [], + "reasoning": "Main already compacted older unreleased entries into 7.0.x release sections; retaining only this PR's unreleased user impact avoids duplicate release notes." 
+ }, + "significance": "high" + } + ] + } + ], + "retrospective": { + "summary": "Resolved current PR 932 changelog conflict and addressed still-valid review comments for trajectory metadata, callback URL formatting, bearer auth parsing, and structured-result waiters.", + "approach": "Standard approach", + "confidence": 0.86 + }, + "commits": [], + "filesChanged": [], + "projectId": "repo:relay", + "tags": [], + "_trace": { + "startRef": "75771972ef5b43a624aed6dc5a01d731826020c5", + "endRef": "75771972ef5b43a624aed6dc5a01d731826020c5" + } +} diff --git a/.trajectories/completed/2026-05/traj_5k0jtc1g5l33.md b/.trajectories/completed/2026-05/traj_5k0jtc1g5l33.md new file mode 100644 index 000000000..b4fc5f4ef --- /dev/null +++ b/.trajectories/completed/2026-05/traj_5k0jtc1g5l33.md @@ -0,0 +1,31 @@ +# Trajectory: Resolve PR 932 conflicts and review comments + +> **Status:** ✅ Completed +> **Confidence:** 86% +> **Started:** May 22, 2026 at 03:13 PM +> **Completed:** May 22, 2026 at 03:13 PM + +--- + +## Summary + +Resolved current PR 932 changelog conflict and addressed still-valid review comments for trajectory metadata, callback URL formatting, bearer auth parsing, and structured-result waiters. + +**Approach:** Standard approach + +--- + +## Key Decisions + +### Kept PR-specific structured-result changelog entry while accepting current release sections +- **Chose:** Kept PR-specific structured-result changelog entry while accepting current release sections +- **Reasoning:** Main already compacted older unreleased entries into 7.0.x release sections; retaining only this PR's unreleased user impact avoids duplicate release notes. + +--- + +## Chapters + +### 1. 
Work +*Agent: default* + +- Kept PR-specific structured-result changelog entry while accepting current release sections diff --git a/.trajectories/completed/2026-05/traj_78ytpicts778.json b/.trajectories/completed/2026-05/traj_78ytpicts778.json new file mode 100644 index 000000000..3317c1639 --- /dev/null +++ b/.trajectories/completed/2026-05/traj_78ytpicts778.json @@ -0,0 +1,53 @@ +{ + "id": "traj_78ytpicts778", + "version": 1, + "task": { + "title": "Address PR 932 result callback review findings" + }, + "status": "completed", + "startedAt": "2026-05-22T15:59:25.187Z", + "completedAt": "2026-05-22T16:05:12.320Z", + "agents": [ + { + "name": "default", + "role": "lead", + "joinedAt": "2026-05-22T16:04:55.453Z" + } + ], + "chapters": [ + { + "id": "chap_fjkgwh3kotyj", + "title": "Work", + "agentName": "default", + "startedAt": "2026-05-22T16:04:55.453Z", + "endedAt": "2026-05-22T16:05:12.320Z", + "events": [ + { + "ts": 1779465895454, + "type": "decision", + "content": "Do not persist result callback tokens in shared MCP config: Do not persist result callback tokens in shared MCP config", + "raw": { + "question": "Do not persist result callback tokens in shared MCP config", + "chosen": "Do not persist result callback tokens in shared MCP config", + "alternatives": [], + "reasoning": "The result callback token is per spawn. Gemini/Droid mcp add, opencode.json, and .cursor/mcp.json are shared or persistent surfaces, so those paths now omit AGENT_RELAY_RESULT_* and rely on inline configs or worker process env instead." + }, + "significance": "high" + } + ] + } + ], + "retrospective": { + "summary": "Verified PR 932 review findings. Fixed the valid shared-config token persistence issue by omitting AGENT_RELAY_RESULT_* from Gemini/Droid mcp add, opencode.json, and .cursor/mcp.json while retaining inline Codex result env. 
Skipped the skip_relay_prompt finding because current worker spawn already sets AGENT_RELAY_RESULT_* on the worker process even when MCP prompt injection is skipped.", + "approach": "Standard approach", + "confidence": 0.88 + }, + "commits": [], + "filesChanged": [], + "projectId": "/private/tmp/relay-pr-932-review", + "tags": [], + "_trace": { + "startRef": "3332e83756f0e905b77ae7f6a111e9787a66cf99", + "endRef": "3332e83756f0e905b77ae7f6a111e9787a66cf99" + } +} \ No newline at end of file diff --git a/.trajectories/completed/2026-05/traj_78ytpicts778.md b/.trajectories/completed/2026-05/traj_78ytpicts778.md new file mode 100644 index 000000000..7113000a6 --- /dev/null +++ b/.trajectories/completed/2026-05/traj_78ytpicts778.md @@ -0,0 +1,31 @@ +# Trajectory: Address PR 932 result callback review findings + +> **Status:** ✅ Completed +> **Confidence:** 88% +> **Started:** May 22, 2026 at 11:59 AM +> **Completed:** May 22, 2026 at 12:05 PM + +--- + +## Summary + +Verified PR 932 review findings. Fixed the valid shared-config token persistence issue by omitting AGENT_RELAY_RESULT_* from Gemini/Droid mcp add, opencode.json, and .cursor/mcp.json while retaining inline Codex result env. Skipped the skip_relay_prompt finding because current worker spawn already sets AGENT_RELAY_RESULT_* on the worker process even when MCP prompt injection is skipped. + +**Approach:** Standard approach + +--- + +## Key Decisions + +### Do not persist result callback tokens in shared MCP config +- **Chose:** Do not persist result callback tokens in shared MCP config +- **Reasoning:** The result callback token is per spawn. Gemini/Droid mcp add, opencode.json, and .cursor/mcp.json are shared or persistent surfaces, so those paths now omit AGENT_RELAY_RESULT_* and rely on inline configs or worker process env instead. + +--- + +## Chapters + +### 1. 
Work +*Agent: default* + +- Do not persist result callback tokens in shared MCP config: Do not persist result callback tokens in shared MCP config diff --git a/.trajectories/completed/2026-05/traj_ceo5q9bh2od3.json b/.trajectories/completed/2026-05/traj_ceo5q9bh2od3.json new file mode 100644 index 000000000..5253cd04e --- /dev/null +++ b/.trajectories/completed/2026-05/traj_ceo5q9bh2od3.json @@ -0,0 +1,53 @@ +{ + "id": "traj_ceo5q9bh2od3", + "version": 1, + "task": { + "title": "Add structured spawned-agent results" + }, + "status": "completed", + "startedAt": "2026-05-20T21:24:17.929Z", + "completedAt": "2026-05-20T21:43:51.936Z", + "agents": [ + { + "name": "default", + "role": "lead", + "joinedAt": "2026-05-20T21:43:48.187Z" + } + ], + "chapters": [ + { + "id": "chap_7lq3uodpwrbr", + "title": "Work", + "agentName": "default", + "startedAt": "2026-05-20T21:43:48.187Z", + "endedAt": "2026-05-20T21:43:51.936Z", + "events": [ + { + "ts": 1779313428188, + "type": "decision", + "content": "Implemented structured agent results as a broker-mediated callback token plus MCP tool", + "raw": { + "question": "Implemented structured agent results as a broker-mediated callback token plus MCP tool", + "chosen": "Implemented structured agent results as a broker-mediated callback token plus MCP tool", + "alternatives": [], + "reasoning": "The SDK can declare a result contract at spawn time, the broker mints a per-agent callback token, and the injected Relaycast MCP server exposes submit_result without requiring the spawned agent to know broker credentials." 
+ }, + "significance": "high" + } + ] + } + ], + "retrospective": { + "summary": "Added structured spawned-agent result contracts across the broker, MCP tool, and TypeScript SDK, with focused Rust and Vitest coverage plus docs.", + "approach": "Standard approach", + "confidence": 0.82 + }, + "commits": [], + "filesChanged": [], + "projectId": "/private/tmp/relay-structured-agent-results", + "tags": [], + "_trace": { + "startRef": "db037b0557e7353e169e92293f4adde06e48a6c6", + "endRef": "db037b0557e7353e169e92293f4adde06e48a6c6" + } +} diff --git a/.trajectories/completed/2026-05/traj_ceo5q9bh2od3.md b/.trajectories/completed/2026-05/traj_ceo5q9bh2od3.md new file mode 100644 index 000000000..217a7c41b --- /dev/null +++ b/.trajectories/completed/2026-05/traj_ceo5q9bh2od3.md @@ -0,0 +1,33 @@ +# Trajectory: Add structured spawned-agent results + +> **Status:** ✅ Completed +> **Confidence:** 82% +> **Started:** May 20, 2026 at 05:24 PM +> **Completed:** May 20, 2026 at 05:43 PM + +--- + +## Summary + +Added structured spawned-agent result contracts across the broker, MCP tool, and TypeScript SDK, with focused Rust and Vitest coverage plus docs. + +**Approach:** Standard approach + +--- + +## Key Decisions + +### Implemented structured agent results as a broker-mediated callback token plus MCP tool + +- **Chose:** Implemented structured agent results as a broker-mediated callback token plus MCP tool +- **Reasoning:** The SDK can declare a result contract at spawn time, the broker mints a per-agent callback token, and the injected Relaycast MCP server exposes submit_result without requiring the spawned agent to know broker credentials. + +--- + +## Chapters + +### 1. 
Work + +_Agent: default_ + +- Implemented structured agent results as a broker-mediated callback token plus MCP tool diff --git a/.trajectories/index.json b/.trajectories/index.json index e9cb2b9c9..2effa4efb 100644 --- a/.trajectories/index.json +++ b/.trajectories/index.json @@ -1,6 +1,6 @@ { "version": 1, - "lastUpdated": "2026-05-22T16:57:55.540Z", + "lastUpdated": "2026-05-22T19:13:17.153Z", "trajectories": { "traj_05xg7j388bc4": { "title": "Add browser workflow step integration", @@ -1125,6 +1125,13 @@ "completedAt": "2026-05-20T11:46:17.506Z", "path": "/Users/khaliqgant/Projects/AgentWorkforce/.msd-autofix-1bdf6c0b/.trajectories/completed/2026-05/traj_af7iew24eiip.json" }, + "traj_ceo5q9bh2od3": { + "title": "Add structured spawned-agent results", + "status": "completed", + "startedAt": "2026-05-20T21:24:17.929Z", + "completedAt": "2026-05-20T21:43:51.936Z", + "path": ".trajectories/completed/2026-05/traj_ceo5q9bh2od3.json" + }, "traj_dcl9hgoiuac5": { "title": "Verify --broker-name override for agent-relay up", "status": "completed", @@ -1139,6 +1146,20 @@ "completedAt": "2026-05-21T04:14:45.063Z", "path": "/private/tmp/relay-quiet-broker-logs/.trajectories/completed/2026-05/traj_u3loicehnwb4.json" }, + "traj_78ytpicts778": { + "title": "Address PR 932 result callback review findings", + "status": "completed", + "startedAt": "2026-05-22T15:59:25.187Z", + "completedAt": "2026-05-22T16:05:12.320Z", + "path": ".trajectories/completed/2026-05/traj_78ytpicts778.json" + }, + "traj_5k0jtc1g5l33": { + "title": "Resolve PR 932 conflicts and review comments", + "status": "completed", + "startedAt": "2026-05-22T19:13:09.359Z", + "completedAt": "2026-05-22T19:13:16.971Z", + "path": ".trajectories/completed/2026-05/traj_5k0jtc1g5l33.json" + }, "traj_s5ojo1f4srz4": { "title": "Remove unused user-directory package", "status": "completed", diff --git a/CHANGELOG.md b/CHANGELOG.md index 0f418692e..41540c769 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this
project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Broker and TypeScript SDK structured result contracts add the `submit_result` MCP tool, `agent.waitForResult()`, per-spawn `result.onResult`, and `relay.addListener('agentResult', ...)` for typed JSON worker outcomes. + ### Changed - Release workflow changelog generation now writes concise Keep a Changelog sections and skips web-only, release-only, trajectory, PR-review, placeholder, and withdrawn-tag entries. diff --git a/crates/broker/src/listen_api.rs b/crates/broker/src/listen_api.rs index d8562fc93..69a65963b 100644 --- a/crates/broker/src/listen_api.rs +++ b/crates/broker/src/listen_api.rs @@ -32,6 +32,7 @@ type PtyInputSerializers = Arc>, agent_token: Option, + agent_result_schema: Option, reply: tokio::sync::oneshot::Sender>, }, SetModel { @@ -175,6 +177,16 @@ pub enum ListenApiRequest { name: String, reply: tokio::sync::oneshot::Sender>, }, + /// `POST /api/agent-result` — accepts structured result payloads from the + /// per-agent MCP tool using a callback token minted at spawn time. + SubmitAgentResult { + token: String, + name: Option, + data: Value, + final_result: bool, + metadata: Option, + reply: tokio::sync::oneshot::Sender>, + }, } /// Typed errors for the inbound-delivery-mode HTTP routes. Keeps the broker arm's @@ -200,6 +212,21 @@ impl std::fmt::Display for DeliveryRouteError { impl std::error::Error for DeliveryRouteError {} +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum AgentResultRouteError { + InvalidToken, +} + +impl std::fmt::Display for AgentResultRouteError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + AgentResultRouteError::InvalidToken => write!(f, "invalid_result_token"), + } + } +} + +impl std::error::Error for AgentResultRouteError {} + /// Reply payload for [`ListenApiRequest::SetInboundDeliveryMode`]. 
`flushed` /// is the number of pending messages drained during the transition /// (always `0` unless we transitioned `manual_flush → auto_inject`). @@ -393,6 +420,7 @@ fn listen_api_router_with_auth( Router::new() .route("/health", routing::get(listen_api_health)) + .route("/api/agent-result", routing::post(listen_api_agent_result)) .merge(protected) .with_state(state.clone()) } @@ -503,6 +531,17 @@ fn unauthorized_error_envelope() -> Value { }) } +fn bearer_token(value: &str) -> Option<&str> { + let mut parts = value.trim().splitn(2, char::is_whitespace); + let scheme = parts.next()?; + let token = parts.next()?.trim(); + if scheme.eq_ignore_ascii_case("bearer") && !token.is_empty() { + Some(token) + } else { + None + } +} + async fn listen_api_auth_middleware( axum::extract::State(state): axum::extract::State, request: axum::http::Request, @@ -524,9 +563,7 @@ async fn listen_api_auth_middleware( .headers() .get("authorization") .and_then(|value| value.to_str().ok()) - .and_then(|value| value.strip_prefix("Bearer ")) - .map(str::trim) - .filter(|value| !value.is_empty()) + .and_then(bearer_token) }); if provided != Some(expected) { @@ -616,6 +653,11 @@ async fn listen_api_spawn( .or_else(|| body.get("agentToken")) .and_then(Value::as_str) .map(String::from); + let agent_result_schema = body + .get("agent_result_schema") + .or_else(|| body.get("agentResultSchema")) + .or_else(|| body.get("resultSchema")) + .cloned(); if name.is_empty() { return ( @@ -644,6 +686,7 @@ async fn listen_api_spawn( skip_relay_prompt, restart_policy, agent_token, + agent_result_schema, reply: reply_tx, }) .await @@ -755,6 +798,82 @@ async fn listen_api_threads( } } +async fn listen_api_agent_result( + axum::extract::State(state): axum::extract::State, + headers: axum::http::HeaderMap, + axum::Json(body): axum::Json, +) -> (axum::http::StatusCode, axum::Json) { + let token = headers + .get("x-agent-result-token") + .and_then(|value| value.to_str().ok()) + .map(str::trim) + .filter(|value| 
!value.is_empty()) + .map(String::from) + .or_else(|| { + headers + .get("authorization") + .and_then(|value| value.to_str().ok()) + .and_then(bearer_token) + .map(String::from) + }); + let Some(token) = token else { + return ( + axum::http::StatusCode::UNAUTHORIZED, + axum::Json(json!({ "success": false, "error": "missing_result_token" })), + ); + }; + + let Some(data) = body.get("data").or_else(|| body.get("result")).cloned() else { + return ( + axum::http::StatusCode::BAD_REQUEST, + axum::Json(json!({ "success": false, "error": "Missing required field: data" })), + ); + }; + let name = body + .get("name") + .or_else(|| body.get("agent")) + .and_then(Value::as_str) + .map(String::from); + let final_result = body + .get("final") + .or_else(|| body.get("final_result")) + .and_then(Value::as_bool) + .unwrap_or(true); + let metadata = body.get("metadata").cloned(); + + let (reply_tx, reply_rx) = tokio::sync::oneshot::channel(); + if state + .tx + .send(ListenApiRequest::SubmitAgentResult { + token, + name, + data, + final_result, + metadata, + reply: reply_tx, + }) + .await + .is_err() + { + return ( + axum::http::StatusCode::INTERNAL_SERVER_ERROR, + axum::Json(json!({ "success": false, "error": "internal channel closed" })), + ); + } + + match reply_rx.await { + Ok(Ok(value)) => (axum::http::StatusCode::OK, axum::Json(value)), + Ok(Err(AgentResultRouteError::InvalidToken)) => ( + axum::http::StatusCode::UNAUTHORIZED, + axum::Json(json!({ "success": false, "error": "invalid_result_token" })), + ), + Err(_) => ( + axum::http::StatusCode::INTERNAL_SERVER_ERROR, + axum::Json(json!({ "success": false, "error": "internal reply dropped" })), + ), + } +} + async fn listen_api_release( axum::extract::State(state): axum::extract::State, axum::extract::Path(name): axum::extract::Path, @@ -2255,6 +2374,32 @@ mod auth_tests { list_replier.await.expect("list replier should complete"); } + #[tokio::test] + async fn api_route_accepts_lowercase_bearer_scheme() { + let (router, mut 
rx) = test_router(Some("secret")); + let list_replier = tokio::spawn(async move { + if let Some(ListenApiRequest::List { reply }) = rx.recv().await { + let _ = reply.send(Ok(json!({ "agents": [] }))); + } + }); + + let response = router + .oneshot( + Request::builder() + .uri("/api/spawned") + .method("GET") + .header("authorization", "bearer secret") + .body(Body::empty()) + .expect("request should build"), + ) + .await + .expect("request should succeed"); + + assert_eq!(response.status(), StatusCode::OK); + + list_replier.await.expect("list replier should complete"); + } + #[tokio::test] async fn spawn_route_forwards_extended_fields() { let (router, mut rx) = test_router(Some("secret")); @@ -2277,6 +2422,7 @@ mod auth_tests { skip_relay_prompt: _, restart_policy: _, agent_token: _, + agent_result_schema, reply, }) => { assert_eq!(name, "worker-a"); @@ -2295,6 +2441,10 @@ mod auth_tests { assert_eq!(shadow_mode.as_deref(), Some("subagent")); assert_eq!(continue_from.as_deref(), Some("worker-prev")); assert_eq!(idle_threshold_secs, Some(30)); + assert_eq!( + agent_result_schema, + Some(json!({"type": "object", "properties": {"ok": {"type": "boolean"}}})) + ); let _ = reply.send(Ok( json!({ "success": true, "name": "worker-a", "pid": 42 }), )); @@ -2325,6 +2475,7 @@ mod auth_tests { "shadowMode": "subagent", "continueFrom": "worker-prev", "idleThresholdSecs": 30, + "resultSchema": {"type": "object", "properties": {"ok": {"type": "boolean"}}}, }) .to_string(), )) @@ -2340,6 +2491,77 @@ mod auth_tests { spawn_replier.await.expect("spawn replier should complete"); } + #[tokio::test] + async fn agent_result_route_accepts_callback_token_without_broker_auth() { + let (router, mut rx) = test_router(Some("secret")); + + let replier = tokio::spawn(async move { + match rx.recv().await { + Some(ListenApiRequest::SubmitAgentResult { + token, + name, + data, + final_result, + metadata, + reply, + }) => { + assert_eq!(token, "arr_test"); + assert_eq!(name.as_deref(), 
Some("worker-a")); + assert_eq!(data, json!({"ok": true})); + assert!(final_result); + assert_eq!(metadata, Some(json!({"source": "test"}))); + let _ = reply.send(Ok(json!({"success": true, "result_id": "ar_1"}))); + } + other => panic!("unexpected request: {:?}", other.map(|_| "other")), + } + }); + + let response = router + .oneshot( + Request::builder() + .uri("/api/agent-result") + .method("POST") + .header("authorization", "bearer arr_test") + .header("content-type", "application/json") + .body(Body::from( + json!({ + "agent": "worker-a", + "data": {"ok": true}, + "metadata": {"source": "test"} + }) + .to_string(), + )) + .expect("request should build"), + ) + .await + .expect("request should succeed"); + + assert_eq!(response.status(), StatusCode::OK); + let body = response_json(response).await; + assert_eq!(body["result_id"], json!("ar_1")); + + replier.await.expect("result replier should complete"); + } + + #[tokio::test] + async fn agent_result_route_rejects_missing_callback_token() { + let (router, _rx) = test_router(Some("secret")); + + let response = router + .oneshot( + Request::builder() + .uri("/api/agent-result") + .method("POST") + .header("content-type", "application/json") + .body(Body::from(json!({"data": {"ok": true}}).to_string())) + .expect("request should build"), + ) + .await + .expect("request should succeed"); + + assert_eq!(response.status(), StatusCode::UNAUTHORIZED); + } + #[tokio::test] async fn set_model_route_forwards_request() { let (router, mut rx) = test_router(Some("secret")); diff --git a/crates/broker/src/protocol.rs b/crates/broker/src/protocol.rs index 7ad80fcf8..5e18f649d 100644 --- a/crates/broker/src/protocol.rs +++ b/crates/broker/src/protocol.rs @@ -268,6 +268,15 @@ pub enum BrokerEvent { #[serde(default)] since: Option, }, + AgentResult { + name: String, + result_id: String, + data: Value, + #[serde(rename = "final")] + final_result: bool, + #[serde(default)] + metadata: Option, + }, AgentBlockedOnSend { name: String, 
blocked_secs: u64, @@ -488,6 +497,36 @@ mod tests { assert_eq!(decoded, event); } + #[test] + fn agent_result_event_round_trip_with_metadata() { + let event = BrokerToSdk::Event(BrokerEvent::AgentResult { + name: "Worker1".into(), + result_id: "res_42".into(), + data: json!({"answer": 42}), + final_result: true, + metadata: Some(json!({"latency_ms": 123})), + }); + let encoded = serde_json::to_string(&event).unwrap(); + // The `final_result` field MUST serialize as `final` per the SDK wire contract. + assert!(encoded.contains("\"final\":true")); + let decoded: BrokerToSdk = serde_json::from_str(&encoded).unwrap(); + assert_eq!(decoded, event); + } + + #[test] + fn agent_result_event_round_trip_without_metadata() { + let event = BrokerToSdk::Event(BrokerEvent::AgentResult { + name: "Worker2".into(), + result_id: "res_7".into(), + data: json!("partial"), + final_result: false, + metadata: None, + }); + let encoded = serde_json::to_string(&event).unwrap(); + let decoded: BrokerToSdk = serde_json::from_str(&encoded).unwrap(); + assert_eq!(decoded, event); + } + #[test] fn broker_event_delivery_failed_round_trip() { let event = BrokerToSdk::Event(BrokerEvent::DeliveryFailed { diff --git a/crates/broker/src/relaycast/mod.rs b/crates/broker/src/relaycast/mod.rs index 42c460224..b198f0ae3 100644 --- a/crates/broker/src/relaycast/mod.rs +++ b/crates/broker/src/relaycast/mod.rs @@ -4,7 +4,10 @@ pub(crate) mod dm_participants; pub(crate) mod workspace; pub(crate) mod ws; -pub(crate) use crate::snippets::{configure_relaycast_mcp_with_token, ensure_relaycast_mcp_config}; +pub(crate) use crate::snippets::{ + configure_relaycast_mcp_with_result, configure_relaycast_mcp_with_token, + ensure_relaycast_mcp_config, +}; pub(crate) use auth::AuthClient; pub(crate) use bridge::{map_ws_broker_command, map_ws_event}; pub(crate) use dm_participants::{resolve_dm_participants_cached, DmParticipantsCache}; diff --git a/crates/broker/src/runtime/api.rs b/crates/broker/src/runtime/api.rs index 
945885971..1f812d341 100644 --- a/crates/broker/src/runtime/api.rs +++ b/crates/broker/src/runtime/api.rs @@ -18,6 +18,7 @@ impl BrokerRuntime { let pending_deliveries = &mut self.pending_deliveries; let pending_requests = &mut self.pending_requests; let delivery_states = &mut self.delivery_states; + let agent_result_tokens = &mut self.agent_result_tokens; let dedup = &mut self.dedup; let recent_thread_messages = &mut self.recent_thread_messages; let delivery_retry_interval = self.delivery_retry_interval; @@ -45,6 +46,7 @@ impl BrokerRuntime { skip_relay_prompt, restart_policy, agent_token, + agent_result_schema, reply, } => { let effective_channels = if channels.is_empty() { @@ -190,6 +192,17 @@ impl BrokerRuntime { .first() .map(|workspace| workspace.workspace_id.clone()) }); + let agent_result = agent_result_schema.map(|schema| AgentResultMcpConfig { + callback_url: workers + .env_value("AGENT_RELAY_RESULT_URL") + .unwrap_or("http://127.0.0.1:3889/api/agent-result") + .to_string(), + token: format!("arr_{}", Uuid::new_v4().simple()), + schema: Some(schema), + }); + if let Some(config) = &agent_result { + agent_result_tokens.insert(config.token.clone(), name.clone()); + } match workers .spawn( spec, @@ -198,6 +211,7 @@ impl BrokerRuntime { worker_relay_key.clone(), skip_relay_prompt, spawn_workspace_id.clone(), + agent_result.clone(), ) .await { @@ -278,11 +292,49 @@ impl BrokerRuntime { }))); } Err(e) => { + if let Some(config) = &agent_result { + agent_result_tokens.remove(&config.token); + } eprintln!("[agent-relay] HTTP API: failed to spawn '{}': {}", name, e); let _ = reply.send(Err(e.to_string())); } } } + ListenApiRequest::SubmitAgentResult { + token, + name, + data, + final_result, + metadata, + reply, + } => { + let Some(agent_name) = agent_result_tokens.get(&token).cloned() else { + let _ = reply.send(Err(listen_api::AgentResultRouteError::InvalidToken)); + return; + }; + if let Some(requested_name) = name.as_deref() { + if requested_name != agent_name 
{ + let _ = reply.send(Err(listen_api::AgentResultRouteError::InvalidToken)); + return; + } + } + + let result_id = format!("ar_{}", Uuid::new_v4().simple()); + let payload = json!({ + "kind": "agent_result", + "name": agent_name, + "result_id": result_id, + "data": data, + "final": final_result, + "metadata": metadata, + }); + let _ = send_event(sdk_out_tx, payload).await; + let _ = reply.send(Ok(json!({ + "success": true, + "name": agent_name, + "result_id": result_id, + }))); + } ListenApiRequest::SetModel { name, model, @@ -368,6 +420,7 @@ impl BrokerRuntime { } fail_pending_requests_for_worker(pending_requests, &name, "agent_released"); delivery_states.remove(&name); + agent_result_tokens.retain(|_, agent| agent != &name); state.agents.remove(&name); if paths.persist { let _ = state.save(&paths.state); diff --git a/crates/broker/src/runtime/event_loop.rs b/crates/broker/src/runtime/event_loop.rs index 1aa715523..568941bc1 100644 --- a/crates/broker/src/runtime/event_loop.rs +++ b/crates/broker/src/runtime/event_loop.rs @@ -32,6 +32,7 @@ pub(crate) struct BrokerRuntime { pub(super) terminal_failed_deliveries: HashSet, pub(super) pending_requests: HashMap, pub(super) delivery_states: HashMap, + pub(super) agent_result_tokens: HashMap, pub(super) dm_participants_cache: DmParticipantsCache, pub(super) recent_thread_messages: VecDeque, pub(super) shutdown: bool, diff --git a/crates/broker/src/runtime/init.rs b/crates/broker/src/runtime/init.rs index f8b679b67..d96363e24 100644 --- a/crates/broker/src/runtime/init.rs +++ b/crates/broker/src/runtime/init.rs @@ -1,4 +1,5 @@ use super::*; +use std::net::{IpAddr, SocketAddr}; pub(crate) async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { let broker_start = Instant::now(); @@ -134,7 +135,8 @@ pub(crate) async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Re let listener = tokio::net::TcpListener::bind(&bind_addr) .await .with_context(|| format!("failed to bind API on {}", 
bind_addr))?; - let actual_port = listener.local_addr()?.port(); + let local_addr = listener.local_addr()?; + let actual_port = local_addr.port(); log_startup_phase( startup_debug, broker_start, @@ -335,9 +337,14 @@ pub(crate) async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Re ); } + let callback_host = callback_host_for_url(&cmd.api_bind, local_addr); let mut worker_env = vec![ ("RELAY_BASE_URL".to_string(), http_base.clone()), ("RELAY_API_KEY".to_string(), relay_workspace_key.clone()), + ( + "AGENT_RELAY_RESULT_URL".to_string(), + format!("http://{}:{}/api/agent-result", callback_host, actual_port), + ), ( "RELAY_WORKSPACES_JSON".to_string(), relay_workspaces_json.clone(), @@ -416,6 +423,7 @@ pub(crate) async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Re // are created lazily on first lookup and removed wherever workers // exit (`Release` arm or `reap_exited` sweep). let delivery_states: HashMap = HashMap::new(); + let agent_result_tokens: HashMap = HashMap::new(); let dm_participants_cache = DmParticipantsCache::new(); let recent_thread_messages: VecDeque = VecDeque::new(); if !pending_deliveries.is_empty() { @@ -480,6 +488,7 @@ pub(crate) async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Re terminal_failed_deliveries, pending_requests, delivery_states, + agent_result_tokens, dm_participants_cache, recent_thread_messages, shutdown, @@ -492,3 +501,70 @@ pub(crate) async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Re runtime.run().await } + +fn callback_host_for_url(api_bind: &str, local_addr: SocketAddr) -> String { + let host = match unbracket_ipv6(api_bind.trim()) { + "" => { + if local_addr.is_ipv6() { + "::1" + } else { + "127.0.0.1" + } + } + "0.0.0.0" => "127.0.0.1", + "::" => "::1", + other => other, + }; + bracket_ipv6_host(host) +} + +fn unbracket_ipv6(host: &str) -> &str { + host.strip_prefix('[') + .and_then(|value| value.strip_suffix(']')) + .unwrap_or(host) +} + +fn 
bracket_ipv6_host(host: &str) -> String { + match host.parse::() { + Ok(IpAddr::V6(_)) => format!("[{}]", host), + _ => host.to_string(), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::net::{Ipv4Addr, Ipv6Addr}; + + #[test] + fn callback_host_uses_family_specific_loopback_for_wildcards() { + assert_eq!( + callback_host_for_url("0.0.0.0", SocketAddr::from((Ipv4Addr::UNSPECIFIED, 3889))), + "127.0.0.1" + ); + assert_eq!( + callback_host_for_url("[::]", SocketAddr::from((Ipv6Addr::UNSPECIFIED, 3889))), + "[::1]" + ); + assert_eq!( + callback_host_for_url("::", SocketAddr::from((Ipv6Addr::UNSPECIFIED, 3889))), + "[::1]" + ); + } + + #[test] + fn callback_host_brackets_ipv6_literals() { + assert_eq!( + callback_host_for_url("::1", SocketAddr::from((Ipv6Addr::LOCALHOST, 3889))), + "[::1]" + ); + assert_eq!( + callback_host_for_url("[::1]", SocketAddr::from((Ipv6Addr::LOCALHOST, 3889))), + "[::1]" + ); + assert_eq!( + callback_host_for_url("127.0.0.1", SocketAddr::from((Ipv4Addr::LOCALHOST, 3889))), + "127.0.0.1" + ); + } +} diff --git a/crates/broker/src/runtime/maintenance.rs b/crates/broker/src/runtime/maintenance.rs index 64fef4aa6..7ea853c05 100644 --- a/crates/broker/src/runtime/maintenance.rs +++ b/crates/broker/src/runtime/maintenance.rs @@ -13,6 +13,7 @@ impl BrokerRuntime { let pending_deliveries = &mut self.pending_deliveries; let pending_requests = &mut self.pending_requests; let delivery_states = &mut self.delivery_states; + let agent_result_tokens = &mut self.agent_result_tokens; let delivery_retry_interval = self.delivery_retry_interval; let shutdown = &self.shutdown; @@ -169,6 +170,7 @@ impl BrokerRuntime { "worker_permanently_dead", ); delivery_states.remove(name); + agent_result_tokens.retain(|_, agent| agent != name); let _ = send_event( sdk_out_tx, json!({"kind":"agent_permanently_dead","name":name,"reason":reason}), @@ -215,6 +217,7 @@ impl BrokerRuntime { } fail_pending_requests_for_worker(pending_requests, name, "worker_exited"); 
delivery_states.remove(name); + agent_result_tokens.retain(|_, agent| agent != name); let _ = send_event( sdk_out_tx, json!({ @@ -302,6 +305,7 @@ impl BrokerRuntime { worker_relay_key, rst.skip_relay_prompt, None, + rst.agent_result.clone(), ) .await { diff --git a/crates/broker/src/runtime/mod.rs b/crates/broker/src/runtime/mod.rs index a39655365..343e40c3c 100644 --- a/crates/broker/src/runtime/mod.rs +++ b/crates/broker/src/runtime/mod.rs @@ -40,8 +40,8 @@ use crate::{ replay_buffer::{ReplayBuffer, DEFAULT_REPLAY_CAPACITY}, telemetry::{ActionSource, TelemetryClient, TelemetryEvent}, types::{ - BrokerCommandEvent, InboundDeliveryDispatch, InboundDeliveryMode, InboundDeliveryState, - InboundKind, PendingRelayMessage, + AgentResultMcpConfig, BrokerCommandEvent, InboundDeliveryDispatch, InboundDeliveryMode, + InboundDeliveryState, InboundKind, PendingRelayMessage, }, }; diff --git a/crates/broker/src/runtime/relaycast_events.rs b/crates/broker/src/runtime/relaycast_events.rs index d11f75afa..9775fe0f1 100644 --- a/crates/broker/src/runtime/relaycast_events.rs +++ b/crates/broker/src/runtime/relaycast_events.rs @@ -14,6 +14,7 @@ impl BrokerRuntime { let pending_deliveries = &mut self.pending_deliveries; let pending_requests = &mut self.pending_requests; let delivery_states = &mut self.delivery_states; + let agent_result_tokens = &mut self.agent_result_tokens; let dm_participants_cache = &mut self.dm_participants_cache; let recent_thread_messages = &mut self.recent_thread_messages; let delivery_retry_interval = self.delivery_retry_interval; @@ -107,6 +108,7 @@ impl BrokerRuntime { "relaycast_release", ); delivery_states.remove(&name); + agent_result_tokens.retain(|_, agent| agent != &name); telemetry.track(TelemetryEvent::AgentRelease { cli: String::new(), release_reason: "relaycast_release".to_string(), @@ -285,6 +287,7 @@ impl BrokerRuntime { worker_relay_key.clone(), false, Some(workspace_id.clone()), + None, ) .await { @@ -478,6 +481,7 @@ impl BrokerRuntime { 
worker_relay_key.clone(), false, Some(workspace_id.clone()), + None, ) .await { diff --git a/crates/broker/src/snippets.rs b/crates/broker/src/snippets.rs index 9baaf92a3..b73da9528 100644 --- a/crates/broker/src/snippets.rs +++ b/crates/broker/src/snippets.rs @@ -9,6 +9,8 @@ use anyhow::{Context, Result}; use serde_json::{Map, Value}; use tokio::process::Command; +use crate::types::AgentResultMcpConfig; + const RELAYCAST_MCP_PACKAGE: &str = "@relaycast/mcp"; const MCP_FILE: &str = ".mcp.json"; @@ -36,6 +38,7 @@ pub fn ensure_relaycast_mcp_config( None, None, None, + None, ); if !path.exists() { @@ -120,6 +123,26 @@ pub fn relaycast_mcp_config_json_with_token( relay_agent_token: Option<&str>, workspaces_json: Option<&str>, default_workspace: Option<&str>, +) -> String { + relaycast_mcp_config_json_with_result( + relay_api_key, + relay_base_url, + relay_agent_name, + relay_agent_token, + workspaces_json, + default_workspace, + None, + ) +} + +pub fn relaycast_mcp_config_json_with_result( + relay_api_key: Option<&str>, + relay_base_url: Option<&str>, + relay_agent_name: Option<&str>, + relay_agent_token: Option<&str>, + workspaces_json: Option<&str>, + default_workspace: Option<&str>, + agent_result: Option<&AgentResultMcpConfig>, ) -> String { let server = relaycast_server_config( relay_api_key, @@ -128,6 +151,7 @@ pub fn relaycast_mcp_config_json_with_token( relay_agent_token, workspaces_json, default_workspace, + agent_result, ); let mut servers = Map::new(); servers.insert(RELAYCAST_SERVER.to_string(), server); @@ -145,6 +169,7 @@ pub fn relaycast_mcp_config_json_with_token( /// Later sources override earlier ones (matching Claude's own precedence). /// The relaycast entry always wins (prevents stale entries from overriding broker creds). 
#[allow(dead_code)] +#[allow(clippy::too_many_arguments)] fn merge_relaycast_with_project_mcp( relay_api_key: Option<&str>, relay_base_url: Option<&str>, @@ -153,6 +178,7 @@ fn merge_relaycast_with_project_mcp( cwd: &Path, workspaces_json: Option<&str>, default_workspace: Option<&str>, + agent_result: Option<&AgentResultMcpConfig>, ) -> String { merge_relaycast_with_project_mcp_inner( relay_api_key, @@ -163,6 +189,7 @@ fn merge_relaycast_with_project_mcp( dirs::home_dir(), workspaces_json, default_workspace, + agent_result, ) } @@ -178,6 +205,7 @@ fn merge_relaycast_with_project_mcp_inner( home: Option, workspaces_json: Option<&str>, default_workspace: Option<&str>, + agent_result: Option<&AgentResultMcpConfig>, ) -> String { let relaycast_server = relaycast_server_config( relay_api_key, @@ -186,6 +214,7 @@ fn merge_relaycast_with_project_mcp_inner( relay_agent_token, workspaces_json, default_workspace, + agent_result, ); let mut servers = Map::new(); @@ -236,6 +265,7 @@ fn relaycast_server_config( relay_agent_token: Option<&str>, workspaces_json: Option<&str>, default_workspace: Option<&str>, + agent_result: Option<&AgentResultMcpConfig>, ) -> Value { let mut server = Map::new(); // Allow overriding the MCP command for local development/testing. 
@@ -308,6 +338,7 @@ fn relaycast_server_config( Value::String(dw.to_string()), ); } + apply_agent_result_env(&mut env, agent_result); if !env.is_empty() { server.insert("env".into(), Value::Object(env)); } @@ -315,6 +346,21 @@ fn relaycast_server_config( Value::Object(server) } +fn apply_agent_result_env( + env: &mut Map, + agent_result: Option<&AgentResultMcpConfig>, +) { + if let Some(config) = agent_result { + for (key, value) in config.env_pairs() { + env.insert(key.into(), Value::String(value)); + } + } +} + +fn escape_toml_basic_string(value: &str) -> String { + value.replace('\\', "\\\\").replace('"', "\\\"") +} + /// Inject RELAY_API_KEY into the relaycast server's env block within a merged /// MCP config JSON string. The shared `relaycast_server_config()` omits it /// (codex strips API keys from .mcp.json env), but for Claude's inline @@ -359,6 +405,29 @@ pub fn ensure_opencode_config( relay_agent_token: Option<&str>, workspaces_json: Option<&str>, default_workspace: Option<&str>, +) -> io::Result { + ensure_opencode_config_with_result( + root, + relay_api_key, + relay_base_url, + relay_agent_name, + relay_agent_token, + workspaces_json, + default_workspace, + None, + ) +} + +#[allow(clippy::too_many_arguments)] +pub fn ensure_opencode_config_with_result( + root: &Path, + relay_api_key: Option<&str>, + relay_base_url: Option<&str>, + relay_agent_name: Option<&str>, + relay_agent_token: Option<&str>, + workspaces_json: Option<&str>, + default_workspace: Option<&str>, + _agent_result: Option<&AgentResultMcpConfig>, ) -> io::Result { let path = root.join(OPENCODE_CONFIG); @@ -492,6 +561,7 @@ pub fn ensure_opencode_config( /// Write `.cursor/mcp.json` in the given directory with the Relaycast MCP server /// configured with per-agent credentials (name + token). /// Returns `true` if the config was created or updated. 
+#[allow(clippy::too_many_arguments)] pub fn ensure_cursor_mcp_config( root: &Path, relay_api_key: Option<&str>, @@ -500,18 +570,20 @@ pub fn ensure_cursor_mcp_config( relay_agent_token: Option<&str>, workspaces_json: Option<&str>, default_workspace: Option<&str>, + _agent_result: Option<&AgentResultMcpConfig>, ) -> io::Result { let cursor_dir = root.join(".cursor"); fs::create_dir_all(&cursor_dir)?; let path = cursor_dir.join("mcp.json"); - let mcp_json = relaycast_mcp_config_json_with_token( + let mcp_json = relaycast_mcp_config_json_with_result( relay_api_key, relay_base_url, relay_agent_name, relay_agent_token, workspaces_json, default_workspace, + None, ); let mut new_value: Value = serde_json::from_str(&mcp_json).map_err(|e| { io::Error::new( @@ -614,6 +686,34 @@ pub async fn configure_relaycast_mcp_with_token( agent_token: Option<&str>, workspaces_json: Option<&str>, default_workspace: Option<&str>, +) -> Result> { + configure_relaycast_mcp_with_result( + cli, + agent_name, + api_key, + base_url, + existing_args, + cwd, + agent_token, + workspaces_json, + default_workspace, + None, + ) + .await +} + +#[allow(clippy::too_many_arguments)] +pub async fn configure_relaycast_mcp_with_result( + cli: &str, + agent_name: &str, + api_key: Option<&str>, + base_url: Option<&str>, + existing_args: &[String], + cwd: &Path, + agent_token: Option<&str>, + workspaces_json: Option<&str>, + default_workspace: Option<&str>, + agent_result: Option<&AgentResultMcpConfig>, ) -> Result> { let cli_lower = detect_cli_name(cli).to_lowercase(); let is_claude = cli_lower == "claude" || cli_lower.starts_with("claude:"); @@ -638,13 +738,14 @@ pub async fn configure_relaycast_mcp_with_token( // MCP servers (filesystem, database, etc.). // We do NOT pass --strict-mcp-config — that would block .mcp.json loading // and prevent child agents from inheriting MCP servers. 
- let mcp_json = relaycast_mcp_config_json_with_token( + let mcp_json = relaycast_mcp_config_json_with_result( api_key, base_url, Some(agent_name), agent_token, workspaces_json, default_workspace, + agent_result, ); // Claude Code does not reliably pass parent process env vars to MCP server // subprocesses, so RELAY_API_KEY must be injected directly into the config. @@ -685,18 +786,27 @@ pub async fn configure_relaycast_mcp_with_token( if let Some(key) = api_key { args.extend([ "--config".to_string(), - format!("mcp_servers.relaycast.env.RELAY_API_KEY=\"{key}\""), + format!( + "mcp_servers.relaycast.env.RELAY_API_KEY=\"{}\"", + escape_toml_basic_string(key) + ), ]); } if let Some(url) = base_url { args.extend([ "--config".to_string(), - format!("mcp_servers.relaycast.env.RELAY_BASE_URL=\"{url}\""), + format!( + "mcp_servers.relaycast.env.RELAY_BASE_URL=\"{}\"", + escape_toml_basic_string(url) + ), ]); } args.extend([ "--config".to_string(), - format!("mcp_servers.relaycast.env.RELAY_AGENT_NAME=\"{agent_name}\""), + format!( + "mcp_servers.relaycast.env.RELAY_AGENT_NAME=\"{}\"", + escape_toml_basic_string(agent_name) + ), ]); args.extend([ "--config".to_string(), @@ -709,7 +819,10 @@ pub async fn configure_relaycast_mcp_with_token( if let Some(token) = agent_token.map(str::trim).filter(|s| !s.is_empty()) { args.extend([ "--config".to_string(), - format!("mcp_servers.relaycast.env.RELAY_AGENT_TOKEN=\"{token}\""), + format!( + "mcp_servers.relaycast.env.RELAY_AGENT_TOKEN=\"{}\"", + escape_toml_basic_string(token) + ), ]); // Skip bootstrap when the broker has already pre-registered the agent. args.extend([ @@ -720,20 +833,38 @@ pub async fn configure_relaycast_mcp_with_token( // Forward multi-workspace context to codex child agents. // JSON values must have inner quotes escaped for TOML basic-string parsing. 
if let Some(wj) = workspaces_json.map(str::trim).filter(|s| !s.is_empty()) { - let escaped = wj.replace('\\', "\\\\").replace('"', "\\\""); args.extend([ "--config".to_string(), - format!("mcp_servers.relaycast.env.RELAY_WORKSPACES_JSON=\"{escaped}\""), + format!( + "mcp_servers.relaycast.env.RELAY_WORKSPACES_JSON=\"{}\"", + escape_toml_basic_string(wj) + ), ]); } if let Some(dw) = default_workspace.map(str::trim).filter(|s| !s.is_empty()) { - let escaped = dw.replace('\\', "\\\\").replace('"', "\\\""); args.extend([ "--config".to_string(), - format!("mcp_servers.relaycast.env.RELAY_DEFAULT_WORKSPACE=\"{escaped}\""), + format!( + "mcp_servers.relaycast.env.RELAY_DEFAULT_WORKSPACE=\"{}\"", + escape_toml_basic_string(dw) + ), ]); } + if let Some(config) = agent_result { + for (key, value) in config.env_pairs() { + args.extend([ + "--config".to_string(), + format!( + "mcp_servers.relaycast.env.{key}=\"{}\"", + escape_toml_basic_string(&value) + ), + ]); + } + } } else if is_gemini || is_droid { + // Result callback tokens are per-spawn secrets. Gemini/Droid, OpenCode, + // and Cursor write shared config surfaces, so keep result callback env + // limited to the worker process env instead of persisting it in those files. if is_gemini { ensure_gemini_folder_trusted(cwd); } @@ -746,10 +877,11 @@ pub async fn configure_relaycast_mcp_with_token( is_gemini, workspaces_json, default_workspace, + None, ) .await?; } else if is_opencode && !existing_args.iter().any(|a| a == "--agent") { - ensure_opencode_config( + ensure_opencode_config_with_result( cwd, api_key, base_url, @@ -757,6 +889,7 @@ pub async fn configure_relaycast_mcp_with_token( agent_token, workspaces_json, default_workspace, + None, ) .with_context(|| { "failed to write opencode.json for relaycast MCP. \ @@ -773,6 +906,7 @@ pub async fn configure_relaycast_mcp_with_token( agent_token, workspaces_json, default_workspace, + None, ) .with_context(|| { "failed to write .cursor/mcp.json for relaycast MCP. 
\ @@ -852,6 +986,7 @@ fn gemini_droid_manual_mcp_add_cmd(cli: &str, is_gemini: bool) -> String { ) } +#[cfg(test)] fn gemini_droid_mcp_add_args( api_key: Option<&str>, base_url: Option<&str>, @@ -860,6 +995,29 @@ fn gemini_droid_mcp_add_args( is_gemini: bool, workspaces_json: Option<&str>, default_workspace: Option<&str>, +) -> Vec { + gemini_droid_mcp_add_args_with_result( + api_key, + base_url, + agent_name, + agent_token, + is_gemini, + workspaces_json, + default_workspace, + None, + ) +} + +#[allow(clippy::too_many_arguments)] +fn gemini_droid_mcp_add_args_with_result( + api_key: Option<&str>, + base_url: Option<&str>, + agent_name: Option<&str>, + agent_token: Option<&str>, + is_gemini: bool, + workspaces_json: Option<&str>, + default_workspace: Option<&str>, + _agent_result: Option<&AgentResultMcpConfig>, ) -> Vec { let env_flag = gemini_droid_mcp_env_flag(is_gemini); let mut args = vec!["mcp".to_string(), "add".to_string()]; @@ -917,6 +1075,7 @@ async fn configure_gemini_droid_mcp( is_gemini: bool, workspaces_json: Option<&str>, default_workspace: Option<&str>, + agent_result: Option<&AgentResultMcpConfig>, ) -> Result<()> { // Extract the executable from cli which may contain inline args // (e.g. "gemini --model foo"). Command::new needs just the binary. 
@@ -935,7 +1094,7 @@ async fn configure_gemini_droid_mcp( .and_then(|mut c| c.wait()); let mut mcp_cmd = Command::new(&exe); - mcp_cmd.args(gemini_droid_mcp_add_args( + mcp_cmd.args(gemini_droid_mcp_add_args_with_result( api_key, base_url, agent_name, @@ -943,6 +1102,7 @@ async fn configure_gemini_droid_mcp( is_gemini, workspaces_json, default_workspace, + agent_result, )); mcp_cmd .stdin(Stdio::null()) @@ -996,7 +1156,7 @@ fn write_pretty_json(path: &Path, value: &Value) -> io::Result<()> { mod tests { use std::fs; - use serde_json::{Map, Value}; + use serde_json::{json, Map, Value}; use tempfile::tempdir; use super::ensure_relaycast_mcp_config; @@ -1009,6 +1169,20 @@ mod tests { ); } + fn test_agent_result_config() -> crate::types::AgentResultMcpConfig { + crate::types::AgentResultMcpConfig { + callback_url: "http://127.0.0.1:3889/api/agent-result".to_string(), + token: "arr_test".to_string(), + schema: Some(json!({"type": "object"})), + } + } + + fn assert_agent_result_env_absent(env: &Value) { + assert!(env["AGENT_RELAY_RESULT_URL"].is_null()); + assert!(env["AGENT_RELAY_RESULT_TOKEN"].is_null()); + assert!(env["AGENT_RELAY_RESULT_SCHEMA"].is_null()); + } + #[test] fn creates_reaycast_mcp_config_when_missing() { let temp = tempdir().expect("tempdir"); @@ -1422,6 +1596,26 @@ mod tests { assert!(args.contains(&"RELAY_AGENT_TOKEN=tok_droid_123".to_string())); } + #[test] + fn gemini_droid_mcp_add_args_omit_agent_result_env() { + let config = test_agent_result_config(); + let args = super::gemini_droid_mcp_add_args_with_result( + Some("rk_live_xyz"), + Some("https://api.relaycast.dev"), + Some("GeminiWorker"), + Some("tok_gem_123"), + true, + None, + None, + Some(&config), + ); + + assert!( + !args.iter().any(|arg| arg.contains("AGENT_RELAY_RESULT")), + "Gemini/Droid mcp add writes shared config and must not persist per-agent result tokens" + ); + } + // ----------------------------------------------------------------------- // Codex provider tests // 
----------------------------------------------------------------------- @@ -1513,6 +1707,32 @@ mod tests { ); } + #[tokio::test] + async fn codex_includes_agent_result_env_in_inline_config() { + let temp = tempdir().expect("tempdir"); + let config = test_agent_result_config(); + let args = super::configure_relaycast_mcp_with_result( + "codex", + "Agent", + None, + None, + &[], + temp.path(), + None, + None, + None, + Some(&config), + ) + .await + .expect("configure codex mcp with result"); + + assert!( + args.iter() + .any(|a| a == "mcp_servers.relaycast.env.AGENT_RELAY_RESULT_TOKEN=\"arr_test\""), + "Codex inline config can carry per-agent result callback env" + ); + } + #[tokio::test] async fn codex_omits_optional_fields_when_none() { let temp = tempdir().expect("tempdir"); @@ -1620,6 +1840,32 @@ mod tests { assert_eq!(agent["tools"]["relaycast_*"].as_bool(), Some(true)); } + #[tokio::test] + async fn opencode_result_contract_does_not_persist_callback_env() { + let temp = tempdir().expect("tempdir"); + let config = test_agent_result_config(); + let args = super::configure_relaycast_mcp_with_result( + "opencode", + "OcAgent", + Some("rk_live_oc"), + Some("https://api.relaycast.dev"), + &[], + temp.path(), + None, + None, + None, + Some(&config), + ) + .await + .expect("configure opencode mcp with result"); + + assert_eq!(args, vec!["--agent", "relaycast"]); + let contents = + fs::read_to_string(temp.path().join("opencode.json")).expect("read opencode.json"); + let json: Value = serde_json::from_str(&contents).expect("parse opencode.json"); + assert_agent_result_env_absent(&json["mcp"]["relaycast"]["environment"]); + } + #[tokio::test] async fn opencode_upserts_into_existing_config_preserving_other_keys() { let temp = tempdir().expect("tempdir"); @@ -1735,6 +1981,35 @@ mod tests { ); } + #[tokio::test] + async fn cursor_result_contract_does_not_persist_callback_env() { + let temp = tempdir().expect("tempdir"); + let config = test_agent_result_config(); + let args = 
super::configure_relaycast_mcp_with_result( + "cursor", + "CursorAgent", + Some("rk_live_cursor"), + Some("https://api.relaycast.dev"), + &[], + temp.path(), + None, + None, + None, + Some(&config), + ) + .await + .expect("configure cursor mcp with result"); + + assert!( + args.is_empty(), + "cursor should configure MCP via file, not CLI args" + ); + let contents = fs::read_to_string(temp.path().join(".cursor").join("mcp.json")) + .expect("read cursor mcp config"); + let json: Value = serde_json::from_str(&contents).expect("parse cursor mcp config"); + assert_agent_result_env_absent(&json["mcpServers"]["relaycast"]["env"]); + } + #[tokio::test] async fn cursor_agent_alias_writes_mcp_json_with_token() { let temp = tempdir().expect("tempdir"); @@ -2072,6 +2347,7 @@ mod tests { temp.path(), None, None, + None, ); let parsed: Value = serde_json::from_str(&merged).expect("valid JSON"); let servers = parsed["mcpServers"].as_object().expect("mcpServers"); @@ -2109,6 +2385,7 @@ mod tests { temp.path(), None, None, + None, ); let parsed: Value = serde_json::from_str(&merged).expect("valid JSON"); let servers = parsed["mcpServers"].as_object().expect("mcpServers"); @@ -2131,6 +2408,7 @@ mod tests { temp.path(), None, None, + None, ); let parsed: Value = serde_json::from_str(&merged).expect("valid JSON"); let servers = parsed["mcpServers"].as_object().expect("mcpServers"); @@ -2180,6 +2458,7 @@ mod tests { &project, None, None, + None, ); let parsed: Value = serde_json::from_str(&merged).expect("valid JSON"); let servers = parsed["mcpServers"].as_object().expect("mcpServers"); @@ -2356,6 +2635,7 @@ mod tests { Some(fake_home), None, None, + None, ); let parsed: Value = serde_json::from_str(&merged).expect("valid JSON"); let servers = parsed["mcpServers"].as_object().expect("mcpServers"); @@ -2403,6 +2683,7 @@ mod tests { Some(fake_home), None, None, + None, ); let parsed: Value = serde_json::from_str(&merged).expect("valid JSON"); let servers = 
parsed["mcpServers"].as_object().expect("mcpServers"); @@ -2447,6 +2728,7 @@ mod tests { Some(fake_home), None, None, + None, ); let parsed: Value = serde_json::from_str(&merged).expect("valid JSON"); let servers = parsed["mcpServers"].as_object().expect("mcpServers"); @@ -2494,6 +2776,7 @@ mod tests { None, None, None, + None, ); let parsed: Value = serde_json::from_str(&merged).expect("valid JSON"); let servers = parsed["mcpServers"].as_object().expect("mcpServers"); @@ -2553,6 +2836,7 @@ mod tests { None, None, None, + None, ); let parsed: Value = serde_json::from_str(&merged).expect("valid JSON"); let servers = parsed["mcpServers"].as_object().expect("mcpServers"); @@ -2595,8 +2879,15 @@ mod tests { #[test] fn relaycast_server_config_includes_workspace_vars() { - let server = - super::relaycast_server_config(None, None, None, None, Some("wj-json"), Some("ws-id")); + let server = super::relaycast_server_config( + None, + None, + None, + None, + Some("wj-json"), + Some("ws-id"), + None, + ); let env = &server["env"]; assert_eq!( env["RELAY_WORKSPACES_JSON"].as_str(), @@ -2610,9 +2901,48 @@ mod tests { ); } + #[test] + fn relaycast_server_config_includes_agent_result_vars() { + let schema = json!({ + "type": "object", + "properties": { + "ok": { "type": "boolean" } + }, + "required": ["ok"] + }); + let schema_json = schema.to_string(); + let config = crate::types::AgentResultMcpConfig { + callback_url: "http://127.0.0.1:3889/api/agent-result".to_string(), + token: "arr_test".to_string(), + schema: Some(schema.clone()), + }; + + let server = super::relaycast_server_config( + None, + None, + Some("agent-1"), + None, + None, + None, + Some(&config), + ); + let env = &server["env"]; + + assert_eq!( + env["AGENT_RELAY_RESULT_URL"].as_str(), + Some("http://127.0.0.1:3889/api/agent-result") + ); + assert_eq!(env["AGENT_RELAY_RESULT_TOKEN"].as_str(), Some("arr_test")); + assert_eq!( + env["AGENT_RELAY_RESULT_SCHEMA"].as_str(), + Some(schema_json.as_str()) + ); + } + 
#[test] fn relaycast_server_config_empty_workspace_vars_omitted() { - let server = super::relaycast_server_config(None, None, None, None, Some(""), Some(" ")); + let server = + super::relaycast_server_config(None, None, None, None, Some(""), Some(" "), None); // env may be absent (Value::Null) or present but without workspace keys let env = &server["env"]; assert!( @@ -2637,6 +2967,7 @@ mod tests { None, Some("wj"), Some("dw"), + None, ); let parsed: Value = serde_json::from_str(&merged).expect("valid JSON"); assert_eq!( diff --git a/crates/broker/src/supervisor.rs b/crates/broker/src/supervisor.rs index 20a2c2cff..d1c00f28e 100644 --- a/crates/broker/src/supervisor.rs +++ b/crates/broker/src/supervisor.rs @@ -10,6 +10,7 @@ use std::time::{Duration, Instant}; use serde::{Deserialize, Serialize}; use crate::protocol::AgentSpec; +use crate::types::AgentResultMcpConfig; /// Configurable restart policy attached to an agent at spawn time. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -58,6 +59,7 @@ struct RestartState { pub initial_task: Option, pub parent: Option, pub skip_relay_prompt: bool, + pub agent_result: Option, } /// Decision returned by the supervisor after an agent exits. @@ -74,6 +76,7 @@ pub struct PendingRestart { pub initial_task: Option, pub restart_count: u32, pub skip_relay_prompt: bool, + pub agent_result: Option, } /// Manages restart state for all supervised agents. @@ -95,6 +98,7 @@ impl Supervisor { } /// Register an agent for supervision. Called at spawn time. 
+ #[allow(clippy::too_many_arguments)] pub fn register( &mut self, name: &str, @@ -103,6 +107,7 @@ impl Supervisor { initial_task: Option, skip_relay_prompt: bool, policy: RestartPolicy, + agent_result: Option, ) { self.states.insert( name.to_string(), @@ -115,6 +120,7 @@ impl Supervisor { initial_task, parent, skip_relay_prompt, + agent_result, }, ); } @@ -192,6 +198,7 @@ impl Supervisor { initial_task: state.initial_task.clone(), restart_count: state.total_restarts + 1, skip_relay_prompt: state.skip_relay_prompt, + agent_result: state.agent_result.clone(), }, )) } else { @@ -269,6 +276,7 @@ mod tests { None, false, RestartPolicy::default(), + None, ); assert!(sup.is_supervised("w1")); @@ -292,6 +300,7 @@ mod tests { Some("do stuff".into()), true, RestartPolicy::default(), + None, ); let decision = sup.on_exit("w1", Some(1), None).unwrap(); @@ -311,7 +320,7 @@ mod tests { max_consecutive_failures: 10, // high so this doesn't trigger ..Default::default() }; - sup.register("w1", test_spec("w1"), None, None, false, policy); + sup.register("w1", test_spec("w1"), None, None, false, policy, None); // First crash -> restart assert!(matches!( @@ -340,7 +349,7 @@ mod tests { max_restarts: 10, // high so this doesn't trigger ..Default::default() }; - sup.register("w1", test_spec("w1"), None, None, false, policy); + sup.register("w1", test_spec("w1"), None, None, false, policy, None); // Crash 1 -> consecutive=1, restart assert!(matches!( @@ -368,7 +377,7 @@ mod tests { max_restarts: 10, ..Default::default() }; - sup.register("w1", test_spec("w1"), None, None, false, policy); + sup.register("w1", test_spec("w1"), None, None, false, policy, None); // Two crashes sup.on_exit("w1", Some(1), None); @@ -391,7 +400,7 @@ mod tests { enabled: false, ..Default::default() }; - sup.register("w1", test_spec("w1"), None, None, false, policy); + sup.register("w1", test_spec("w1"), None, None, false, policy, None); let decision = sup.on_exit("w1", Some(1), None).unwrap(); 
assert!(matches!(decision, RestartDecision::PermanentlyDead { .. })); @@ -407,6 +416,7 @@ mod tests { None, false, RestartPolicy::default(), + None, ); sup.unregister("w1"); @@ -428,6 +438,7 @@ mod tests { Some("task".into()), true, policy, + None, ); sup.on_exit("w1", Some(1), None); @@ -449,7 +460,7 @@ mod tests { cooldown_ms: 60_000, // 60 seconds ..Default::default() }; - sup.register("w1", test_spec("w1"), None, None, false, policy); + sup.register("w1", test_spec("w1"), None, None, false, policy, None); sup.on_exit("w1", Some(1), None); @@ -468,6 +479,7 @@ mod tests { None, false, RestartPolicy::default(), + None, ); assert_eq!(sup.restart_count("w1"), 0); @@ -488,7 +500,7 @@ mod tests { cooldown_ms: 0, ..Default::default() }; - sup.register("w1", test_spec("w1"), None, None, true, policy); + sup.register("w1", test_spec("w1"), None, None, true, policy, None); sup.on_exit("w1", Some(1), None); diff --git a/crates/broker/src/types.rs b/crates/broker/src/types.rs index 4ee20451e..bfe1099fa 100644 --- a/crates/broker/src/types.rs +++ b/crates/broker/src/types.rs @@ -1,4 +1,5 @@ use serde::{Deserialize, Serialize}; +use serde_json::Value; use crate::protocol::MessageInjectionMode; @@ -214,6 +215,30 @@ pub struct InboundDeliveryState { pub pending: std::collections::VecDeque, } +/// Per-spawn structured result callback configuration. The broker generates a +/// token for agents spawned with a result contract and injects this into that +/// agent's MCP server environment. 
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct AgentResultMcpConfig { + pub callback_url: String, + pub token: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub schema: Option, +} + +impl AgentResultMcpConfig { + pub fn env_pairs(&self) -> Vec<(&'static str, String)> { + let mut pairs = vec![ + ("AGENT_RELAY_RESULT_URL", self.callback_url.clone()), + ("AGENT_RELAY_RESULT_TOKEN", self.token.clone()), + ]; + if let Some(schema) = &self.schema { + pairs.push(("AGENT_RELAY_RESULT_SCHEMA", schema.to_string())); + } + pairs + } +} + /// Per-worker cap on the pending queue. Prevents unbounded growth when a /// `manual_flush` delivery mode is left open for hours; oldest message is evicted /// with a `tracing::warn!` (see [`InboundDeliveryState::push_pending`]). diff --git a/crates/broker/src/worker.rs b/crates/broker/src/worker.rs index 059011061..2b952c17e 100644 --- a/crates/broker/src/worker.rs +++ b/crates/broker/src/worker.rs @@ -8,8 +8,9 @@ use std::{ use crate::{ metrics::MetricsCollector, protocol::{AgentRuntime, AgentSpec, ProtocolEnvelope, RelayDelivery, PROTOCOL_VERSION}, - relaycast::configure_relaycast_mcp_with_token, + relaycast::configure_relaycast_mcp_with_result, supervisor::Supervisor, + types::AgentResultMcpConfig, }; use anyhow::{Context, Result}; use serde::{Deserialize, Serialize}; @@ -153,6 +154,7 @@ impl WorkerRegistry { .map(|(_, v)| v.as_str()) } + #[allow(clippy::too_many_arguments)] async fn build_mcp_args( &self, cli_name: &str, @@ -161,11 +163,17 @@ impl WorkerRegistry { cwd: &Path, worker_relay_api_key: Option<&str>, skip_relay_prompt: bool, + agent_result: Option<&AgentResultMcpConfig>, ) -> Result> { + // `skip_relay_prompt` is an explicit opt-out: the caller does not want the + // relaycast MCP server (messaging/channel/etc. tools) injected, e.g. to + // save tokens. 
We honor that even when `agent_result` is configured — + // `AGENT_RELAY_RESULT_*` env vars are still set on the worker process + // below, so a separately-configured relaycast MCP can pick them up. if skip_relay_prompt { return Ok(Vec::new()); } - configure_relaycast_mcp_with_token( + configure_relaycast_mcp_with_result( cli_name, agent_name, self.env_value("RELAY_API_KEY"), @@ -175,6 +183,7 @@ impl WorkerRegistry { worker_relay_api_key, self.env_value("RELAY_WORKSPACES_JSON"), self.env_value("RELAY_DEFAULT_WORKSPACE"), + agent_result, ) .await } @@ -187,6 +196,7 @@ impl WorkerRegistry { self.workers.get(name).and_then(|h| h.child.id()) } + #[allow(clippy::too_many_arguments)] pub(crate) async fn spawn( &mut self, spec: AgentSpec, @@ -195,6 +205,7 @@ impl WorkerRegistry { worker_relay_api_key: Option, skip_relay_prompt: bool, workspace_id: Option, + agent_result: Option, ) -> Result { let mut spec = spec; if self.workers.contains_key(&spec.name) { @@ -281,6 +292,7 @@ impl WorkerRegistry { Path::new(spec.cwd.as_deref().unwrap_or(".")), worker_relay_api_key.as_deref(), skip_relay_prompt, + agent_result.as_ref(), ) .await?; @@ -345,6 +357,7 @@ impl WorkerRegistry { Path::new(spec.cwd.as_deref().unwrap_or(".")), worker_relay_api_key.as_deref(), skip_relay_prompt, + agent_result.as_ref(), ) .await?; @@ -383,6 +396,11 @@ impl WorkerRegistry { for (key, value) in &self.worker_env { command.env(key, value); } + if let Some(config) = &agent_result { + for (key, value) in config.env_pairs() { + command.env(key, value); + } + } if !skip_relay_prompt && !matches!(spec.runtime, AgentRuntime::Headless) { if let Some(relay_key) = worker_relay_api_key { command.env("RELAY_AGENT_TOKEN", relay_key); diff --git a/package-lock.json b/package-lock.json index b80fa874d..51f079023 100644 --- a/package-lock.json +++ b/package-lock.json @@ -17323,7 +17323,9 @@ "listr2": "^10.2.1", "tar": "^7.5.10", "ws": "^8.18.3", - "yaml": "^2.7.0" + "yaml": "^2.7.0", + "zod": "^3.23.8", + 
"zod-to-json-schema": "^3.23.1" }, "devDependencies": { "@types/node": "^22.13.10", diff --git a/packages/sdk/package.json b/packages/sdk/package.json index a7972ed5b..452c341d5 100644 --- a/packages/sdk/package.json +++ b/packages/sdk/package.json @@ -181,7 +181,9 @@ "listr2": "^10.2.1", "tar": "^7.5.10", "ws": "^8.18.3", - "yaml": "^2.7.0" + "yaml": "^2.7.0", + "zod": "^3.23.8", + "zod-to-json-schema": "^3.23.1" }, "optionalDependencies": { "@agent-relay/broker-darwin-arm64": "7.0.1", diff --git a/packages/sdk/src/__tests__/orchestration-upgrades.test.ts b/packages/sdk/src/__tests__/orchestration-upgrades.test.ts index c9e926115..02a524691 100644 --- a/packages/sdk/src/__tests__/orchestration-upgrades.test.ts +++ b/packages/sdk/src/__tests__/orchestration-upgrades.test.ts @@ -130,6 +130,25 @@ describe('AgentRelayClient orchestration payloads', () => { }); }); + it('spawnPty forwards structured result JSON schema', async () => { + const client = createProtocolClient(); + const request = vi + .spyOn((client as any).transport, 'request') + .mockResolvedValue({ name: 'agent-result', runtime: 'pty' }); + + await client.spawnPty({ + name: 'agent-result', + cli: 'claude', + agentResultSchema: { type: 'object', properties: { ok: { type: 'boolean' } } }, + }); + + const body = JSON.parse(request.mock.calls[0]?.[1]?.body ?? 
'{}'); + expect(body.agentResultSchema).toEqual({ + type: 'object', + properties: { ok: { type: 'boolean' } }, + }); + }); + it('spawnPty forwards model and args in the broker payload', async () => { const client = createProtocolClient(); const request = vi @@ -547,6 +566,108 @@ describe('AgentRelay orchestration handles', () => { } }); + it('agent.waitForResult resolves typed structured results and invokes hooks', async () => { + const { client, emit, mock } = createMockFacadeClient(); + + const relay = createWiredRelay(client); + const globalResults: unknown[] = []; + const callbackResults: unknown[] = []; + relay.addListener('agentResult', (result) => globalResults.push(result)); + + try { + const agent = await relay.spawnPty<{ ok: boolean }>({ + name: 'result-agent', + cli: 'claude', + channels: ['general'], + result: { + jsonSchema: { type: 'object', properties: { ok: { type: 'boolean' } }, required: ['ok'] }, + schema(value) { + if (!value || typeof value !== 'object' || typeof (value as { ok?: unknown }).ok !== 'boolean') { + throw new Error('invalid result'); + } + return value as { ok: boolean }; + }, + onResult: (data) => callbackResults.push(data), + }, + }); + + expect(mock.spawnPty).toHaveBeenCalledWith( + expect.objectContaining({ + agentResultSchema: { type: 'object', properties: { ok: { type: 'boolean' } }, required: ['ok'] }, + }) + ); + + let settled = false; + const waitPromise = agent.waitForResult(1_000).then((result) => { + settled = true; + return result; + }); + emit({ + kind: 'agent_result', + name: 'result-agent', + result_id: 'ar_partial', + data: { ok: false }, + final: false, + }); + await Promise.resolve(); + + expect(settled).toBe(false); + expect(globalResults).toHaveLength(1); + expect(callbackResults).toEqual([{ ok: false }]); + + emit({ + kind: 'agent_result', + name: 'result-agent', + result_id: 'ar_1', + data: { ok: true }, + final: true, + }); + + await expect(waitPromise).resolves.toMatchObject({ + name: 'result-agent', + 
resultId: 'ar_1', + data: { ok: true }, + final: true, + }); + expect(globalResults).toHaveLength(2); + expect(callbackResults).toEqual([{ ok: false }, { ok: true }]); + } finally { + await relay.shutdown(); + } + }); + + it('reusing an agent name rejects pending structured result waiters', async () => { + const { client } = createMockFacadeClient(); + + const relay = createWiredRelay(client); + + try { + const agent = await relay.spawnPty<{ ok: boolean }>({ + name: 'reused-result-agent', + cli: 'claude', + result: { jsonSchema: true }, + }); + const waiter = agent.waitForResult().then( + () => undefined, + (error) => error as Error + ); + + await relay.spawnPty<{ ok: boolean }>({ + name: 'reused-result-agent', + cli: 'claude', + result: { jsonSchema: true }, + }); + + const error = await waiter; + expect(error).toBeInstanceOf(Error); + expect(error.message).toBe( + "Agent 'reused-result-agent' lifecycle reset before structured result was submitted" + ); + } finally { + await relay.shutdown(); + } + }); + it('waitForAgentMessage waits for relay_inbound from the agent', async () => { const { client, emit } = createMockFacadeClient(); vi.spyOn(AgentRelayClient, 'start').mockResolvedValue(client); diff --git a/packages/sdk/src/__tests__/unit.test.ts b/packages/sdk/src/__tests__/unit.test.ts index 5ca78af16..4445c9d54 100644 --- a/packages/sdk/src/__tests__/unit.test.ts +++ b/packages/sdk/src/__tests__/unit.test.ts @@ -82,6 +82,9 @@ function makeFakeAgentWithControls(name: string, exitAfterMs?: number): FakeAgen } return idlePromise!; }, + waitForResult() { + return Promise.reject(new Error(`No structured result for '${name}'`)); + }, async sendMessage() { return { eventId: 'fake', from: name, to: '', text: '' }; }, diff --git a/packages/sdk/src/client.ts b/packages/sdk/src/client.ts index 1937f381c..958be87b2 100644 --- a/packages/sdk/src/client.ts +++ b/packages/sdk/src/client.ts @@ -177,6 +177,7 @@ function buildSpawnPtyBody(input: SpawnPtyInput): Record { 
...(input.idleThresholdSecs !== undefined ? { idleThresholdSecs: input.idleThresholdSecs } : {}), ...(input.restartPolicy !== undefined ? { restartPolicy: input.restartPolicy } : {}), ...(input.skipRelayPrompt !== undefined ? { skipRelayPrompt: input.skipRelayPrompt } : {}), + ...(input.agentResultSchema !== undefined ? { agentResultSchema: input.agentResultSchema } : {}), }; } @@ -200,6 +201,7 @@ function buildSpawnProviderBody( ...(input.idleThresholdSecs !== undefined ? { idleThresholdSecs: input.idleThresholdSecs } : {}), ...(input.restartPolicy !== undefined ? { restartPolicy: input.restartPolicy } : {}), ...(input.skipRelayPrompt !== undefined ? { skipRelayPrompt: input.skipRelayPrompt } : {}), + ...(input.agentResultSchema !== undefined ? { agentResultSchema: input.agentResultSchema } : {}), transport, }; } diff --git a/packages/sdk/src/lifecycle-hooks.ts b/packages/sdk/src/lifecycle-hooks.ts index ed06dee0b..6cb72df2a 100644 --- a/packages/sdk/src/lifecycle-hooks.ts +++ b/packages/sdk/src/lifecycle-hooks.ts @@ -5,8 +5,8 @@ * * 1. **Broker events** — `agentSpawned`, `agentReleased`, `agentExited`, * `agentReady`, `agentIdle`, `agentExitRequested`, - * `agentActivityChanged`, `messageReceived`, `messageSent`, - * `workerOutput`, `deliveryUpdate`, `channelSubscribed`, + * `agentActivityChanged`, `agentResult`, `messageReceived`, + * `messageSent`, `workerOutput`, `deliveryUpdate`, `channelSubscribed`, * `channelUnsubscribed`. These fire when the broker emits the * corresponding event over the WS stream. * 2. 
**Call-site hooks** — `beforeAgentSpawn`, `afterAgentSpawn`, @@ -27,7 +27,7 @@ */ import type { AgentRuntime, BrokerEvent } from './protocol.js'; -import type { Agent, AgentActivityChange, Message } from './relay.js'; +import type { Agent, AgentActivityChange, AgentResult, Message } from './relay.js'; import type { SpawnPtyInput, SpawnProviderInput } from './types.js'; // ── SpawnPatch ───────────────────────────────────────────────────────────── @@ -151,6 +151,7 @@ export type AgentRelayEvents = { deliveryUpdate: [BrokerEvent]; agentExitRequested: [AgentExitRequestedPayload]; agentIdle: [AgentIdlePayload]; + agentResult: [AgentResult]; agentActivityChanged: [AgentActivityChange]; channelSubscribed: [ChannelSubscriptionPayload]; channelUnsubscribed: [ChannelSubscriptionPayload]; diff --git a/packages/sdk/src/protocol.ts b/packages/sdk/src/protocol.ts index edbace5b7..62cdc1e8a 100644 --- a/packages/sdk/src/protocol.ts +++ b/packages/sdk/src/protocol.ts @@ -428,6 +428,14 @@ export type BrokerEvent = idle_secs: number; since?: string; } + | { + kind: 'agent_result'; + name: string; + result_id: string; + data: unknown; + final: boolean; + metadata?: unknown; + } | { kind: 'agent_blocked_on_send'; name: string; diff --git a/packages/sdk/src/relay.ts b/packages/sdk/src/relay.ts index d336d232e..f9fbf30b1 100644 --- a/packages/sdk/src/relay.ts +++ b/packages/sdk/src/relay.ts @@ -28,6 +28,8 @@ import { existsSync, mkdirSync, readFileSync, writeFileSync } from 'node:fs'; import path from 'node:path'; import { RelayCast } from '@relaycast/sdk'; +import { zodToJsonSchema } from 'zod-to-json-schema'; +import type { ZodTypeAny } from 'zod'; import { AgentRelayClient, type AgentRelayBrokerInitArgs, type AgentRelaySpawnOptions } from './client.js'; import { EventBus } from './event-bus.js'; @@ -43,7 +45,7 @@ import { type ResolvedPersona, } from './personas.js'; import { AgentRelayProtocolError } from './transport.js'; -import type { SendMessageInput, SpawnPtyInput } from 
'./types.js'; +import type { JsonSchema, SendMessageInput, SpawnPtyInput } from './types.js'; import type { AgentRuntime, BrokerEvent, @@ -163,6 +165,37 @@ export interface Message { mode?: MessageInjectionMode; } +export interface AgentResultMeta { + name: string; + resultId: string; + final: boolean; + /** Optional diagnostic metadata about the result. Any JSON-compatible value. */ + metadata?: unknown; +} + +export interface AgentResult extends AgentResultMeta { + data: T; +} + +export type AgentResultParser = (value: unknown) => T; + +export type AgentResultSchema = + | ZodTypeAny + | { + parse?: (value: unknown) => T; + safeParse?: (value: unknown) => { success: true; data: T } | { success: false; error: unknown }; + } + | AgentResultParser; + +export interface AgentResultOptions { + /** Runtime validator/parser for submitted data. Zod schemas are supported. */ + schema?: AgentResultSchema; + /** JSON Schema exposed to the spawned agent's MCP result tool. Defaults to any JSON. */ + jsonSchema?: JsonSchema; + /** Invoked after a result arrives and passes local schema validation. */ + onResult?: (data: T, meta: AgentResultMeta) => void | Promise; +} + export type AgentStatus = 'spawning' | 'ready' | 'idle' | 'exited'; export type DeliveryWaitStatus = 'ack' | 'failed' | 'timeout'; export type DeliveryStateStatus = 'queued' | 'injected' | 'active' | 'verified' | 'failed'; @@ -239,7 +272,7 @@ export interface ReleaseOptions extends ReleaseLifecycleHooks { reason?: string; } -export interface SpawnOptions extends SpawnLifecycleHooks { +export interface SpawnOptions extends SpawnLifecycleHooks { args?: string[]; channels?: string[]; model?: string; @@ -260,14 +293,19 @@ export interface SpawnOptions extends SpawnLifecycleHooks { /** When true, skip injecting the relay MCP configuration and protocol prompt into the spawned agent. * Useful for minor tasks where relay messaging is not needed, saving tokens. 
*/ skipRelayPrompt?: boolean; + /** + * Enables a structured-result MCP tool for the spawned agent and validates + * submissions on the SDK side. + */ + result?: AgentResultOptions; } -export interface SpawnAndWaitOptions extends SpawnOptions { +export interface SpawnAndWaitOptions extends SpawnOptions { timeoutMs?: number; waitForMessage?: boolean; } -export interface SpawnPersonaOptions extends SpawnOptions { +export interface SpawnPersonaOptions extends SpawnOptions { /** Override the spawned agent's name. Defaults to the persona id. */ name?: string; /** Initial task / user prompt for the agent. */ @@ -298,7 +336,7 @@ export interface SpawnPersonaOptions extends SpawnOptions { type AgentOutputPayload = { stream: string; chunk: string }; type AgentOutputCallback = ((chunk: string) => void) | ((data: AgentOutputPayload) => void); -export interface Agent { +export interface Agent { readonly name: string; readonly runtime: AgentRuntime; readonly channels: string[]; @@ -320,6 +358,8 @@ export interface Agent { * @param timeoutMs — optional timeout in ms. Resolves with `"idle"` when first idle event fires, * `"timeout"` if timeoutMs elapses first, or `"exited"` if the agent exits. */ waitForIdle(timeoutMs?: number): Promise<'idle' | 'timeout' | 'exited'>; + /** Wait for the structured result submitted through the spawned agent's result MCP tool. 
*/ + waitForResult(timeoutMs?: number): Promise>; sendMessage(input: { to: string; text: string; @@ -353,10 +393,10 @@ export interface HumanHandle { } export interface AgentSpawner { - spawn(options?: SpawnerSpawnOptions): Promise; + spawn(options?: SpawnerSpawnOptions): Promise>; } -export interface SpawnerSpawnOptions extends SpawnLifecycleHooks { +export interface SpawnerSpawnOptions extends SpawnLifecycleHooks { name?: string; args?: string[]; channels?: string[]; @@ -375,10 +415,9 @@ export interface SpawnerSpawnOptions extends SpawnLifecycleHooks { /** When true, skip injecting the relay MCP configuration and protocol prompt into the spawned agent. * Useful for minor tasks where relay messaging is not needed, saving tokens. */ skipRelayPrompt?: boolean; + result?: AgentResultOptions; } -export type EventHook = ((value: T) => void) | null; - export interface AgentRelayOptions { binaryPath?: string; binaryArgs?: AgentRelayBrokerInitArgs; @@ -426,10 +465,22 @@ type OutputListener = { stream?: string; }; -type InternalAgent = Agent & { +type InternalAgent = Agent & { _setChannels: (channels: string[]) => void; }; +type InternalAgentResultContract = { + schema?: AgentResultSchema; + jsonSchema: JsonSchema; + onResult?: (data: T, meta: AgentResultMeta) => void | Promise; +}; + +type AgentResultResolver = { + resolve: (result: AgentResult) => void; + reject: (error: Error) => void; + token: number; +}; + interface AgentActivityState { active: boolean; pendingDeliveries: Map; @@ -536,6 +587,10 @@ export class AgentRelay { private readonly deliveryStates = new Map(); private readonly agentActivityStates = new Map(); private readonly outputListeners = new Map>(); + private readonly resultContracts = new Map(); + private readonly lastAgentResults = new Map>(); + private readonly resultResolvers = new Map(); + private resultResolverSeq = 0; private readonly exitResolvers = new Map< string, { resolve: (reason: 'exited' | 'released') => void; token: number } @@ -707,7 
+762,9 @@ export class AgentRelay { // ── Spawning ──────────────────────────────────────────────────────────── - async spawnPty(input: SpawnPtyInput & SpawnLifecycleHooks): Promise { + async spawnPty( + input: SpawnPtyInput & SpawnLifecycleHooks & { result?: AgentResultOptions } + ): Promise> { const client = await this.ensureStarted(); if (!input.channels || input.channels.length === 0) { console.warn( @@ -724,6 +781,10 @@ export class AgentRelay { }; await this.invokeLifecycleHook(input.onStart, lifecycleContext, `spawnPty("${input.name}") onStart`); let result: { name: string; runtime: AgentRuntime }; + const resultContract = this.prepareAgentResultContract(input.result); + if (resultContract) { + this.resultContracts.set(input.name, resultContract as InternalAgentResultContract); + } try { result = await client.spawnPty({ name: input.name, @@ -740,8 +801,12 @@ export class AgentRelay { idleThresholdSecs: input.idleThresholdSecs, restartPolicy: input.restartPolicy, skipRelayPrompt: input.skipRelayPrompt, + agentResultSchema: resultContract?.jsonSchema, }); } catch (error) { + if (resultContract) { + this.resultContracts.delete(input.name); + } await this.invokeLifecycleHook( input.onError, { @@ -753,7 +818,11 @@ export class AgentRelay { throw error; } this.resetAgentLifecycleState(result.name); - const agent = this.makeAgent(result.name, result.runtime, channels); + if (result.name !== input.name && resultContract) { + this.resultContracts.delete(input.name); + this.resultContracts.set(result.name, resultContract as InternalAgentResultContract); + } + const agent = this.makeAgent(result.name, result.runtime, channels) as Agent; this.knownAgents.set(agent.name, agent); await this.invokeLifecycleHook( input.onSuccess, @@ -767,7 +836,12 @@ export class AgentRelay { return agent; } - async spawn(name: string, cli: string, task?: string, options?: SpawnOptions): Promise { + async spawn( + name: string, + cli: string, + task?: string, + options?: SpawnOptions + ): 
Promise> { return this.spawnPty({ name, cli, @@ -783,19 +857,25 @@ export class AgentRelay { idleThresholdSecs: options?.idleThresholdSecs, restartPolicy: options?.restartPolicy, skipRelayPrompt: options?.skipRelayPrompt, + result: options?.result, onStart: options?.onStart, onSuccess: options?.onSuccess, onError: options?.onError, }); } - async spawnAndWait(name: string, cli: string, task: string, options?: SpawnAndWaitOptions): Promise { + async spawnAndWait( + name: string, + cli: string, + task: string, + options?: SpawnAndWaitOptions + ): Promise> { const { timeoutMs, waitForMessage, ...spawnOptions } = options ?? {}; await this.spawn(name, cli, task, spawnOptions); if (waitForMessage) { - return this.waitForAgentMessage(name, timeoutMs ?? 60_000); + return this.waitForAgentMessage(name, timeoutMs ?? 60_000) as Promise>; } - return this.waitForAgentReady(name, timeoutMs ?? 60_000); + return this.waitForAgentReady(name, timeoutMs ?? 60_000) as Promise>; } /** @@ -816,7 +896,10 @@ export class AgentRelay { * @param options — overrides for tier, search dirs, name, task, and the * underlying spawn options */ - async spawnPersona(personaId: string, options: SpawnPersonaOptions = {}): Promise { + async spawnPersona( + personaId: string, + options: SpawnPersonaOptions = {} + ): Promise> { const personaCwd = options.personaCwd ?? options.cwd ?? process.cwd(); const searchDirs = options.searchDirs ?? this.defaultPersonaDirs; const loadOpts: PersonaLoadOptions = { @@ -841,7 +924,7 @@ export class AgentRelay { const task = composePersonaTask(spec, options.task); const spawnName = options.name ?? 
persona.id; - let agent: Agent; + let agent: Agent; try { agent = await this.spawnPty({ name: spawnName, @@ -858,6 +941,7 @@ export class AgentRelay { idleThresholdSecs: options.idleThresholdSecs, restartPolicy: options.restartPolicy, skipRelayPrompt: options.skipRelayPrompt, + result: options.result, onStart: options.onStart, onSuccess: options.onSuccess, onError: options.onError, @@ -1279,6 +1363,15 @@ export class AgentRelay { entry.resolve('exited'); } this.idleResolvers.clear(); + const shutdownError = new Error('AgentRelay shutdown before structured result was submitted'); + for (const waiters of this.resultResolvers.values()) { + for (const waiter of waiters) { + waiter.reject(shutdownError); + } + } + this.resultResolvers.clear(); + this.resultContracts.clear(); + this.lastAgentResults.clear(); } // ── Private helpers ───────────────────────────────────────────────────── @@ -1589,10 +1682,14 @@ export class AgentRelay { void this.bus.emit('agentReleased', agent); this.knownAgents.delete(event.name); this.outputListeners.delete(event.name); + this.resultContracts.delete(event.name); this.exitResolvers.get(event.name)?.resolve('released'); this.exitResolvers.delete(event.name); this.idleResolvers.get(event.name)?.resolve('exited'); this.idleResolvers.delete(event.name); + for (const waiter of this.takeResultResolvers(event.name)) { + waiter.reject(new Error(`Agent '${event.name}' was released before submitting a result`)); + } break; } case 'agent_exited': { @@ -1611,10 +1708,14 @@ export class AgentRelay { void this.bus.emit('agentExited', agent); this.knownAgents.delete(event.name); this.outputListeners.delete(event.name); + this.resultContracts.delete(event.name); this.exitResolvers.get(event.name)?.resolve('exited'); this.exitResolvers.delete(event.name); this.idleResolvers.get(event.name)?.resolve('exited'); this.idleResolvers.delete(event.name); + for (const waiter of this.takeResultResolvers(event.name)) { + waiter.reject(new Error(`Agent 
'${event.name}' exited before submitting a result`)); + } break; } case 'agent_exit': { @@ -1724,6 +1825,16 @@ export class AgentRelay { this.idleResolvers.delete(event.name); break; } + case 'agent_result': { + this.dispatchAgentResult(event.name, { + name: event.name, + resultId: event.result_id, + data: event.data, + final: event.final, + metadata: event.metadata ?? undefined, + }); + break; + } } if (event.kind.startsWith('delivery_') || event.kind.startsWith('message_delivery_')) { void this.bus.emit('deliveryUpdate', event); @@ -1731,7 +1842,131 @@ export class AgentRelay { }); } - private makeAgent(name: string, runtime: AgentRuntime, channels: string[]): Agent { + private prepareAgentResultContract( + options: AgentResultOptions | undefined + ): InternalAgentResultContract | undefined { + if (!options) { + return undefined; + } + return { + schema: options.schema, + jsonSchema: options.jsonSchema ?? this.schemaToJsonSchema(options.schema), + onResult: options.onResult, + }; + } + + private schemaToJsonSchema(schema: AgentResultSchema | undefined): JsonSchema { + if (schema && typeof schema === 'object' && this.isZodSchema(schema)) { + return zodToJsonSchema(schema as ZodTypeAny, { target: 'jsonSchema7' }) as JsonSchema; + } + return true; + } + + private isZodSchema(schema: object): boolean { + return '_def' in schema && typeof (schema as { safeParse?: unknown }).safeParse === 'function'; + } + + private validateAgentResult(contract: InternalAgentResultContract | undefined, value: unknown): T { + const schema = contract?.schema; + if (!schema) { + return value as T; + } + if (typeof schema === 'function') { + return schema(value) as T; + } + if (typeof schema.safeParse === 'function') { + const parsed = schema.safeParse(value); + if (parsed.success) { + return parsed.data; + } + throw new Error(`Agent result failed schema validation: ${String(parsed.error)}`); + } + if (typeof schema.parse === 'function') { + return schema.parse(value); + } + return value 
as T; + } + + private takeResultResolvers(name: string): AgentResultResolver[] { + const waiters = this.resultResolvers.get(name); + if (!waiters || waiters.length === 0) return []; + this.resultResolvers.delete(name); + return waiters; + } + + private dispatchAgentResult(name: string, raw: AgentResult): void { + const contract = this.resultContracts.get(name); + let result: AgentResult; + try { + const data = this.validateAgentResult(contract, raw.data); + result = { ...raw, data }; + } catch (error) { + const err = error instanceof Error ? error : new Error(String(error)); + for (const waiter of this.takeResultResolvers(name)) waiter.reject(err); + console.warn(`[AgentRelay] structured result from "${name}" failed validation`, err); + return; + } + + void this.bus.emit('agentResult', result); + if (contract?.onResult) { + Promise.resolve(contract.onResult(result.data, result)).catch((error) => { + console.warn(`[AgentRelay] result("${name}") onResult hook threw`, error); + }); + } + if (result.final) { + this.lastAgentResults.set(name, result); + for (const waiter of this.takeResultResolvers(name)) waiter.resolve(result); + } + } + + private waitForAgentResult(name: string, timeoutMs?: number): Promise> { + const existing = this.lastAgentResults.get(name); + if (existing) { + return Promise.resolve(existing); + } + // Don't register a waiter for an agent we don't know about and haven't + // observed a result for — the resolver would never settle. 
+ if (!this.knownAgents.has(name) && !this.resultContracts.has(name)) { + return Promise.reject(new Error(`Agent '${name}' is not running and has no structured result`)); + } + if (timeoutMs === 0) { + return Promise.reject(new Error(`Timed out waiting for structured result from '${name}'`)); + } + return new Promise>((resolve, reject) => { + let timer: ReturnType | undefined; + const token = ++this.resultResolverSeq; + const waiter: AgentResultResolver = { + resolve: (result) => { + if (timer) clearTimeout(timer); + resolve(result); + }, + reject: (error) => { + if (timer) clearTimeout(timer); + reject(error); + }, + token, + }; + const existingWaiters = this.resultResolvers.get(name); + if (existingWaiters) { + existingWaiters.push(waiter); + } else { + this.resultResolvers.set(name, [waiter]); + } + if (timeoutMs !== undefined) { + timer = setTimeout(() => { + const list = this.resultResolvers.get(name); + if (list) { + const idx = list.findIndex((w) => w.token === token); + if (idx >= 0) list.splice(idx, 1); + if (list.length === 0) this.resultResolvers.delete(name); + } + reject(new Error(`Timed out waiting for structured result from '${name}' after ${timeoutMs}ms`)); + }, timeoutMs); + } + }); + } + + private makeAgent(name: string, runtime: AgentRuntime, channels: string[]): Agent { // eslint-disable-next-line @typescript-eslint/no-this-alias const relay = this; let agentChannels = [...channels]; @@ -1789,6 +2024,11 @@ export class AgentRelay { relay.exitResolvers.delete(name); relay.idleResolvers.get(name)?.resolve('exited'); relay.idleResolvers.delete(name); + relay.resultContracts.delete(name); + relay.lastAgentResults.delete(name); + for (const waiter of relay.takeResultResolvers(name)) { + waiter.reject(new Error(`Agent '${name}' was released before submitting a result`)); + } await relay.invokeLifecycleHook( releaseOptions.onSuccess, releaseContext, @@ -1873,6 +2113,9 @@ export class AgentRelay { } }); }, + waitForResult(timeoutMs?: number) { + return 
relay.waitForAgentResult(name, timeoutMs); + }, async sendMessage(input) { const client = await relay.ensureStarted(); let result: Awaited>; @@ -1945,7 +2188,7 @@ export class AgentRelay { private createSpawner(cli: string, defaultName: string, runtime: AgentRuntime): AgentSpawner { return { - spawn: async (options?) => { + spawn: async (options?: SpawnerSpawnOptions) => { const name = options?.name ?? defaultName; const channels = options?.channels ?? ['general']; const args = options?.args ?? []; @@ -1963,6 +2206,7 @@ export class AgentRelay { idleThresholdSecs: options?.idleThresholdSecs, agentToken: options?.agentToken, skipRelayPrompt: options?.skipRelayPrompt, + result: options?.result, onStart: options?.onStart, onSuccess: options?.onSuccess, onError: options?.onError, @@ -1978,6 +2222,10 @@ export class AgentRelay { }; await this.invokeLifecycleHook(options?.onStart, lifecycleContext, `spawn("${name}") onStart`); let result: { name: string; runtime: AgentRuntime }; + const resultContract = this.prepareAgentResultContract(options?.result); + if (resultContract) { + this.resultContracts.set(name, resultContract as InternalAgentResultContract); + } try { result = await client.spawnProvider({ name, @@ -1991,8 +2239,12 @@ export class AgentRelay { idleThresholdSecs: options?.idleThresholdSecs, agentToken: options?.agentToken, skipRelayPrompt: options?.skipRelayPrompt, + agentResultSchema: resultContract?.jsonSchema, }); } catch (error) { + if (resultContract) { + this.resultContracts.delete(name); + } await this.invokeLifecycleHook( options?.onError, { @@ -2005,7 +2257,11 @@ export class AgentRelay { } this.resetAgentLifecycleState(result.name); - const agent = this.makeAgent(result.name, result.runtime, channels); + if (result.name !== name && resultContract) { + this.resultContracts.delete(name); + this.resultContracts.set(result.name, resultContract as InternalAgentResultContract); + } + const agent = this.makeAgent(result.name, result.runtime, channels) as 
Agent; this.knownAgents.set(agent.name, agent); await this.invokeLifecycleHook( options?.onSuccess, @@ -2042,6 +2298,10 @@ export class AgentRelay { this.exitedAgents.delete(name); this.idleAgents.delete(name); this.agentActivityStates.delete(name); + this.lastAgentResults.delete(name); + for (const waiter of this.takeResultResolvers(name)) { + waiter.reject(new Error(`Agent '${name}' lifecycle reset before structured result was submitted`)); + } } private normalizeReleaseOptions(reasonOrOptions?: string | ReleaseOptions): ReleaseOptions { diff --git a/packages/sdk/src/types.ts b/packages/sdk/src/types.ts index 32f23bdb3..232103bfe 100644 --- a/packages/sdk/src/types.ts +++ b/packages/sdk/src/types.ts @@ -10,6 +10,8 @@ import type { RestartPolicy, } from './protocol.js'; +export type JsonSchema = Record | boolean; + export interface SpawnPtyInput { name: string; cli: string; @@ -25,6 +27,7 @@ export interface SpawnPtyInput { restartPolicy?: RestartPolicy; continueFrom?: string; skipRelayPrompt?: boolean; + agentResultSchema?: JsonSchema; /** Optional pre-minted relaycast agent token (`at_live_`, from * `registerAgent(workspaceKey, name)` in `@agent-relay/sdk/http`). The * broker plumbs this as `RELAY_AGENT_TOKEN`, which the relaycast MCP @@ -42,6 +45,7 @@ export interface SpawnHeadlessInput { channels?: string[]; task?: string; skipRelayPrompt?: boolean; + agentResultSchema?: JsonSchema; /** Optional pre-minted relaycast agent token (`at_live_`, from * `registerAgent(workspaceKey, name)` in `@agent-relay/sdk/http`). The * broker plumbs this as `RELAY_AGENT_TOKEN`, which the relaycast MCP @@ -70,6 +74,7 @@ export interface SpawnProviderInput { restartPolicy?: RestartPolicy; continueFrom?: string; skipRelayPrompt?: boolean; + agentResultSchema?: JsonSchema; /** Optional pre-minted relaycast agent token (`at_live_`, from * `registerAgent(workspaceKey, name)` in `@agent-relay/sdk/http`). 
The * broker plumbs this as `RELAY_AGENT_TOKEN`, which the relaycast MCP diff --git a/src/cli/relaycast-mcp.startup.test.ts b/src/cli/relaycast-mcp.startup.test.ts index 573bc3c99..84efaab6b 100644 --- a/src/cli/relaycast-mcp.startup.test.ts +++ b/src/cli/relaycast-mcp.startup.test.ts @@ -335,6 +335,43 @@ describe('createPatchedRelayMcpServer', () => { }); }); + it('registers submit_result when a spawned-agent result callback is configured', async () => { + vi.stubEnv('AGENT_RELAY_RESULT_URL', 'http://127.0.0.1:3889/api/agent-result'); + vi.stubEnv('AGENT_RELAY_RESULT_TOKEN', 'arr_test'); + vi.stubEnv('AGENT_RELAY_RESULT_SCHEMA', '{"type":"object","properties":{"ok":{"type":"boolean"}}}'); + const { mod, mocks } = await loadRelaycastMcpModule(); + vi.stubGlobal( + 'fetch', + vi.fn(async () => ({ + ok: true, + status: 200, + text: async () => JSON.stringify({ success: true, result_id: 'ar_1' }), + })) + ); + + mod.createPatchedRelayMcpServer({ agentName: 'WorkerA' }); + const server = mocks.serverInstances[0]; + const submitResult = server.tools.get('submit_result'); + + expect(submitResult).toBeDefined(); + const result = await submitResult?.handler({ data: { ok: true }, metadata: { source: 'test' } }); + + expect(fetch).toHaveBeenCalledWith( + 'http://127.0.0.1:3889/api/agent-result', + expect.objectContaining({ + method: 'POST', + headers: expect.objectContaining({ Authorization: 'Bearer arr_test' }), + body: JSON.stringify({ + agent: 'WorkerA', + data: { ok: true }, + final: true, + metadata: { source: 'test' }, + }), + }) + ); + expect(result?.structuredContent).toEqual({ success: true, result_id: 'ar_1' }); + }); + it('reinitializes the websocket bridge when the workspace changes', async () => { const { mod, mocks } = await loadRelaycastMcpModule(); mod.createPatchedRelayMcpServer({ diff --git a/src/cli/relaycast-mcp.ts b/src/cli/relaycast-mcp.ts index bee1d40a1..09eb741f8 100644 --- a/src/cli/relaycast-mcp.ts +++ b/src/cli/relaycast-mcp.ts @@ -62,6 +62,12 @@ 
export interface PatchedMcpServerOptions { type RegistrationSession = Pick; type SessionSetter = (partial: Partial) => void; +type AgentResultCallbackConfig = { + url: string; + token: string; + schema?: unknown; + agentName?: string; +}; type RegisterAgentWithRebindArgs = { session: RegistrationSession; @@ -114,6 +120,169 @@ export function normalizeAgentType(value: string | undefined): AgentType | undef return undefined; }
+/**
+ * Reads the structured-result callback settings for a spawned agent from the
+ * environment.
+ *
+ * @param agentName - Name sent as `agent` with each submitted result.
+ * @returns The callback configuration, or `undefined` when
+ *   `AGENT_RELAY_RESULT_URL` or `AGENT_RELAY_RESULT_TOKEN` is not set.
+ */
+function readAgentResultCallbackConfig(agentName?: string): AgentResultCallbackConfig | undefined {
+  const url = resolveEnv('AGENT_RELAY_RESULT_URL');
+  const token = resolveEnv('AGENT_RELAY_RESULT_TOKEN');
+  if (!url || !token) {
+    return undefined;
+  }
+
+  const rawSchema = resolveEnv('AGENT_RELAY_RESULT_SCHEMA');
+  let schema: unknown;
+  if (rawSchema) {
+    try {
+      schema = JSON.parse(rawSchema);
+    } catch {
+      // Not valid JSON: keep the raw text so it still appears in the tool description.
+      schema = rawSchema;
+    }
+  }
+
+  return { url, token, schema, agentName };
+}
+
+/** Timeout applied to a result submission when no valid override is configured. */
+const DEFAULT_AGENT_RESULT_TIMEOUT_MS = 10_000;
+
+/** Largest delay `setTimeout` honours; larger values are treated as 1ms. */
+const MAX_TIMER_DELAY_MS = 2_147_483_647;
+
+/**
+ * Resolves the submission timeout from `AGENT_RELAY_RESULT_TIMEOUT_MS`.
+ *
+ * Empty, non-numeric, zero, or negative values fall back to the default, and
+ * oversized values are clamped: `setTimeout` would otherwise fire almost
+ * immediately and abort every submission.
+ */
+function resolveAgentResultTimeoutMs(): number {
+  const parsed = Number(resolveEnv('AGENT_RELAY_RESULT_TIMEOUT_MS') ?? DEFAULT_AGENT_RESULT_TIMEOUT_MS);
+  if (!Number.isFinite(parsed) || parsed <= 0) {
+    return DEFAULT_AGENT_RESULT_TIMEOUT_MS;
+  }
+  return Math.min(parsed, MAX_TIMER_DELAY_MS);
+}
+
+/**
+ * Parses the callback response body into an object payload.
+ *
+ * An empty body yields `{}`. Text that is not JSON, or JSON that is not a plain
+ * object (`null`, arrays, primitives), becomes
+ * `{ success: false, error: <raw text> }` so callers always get an object.
+ */
+function parseAgentResultPayload(responseText: string): Record<string, unknown> {
+  if (!responseText) {
+    return {};
+  }
+  try {
+    const parsed: unknown = JSON.parse(responseText);
+    if (typeof parsed === 'object' && parsed !== null && !Array.isArray(parsed)) {
+      return parsed as Record<string, unknown>;
+    }
+  } catch {
+    // Not JSON: fall through to the raw-text payload.
+  }
+  return { success: false, error: responseText };
+}
+
+/**
+ * Registers the `submit_result` MCP tool, which POSTs a spawned agent's
+ * structured result to the configured callback URL.
+ *
+ * Does nothing when `config` is `undefined`. The tool handler throws when the
+ * submission times out, when `fetch` rejects, or when the callback responds
+ * with a non-OK status.
+ */
+function registerAgentResultTool(server: McpServer, config: AgentResultCallbackConfig | undefined): void {
+  if (!config) {
+    return;
+  }
+
+  // The schema is only a hint for the agent; cap its length in the description.
+  const schemaText =
+    config.schema === undefined
+      ? ''
+      : ` Expected JSON schema: ${JSON.stringify(config.schema).slice(0, 4000)}`;
+
+  server.registerTool(
+    'submit_result',
+    {
+      title: 'Submit Result',
+      description:
+        'Submit the structured result for this spawned Agent Relay task. Call this when the requested work is complete and the result object is ready.' +
+        schemaText,
+      inputSchema: {
+        data: z.unknown().describe('The JSON result payload requested by the spawning SDK caller.'),
+        final: z
+          .boolean()
+          .optional()
+          .describe('Whether this is the final result for the task. Defaults to true.'),
+        metadata: z
+          .record(z.string(), z.unknown())
+          .optional()
+          .describe('Optional diagnostic metadata about the result.'),
+      },
+      outputSchema: jsonResult,
+      annotations: {
+        readOnlyHint: false,
+        destructiveHint: false,
+        idempotentHint: false,
+        openWorldHint: false,
+      },
+    },
+    async ({ data, final, metadata }) => {
+      const timeoutMs = resolveAgentResultTimeoutMs();
+      const controller = new AbortController();
+      const timer = setTimeout(() => controller.abort(), timeoutMs);
+      let response: Response;
+      let responseText: string;
+      try {
+        response = await fetch(config.url, {
+          method: 'POST',
+          signal: controller.signal,
+          headers: {
+            Authorization: `Bearer ${config.token}`,
+            'Content-Type': 'application/json',
+          },
+          body: JSON.stringify({
+            agent: config.agentName,
+            data,
+            final: final ?? true,
+            metadata,
+          }),
+        });
+        // Read the body inside the timed section so a stalled body cannot hang the tool call.
+        responseText = await response.text();
+      } catch (err) {
+        if ((err as { name?: string } | null | undefined)?.name === 'AbortError') {
+          throw new Error(`Agent Relay result submission timed out after ${timeoutMs}ms`);
+        }
+        throw err;
+      } finally {
+        clearTimeout(timer);
+      }
+
+      const payload = parseAgentResultPayload(responseText);
+      if (!response.ok) {
+        // Non-string errors would render as "[object Object]"; show the raw body instead.
+        const detail = typeof payload.error === 'string' ? payload.error : responseText;
+        throw new Error(`Agent Relay result submission failed (${response.status}): ${detail}`);
+      }
+      return {
+        content: [{ type: 'text', text: JSON.stringify(payload, null, 2) }],
+        structuredContent: payload,
+      };
+    }
+  );
+}
+
 async function createWorkspace(name: string, baseUrl?: string): Promise> { const response = await fetch(`${normalizeBaseUrl(baseUrl)}/v1/workspaces`, { method: 'POST', @@ -477,6 +646,7 @@ export function createPatchedRelayMcpServer(options: PatchedMcpServerOptions): M registerMessagingTools(mcpServer, getAgentClient as never); registerFeatureTools(mcpServer, getAgentClient as never); registerProgrammabilityTools(mcpServer, getRelay as never, getAgentClient as never); + registerAgentResultTool(mcpServer, readAgentResultCallbackConfig(options.agentName)); mcpServer.registerPrompt( 'system', diff --git a/web/content/docs/event-handlers.mdx b/web/content/docs/event-handlers.mdx index faac71819..ce64a5461 100644 --- a/web/content/docs/event-handlers.mdx +++ b/web/content/docs/event-handlers.mdx @@ -1,6 +1,6 @@ --- title: 'Event handlers' -description: 'Subscribe to relay events so your app can react to messages, agent lifecycle changes, spawn intercepts, output, and delivery updates.' +description: 'Subscribe to relay events so your app can react to messages, agent lifecycle changes, structured results, spawn intercepts, output, and delivery updates.' --- Event handlers are the main way to observe what the relay is doing in real time. Register listeners on the `AgentRelay` instance to log activity, update UI, trigger follow-up work, or keep your own state in sync. @@ -149,6 +149,16 @@ relay.addListener('beforeAgentSpawn', (ctx) => { `afterAgentSpawn` exposes the post-patch `resolvedInput` so observers can see exactly what was sent. `beforeAgentRelease` / `afterAgentRelease` are observe-only. 
+## Structured result events + +```typescript +relay.addListener('agentResult', (result) => { + console.log(`result from ${result.name}`, result.resultId, result.data); +}); +``` + +`agentResult` fires when a spawned agent submits JSON through the `submit_result` MCP tool. Use it for global observers; use `agent.waitForResult(...)` or the per-spawn `result.onResult` callback when one caller owns a specific agent's result. + ## Output and delivery events diff --git a/web/content/docs/spawning-an-agent.mdx b/web/content/docs/spawning-an-agent.mdx index 6c4f771e6..48e9dc1eb 100644 --- a/web/content/docs/spawning-an-agent.mdx +++ b/web/content/docs/spawning-an-agent.mdx @@ -107,4 +107,26 @@ These options control how the local broker/client is started before any agents a These options are available on the shorthand helpers and on `relay.spawn(...)`: - \ No newline at end of file + + +### Structured results + +TypeScript spawns can include a `result` contract. Agent Relay exposes a `submit_result` MCP tool to the spawned agent, then routes the submitted JSON back through `agent.waitForResult(...)`, the per-spawn `onResult` callback, and `relay.addListener('agentResult', ...)`. 
+ +```typescript TypeScript file="spawn-with-result.ts" +import { z } from 'zod'; + +const Result = z.object({ + decision: z.enum(['approve', 'request_changes']), + notes: z.array(z.string()), +}); + +const reviewer = await relay.claude.spawn({ + name: 'Reviewer', + task: 'Review the change and submit your decision as structured JSON.', + result: { schema: Result }, +}); + +const { data } = await reviewer.waitForResult(120_000); +console.log(data.decision); +``` diff --git a/web/content/docs/typescript-sdk.mdx b/web/content/docs/typescript-sdk.mdx index b1fd43313..daacf6923 100644 --- a/web/content/docs/typescript-sdk.mdx +++ b/web/content/docs/typescript-sdk.mdx @@ -110,10 +110,43 @@ interface Agent { waitForReady(timeoutMs?: number): Promise; waitForExit(timeoutMs?: number): Promise<'exited' | 'timeout' | 'released'>; waitForIdle(timeoutMs?: number): Promise<'idle' | 'timeout' | 'exited'>; + waitForResult(timeoutMs?: number): Promise; onOutput(callback: (chunk: string) => void): () => void; // returns unsubscribe } ``` +### Structured Agent Results + +Provide `result` when spawning an agent to expose a `submit_result` MCP tool inside that agent. The tool posts JSON back to the local broker, and the SDK validates it before resolving waiters or hooks. 
+ +```typescript file="structured-result.ts" +import { AgentRelay } from '@agent-relay/sdk'; +import { z } from 'zod'; + +const relay = new AgentRelay(); + +const Summary = z.object({ + status: z.enum(['pass', 'fail']), + findings: z.array(z.string()), +}); + +const reviewer = await relay.claude.spawn({ + name: 'Reviewer', + task: 'Review the branch and submit the final JSON result.', + result: { + schema: Summary, + onResult: (summary) => { + console.log(summary.status, summary.findings.length); + }, + }, +}); + +const result = await reviewer.waitForResult(120_000); +console.log(result.data.status); +``` + +You can also pass `jsonSchema` directly when you only want to describe the expected payload to the spawned agent. Use `relay.addListener('agentResult', ...)` to observe all structured results globally. + ### `ReleaseOptions` `agent.release(...)` accepts either a reason string or a `ReleaseOptions` object: @@ -261,20 +294,21 @@ await relay.broadcast('All hands: stand by for new task'); ## Event Hooks -Assign a function to subscribe, `null` to unsubscribe: +Register listeners with `addListener`; keep the returned function to unsubscribe: ```typescript -relay.onMessageReceived = (msg: Message) => { ... } -relay.onMessageSent = (msg: Message) => { ... } -relay.onAgentSpawned = (agent: Agent) => { ... } -relay.onAgentReleased = (agent: Agent) => { ... } -relay.onAgentExited = (agent: Agent) => { ... } -relay.onAgentReady = (agent: Agent) => { ... } -relay.onAgentIdle = ({ name, idleSecs }) => { ... } -relay.onAgentActivityChanged = ({ name, active, pendingDeliveries, reason }) => { ... } -relay.onAgentExitRequested = ({ name, reason }) => { ... } -relay.onWorkerOutput = ({ name, stream, chunk }) => { ... } -relay.onDeliveryUpdate = (event: BrokerEvent) => { ... } +relay.addListener('messageReceived', (msg: Message) => { ... }) +relay.addListener('messageSent', (msg: Message) => { ... }) +relay.addListener('agentSpawned', (agent: Agent) => { ... 
}) +relay.addListener('agentReleased', (agent: Agent) => { ... }) +relay.addListener('agentExited', (agent: Agent) => { ... }) +relay.addListener('agentReady', (agent: Agent) => { ... }) +relay.addListener('agentIdle', ({ name, idleSecs }) => { ... }) +relay.addListener('agentActivityChanged', ({ name, active, pendingDeliveries, reason }) => { ... }) +relay.addListener('agentExitRequested', ({ name, reason }) => { ... }) +relay.addListener('workerOutput', ({ name, stream, chunk }) => { ... }) +relay.addListener('deliveryUpdate', (event: BrokerEvent) => { ... }) +relay.addListener('agentResult', (result: AgentResult) => { ... }) ``` **Message type:** @@ -367,13 +401,13 @@ import { AgentRelay, Models } from '@agent-relay/sdk'; const relay = new AgentRelay(); -relay.onMessageReceived = (msg) => { +relay.addListener('messageReceived', (msg) => { console.log(`${msg.from} → ${msg.to}: ${msg.text}`); -}; +}); -relay.onAgentSpawned = (agent) => { +relay.addListener('agentSpawned', (agent) => { console.log(`Spawned: ${agent.name}`); -}; +}); // Spawn agents const planner = await relay.claude.spawn({