@@ -14,7 +14,10 @@ mod service_discovery;
1414use crate :: helpers:: { get_veth_channels, read_perf_buffer} ;
1515use aya:: {
1616 Ebpf ,
17- maps:: { Map , perf:: PerfEventArray } ,
17+ maps:: {
18+ MapData ,
19+ perf:: { PerfEventArray , PerfEventArrayBuffer } ,
20+ } ,
1821 programs:: { SchedClassifier , TcAttachType , tc:: SchedClassifierLinkId } ,
1922 util:: online_cpus,
2023} ;
@@ -25,6 +28,7 @@ use crate::helpers::scan_cgroup_cronjob;
2528use bytes:: BytesMut ;
2629use cortexbrain_common:: map_handlers:: { init_bpf_maps, map_pinner, populate_blocklist} ;
2730use cortexbrain_common:: program_handlers:: load_program;
31+ use cortexbrain_common:: { buffer_type:: BufferType , map_handlers:: BpfMapsData } ;
2832use std:: {
2933 convert:: TryInto ,
3034 path:: Path ,
@@ -40,8 +44,7 @@ use std::collections::HashMap;
4044
4145#[ tokio:: main]
4246async fn main ( ) -> Result < ( ) , anyhow:: Error > {
43- //init tracing subscriber
44- //logger::init_default_logger();
47+ //init otlè tracing subscriber
4548 let otlp_provider = logger:: otlp_logger_init ( "identity_service-OTLP" . to_string ( ) ) ;
4649
4750 info ! ( "Starting identity service..." ) ;
@@ -195,57 +198,65 @@ async fn init_tcp_registry(bpf: Arc<Mutex<Ebpf>>) -> Result<(), anyhow::Error> {
195198// perf_veth_array: contains is associated with the network events stored in the veth_map (veth_identity_map)
196199//
197200//
198- async fn event_listener (
199- bpf_maps : Vec < Map > ,
200- //link_ids: Arc<Mutex<HashMap<String, SchedClassifierLinkId>>>,
201- //bpf: Arc<Mutex<Ebpf>>,
202- ) -> Result < ( ) , anyhow:: Error > {
201+ async fn event_listener ( bpf_maps : BpfMapsData ) -> Result < ( ) , anyhow:: Error > {
203202 info ! ( "Preparing perf_buffers and perf_arrays" ) ;
204203
205204 //TODO: try to change from PerfEventArray to a RingBuffer data structure
206205
207- let mut perf_event_arrays = Vec :: new ( ) ; // contains a vector of PerfEventArrays
208- let mut event_buffers = Vec :: new ( ) ; // contains a vector of buffers
206+ let mut map_manager =
207+ HashMap :: < String , ( PerfEventArray < MapData > , Vec < PerfEventArrayBuffer < MapData > > ) > :: new ( ) ;
208+
209+ // create the PerfEventArrays and the buffers from the BpfMapsData Objects
210+ for ( map, name) in bpf_maps
211+ . bpf_obj_map
212+ . into_iter ( )
213+ . zip ( bpf_maps. bpf_obj_names . into_iter ( ) )
214+ // zip two iterators at the same time for map and mapnames
215+ {
216+ debug ! ( "Debugging map type:{:?} for map name {:?}" , map, & name) ;
217+ info ! ( "Creating PerfEventArray for map name {:?}" , & name) ;
209218
210- // create the PerfEventArrays and the buffers
211- for map in bpf_maps {
212- debug ! ( "Debugging map type:{:?}" , map) ;
219+ // save the map in a registry if is a PerfEventArray to access them by name
213220 if let std:: result:: Result :: Ok ( perf_event_array) = PerfEventArray :: try_from ( map) {
214- perf_event_arrays. push ( perf_event_array) ; // this is step 1
215- let perf_event_array_buffer = Vec :: new ( ) ;
216- event_buffers. push ( perf_event_array_buffer) ; //this is step 2
221+ map_manager. insert ( name. clone ( ) , ( perf_event_array, Vec :: new ( ) ) ) ;
222+
223+ // perf_event_arrays.push(perf_event_array); // this is step 1
224+ // let perf_event_array_buffer = Vec::new();
225+ // event_buffers.push(perf_event_array_buffer); //this is step 2
217226 } else {
218- warn ! ( "Map is not a PerfEventArray, skipping load" ) ;
227+ warn ! ( "Map {:?} is not a PerfEventArray, skipping load" , & name ) ;
219228 }
220229 }
221230
222231 // fill the input buffers with data from the PerfEventArrays
223- let cpus = online_cpus ( ) . map_err ( |e| anyhow:: anyhow!( "Error {:?}" , e) ) ?;
224-
225- for ( perf_evt_array, perf_evt_array_buffer) in
226- perf_event_arrays. iter_mut ( ) . zip ( event_buffers. iter_mut ( ) )
227- {
228- for cpu_id in & cpus {
229- let single_buffer = perf_evt_array. open ( * cpu_id, None ) ?;
230- perf_evt_array_buffer. push ( single_buffer) ;
232+ for cpu_id in online_cpus ( ) . map_err ( |e| anyhow:: anyhow!( "Error {:?}" , e) ) ? {
233+ for ( name, ( perf_evt_array, perf_evt_array_buffer) ) in map_manager. iter_mut ( ) {
234+ let buf = perf_evt_array. open ( cpu_id, None ) ?;
235+ info ! (
236+ "Buffer created for map {:?} on cpu_id {:?}. Buffer size: {}" ,
237+ name,
238+ cpu_id,
239+ std:: mem:: size_of_val( & buf)
240+ ) ;
241+ perf_evt_array_buffer. push ( buf) ;
231242 }
232243 }
233244
234245 info ! ( "Listening for events..." ) ;
235246
236- let mut event_buffers = event_buffers . into_iter ( ) ;
237- let perf_veth_buffer = event_buffers
238- . next ( )
247+ // i need to use remove to move the values from the Map Manager to the the async tasks
248+ let ( perf_veth_array , perf_veth_buffers ) = map_manager
249+ . remove ( "veth_identity_map" )
239250 . expect ( "Cannot create perf_veth buffer" ) ;
240- let perf_net_events_buffer = event_buffers
241- . next ( )
251+ let ( perf_net_events_array , perf_net_events_buffers ) = map_manager
252+ . remove ( "events_map" )
242253 . expect ( "Cannot create perf_net_events buffer" ) ;
243- let tcp_registry_buffer = event_buffers
244- . next ( )
254+ let ( tcp_registry_array , tcp_registry_buffers ) = map_manager
255+ . remove ( "TcpPacketRegistry" )
245256 . expect ( "Cannot create tcp_registry buffer" ) ;
246257
247258 // init output buffers
248- let veth_buffers = vec ! [ BytesMut :: with_capacity( 1024 ) ; online_cpus( ) . iter( ) . len( ) ] ;
259+ let veth_buffers = vec ! [ BytesMut :: with_capacity( 10 * 1024 ) ; online_cpus( ) . iter( ) . len( ) ] ;
249260 let events_buffers = vec ! [ BytesMut :: with_capacity( 1024 ) ; online_cpus( ) . iter( ) . len( ) ] ;
250261 let tcp_buffers = vec ! [ BytesMut :: with_capacity( 1024 ) ; online_cpus( ) . iter( ) . len( ) ] ;
251262
@@ -254,26 +265,23 @@ async fn event_listener(
254265
255266 // spawn async tasks
256267 let veth_events_displayer = tokio:: spawn ( async move {
257- //display_veth_events(bpf.clone(), perf_veth_buffer, veth_buffers, veth_link_ids).await;
258- read_perf_buffer ( perf_veth_buffer, veth_buffers, helpers:: BufferType :: VethLog ) . await ;
268+ read_perf_buffer ( perf_veth_buffers, veth_buffers, BufferType :: VethLog ) . await ;
259269 } ) ;
260270
261271 let net_events_displayer = tokio:: spawn ( async move {
262- //display_events(perf_net_events_buffer, events_buffers).await;
263272 read_perf_buffer (
264- perf_net_events_buffer ,
273+ perf_net_events_buffers ,
265274 events_buffers,
266- helpers :: BufferType :: PacketLog ,
275+ BufferType :: PacketLog ,
267276 )
268277 . await ;
269278 } ) ;
270279
271280 let tcp_registry_events_displayer: tokio:: task:: JoinHandle < ( ) > = tokio:: spawn ( async move {
272- //display_tcp_registry_events(tcp_registry_buffer, tcp_buffers).await;
273281 read_perf_buffer (
274- tcp_registry_buffer ,
282+ tcp_registry_buffers ,
275283 tcp_buffers,
276- helpers :: BufferType :: TcpPacketRegistry ,
284+ BufferType :: TcpPacketRegistry ,
277285 )
278286 . await ;
279287 } ) ;
0 commit comments