Skip to content

Commit 8599b90

Browse files
[#182]: Added GetTrackedVethFromHashMap grpc endpoint to see the tracked veths (pt.2)
1 parent 147802f commit 8599b90

File tree

6 files changed

+174
-28
lines changed

6 files changed

+174
-28
lines changed

cli/src/monitoring.rs

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use tonic_reflection::pb::v1::server_reflection_response::MessageResponse;
1010
use agent_api::client::{connect_to_client, connect_to_server_reflection};
1111
use agent_api::requests::{
1212
get_all_features, send_active_connection_request, send_dropped_packets_request,
13-
send_latency_metrics_request, send_tracked_veth_request,
13+
send_latency_metrics_request, send_tracked_veth_request, send_veth_tracked_hashmap_req,
1414
};
1515

1616
use crate::errors::CliError;
@@ -304,25 +304,24 @@ pub async fn monitor_tracked_veth() -> Result<(), CliError> {
304304
"Connecting to cortexflow Client".white()
305305
);
306306
match connect_to_client().await {
307-
Ok(client) => match send_tracked_veth_request(client).await {
307+
Ok(client) => match send_veth_tracked_hashmap_req(client).await {
308308
Ok(response) => {
309309
let veth_response = response.into_inner();
310-
if veth_response.tot_monitored_veth == 0 {
311-
println!("{} {} ", "=====>".blue().bold(), "No tracked veth found");
312-
Ok(())
313-
} else {
314-
println!(
315-
"{} {} {} {} ",
316-
"=====>".blue().bold(),
317-
"Found:",
318-
&veth_response.tot_monitored_veth,
319-
"tracked veth"
320-
);
321-
for veth in veth_response.veth_names.iter() {
322-
println!("{} {}", "=====>".blue().bold(), &veth);
323-
}
324-
Ok(())
310+
// if veth_response.tot_monitored_veth == 0 {
311+
// println!("{} {} ", "=====>".blue().bold(), "No tracked veth found");
312+
// Ok(())
313+
// } else {
314+
// println!(
315+
// "{} {} {} {} ",
316+
// "=====>".blue().bold(),
317+
// "Found:",
318+
// &veth_response.tot_monitored_veth,
319+
// "tracked veth"
320+
// );
321+
for veth in veth_response.veths.iter() {
322+
println!("{} {:?}", "=====>".blue().bold(), &veth);
325323
}
324+
Ok(())
326325
}
327326
Err(e) => {
328327
return Err(CliError::AgentError(

core/api/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ aya = "0.13.1"
3232
cortexbrain-common = { path = "../common", features = [
3333
"map-handlers",
3434
"network-structs",
35-
"buffer-reader"
35+
"buffer-reader",
36+
"monitoring-structs"
3637
] }
3738
tonic-reflection = "0.14.0"
3839
tonic-build = "0.14.0"

core/api/protos/agent.proto

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,13 @@ message VethEvent{
8484
uint32 pid = 6; // Process ID
8585
}
8686

87-
//declare agent api
87+
message VethHashMapResponse{ // returns tracked veth from the tracked_veth hashmap
88+
string status = 1;
89+
map<string,string> veths = 2;
90+
}
91+
92+
// Agent Service
93+
8894
service Agent{
8995
// active connections endpoint
9096
rpc ActiveConnections(RequestActiveConnections) returns (ActiveConnectionResponse);
@@ -102,10 +108,15 @@ service Agent{
102108
// dropped packets endpoint
103109
rpc GetDroppedPacketsMetrics(google.protobuf.Empty) returns (DroppedPacketsResponse);
104110

111+
// TODO: can i combine this 2 endpoints?
105112
// active veth info endpoint
106113
rpc GetTrackedVeth(google.protobuf.Empty) returns (VethResponse);
114+
// get tracked veth from blocklist
115+
rpc GetTrackedVethFromHashMap(google.protobuf.Empty) returns (VethHashMapResponse);
107116
}
108117

118+
// Blocklist
119+
109120
message AddIpToBlocklistRequest{
110121
optional string ip = 1 ;
111122
}

core/api/src/agent.rs

Lines changed: 88 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,17 @@ pub struct VethEvent {
151151
#[prost(uint32, tag = "6")]
152152
pub pid: u32,
153153
}
154+
/// returns tracked veth from the tracked_veth hashmap
155+
#[derive(Clone, PartialEq, ::prost::Message)]
156+
pub struct VethHashMapResponse {
157+
#[prost(string, tag = "1")]
158+
pub status: ::prost::alloc::string::String,
159+
#[prost(map = "string, string", tag = "2")]
160+
pub veths: ::std::collections::HashMap<
161+
::prost::alloc::string::String,
162+
::prost::alloc::string::String,
163+
>,
164+
}
154165
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
155166
pub struct AddIpToBlocklistRequest {
156167
#[prost(string, optional, tag = "1")]
@@ -192,7 +203,6 @@ pub mod agent_client {
192203
)]
193204
use tonic::codegen::*;
194205
use tonic::codegen::http::Uri;
195-
/// declare agent api
196206
#[derive(Debug, Clone)]
197207
pub struct AgentClient<T> {
198208
inner: tonic::client::Grpc<T>,
@@ -444,6 +454,31 @@ pub mod agent_client {
444454
.insert(GrpcMethod::new("agent.Agent", "GetTrackedVeth"));
445455
self.inner.unary(req, path, codec).await
446456
}
457+
/// get tracked veth from blocklist
458+
pub async fn get_tracked_veth_from_hash_map(
459+
&mut self,
460+
request: impl tonic::IntoRequest<()>,
461+
) -> std::result::Result<
462+
tonic::Response<super::VethHashMapResponse>,
463+
tonic::Status,
464+
> {
465+
self.inner
466+
.ready()
467+
.await
468+
.map_err(|e| {
469+
tonic::Status::unknown(
470+
format!("Service was not ready: {}", e.into()),
471+
)
472+
})?;
473+
let codec = tonic_prost::ProstCodec::default();
474+
let path = http::uri::PathAndQuery::from_static(
475+
"/agent.Agent/GetTrackedVethFromHashMap",
476+
);
477+
let mut req = request.into_request();
478+
req.extensions_mut()
479+
.insert(GrpcMethod::new("agent.Agent", "GetTrackedVethFromHashMap"));
480+
self.inner.unary(req, path, codec).await
481+
}
447482
}
448483
}
449484
/// Generated server implementations.
@@ -511,8 +546,15 @@ pub mod agent_server {
511546
&self,
512547
request: tonic::Request<()>,
513548
) -> std::result::Result<tonic::Response<super::VethResponse>, tonic::Status>;
549+
/// get tracked veth from blocklist
550+
async fn get_tracked_veth_from_hash_map(
551+
&self,
552+
request: tonic::Request<()>,
553+
) -> std::result::Result<
554+
tonic::Response<super::VethHashMapResponse>,
555+
tonic::Status,
556+
>;
514557
}
515-
/// declare agent api
516558
#[derive(Debug)]
517559
pub struct AgentServer<T> {
518560
inner: Arc<T>,
@@ -885,6 +927,50 @@ pub mod agent_server {
885927
};
886928
Box::pin(fut)
887929
}
930+
"/agent.Agent/GetTrackedVethFromHashMap" => {
931+
#[allow(non_camel_case_types)]
932+
struct GetTrackedVethFromHashMapSvc<T: Agent>(pub Arc<T>);
933+
impl<T: Agent> tonic::server::UnaryService<()>
934+
for GetTrackedVethFromHashMapSvc<T> {
935+
type Response = super::VethHashMapResponse;
936+
type Future = BoxFuture<
937+
tonic::Response<Self::Response>,
938+
tonic::Status,
939+
>;
940+
fn call(&mut self, request: tonic::Request<()>) -> Self::Future {
941+
let inner = Arc::clone(&self.0);
942+
let fut = async move {
943+
<T as Agent>::get_tracked_veth_from_hash_map(
944+
&inner,
945+
request,
946+
)
947+
.await
948+
};
949+
Box::pin(fut)
950+
}
951+
}
952+
let accept_compression_encodings = self.accept_compression_encodings;
953+
let send_compression_encodings = self.send_compression_encodings;
954+
let max_decoding_message_size = self.max_decoding_message_size;
955+
let max_encoding_message_size = self.max_encoding_message_size;
956+
let inner = self.inner.clone();
957+
let fut = async move {
958+
let method = GetTrackedVethFromHashMapSvc(inner);
959+
let codec = tonic_prost::ProstCodec::default();
960+
let mut grpc = tonic::server::Grpc::new(codec)
961+
.apply_compression_config(
962+
accept_compression_encodings,
963+
send_compression_encodings,
964+
)
965+
.apply_max_message_size_config(
966+
max_decoding_message_size,
967+
max_encoding_message_size,
968+
);
969+
let res = grpc.unary(method, req).await;
970+
Ok(res)
971+
};
972+
Box::pin(fut)
973+
}
888974
_ => {
889975
Box::pin(async move {
890976
let mut response = http::Response::new(

core/api/src/api.rs

Lines changed: 46 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
use anyhow::Context;
2+
use anyhow::anyhow;
23
use chrono::Local;
34
use cortexbrain_common::formatters::{format_ipv4, format_ipv6};
45
use cortexbrain_common::map_handlers::load_perf_event_array_from_mapdata;
5-
use prost::bytes::BytesMut;
66
use std::str::FromStr;
77
use std::sync::Mutex;
88
use tonic::{Request, Response, Status};
@@ -28,7 +28,8 @@ use cortexbrain_common::buffer_type::VethLog;
2828
// * contains agent api configuration
2929
use crate::agent::{
3030
ActiveConnectionResponse, AddIpToBlocklistRequest, BlocklistResponse, RequestActiveConnections,
31-
RmIpFromBlocklistRequest, RmIpFromBlocklistResponse, VethResponse, agent_server::Agent,
31+
RmIpFromBlocklistRequest, RmIpFromBlocklistResponse, VethHashMapResponse, VethResponse,
32+
agent_server::Agent,
3233
};
3334
use crate::constants::PIN_BLOCKLIST_MAP_PATH;
3435

@@ -38,6 +39,9 @@ use cortexbrain_common::buffer_type::IpProtocols;
3839
use std::net::Ipv4Addr;
3940
use tracing::warn;
4041

42+
use cortexbrain_common::buffer_type::BufferSize;
43+
use cortexbrain_common::map_handlers::map_manager;
44+
4145
pub struct AgentApi {
4246
//* event_rx is an istance of a mpsc receiver.
4347
//* is used to receive the data from the transmitter (tx)
@@ -162,6 +166,9 @@ impl Default for AgentApi {
162166
tracked_veth_tx: veth_tx.clone(),
163167
};
164168

169+
// init map manager
170+
//let map_manager = map_manager(maps)?
171+
165172
// For network metrics
166173

167174
//spawn an event readers
@@ -177,7 +184,7 @@ impl Default for AgentApi {
177184
.open(cpu_id, None)
178185
.expect("Error during the creation of net_events_buf structure");
179186

180-
let buffers = vec![BytesMut::with_capacity(4096); 8];
187+
let buffers = BufferSize::ClassifierNetEvents.set_buffer();
181188
net_events_buffer.push((buf, buffers));
182189
}
183190

@@ -262,7 +269,7 @@ impl Default for AgentApi {
262269
.open(cpu_id, None)
263270
.expect("Error during the creation of net_metrics_buf structure");
264271

265-
let buffers = vec![BytesMut::with_capacity(4096); 8];
272+
let buffers = BufferSize::NetworkMetricsEvents.set_buffer();
266273
net_metrics_buffer.push((buf, buffers));
267274
}
268275

@@ -343,7 +350,7 @@ impl Default for AgentApi {
343350
.open(cpu_id, None)
344351
.expect("Error during the creation of time stamp events buf structure");
345352

346-
let buffers = vec![BytesMut::with_capacity(4096); 8];
353+
let buffers = BufferSize::TimeMetricsEvents.set_buffer();
347354
ts_events_buffer.push((buf, buffers));
348355
}
349356

@@ -421,7 +428,7 @@ impl Default for AgentApi {
421428
.open(cpu_id, None)
422429
.expect("Error during the creation of time stamp events buf structure");
423430

424-
let buffers = vec![BytesMut::with_capacity(4096); 8];
431+
let buffers = BufferSize::VethEvents.set_buffer();
425432
veth_events_buffer.push((buf, buffers));
426433
}
427434

@@ -560,7 +567,10 @@ impl Agent for AgentApi {
560567
//convert ip from string to [u8;4] type and insert into the bpf map
561568
let u8_4_ip = Ipv4Addr::from_str(&ip).unwrap().octets();
562569
//TODO: convert datetime in a kernel compatible format
563-
blocklist_map.insert(u8_4_ip, u8_4_ip, 0);
570+
blocklist_map
571+
.insert(u8_4_ip, u8_4_ip, 0)
572+
.map_err(|e| anyhow!("Cannot insert address in the blocklist. Reason: {}", e))
573+
.unwrap();
564574
info!("CURRENT BLOCKLIST: {:?}", blocklist_map);
565575
}
566576
let path = std::env::var(PIN_BLOCKLIST_MAP_PATH)
@@ -774,4 +784,33 @@ impl Agent for AgentApi {
774784

775785
Ok(Response::new(response))
776786
}
787+
788+
async fn get_tracked_veth_from_hash_map(
789+
&self,
790+
request: Request<()>,
791+
) -> Result<Response<VethHashMapResponse>, Status> {
792+
info!("Returning veth hashmap");
793+
//open blocklist map
794+
let mapdata = MapData::from_pin("/sys/fs/bpf/maps/tracked_veth")
795+
.expect("cannot open tracked_veth Mapdata");
796+
let tracked_veth_mapdata = Map::HashMap(mapdata); //load mapdata
797+
798+
let tracked_veth_map: ayaHashMap<MapData, [u8; 16], [u8; 8]> =
799+
ayaHashMap::try_from(tracked_veth_mapdata).unwrap();
800+
801+
//convert the maps with a buffer to match the protobuffer types
802+
803+
let mut converted_tracked_veth_map: HashMap<String, String> = HashMap::new();
804+
for item in tracked_veth_map.iter() {
805+
let (k, v) = item.unwrap();
806+
// convert keys and values from [u8;4] to String
807+
let key = String::from_utf8(k.to_vec()).unwrap();
808+
let value = String::from_utf8(v.to_vec()).unwrap();
809+
converted_tracked_veth_map.insert(key, value);
810+
}
811+
Ok(Response::new(VethHashMapResponse {
812+
status: "success".to_string(),
813+
veths: converted_tracked_veth_map,
814+
}))
815+
}
777816
}

core/api/src/requests.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use crate::agent::LatencyMetricsResponse;
1414
use crate::agent::RequestActiveConnections;
1515
use crate::agent::RmIpFromBlocklistRequest;
1616
use crate::agent::RmIpFromBlocklistResponse;
17+
use crate::agent::VethHashMapResponse;
1718
use crate::agent::VethResponse;
1819
use crate::agent::agent_client::AgentClient;
1920

@@ -100,3 +101,12 @@ pub async fn send_tracked_veth_request(
100101
let response = client.get_tracked_veth(request).await?;
101102
Ok(response)
102103
}
104+
105+
#[cfg(feature = "client")]
106+
pub async fn send_veth_tracked_hashmap_req(
107+
mut client: AgentClient<Channel>,
108+
) -> Result<Response<VethHashMapResponse>, Error> {
109+
let request = Request::new(());
110+
let response = client.get_tracked_veth_from_hash_map(request).await?;
111+
Ok(response)
112+
}

0 commit comments

Comments
 (0)