Skip to content

Commit a6b061e

Browse files
committed
Correctly reconnect when we enter a new active session time
1 parent adf2af7 commit a6b061e

2 files changed

Lines changed: 20 additions & 11 deletions

File tree

crates/hotfix/src/session.rs

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -97,12 +97,14 @@ impl<M: FixMessage> SessionRef<M> {
9797
}
9898

9999
pub async fn await_active_session_time(&self) {
100+
debug!("awaiting active session time");
100101
let (sender, receiver) = oneshot::channel::<AwaitingActiveSessionResponse>();
101102
self.sender
102103
.send(SessionEvent::AwaitingActiveSession(sender))
103104
.await
104105
.unwrap();
105106
receiver.await.expect("to receive a response");
107+
debug!("resuming connection as session is active");
106108
}
107109
}
108110

@@ -399,14 +401,19 @@ impl<M: FixMessage, S: MessageStore> Session<M, S> {
399401
async fn on_logout(&mut self) {
400402
if let SessionState::AwaitingLogout { .. } = &self.state {
401403
self.state.disconnect().await;
404+
self.state = SessionState::Disconnected {
405+
reconnect: true,
406+
_reason: "we logged out gracefully".to_string(),
407+
session_awaiter: None,
408+
};
409+
} else {
410+
// TODO: reconnect = false isn't always valid, this should be more sophisticated
411+
self.state.disconnect().await;
412+
self.state = SessionState::LoggedOut { reconnect: false };
413+
self.application
414+
.send_logout("peer has logged us out".to_string())
415+
.await;
402416
}
403-
404-
// TODO: reconnect = false isn't always valid, this should be more sophisticated
405-
self.state.disconnect().await;
406-
self.state = SessionState::LoggedOut { reconnect: false };
407-
self.application
408-
.send_logout("peer has logged us out".to_string())
409-
.await;
410417
}
411418

412419
async fn on_heartbeat(&mut self, message: &Message) {

crates/hotfix/src/session/state.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
1+
use crate::message::parser::RawFixMessage;
2+
use crate::session::event::AwaitingActiveSessionResponse;
3+
use crate::transport::socket_writer::WriterRef;
14
use hotfix_message::message::Message;
25
use std::collections::VecDeque;
36
use tokio::sync::oneshot;
47
use tracing::{debug, error};
58

6-
use crate::message::parser::RawFixMessage;
7-
use crate::session::event::AwaitingActiveSessionResponse;
8-
use crate::transport::socket_writer::WriterRef;
9-
109
pub enum SessionState {
1110
/// We have established a connection, sent a logon message and await a response.
1211
AwaitingLogon { writer: WriterRef, logon_sent: bool },
@@ -119,6 +118,7 @@ impl SessionState {
119118
}
120119
} else {
121120
*session_awaiter = Some(responder);
121+
debug!("registered session awaiter");
122122
}
123123
}
124124
_ => {
@@ -138,6 +138,8 @@ impl SessionState {
138138
if let Some(awaiter) = session_awaiter.take() {
139139
if let Err(err) = awaiter.send(AwaitingActiveSessionResponse::Active) {
140140
error!("failed to send session awaiter response: {err:?}");
141+
} else {
142+
debug!("notified session awaiter");
141143
}
142144
}
143145
}

0 commit comments

Comments
 (0)