Skip to content

Commit 76d462b

Browse files
[#158]: moved Event Sender trait in the batcher.rs module
1 parent 4d23d4c commit 76d462b

File tree

1 file changed

+77
-12
lines changed

1 file changed

+77
-12
lines changed

core/api/src/batcher.rs

Lines changed: 77 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,87 @@
11
// This module is experimental and may be subject to major changes.
22

33

4-
use crate::agent::{ConnectionEvent, DroppedPacketMetric, LatencyMetric};
4+
// Do not use any of these functions
5+
// FIXME: this module will be deprecated in the next version probably
56

6-
pub enum MetricsBatcher {
7-
LatencyMetrics,
8-
DroppedPacketsMetrics,
9-
}
10-
pub enum EventBatcher {}
117

12-
impl MetricsBatcher {
13-
pub async fn send_batched_metrics() {
14-
todo!();
8+
use tokio::sync::mpsc;
9+
use tonic::{Status, async_trait};
10+
11+
use crate::{
12+
agent::{ConnectionEvent, DroppedPacketMetric, LatencyMetric, VethEvent},
13+
api::AgentApi,
14+
};
15+
16+
// Event sender trait. Takes an event from a map and send that to the mpsc channel
17+
// using the send_map function
18+
#[async_trait]
19+
pub trait EventSender: Send + Sync + 'static {
20+
async fn send_active_connection_event(&self, event: Vec<ConnectionEvent>);
21+
async fn send_active_connection_event_map(
22+
&self,
23+
map: Vec<ConnectionEvent>,
24+
tx: mpsc::Sender<Result<Vec<ConnectionEvent>, Status>>,
25+
) {
26+
let status = Status::new(tonic::Code::Ok, "success");
27+
let event = Ok(map);
28+
29+
let _ = tx.send(event).await;
30+
}
31+
32+
async fn send_latency_metrics_event(&self, event: Vec<LatencyMetric>);
33+
async fn send_latency_metrics_event_map(
34+
&self,
35+
map: Vec<LatencyMetric>,
36+
tx: mpsc::Sender<Result<Vec<LatencyMetric>, Status>>,
37+
) {
38+
let status = Status::new(tonic::Code::Ok, "success");
39+
let event = Ok(map);
40+
let _ = tx.send(event).await;
41+
}
42+
43+
async fn send_dropped_packet_metrics_event(&self, event: Vec<DroppedPacketMetric>);
44+
async fn send_dropped_packet_metrics_event_map(
45+
&self,
46+
map: Vec<DroppedPacketMetric>,
47+
tx: mpsc::Sender<Result<Vec<DroppedPacketMetric>, Status>>,
48+
) {
49+
let status = Status::new(tonic::Code::Ok, "success");
50+
let event = Ok(map);
51+
let _ = tx.send(event).await;
52+
}
53+
54+
async fn send_tracked_veth_event(&self, event: Vec<VethEvent>);
55+
async fn send_tracked_veth_event_map(
56+
&self,
57+
map: Vec<VethEvent>,
58+
tx: mpsc::Sender<Result<Vec<VethEvent>, Status>>,
59+
) {
60+
let status = Status::new(tonic::Code::Ok, "success");
61+
let event = Ok(map);
62+
let _ = tx.send(event).await;
1563
}
1664
}
1765

18-
impl EventBatcher {
19-
pub async fn send_batched_logs() {
20-
todo!();
66+
// send event function. takes an HashMap and send that using mpsc event_tx
67+
#[async_trait]
68+
impl EventSender for AgentApi {
69+
async fn send_active_connection_event(&self, event: Vec<ConnectionEvent>) {
70+
self.send_active_connection_event_map(event, self.active_connection_event_tx.clone())
71+
.await;
72+
}
73+
74+
async fn send_latency_metrics_event(&self, event: Vec<LatencyMetric>) {
75+
self.send_latency_metrics_event_map(event, self.latency_metrics_tx.clone())
76+
.await;
77+
}
78+
79+
async fn send_dropped_packet_metrics_event(&self, event: Vec<DroppedPacketMetric>) {
80+
self.send_dropped_packet_metrics_event_map(event, self.dropped_packet_metrics_tx.clone())
81+
.await;
82+
}
83+
async fn send_tracked_veth_event(&self, event: Vec<VethEvent>) {
84+
self.send_tracked_veth_event_map(event, self.tracked_veth_tx.clone())
85+
.await;
2186
}
2287
}

0 commit comments

Comments
 (0)