From 6e420986ceb3a4d58b757f2235e69af4a6274c71 Mon Sep 17 00:00:00 2001 From: charley-openai Date: Mon, 27 Apr 2026 13:55:16 -0700 Subject: [PATCH 1/2] Ingest node_repl stderr telemetry spans --- codex-rs/codex-mcp/src/connection_manager.rs | 13 + codex-rs/codex-mcp/src/lib.rs | 3 + codex-rs/codex-mcp/src/rmcp_client.rs | 174 +++- codex-rs/core/src/mcp_tool_call.rs | 79 ++ codex-rs/core/src/mcp_tool_call_tests.rs | 77 ++ codex-rs/docs/mcp_subspan_tracing.md | 174 ++++ codex-rs/otel/src/lib.rs | 3 + codex-rs/otel/src/stderr_span_telemetry.rs | 756 ++++++++++++++++++ codex-rs/rmcp-client/src/lib.rs | 2 + codex-rs/rmcp-client/src/rmcp_client.rs | 11 +- .../rmcp-client/src/stdio_server_launcher.rs | 69 +- .../tests/process_group_cleanup.rs | 2 + codex-rs/rmcp-client/tests/resources.rs | 1 + 13 files changed, 1356 insertions(+), 8 deletions(-) create mode 100644 codex-rs/docs/mcp_subspan_tracing.md create mode 100644 codex-rs/otel/src/stderr_span_telemetry.rs diff --git a/codex-rs/codex-mcp/src/connection_manager.rs b/codex-rs/codex-mcp/src/connection_manager.rs index 483a82796a58..2b971d9a7b2d 100644 --- a/codex-rs/codex-mcp/src/connection_manager.rs +++ b/codex-rs/codex-mcp/src/connection_manager.rs @@ -564,6 +564,19 @@ impl McpConnectionManager { .server_supports_sandbox_state_meta_capability) } + pub async fn server_supports_subspan_tracing_for_tool( + &self, + server: &str, + tool: &str, + ) -> Result { + Ok(self + .client_by_name(server) + .await? + .subspan_tracing_capability + .as_ref() + .is_some_and(|capability| capability.supports_tool(tool))) + } + /// List resources from the specified server. pub async fn list_resources( &self, diff --git a/codex-rs/codex-mcp/src/lib.rs b/codex-rs/codex-mcp/src/lib.rs index 1d3fd176197f..9f8e2f378d4d 100644 --- a/codex-rs/codex-mcp/src/lib.rs +++ b/codex-rs/codex-mcp/src/lib.rs @@ -1,5 +1,8 @@ pub use connection_manager::McpConnectionManager; pub use rmcp_client::MCP_SANDBOX_STATE_META_CAPABILITY; +pub use rmcp_client::MCP_SUBSPAN_TRACING_CAPABILITY; +pub use rmcp_client::MCP_SUBSPAN_TRACING_TRANSPORT_STDERR_JSONL; +pub use rmcp_client::MCP_SUBSPAN_TRACING_VERSION; pub use runtime::McpRuntimeEnvironment; pub use runtime::SandboxState; pub use tools::ToolInfo; diff --git a/codex-rs/codex-mcp/src/rmcp_client.rs b/codex-rs/codex-mcp/src/rmcp_client.rs index b88942c4e91d..d9b47ff05bb3 100644 --- a/codex-rs/codex-mcp/src/rmcp_client.rs +++ b/codex-rs/codex-mcp/src/rmcp_client.rs @@ -7,7 +7,9 @@ //! [`crate::connection_manager`]. use std::borrow::Cow; +use std::collections::BTreeMap; use std::collections::HashMap; +use std::collections::HashSet; use std::env; use std::ffi::OsString; use std::sync::Arc; @@ -50,6 +52,8 @@ use codex_rmcp_client::ExecutorStdioServerLauncher; use codex_rmcp_client::LocalStdioServerLauncher; use codex_rmcp_client::RmcpClient; use codex_rmcp_client::StdioServerLauncher; +use codex_rmcp_client::StdioServerTelemetry; +use codex_rmcp_client::StdioServerTelemetrySink; use futures::future::BoxFuture; use futures::future::FutureExt; use futures::future::Shared; @@ -58,6 +62,7 @@ use rmcp::model::ElicitationCapability; use rmcp::model::FormElicitationCapability; use rmcp::model::Implementation; use rmcp::model::InitializeRequestParams; +use rmcp::model::JsonObject; use rmcp::model::ProtocolVersion; use rmcp::model::Tool as RmcpTool; use tokio_util::sync::CancellationToken; @@ -66,6 +71,12 @@ use tracing::warn; /// MCP server capability indicating that Codex should include [`SandboxState`] /// in tool-call request `_meta` under this key. pub const MCP_SANDBOX_STATE_META_CAPABILITY: &str = "codex/sandbox-state-meta"; +/// MCP server capability indicating that Codex should include W3C trace context +/// in tool-call request `_meta` so the server can emit reconstructed child spans. +/// See `codex-rs/docs/mcp_subspan_tracing.md` for the protocol contract. +pub const MCP_SUBSPAN_TRACING_CAPABILITY: &str = "codex/subspan-tracing"; +pub const MCP_SUBSPAN_TRACING_TRANSPORT_STDERR_JSONL: &str = "stderr-jsonl"; +pub const MCP_SUBSPAN_TRACING_VERSION: u64 = 1; pub(crate) const MCP_TOOLS_LIST_DURATION_METRIC: &str = "codex.mcp.tools.list.duration_ms"; pub(crate) const MCP_TOOLS_FETCH_UNCACHED_DURATION_METRIC: &str = @@ -89,9 +100,52 @@ pub(crate) struct ManagedClient { pub(crate) tool_timeout: Option, pub(crate) server_instructions: Option, pub(crate) server_supports_sandbox_state_meta_capability: bool, + pub(crate) subspan_tracing_capability: Option, pub(crate) codex_apps_tools_cache_context: Option, } +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) struct McpSubspanTracingCapability { + tools: Option>, +} + +impl McpSubspanTracingCapability { + fn from_experimental_capability(capability: &JsonObject) -> Option { + if capability + .get("version") + .and_then(serde_json::Value::as_u64) + != Some(MCP_SUBSPAN_TRACING_VERSION) + { + return None; + } + + let supports_stderr_jsonl = capability + .get("transports") + .and_then(serde_json::Value::as_array) + .is_some_and(|transports| { + transports.iter().any(|transport| { + transport.as_str() == Some(MCP_SUBSPAN_TRACING_TRANSPORT_STDERR_JSONL) + }) + }); + if !supports_stderr_jsonl { + return None; + } + + let tools = capability + .get("tools") + .and_then(serde_json::Value::as_object) + .map(|tools| tools.keys().cloned().collect()); + + Some(Self { tools }) + } + + pub(crate) fn supports_tool(&self, tool_name: &str) -> bool { + self.tools + .as_ref() + .is_none_or(|tools| tools.contains(tool_name)) + } +} + impl ManagedClient { fn listed_tools(&self) -> Vec { let total_start = Instant::now(); @@ -152,7 +206,9 @@ impl AsyncManagedClient { .map(|tools| filter_tools(tools, &tool_filter)); let startup_tool_filter = tool_filter; let startup_complete = Arc::new(AtomicBool::new(false)); + let subspan_tracing_enabled = Arc::new(AtomicBool::new(false)); let startup_complete_for_fut = Arc::clone(&startup_complete); + let subspan_tracing_enabled_for_fut = Arc::clone(&subspan_tracing_enabled); let cancel_token_for_fut = cancel_token.clone(); let fut = async move { let outcome = match async { @@ -167,6 +223,7 @@ impl AsyncManagedClient { store_mode, runtime_environment, runtime_auth_provider, + Arc::clone(&subspan_tracing_enabled_for_fut), ) .await?, ); @@ -182,6 +239,7 @@ impl AsyncManagedClient { tx_event, elicitation_requests, codex_apps_tools_cache_context, + subspan_tracing_enabled: subspan_tracing_enabled_for_fut, }, ) .await @@ -462,12 +520,26 @@ async fn start_server_task( tx_event, elicitation_requests, codex_apps_tools_cache_context, + subspan_tracing_enabled, } = params; let elicitation = elicitation_capability_for_server(&server_name); - let params = InitializeRequestParams { + let mut subspan_tracing_capability = JsonObject::new(); + subspan_tracing_capability.insert( + "version".to_string(), + serde_json::json!(MCP_SUBSPAN_TRACING_VERSION), + ); + subspan_tracing_capability.insert( + "transports".to_string(), + serde_json::json!([MCP_SUBSPAN_TRACING_TRANSPORT_STDERR_JSONL]), + ); + let client_experimental_capabilities = BTreeMap::from([( + MCP_SUBSPAN_TRACING_CAPABILITY.to_string(), + subspan_tracing_capability, + )]); + let initialize_params = InitializeRequestParams { meta: None, capabilities: ClientCapabilities { - experimental: None, + experimental: Some(client_experimental_capabilities), extensions: None, roots: None, sampling: None, @@ -488,7 +560,7 @@ async fn start_server_task( let send_elicitation = elicitation_requests.make_sender(server_name.clone(), tx_event); let initialize_result = client - .initialize(params, startup_timeout, send_elicitation) + .initialize(initialize_params, startup_timeout, send_elicitation) .await .map_err(StartupOutcomeError::from)?; @@ -498,6 +570,13 @@ async fn start_server_task( .as_ref() .and_then(|exp| exp.get(MCP_SANDBOX_STATE_META_CAPABILITY)) .is_some(); + let subspan_tracing_capability = initialize_result + .capabilities + .experimental + .as_ref() + .and_then(|exp| exp.get(MCP_SUBSPAN_TRACING_CAPABILITY)) + .and_then(McpSubspanTracingCapability::from_experimental_capability); + subspan_tracing_enabled.store(subspan_tracing_capability.is_some(), Ordering::Release); let list_start = Instant::now(); let fetch_start = Instant::now(); let tools = list_tools_for_client_uncached( @@ -534,6 +613,7 @@ async fn start_server_task( tool_filter, server_instructions: initialize_result.instructions, server_supports_sandbox_state_meta_capability, + subspan_tracing_capability, codex_apps_tools_cache_context, }; @@ -547,6 +627,7 @@ struct StartServerTaskParams { tx_event: Sender, elicitation_requests: ElicitationRequestManager, codex_apps_tools_cache_context: Option, + subspan_tracing_enabled: Arc, } async fn make_rmcp_client( @@ -555,6 +636,7 @@ async fn make_rmcp_client( store_mode: OAuthCredentialsStoreMode, runtime_environment: McpRuntimeEnvironment, runtime_auth_provider: Option, + subspan_tracing_enabled: Arc, ) -> Result { let McpServerConfig { transport, @@ -607,9 +689,20 @@ async fn make_rmcp_client( // `RmcpClient` always sees a launched MCP stdio server. The // launcher hides whether that means a local child process or an // executor process whose stdin/stdout bytes cross the process API. - RmcpClient::new_stdio_client(command_os, args_os, env_os, &env_vars, cwd, launcher) - .await - .map_err(|err| StartupOutcomeError::from(anyhow!(err))) + RmcpClient::new_stdio_client( + command_os, + args_os, + env_os, + &env_vars, + cwd, + launcher, + Some(subspan_tracing_telemetry_sink( + server_name, + subspan_tracing_enabled, + )), + ) + .await + .map_err(|err| StartupOutcomeError::from(anyhow!(err))) } McpServerTransportConfig::StreamableHttp { url, @@ -643,6 +736,32 @@ async fn make_rmcp_client( } } +fn subspan_tracing_telemetry_sink( + server_name: &str, + enabled: Arc, +) -> StdioServerTelemetrySink { + let server_name = server_name.to_string(); + Arc::new(move |telemetry: StdioServerTelemetry| { + if !enabled.load(Ordering::Acquire) { + return; + } + + if let Err(error) = codex_otel::emit_mcp_subspan_telemetry(telemetry.payload) { + match error { + codex_otel::StderrSpanTelemetryError::UnsupportedVersion + | codex_otel::StderrSpanTelemetryError::UnsupportedType => { + tracing::debug!( + "ignoring unsupported MCP subspan telemetry from {server_name}: {error}" + ); + } + _ => { + warn!("ignoring invalid MCP subspan telemetry from {server_name}: {error}"); + } + } + } + }) +} + #[cfg(test)] mod tests { use super::*; @@ -744,4 +863,47 @@ mod tests { assert!(meta.0.contains_key(key), "{key} should be preserved"); } } + + #[test] + fn subspan_tracing_capability_accepts_stderr_jsonl_transport_and_tool_filter() { + let capability: JsonObject = serde_json::from_value(serde_json::json!({ + "version": 1, + "transports": ["stderr-jsonl"], + "tools": { + "js": { + "attributeProfile": "browser-use-v1" + } + } + })) + .expect("capability object"); + + let capability = McpSubspanTracingCapability::from_experimental_capability(&capability) + .expect("valid capability"); + + assert!(capability.supports_tool("js")); + assert!(!capability.supports_tool("other")); + } + + #[test] + fn subspan_tracing_capability_rejects_unknown_version_or_transport() { + for capability in [ + serde_json::json!({ + "version": 2, + "transports": ["stderr-jsonl"], + }), + serde_json::json!({ + "version": 1, + "transports": ["events"], + }), + serde_json::json!({ + "version": 1, + }), + ] { + let capability: JsonObject = + serde_json::from_value(capability).expect("capability object"); + assert!( + McpSubspanTracingCapability::from_experimental_capability(&capability).is_none() + ); + } + } } diff --git a/codex-rs/core/src/mcp_tool_call.rs b/codex-rs/core/src/mcp_tool_call.rs index c93ab296b15b..98a21420c2a2 100644 --- a/codex-rs/core/src/mcp_tool_call.rs +++ b/codex-rs/core/src/mcp_tool_call.rs @@ -40,6 +40,8 @@ use codex_config::types::AppToolApproval; use codex_features::Feature; use codex_hooks::PermissionRequestDecision; use codex_mcp::CODEX_APPS_MCP_SERVER_NAME; +use codex_mcp::MCP_SUBSPAN_TRACING_CAPABILITY; +use codex_mcp::MCP_SUBSPAN_TRACING_VERSION; use codex_mcp::SandboxState; use codex_mcp::declared_openai_file_input_param_names; use codex_mcp::mcp_permission_prompt_is_auto_approved; @@ -534,6 +536,9 @@ async fn execute_mcp_tool_call( rewritten_arguments: Option, request_meta: Option, ) -> Result { + let request_meta = + augment_mcp_tool_request_meta_with_subspan_tracing(sess, server, tool_name, request_meta) + .await; let request_meta = with_mcp_tool_call_thread_id_meta(request_meta, &sess.conversation_id.to_string()); let request_meta = @@ -721,6 +726,7 @@ const MCP_TOOL_CODEX_APPS_META_KEY: &str = "_codex_apps"; const MCP_TOOL_OPENAI_OUTPUT_TEMPLATE_META_KEY: &str = "openai/outputTemplate"; const MCP_TOOL_UI_RESOURCE_URI_META_KEY: &str = "ui/resourceUri"; const MCP_TOOL_THREAD_ID_META_KEY: &str = "threadId"; +const MCP_TOOL_SUBSPAN_TRACING_META_KEY: &str = MCP_SUBSPAN_TRACING_CAPABILITY; fn custom_mcp_tool_approval_mode( turn_context: &TurnContext, @@ -780,6 +786,79 @@ fn build_mcp_tool_call_request_meta( (!request_meta.is_empty()).then_some(serde_json::Value::Object(request_meta)) } +#[expect( + clippy::await_holding_invalid_type, + reason = "MCP subspan metadata reads through the session-owned manager guard" +)] +async fn augment_mcp_tool_request_meta_with_subspan_tracing( + sess: &Session, + server: &str, + tool_name: &str, + meta: Option, +) -> Option { + let supports_subspan_tracing = sess + .services + .mcp_connection_manager + .read() + .await + .server_supports_subspan_tracing_for_tool(server, tool_name) + .await + .unwrap_or(false); + augment_mcp_tool_request_meta_with_subspan_tracing_if_supported(supports_subspan_tracing, meta) +} + +fn augment_mcp_tool_request_meta_with_subspan_tracing_if_supported( + supports_subspan_tracing: bool, + meta: Option, +) -> Option { + if !supports_subspan_tracing { + return meta; + } + + let Some(trace_context) = codex_otel::current_span_w3c_trace_context() else { + return meta; + }; + let Some(traceparent) = trace_context.traceparent else { + return meta; + }; + + let mut telemetry_meta = serde_json::Map::new(); + telemetry_meta.insert("enabled".to_string(), serde_json::Value::Bool(true)); + telemetry_meta.insert( + "version".to_string(), + serde_json::Value::Number(MCP_SUBSPAN_TRACING_VERSION.into()), + ); + telemetry_meta.insert( + "traceparent".to_string(), + serde_json::Value::String(traceparent), + ); + if let Some(tracestate) = trace_context.tracestate { + telemetry_meta.insert( + "tracestate".to_string(), + serde_json::Value::String(tracestate), + ); + } + + match meta { + Some(serde_json::Value::Object(mut map)) => { + map.insert( + MCP_TOOL_SUBSPAN_TRACING_META_KEY.to_string(), + serde_json::Value::Object(telemetry_meta), + ); + Some(serde_json::Value::Object(map)) + } + None => { + let mut map = serde_json::Map::new(); + map.insert( + MCP_TOOL_SUBSPAN_TRACING_META_KEY.to_string(), + serde_json::Value::Object(telemetry_meta), + ); + Some(serde_json::Value::Object(map)) + } + other => other, + } +} + fn with_mcp_tool_call_thread_id_meta( meta: Option, thread_id: &str, diff --git a/codex-rs/core/src/mcp_tool_call_tests.rs b/codex-rs/core/src/mcp_tool_call_tests.rs index f7fd1b08c27e..5c36f1d612bc 100644 --- a/codex-rs/core/src/mcp_tool_call_tests.rs +++ b/codex-rs/core/src/mcp_tool_call_tests.rs @@ -941,6 +941,83 @@ async fn codex_apps_tool_call_request_meta_includes_call_id_without_existing_cod ); } +#[test] +fn subspan_tracing_request_meta_includes_trace_context_when_supported() { + use opentelemetry::trace::TracerProvider as _; + use opentelemetry_sdk::trace::SdkTracerProvider; + use tracing_subscriber::prelude::*; + + let provider = SdkTracerProvider::builder().build(); + let tracer = provider.tracer("codex-core-mcp-tool-call-tests"); + let subscriber = + tracing_subscriber::registry().with(tracing_opentelemetry::layer().with_tracer(tracer)); + let _guard = tracing::subscriber::set_default(subscriber); + + let span = tracing::info_span!("mcp.tools.call"); + let _entered = span.enter(); + + let meta = augment_mcp_tool_request_meta_with_subspan_tracing_if_supported( + /*supports_subspan_tracing*/ true, + Some(serde_json::json!({ + "threadId": "thread-live", + })), + ) + .expect("meta"); + + let telemetry = meta + .get(MCP_TOOL_SUBSPAN_TRACING_META_KEY) + .expect("subspan tracing metadata"); + assert_eq!( + telemetry.get("enabled"), + Some(&serde_json::Value::Bool(true)) + ); + assert_eq!( + telemetry.get("version"), + Some(&serde_json::json!(MCP_SUBSPAN_TRACING_VERSION)) + ); + assert!( + telemetry + .get("traceparent") + .and_then(serde_json::Value::as_str) + .is_some_and(|traceparent| traceparent.starts_with("00-")) + ); + assert_eq!( + meta.get("threadId"), + Some(&serde_json::json!("thread-live")) + ); +} + +#[test] +fn subspan_tracing_request_meta_is_only_added_when_supported_and_trace_context_exists() { + assert_eq!( + augment_mcp_tool_request_meta_with_subspan_tracing_if_supported( + /*supports_subspan_tracing*/ true, + Some(serde_json::json!({"threadId": "thread-live"})), + ), + Some(serde_json::json!({"threadId": "thread-live"})) + ); + + use opentelemetry::trace::TracerProvider as _; + use opentelemetry_sdk::trace::SdkTracerProvider; + use tracing_subscriber::prelude::*; + + let provider = SdkTracerProvider::builder().build(); + let tracer = provider.tracer("codex-core-mcp-tool-call-tests"); + let subscriber = + tracing_subscriber::registry().with(tracing_opentelemetry::layer().with_tracer(tracer)); + let _guard = tracing::subscriber::set_default(subscriber); + let span = tracing::info_span!("mcp.tools.call"); + let _entered = span.enter(); + + assert_eq!( + augment_mcp_tool_request_meta_with_subspan_tracing_if_supported( + /*supports_subspan_tracing*/ false, + Some(serde_json::json!({"threadId": "thread-live"})), + ), + Some(serde_json::json!({"threadId": "thread-live"})) + ); +} + #[test] fn mcp_tool_call_thread_id_meta_is_added_to_request_meta() { assert_eq!( diff --git a/codex-rs/docs/mcp_subspan_tracing.md b/codex-rs/docs/mcp_subspan_tracing.md new file mode 100644 index 000000000000..721337515e1f --- /dev/null +++ b/codex-rs/docs/mcp_subspan_tracing.md @@ -0,0 +1,174 @@ +# MCP Subspan Tracing [experimental] + +This document describes Codex's experimental MCP client extension for ingesting child OpenTelemetry spans emitted by MCP servers. + +- Status: experimental and subject to change without notice +- Capability: `codex/subspan-tracing` +- Supported version: `1` +- Supported transport: `stderr-jsonl` + +## Purpose + +Codex creates an `mcp.tools.call` span around each MCP tool call. Some MCP servers perform meaningful nested work that is useful to inspect as child spans of that call. The `codex/subspan-tracing` capability lets a server opt in to receiving the active W3C trace context for a tool call and emitting sanitized span records back to Codex. + +This is intended for local stdio MCP servers. Telemetry records are out-of-band with respect to the MCP JSON-RPC stream and must not affect tool-call success or transport liveness. + +## Capability Negotiation + +Codex advertises client support during MCP `initialize`: + +```json +{ + "capabilities": { + "experimental": { + "codex/subspan-tracing": { + "version": 1, + "transports": ["stderr-jsonl"] + } + } + } +} +``` + +An MCP server opts in by returning a compatible experimental capability in its initialize result: + +```json +{ + "capabilities": { + "experimental": { + "codex/subspan-tracing": { + "version": 1, + "transports": ["stderr-jsonl"], + "tools": { + "js": { + "attributeProfile": "browser-use-v1" + } + } + } + } + } +} +``` + +If `tools` is present, Codex enables subspan tracing only for those tool names. If `tools` is omitted, Codex treats the capability as applying to every tool exposed by that server. + +Unknown versions or transports are ignored. + +## Tool Call Metadata + +For a negotiated server/tool pair, Codex adds `_meta["codex/subspan-tracing"]` while it is inside the active `mcp.tools.call` span: + +```json +{ + "_meta": { + "codex/subspan-tracing": { + "enabled": true, + "version": 1, + "traceparent": "00-00000000000000000000000000000001-0000000000000002-01", + "tracestate": "vendor=value" + } + } +} +``` + +`tracestate` is omitted when no tracestate is active. If tracing is not active, Codex does not add this metadata. + +Servers should disable subspan emission for a tool call unless: + +- `enabled` is `true` +- `version` is `1` +- `traceparent` is present and parseable + +## Stderr Transport + +For `stderr-jsonl`, each telemetry record is written to the MCP server process stderr as one line with this exact prefix: + +```text +@codex-telemetry +``` + +The rest of the line is a single JSON object: + +```text +@codex-telemetry {"v":1,"type":"span","name":"example.work",...} +``` + +Normal stderr output must not use that prefix. Codex preserves ordinary stderr logging behavior for non-telemetry lines. + +## Span Record Schema + +Span records use schema version `v: 1`: + +```json +{ + "v": 1, + "type": "span", + "name": "browser_use.tab.goto", + "trace_id": "00000000000000000000000000000001", + "span_id": "0000000000000010", + "parent_span_id": "0000000000000002", + "trace_flags": "01", + "tracestate": "vendor=value", + "start_unix_nanos": 1000000000, + "end_unix_nanos": 2000000000, + "attrs": { + "browser_use.url": "https://example.com", + "browser_use.timeout_ms": 2500 + } +} +``` + +Required fields: + +- `v`: must be `1` +- `type`: must be `"span"` +- `name`: allowlisted span name +- `trace_id`: 32 lowercase or uppercase hex characters, nonzero +- `span_id`: 16 lowercase or uppercase hex characters, nonzero +- `parent_span_id`: 16 lowercase or uppercase hex characters, nonzero +- `trace_flags`: 2 hex characters +- `start_unix_nanos`: Unix timestamp in nanoseconds +- `end_unix_nanos`: Unix timestamp in nanoseconds, greater than or equal to `start_unix_nanos` + +Optional fields: + +- `tracestate`: W3C tracestate header value +- `attrs`: span attributes object + +Codex also accepts `traceparent` for backward compatibility, but explicit IDs are the canonical protocol for reconstructed spans because they preserve parent-child relationships across records. + +## Span IDs and Hierarchy + +The first server-created span for a tool call should use: + +- `trace_id` from the request `traceparent` +- `parent_span_id` from the request `traceparent` +- a newly generated `span_id` + +Nested server spans should use the same `trace_id`, their own generated `span_id`, and the parent reconstructed span's `span_id` as `parent_span_id`. + +## Sanitization + +Servers must emit only sanitized, allowlisted attributes. Attribute values must be primitives: + +- string +- integer or float +- boolean + +Do not emit objects, arrays, cookies, credentials, auth headers, bearer tokens, full request/response payloads, arbitrary DOM text, or user-sensitive selectors. Codex applies its own allowlist and drops unsupported attributes, but servers should sanitize before writing records. + +## Failure Behavior + +Subspan telemetry is best effort: + +- malformed telemetry lines are ignored +- unsupported versions or record types are ignored +- invalid span records are ignored or logged at warning level +- telemetry write failures must not fail the MCP tool call +- telemetry parser failures must not break MCP transport +- no telemetry is emitted or reconstructed when Codex tracing is inactive + +## Current Attribute Profile + +`browser-use-v1` is the initial attribute profile used by Browser Use instrumentation. Codex currently allows Browser Use, Node REPL, and JS-related span names and attribute keys needed for that profile. New profiles should be added deliberately with their own allowlist changes and tests. + diff --git a/codex-rs/otel/src/lib.rs b/codex-rs/otel/src/lib.rs index 0ea401140e0e..328d6f0d3a29 100644 --- a/codex-rs/otel/src/lib.rs +++ b/codex-rs/otel/src/lib.rs @@ -2,6 +2,7 @@ pub(crate) mod config; mod events; pub(crate) mod metrics; pub(crate) mod provider; +mod stderr_span_telemetry; pub(crate) mod trace_context; mod otlp; @@ -23,6 +24,8 @@ pub use crate::metrics::runtime_metrics::RuntimeMetricsSummary; pub use crate::metrics::timer::Timer; pub use crate::metrics::*; pub use crate::provider::OtelProvider; +pub use crate::stderr_span_telemetry::StderrSpanTelemetryError; +pub use crate::stderr_span_telemetry::emit_mcp_subspan_telemetry; pub use crate::trace_context::context_from_w3c_trace_context; pub use crate::trace_context::current_span_trace_id; pub use crate::trace_context::current_span_w3c_trace_context; diff --git a/codex-rs/otel/src/stderr_span_telemetry.rs b/codex-rs/otel/src/stderr_span_telemetry.rs new file mode 100644 index 000000000000..0072151f0701 --- /dev/null +++ b/codex-rs/otel/src/stderr_span_telemetry.rs @@ -0,0 +1,756 @@ +use std::time::Duration; +use std::time::SystemTime; + +use opentelemetry::Context; +use opentelemetry::KeyValue; +use opentelemetry::global; +use opentelemetry::trace::Span as _; +use opentelemetry::trace::SpanContext; +use opentelemetry::trace::SpanId; +use opentelemetry::trace::SpanKind; +use opentelemetry::trace::TraceContextExt; +use opentelemetry::trace::TraceFlags; +use opentelemetry::trace::TraceId; +use opentelemetry::trace::TraceState; +use opentelemetry::trace::Tracer; +use serde_json::Map; +use serde_json::Value; + +const MCP_SUBSPAN_TELEMETRY_TRACER_NAME: &str = "codex-mcp-subspan-stderr"; +const CURRENT_SCHEMA_VERSION: u64 = 1; +const SPAN_RECORD_TYPE: &str = "span"; +const MAX_ATTRIBUTE_STRING_BYTES: usize = 1024; + +const ALLOWED_SPAN_NAMES: &[&str] = &[ + "node_repl.js", + "browser_use.playwright.dom_snapshot", + "browser_use.tab.goto", + "browser_use.tab.click", + "browser_use.tab.type", + "browser_use.tab.screenshot", + "browser_use.cdp.execute", + "browser_use.tab.wait_for_load_state", +]; + +const ALLOWED_ATTRIBUTE_PREFIXES: &[&str] = &["browser_use.", "node_repl.", "js."]; +const ALLOWED_ATTRIBUTE_KEYS: &[&str] = &["error.type", "error.message"]; + +#[derive(Debug, thiserror::Error)] +pub enum StderrSpanTelemetryError { + #[error("telemetry payload must be a JSON object")] + NotObject, + #[error("unsupported telemetry schema version")] + UnsupportedVersion, + #[error("unsupported telemetry record type")] + UnsupportedType, + #[error("missing or invalid telemetry field `{0}`")] + InvalidField(&'static str), + #[error("unsupported telemetry span name")] + UnsupportedSpanName, +} + +#[derive(Debug, Clone, PartialEq)] +struct SpanTelemetryRecord { + name: String, + span_id: Option, + trace_id: Option, + parent_span_id: Option, + trace_flags: Option, + traceparent: Option, + tracestate: Option, + start_time: SystemTime, + end_time: SystemTime, + attributes: Vec, +} + +pub fn emit_mcp_subspan_telemetry(payload: Value) -> Result<(), StderrSpanTelemetryError> { + let record = parse_span_telemetry_record(payload)?; + let tracer = global::tracer(MCP_SUBSPAN_TELEMETRY_TRACER_NAME); + emit_span_telemetry_record_with_tracer(&tracer, &record) +} + +fn emit_span_telemetry_record_with_tracer( + tracer: &T, + record: &SpanTelemetryRecord, +) -> Result<(), StderrSpanTelemetryError> +where + T: Tracer, +{ + let parent_context = record.parent_context()?; + + let mut builder = tracer + .span_builder(record.name.clone()) + .with_kind(SpanKind::Internal) + .with_start_time(record.start_time) + .with_attributes(record.attributes.clone()); + if let Some(span_id) = record.span_id { + builder = builder.with_span_id(span_id); + } + let mut span = tracer.build_with_context(builder, &parent_context); + span.end_with_timestamp(record.end_time); + Ok(()) +} + +impl SpanTelemetryRecord { + fn parent_context(&self) -> Result { + if let (Some(trace_id), Some(parent_span_id), Some(trace_flags)) = + (self.trace_id, self.parent_span_id, self.trace_flags) + { + let trace_state = trace_state_from_header(self.tracestate.as_deref())?; + let span_context = SpanContext::new( + trace_id, + parent_span_id, + trace_flags, + /*is_remote*/ true, + trace_state, + ); + return Ok(Context::new().with_remote_span_context(span_context)); + } + + let Some(traceparent) = self.traceparent.as_deref() else { + return Err(StderrSpanTelemetryError::InvalidField("traceparent")); + }; + crate::trace_context::context_from_trace_headers( + Some(traceparent), + self.tracestate.as_deref(), + ) + .ok_or(StderrSpanTelemetryError::InvalidField("traceparent")) + } +} + +fn parse_span_telemetry_record( + payload: Value, +) -> Result { + let object = payload + .as_object() + .ok_or(StderrSpanTelemetryError::NotObject)?; + + match object.get("v").and_then(Value::as_u64) { + Some(CURRENT_SCHEMA_VERSION) => {} + Some(_) => return Err(StderrSpanTelemetryError::UnsupportedVersion), + None => return Err(StderrSpanTelemetryError::InvalidField("v")), + } + + match object.get("type").and_then(Value::as_str) { + Some(SPAN_RECORD_TYPE) => {} + Some(_) => return Err(StderrSpanTelemetryError::UnsupportedType), + None => return Err(StderrSpanTelemetryError::InvalidField("type")), + } + + let name = required_string(object, "name")?.to_string(); + if !ALLOWED_SPAN_NAMES.contains(&name.as_str()) { + return Err(StderrSpanTelemetryError::UnsupportedSpanName); + } + + let traceparent = optional_string(object, "traceparent").map(str::to_string); + let span_id = optional_span_id_alias(object, &["span_id", "spanId"])?; + let trace_id = optional_trace_id_alias(object, &["trace_id", "traceId"])?; + let parent_span_id = optional_span_id_alias(object, &["parent_span_id", "parentSpanId"])?; + let trace_flags = optional_trace_flags_alias(object, &["trace_flags", "traceFlags"])?; + let tracestate = optional_string(object, "tracestate").map(str::to_string); + if span_id.is_some() || trace_id.is_some() || parent_span_id.is_some() || trace_flags.is_some() + { + if span_id.is_none() { + return Err(StderrSpanTelemetryError::InvalidField("span_id")); + } + if trace_id.is_none() { + return Err(StderrSpanTelemetryError::InvalidField("trace_id")); + } + if parent_span_id.is_none() { + return Err(StderrSpanTelemetryError::InvalidField("parent_span_id")); + } + if trace_flags.is_none() { + return Err(StderrSpanTelemetryError::InvalidField("trace_flags")); + } + } else if traceparent.is_none() { + return Err(StderrSpanTelemetryError::InvalidField("traceparent")); + } + + let start_time = timestamp_from_unix_nanos(required_u64_alias( + object, + &[ + "start_unix_nanos", + "startTimeUnixNanos", + "startTimeUnixNano", + "start_time_unix_nanos", + ], + "start_unix_nanos", + )?)?; + let end_time = timestamp_from_unix_nanos(required_u64_alias( + object, + &[ + "end_unix_nanos", + "endTimeUnixNanos", + "endTimeUnixNano", + "end_time_unix_nanos", + ], + "end_unix_nanos", + )?)?; + if end_time < start_time { + return Err(StderrSpanTelemetryError::InvalidField("end_unix_nanos")); + } + + let attributes = object + .get("attrs") + .or_else(|| object.get("attributes")) + .and_then(Value::as_object) + .map(sanitized_attributes) + .unwrap_or_default(); + + Ok(SpanTelemetryRecord { + name, + span_id, + trace_id, + parent_span_id, + trace_flags, + traceparent, + tracestate, + start_time, + end_time, + attributes, + }) +} + +fn required_string<'a>( + object: &'a Map, + key: &'static str, +) -> Result<&'a str, StderrSpanTelemetryError> { + object + .get(key) + .and_then(Value::as_str) + .filter(|value| !value.is_empty()) + .ok_or(StderrSpanTelemetryError::InvalidField(key)) +} + +fn optional_string<'a>(object: &'a Map, key: &str) -> Option<&'a str> { + object + .get(key) + .and_then(Value::as_str) + .filter(|value| !value.is_empty()) +} + +fn optional_string_alias<'a>( + object: &'a Map, + keys: &[&'static str], +) -> Option<(&'static str, &'a str)> { + keys.iter() + .find_map(|key| optional_string(object, key).map(|value| (*key, value))) +} + +fn optional_trace_id_alias( + object: &Map, + keys: &[&'static str], +) -> Result, StderrSpanTelemetryError> { + let Some((key, value)) = optional_string_alias(object, keys) else { + return Ok(None); + }; + parse_trace_id(value) + .map(Some) + .map_err(|_| StderrSpanTelemetryError::InvalidField(key)) +} + +fn optional_span_id_alias( + object: &Map, + keys: &[&'static str], +) -> Result, StderrSpanTelemetryError> { + let Some((key, value)) = optional_string_alias(object, keys) else { + return Ok(None); + }; + parse_span_id(value) + .map(Some) + .map_err(|_| StderrSpanTelemetryError::InvalidField(key)) +} + +fn optional_trace_flags_alias( + object: &Map, + keys: &[&'static str], +) -> Result, StderrSpanTelemetryError> { + let Some((key, value)) = optional_string_alias(object, keys) else { + return Ok(None); + }; + parse_trace_flags(value) + .map(Some) + .map_err(|_| StderrSpanTelemetryError::InvalidField(key)) +} + +fn parse_trace_id(value: &str) -> Result { + if !is_exact_hex(value, /*len*/ 32) { + return Err(()); + } + let trace_id = TraceId::from_hex(value).map_err(|_| ())?; + if trace_id == TraceId::INVALID { + return Err(()); + } + Ok(trace_id) +} + +fn parse_span_id(value: &str) -> Result { + if !is_exact_hex(value, /*len*/ 16) { + return Err(()); + } + let span_id = SpanId::from_hex(value).map_err(|_| ())?; + if span_id == SpanId::INVALID { + return Err(()); + } + Ok(span_id) +} + +fn parse_trace_flags(value: &str) -> Result { + if !is_exact_hex(value, /*len*/ 2) { + return Err(()); + } + u8::from_str_radix(value, 16) + .map(TraceFlags::new) + .map_err(|_| ()) +} + +fn is_exact_hex(value: &str, len: usize) -> bool { + value.len() == len && value.bytes().all(|byte| byte.is_ascii_hexdigit()) +} + +fn trace_state_from_header(value: Option<&str>) -> Result { + let Some(value) = value else { + return Ok(TraceState::default()); + }; + value + .parse() + .map_err(|_| StderrSpanTelemetryError::InvalidField("tracestate")) +} + +fn required_u64_alias( + object: &Map, + keys: &[&'static str], + error_key: &'static str, +) -> Result { + keys.iter() + .find_map(|key| object.get(*key).and_then(u64_value)) + .ok_or(StderrSpanTelemetryError::InvalidField(error_key)) +} + +fn u64_value(value: &Value) -> Option { + value + .as_u64() + .or_else(|| value.as_str().and_then(|value| value.parse().ok())) +} + +fn timestamp_from_unix_nanos(nanos: u64) -> Result { + let secs = nanos / 1_000_000_000; + let sub_nanos = (nanos % 1_000_000_000) as u32; + SystemTime::UNIX_EPOCH + .checked_add(Duration::new(secs, sub_nanos)) + .ok_or(StderrSpanTelemetryError::InvalidField("timestamp")) +} + +fn sanitized_attributes(attrs: &Map) -> Vec { + attrs + .iter() + .filter(|(key, _)| is_allowed_attribute_key(key)) + .filter_map(|(key, value)| safe_attribute_value(key, value)) + .collect() +} + +fn is_allowed_attribute_key(key: &str) -> bool { + ALLOWED_ATTRIBUTE_KEYS.contains(&key) + || ALLOWED_ATTRIBUTE_PREFIXES + .iter() + .any(|prefix| key.starts_with(prefix)) +} + +fn safe_attribute_value(key: &str, value: &Value) -> Option { + match value { + Value::Bool(value) => Some(KeyValue::new(key.to_string(), *value)), + Value::Number(value) => { + if let Some(value) = value.as_i64() { + Some(KeyValue::new(key.to_string(), value)) + } else if let Some(value) = value.as_u64().and_then(|value| i64::try_from(value).ok()) { + Some(KeyValue::new(key.to_string(), value)) + } else { + value + .as_f64() + .map(|value| KeyValue::new(key.to_string(), value)) + } + } + Value::String(value) => Some(KeyValue::new( + key.to_string(), + truncate_attribute_string(value).to_string(), + )), + Value::Null | Value::Array(_) | Value::Object(_) => None, + } +} + +fn truncate_attribute_string(value: &str) -> &str { + if value.len() <= MAX_ATTRIBUTE_STRING_BYTES { + return value; + } + + let mut end = MAX_ATTRIBUTE_STRING_BYTES; + while !value.is_char_boundary(end) { + end = end.saturating_sub(1); + } + &value[..end] +} + +#[cfg(test)] +mod tests { + use super::*; + use opentelemetry::Value as OtelValue; + use opentelemetry::trace::SpanId; + use opentelemetry::trace::TraceId; + use opentelemetry::trace::TracerProvider as _; + use opentelemetry_sdk::trace::InMemorySpanExporter; + use opentelemetry_sdk::trace::SdkTracerProvider; + use opentelemetry_sdk::trace::SpanData; + use pretty_assertions::assert_eq; + use std::collections::BTreeMap; + + #[test] + fn valid_span_telemetry_reconstructs_otel_span_with_sanitized_attrs() { + let exporter = InMemorySpanExporter::default(); + let provider = SdkTracerProvider::builder() + .with_simple_exporter(exporter.clone()) + .build(); + let tracer = provider.tracer("codex-otel-tests"); + let trace_id = "00000000000000000000000000000001"; + let parent_span_id = "0000000000000002"; + + let record = parse_span_telemetry_record(serde_json::json!({ + "v": 1, + "type": "span", + "name": "browser_use.tab.goto", + "traceparent": format!("00-{trace_id}-{parent_span_id}-01"), + "start_unix_nanos": 1_000_000_123u64, + "end_unix_nanos": 2_000_000_456u64, + "attrs": { + "browser_use.url": "https://example.com", + "browser_use.timeout_ms": 2500, + "unknown.secret": "drop me", + "browser_use.object": {"drop": true} + } + })) + .expect("valid record"); + + emit_span_telemetry_record_with_tracer(&tracer, &record).expect("span emitted"); + provider.force_flush().expect("flush spans"); + let spans = exporter.get_finished_spans().expect("finished spans"); + assert_eq!(spans.len(), 1); + let span = &spans[0]; + + assert_eq!(span.name.as_ref(), "browser_use.tab.goto"); + assert_eq!( + span.span_context.trace_id(), + TraceId::from_hex(trace_id).unwrap() + ); + assert_eq!( + span.parent_span_id, + SpanId::from_hex(parent_span_id).unwrap() + ); + assert_eq!( + span.start_time, + SystemTime::UNIX_EPOCH + Duration::new(1, 123) + ); + assert_eq!( + span.end_time, + SystemTime::UNIX_EPOCH + Duration::new(2, 456) + ); + + let attrs = span + .attributes + .iter() + .map(|kv| (kv.key.as_str().to_string(), kv.value.clone())) + .collect::>(); + assert_eq!( + attrs.get("browser_use.url"), + Some(&OtelValue::String("https://example.com".into())) + ); + assert_eq!( + attrs.get("browser_use.timeout_ms"), + Some(&OtelValue::I64(2500)) + ); + assert!(!attrs.contains_key("unknown.secret")); + assert!(!attrs.contains_key("browser_use.object")); + } + + #[test] + fn explicit_ids_reconstruct_span_and_parent_ids() { + let exporter = InMemorySpanExporter::default(); + let provider = SdkTracerProvider::builder() + .with_simple_exporter(exporter.clone()) + .build(); + let tracer = provider.tracer("codex-otel-tests"); + let trace_id = "00000000000000000000000000000001"; + let parent_span_id = "0000000000000002"; + let span_id = "0000000000000010"; + + let record = parse_span_telemetry_record(serde_json::json!({ + "v": 1, + "type": "span", + "name": "node_repl.js", + "trace_id": trace_id, + "span_id": span_id, + "parent_span_id": parent_span_id, + "trace_flags": "01", + "tracestate": "vendor=value", + "start_unix_nanos": 1_000_000_123u64, + "end_unix_nanos": 2_000_000_456u64, + })) + .expect("valid record"); + + emit_span_telemetry_record_with_tracer(&tracer, &record).expect("span emitted"); + provider.force_flush().expect("flush spans"); + let spans = exporter.get_finished_spans().expect("finished spans"); + assert_eq!(spans.len(), 1); + let span = &spans[0]; + + assert_eq!( + span.span_context.trace_id(), + TraceId::from_hex(trace_id).unwrap() + ); + assert_eq!( + span.span_context.span_id(), + SpanId::from_hex(span_id).unwrap() + ); + assert_eq!( + span.parent_span_id, + SpanId::from_hex(parent_span_id).unwrap() + ); + assert!(span.span_context.trace_flags().is_sampled()); + } + + #[test] + fn camel_case_explicit_ids_reconstruct_span_and_parent_ids() { + let exporter = InMemorySpanExporter::default(); + let provider = SdkTracerProvider::builder() + .with_simple_exporter(exporter.clone()) + .build(); + let tracer = provider.tracer("codex-otel-tests"); + let trace_id = "00000000000000000000000000000001"; + let parent_span_id = "0000000000000002"; + let span_id = "0000000000000010"; + + let record = parse_span_telemetry_record(serde_json::json!({ + "v": 1, + "type": "span", + "name": "browser_use.tab.click", + "traceId": trace_id, + "spanId": span_id, + "parentSpanId": parent_span_id, + "traceFlags": "01", + "start_unix_nanos": 1_000_000_123u64, + "end_unix_nanos": 2_000_000_456u64, + })) + .expect("valid record"); + + emit_span_telemetry_record_with_tracer(&tracer, &record).expect("span emitted"); + provider.force_flush().expect("flush spans"); + let spans = exporter.get_finished_spans().expect("finished spans"); + assert_eq!(spans.len(), 1); + let span = &spans[0]; + + assert_eq!( + span.span_context.trace_id(), + TraceId::from_hex(trace_id).unwrap() + ); + assert_eq!( + span.span_context.span_id(), + SpanId::from_hex(span_id).unwrap() + ); + assert_eq!( + span.parent_span_id, + SpanId::from_hex(parent_span_id).unwrap() + ); + assert!(span.span_context.trace_flags().is_sampled()); + } + + #[test] + fn child_span_can_parent_to_previous_reconstructed_span_id() { + let exporter = InMemorySpanExporter::default(); + let provider = SdkTracerProvider::builder() + .with_simple_exporter(exporter.clone()) + .build(); + let tracer = provider.tracer("codex-otel-tests"); + let trace_id = "00000000000000000000000000000001"; + let mcp_span_id = "0000000000000002"; + let node_span_id = "0000000000000010"; + let child_span_id = "0000000000000011"; + + for payload in [ + serde_json::json!({ + "v": 1, + "type": "span", + "name": "node_repl.js", + "trace_id": trace_id, + "span_id": node_span_id, + "parent_span_id": mcp_span_id, + "trace_flags": "01", + "start_unix_nanos": 1_000_000_000u64, + "end_unix_nanos": 3_000_000_000u64, + }), + serde_json::json!({ + "v": 1, + "type": "span", + "name": "browser_use.tab.goto", + "trace_id": trace_id, + "span_id": child_span_id, + "parent_span_id": node_span_id, + "trace_flags": "01", + "start_unix_nanos": 1_500_000_000u64, + "end_unix_nanos": 2_000_000_000u64, + }), + ] { + let record = parse_span_telemetry_record(payload).expect("valid record"); + emit_span_telemetry_record_with_tracer(&tracer, &record).expect("span emitted"); + } + + provider.force_flush().expect("flush spans"); + let spans = exporter.get_finished_spans().expect("finished spans"); + let node_span = find_span(&spans, "node_repl.js"); + let child_span = find_span(&spans, "browser_use.tab.goto"); + + assert_eq!( + node_span.span_context.span_id(), + SpanId::from_hex(node_span_id).unwrap() + ); + assert_eq!( + child_span.span_context.span_id(), + SpanId::from_hex(child_span_id).unwrap() + ); + assert_eq!( + child_span.parent_span_id, + SpanId::from_hex(node_span_id).unwrap() + ); + } + + #[test] + fn invalid_span_telemetry_is_rejected_without_emitting_span() { + let exporter = InMemorySpanExporter::default(); + let provider = SdkTracerProvider::builder() + .with_simple_exporter(exporter.clone()) + .build(); + let tracer = provider.tracer("codex-otel-tests"); + + let error = parse_span_telemetry_record(serde_json::json!({ + "v": 99, + "type": "span", + })) + .expect_err("unsupported version"); + assert!(matches!( + error, + StderrSpanTelemetryError::UnsupportedVersion + )); + + assert!( + emit_span_telemetry_record_with_tracer( + &tracer, + &SpanTelemetryRecord { + name: "browser_use.tab.goto".to_string(), + span_id: None, + trace_id: None, + parent_span_id: None, + trace_flags: None, + traceparent: Some("not-a-traceparent".to_string()), + tracestate: None, + start_time: SystemTime::UNIX_EPOCH, + end_time: SystemTime::UNIX_EPOCH, + attributes: Vec::new(), + }, + ) + .is_err() + ); + provider.force_flush().expect("flush spans"); + assert!( + exporter + .get_finished_spans() + .expect("finished spans") + .is_empty() + ); + } + + #[test] + fn invalid_explicit_ids_are_rejected_without_emitting_span() { + let exporter = InMemorySpanExporter::default(); + let provider = SdkTracerProvider::builder() + .with_simple_exporter(exporter.clone()) + .build(); + let tracer = provider.tracer("codex-otel-tests"); + + for payload in [ + serde_json::json!({ + "v": 1, + "type": "span", + "name": "node_repl.js", + "trace_id": "00000000000000000000000000000000", + "span_id": "0000000000000010", + "parent_span_id": "0000000000000002", + "trace_flags": "01", + "start_unix_nanos": 1_000_000_000u64, + "end_unix_nanos": 2_000_000_000u64, + }), + serde_json::json!({ + "v": 1, + "type": "span", + "name": "node_repl.js", + "trace_id": "00000000000000000000000000000001", + "span_id": "0000000000000000", + "parent_span_id": "0000000000000002", + "trace_flags": "01", + "start_unix_nanos": 1_000_000_000u64, + "end_unix_nanos": 2_000_000_000u64, + }), + serde_json::json!({ + "v": 1, + "type": "span", + "name": "node_repl.js", + "trace_id": "00000000000000000000000000000001", + "span_id": "0000000000000010", + "parent_span_id": "0002", + "trace_flags": "01", + "start_unix_nanos": 1_000_000_000u64, + "end_unix_nanos": 2_000_000_000u64, + }), + serde_json::json!({ + "v": 1, + "type": "span", + "name": "node_repl.js", + "trace_id": "00000000000000000000000000000001", + "span_id": "0000000000000010", + "parent_span_id": "0000000000000002", + "trace_flags": "001", + "start_unix_nanos": 1_000_000_000u64, + "end_unix_nanos": 2_000_000_000u64, + }), + ] { + let error = parse_span_telemetry_record(payload).expect_err("invalid id"); + assert!(matches!(error, StderrSpanTelemetryError::InvalidField(_))); + } + + assert!( + emit_mcp_subspan_telemetry(serde_json::json!({ + "v": 1, + "type": "span", + "name": "node_repl.js", + "trace_id": "00000000000000000000000000000001", + "span_id": "0000000000000010", + "parent_span_id": "0000000000000002", + "trace_flags": "zz", + "start_unix_nanos": 1_000_000_000u64, + "end_unix_nanos": 2_000_000_000u64, + })) + .is_err() + ); + provider.force_flush().expect("flush spans"); + assert!( + exporter + .get_finished_spans() + .expect("finished spans") + .is_empty() + ); + drop(tracer); + } + + fn find_span<'a>(spans: &'a [SpanData], name: &str) -> &'a SpanData { + spans + .iter() + .find(|span| span.name == name) + .unwrap_or_else(|| panic!("missing span {name}")) + } +} diff --git a/codex-rs/rmcp-client/src/lib.rs b/codex-rs/rmcp-client/src/lib.rs index 57e9f0e80000..3e12698ed651 100644 --- a/codex-rs/rmcp-client/src/lib.rs +++ b/codex-rs/rmcp-client/src/lib.rs @@ -35,3 +35,5 @@ pub use rmcp_client::ToolWithConnectorId; pub use stdio_server_launcher::ExecutorStdioServerLauncher; pub use stdio_server_launcher::LocalStdioServerLauncher; pub use stdio_server_launcher::StdioServerLauncher; +pub use stdio_server_launcher::StdioServerTelemetry; +pub use stdio_server_launcher::StdioServerTelemetrySink; diff --git a/codex-rs/rmcp-client/src/rmcp_client.rs b/codex-rs/rmcp-client/src/rmcp_client.rs index 6f38acaddf45..bbe6ba72887b 100644 --- a/codex-rs/rmcp-client/src/rmcp_client.rs +++ b/codex-rs/rmcp-client/src/rmcp_client.rs @@ -68,6 +68,7 @@ use crate::oauth::StoredOAuthTokens; use crate::stdio_server_launcher::StdioServerCommand; use crate::stdio_server_launcher::StdioServerLauncher; use crate::stdio_server_launcher::StdioServerProcessHandle; +use crate::stdio_server_launcher::StdioServerTelemetrySink; use crate::stdio_server_launcher::StdioServerTransport; use crate::utils::apply_default_headers; use crate::utils::build_default_headers; @@ -282,9 +283,17 @@ impl RmcpClient { env_vars: &[McpServerEnvVar], cwd: Option, launcher: Arc, + telemetry_sink: Option, ) -> io::Result { let transport_recipe = TransportRecipe::Stdio { - command: StdioServerCommand::new(program, args, env, env_vars.to_vec(), cwd), + command: StdioServerCommand::new( + program, + args, + env, + env_vars.to_vec(), + cwd, + telemetry_sink, + ), launcher, }; let transport = Self::create_pending_transport(&transport_recipe) diff --git a/codex-rs/rmcp-client/src/stdio_server_launcher.rs b/codex-rs/rmcp-client/src/stdio_server_launcher.rs index fb3dab525e18..268c447e02be 100644 --- a/codex-rs/rmcp-client/src/stdio_server_launcher.rs +++ b/codex-rs/rmcp-client/src/stdio_server_launcher.rs @@ -82,8 +82,16 @@ pub struct StdioServerCommand { env: Option>, env_vars: Vec, cwd: Option, + telemetry_sink: Option, } +#[derive(Debug, Clone, PartialEq)] +pub struct StdioServerTelemetry { + pub payload: serde_json::Value, +} + +pub type StdioServerTelemetrySink = Arc; + /// Client-side rmcp transport for a launched MCP stdio server. /// /// The concrete process placement stays private to this module. `RmcpClient` @@ -149,6 +157,7 @@ impl StdioServerCommand { env: Option>, env_vars: Vec, cwd: Option, + telemetry_sink: Option, ) -> Self { Self { program, @@ -156,6 +165,7 @@ impl StdioServerCommand { env, env_vars, cwd, + telemetry_sink, } } } @@ -243,6 +253,7 @@ impl LocalStdioServerLauncher { env, env_vars, cwd, + telemetry_sink, } = command; let program_name = program.to_string_lossy().into_owned(); let envs = create_env_for_mcp_server(env, &env_vars).map_err(io::Error::other)?; @@ -276,7 +287,7 @@ impl LocalStdioServerLauncher { loop { match reader.next_line().await { Ok(Some(line)) => { - info!("MCP server stderr ({program_name}): {line}"); + handle_stderr_line(&program_name, &line, telemetry_sink.as_ref()); } Ok(None) => break, Err(error) => { @@ -479,6 +490,7 @@ impl ExecutorStdioServerLauncher { env, env_vars, cwd, + telemetry_sink: _, } = command; let program_name = program.to_string_lossy().into_owned(); let envs = create_env_overlay_for_remote_mcp_server(env, &env_vars); @@ -578,6 +590,38 @@ impl ExecutorStdioServerLauncher { } } +const CODEX_TELEMETRY_STDERR_PREFIX: &str = "@codex-telemetry "; + +fn handle_stderr_line( + program_name: &str, + line: &str, + telemetry_sink: Option<&StdioServerTelemetrySink>, +) { + let Some(sink) = telemetry_sink else { + info!("MCP server stderr ({program_name}): {line}"); + return; + }; + + let Some(telemetry) = parse_stderr_telemetry_line(line) else { + info!("MCP server stderr ({program_name}): {line}"); + return; + }; + + match telemetry { + Ok(telemetry) => sink(telemetry), + Err(error) => { + warn!("Failed to parse MCP server telemetry ({program_name}): {error}"); + } + } +} + +fn parse_stderr_telemetry_line( + line: &str, +) -> Option> { + let payload = line.strip_prefix(CODEX_TELEMETRY_STDERR_PREFIX)?; + Some(serde_json::from_str(payload).map(|payload| StdioServerTelemetry { payload })) +} + #[cfg(test)] mod tests { use super::*; @@ -651,4 +695,27 @@ mod tests { ); assert!(!env.contains_key("UNREQUESTED_SECRET")); } + + #[test] + fn stderr_telemetry_parser_separates_prefixed_lines_from_normal_stderr() { + assert!(parse_stderr_telemetry_line("ordinary stderr").is_none()); + + assert_eq!( + parse_stderr_telemetry_line("@codex-telemetry {\"v\":1,\"type\":\"span\"}") + .expect("telemetry line") + .expect("valid telemetry"), + StdioServerTelemetry { + payload: serde_json::json!({ + "v": 1, + "type": "span", + }), + } + ); + + assert!( + parse_stderr_telemetry_line("@codex-telemetry not-json") + .expect("telemetry line") + .is_err() + ); + } } diff --git a/codex-rs/rmcp-client/tests/process_group_cleanup.rs b/codex-rs/rmcp-client/tests/process_group_cleanup.rs index 10d1e8ec74ef..84784973c4f6 100644 --- a/codex-rs/rmcp-client/tests/process_group_cleanup.rs +++ b/codex-rs/rmcp-client/tests/process_group_cleanup.rs @@ -116,6 +116,7 @@ async fn drop_kills_wrapper_process_group() -> Result<()> { &[], /*cwd*/ None, Arc::new(LocalStdioServerLauncher::new(std::env::current_dir()?)), + /*telemetry_sink*/ None, ) .await?; @@ -147,6 +148,7 @@ async fn shutdown_kills_initialized_stdio_server_with_in_flight_operation() -> R &[], /*cwd*/ None, Arc::new(LocalStdioServerLauncher::new(std::env::current_dir()?)), + /*telemetry_sink*/ None, ) .await?, ); diff --git a/codex-rs/rmcp-client/tests/resources.rs b/codex-rs/rmcp-client/tests/resources.rs index f2e4c49911bd..5dbd51647da1 100644 --- a/codex-rs/rmcp-client/tests/resources.rs +++ b/codex-rs/rmcp-client/tests/resources.rs @@ -64,6 +64,7 @@ async fn rmcp_client_can_list_and_read_resources() -> anyhow::Result<()> { &[], /*cwd*/ None, Arc::new(LocalStdioServerLauncher::new(std::env::current_dir()?)), + /*telemetry_sink*/ None, ) .await?; From 5fa172006e9a86d85c61d461f36c30fab461316f Mon Sep 17 00:00:00 2001 From: charley-openai Date: Fri, 1 May 2026 16:21:19 -0700 Subject: [PATCH 2/2] Gate MCP subspan tracing on stderr support --- codex-rs/codex-mcp/src/rmcp_client.rs | 103 +++++++++++++++--- codex-rs/docs/mcp_subspan_tracing.md | 7 +- .../src/executor_process_transport.rs | 30 +++-- .../rmcp-client/src/stdio_server_launcher.rs | 5 +- 4 files changed, 115 insertions(+), 30 deletions(-) diff --git a/codex-rs/codex-mcp/src/rmcp_client.rs b/codex-rs/codex-mcp/src/rmcp_client.rs index d9b47ff05bb3..71a03d604cac 100644 --- a/codex-rs/codex-mcp/src/rmcp_client.rs +++ b/codex-rs/codex-mcp/src/rmcp_client.rs @@ -207,6 +207,8 @@ impl AsyncManagedClient { let startup_tool_filter = tool_filter; let startup_complete = Arc::new(AtomicBool::new(false)); let subspan_tracing_enabled = Arc::new(AtomicBool::new(false)); + let subspan_tracing_stderr_jsonl_available = + subspan_tracing_stderr_jsonl_available(&config, &runtime_environment); let startup_complete_for_fut = Arc::clone(&startup_complete); let subspan_tracing_enabled_for_fut = Arc::clone(&subspan_tracing_enabled); let cancel_token_for_fut = cancel_token.clone(); @@ -240,6 +242,7 @@ impl AsyncManagedClient { elicitation_requests, codex_apps_tools_cache_context, subspan_tracing_enabled: subspan_tracing_enabled_for_fut, + subspan_tracing_stderr_jsonl_available, }, ) .await @@ -521,25 +524,28 @@ async fn start_server_task( elicitation_requests, codex_apps_tools_cache_context, subspan_tracing_enabled, + subspan_tracing_stderr_jsonl_available, } = params; let elicitation = elicitation_capability_for_server(&server_name); - let mut subspan_tracing_capability = JsonObject::new(); - subspan_tracing_capability.insert( - "version".to_string(), - serde_json::json!(MCP_SUBSPAN_TRACING_VERSION), - ); - subspan_tracing_capability.insert( - "transports".to_string(), - serde_json::json!([MCP_SUBSPAN_TRACING_TRANSPORT_STDERR_JSONL]), - ); - let client_experimental_capabilities = BTreeMap::from([( - MCP_SUBSPAN_TRACING_CAPABILITY.to_string(), - subspan_tracing_capability, - )]); + let client_experimental_capabilities = subspan_tracing_stderr_jsonl_available.then(|| { + let mut subspan_tracing_capability = JsonObject::new(); + subspan_tracing_capability.insert( + "version".to_string(), + serde_json::json!(MCP_SUBSPAN_TRACING_VERSION), + ); + subspan_tracing_capability.insert( + "transports".to_string(), + serde_json::json!([MCP_SUBSPAN_TRACING_TRANSPORT_STDERR_JSONL]), + ); + BTreeMap::from([( + MCP_SUBSPAN_TRACING_CAPABILITY.to_string(), + subspan_tracing_capability, + )]) + }); let initialize_params = InitializeRequestParams { meta: None, capabilities: ClientCapabilities { - experimental: Some(client_experimental_capabilities), + experimental: client_experimental_capabilities, extensions: None, roots: None, sampling: None, @@ -628,6 +634,22 @@ struct StartServerTaskParams { elicitation_requests: ElicitationRequestManager, codex_apps_tools_cache_context: Option, subspan_tracing_enabled: Arc, + subspan_tracing_stderr_jsonl_available: bool, +} + +fn subspan_tracing_stderr_jsonl_available( + config: &McpServerConfig, + runtime_environment: &McpRuntimeEnvironment, +) -> bool { + if !matches!(config.transport, McpServerTransportConfig::Stdio { .. }) { + return false; + } + + match config.experimental_environment.as_deref() { + None | Some("local") => true, + Some("remote") => runtime_environment.environment().is_remote(), + Some(_) => false, + } } async fn make_rmcp_client( @@ -765,8 +787,11 @@ fn subspan_tracing_telemetry_sink( #[cfg(test)] mod tests { use super::*; + use codex_exec_server::Environment; use rmcp::model::JsonObject; use rmcp::model::Meta; + use std::path::PathBuf; + use std::sync::Arc; fn tool_with_connector_meta() -> RmcpTool { RmcpTool { @@ -864,6 +889,56 @@ mod tests { } } + fn local_runtime_environment() -> McpRuntimeEnvironment { + McpRuntimeEnvironment::new( + Arc::new(Environment::default_for_tests()), + PathBuf::from("/tmp"), + ) + } + + fn remote_runtime_environment() -> McpRuntimeEnvironment { + McpRuntimeEnvironment::new( + Arc::new( + Environment::create_for_tests(Some("ws://executor.example".to_string())) + .expect("remote environment"), + ), + PathBuf::from("/tmp"), + ) + } + + fn mcp_server_config(value: serde_json::Value) -> McpServerConfig { + serde_json::from_value(value).expect("mcp server config") + } + + #[tokio::test] + async fn subspan_tracing_stderr_jsonl_is_available_only_when_stderr_is_observed() { + assert!(subspan_tracing_stderr_jsonl_available( + &mcp_server_config(serde_json::json!({"command": "node"})), + &local_runtime_environment(), + )); + + assert!(!subspan_tracing_stderr_jsonl_available( + &mcp_server_config(serde_json::json!({"url": "https://mcp.example"})), + &local_runtime_environment(), + )); + + assert!(!subspan_tracing_stderr_jsonl_available( + &mcp_server_config(serde_json::json!({ + "command": "node", + "experimental_environment": "remote", + })), + &local_runtime_environment(), + )); + + assert!(subspan_tracing_stderr_jsonl_available( + &mcp_server_config(serde_json::json!({ + "command": "node", + "experimental_environment": "remote", + })), + &remote_runtime_environment(), + )); + } + #[test] fn subspan_tracing_capability_accepts_stderr_jsonl_transport_and_tool_filter() { let capability: JsonObject = serde_json::from_value(serde_json::json!({ diff --git a/codex-rs/docs/mcp_subspan_tracing.md b/codex-rs/docs/mcp_subspan_tracing.md index 721337515e1f..3184d5113b53 100644 --- a/codex-rs/docs/mcp_subspan_tracing.md +++ b/codex-rs/docs/mcp_subspan_tracing.md @@ -11,11 +11,11 @@ This document describes Codex's experimental MCP client extension for ingesting Codex creates an `mcp.tools.call` span around each MCP tool call. Some MCP servers perform meaningful nested work that is useful to inspect as child spans of that call. The `codex/subspan-tracing` capability lets a server opt in to receiving the active W3C trace context for a tool call and emitting sanitized span records back to Codex. -This is intended for local stdio MCP servers. Telemetry records are out-of-band with respect to the MCP JSON-RPC stream and must not affect tool-call success or transport liveness. +This is intended for stdio MCP servers whose stderr stream is observed by Codex. Telemetry records are out-of-band with respect to the MCP JSON-RPC stream and must not affect tool-call success or transport liveness. ## Capability Negotiation -Codex advertises client support during MCP `initialize`: +Codex advertises client support during MCP `initialize` only when the active MCP transport can observe stderr telemetry: ```json { @@ -90,7 +90,7 @@ For `stderr-jsonl`, each telemetry record is written to the MCP server process s The rest of the line is a single JSON object: ```text -@codex-telemetry {"v":1,"type":"span","name":"example.work",...} +@codex-telemetry {"v":1,"type":"span","name":"browser_use.tab.goto",...} ``` Normal stderr output must not use that prefix. Codex preserves ordinary stderr logging behavior for non-telemetry lines. @@ -171,4 +171,3 @@ Subspan telemetry is best effort: ## Current Attribute Profile `browser-use-v1` is the initial attribute profile used by Browser Use instrumentation. Codex currently allows Browser Use, Node REPL, and JS-related span names and attribute keys needed for that profile. New profiles should be added deliberately with their own allowlist changes and tests. - diff --git a/codex-rs/rmcp-client/src/executor_process_transport.rs b/codex-rs/rmcp-client/src/executor_process_transport.rs index 41f0b7660d95..cb8fdaeda2ee 100644 --- a/codex-rs/rmcp-client/src/executor_process_transport.rs +++ b/codex-rs/rmcp-client/src/executor_process_transport.rs @@ -41,9 +41,11 @@ use serde_json::to_vec; use tokio::runtime::Handle; use tokio::sync::broadcast; use tracing::debug; -use tracing::info; use tracing::warn; +use crate::stdio_server_launcher::StdioServerTelemetrySink; +use crate::stdio_server_launcher::handle_stderr_line; + static PROCESS_COUNTER: AtomicUsize = AtomicUsize::new(1); // Remote public implementation. @@ -78,6 +80,9 @@ pub(super) struct ExecutorProcessTransport { /// Buffered stderr bytes for diagnostic logging. stderr: Vec, + /// Optional sink for structured stderr telemetry lines. + telemetry_sink: Option, + /// Whether the executor has reported process closure or a terminal /// subscription failure. Once closed, any remaining partial stdout line is /// flushed once and then rmcp receives EOF. @@ -95,7 +100,11 @@ pub(super) struct ExecutorProcessTransport { } impl ExecutorProcessTransport { - pub(super) fn new(process: Arc, program_name: String) -> Self { + pub(super) fn new( + process: Arc, + program_name: String, + telemetry_sink: Option, + ) -> Self { // Subscribe before returning the transport to rmcp. Some test servers // can emit output or exit quickly after `process/start`, and the // process event log will replay anything that landed before this @@ -107,6 +116,7 @@ impl ExecutorProcessTransport { program_name, stdout: Vec::new(), stderr: Vec::new(), + telemetry_sink, closed: false, terminated: false, last_seq: 0, @@ -321,10 +331,10 @@ impl ExecutorProcessTransport { if line.last() == Some(&b'\r') { line.pop(); } - info!( - "MCP server stderr ({}): {}", - self.program_name, - String::from_utf8_lossy(&line) + handle_stderr_line( + &self.program_name, + &String::from_utf8_lossy(&line), + self.telemetry_sink.as_ref(), ); } } @@ -334,10 +344,10 @@ impl ExecutorProcessTransport { return; } let line = take(&mut self.stderr); - info!( - "MCP server stderr ({}): {}", - self.program_name, - String::from_utf8_lossy(&line) + handle_stderr_line( + &self.program_name, + &String::from_utf8_lossy(&line), + self.telemetry_sink.as_ref(), ); } diff --git a/codex-rs/rmcp-client/src/stdio_server_launcher.rs b/codex-rs/rmcp-client/src/stdio_server_launcher.rs index 268c447e02be..1d82ca3f3938 100644 --- a/codex-rs/rmcp-client/src/stdio_server_launcher.rs +++ b/codex-rs/rmcp-client/src/stdio_server_launcher.rs @@ -490,7 +490,7 @@ impl ExecutorStdioServerLauncher { env, env_vars, cwd, - telemetry_sink: _, + telemetry_sink, } = command; let program_name = program.to_string_lossy().into_owned(); let envs = create_env_overlay_for_remote_mcp_server(env, &env_vars); @@ -525,6 +525,7 @@ impl ExecutorStdioServerLauncher { inner: StdioServerTransportInner::Executor(ExecutorProcessTransport::new( started.process, program_name, + telemetry_sink, )), process, }) @@ -592,7 +593,7 @@ impl ExecutorStdioServerLauncher { const CODEX_TELEMETRY_STDERR_PREFIX: &str = "@codex-telemetry "; -fn handle_stderr_line( +pub(super) fn handle_stderr_line( program_name: &str, line: &str, telemetry_sink: Option<&StdioServerTelemetrySink>,