Skip to content

Commit 6eede11

Browse files
committed
Watchdog forces reader close when peer ignores FIN
1 parent 48f5333 commit 6eede11

5 files changed

Lines changed: 151 additions & 17 deletions

File tree

Lines changed: 140 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,157 @@
1+
use std::time::Duration;
2+
3+
use tokio::sync::oneshot;
4+
use tracing::warn;
5+
16
use crate::transport::reader::ReaderRef;
27
use crate::transport::writer::WriterRef;
38

9+
/// How long `FixConnection::run_until_disconnect` keeps waiting for the reader's
/// disconnect signal once the writer-exit signal has resolved, before it sends
/// the kill signal to the reader.
const FORCE_CLOSE_TIMEOUT: Duration = Duration::from_secs(10);
10+
411
/// Bundles the handles of one connection: the writer handle, the reader handle,
/// and a one-shot receiver that resolves when the writer side signals its exit.
pub struct FixConnection {
512
// Cloned out to callers by `get_writer`.
writer: WriterRef,
613
// Supplies the disconnect signal and the kill sender used by `run_until_disconnect`.
reader: ReaderRef,
14+
// Awaited in `run_until_disconnect` to learn that the writer is gone, which starts the
// force-close countdown.
writer_exit: oneshot::Receiver<()>,
715
}
816

917
impl FixConnection {
10-
pub fn new(writer: WriterRef, reader: ReaderRef) -> Self {
11-
Self { writer, reader }
18+
/// Creates a connection from its writer handle, reader handle and the receiver
/// on which the writer's exit is observed. The arguments are stored as-is.
pub fn new(writer: WriterRef, reader: ReaderRef, writer_exit: oneshot::Receiver<()>) -> Self {
19+
Self {
20+
writer,
21+
reader,
22+
writer_exit,
23+
}
1224
}
25+
1326
/// Returns a clone of the stored writer handle.
pub fn get_writer(&self) -> WriterRef {
1427
self.writer.clone()
1528
}
1629

1730
/// Consumes the connection and resolves once the reader's disconnect signal completes.
///
/// If the writer-exit signal completes first, the reader is given `FORCE_CLOSE_TIMEOUT`
/// to report its disconnect. When that window elapses a warning is logged, the kill
/// signal is sent to the reader, and the disconnect signal is then awaited with no
/// further time limit.
pub async fn run_until_disconnect(self) {
18-
self.reader.wait_for_disconnect().await
31+
// Take the two receivers and the kill sender out of `self`; the writer handle is not
// used in this function.
let Self {
32+
reader,
33+
mut writer_exit,
34+
..
35+
} = self;
36+
let ReaderRef {
37+
mut disconnect_signal,
38+
kill,
39+
} = reader;
40+
41+
// Phase 1: wait for whichever side finishes first. Both receivers are polled through
// `&mut` so `disconnect_signal` can be awaited again below.
// NOTE(review): the `_` patterns accept any completion, so a `writer_exit` sender that
// is dropped without sending presumably also lands in the second branch (tokio oneshot
// receivers complete with an error in that case). Confirm this is intended for callers
// that only bind the sender locally, e.g. `_writer_exit_tx` in the fake counterparty.
tokio::select! {
42+
_ = &mut disconnect_signal => return,
43+
_ = &mut writer_exit => {}
44+
}
45+
46+
// Phase 2: the writer is gone; give the reader a bounded window to notice on its own.
match tokio::time::timeout(FORCE_CLOSE_TIMEOUT, &mut disconnect_signal).await {
47+
// The disconnect signal completed (successfully or not) inside the window.
Ok(_) => {}
48+
// The window elapsed without the disconnect signal completing.
Err(_) => {
49+
warn!(
50+
"reader did not observe EOF within {:?}, forcing close",
51+
FORCE_CLOSE_TIMEOUT
52+
);
53+
// Best effort: a failed send (kill receiver already dropped) is ignored.
let _ = kill.send(());
54+
// Unbounded wait for the reader to confirm; the result itself is ignored.
let _ = disconnect_signal.await;
55+
}
56+
}
57+
}
58+
}
59+
60+
#[cfg(test)]
61+
mod tests {
62+
use super::*;
63+
use crate::transport::writer::WriterMessage;
64+
use tokio::sync::mpsc;
65+
66+
/// Build a `FixConnection` and return the ends the test controls:
67+
/// dc_tx to fire from the "reader", writer_exit_tx to fire from the "writer",
68+
/// and kill_rx so the test can observe or simulate the reader being killed.
69+
fn test_connection() -> (
70+
FixConnection,
71+
oneshot::Sender<()>,
72+
oneshot::Sender<()>,
73+
oneshot::Receiver<()>,
74+
) {
75+
// Reader side: the connection gets the disconnect receiver and the kill sender;
// the test keeps the opposite ends.
let (dc_tx, dc_rx) = oneshot::channel::<()>();
76+
let (kill_tx, kill_rx) = oneshot::channel::<()>();
77+
let reader_ref = ReaderRef::new(dc_rx, kill_tx);
78+
79+
// Writer side: only the handle is needed; the message receiver is not used by these tests.
let (writer_mpsc_tx, _writer_mpsc_rx) = mpsc::channel::<WriterMessage>(1);
80+
let writer_ref = WriterRef::new(writer_mpsc_tx);
81+
82+
let (writer_exit_tx, writer_exit_rx) = oneshot::channel::<()>();
83+
84+
let conn = FixConnection::new(writer_ref, reader_ref, writer_exit_rx);
85+
// Tuple order: connection, disconnect sender, writer-exit sender, kill receiver.
(conn, dc_tx, writer_exit_tx, kill_rx)
86+
}
87+
88+
/// Reader signals disconnect first — return immediately, kill is never sent.
89+
#[tokio::test(start_paused = true)]
90+
async fn returns_on_reader_disconnect_before_writer_exit() {
91+
// `_writer_exit_tx` is a named binding (not `_`), so the writer-exit sender stays alive
// until the end of the test and never signals.
let (conn, dc_tx, _writer_exit_tx, mut kill_rx) = test_connection();
92+
93+
// Signal the disconnect before the connection is driven.
dc_tx.send(()).expect("dc receiver dropped");
94+
95+
conn.run_until_disconnect().await;
96+
97+
// Kill should not have been sent. The sender has been dropped by now
98+
// (scope ended inside run_until_disconnect), so try_recv returns Closed
99+
// rather than Empty. Either way, an Ok(()) would mean kill was sent.
100+
assert!(
101+
!matches!(kill_rx.try_recv(), Ok(())),
102+
"kill signal should not have been sent"
103+
);
104+
}
105+
106+
/// Writer exits first, reader disconnects within the watchdog window — no kill.
107+
#[tokio::test(start_paused = true)]
108+
async fn returns_when_reader_disconnects_after_writer_exit_within_timeout() {
109+
let (conn, dc_tx, writer_exit_tx, mut kill_rx) = test_connection();
110+
111+
// Signal the writer exit up front so the connection moves on to its timed wait.
writer_exit_tx
112+
.send(())
113+
.expect("writer_exit receiver dropped");
114+
115+
// Fire the reader disconnect from a task that runs on the same paused clock.
116+
// The 1 s delay is well inside the 10 s `FORCE_CLOSE_TIMEOUT`.
tokio::spawn(async move {
117+
tokio::time::sleep(Duration::from_secs(1)).await;
118+
let _ = dc_tx.send(());
119+
});
120+
121+
conn.run_until_disconnect().await;
122+
123+
// Only a received `Ok(())` would prove the kill was sent; any other result passes.
assert!(
124+
!matches!(kill_rx.try_recv(), Ok(())),
125+
"kill signal should not have been sent when reader disconnected within timeout"
126+
);
127+
}
128+
129+
/// Writer exits first, reader stays blocked past the watchdog — kill fires,
130+
/// and a simulated reader fires dc once it sees the kill.
131+
#[tokio::test(start_paused = true)]
132+
async fn watchdog_fires_kill_when_reader_stuck() {
133+
let (conn, dc_tx, writer_exit_tx, kill_rx) = test_connection();
134+
135+
writer_exit_tx
136+
.send(())
137+
.expect("writer_exit receiver dropped");
138+
139+
// Stand in for the reader: when the watchdog kills us, we publish dc.
140+
// If the kill sender is dropped without sending, dc is never sent from here.
tokio::spawn(async move {
141+
if kill_rx.await.is_ok() {
142+
let _ = dc_tx.send(());
143+
}
144+
});
145+
146+
// Measured with tokio's `Instant` so the reading follows the runtime clock, which this
// test starts paused — presumably auto-advanced by tokio while idle; confirm against the
// tokio `time::pause` documentation.
let start = tokio::time::Instant::now();
147+
conn.run_until_disconnect().await;
148+
let elapsed = start.elapsed();
149+
150+
// The disconnect is only published after the kill, so returning earlier than the
// timeout would mean the watchdog window was not honoured.
assert!(
151+
elapsed >= FORCE_CLOSE_TIMEOUT,
152+
"expected watchdog to take at least {:?}, took {:?}",
153+
FORCE_CLOSE_TIMEOUT,
154+
elapsed
155+
);
19156
}
20157
}

crates/hotfix/src/transport/reader.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,6 @@ pub struct ReaderMessage;
77

88
/// Handle to the reader side of a connection: a receiver for the reader's disconnect
/// signal and a sender for asking the reader to stop.
pub struct ReaderRef {
99
// Awaited by `FixConnection::run_until_disconnect` to learn that the reader is done.
pub(crate) disconnect_signal: oneshot::Receiver<()>,
10-
// Consumed by the watchdog in FixConnection::run_until_disconnect (follow-up commit).
11-
// Kept wired now so the reader's select! already accepts an external kill signal.
12-
#[allow(dead_code)]
1310
// Fired by `FixConnection::run_until_disconnect` when the reader has not reported a
// disconnect within the force-close window.
pub(crate) kill: oneshot::Sender<()>,
1411
}
1512

crates/hotfix/src/transport/socket.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ where
4747
{
4848
let (reader, writer) = tokio::io::split(stream);
4949

50-
let (writer_ref, _writer_exit) = spawn_socket_writer(writer);
50+
let (writer_ref, writer_exit) = spawn_socket_writer(writer);
5151
let reader_ref = spawn_socket_reader(reader, session_ref);
5252

53-
FixConnection::new(writer_ref, reader_ref)
53+
FixConnection::new(writer_ref, reader_ref, writer_exit)
5454
}

crates/hotfix/src/transport/socket/socket_writer.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -314,10 +314,7 @@ mod tests {
314314
Poll::Ready(Ok(()))
315315
}
316316

317-
fn poll_shutdown(
318-
self: Pin<&mut Self>,
319-
_cx: &mut Context<'_>,
320-
) -> Poll<std::io::Result<()>> {
317+
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
321318
Poll::Pending
322319
}
323320
}
@@ -335,10 +332,11 @@ mod tests {
335332
writer_ref.disconnect().await;
336333

337334
// Advance virtual time past the shutdown timeout.
338-
tokio::time::advance(WRITER_SHUTDOWN_TIMEOUT + std::time::Duration::from_millis(100))
339-
.await;
335+
tokio::time::advance(WRITER_SHUTDOWN_TIMEOUT + std::time::Duration::from_millis(100)).await;
340336

341337
// Exit should have fired by now.
342-
exit_rx.await.expect("exit sender dropped without signalling");
338+
exit_rx
339+
.await
340+
.expect("exit sender dropped without signalling");
343341
}
344342
}

crates/hotfix/tests/session_test_cases/common/fakes/fake_counterparty.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ where
3737
) -> Result<Self> {
3838
let (writer_ref, receiver) = Self::create_writer();
3939
let (reader_ref, dc_sender) = Self::create_reader();
40-
let connection = FixConnection::new(writer_ref, reader_ref);
40+
let (_writer_exit_tx, writer_exit_rx) = oneshot::channel();
41+
let connection = FixConnection::new(writer_ref, reader_ref, writer_exit_rx);
4142
let message_config = MessageConfig::default();
4243
let message_builder = MessageBuilder::new(Dictionary::fix44(), message_config)?;
4344

@@ -61,7 +62,8 @@ where
6162
pub async fn reconnect(&mut self, reset_store: bool) -> Result<()> {
6263
let (writer_ref, receiver) = Self::create_writer();
6364
let (reader_ref, dc_sender) = Self::create_reader();
64-
let connection = FixConnection::new(writer_ref, reader_ref);
65+
let (_writer_exit_tx, writer_exit_rx) = oneshot::channel();
66+
let connection = FixConnection::new(writer_ref, reader_ref, writer_exit_rx);
6567

6668
self.receiver = receiver;
6769
self._dc_sender = dc_sender;

0 commit comments

Comments
 (0)