Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 174 additions & 2 deletions codex-rs/analytics/src/analytics_client_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,7 @@ async fn ingest_rejected_turn_steer(
response: Box::new(sample_thread_resume_response(
"thread-2", /*ephemeral*/ false, "gpt-5",
)),
thread_originator: None,
},
out,
)
Expand Down Expand Up @@ -631,6 +632,7 @@ async fn ingest_turn_prerequisites(
response: Box::new(sample_thread_start_response(
"thread-2", /*ephemeral*/ false, "gpt-5",
)),
thread_originator: None,
},
out,
)
Expand All @@ -654,6 +656,7 @@ async fn ingest_turn_prerequisites(
connection_id: 7,
request_id: RequestId::Integer(3),
response: Box::new(sample_turn_start_response("turn-2")),
thread_originator: None,
},
out,
)
Expand Down Expand Up @@ -720,6 +723,7 @@ async fn ingest_review_prerequisites(
response: Box::new(sample_thread_start_response(
"thread-1", /*ephemeral*/ false, "gpt-5",
)),
thread_originator: None,
},
events,
)
Expand Down Expand Up @@ -1652,6 +1656,7 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize
/*ephemeral*/ false,
"gpt-5",
)),
thread_originator: None,
},
&mut events,
)
Expand Down Expand Up @@ -1697,6 +1702,7 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize
response: Box::new(sample_thread_resume_response(
"thread-1", /*ephemeral*/ true, "gpt-5",
)),
thread_originator: None,
},
&mut events,
)
Expand Down Expand Up @@ -1741,6 +1747,157 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize
);
}

#[tokio::test]
async fn thread_originator_overrides_shared_connection_across_thread_events() {
let mut reducer = AnalyticsReducer::default();
let mut events = Vec::new();

reducer
.ingest(sample_initialize_fact(/*connection_id*/ 7), &mut events)
.await;
for (request_id, thread_id, thread_originator) in [
(1, "thread-work", Some(TEST_PRODUCT_CLIENT_ID.to_string())),
(2, "thread-default", None),
] {
reducer
.ingest(
AnalyticsFact::ClientResponse {
connection_id: 7,
request_id: RequestId::Integer(request_id),
response: Box::new(sample_thread_start_response(
thread_id, /*ephemeral*/ false, "gpt-5",
)),
thread_originator,
},
&mut events,
)
.await;
}

let initialized = serde_json::to_value(&events).expect("serialize thread events");
assert_eq!(
initialized
.as_array()
.expect("thread events")
.iter()
.map(|event| {
json!({
"thread_id": event["event_params"]["thread_id"],
"app_server_client": event["event_params"]["app_server_client"],
})
})
.collect::<Vec<_>>(),
vec![
json!({
"thread_id": "thread-work",
"app_server_client": {
"product_client_id": TEST_PRODUCT_CLIENT_ID,
"client_name": "codex-tui",
"client_version": "1.0.0",
"rpc_transport": "websocket",
"experimental_api_enabled": false,
},
}),
json!({
"thread_id": "thread-default",
"app_server_client": {
"product_client_id": DEFAULT_ORIGINATOR,
"client_name": "codex-tui",
"client_version": "1.0.0",
"rpc_transport": "websocket",
"experimental_api_enabled": false,
},
}),
]
);

events.clear();
reducer
.ingest(
AnalyticsFact::ClientRequest {
connection_id: 7,
request_id: RequestId::Integer(3),
request: Box::new(sample_turn_start_request(
"thread-work",
/*request_id*/ 3,
)),
},
&mut events,
)
.await;
reducer
.ingest(
AnalyticsFact::ClientResponse {
connection_id: 7,
request_id: RequestId::Integer(3),
response: Box::new(sample_turn_start_response("turn-1")),
thread_originator: None,
},
&mut events,
)
.await;
ingest_completed_command_execution_item(&mut reducer, &mut events, "thread-work", "item-work")
.await;
ingest_complete_child_turn(&mut reducer, &mut events, "thread-work", "turn-1").await;
reducer
.ingest(
AnalyticsFact::Custom(CustomAnalyticsFact::Compaction(Box::new(
CodexCompactionEvent {
thread_id: "thread-work".to_string(),
turn_id: "turn-compact".to_string(),
trigger: CompactionTrigger::Manual,
reason: CompactionReason::UserRequested,
implementation: CompactionImplementation::Responses,
phase: CompactionPhase::StandaloneTurn,
strategy: CompactionStrategy::Memento,
status: CompactionStatus::Completed,
codex_error_kind: None,
codex_error_http_status_code: None,
active_context_tokens_before: 131_000,
active_context_tokens_after: 64_000,
retained_image_count: None,
compaction_summary_tokens: None,
cached_input_tokens: None,
started_at: 100,
completed_at: 101,
duration_ms: Some(1200),
},
))),
&mut events,
)
.await;

let lifecycle = serde_json::to_value(&events).expect("serialize lifecycle events");
assert_eq!(
lifecycle
.as_array()
.expect("lifecycle events")
.iter()
.map(|event| {
json!({
"event_type": event["event_type"],
"product_client_id":
event["event_params"]["app_server_client"]["product_client_id"],
})
})
.collect::<Vec<_>>(),
vec![
json!({
"event_type": "codex_command_execution_event",
"product_client_id": TEST_PRODUCT_CLIENT_ID,
}),
json!({
"event_type": "codex_turn_event",
"product_client_id": TEST_PRODUCT_CLIENT_ID,
}),
json!({
"event_type": "codex_compaction_event",
"product_client_id": TEST_PRODUCT_CLIENT_ID,
}),
]
);
}

#[tokio::test]
async fn unrelated_client_requests_are_ignored_by_reducer() {
let mut reducer = AnalyticsReducer::default();
Expand All @@ -1767,6 +1924,7 @@ async fn unrelated_client_requests_are_ignored_by_reducer() {
connection_id: 7,
request_id: RequestId::Integer(3),
response: Box::new(sample_turn_start_response("turn-2")),
thread_originator: None,
},
&mut events,
)
Expand All @@ -1792,6 +1950,7 @@ async fn unrelated_client_responses_are_ignored_by_reducer() {
response: Box::new(ClientResponsePayload::ThreadArchive(
ThreadArchiveResponse {},
)),
thread_originator: None,
},
&mut events,
)
Expand Down Expand Up @@ -1851,6 +2010,7 @@ async fn compaction_event_ingests_custom_fact() {
Some(AppServerThreadSource::Subagent),
Some(parent_thread_id.to_string()),
)),
thread_originator: None,
},
&mut events,
)
Expand Down Expand Up @@ -1971,6 +2131,7 @@ async fn guardian_review_event_ingests_custom_fact_with_optional_target_item() {
/*ephemeral*/ false,
"gpt-5",
)),
thread_originator: None,
},
&mut events,
)
Expand Down Expand Up @@ -2499,6 +2660,7 @@ async fn item_review_summaries_do_not_cross_threads_with_reused_item_ids() {
response: Box::new(sample_thread_start_response(
"thread-2", /*ephemeral*/ false, "gpt-5",
)),
thread_originator: None,
},
&mut events,
)
Expand Down Expand Up @@ -2749,7 +2911,7 @@ async fn subagent_thread_started_publishes_without_initialize() {
}

#[tokio::test]
async fn subagent_events_use_inherited_connection_unless_turn_connection_is_explicit() {
async fn subagent_events_keep_thread_originator_with_explicit_turn_connection() {
let mut reducer = AnalyticsReducer::default();
let mut events = Vec::new();
let parent_thread_id =
Expand Down Expand Up @@ -2786,6 +2948,7 @@ async fn subagent_events_use_inherited_connection_unless_turn_connection_is_expl
/*ephemeral*/ false,
"gpt-5",
)),
thread_originator: None,
},
&mut events,
)
Expand Down Expand Up @@ -2908,6 +3071,7 @@ async fn subagent_events_use_inherited_connection_unless_turn_connection_is_expl
connection_id: 8,
request_id: RequestId::Integer(3),
response: Box::new(sample_turn_start_response("turn-explicit")),
thread_originator: None,
},
&mut events,
)
Expand All @@ -2918,7 +3082,11 @@ async fn subagent_events_use_inherited_connection_unless_turn_connection_is_expl
};
assert_eq!(
event.event_params.app_server_client.product_client_id,
DEFAULT_ORIGINATOR
"parent-client"
);
assert_eq!(
event.event_params.app_server_client.client_name.as_deref(),
Some("codex-tui")
);
}

Expand Down Expand Up @@ -3868,6 +4036,7 @@ async fn accepted_turn_steer_emits_expected_event() {
connection_id: 7,
request_id: RequestId::Integer(4),
response: Box::new(sample_turn_steer_response("turn-2")),
thread_originator: None,
},
&mut out,
)
Expand Down Expand Up @@ -4039,6 +4208,7 @@ async fn turn_start_error_response_discards_pending_start_request() {
connection_id: 7,
request_id: RequestId::Integer(3),
response: Box::new(sample_turn_start_response("turn-2")),
thread_originator: None,
},
&mut out,
)
Expand Down Expand Up @@ -4367,6 +4537,7 @@ async fn accepted_steers_increment_turn_steer_count() {
connection_id: 7,
request_id: RequestId::Integer(4),
response: Box::new(sample_turn_steer_response("turn-2")),
thread_originator: None,
},
&mut out,
)
Expand Down Expand Up @@ -4414,6 +4585,7 @@ async fn accepted_steers_increment_turn_steer_count() {
connection_id: 7,
request_id: RequestId::Integer(6),
response: Box::new(sample_turn_steer_response("turn-2")),
thread_originator: None,
},
&mut out,
)
Expand Down
26 changes: 26 additions & 0 deletions codex-rs/analytics/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,31 @@ impl AnalyticsEventsClient {
connection_id: u64,
request_id: RequestId,
response: ClientResponsePayload,
) {
self.track_response_inner(
connection_id,
request_id,
response,
/*thread_originator*/ None,
);
}

pub fn track_response_with_thread_originator(
&self,
connection_id: u64,
request_id: RequestId,
response: ClientResponsePayload,
thread_originator: String,
) {
self.track_response_inner(connection_id, request_id, response, Some(thread_originator));
}

fn track_response_inner(
&self,
connection_id: u64,
request_id: RequestId,
response: ClientResponsePayload,
thread_originator: Option<String>,
) {
if !matches!(
response,
Expand All @@ -450,6 +475,7 @@ impl AnalyticsEventsClient {
connection_id,
request_id,
response: Box::new(response),
thread_originator,
});
}

Expand Down
1 change: 1 addition & 0 deletions codex-rs/analytics/src/facts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,7 @@ pub(crate) enum AnalyticsFact {
connection_id: u64,
request_id: RequestId,
response: Box<ClientResponsePayload>,
thread_originator: Option<String>,
},
ErrorResponse {
connection_id: u64,
Expand Down
Loading
Loading