Skip to content

Commit 696cbb7

Browse files
[#158]: moved map manager from identity to common crate to reuse the function in metrics service
1 parent 4f6c2b0 commit 696cbb7

4 files changed

Lines changed: 91 additions & 75 deletions

File tree

core/common/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,5 +29,6 @@ bytemuck_derive = "1.10.2"
2929
map-handlers = []
3030
program-handlers = []
3131
network-structs = []
32+
monitoring-structs = []
3233
buffer-reader = []
3334
experimental = []

core/common/src/map_handlers.rs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,3 +154,57 @@ pub fn load_perf_event_array_from_mapdata(
154154
})?;
155155
Ok(perf_event_array)
156156
}
157+
158+
#[cfg(feature = "map-handlers")]
159+
pub fn map_manager(
160+
maps: BpfMapsData,
161+
) -> Result<
162+
std::collections::HashMap<
163+
String,
164+
(
165+
aya::maps::PerfEventArray<aya::maps::MapData>,
166+
Vec<aya::maps::perf::PerfEventArrayBuffer<aya::maps::MapData>>,
167+
),
168+
>,
169+
Error,
170+
> {
171+
use aya::maps::PerfEventArray;
172+
use aya::maps::{MapData, perf::PerfEventArrayBuffer};
173+
use tracing::debug;
174+
175+
let mut map_manager = std::collections::HashMap::<
176+
String, // this will store the bpf map name
177+
(PerfEventArray<MapData>, Vec<PerfEventArrayBuffer<MapData>>), // this will manage the BPF_MAP_TYPE_PERF_EVENT_ARRAY and its buffer
178+
>::new();
179+
180+
// map_manager creates an hashmap that contains:
181+
// MAP NAME as String (KEY)
182+
//
183+
// VALUES (tuple)
184+
// a PERF_EVENT_ARRAY
185+
// a vector of PERF_EVENT_ARRAY_BUFFER
186+
//
187+
// the map manager helps the event listener to specifically call a map by its pinned name
188+
// e.g. veth_identity_map and returns the associated PERF_EVENT_ARRAY and PERF_EVENT_ARRAY_BUFFERS (1 per CPU)
189+
// also the map manager helps to write a more complete debug context by linking map names with arrays and buffers.
190+
// actually i cannot return the extact information using only the Aya library
191+
192+
// create the PerfEventArrays and the buffers from the BpfMapsData Objects
193+
for (map, name) in maps
194+
.bpf_obj_map
195+
.into_iter()
196+
.zip(maps.bpf_obj_names.into_iter())
197+
// zip two iterators at the same time for map object and map names
198+
{
199+
debug!("Debugging map type:{:?} for map name {:?}", map, &name);
200+
info!("Creating PerfEventArray for map name {:?}", &name);
201+
202+
// save the map in a registry if is a PerfEventArray to access them by name
203+
if let std::result::Result::Ok(perf_event_array) = PerfEventArray::try_from(map) {
204+
map_manager.insert(name.clone(), (perf_event_array, Vec::new()));
205+
} else {
206+
warn!("Map {:?} is not a PerfEventArray, skipping load", &name);
207+
}
208+
}
209+
Ok(map_manager)
210+
}

core/src/components/identity/src/main.rs

Lines changed: 14 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -14,19 +14,16 @@ mod service_discovery;
1414
use crate::helpers::{get_veth_channels, read_perf_buffer};
1515
use aya::{
1616
Ebpf,
17-
maps::{
18-
MapData,
19-
perf::{PerfEventArray, PerfEventArrayBuffer},
20-
},
2117
programs::{SchedClassifier, TcAttachType, tc::SchedClassifierLinkId},
2218
util::online_cpus,
2319
};
2420

2521
#[cfg(feature = "experimental")]
2622
use crate::helpers::scan_cgroup_cronjob;
2723

28-
use bytes::BytesMut;
29-
use cortexbrain_common::map_handlers::{init_bpf_maps, map_pinner, populate_blocklist};
24+
use cortexbrain_common::map_handlers::{
25+
init_bpf_maps, map_manager, map_pinner, populate_blocklist,
26+
};
3027
use cortexbrain_common::program_handlers::load_program;
3128
use cortexbrain_common::{buffer_type::BufferType, map_handlers::BpfMapsData};
3229
use std::{
@@ -36,11 +33,11 @@ use std::{
3633
};
3734

3835
use anyhow::{Context, Ok};
36+
use cortexbrain_common::buffer_type::BufferSize;
3937
use cortexbrain_common::{constants, logger};
40-
use tokio::{fs, signal};
41-
use tracing::{debug, error, info, warn};
42-
4338
use std::collections::HashMap;
39+
use tokio::{fs, signal};
40+
use tracing::{error, info};
4441

4542
#[tokio::main]
4643
async fn main() -> Result<(), anyhow::Error> {
@@ -203,34 +200,11 @@ async fn event_listener(bpf_maps: BpfMapsData) -> Result<(), anyhow::Error> {
203200

204201
//TODO: try to change from PerfEventArray to a RingBuffer data structure
205202

206-
let mut map_manager =
207-
HashMap::<String, (PerfEventArray<MapData>, Vec<PerfEventArrayBuffer<MapData>>)>::new();
208-
209-
// create the PerfEventArrays and the buffers from the BpfMapsData Objects
210-
for (map, name) in bpf_maps
211-
.bpf_obj_map
212-
.into_iter()
213-
.zip(bpf_maps.bpf_obj_names.into_iter())
214-
// zip two iterators at the same time for map and mapnames
215-
{
216-
debug!("Debugging map type:{:?} for map name {:?}", map, &name);
217-
info!("Creating PerfEventArray for map name {:?}", &name);
218-
219-
// save the map in a registry if is a PerfEventArray to access them by name
220-
if let std::result::Result::Ok(perf_event_array) = PerfEventArray::try_from(map) {
221-
map_manager.insert(name.clone(), (perf_event_array, Vec::new()));
222-
223-
// perf_event_arrays.push(perf_event_array); // this is step 1
224-
// let perf_event_array_buffer = Vec::new();
225-
// event_buffers.push(perf_event_array_buffer); //this is step 2
226-
} else {
227-
warn!("Map {:?} is not a PerfEventArray, skipping load", &name);
228-
}
229-
}
203+
let mut maps = map_manager(bpf_maps)?;
230204

231205
// fill the input buffers with data from the PerfEventArrays
232206
for cpu_id in online_cpus().map_err(|e| anyhow::anyhow!("Error {:?}", e))? {
233-
for (name, (perf_evt_array, perf_evt_array_buffer)) in map_manager.iter_mut() {
207+
for (name, (perf_evt_array, perf_evt_array_buffer)) in maps.iter_mut() {
234208
let buf = perf_evt_array.open(cpu_id, None)?;
235209
info!(
236210
"Buffer created for map {:?} on cpu_id {:?}. Buffer size: {}",
@@ -245,23 +219,20 @@ async fn event_listener(bpf_maps: BpfMapsData) -> Result<(), anyhow::Error> {
245219
info!("Listening for events...");
246220

247221
// i need to use remove to move the values from the Map Manager to the the async tasks
248-
let (perf_veth_array, perf_veth_buffers) = map_manager
222+
let (perf_veth_array, perf_veth_buffers) = maps
249223
.remove("veth_identity_map")
250224
.expect("Cannot create perf_veth buffer");
251-
let (perf_net_events_array, perf_net_events_buffers) = map_manager
225+
let (perf_net_events_array, perf_net_events_buffers) = maps
252226
.remove("events_map")
253227
.expect("Cannot create perf_net_events buffer");
254-
let (tcp_registry_array, tcp_registry_buffers) = map_manager
228+
let (tcp_registry_array, tcp_registry_buffers) = maps
255229
.remove("TcpPacketRegistry")
256230
.expect("Cannot create tcp_registry buffer");
257231

258232
// init output buffers
259-
let veth_buffers = vec![BytesMut::with_capacity(10 * 1024); online_cpus().iter().len()];
260-
let events_buffers = vec![BytesMut::with_capacity(1024); online_cpus().iter().len()];
261-
let tcp_buffers = vec![BytesMut::with_capacity(1024); online_cpus().iter().len()];
262-
263-
// init veth link ids
264-
//let veth_link_ids = link_ids;
233+
let veth_buffers = BufferSize::VethEvents.set_buffer();
234+
let events_buffers = BufferSize::ClassifierNetEvents.set_buffer();
235+
let tcp_buffers = BufferSize::TcpEvents.set_buffer();
265236

266237
// spawn async tasks
267238
let veth_events_displayer = tokio::spawn(async move {

core/src/components/metrics/src/helpers.rs

Lines changed: 22 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use aya::{
2-
maps::{Map, MapData, PerfEventArray, perf::PerfEventArrayBuffer},
2+
maps::{MapData, perf::PerfEventArrayBuffer},
33
util::online_cpus,
44
};
55

@@ -10,10 +10,14 @@ use std::sync::{
1010
};
1111
use tokio::signal;
1212

13-
use tracing::{debug, error, info, warn};
13+
use tracing::{error, info};
1414

15-
use crate::structs::NetworkMetrics;
16-
use crate::structs::TimeStampMetrics;
15+
use cortexbrain_common::map_handlers::map_manager;
16+
use cortexbrain_common::{
17+
buffer_type::{BufferSize, BufferType},
18+
buffer_type::{NetworkMetrics, TimeStampMetrics},
19+
map_handlers::BpfMapsData,
20+
};
1721

1822
pub async fn display_metrics_map(
1923
mut perf_buffers: Vec<PerfEventArrayBuffer<MapData>>,
@@ -119,50 +123,36 @@ pub async fn display_time_stamp_events_map(
119123
info!("Timestamp event listener stopped");
120124
}
121125

122-
pub async fn event_listener(bpf_maps: Vec<Map>) -> Result<(), anyhow::Error> {
126+
pub async fn event_listener(bpf_maps: BpfMapsData) -> Result<(), anyhow::Error> {
123127
info!("Getting CPU count...");
124128

125-
let mut perf_event_arrays = Vec::new(); // contains a vector of PerfEventArrays
126-
let mut event_buffers = Vec::new(); // contains a vector of buffers
127-
128-
info!("Creating perf buffers...");
129-
for map in bpf_maps {
130-
debug!("Debugging map type:{:?}", map);
131-
if let std::result::Result::Ok(perf_event_array) = PerfEventArray::try_from(map) {
132-
perf_event_arrays.push(perf_event_array); // this is step 1
133-
let perf_event_array_buffer = Vec::new();
134-
event_buffers.push(perf_event_array_buffer); //this is step 2
135-
} else {
136-
warn!("Map is not a PerfEventArray, skipping load");
137-
}
138-
}
129+
let mut maps = map_manager(bpf_maps)?;
139130

140131
let cpu_count = online_cpus().map_err(|e| anyhow::anyhow!("Error {:?}", e))?;
141132

142-
//info!("CPU count: {}", cpu_count);
143-
for (perf_evt_array, perf_evt_array_buffer) in
144-
perf_event_arrays.iter_mut().zip(event_buffers.iter_mut())
145-
{
146-
for cpu_id in &cpu_count {
147-
let single_buffer = perf_evt_array.open(*cpu_id, None)?;
148-
perf_evt_array_buffer.push(single_buffer);
133+
for cpu_id in cpu_count {
134+
for (name, (perf_event_array, perf_event_buffer)) in maps.iter_mut() {
135+
let buf = perf_event_array.open(cpu_id, None)?;
136+
perf_event_buffer.push(buf);
149137
}
150138
}
151139

152-
//info!("Opening perf buffers for {} CPUs...", cpu_count);
153140
info!("Perf buffers created successfully");
154-
let mut event_buffers = event_buffers.into_iter();
155141

156-
let time_stamp_events_perf_buffer = event_buffers.next().expect("");
157-
let net_perf_buffer = event_buffers.next().expect("");
142+
let (time_stamp_events_array, time_stamp_events_perf_buffer) = maps
143+
.remove("time_stamp_events")
144+
.expect("Cannot create time_stamp_events_buffer");
145+
let (net_perf_array, net_perf_buffer) = maps
146+
.remove("net_metrics")
147+
.expect("Cannot create net_perf_buffer");
158148

159149
// Create shared running flags
160150
let net_metrics_running = Arc::new(AtomicBool::new(true));
161151
let time_stamp_events_running = Arc::new(AtomicBool::new(true));
162152

163153
// Create proper sized buffers
164-
let net_metrics_buffers = vec![BytesMut::with_capacity(1024); cpu_count.len()];
165-
let time_stamp_events_buffers = vec![BytesMut::with_capacity(1024); cpu_count.len()];
154+
let net_metrics_buffers = BufferSize::NetworkMetricsEvents.set_buffer();
155+
let time_stamp_events_buffers = BufferSize::TimeMetricsEvents.set_buffer();
166156

167157
// Clone for the signal handler
168158
let net_metrics_running_signal = net_metrics_running.clone();

0 commit comments

Comments
 (0)