Skip to content

Commit 706a898

Browse files
authored
fix: propagate shutdown response from schedule calculations (#346)
* Propagate ScheduleResponse::Shutdown to the connection loop * Add unit tests for await_in_schedule * Cover await_in_schedule InSchedule/Shutdown/SessionGone paths
1 parent 9d4735d commit 706a898

5 files changed

Lines changed: 98 additions & 43 deletions

File tree

crates/hotfix/src/initiator.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use crate::application::Application;
1515
use crate::config::SessionConfig;
1616
use crate::message::OutboundMessage;
1717
use crate::session::error::{SendError, SendOutcome, SessionCreationError};
18+
use crate::session::event::ScheduleResponse;
1819
use crate::session::{InternalSessionRef, SessionHandle};
1920
use crate::store::MessageStore;
2021
use crate::transport::connect;
@@ -107,9 +108,18 @@ async fn establish_connection<Outbound: OutboundMessage>(
107108
completion_tx: watch::Sender<bool>,
108109
) {
109110
loop {
110-
if session_ref.await_in_schedule().await.is_err() {
111-
warn!("session task terminated when checking schedule");
112-
break;
111+
match session_ref.await_in_schedule().await {
112+
Ok(ScheduleResponse::InSchedule) => {
113+
debug!("resuming connection as schedule is active");
114+
}
115+
Ok(ScheduleResponse::Shutdown) => {
116+
warn!("session indicated shutdown during schedule check");
117+
break;
118+
}
119+
Err(_) => {
120+
warn!("session task terminated when checking schedule");
121+
break;
122+
}
113123
}
114124

115125
match connect(&config, session_ref.clone()).await {

crates/hotfix/src/session.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ mod session_handle;
99
pub mod session_ref;
1010
mod state;
1111
#[cfg(test)]
12-
mod test_utils;
12+
pub(crate) mod test_utils;
1313

1414
use chrono::Utc;
1515
use hotfix_message::dict::Dictionary;

crates/hotfix/src/session/session_ref.rs

Lines changed: 51 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,16 +82,13 @@ impl<Outbound: OutboundMessage> InternalSessionRef<Outbound> {
8282
Ok(receiver.await?)
8383
}
8484

85-
pub async fn await_in_schedule(&self) -> Result<(), SessionGone> {
85+
pub async fn await_in_schedule(&self) -> Result<ScheduleResponse, SessionGone> {
8686
debug!("awaiting in-schedule time");
8787
let (sender, receiver) = oneshot::channel::<ScheduleResponse>();
8888
self.event_sender
8989
.send(SessionEvent::AwaitSchedule(sender))
9090
.await?;
91-
receiver.await?;
92-
93-
debug!("resuming connection as schedule is active");
94-
Ok(())
91+
Ok(receiver.await?)
9592
}
9693
}
9794

@@ -110,3 +107,52 @@ impl From<oneshot::error::RecvError> for SessionGone {
110107
Self(err.to_string())
111108
}
112109
}
110+
111+
#[cfg(test)]
112+
mod tests {
113+
use super::*;
114+
use crate::session::test_utils::create_test_session_ref;
115+
116+
#[tokio::test]
117+
async fn await_in_schedule_returns_in_schedule_when_session_responds_in_schedule() {
118+
let (session_ref, mut event_receiver) = create_test_session_ref();
119+
120+
tokio::spawn(async move {
121+
match event_receiver.recv().await {
122+
Some(SessionEvent::AwaitSchedule(responder)) => {
123+
let _ = responder.send(ScheduleResponse::InSchedule);
124+
}
125+
other => panic!("unexpected event: {other:?}"),
126+
}
127+
});
128+
129+
let result = session_ref.await_in_schedule().await;
130+
assert!(matches!(result, Ok(ScheduleResponse::InSchedule)));
131+
}
132+
133+
#[tokio::test]
134+
async fn await_in_schedule_returns_shutdown_when_session_responds_shutdown() {
135+
let (session_ref, mut event_receiver) = create_test_session_ref();
136+
137+
tokio::spawn(async move {
138+
match event_receiver.recv().await {
139+
Some(SessionEvent::AwaitSchedule(responder)) => {
140+
let _ = responder.send(ScheduleResponse::Shutdown);
141+
}
142+
other => panic!("unexpected event: {other:?}"),
143+
}
144+
});
145+
146+
let result = session_ref.await_in_schedule().await;
147+
assert!(matches!(result, Ok(ScheduleResponse::Shutdown)));
148+
}
149+
150+
#[tokio::test]
151+
async fn await_in_schedule_returns_err_when_event_channel_closed() {
152+
let (session_ref, event_receiver) = create_test_session_ref();
153+
drop(event_receiver);
154+
155+
let result = session_ref.await_in_schedule().await;
156+
assert!(matches!(result, Err(SessionGone(_))));
157+
}
158+
}

crates/hotfix/src/session/test_utils.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
use crate::config::SessionConfig;
2+
use crate::message::{Message, OutboundMessage};
3+
use crate::session::admin_request::AdminRequest;
24
use crate::session::ctx::SessionCtx;
5+
use crate::session::event::SessionEvent;
6+
use crate::session::session_ref::{InternalSessionRef, OutboundRequest};
37
use crate::store::{MessageStore, Result as StoreResult};
48
use crate::transport::writer::{WriterMessage, WriterRef};
59
use chrono::{DateTime, Utc};
@@ -117,3 +121,31 @@ pub(crate) fn extract_field(raw: &[u8], tag: u32) -> Option<String> {
117121
}
118122
None
119123
}
124+
125+
#[derive(Clone)]
126+
pub(crate) struct TestMessage;
127+
128+
impl OutboundMessage for TestMessage {
129+
fn write(&self, _msg: &mut Message) {}
130+
fn message_type(&self) -> &str {
131+
"TEST"
132+
}
133+
}
134+
135+
pub(crate) fn create_test_session_ref() -> (
136+
InternalSessionRef<TestMessage>,
137+
mpsc::Receiver<SessionEvent>,
138+
) {
139+
let (event_sender, event_receiver) = mpsc::channel::<SessionEvent>(100);
140+
let (outbound_message_sender, _outbound_receiver) =
141+
mpsc::channel::<OutboundRequest<TestMessage>>(10);
142+
let (admin_request_sender, _admin_receiver) = mpsc::channel::<AdminRequest>(10);
143+
144+
let session_ref = InternalSessionRef {
145+
event_sender,
146+
outbound_message_sender,
147+
admin_request_sender,
148+
};
149+
150+
(session_ref, event_receiver)
151+
}

crates/hotfix/src/transport/socket/socket_reader.rs

Lines changed: 1 addition & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -84,42 +84,9 @@ where
8484
#[cfg(test)]
8585
mod tests {
8686
use super::*;
87-
use crate::message::Message;
88-
use crate::session::admin_request::AdminRequest;
8987
use crate::session::event::SessionEvent;
90-
use crate::session::session_ref::OutboundRequest;
88+
use crate::session::test_utils::create_test_session_ref;
9189
use tokio::io::{AsyncWriteExt, duplex};
92-
use tokio::sync::mpsc;
93-
94-
#[derive(Clone, Debug, PartialEq)]
95-
struct TestMessage;
96-
97-
impl OutboundMessage for TestMessage {
98-
fn write(&self, _msg: &mut Message) {}
99-
100-
fn message_type(&self) -> &str {
101-
"TEST"
102-
}
103-
}
104-
105-
/// Creates a test InternalSessionRef that captures events for verification
106-
fn create_test_session_ref() -> (
107-
InternalSessionRef<TestMessage>,
108-
mpsc::Receiver<SessionEvent>,
109-
) {
110-
let (event_sender, event_receiver) = mpsc::channel::<SessionEvent>(100);
111-
let (outbound_message_sender, _outbound_receiver) =
112-
mpsc::channel::<OutboundRequest<TestMessage>>(10);
113-
let (admin_request_sender, _admin_receiver) = mpsc::channel::<AdminRequest>(10);
114-
115-
let session_ref = InternalSessionRef {
116-
event_sender,
117-
outbound_message_sender,
118-
admin_request_sender,
119-
};
120-
121-
(session_ref, event_receiver)
122-
}
12390

12491
/// Test that the reader correctly parses a valid FIX message and sends it to the session
12592
/// for processing.

0 commit comments

Comments
 (0)