33
44use abomonation:: { decode, encode, unsafe_abomonate, Abomonation } ;
55use alloc:: boxed:: Box ;
6+ use alloc:: vec:: Vec ;
67use core2:: io:: Result as IOResult ;
78use core2:: io:: Write ;
9+ use fallible_collections:: { FallibleVec , FallibleVecGlobal } ;
10+ use kpi:: system:: CpuThread ;
811use log:: { debug, error, info, warn} ;
912use rpc:: client:: Client ;
1013use rpc:: rpc:: { ClientId , RPCError , RPCHeader } ;
1114use rpc:: RPCClient ;
1215
1316use super :: dcm:: node_registration:: dcm_register_node;
14- use crate :: arch:: rackscale:: controller:: SHMEM_MANAGERS ;
17+ use crate :: arch:: rackscale:: controller:: { HWTHREADS , SHMEM_MANAGERS } ;
1518use crate :: error:: KResult ;
1619use crate :: memory:: LARGE_PAGE_SIZE ;
1720use crate :: transport:: shmem:: { create_shmem_manager, get_affinity_shmem} ;
@@ -61,15 +64,41 @@ pub(crate) fn initialize_client(
6164 send_client_data : bool , // This field is used to indicate if init_client() should send ClientRegistrationRequest
6265) -> KResult < Box < Client > > {
6366 if send_client_data {
67+ // Fetch system information
6468 let ( affinity_shmem_offset, affinity_shmem_size) = get_affinity_shmem ( ) ;
69+ let hwthreads = atopology:: MACHINE_TOPOLOGY . threads ( ) ;
70+ let num_threads = atopology:: MACHINE_TOPOLOGY . num_threads ( ) ;
6571
72+ // Create CpuThreads vector
73+ let mut return_threads = Vec :: try_with_capacity ( num_threads) ?;
74+ for hwthread in hwthreads {
75+ return_threads. try_push ( kpi:: system:: CpuThread {
76+ id : hwthread. id as usize ,
77+ node_id : hwthread. node_id . unwrap_or ( 0 ) as usize ,
78+ package_id : hwthread. package_id as usize ,
79+ core_id : hwthread. core_id as usize ,
80+ thread_id : hwthread. thread_id as usize ,
81+ } ) ?;
82+ }
83+ info ! ( "return_threads: {:?}" , return_threads) ;
84+
85+ // Construct client registration request
6686 let req = ClientRegistrationRequest {
6787 affinity_shmem_offset,
6888 affinity_shmem_size,
6989 num_cores : atopology:: MACHINE_TOPOLOGY . num_threads ( ) as u64 ,
7090 } ;
71- let mut req_data = [ 0u8 ; core:: mem:: size_of :: < ClientRegistrationRequest > ( ) ] ;
72- unsafe { encode ( & req, & mut ( & mut req_data) . as_mut ( ) ) } . unwrap ( ) ;
91+
92+ // Serialize and send to controller
93+ let mut req_data = Vec :: try_with_capacity (
94+ core:: mem:: size_of :: < ClientRegistrationRequest > ( )
95+ + core:: mem:: size_of :: < CpuThread > ( ) * num_threads
96+ + core:: mem:: size_of :: < Vec < CpuThread > > ( ) ,
97+ )
98+ . expect ( "failed to alloc memory for client registration" ) ;
99+ unsafe { encode ( & req, & mut req_data) } . expect ( "Failed to encode ClientRegistrationRequest" ) ;
100+ unsafe { encode ( & return_threads, & mut req_data) }
101+ . expect ( "Failed to encode hardware thread vector" ) ;
73102 client. connect ( & [ & req_data] ) ?;
74103 } else {
75104 client. connect ( & [ & [ ] ] ) ?;
@@ -85,31 +114,68 @@ pub(crate) fn register_client(
85114 use crate :: memory:: LARGE_PAGE_SIZE ;
86115
87116 // Decode client registration request
88- return if let Some ( ( req, _remaining) ) = unsafe { decode :: < ClientRegistrationRequest > ( payload) }
117+ if let Some ( ( req, hwthreads_data) ) =
118+ unsafe { decode :: < ClientRegistrationRequest > ( & mut payload[ ..hdr. msg_len as usize ] ) }
89119 {
90120 let memslices = req. affinity_shmem_size / ( LARGE_PAGE_SIZE as u64 ) ;
91121 info ! ( "Received registration request from client with {:?} cores and shmem {:x?}-{:x?} ({:?} memslices)" ,
92122 req. num_cores, req. affinity_shmem_offset, req. affinity_shmem_offset + req. affinity_shmem_size, memslices) ;
93123
94- // Register client resources with DCM, DCM doesn't care about pids, so
95- // send w/ dummy pid
96- let node_id = dcm_register_node ( 0 , req. num_cores , memslices) ;
97- info ! ( "Registered client DCM, assigned client_id={:?}" , node_id) ;
98-
99- // Create shmem memory manager
100- // Probably not most accurate to use node_id for affinity here
101- let mut managers = SHMEM_MANAGERS . lock ( ) ;
102- managers[ node_id as usize ] =
103- create_shmem_manager ( req. affinity_shmem_offset , req. affinity_shmem_size , node_id) ;
104- log:: info!(
105- "Created shmem manager on behalf of client {:?}: {:?}" ,
106- node_id,
107- managers[ node_id as usize ]
108- ) ;
109-
110- Ok ( node_id)
124+ if let Some ( ( hwthreads, remaining) ) = unsafe { decode :: < Vec < CpuThread > > ( hwthreads_data) } {
125+ if remaining. len ( ) == 0 {
126+ // Register client resources with DCM, DCM doesn't care about pids, so
127+ // send w/ dummy pid
128+ let node_id = dcm_register_node ( 0 , req. num_cores , memslices) ;
129+ info ! ( "Registered client DCM, assigned client_id={:?}" , node_id) ;
130+
131+ // Create shmem memory manager
132+ // Probably not most accurate to use node_id for affinity here
133+ let mut managers = SHMEM_MANAGERS . lock ( ) ;
134+ managers[ node_id as usize ] = create_shmem_manager (
135+ req. affinity_shmem_offset ,
136+ req. affinity_shmem_size ,
137+ node_id,
138+ ) ;
139+ log:: info!(
140+ "Created shmem manager on behalf of client {:?}: {:?}" ,
141+ node_id,
142+ managers[ node_id as usize ]
143+ ) ;
144+
145+ // Record information about the hardware threads
146+ info ! ( "hwthreads: {:?}" , hwthreads) ;
147+ let mut rack_threads = HWTHREADS . lock ( ) ;
148+ let mut gtid = 0 ;
149+ for i in 0 ..node_id as usize {
150+ gtid += rack_threads[ i] . len ( ) ;
151+ }
152+ info ! ( "Starting client gtid at: {:?}" , gtid) ;
153+ for hwthread in hwthreads {
154+ // Create new, globally unique global thread id (gtid)
155+ rack_threads[ node_id as usize ] . push ( CpuThread {
156+ id : gtid,
157+ node_id : hwthread. node_id ,
158+ package_id : hwthread. package_id ,
159+ core_id : hwthread. core_id ,
160+ thread_id : hwthread. thread_id ,
161+ } ) ;
162+ gtid += 1 ;
163+ }
164+ info ! ( "rack_threads: {:?}" , rack_threads) ;
165+
166+ Ok ( node_id)
167+ } else {
168+ error ! ( "Extra data in register_client" ) ;
169+ Err ( RPCError :: MalformedResponse )
170+ }
171+ } else {
172+ error ! (
173+ "Failed to decode client registration hwtheads information during register_client"
174+ ) ;
175+ Err ( RPCError :: MalformedResponse )
176+ }
111177 } else {
112178 error ! ( "Failed to decode client registration request during register_client" ) ;
113179 Err ( RPCError :: MalformedResponse )
114- } ;
180+ }
115181}
0 commit comments