Skip to content

Commit f8446fc

Browse files
committed
Remove anyhow from session handle and initiator APIs
1 parent 976df05 commit f8446fc

7 files changed

Lines changed: 41 additions & 33 deletions

File tree

crates/hotfix-web/src/session_controller.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,13 @@ impl<Outbound: OutboundMessage> SessionController for HttpSessionController<Outb
2222
}
2323

2424
async fn request_reset_on_next_logon(&self) -> anyhow::Result<()> {
25-
self.session_handle.request_reset_on_next_logon().await
25+
self.session_handle.request_reset_on_next_logon().await?;
26+
Ok(())
2627
}
2728

2829
async fn shutdown(&self, reconnect: bool) -> anyhow::Result<()> {
29-
self.session_handle.shutdown(reconnect).await
30+
self.session_handle.shutdown(reconnect).await?;
31+
Ok(())
3032
}
3133
}
3234

crates/hotfix/src/initiator.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
//! The initiator establishes the transport layer connection with
77
//! the peer, and sends the initial Logon (35=A) message. For transport,
88
//! `HotFIX` supports plain TCP and encrypted TLS over TCP connections.
9-
use anyhow::Result;
109
use std::time::Duration;
1110
use tokio::sync::watch;
1211
use tokio::time::sleep;
@@ -15,7 +14,7 @@ use tracing::{debug, warn};
1514
use crate::application::Application;
1615
use crate::config::SessionConfig;
1716
use crate::message::{InboundMessage, OutboundMessage};
18-
use crate::session::error::{SendError, SendOutcome};
17+
use crate::session::error::{SendError, SendOutcome, SessionCreationError};
1918
use crate::session::{InternalSessionRef, SessionHandle};
2019
use crate::store::MessageStore;
2120
use crate::transport::connect;
@@ -32,7 +31,7 @@ impl<Outbound: OutboundMessage> Initiator<Outbound> {
3231
config: SessionConfig,
3332
application: impl Application<Inbound, Outbound>,
3433
store: impl MessageStore + 'static,
35-
) -> Result<Self> {
34+
) -> Result<Self, SessionCreationError> {
3635
let session_ref = InternalSessionRef::new(config.clone(), application, store)?;
3736
let (completion_tx, completion_rx) = watch::channel(false);
3837

@@ -76,9 +75,11 @@ impl<Outbound: OutboundMessage> Initiator<Outbound> {
7675
self.session_handle.clone()
7776
}
7877

79-
pub async fn shutdown(self, reconnect: bool) -> Result<()> {
78+
pub async fn shutdown(self, reconnect: bool) -> Result<(), SendError> {
8079
self.session_handle.shutdown(reconnect).await?;
81-
tokio::time::timeout(Duration::from_secs(5), self.wait_for_shutdown()).await?;
80+
tokio::time::timeout(Duration::from_secs(5), self.wait_for_shutdown())
81+
.await
82+
.map_err(|_| SendError::SessionGone)?;
8283

8384
Ok(())
8485
}

crates/hotfix/src/session/error.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,18 @@ pub enum SendError {
5252
SessionGone,
5353
}
5454

55+
impl<T> From<tokio::sync::mpsc::error::SendError<T>> for SendError {
56+
fn from(_: tokio::sync::mpsc::error::SendError<T>) -> Self {
57+
SendError::SessionGone
58+
}
59+
}
60+
61+
impl From<tokio::sync::oneshot::error::RecvError> for SendError {
62+
fn from(_: tokio::sync::oneshot::error::RecvError) -> Self {
63+
SendError::SessionGone
64+
}
65+
}
66+
5567
/// Error that can occur when sending a message internally within the session.
5668
///
5769
/// This is a subset of `SendError` without `SessionTerminated` and `SessionGone`,

crates/hotfix/src/session/session_handle.rs

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,8 @@ impl<Outbound> SessionHandle<Outbound> {
3636
message: msg,
3737
confirm: Some(tx),
3838
};
39-
self.outbound_message_sender
40-
.send(request)
41-
.await
42-
.map_err(|_| SendError::Disconnected)?;
43-
44-
rx.await.map_err(|_| SendError::SessionGone)?
39+
self.outbound_message_sender.send(request).await?;
40+
rx.await?
4541
}
4642

4743
/// Sends a message without waiting for confirmation.
@@ -53,27 +49,21 @@ impl<Outbound> SessionHandle<Outbound> {
5349
message: msg,
5450
confirm: None,
5551
};
56-
self.outbound_message_sender
57-
.send(request)
58-
.await
59-
.map_err(|_| SendError::Disconnected)?;
60-
52+
self.outbound_message_sender.send(request).await?;
6153
Ok(())
6254
}
6355

64-
pub async fn shutdown(&self, reconnect: bool) -> anyhow::Result<()> {
56+
pub async fn shutdown(&self, reconnect: bool) -> Result<(), SendError> {
6557
self.admin_request_sender
6658
.send(AdminRequest::InitiateGracefulShutdown { reconnect })
6759
.await?;
68-
6960
Ok(())
7061
}
7162

72-
pub async fn request_reset_on_next_logon(&self) -> anyhow::Result<()> {
63+
pub async fn request_reset_on_next_logon(&self) -> Result<(), SendError> {
7364
self.admin_request_sender
7465
.send(AdminRequest::ResetSequenceNumbersOnNextLogon)
7566
.await?;
76-
7767
Ok(())
7868
}
7969
}

crates/hotfix/src/session/session_ref.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use anyhow::Result;
21
use thiserror::Error;
32
use tokio::sync::{mpsc, oneshot};
43
use tracing::debug;
@@ -7,7 +6,7 @@ use crate::config::SessionConfig;
76
use crate::message::{InboundMessage, OutboundMessage, RawFixMessage};
87
use crate::session::Session;
98
use crate::session::admin_request::AdminRequest;
10-
use crate::session::error::{SendError, SendOutcome};
9+
use crate::session::error::{SendError, SendOutcome, SessionCreationError};
1110
use crate::session::event::{AwaitingActiveSessionResponse, SessionEvent};
1211
use crate::store::MessageStore;
1312
use crate::transport::writer::WriterRef;
@@ -31,7 +30,7 @@ impl<Outbound: OutboundMessage> InternalSessionRef<Outbound> {
3130
config: SessionConfig,
3231
application: impl Application<Inbound, Outbound>,
3332
store: impl MessageStore + 'static,
34-
) -> Result<Self> {
33+
) -> Result<Self, SessionCreationError> {
3534
let (event_sender, event_receiver) = mpsc::channel::<SessionEvent>(100);
3635
let (outbound_message_sender, outbound_message_receiver) =
3736
mpsc::channel::<OutboundRequest<Outbound>>(10);

examples/load-testing/src/main.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -94,17 +94,19 @@ async fn start_session(
9494
db_config: Database,
9595
app: LoadTestingApplication,
9696
) -> Result<Initiator<OutboundMsg>> {
97-
match db_config {
97+
let initiator = match db_config {
9898
Database::Memory => {
9999
let store = hotfix::store::in_memory::InMemoryMessageStore::default();
100-
Initiator::start(session_config, app, store).await
100+
Initiator::start(session_config, app, store).await?
101101
}
102102
Database::File => {
103103
let store = hotfix::store::file::FileStore::new("data", "load-testing-store")
104104
.expect("be able to create store");
105-
Initiator::start(session_config, app, store).await
105+
Initiator::start(session_config, app, store).await?
106106
}
107-
}
107+
};
108+
109+
Ok(initiator)
108110
}
109111

110112
async fn submit_messages(session_handle: SessionHandle<OutboundMsg>, message_count: u32) {

examples/simple-new-order/src/main.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -152,17 +152,19 @@ async fn start_session(
152152
.pop()
153153
.context("config must include a session")?;
154154

155-
match db_config {
155+
let initiator = match db_config {
156156
Database::Memory => {
157157
let store = hotfix::store::in_memory::InMemoryMessageStore::default();
158-
Initiator::start(session_config, app, store).await
158+
Initiator::start(session_config, app, store).await?
159159
}
160160
Database::File => {
161161
let store = hotfix::store::file::FileStore::new("data", "simple-new-order-store")
162162
.context("failed to create file store")?;
163-
Initiator::start(session_config, app, store).await
163+
Initiator::start(session_config, app, store).await?
164164
}
165-
}
165+
};
166+
167+
Ok(initiator)
166168
}
167169

168170
async fn start_web_service(

0 commit comments

Comments
 (0)