Skip to content

Commit b32cdfe

Browse files
authored
fix: prevent sessions from hanging due to lack of explicit shutdown and safeguards for TCP connection (#347)
* Give the reader an external kill signal
* Shut down write half and publish exit signal
* Watchdog forces reader close when peer ignores FIN
1 parent 706a898 commit b32cdfe

6 files changed

Lines changed: 351 additions & 48 deletions

File tree

Lines changed: 140 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,157 @@
1+
use std::time::Duration;
2+
3+
use tokio::sync::oneshot;
4+
use tracing::warn;
5+
16
use crate::transport::reader::ReaderRef;
27
use crate::transport::writer::WriterRef;
38

9+
const FORCE_CLOSE_TIMEOUT: Duration = Duration::from_secs(10);
10+
411
/// The transport handles that make up one connection, bundled together.
///
/// - `writer`: handle that `get_writer` clones out to callers.
/// - `reader`: handle whose disconnect signal `run_until_disconnect` waits on.
/// - `writer_exit`: one-shot receiver handed back next to the writer handle by
///   `spawn_socket_writer`; presumably resolves when the writer task exits
///   (the sending side is not visible here — confirm in the writer module).
pub struct FixConnection {
512
writer: WriterRef,
613
reader: ReaderRef,
14+
writer_exit: oneshot::Receiver<()>,
715
}
816

917
impl FixConnection {
10-
pub fn new(writer: WriterRef, reader: ReaderRef) -> Self {
11-
Self { writer, reader }
18+
/// Assembles a connection from the writer handle, the reader handle, and the
/// one-shot receiver that `run_until_disconnect` treats as "the writer is done".
pub fn new(writer: WriterRef, reader: ReaderRef, writer_exit: oneshot::Receiver<()>) -> Self {
19+
Self {
20+
writer,
21+
reader,
22+
writer_exit,
23+
}
1224
}
25+
1326
/// Returns a clone of the writer handle.
pub fn get_writer(&self) -> WriterRef {
1427
self.writer.clone()
1528
}
1629

1730
/// Consumes the connection and returns once the reader side has finished.
///
/// Two phases:
/// 1. Wait for whichever resolves first: the reader's disconnect signal
///    (return immediately) or the `writer_exit` receiver.
/// 2. If `writer_exit` resolved first, give the reader up to
///    `FORCE_CLOSE_TIMEOUT` to signal disconnect. On timeout, log a warning,
///    send the kill signal to the reader, and wait for the disconnect signal.
///
/// Both receivers are matched with `_`, so a dropped sender is handled the
/// same way as an explicit `send(())`.
pub async fn run_until_disconnect(self) {
18-
self.reader.wait_for_disconnect().await
31+
// The writer handle is not needed here; `..` drops it.
let Self {
32+
reader,
33+
mut writer_exit,
34+
..
35+
} = self;
36+
// Take the reader handle apart so the disconnect receiver can be polled by
// reference while the kill sender stays available for the timeout path.
let ReaderRef {
37+
mut disconnect_signal,
38+
kill,
39+
} = reader;
40+
41+
// Phase 1: reader finished -> done; writer finished -> fall through to the watchdog.
tokio::select! {
42+
_ = &mut disconnect_signal => return,
43+
_ = &mut writer_exit => {}
44+
}
45+
46+
// Phase 2: bounded wait for the reader to finish on its own.
match tokio::time::timeout(FORCE_CLOSE_TIMEOUT, &mut disconnect_signal).await {
47+
Ok(_) => {}
48+
Err(_) => {
49+
warn!(
50+
"reader did not observe EOF within {:?}, forcing close",
51+
FORCE_CLOSE_TIMEOUT
52+
);
53+
// The send result is ignored: it only fails if the kill receiver is already gone.
let _ = kill.send(());
54+
// NOTE(review): this final wait has no timeout; it ends when the reader
// signals disconnect or drops its sender.
let _ = disconnect_signal.await;
55+
}
56+
}
57+
}
58+
}
59+
60+
// Unit tests for `FixConnection::run_until_disconnect`.
// Every test uses `start_paused = true`, so the watchdog timeout and the
// `sleep` below run on tokio's paused test clock rather than in real time.
#[cfg(test)]
61+
mod tests {
62+
use super::*;
63+
use crate::transport::writer::WriterMessage;
64+
use tokio::sync::mpsc;
65+
66+
/// Build a `FixConnection` and return the ends the test controls:
67+
/// `dc_tx` to fire from the "reader", `writer_exit_tx` to fire from the "writer",
68+
/// and `kill_rx` so the test can observe or simulate the reader being killed.
///
/// Tuple order: `(connection, dc_tx, writer_exit_tx, kill_rx)`.
69+
fn test_connection() -> (
70+
FixConnection,
71+
oneshot::Sender<()>,
72+
oneshot::Sender<()>,
73+
oneshot::Receiver<()>,
74+
) {
75+
let (dc_tx, dc_rx) = oneshot::channel::<()>();
76+
let (kill_tx, kill_rx) = oneshot::channel::<()>();
77+
let reader_ref = ReaderRef::new(dc_rx, kill_tx);
78+
79+
// The receiving end of the writer channel is dropped when this helper
// returns; the tests below never use the writer handle.
let (writer_mpsc_tx, _writer_mpsc_rx) = mpsc::channel::<WriterMessage>(1);
80+
let writer_ref = WriterRef::new(writer_mpsc_tx);
81+
82+
let (writer_exit_tx, writer_exit_rx) = oneshot::channel::<()>();
83+
84+
let conn = FixConnection::new(writer_ref, reader_ref, writer_exit_rx);
85+
(conn, dc_tx, writer_exit_tx, kill_rx)
86+
}
87+
88+
/// Reader signals disconnect first — return immediately, kill is never sent.
89+
#[tokio::test(start_paused = true)]
90+
async fn returns_on_reader_disconnect_before_writer_exit() {
91+
// `_writer_exit_tx` is bound to a name (not `_`) so the sender stays alive
// for the whole test and the writer-exit receiver cannot resolve.
let (conn, dc_tx, _writer_exit_tx, mut kill_rx) = test_connection();
92+
93+
dc_tx.send(()).expect("dc receiver dropped");
94+
95+
conn.run_until_disconnect().await;
96+
97+
// Kill should not have been sent. The sender has been dropped by now
98+
// (scope ended inside run_until_disconnect), so try_recv returns Closed
99+
// rather than Empty. Either way, an Ok(()) would mean kill was sent.
100+
assert!(
101+
!matches!(kill_rx.try_recv(), Ok(())),
102+
"kill signal should not have been sent"
103+
);
104+
}
105+
106+
/// Writer exits first, reader disconnects within the watchdog window — no kill.
107+
#[tokio::test(start_paused = true)]
108+
async fn returns_when_reader_disconnects_after_writer_exit_within_timeout() {
109+
let (conn, dc_tx, writer_exit_tx, mut kill_rx) = test_connection();
110+
111+
writer_exit_tx
112+
.send(())
113+
.expect("writer_exit receiver dropped");
114+
115+
// Fire the reader disconnect from a task that runs on the same paused clock.
// The 1 s delay is well inside the 10 s `FORCE_CLOSE_TIMEOUT`.
116+
tokio::spawn(async move {
117+
tokio::time::sleep(Duration::from_secs(1)).await;
118+
let _ = dc_tx.send(());
119+
});
120+
121+
conn.run_until_disconnect().await;
122+
123+
assert!(
124+
!matches!(kill_rx.try_recv(), Ok(())),
125+
"kill signal should not have been sent when reader disconnected within timeout"
126+
);
127+
}
128+
129+
/// Writer exits first, reader stays blocked past the watchdog — kill fires,
130+
/// and a simulated reader fires dc once it sees the kill.
131+
#[tokio::test(start_paused = true)]
132+
async fn watchdog_fires_kill_when_reader_stuck() {
133+
let (conn, dc_tx, writer_exit_tx, kill_rx) = test_connection();
134+
135+
writer_exit_tx
136+
.send(())
137+
.expect("writer_exit receiver dropped");
138+
139+
// Stand in for the reader: when the watchdog kills us, we publish dc.
140+
tokio::spawn(async move {
141+
if kill_rx.await.is_ok() {
142+
let _ = dc_tx.send(());
143+
}
144+
});
145+
146+
// `tokio::time::Instant` follows the paused test clock, so `elapsed` is
// measured in virtual time.
let start = tokio::time::Instant::now();
147+
conn.run_until_disconnect().await;
148+
let elapsed = start.elapsed();
149+
150+
// Disconnect is only published after the kill, so returning at all implies
// the kill was sent; the elapsed check confirms the full timeout was waited.
assert!(
151+
elapsed >= FORCE_CLOSE_TIMEOUT,
152+
"expected watchdog to take at least {:?}, took {:?}",
153+
FORCE_CLOSE_TIMEOUT,
154+
elapsed
155+
);
19156
}
20157
}

crates/hotfix/src/transport/reader.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,16 @@ use tracing::warn;
66
pub struct ReaderMessage;
77

88
/// Handle to a spawned reader task.
///
/// - `disconnect_signal`: one-shot receiver that callers await to learn the
///   reader is done. The sending side is handed to `ReaderActor` and is not
///   visible here — presumably fired when the reader stops; confirm there.
/// - `kill`: one-shot sender whose receiver `run_reader` selects on; a sent
///   value or a dropped sender makes the read loop disconnect the session and
///   exit.
///
/// Both fields are `pub(crate)` so `FixConnection::run_until_disconnect` can
/// destructure the handle.
pub struct ReaderRef {
9-
disconnect_signal: oneshot::Receiver<()>,
9+
pub(crate) disconnect_signal: oneshot::Receiver<()>,
10+
pub(crate) kill: oneshot::Sender<()>,
1011
}
1112

1213
impl ReaderRef {
13-
pub fn new(disconnect_signal: oneshot::Receiver<()>) -> Self {
14-
Self { disconnect_signal }
14+
pub fn new(disconnect_signal: oneshot::Receiver<()>, kill: oneshot::Sender<()>) -> Self {
15+
Self {
16+
disconnect_signal,
17+
kill,
18+
}
1519
}
1620

1721
pub async fn wait_for_disconnect(self) {

crates/hotfix/src/transport/socket.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ where
4747
{
4848
let (reader, writer) = tokio::io::split(stream);
4949

50-
let writer_ref = spawn_socket_writer(writer);
50+
let (writer_ref, writer_exit) = spawn_socket_writer(writer);
5151
let reader_ref = spawn_socket_reader(reader, session_ref);
5252

53-
FixConnection::new(writer_ref, reader_ref)
53+
FixConnection::new(writer_ref, reader_ref, writer_exit)
5454
}

crates/hotfix/src/transport/socket/socket_reader.rs

Lines changed: 77 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,11 @@ pub fn spawn_socket_reader(
1212
session_ref: InternalSessionRef<impl OutboundMessage>,
1313
) -> ReaderRef {
1414
let (dc_sender, dc_receiver) = oneshot::channel();
15+
let (kill_sender, kill_receiver) = oneshot::channel();
1516
let actor = ReaderActor::new(reader, session_ref, dc_sender);
16-
tokio::spawn(run_reader(actor));
17+
tokio::spawn(run_reader(actor, kill_receiver));
1718

18-
ReaderRef::new(dc_receiver)
19+
ReaderRef::new(dc_receiver, kill_sender)
1920
}
2021

2122
struct ReaderActor<M, R> {
@@ -38,40 +39,52 @@ impl<M, R: AsyncRead> ReaderActor<M, R> {
3839
}
3940
}
4041

41-
async fn run_reader<Outbound, R>(mut actor: ReaderActor<Outbound, R>)
42-
where
42+
async fn run_reader<Outbound, R>(
43+
mut actor: ReaderActor<Outbound, R>,
44+
mut kill_rx: oneshot::Receiver<()>,
45+
) where
4346
Outbound: OutboundMessage,
4447
R: AsyncRead,
4548
{
4649
let mut parser = Parser::default();
4750
loop {
4851
let mut buf = vec![];
4952

50-
match actor.reader.read_buf(&mut buf).await {
51-
Ok(0) => {
52-
let _ = actor
53-
.session_ref
54-
.disconnect("received EOF".to_string())
55-
.await;
56-
break;
57-
}
58-
Err(err) => {
59-
let _ = actor.session_ref.disconnect(err.to_string()).await;
60-
break;
61-
}
62-
Ok(_) => {
63-
let messages = parser.parse(&buf);
64-
65-
for msg in messages {
66-
if actor
53+
tokio::select! {
54+
result = actor.reader.read_buf(&mut buf) => match result {
55+
Ok(0) => {
56+
let _ = actor
6757
.session_ref
68-
.new_fix_message_received(msg)
69-
.await
70-
.is_err()
71-
{
72-
debug!("reader received message but session has been terminated");
58+
.disconnect("received EOF".to_string())
59+
.await;
60+
break;
61+
}
62+
Err(err) => {
63+
let _ = actor.session_ref.disconnect(err.to_string()).await;
64+
break;
65+
}
66+
Ok(_) => {
67+
let messages = parser.parse(&buf);
68+
69+
for msg in messages {
70+
if actor
71+
.session_ref
72+
.new_fix_message_received(msg)
73+
.await
74+
.is_err()
75+
{
76+
debug!("reader received message but session has been terminated");
77+
}
7378
}
7479
}
80+
},
81+
res = &mut kill_rx => {
82+
let reason = match res {
83+
Ok(()) => "forced close by watchdog",
84+
Err(_) => "reader handle dropped",
85+
};
86+
let _ = actor.session_ref.disconnect(reason.to_string()).await;
87+
break;
7588
}
7689
}
7790
}
@@ -228,4 +241,42 @@ mod tests {
228241
// wait for disconnect signal
229242
let _ = reader_ref.wait_for_disconnect().await;
230243
}
244+
245+
/// Kill signal terminates the reader even when the peer is silent, and
246+
/// the session observes the watchdog-sourced disconnect reason.
///
/// Runs under plain `#[tokio::test]`, without `start_paused`, so the two
/// 100 ms timeouts below are not on a paused test clock.
247+
#[tokio::test]
248+
async fn kill_signal_terminates_reader() {
249+
// `_writer` is bound to a name (not `_`) so the peer end stays open for the
// whole test; nothing is ever written to it, so the reader gets neither data
// nor EOF.
let (_writer, reader) = duplex(1024);
250+
let (reader_half, _writer_half) = tokio::io::split(reader);
251+
252+
let (session_ref, mut event_receiver) = create_test_session_ref();
253+
let reader_ref = spawn_socket_reader(reader_half, session_ref);
254+
255+
// Destructure so we can both fire the kill and later await the disconnect signal.
256+
let ReaderRef {
257+
disconnect_signal,
258+
kill,
259+
} = reader_ref;
260+
261+
// `send` fails only if the kill receiver has already been dropped.
kill.send(()).expect("kill receiver dropped");
262+
263+
// Reader should publish the watchdog reason to the session.
264+
match tokio::time::timeout(
265+
tokio::time::Duration::from_millis(100),
266+
event_receiver.recv(),
267+
)
268+
.await
269+
{
270+
Ok(Some(SessionEvent::Disconnected(reason))) => {
271+
assert_eq!(reason, "forced close by watchdog");
272+
}
273+
// Covers a timeout, a closed event channel, and any other event.
other => panic!("expected Disconnected(\"forced close by watchdog\"), got {other:?}"),
274+
}
275+
276+
// And the disconnect signal should fire shortly after.
// First `expect`: the timeout elapsed. Second `expect`: the sender was dropped unsent.
277+
tokio::time::timeout(tokio::time::Duration::from_millis(100), disconnect_signal)
278+
.await
279+
.expect("disconnect signal not fired within timeout")
280+
.expect("disconnect sender dropped without signalling");
281+
}
231282
}

0 commit comments

Comments
 (0)