Skip to content

Commit dafd5b6

Browse files
committed
Create structure for awaiting session times
1 parent 10b95b6 commit dafd5b6

4 files changed

Lines changed: 72 additions & 1 deletion

File tree

crates/hotfix/src/initiator.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,13 @@ async fn establish_connection<M: FixMessage>(config: SessionConfig, session_ref:
5858
warn!("session indicated we shouldn't reconnect");
5959
break;
6060
}
61+
session_ref.await_active_session_time().await;
6162

6263
match FixConnection::connect(&config, session_ref.clone()).await {
6364
Ok(conn) => {
6465
session_ref.register_writer(conn.get_writer()).await;
66+
67+
// TODO: should this ask the session about disconnects?
6568
conn.run_until_disconnect().await;
6669

6770
warn!("session connection dropped, attempting to reconnect");

crates/hotfix/src/session.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use crate::message::resend_request::ResendRequest;
3030
use crate::message::sequence_reset::SequenceReset;
3131
use crate::message::test_request::TestRequest;
3232
use crate::message_utils::is_admin;
33+
use crate::session::event::AwaitingActiveSessionResponse;
3334
use crate::session::state::AwaitingResendState;
3435
use crate::session_schedule::SessionSchedule;
3536
use event::SessionEvent;
@@ -94,6 +95,15 @@ impl<M: FixMessage> SessionRef<M> {
9495
.unwrap();
9596
receiver.await.expect("to receive a response")
9697
}
98+
99+
pub async fn await_active_session_time(&self) {
100+
let (sender, receiver) = oneshot::channel::<AwaitingActiveSessionResponse>();
101+
self.sender
102+
.send(SessionEvent::AwaitingActiveSession(sender))
103+
.await
104+
.unwrap();
105+
receiver.await.expect("to receive a response");
106+
}
97107
}
98108

99109
struct Session<M, S> {
@@ -137,6 +147,7 @@ impl<M: FixMessage, S: MessageStore> Session<M, S> {
137147
state: SessionState::Disconnected {
138148
reconnect: true,
139149
_reason: "initialising".to_string(),
150+
session_awaiter: None,
140151
},
141152
application,
142153
store,
@@ -323,12 +334,14 @@ impl<M: FixMessage, S: MessageStore> Session<M, S> {
323334
self.state = SessionState::Disconnected {
324335
reconnect: true,
325336
_reason: reason,
337+
session_awaiter: None,
326338
}
327339
}
328340
SessionState::LoggedOut { reconnect } => {
329341
self.state = SessionState::Disconnected {
330342
reconnect,
331343
_reason: "logged out".to_string(),
344+
session_awaiter: None,
332345
}
333346
}
334347
SessionState::Disconnected { .. } => {
@@ -647,6 +660,9 @@ impl<M: FixMessage, S: MessageStore> Session<M, S> {
647660
.send(self.state.should_reconnect())
648661
.expect("be able to respond");
649662
}
663+
SessionEvent::AwaitingActiveSession(responder) => {
664+
self.state.register_session_awaiter(responder);
665+
}
650666
}
651667
}
652668

crates/hotfix/src/session/event.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,19 @@ pub enum SessionEvent<M> {
1515
Connected(WriterRef),
1616
/// Ask the session whether we should attempt to reconnect.
1717
ShouldReconnect(oneshot::Sender<bool>),
18+
/// Ask the session to notify us when the session is active.
19+
AwaitingActiveSession(oneshot::Sender<AwaitingActiveSessionResponse>),
20+
}
21+
22+
/// The response sent by the session to AwaitingActiveSession messages.
23+
///
24+
/// This doesn't include an Inactive variant, as the session won't respond until
25+
/// it's active or in a state that indicates it should just be shut down due to an
26+
/// unrecoverable error.
27+
#[derive(Debug, Clone, Copy)]
28+
pub enum AwaitingActiveSessionResponse {
29+
/// The session is now active and ready to connect.
30+
Active,
31+
/// The session should be shut down due to an unrecoverable error.
32+
Shutdown,
1833
}

crates/hotfix/src/session/state.rs

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
use hotfix_message::message::Message;
22
use std::collections::VecDeque;
3+
use tokio::sync::oneshot;
34
use tracing::{debug, error};
45

56
use crate::message::parser::RawFixMessage;
7+
use crate::session::event::AwaitingActiveSessionResponse;
68
use crate::transport::socket_writer::WriterRef;
79

810
pub enum SessionState {
@@ -17,7 +19,14 @@ pub enum SessionState {
1719
/// The peer has logged us out.
1820
LoggedOut { reconnect: bool },
1921
/// The TCP connection has been dropped.
20-
Disconnected { reconnect: bool, _reason: String },
22+
///
23+
/// This is also the state we're in if we purposefully disconnected due to the current
24+
/// time being out of session hours.
25+
Disconnected {
26+
reconnect: bool,
27+
session_awaiter: Option<oneshot::Sender<AwaitingActiveSessionResponse>>,
28+
_reason: String,
29+
},
2130
}
2231

2332
impl SessionState {
@@ -92,6 +101,34 @@ impl SessionState {
92101
_ => false,
93102
}
94103
}
104+
105+
pub fn register_session_awaiter(
106+
&self,
107+
responder: oneshot::Sender<AwaitingActiveSessionResponse>,
108+
) {
109+
match self {
110+
SessionState::Disconnected {
111+
reconnect: true,
112+
session_awaiter,
113+
_reason,
114+
} => {
115+
if session_awaiter.is_some() {
116+
error!("session awaiter already registered");
117+
if let Err(err) = responder.send(AwaitingActiveSessionResponse::Shutdown) {
118+
error!("failed to send session awaiter response: {err:?}");
119+
}
120+
} else {
121+
// set the session_awaiter on the current state
122+
}
123+
}
124+
_ => {
125+
error!("session awaiter can only be registered on disconnected sessions");
126+
if let Err(err) = responder.send(AwaitingActiveSessionResponse::Shutdown) {
127+
error!("failed to send session awaiter response: {err:?}");
128+
}
129+
}
130+
}
131+
}
95132
}
96133

97134
/// Session state we're in while processing messages we requested to be resent.

0 commit comments

Comments
 (0)