@@ -554,6 +554,7 @@ async fn ingest_rejected_turn_steer(
554554 response : Box :: new ( sample_thread_resume_response (
555555 "thread-2" , /*ephemeral*/ false , "gpt-5" ,
556556 ) ) ,
557+ thread_originator : None ,
557558 } ,
558559 out,
559560 )
@@ -627,6 +628,7 @@ async fn ingest_turn_prerequisites(
627628 response : Box :: new ( sample_thread_start_response (
628629 "thread-2" , /*ephemeral*/ false , "gpt-5" ,
629630 ) ) ,
631+ thread_originator : None ,
630632 } ,
631633 out,
632634 )
@@ -650,6 +652,7 @@ async fn ingest_turn_prerequisites(
650652 connection_id : 7 ,
651653 request_id : RequestId :: Integer ( 3 ) ,
652654 response : Box :: new ( sample_turn_start_response ( "turn-2" ) ) ,
655+ thread_originator : None ,
653656 } ,
654657 out,
655658 )
@@ -716,6 +719,7 @@ async fn ingest_review_prerequisites(
716719 response : Box :: new ( sample_thread_start_response (
717720 "thread-1" , /*ephemeral*/ false , "gpt-5" ,
718721 ) ) ,
722+ thread_originator : None ,
719723 } ,
720724 events,
721725 )
@@ -1648,6 +1652,7 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize
16481652 /*ephemeral*/ false ,
16491653 "gpt-5" ,
16501654 ) ) ,
1655+ thread_originator : None ,
16511656 } ,
16521657 & mut events,
16531658 )
@@ -1693,6 +1698,7 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize
16931698 response : Box :: new ( sample_thread_resume_response (
16941699 "thread-1" , /*ephemeral*/ true , "gpt-5" ,
16951700 ) ) ,
1701+ thread_originator : None ,
16961702 } ,
16971703 & mut events,
16981704 )
@@ -1737,6 +1743,157 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize
17371743 ) ;
17381744}
17391745
1746+ #[ tokio:: test]
1747+ async fn thread_originator_overrides_shared_connection_across_thread_events ( ) {
1748+ let mut reducer = AnalyticsReducer :: default ( ) ;
1749+ let mut events = Vec :: new ( ) ;
1750+
1751+ reducer
1752+ . ingest ( sample_initialize_fact ( /*connection_id*/ 7 ) , & mut events)
1753+ . await ;
1754+ for ( request_id, thread_id, thread_originator) in [
1755+ ( 1 , "thread-work" , Some ( TEST_PRODUCT_CLIENT_ID . to_string ( ) ) ) ,
1756+ ( 2 , "thread-default" , None ) ,
1757+ ] {
1758+ reducer
1759+ . ingest (
1760+ AnalyticsFact :: ClientResponse {
1761+ connection_id : 7 ,
1762+ request_id : RequestId :: Integer ( request_id) ,
1763+ response : Box :: new ( sample_thread_start_response (
1764+ thread_id, /*ephemeral*/ false , "gpt-5" ,
1765+ ) ) ,
1766+ thread_originator,
1767+ } ,
1768+ & mut events,
1769+ )
1770+ . await ;
1771+ }
1772+
1773+ let initialized = serde_json:: to_value ( & events) . expect ( "serialize thread events" ) ;
1774+ assert_eq ! (
1775+ initialized
1776+ . as_array( )
1777+ . expect( "thread events" )
1778+ . iter( )
1779+ . map( |event| {
1780+ json!( {
1781+ "thread_id" : event[ "event_params" ] [ "thread_id" ] ,
1782+ "app_server_client" : event[ "event_params" ] [ "app_server_client" ] ,
1783+ } )
1784+ } )
1785+ . collect:: <Vec <_>>( ) ,
1786+ vec![
1787+ json!( {
1788+ "thread_id" : "thread-work" ,
1789+ "app_server_client" : {
1790+ "product_client_id" : TEST_PRODUCT_CLIENT_ID ,
1791+ "client_name" : "codex-tui" ,
1792+ "client_version" : "1.0.0" ,
1793+ "rpc_transport" : "websocket" ,
1794+ "experimental_api_enabled" : false ,
1795+ } ,
1796+ } ) ,
1797+ json!( {
1798+ "thread_id" : "thread-default" ,
1799+ "app_server_client" : {
1800+ "product_client_id" : DEFAULT_ORIGINATOR ,
1801+ "client_name" : "codex-tui" ,
1802+ "client_version" : "1.0.0" ,
1803+ "rpc_transport" : "websocket" ,
1804+ "experimental_api_enabled" : false ,
1805+ } ,
1806+ } ) ,
1807+ ]
1808+ ) ;
1809+
1810+ events. clear ( ) ;
1811+ reducer
1812+ . ingest (
1813+ AnalyticsFact :: ClientRequest {
1814+ connection_id : 7 ,
1815+ request_id : RequestId :: Integer ( 3 ) ,
1816+ request : Box :: new ( sample_turn_start_request (
1817+ "thread-work" ,
1818+ /*request_id*/ 3 ,
1819+ ) ) ,
1820+ } ,
1821+ & mut events,
1822+ )
1823+ . await ;
1824+ reducer
1825+ . ingest (
1826+ AnalyticsFact :: ClientResponse {
1827+ connection_id : 7 ,
1828+ request_id : RequestId :: Integer ( 3 ) ,
1829+ response : Box :: new ( sample_turn_start_response ( "turn-1" ) ) ,
1830+ thread_originator : None ,
1831+ } ,
1832+ & mut events,
1833+ )
1834+ . await ;
1835+ ingest_completed_command_execution_item ( & mut reducer, & mut events, "thread-work" , "item-work" )
1836+ . await ;
1837+ ingest_complete_child_turn ( & mut reducer, & mut events, "thread-work" , "turn-1" ) . await ;
1838+ reducer
1839+ . ingest (
1840+ AnalyticsFact :: Custom ( CustomAnalyticsFact :: Compaction ( Box :: new (
1841+ CodexCompactionEvent {
1842+ thread_id : "thread-work" . to_string ( ) ,
1843+ turn_id : "turn-compact" . to_string ( ) ,
1844+ trigger : CompactionTrigger :: Manual ,
1845+ reason : CompactionReason :: UserRequested ,
1846+ implementation : CompactionImplementation :: Responses ,
1847+ phase : CompactionPhase :: StandaloneTurn ,
1848+ strategy : CompactionStrategy :: Memento ,
1849+ status : CompactionStatus :: Completed ,
1850+ codex_error_kind : None ,
1851+ codex_error_http_status_code : None ,
1852+ active_context_tokens_before : 131_000 ,
1853+ active_context_tokens_after : 64_000 ,
1854+ retained_image_count : None ,
1855+ compaction_summary_tokens : None ,
1856+ cached_input_tokens : None ,
1857+ started_at : 100 ,
1858+ completed_at : 101 ,
1859+ duration_ms : Some ( 1200 ) ,
1860+ } ,
1861+ ) ) ) ,
1862+ & mut events,
1863+ )
1864+ . await ;
1865+
1866+ let lifecycle = serde_json:: to_value ( & events) . expect ( "serialize lifecycle events" ) ;
1867+ assert_eq ! (
1868+ lifecycle
1869+ . as_array( )
1870+ . expect( "lifecycle events" )
1871+ . iter( )
1872+ . map( |event| {
1873+ json!( {
1874+ "event_type" : event[ "event_type" ] ,
1875+ "product_client_id" :
1876+ event[ "event_params" ] [ "app_server_client" ] [ "product_client_id" ] ,
1877+ } )
1878+ } )
1879+ . collect:: <Vec <_>>( ) ,
1880+ vec![
1881+ json!( {
1882+ "event_type" : "codex_command_execution_event" ,
1883+ "product_client_id" : TEST_PRODUCT_CLIENT_ID ,
1884+ } ) ,
1885+ json!( {
1886+ "event_type" : "codex_turn_event" ,
1887+ "product_client_id" : TEST_PRODUCT_CLIENT_ID ,
1888+ } ) ,
1889+ json!( {
1890+ "event_type" : "codex_compaction_event" ,
1891+ "product_client_id" : TEST_PRODUCT_CLIENT_ID ,
1892+ } ) ,
1893+ ]
1894+ ) ;
1895+ }
1896+
17401897#[ tokio:: test]
17411898async fn unrelated_client_requests_are_ignored_by_reducer ( ) {
17421899 let mut reducer = AnalyticsReducer :: default ( ) ;
@@ -1763,6 +1920,7 @@ async fn unrelated_client_requests_are_ignored_by_reducer() {
17631920 connection_id : 7 ,
17641921 request_id : RequestId :: Integer ( 3 ) ,
17651922 response : Box :: new ( sample_turn_start_response ( "turn-2" ) ) ,
1923+ thread_originator : None ,
17661924 } ,
17671925 & mut events,
17681926 )
@@ -1788,6 +1946,7 @@ async fn unrelated_client_responses_are_ignored_by_reducer() {
17881946 response : Box :: new ( ClientResponsePayload :: ThreadArchive (
17891947 ThreadArchiveResponse { } ,
17901948 ) ) ,
1949+ thread_originator : None ,
17911950 } ,
17921951 & mut events,
17931952 )
@@ -1847,6 +2006,7 @@ async fn compaction_event_ingests_custom_fact() {
18472006 Some ( AppServerThreadSource :: Subagent ) ,
18482007 Some ( parent_thread_id. to_string ( ) ) ,
18492008 ) ) ,
2009+ thread_originator : None ,
18502010 } ,
18512011 & mut events,
18522012 )
@@ -1967,6 +2127,7 @@ async fn guardian_review_event_ingests_custom_fact_with_optional_target_item() {
19672127 /*ephemeral*/ false ,
19682128 "gpt-5" ,
19692129 ) ) ,
2130+ thread_originator : None ,
19702131 } ,
19712132 & mut events,
19722133 )
@@ -2495,6 +2656,7 @@ async fn item_review_summaries_do_not_cross_threads_with_reused_item_ids() {
24952656 response : Box :: new ( sample_thread_start_response (
24962657 "thread-2" , /*ephemeral*/ false , "gpt-5" ,
24972658 ) ) ,
2659+ thread_originator : None ,
24982660 } ,
24992661 & mut events,
25002662 )
@@ -2745,7 +2907,7 @@ async fn subagent_thread_started_publishes_without_initialize() {
27452907}
27462908
27472909#[ tokio:: test]
2748- async fn subagent_events_use_inherited_connection_unless_turn_connection_is_explicit ( ) {
2910+ async fn subagent_events_keep_thread_originator_with_explicit_turn_connection ( ) {
27492911 let mut reducer = AnalyticsReducer :: default ( ) ;
27502912 let mut events = Vec :: new ( ) ;
27512913 let parent_thread_id =
@@ -2782,6 +2944,7 @@ async fn subagent_events_use_inherited_connection_unless_turn_connection_is_expl
27822944 /*ephemeral*/ false ,
27832945 "gpt-5" ,
27842946 ) ) ,
2947+ thread_originator : None ,
27852948 } ,
27862949 & mut events,
27872950 )
@@ -2904,6 +3067,7 @@ async fn subagent_events_use_inherited_connection_unless_turn_connection_is_expl
29043067 connection_id : 8 ,
29053068 request_id : RequestId :: Integer ( 3 ) ,
29063069 response : Box :: new ( sample_turn_start_response ( "turn-explicit" ) ) ,
3070+ thread_originator : None ,
29073071 } ,
29083072 & mut events,
29093073 )
@@ -2914,7 +3078,11 @@ async fn subagent_events_use_inherited_connection_unless_turn_connection_is_expl
29143078 } ;
29153079 assert_eq ! (
29163080 event. event_params. app_server_client. product_client_id,
2917- DEFAULT_ORIGINATOR
3081+ "parent-client"
3082+ ) ;
3083+ assert_eq ! (
3084+ event. event_params. app_server_client. client_name. as_deref( ) ,
3085+ Some ( "codex-tui" )
29183086 ) ;
29193087}
29203088
@@ -3804,6 +3972,7 @@ async fn accepted_turn_steer_emits_expected_event() {
38043972 connection_id : 7 ,
38053973 request_id : RequestId :: Integer ( 4 ) ,
38063974 response : Box :: new ( sample_turn_steer_response ( "turn-2" ) ) ,
3975+ thread_originator : None ,
38073976 } ,
38083977 & mut out,
38093978 )
@@ -3975,6 +4144,7 @@ async fn turn_start_error_response_discards_pending_start_request() {
39754144 connection_id : 7 ,
39764145 request_id : RequestId :: Integer ( 3 ) ,
39774146 response : Box :: new ( sample_turn_start_response ( "turn-2" ) ) ,
4147+ thread_originator : None ,
39784148 } ,
39794149 & mut out,
39804150 )
@@ -4303,6 +4473,7 @@ async fn accepted_steers_increment_turn_steer_count() {
43034473 connection_id : 7 ,
43044474 request_id : RequestId :: Integer ( 4 ) ,
43054475 response : Box :: new ( sample_turn_steer_response ( "turn-2" ) ) ,
4476+ thread_originator : None ,
43064477 } ,
43074478 & mut out,
43084479 )
@@ -4350,6 +4521,7 @@ async fn accepted_steers_increment_turn_steer_count() {
43504521 connection_id : 7 ,
43514522 request_id : RequestId :: Integer ( 6 ) ,
43524523 response : Box :: new ( sample_turn_steer_response ( "turn-2" ) ) ,
4524+ thread_originator : None ,
43534525 } ,
43544526 & mut out,
43554527 )
0 commit comments