Skip to content

Commit 8c31640

Browse files
authored
feat: replace anyhow errors in session layer with proper error variants (#302)
* Introduce new error type for session creation issues * Improve error handling in session schedule code * Expect message sequence numbers once message has passed validation * Introduce new internal error types in session code * Make fix44 a default feature * Clean up error mapping in session layer * Remove anyhow from session handle and initiator APIs * Remove remaining traces of anyhow in core hotfix crate * Add unit tests for session errors * Add better test coverage for initiator and fix bug with shut down session trying to reconnect * Remove unused Disconnected internal send error variant
1 parent 45422d0 commit 8c31640

12 files changed

Lines changed: 739 additions & 263 deletions

File tree

Cargo.lock

Lines changed: 0 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,14 @@ rustls = "0.23"
4848
rustls-native-certs = "0.8"
4949
rustls-pemfile = "2.2"
5050
rustls-pki-types = { version = "1" }
51-
webpki-roots = "1.0"
51+
rcgen = "0.13"
5252
serde = "^1.0.177"
5353
serde_json = "1.0.143"
5454
smartstring = "1"
5555
strum = "0.27"
5656
strum_macros = "0.27"
5757
syn = "2"
58+
tempfile = "3"
5859
testcontainers = "0.25"
5960
thiserror = "2"
6061
tokio = { version = "1" }
@@ -65,6 +66,7 @@ tower = "0.5"
6566
tracing = "0.1"
6667
tracing-subscriber = "0.3"
6768
uuid = { version = "1.5.0" }
69+
webpki-roots = "1.0"
6870
wiremock = "0.6"
6971

7072
[workspace.lints.rust]

crates/hotfix-web/src/session_controller.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,17 @@ pub struct HttpSessionController<Outbound> {
1818
#[async_trait::async_trait]
1919
impl<Outbound: OutboundMessage> SessionController for HttpSessionController<Outbound> {
2020
async fn get_session_info(&self) -> anyhow::Result<SessionInfo> {
21-
self.session_handle.get_session_info().await
21+
Ok(self.session_handle.get_session_info().await?)
2222
}
2323

2424
async fn request_reset_on_next_logon(&self) -> anyhow::Result<()> {
25-
self.session_handle.request_reset_on_next_logon().await
25+
self.session_handle.request_reset_on_next_logon().await?;
26+
Ok(())
2627
}
2728

2829
async fn shutdown(&self, reconnect: bool) -> anyhow::Result<()> {
29-
self.session_handle.shutdown(reconnect).await
30+
self.session_handle.shutdown(reconnect).await?;
31+
Ok(())
3032
}
3133
}
3234

crates/hotfix/Cargo.toml

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ keywords.workspace = true
1212
categories.workspace = true
1313

1414
[features]
15-
default = ["test-utils"]
15+
default = ["fix44", "test-utils"]
1616
fix44 = ["hotfix-message/fix44"]
1717
mongodb = ["dep:hotfix-store-mongodb"]
1818
test-utils = ["hotfix-store/test-utils"]
@@ -25,11 +25,9 @@ hotfix-message = { version = "0.3.0", path = "../hotfix-message", features = ["u
2525
hotfix-store = { version = "0.1.1", path = "../hotfix-store" }
2626
hotfix-store-mongodb = { version = "0.1.3", path = "../hotfix-store-mongodb", optional = true }
2727

28-
anyhow = { workspace = true }
2928
async-trait = { workspace = true }
3029
chrono = { workspace = true, features = ["serde"] }
3130
chrono-tz = { workspace = true, features = ["serde"] }
32-
futures = { workspace = true }
3331
rustls-pki-types = { workspace = true }
3432
rustls = { workspace = true }
3533
rustls-native-certs = { workspace = true }
@@ -41,12 +39,12 @@ tokio = { workspace = true, features = ["full"] }
4139
tokio-rustls = { workspace = true }
4240
toml = { workspace = true }
4341
tracing = { workspace = true }
44-
uuid = { workspace = true, features = ["v4"] }
4542

4643
[dev-dependencies]
4744
hotfix-message = { version = "0.3.0", path = "../hotfix-message", features = ["fix44", "utils-chrono"] }
4845

49-
rcgen = "0.13"
46+
anyhow = { workspace = true }
47+
rcgen = { workspace = true }
5048
rustls = { workspace = true, features = ["ring"] }
51-
tempfile = "3"
49+
tempfile = { workspace = true }
5250
tokio = { workspace = true, features = ["test-util"] }

crates/hotfix/src/initiator.rs

Lines changed: 158 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
//! The initiator establishes the transport layer connection with
77
//! the peer, and sends the initial Logon (35=A) message. For transport,
88
//! `HotFIX` supports plain TCP and encrypted TLS over TCP connections.
9-
use anyhow::Result;
109
use std::time::Duration;
1110
use tokio::sync::watch;
1211
use tokio::time::sleep;
@@ -15,7 +14,7 @@ use tracing::{debug, warn};
1514
use crate::application::Application;
1615
use crate::config::SessionConfig;
1716
use crate::message::{InboundMessage, OutboundMessage};
18-
use crate::session::error::{SendError, SendOutcome};
17+
use crate::session::error::{SendError, SendOutcome, SessionCreationError};
1918
use crate::session::{InternalSessionRef, SessionHandle};
2019
use crate::store::MessageStore;
2120
use crate::transport::connect;
@@ -32,7 +31,7 @@ impl<Outbound: OutboundMessage> Initiator<Outbound> {
3231
config: SessionConfig,
3332
application: impl Application<Inbound, Outbound>,
3433
store: impl MessageStore + 'static,
35-
) -> Result<Self> {
34+
) -> Result<Self, SessionCreationError> {
3635
let session_ref = InternalSessionRef::new(config.clone(), application, store)?;
3736
let (completion_tx, completion_rx) = watch::channel(false);
3837

@@ -76,9 +75,11 @@ impl<Outbound: OutboundMessage> Initiator<Outbound> {
7675
self.session_handle.clone()
7776
}
7877

79-
pub async fn shutdown(self, reconnect: bool) -> Result<()> {
78+
pub async fn shutdown(self, reconnect: bool) -> Result<(), SendError> {
8079
self.session_handle.shutdown(reconnect).await?;
81-
tokio::time::timeout(Duration::from_secs(5), self.wait_for_shutdown()).await?;
80+
tokio::time::timeout(Duration::from_secs(5), self.wait_for_shutdown())
81+
.await
82+
.map_err(|_| SendError::SessionGone)?;
8283

8384
Ok(())
8485
}
@@ -151,15 +152,22 @@ async fn establish_connection<Outbound: OutboundMessage>(
151152
completion_tx.send_replace(true);
152153
}
153154

154-
#[cfg(all(test, feature = "fix44"))]
155+
#[cfg(test)]
156+
#[allow(clippy::expect_used)]
155157
mod tests {
156158
use super::*;
157159
use crate::application::{Application, InboundDecision, OutboundDecision};
158-
use crate::message::InboundMessage;
160+
use crate::message::logon::{Logon, ResetSeqNumConfig};
161+
use crate::message::logout::Logout;
162+
use crate::message::parser::Parser;
163+
use crate::message::{InboundMessage, generate_message};
159164
use crate::store::in_memory::InMemoryMessageStore;
165+
use hotfix_message::Part;
160166
use hotfix_message::message::Message;
167+
use hotfix_message::session_fields::MSG_TYPE;
161168
use std::time::Duration;
162-
use tokio::net::TcpListener;
169+
use tokio::io::{AsyncReadExt, AsyncWriteExt};
170+
use tokio::net::{TcpListener, TcpStream};
163171

164172
// Minimal message type for tests
165173
#[derive(Clone)]
@@ -193,6 +201,90 @@ mod tests {
193201
async fn on_logon(&mut self) {}
194202
}
195203

204+
/// A minimal FIX counterparty for testing the Initiator over TCP.
205+
struct TestCounterparty {
206+
stream: TcpStream,
207+
parser: Parser,
208+
seq_num: u64,
209+
// Counterparty's view: sender is TEST-TARGET, target is TEST-SENDER
210+
sender_comp_id: String,
211+
target_comp_id: String,
212+
}
213+
214+
impl TestCounterparty {
215+
async fn accept(listener: &TcpListener, config: &SessionConfig) -> Self {
216+
let (stream, _) = tokio::time::timeout(Duration::from_secs(2), listener.accept())
217+
.await
218+
.expect("timeout waiting for connection")
219+
.expect("failed to accept connection");
220+
221+
Self {
222+
stream,
223+
parser: Parser::default(),
224+
seq_num: 1,
225+
// Swap sender/target for counterparty perspective
226+
sender_comp_id: config.target_comp_id.clone(),
227+
target_comp_id: config.sender_comp_id.clone(),
228+
}
229+
}
230+
231+
async fn read_message(&mut self) -> Message {
232+
let mut buf = [0u8; 4096];
233+
loop {
234+
let n = self.stream.read(&mut buf).await.expect("read failed");
235+
if n == 0 {
236+
panic!("connection closed before receiving complete message");
237+
}
238+
let messages = self.parser.parse(&buf[..n]);
239+
if let Some(raw_msg) = messages.into_iter().next() {
240+
let builder = hotfix_message::MessageBuilder::new(
241+
hotfix_message::dict::Dictionary::fix44(),
242+
hotfix_message::message::Config::default(),
243+
)
244+
.expect("failed to create message builder");
245+
match builder.build(raw_msg.as_bytes()) {
246+
hotfix_message::parsed_message::ParsedMessage::Valid(msg) => return msg,
247+
_ => panic!("received invalid FIX message"),
248+
}
249+
}
250+
}
251+
}
252+
253+
async fn expect_message(&mut self, expected_type: &str) -> Message {
254+
let msg = tokio::time::timeout(Duration::from_secs(2), self.read_message())
255+
.await
256+
.expect("timeout waiting for message");
257+
let msg_type: &str = msg.header().get(MSG_TYPE).expect("missing MSG_TYPE");
258+
assert_eq!(msg_type, expected_type, "unexpected message type");
259+
msg
260+
}
261+
262+
async fn send_logon(&mut self, heartbeat_interval: u64) {
263+
let logon = Logon::new(heartbeat_interval, ResetSeqNumConfig::NoReset(None));
264+
self.send_message(logon).await;
265+
}
266+
267+
async fn send_logout(&mut self) {
268+
self.send_message(Logout::default()).await;
269+
}
270+
271+
async fn send_message(&mut self, message: impl OutboundMessage) {
272+
let raw = generate_message(
273+
"FIX.4.4",
274+
&self.sender_comp_id,
275+
&self.target_comp_id,
276+
self.seq_num,
277+
message,
278+
)
279+
.expect("failed to generate message");
280+
self.seq_num += 1;
281+
self.stream
282+
.write_all(&raw)
283+
.await
284+
.expect("failed to send message");
285+
}
286+
}
287+
196288
fn create_test_config(host: &str, port: u16) -> SessionConfig {
197289
SessionConfig {
198290
begin_string: "FIX.4.4".to_string(),
@@ -211,6 +303,27 @@ mod tests {
211303
}
212304
}
213305

306+
async fn create_logged_on_initiator() -> (Initiator<DummyMessage>, TestCounterparty) {
307+
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
308+
let port = listener.local_addr().unwrap().port();
309+
let config = create_test_config("127.0.0.1", port);
310+
311+
let initiator = Initiator::start(config.clone(), NoOpApp, InMemoryMessageStore::default())
312+
.await
313+
.unwrap();
314+
315+
let mut counterparty = TestCounterparty::accept(&listener, &config).await;
316+
317+
// Complete the logon handshake
318+
counterparty.expect_message("A").await; // Receive Logon
319+
counterparty.send_logon(30).await; // Send Logon response
320+
321+
// Give the session a moment to process the logon
322+
sleep(Duration::from_millis(50)).await;
323+
324+
(initiator, counterparty)
325+
}
326+
214327
#[tokio::test]
215328
async fn test_start_creates_initiator_successfully() {
216329
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
@@ -320,4 +433,41 @@ mod tests {
320433
let result = initiator.send_forget(DummyMessage).await;
321434
assert!(result.is_ok());
322435
}
436+
437+
#[tokio::test]
438+
async fn test_session_handle_returns_working_handle() {
439+
use crate::session::error::SendOutcome;
440+
441+
let (initiator, mut counterparty) = create_logged_on_initiator().await;
442+
443+
// Get the session handle and use it to send a message
444+
let handle = initiator.session_handle();
445+
let result = handle.send(DummyMessage).await;
446+
447+
assert!(matches!(result, Ok(SendOutcome::Sent { .. })));
448+
449+
// Verify counterparty received the message (msg type "0" = Heartbeat)
450+
counterparty.expect_message("0").await;
451+
}
452+
453+
#[tokio::test]
454+
async fn test_shutdown_with_logout_handshake() {
455+
let (initiator, mut counterparty) = create_logged_on_initiator().await;
456+
457+
assert!(!initiator.is_shutdown());
458+
459+
// Spawn shutdown in background - it sends Logout and waits for response
460+
let shutdown_handle = tokio::spawn(async move { initiator.shutdown(false).await });
461+
462+
// Counterparty receives Logout and responds
463+
counterparty.expect_message("5").await; // Logout
464+
counterparty.send_logout().await;
465+
466+
// Close the TCP connection - this completes the disconnect
467+
drop(counterparty);
468+
469+
// Shutdown should complete successfully
470+
let result = shutdown_handle.await.expect("shutdown task panicked");
471+
assert!(result.is_ok(), "Shutdown should complete, got {:?}", result);
472+
}
323473
}

0 commit comments

Comments
 (0)