Skip to content

Commit 05b4ef5

Browse files
committed
Intro to TcpConfig
1 parent 74fdcad commit 05b4ef5

4 files changed

Lines changed: 106 additions & 29 deletions

File tree

examples/tun.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
9393

9494
let mut ipstack_config = ipstack::IpStackConfig::default();
9595
ipstack_config.mtu(MTU);
96-
ipstack_config.tcp_timeout(std::time::Duration::from_secs(args.tcp_timeout));
96+
let mut tcp_config = ipstack::TcpConfig::default();
97+
tcp_config.timeout = std::time::Duration::from_secs(args.tcp_timeout);
98+
ipstack_config.with_tcp_config(tcp_config);
9799
ipstack_config.udp_timeout(std::time::Duration::from_secs(args.udp_timeout));
98100

99101
let mut ip_stack = ipstack::IpStack::new(ipstack_config, tun::create_as_async(&tun_config)?);

src/lib.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
use ahash::AHashMap;
44
use packet::{NetworkPacket, NetworkTuple, TransportHeader};
5-
use std::time::Duration;
5+
use std::{sync::Arc, time::Duration};
66
use tokio::{
77
io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
88
select,
@@ -19,8 +19,9 @@ mod packet;
1919
mod stream;
2020

2121
pub use self::error::{IpStackError, Result};
22+
pub use self::stream::TcpConfig;
2223
pub use self::stream::{IpStackStream, IpStackTcpStream, IpStackUdpStream, IpStackUnknownTransport};
23-
pub use ::etherparse::IpNumber;
24+
pub use etherparse::IpNumber;
2425

2526
#[cfg(unix)]
2627
const TTL: u8 = 64;
@@ -41,28 +42,29 @@ const TUN_PROTO_IP6: [u8; 2] = [0x00, 0x0A];
4142
#[cfg(any(target_os = "macos", target_os = "ios"))]
4243
const TUN_PROTO_IP4: [u8; 2] = [0x00, 0x02];
4344

45+
#[non_exhaustive]
4446
pub struct IpStackConfig {
4547
pub mtu: u16,
4648
pub packet_information: bool,
47-
pub tcp_timeout: Duration,
4849
pub udp_timeout: Duration,
50+
pub tcp_config: Arc<TcpConfig>,
4951
}
5052

5153
impl Default for IpStackConfig {
5254
fn default() -> Self {
5355
IpStackConfig {
5456
mtu: u16::MAX,
5557
packet_information: false,
56-
tcp_timeout: Duration::from_secs(60),
58+
tcp_config: Arc::new(TcpConfig::default()),
5759
udp_timeout: Duration::from_secs(30),
5860
}
5961
}
6062
}
6163

6264
impl IpStackConfig {
63-
pub fn tcp_timeout(&mut self, timeout: Duration) -> &mut Self {
64-
self.tcp_timeout = timeout;
65-
self
65+
/// Set custom TCP configuration
66+
pub fn with_tcp_config(&mut self, config: TcpConfig) {
67+
self.tcp_config = Arc::new(config);
6668
}
6769
pub fn udp_timeout(&mut self, timeout: Duration) -> &mut Self {
6870
self.udp_timeout = timeout;
@@ -194,7 +196,7 @@ fn create_stream(
194196
let dst_addr = packet.dst_addr();
195197
match packet.transport_header() {
196198
TransportHeader::Tcp(h) => {
197-
let stream = IpStackTcpStream::new(src_addr, dst_addr, h.clone(), up_pkt_sender, cfg.mtu, cfg.tcp_timeout, msgr)?;
199+
let stream = IpStackTcpStream::new(src_addr, dst_addr, h.clone(), up_pkt_sender, cfg.mtu, msgr, cfg.tcp_config.clone())?;
198200
Ok(IpStackStream::Tcp(stream))
199201
}
200202
TransportHeader::Udp(_) => {

src/stream/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
22

33
pub use self::tcp::IpStackTcpStream;
4+
pub use self::tcp::TcpConfig;
45
pub use self::udp::IpStackUdpStream;
56
pub use self::unknown::IpStackUnknownTransport;
67

src/stream/tcp.rs

Lines changed: 92 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use std::{
1515
io::ErrorKind::{BrokenPipe, ConnectionRefused, InvalidInput, UnexpectedEof},
1616
net::SocketAddr,
1717
pin::Pin,
18+
sync::Arc,
1819
task::{Context, Poll, Waker},
1920
time::Duration,
2021
};
@@ -26,6 +27,35 @@ const TWO_MSL: Duration = Duration::from_secs(2);
2627
const CLOSE_WAIT_TIMEOUT: Duration = Duration::from_secs(5);
2728
const LAST_ACK_MAX_RETRIES: usize = 3;
2829
const LAST_ACK_TIMEOUT: Duration = Duration::from_millis(500);
30+
const TIMEOUT: Duration = Duration::from_secs(60);
31+
32+
#[non_exhaustive]
33+
#[derive(Debug, Clone)]
34+
/// TCP configuration
35+
pub struct TcpConfig {
36+
/// Maximum number of retries for sending the last ACK in the LAST_ACK state. Default is 3.
37+
pub last_ack_max_retries: usize,
38+
/// Timeout for the last ACK in the LAST_ACK state. Default is 500ms.
39+
pub last_ack_timeout: Duration,
40+
/// Timeout for the CLOSE_WAIT state. Default is 5 seconds.
41+
pub close_wait_timeout: Duration,
42+
/// Timeout for TCP connections. Default is 60 seconds.
43+
pub timeout: Duration,
44+
/// Timeout for the TIME_WAIT state. Default is 2 seconds.
45+
pub two_msl: Duration,
46+
}
47+
48+
impl Default for TcpConfig {
49+
fn default() -> Self {
50+
TcpConfig {
51+
last_ack_max_retries: LAST_ACK_MAX_RETRIES,
52+
last_ack_timeout: LAST_ACK_TIMEOUT,
53+
close_wait_timeout: CLOSE_WAIT_TIMEOUT,
54+
timeout: TIMEOUT,
55+
two_msl: TWO_MSL,
56+
}
57+
}
58+
}
2959

3060
#[derive(Debug)]
3161
enum Shutdown {
@@ -81,13 +111,13 @@ pub struct IpStackTcpStream {
81111
write_notify: std::sync::Arc<std::sync::Mutex<Option<Waker>>>,
82112
destroy_messenger: Option<::tokio::sync::oneshot::Sender<()>>,
83113
timeout: Pin<Box<tokio::time::Sleep>>,
84-
timeout_interval: Duration,
85114
data_tx: tokio::sync::mpsc::UnboundedSender<Vec<u8>>,
86115
data_rx: tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>,
87116
read_notify: std::sync::Arc<std::sync::Mutex<Option<Waker>>>,
88117
task_handle: Option<tokio::task::JoinHandle<std::io::Result<()>>>,
89118
exit_notifier: Option<tokio::sync::mpsc::Sender<()>>,
90119
temp_read_buffer: Vec<u8>,
120+
config: Arc<TcpConfig>,
91121
}
92122

93123
impl IpStackTcpStream {
@@ -97,8 +127,8 @@ impl IpStackTcpStream {
97127
tcp: TcpHeader,
98128
up_packet_sender: PacketSender,
99129
mtu: u16,
100-
timeout_interval: Duration,
101130
destroy_messenger: Option<::tokio::sync::oneshot::Sender<()>>,
131+
config: Arc<TcpConfig>,
102132
) -> Result<IpStackTcpStream, IpStackError> {
103133
let tcb = Tcb::new(SeqNum(tcp.sequence_number), mtu);
104134
let tuple = NetworkTuple::new(src_addr, dst_addr, true);
@@ -114,7 +144,7 @@ impl IpStackTcpStream {
114144

115145
let (stream_sender, stream_receiver) = tokio::sync::mpsc::unbounded_channel::<NetworkPacket>();
116146
let (data_tx, data_rx) = tokio::sync::mpsc::unbounded_channel::<Vec<u8>>();
117-
let deadline = tokio::time::Instant::now() + timeout_interval;
147+
let deadline = tokio::time::Instant::now() + config.timeout;
118148

119149
let mut stream = IpStackTcpStream {
120150
src_addr,
@@ -127,13 +157,13 @@ impl IpStackTcpStream {
127157
write_notify: std::sync::Arc::new(std::sync::Mutex::new(None)),
128158
destroy_messenger,
129159
timeout: Box::pin(tokio::time::sleep_until(deadline)),
130-
timeout_interval,
131160
data_tx,
132161
data_rx,
133162
read_notify: std::sync::Arc::new(std::sync::Mutex::new(None)),
134163
task_handle: None,
135164
exit_notifier: None,
136165
temp_read_buffer: Vec::new(),
166+
config,
137167
};
138168

139169
let sessions = SESSION_COUNTER.fetch_add(1, std::sync::atomic::Ordering::SeqCst).saturating_add(1);
@@ -146,7 +176,7 @@ impl IpStackTcpStream {
146176
}
147177

148178
fn reset_timeout(&mut self) {
149-
let deadline = tokio::time::Instant::now() + self.timeout_interval;
179+
let deadline = tokio::time::Instant::now() + self.config.timeout;
150180
self.timeout.as_mut().reset(deadline);
151181
}
152182

@@ -350,11 +380,13 @@ impl IpStackTcpStream {
350380

351381
let (exit_task_notifier, exit_monitor) = tokio::sync::mpsc::channel::<()>(10);
352382
let exit_notifier = exit_task_notifier.clone();
383+
let config = self.config.clone();
353384
self.exit_notifier = Some(exit_task_notifier);
354385

355386
let task_handle = tokio::spawn(async move {
356387
let v = tcp_main_logic_loop(
357388
tcb,
389+
config,
358390
stream_receiver,
359391
up_packet_sender,
360392
exit_notifier,
@@ -382,6 +414,7 @@ impl IpStackTcpStream {
382414
#[allow(clippy::too_many_arguments)]
383415
async fn tcp_main_logic_loop(
384416
tcb: TcbPtr,
417+
config: Arc<TcpConfig>,
385418
mut stream_receiver: PacketReceiver,
386419
up_packet_sender: PacketSender,
387420
exit_notifier: tokio::sync::mpsc::Sender<()>,
@@ -413,27 +446,34 @@ async fn tcp_main_logic_loop(
413446

414447
let tcb_clone = tcb.clone();
415448

416-
async fn task_wait_to_close(tcb: TcbPtr, exit_notifier: tokio::sync::mpsc::Sender<()>, nt: NetworkTuple) {
417-
tokio::time::sleep(TWO_MSL).await;
449+
async fn task_wait_to_close(tcb: TcbPtr, exit_notifier: tokio::sync::mpsc::Sender<()>, nt: NetworkTuple, two_msl: Duration) {
450+
tokio::time::sleep(two_msl).await;
418451
{
419452
let mut tcb = tcb.lock().unwrap();
420453
tcb.change_state(TcpState::Closed);
421454
let state = tcb.get_state();
422-
log::debug!("{nt} {state:?}: [task_wait_to_close] session closed after {TWO_MSL:?}");
455+
log::debug!("{nt} {state:?}: [task_wait_to_close] session closed after {two_msl:?}");
423456
}
424457
exit_notifier.send(()).await.unwrap_or(());
425458
}
426459

427-
async fn task_last_ack(tcb: TcbPtr, exit_notifier: tokio::sync::mpsc::Sender<()>, nt: NetworkTuple, pkt_sdr: PacketSender) {
460+
async fn task_last_ack(
461+
tcb: TcbPtr,
462+
exit_notifier: tokio::sync::mpsc::Sender<()>,
463+
nt: NetworkTuple,
464+
pkt_sdr: PacketSender,
465+
last_ack_timeout: Duration,
466+
last_ack_max_retries: usize,
467+
) {
428468
let hint = "[task_last_ack]";
429-
for idx in 1..=LAST_ACK_MAX_RETRIES {
469+
for idx in 1..=last_ack_max_retries {
430470
let state = { tcb.lock().unwrap().get_state() };
431471
if state == TcpState::Closed {
432472
log::debug!("{nt} {state:?}: {hint} session closed, exiting 1...");
433473
return;
434474
}
435475

436-
tokio::time::sleep(LAST_ACK_TIMEOUT).await;
476+
tokio::time::sleep(last_ack_timeout).await;
437477

438478
{
439479
let tcb = tcb.lock().unwrap();
@@ -442,7 +482,7 @@ async fn tcp_main_logic_loop(
442482
log::debug!("{nt} {state:?}: {hint} session closed, exiting 2...");
443483
return;
444484
}
445-
log::debug!("{nt} {state:?}: {hint} timer expired, resending ACK|FIN (retry {idx}/{LAST_ACK_MAX_RETRIES})");
485+
log::debug!("{nt} {state:?}: {hint} timer expired, resending ACK|FIN (retry {idx}/{last_ack_max_retries})");
446486
_ = write_packet_to_device(&pkt_sdr, nt, &tcb, ACK | FIN, None, None);
447487
}
448488
}
@@ -460,8 +500,11 @@ async fn tcp_main_logic_loop(
460500
exit_notifier: tokio::sync::mpsc::Sender<()>,
461501
nt: NetworkTuple,
462502
up_packet_sender: PacketSender,
503+
close_wait_timeout: Duration,
504+
last_ack_timeout: Duration,
505+
last_ack_max_retries: usize,
463506
) -> std::io::Result<()> {
464-
tokio::time::sleep(CLOSE_WAIT_TIMEOUT).await; // Wait CLOSE_WAIT_TIMEOUT for upstream
507+
tokio::time::sleep(close_wait_timeout).await; // Wait CLOSE_WAIT_TIMEOUT for upstream
465508
let tcb_clone = tcb.clone();
466509
let mut tcb = tcb.lock().unwrap();
467510
let state = tcb.get_state();
@@ -476,7 +519,14 @@ async fn tcp_main_logic_loop(
476519
log::debug!("{nt} {state:?}: Forced transition to {new_state:?}");
477520

478521
// Here we set a timer to wait for the last ACK from the other side.
479-
tokio::spawn(task_last_ack(tcb_clone, exit_notifier, nt, up_packet_sender));
522+
tokio::spawn(task_last_ack(
523+
tcb_clone,
524+
exit_notifier,
525+
nt,
526+
up_packet_sender,
527+
last_ack_timeout,
528+
last_ack_max_retries,
529+
));
480530

481531
Ok::<(), std::io::Error>(())
482532
}
@@ -608,7 +658,14 @@ async fn tcp_main_logic_loop(
608658
// If the timer expires, we send an ACK|FIN packet to the other side again and wait anthoer timeout
609659
// till the retries reach the limit, and then close the session forcibly.
610660
let up = up_packet_sender.clone();
611-
tokio::spawn(task_last_ack(tcb_clone.clone(), exit_notifier, network_tuple, up));
661+
tokio::spawn(task_last_ack(
662+
tcb_clone.clone(),
663+
exit_notifier,
664+
network_tuple,
665+
up,
666+
config.last_ack_timeout,
667+
config.last_ack_max_retries,
668+
));
612669
} else {
613670
// Upstream data pending, wake write_notify and wait
614671
write_notify.lock().unwrap().take().map(|w| w.wake_by_ref()).unwrap_or(());
@@ -617,7 +674,15 @@ async fn tcp_main_logic_loop(
617674
// Spawn a timeout task to force FIN if upstream is unresponsive
618675
let tcb = tcb_clone.clone();
619676
let up = up_packet_sender.clone();
620-
tokio::spawn(task_timed_out_for_close_wait(tcb, exit_notifier, network_tuple, up));
677+
tokio::spawn(task_timed_out_for_close_wait(
678+
tcb,
679+
exit_notifier,
680+
network_tuple,
681+
up,
682+
config.close_wait_timeout,
683+
config.last_ack_timeout,
684+
config.last_ack_max_retries,
685+
));
621686
}
622687
} else if flags == (ACK | PSH) && pkt_type == PacketType::NewPacket {
623688
if !payload.is_empty() && tcb.get_ack() == incoming_seq {
@@ -641,7 +706,14 @@ async fn tcp_main_logic_loop(
641706
// If the timer expires, we send an ACK|FIN packet to the other side again and wait anthoer timeout
642707
// till the retries reach the limit, and then close the session forcibly.
643708
let up = up_packet_sender.clone();
644-
tokio::spawn(task_last_ack(tcb_clone.clone(), exit_notifier, network_tuple, up));
709+
tokio::spawn(task_last_ack(
710+
tcb_clone.clone(),
711+
exit_notifier,
712+
network_tuple,
713+
up,
714+
config.last_ack_timeout,
715+
config.last_ack_max_retries,
716+
));
645717
} else {
646718
write_notify.lock().unwrap().take().map(|w| w.wake_by_ref()).unwrap_or(());
647719
}
@@ -665,7 +737,7 @@ async fn tcp_main_logic_loop(
665737
write_packet_to_device(&up_packet_sender, network_tuple, &tcb, ACK, None, None)?;
666738
tcb.change_state(TcpState::TimeWait);
667739

668-
tokio::spawn(task_wait_to_close(tcb_clone.clone(), exit_notifier, network_tuple));
740+
tokio::spawn(task_wait_to_close(tcb_clone.clone(), exit_notifier, network_tuple, config.two_msl));
669741
let new_state = tcb.get_state();
670742
log::trace!("{network_tuple} {state:?}: Final ACK|FIN received too early, transitioned to {new_state:?} directly");
671743
} else if flags & ACK == ACK {
@@ -688,7 +760,7 @@ async fn tcp_main_logic_loop(
688760
tcb.increase_ack();
689761
write_packet_to_device(&up_packet_sender, network_tuple, &tcb, ACK, None, None)?;
690762
tcb.change_state(TcpState::TimeWait);
691-
tokio::spawn(task_wait_to_close(tcb_clone.clone(), exit_notifier, network_tuple));
763+
tokio::spawn(task_wait_to_close(tcb_clone.clone(), exit_notifier, network_tuple, config.two_msl));
692764
let new_state = tcb.get_state();
693765
log::trace!("{network_tuple} {state:?}: Received final ACK|FIN, transitioned to {new_state:?}");
694766
} else if flags & ACK == ACK && len == 0 {
@@ -708,7 +780,7 @@ async fn tcp_main_logic_loop(
708780
}
709781
if flags & FIN == FIN {
710782
tcb.change_state(TcpState::TimeWait);
711-
tokio::spawn(task_wait_to_close(tcb_clone.clone(), exit_notifier, network_tuple));
783+
tokio::spawn(task_wait_to_close(tcb_clone.clone(), exit_notifier, network_tuple, config.two_msl));
712784
let new_state = tcb.get_state();
713785
log::trace!("{network_tuple} {state:?}: Received final ACK|FIN, transitioned to {new_state:?}");
714786
}

0 commit comments

Comments
 (0)