Skip to content

Commit 1e3f99c

Browse files
committed
fixed: nack handling and packet resending
1 parent 1454500 commit 1e3f99c

3 files changed

Lines changed: 88 additions & 17 deletions

File tree

Cargo.lock

Lines changed: 59 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,4 @@ serde = { version = "1.0.219", features = ["derive"] }
1010
anyhow = "1.0.99"
1111
uuid = { version = "1.18.0", features = [ "serde", "v4"] }
1212
tempfile = "3.20.0"
13+
rand = "0.9.2"

src/routing_handler.rs

Lines changed: 28 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use crate::{
55
};
66
use crossbeam_channel::Sender;
77
use std::collections::{HashMap, HashSet};
8+
use rand::Rng;
89
use wg_internal::{
910
network::{NodeId, SourceRoutingHeader},
1011
packet::{Ack, FloodRequest, FloodResponse, Fragment, Nack, NackType, NodeType, Packet},
@@ -13,7 +14,7 @@ use wg_internal::{
1314
#[derive(Debug, Clone)]
1415
struct Buffer {
1516
// represents packets which reached the destination
16-
packets_received: HashMap<(u64, NodeId), Vec<(bool, Packet)>>,
17+
packets_received: HashMap<u64, Vec<(bool, Packet)>>,
1718
packets_to_send: Vec<Packet>,
1819
pending_ser_requests: HashSet<SerializedRequest>,
1920
}
@@ -27,17 +28,17 @@ impl Buffer {
2728
}
2829
}
2930

30-
fn insert(&mut self, packet: Packet, session_id: u64, from: NodeId) {
31-
let id = (session_id, from);
31+
fn insert(&mut self, packet: Packet, session_id: u64) {
32+
let id = session_id;
3233
if let Some(v) = self.packets_received.get_mut(&id) {
3334
v.push((false, packet));
3435
} else {
3536
let _ = self.packets_received.insert(id, vec![(false, packet)]);
3637
}
3738
}
3839

39-
fn mark_as_received(&mut self, session_id: u64, fragment_index: u64, form: NodeId) {
40-
let id = (session_id, form);
40+
fn mark_as_received(&mut self, session_id: u64, fragment_index: u64) {
41+
let id = session_id;
4142
if let Some(f) = self.packets_received.get_mut(&id) {
4243
#[allow(clippy::cast_possible_truncation)]
4344
let index = fragment_index as usize;
@@ -55,9 +56,8 @@ impl Buffer {
5556
&mut self,
5657
session_id: u64,
5758
fragment_index: u64,
58-
from: NodeId,
5959
) -> Option<Packet> {
60-
let id = (session_id, from);
60+
let id = session_id;
6161
if let Some(session) = self.packets_received.get(&id) {
6262
#[allow(clippy::cast_possible_truncation)]
6363
session
@@ -84,6 +84,7 @@ pub struct RoutingHandler {
8484
neighbors: HashMap<NodeId, Sender<Packet>>,
8585
flood_seen: HashSet<(u64, NodeId)>,
8686
session_counter: u64,
87+
session_id: u64,
8788
flood_counter: u64,
8889
controller_send: Sender<Box<dyn Event>>,
8990
buffer: Buffer,
@@ -103,6 +104,7 @@ impl RoutingHandler {
103104
network_view: Network::new(Node::new(id, node_type, vec![])),
104105
neighbors,
105106
session_counter: 0,
107+
session_id: 0,
106108
flood_counter: 0,
107109
flood_seen: HashSet::new(),
108110
controller_send,
@@ -111,6 +113,12 @@ impl RoutingHandler {
111113
}
112114
}
113115

116+
fn update_session_id(&mut self) {
117+
let mut rng = rand::rng();
118+
self.session_counter += 1;
119+
self.session_id = rng.random()
120+
}
121+
114122
/// Sends a packet to a specific neighbor and notifies the controller about the packet sent.
115123
/// # Errors
116124
/// Returns an error if sending the packet to the neighbor fails or if sending the event to the controller fails.
@@ -132,11 +140,11 @@ impl RoutingHandler {
132140
&mut self,
133141
pending_request: Option<SerializedRequest>,
134142
) -> Result<(), NetworkError> {
135-
self.session_counter += 1;
143+
self.update_session_id();
136144
self.flood_counter += 1;
137145
let packet = Packet::new_flood_request(
138146
SourceRoutingHeader::empty_route(),
139-
self.session_counter,
147+
self.session_id,
140148
FloodRequest {
141149
flood_id: self.flood_counter,
142150
initiator_id: self.id,
@@ -325,7 +333,7 @@ impl RoutingHandler {
325333
if let Some(sender) = self.neighbors.get(&first_hop) {
326334
self.send(sender, packet.clone())?;
327335
let session_id = packet.session_id;
328-
self.buffer.insert(packet, session_id, self.id);
336+
self.buffer.insert(packet, session_id);
329337
} else {
330338
return Err(NetworkError::NodeIsNotANeighbor(first_hop));
331339
}
@@ -399,17 +407,20 @@ impl RoutingHandler {
399407
&mut self,
400408
message: &[u8],
401409
dest: Option<NodeId>,
402-
session_id: Option<u64>,
410+
sid: Option<u64>,
403411
) -> Result<(), NetworkError> {
404412
// Split into 128-byte chunks
405413
let chunks = message.chunks(128);
406414
let total_n_fragments = chunks.len() as u64;
407415

408416
// Decide session id
409-
let session_id = session_id.unwrap_or_else(|| {
410-
self.session_counter += 1;
411-
self.session_counter
412-
});
417+
let session_id: u64;
418+
if let Some(id) = sid {
419+
session_id = id;
420+
} else {
421+
self.update_session_id();
422+
session_id = self.session_id;
423+
}
413424

414425
if let Some(destination) = dest {
415426
// Try to send directly
@@ -465,7 +476,7 @@ impl RoutingHandler {
465476

466477
pub fn handle_ack(&mut self, ack: &Ack, session_id: u64, from: NodeId) {
467478
self.buffer
468-
.mark_as_received(session_id, ack.fragment_index, from);
479+
.mark_as_received(session_id, ack.fragment_index);
469480
}
470481

471482
/// Retries sending a specific packet identified by `session_id` and `fragment_index` from a specific node.
@@ -480,7 +491,7 @@ impl RoutingHandler {
480491
) -> Result<(), NetworkError> {
481492
if let Some(packet) = self
482493
.buffer
483-
.get_fragment_by_id(session_id, fragment_index, from)
494+
.get_fragment_by_id(session_id, fragment_index)
484495
{
485496
self.try_send(packet)?;
486497
}

0 commit comments

Comments
 (0)