Skip to content

Commit 9f21735

Browse files
Changes before error encountered
Co-authored-by: SajjadPourali <20374762+SajjadPourali@users.noreply.github.com>
1 parent a1823bf commit 9f21735

2 files changed

Lines changed: 65 additions & 60 deletions

File tree

src/stream/tcb.rs

Lines changed: 54 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ pub(super) enum PacketType {
4343
/// - `unordered_packets` is the bytes stream received from the lower device,
4444
/// which can be acknowledged and extracted by `consume_unordered_packets` method
4545
/// then can be read by upstream application via `Tcp::poll_read` method.
46+
/// - `ordered_packets` is the list of contiguous packets ready to be read by the application.
4647
#[derive(Debug, Clone)]
4748
pub(crate) struct Tcb {
4849
seq: SeqNum,
@@ -53,6 +54,7 @@ pub(crate) struct Tcb {
5354
state: TcpState,
5455
inflight_packets: BTreeMap<SeqNum, InflightPacket>,
5556
unordered_packets: BTreeMap<SeqNum, Vec<u8>>,
57+
ordered_packets: Vec<Vec<u8>>,
5658
duplicate_ack_count: usize,
5759
duplicate_ack_count_helper: SeqNum,
5860
}
@@ -72,6 +74,7 @@ impl Tcb {
7274
state: TcpState::Listen,
7375
inflight_packets: BTreeMap::new(),
7476
unordered_packets: BTreeMap::new(),
77+
ordered_packets: Vec::new(),
7578
duplicate_ack_count: 0,
7679
duplicate_ack_count_helper: seq.into(),
7780
}
@@ -106,46 +109,40 @@ impl Tcb {
106109
self.unordered_packets.insert(seq, buf);
107110
}
108111
pub(super) fn get_available_read_buffer_size(&self) -> usize {
109-
READ_BUFFER_SIZE.saturating_sub(self.get_unordered_packets_total_len())
112+
let total_buffered = self.get_unordered_packets_total_len() + self.get_ordered_packets_total_len();
113+
READ_BUFFER_SIZE.saturating_sub(total_buffered)
110114
}
111115
#[inline]
112116
pub(crate) fn get_unordered_packets_total_len(&self) -> usize {
113117
self.unordered_packets.values().map(|p| p.len()).sum()
114118
}
119+
#[inline]
120+
pub(crate) fn get_ordered_packets_total_len(&self) -> usize {
121+
self.ordered_packets.iter().map(|p| p.len()).sum()
122+
}
115123

116-
pub(super) fn consume_unordered_packets(&mut self, max_bytes: usize) -> Option<Vec<u8>> {
117-
let mut data = Vec::new();
118-
let mut remaining_bytes = max_bytes;
119-
120-
while remaining_bytes > 0 {
121-
if let Some(seq) = self.unordered_packets.keys().next().copied() {
122-
if seq != self.ack {
123-
break; // sequence number is not continuous, stop extracting
124-
}
125-
126-
// remove and get the first packet
127-
let mut payload = self.unordered_packets.remove(&seq).unwrap();
128-
let payload_len = payload.len();
129-
130-
if payload_len <= remaining_bytes {
131-
// current packet can be fully extracted
132-
data.extend(payload);
133-
self.ack += payload_len as u32;
134-
remaining_bytes -= payload_len;
135-
} else {
136-
// current packet can only be partially extracted
137-
let remaining_payload = payload.split_off(remaining_bytes);
138-
data.extend_from_slice(&payload);
139-
self.ack += remaining_bytes as u32;
140-
self.unordered_packets.insert(self.ack, remaining_payload);
141-
break;
142-
}
143-
} else {
144-
break; // no more packets to extract
124+
pub(super) fn consume_unordered_packets(&mut self) {
125+
while let Some(seq) = self.unordered_packets.keys().next().copied() {
126+
if seq != self.ack {
127+
break; // sequence number is not continuous, stop extracting
145128
}
129+
130+
// remove and get the first packet
131+
let payload = self.unordered_packets.remove(&seq).unwrap();
132+
let payload_len = payload.len();
133+
134+
// Move the packet to ordered_packets
135+
self.ordered_packets.push(payload);
136+
self.ack += payload_len as u32;
146137
}
138+
}
147139

148-
if data.is_empty() { None } else { Some(data) }
140+
pub(super) fn pop_ordered_packet(&mut self) -> Option<Vec<u8>> {
141+
if self.ordered_packets.is_empty() {
142+
None
143+
} else {
144+
Some(self.ordered_packets.remove(0))
145+
}
149146
}
150147

151148
pub(super) fn increase_seq(&mut self) {
@@ -359,26 +356,32 @@ mod tests {
359356
tcb.add_unordered_packet(SeqNum(1500), vec![2; 500]); // seq=1500, len=500
360357
tcb.add_unordered_packet(SeqNum(2000), vec![3; 500]); // seq=2000, len=500
361358

362-
// test 1: extract up to 700 bytes
363-
let data = tcb.consume_unordered_packets(700).unwrap();
364-
assert_eq!(data.len(), 700); // extract 500 + 200
365-
assert_eq!(data[..500], vec![1; 500]); // the first packet
366-
assert_eq!(data[500..700], vec![2; 200]); // the first 200 bytes of the second packet
367-
assert_eq!(tcb.ack, SeqNum(1700)); // ack increased by 700
368-
assert_eq!(tcb.unordered_packets.len(), 2); // remaining two packets
369-
assert_eq!(tcb.unordered_packets.get(&SeqNum(1700)).unwrap().len(), 300); // the second packet remaining 300 bytes
370-
assert_eq!(tcb.unordered_packets.get(&SeqNum(2000)).unwrap().len(), 500); // the third packet unchanged
371-
372-
// test 2: extract up to 800 bytes
373-
let data = tcb.consume_unordered_packets(800).unwrap();
374-
assert_eq!(data.len(), 800); // extract 300 bytes of the second packet and the third packet
375-
assert_eq!(data[..300], vec![2; 300]); // the remaining 300 bytes of the second packet
376-
assert_eq!(data[300..800], vec![3; 500]); // the third packet
377-
assert_eq!(tcb.ack, SeqNum(2500)); // ack increased by 800
378-
assert_eq!(tcb.unordered_packets.len(), 0); // no remaining packets
379-
380-
// test 3: no data to extract
381-
let data = tcb.consume_unordered_packets(1000);
359+
// test 1: consume contiguous packets
360+
tcb.consume_unordered_packets();
361+
assert_eq!(tcb.ack, SeqNum(2500)); // ack increased by 1500
362+
assert_eq!(tcb.unordered_packets.len(), 0); // all packets consumed
363+
assert_eq!(tcb.ordered_packets.len(), 3); // three packets in ordered list
364+
365+
// test 2: pop first ordered packet
366+
let data = tcb.pop_ordered_packet().unwrap();
367+
assert_eq!(data.len(), 500);
368+
assert_eq!(data, vec![1; 500]);
369+
assert_eq!(tcb.ordered_packets.len(), 2);
370+
371+
// test 3: pop second ordered packet
372+
let data = tcb.pop_ordered_packet().unwrap();
373+
assert_eq!(data.len(), 500);
374+
assert_eq!(data, vec![2; 500]);
375+
assert_eq!(tcb.ordered_packets.len(), 1);
376+
377+
// test 4: pop third ordered packet
378+
let data = tcb.pop_ordered_packet().unwrap();
379+
assert_eq!(data.len(), 500);
380+
assert_eq!(data, vec![3; 500]);
381+
assert_eq!(tcb.ordered_packets.len(), 0);
382+
383+
// test 5: no more data to extract
384+
let data = tcb.pop_ordered_packet();
382385
assert!(data.is_none());
383386
}
384387

src/stream/tcp.rs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -623,7 +623,7 @@ async fn tcp_main_logic_loop(
623623
if flags & ACK == ACK {
624624
if len > 0 {
625625
tcb.add_unordered_packet(incoming_seq, payload);
626-
extract_data_n_write_upstream(&up_packet_sender, &mut tcb, network_tuple, &data_tx, &read_notify)?;
626+
extract_data_n_write_upstream(&up_packet_sender, &mut tcb, network_tuple, &read_notify)?;
627627
}
628628
tcb.change_state(TcpState::Established);
629629
}
@@ -650,7 +650,7 @@ async fn tcp_main_logic_loop(
650650
PacketType::NewPacket => {
651651
tcb.add_unordered_packet(incoming_seq, payload);
652652
let nt = network_tuple;
653-
extract_data_n_write_upstream(&up_packet_sender, &mut tcb, nt, &data_tx, &read_notify)?;
653+
extract_data_n_write_upstream(&up_packet_sender, &mut tcb, nt, &read_notify)?;
654654
write_notify.lock().unwrap().take().map(|w| w.wake_by_ref()).unwrap_or(());
655655
}
656656
PacketType::Ack => {
@@ -711,7 +711,7 @@ async fn tcp_main_logic_loop(
711711
} else if flags == (ACK | PSH) && pkt_type == PacketType::NewPacket {
712712
if !payload.is_empty() && tcb.get_ack() == incoming_seq {
713713
tcb.add_unordered_packet(incoming_seq, payload);
714-
extract_data_n_write_upstream(&up_packet_sender, &mut tcb, network_tuple, &data_tx, &read_notify)?;
714+
extract_data_n_write_upstream(&up_packet_sender, &mut tcb, network_tuple, &read_notify)?;
715715
}
716716
} else {
717717
// unnormal case, we do nothing here
@@ -769,7 +769,7 @@ async fn tcp_main_logic_loop(
769769
if len > 0 {
770770
// if the other side is still sending data, we need to deal with it like PacketStatus::NewPacket
771771
tcb.add_unordered_packet(incoming_seq, payload);
772-
extract_data_n_write_upstream(&up_packet_sender, &mut tcb, network_tuple, &data_tx, &read_notify)?;
772+
extract_data_n_write_upstream(&up_packet_sender, &mut tcb, network_tuple, &read_notify)?;
773773
write_notify.lock().unwrap().take().map(|w| w.wake_by_ref()).unwrap_or(());
774774
}
775775
let new_state = tcb.get_state();
@@ -799,7 +799,7 @@ async fn tcp_main_logic_loop(
799799
} else {
800800
// if the other side is still sending data, we need to deal with it like PacketStatus::NewPacket
801801
tcb.add_unordered_packet(incoming_seq, payload);
802-
extract_data_n_write_upstream(&up_packet_sender, &mut tcb, network_tuple, &data_tx, &read_notify)?;
802+
extract_data_n_write_upstream(&up_packet_sender, &mut tcb, network_tuple, &read_notify)?;
803803
write_notify.lock().unwrap().take().map(|w| w.wake_by_ref()).unwrap_or(());
804804
}
805805
if flags & FIN == FIN {
@@ -833,7 +833,6 @@ fn extract_data_n_write_upstream(
833833
up_packet_sender: &PacketSender,
834834
tcb: &mut Tcb,
835835
network_tuple: NetworkTuple,
836-
data_tx: &tokio::sync::mpsc::UnboundedSender<Vec<u8>>,
837836
read_notify: &std::sync::Arc<std::sync::Mutex<Option<Waker>>>,
838837
) -> std::io::Result<()> {
839838
let (state, seq, ack) = (tcb.get_state(), tcb.get_seq(), tcb.get_ack());
@@ -843,10 +842,13 @@ fn extract_data_n_write_upstream(
843842
return Ok(());
844843
}
845844

846-
if let Some(data) = tcb.consume_unordered_packets(8192) {
845+
let before_len = tcb.get_ordered_packets_total_len();
846+
tcb.consume_unordered_packets();
847+
let after_len = tcb.get_ordered_packets_total_len();
848+
849+
if after_len > before_len {
847850
let hint = if state == TcpState::Established { "normally" } else { "still" };
848-
log::trace!("{network_tuple} {state:?}: {l_info} {hint} receiving data, len = {}", data.len());
849-
data_tx.send(data).map_err(|e| std::io::Error::new(BrokenPipe, e))?;
851+
log::trace!("{network_tuple} {state:?}: {l_info} {hint} receiving data, new ordered bytes = {}", after_len - before_len);
850852
read_notify.lock().unwrap().take().map(|w| w.wake_by_ref()).unwrap_or(());
851853
write_packet_to_device(up_packet_sender, network_tuple, tcb, None, ACK, None, None)?;
852854
}

0 commit comments

Comments
 (0)