Skip to content

Commit 98fa3c1

Browse files
committed
fix: packet processor structure, now is a trait
1 parent 68d44e0 commit 98fa3c1

7 files changed

Lines changed: 312 additions & 164 deletions

File tree

Cargo.lock

Lines changed: 38 additions & 10 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ version = "0.1.0"
44
edition = "2024"
55

66
[dependencies]
7-
tokio = { version = "1.47.1", features = ["full"] }
87
wg_internal = { git = "https://github.com/WGL-2024/WGL_repo_2024.git", features = ["debug"] }
98
crossbeam-channel = "0.5.15"
9+
serde = { version = "1.0.219", features = ["derive"] }
10+
anyhow = "1.0.99"
11+
tokio = { version = "1.47.1", features = ["full"] }

src/assembler.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,22 +5,22 @@ use wg_internal::{network::NodeId, packet::Fragment};
55

66

77
#[derive(Debug, Default)]
8-
pub struct FragmentAssembler<'a> {
9-
pub fragments: HashMap<(u64, NodeId), Vec<&'a Fragment>>, // session_id -> data buffer
8+
pub struct FragmentAssembler {
9+
pub fragments: HashMap<(u64, NodeId), Vec<Fragment>>, // session_id -> data buffer
1010
pub expected_fragments: HashMap<(u64, NodeId), u64>, // session_id -> total_fragments
1111
pub received_fragments: HashMap<(u64, NodeId), Vec<bool>>, // session_id -> received status
1212
}
1313

14-
impl<'a> FragmentAssembler<'a> {
15-
pub fn add_fragment(&mut self, fragment: &'a Fragment, session_id: u64, sender: NodeId) -> Option<Vec<u8>> {
14+
impl FragmentAssembler {
15+
pub fn add_fragment(&mut self, fragment: Fragment, session_id: u64, sender: NodeId) -> Option<Vec<u8>> {
1616
let communication_id = ( session_id, sender );
1717
#[allow(clippy::cast_possible_truncation)]
1818
let index = fragment.fragment_index as usize;
1919

2020
if let Vacant(entry) = self.fragments.entry(communication_id) {
21-
entry.insert(vec![fragment]);
2221
self.expected_fragments.insert(communication_id, fragment.total_n_fragments);
2322
self.received_fragments.insert(communication_id, vec![false; index]);
23+
entry.insert(vec![fragment]);
2424
}
2525

2626
{

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ pub mod network;
22
pub mod types;
33
pub mod assembler;
44
pub mod routing_handler;
5+
pub mod packet_processor;
56

67
pub use routing_handler::RoutingHandler;
78
pub use assembler::FragmentAssembler;

src/packet_processor.rs

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
use crate::{network::NetworkError, FragmentAssembler, RoutingHandler};
2+
use std::any::Any;
3+
4+
use crossbeam_channel::{select_biased, Receiver};
5+
use wg_internal::packet::{Packet, PacketType};
6+
7+
pub trait Processor {
8+
fn controller_recv(&self) -> &Receiver<Box<dyn Any>>;
9+
fn packet_recv(&self) -> &Receiver<Packet>;
10+
fn assembler(&mut self) -> &mut FragmentAssembler;
11+
fn routing_header(&mut self) -> &mut RoutingHandler;
12+
13+
fn handle_msg(&mut self, msg: Vec<u8>);
14+
fn handle_command(&mut self, cmd: Box<dyn Any>);
15+
16+
/// Handles a packet in a standard way
17+
/// # Errors
18+
/// returns an Errors if handling fails
19+
fn handle_packet(&mut self, pkt: Packet) -> Result<(), NetworkError> {
20+
let router = self.routing_header();
21+
match pkt.pack_type {
22+
PacketType::MsgFragment(fragment) => {
23+
if let Some(msg) = self.assembler().add_fragment(
24+
fragment,
25+
pkt.session_id,
26+
pkt.routing_header.hops[0],
27+
) {
28+
self.handle_msg(msg);
29+
}
30+
}
31+
PacketType::Ack(ack) => {
32+
router.handle_ack(&ack, pkt.session_id, pkt.routing_header.hops[0]);
33+
}
34+
PacketType::Nack(nack) => {
35+
router.handle_nack(&nack, pkt.session_id, pkt.routing_header.hops[0])?;
36+
}
37+
PacketType::FloodRequest(flood_request) => {
38+
router.handle_flood_request(flood_request, pkt.session_id)?;
39+
}
40+
PacketType::FloodResponse(flood_response) => {
41+
router.handle_flood_response(&flood_response);
42+
}
43+
}
44+
Ok(())
45+
}
46+
47+
fn run(&mut self) {
48+
loop {
49+
select_biased! {
50+
recv(self.controller_recv()) -> cmd => {
51+
if let Ok(cmd) = cmd {
52+
self.handle_command(cmd);
53+
}
54+
}
55+
56+
recv(self.packet_recv()) -> pkt => {
57+
if let Ok(pkt) = pkt {
58+
match self.handle_packet(pkt) {
59+
Ok(()) => {},
60+
Err(_) => return
61+
}
62+
}
63+
}
64+
}
65+
}
66+
}
67+
}

0 commit comments

Comments
 (0)