Skip to content

Commit 4b1de75

Browse files
committed
First cut of refactor to return confirmation for sent app messages
1 parent 6d6e967 commit 4b1de75

11 files changed

Lines changed: 276 additions & 15 deletions

File tree

crates/hotfix/src/initiator.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use tracing::{debug, warn};
1515
use crate::application::Application;
1616
use crate::config::SessionConfig;
1717
use crate::message::{InboundMessage, OutboundMessage};
18+
use crate::session::error::{SendError, SendOutcome};
1819
use crate::session::{InternalSessionRef, SessionHandle};
1920
use crate::store::MessageStore;
2021
use crate::transport::connect;
@@ -50,6 +51,25 @@ impl<Outbound: OutboundMessage> Initiator<Outbound> {
5051
Ok(initiator)
5152
}
5253

54+
/// Sends a message and waits for confirmation that it was persisted.
55+
///
56+
/// Returns `SendOutcome::Sent` with the sequence number if the message was
57+
/// successfully persisted and sent, or `SendOutcome::Dropped` if the application
58+
/// callback chose to drop the message.
59+
pub async fn send(&self, msg: Outbound) -> Result<SendOutcome, SendError> {
60+
self.session_handle.send(msg).await
61+
}
62+
63+
/// Sends a message without waiting for confirmation.
64+
///
65+
/// This is a fire-and-forget operation. The message will be queued for sending
66+
/// but no confirmation is provided about whether it was actually sent.
67+
pub async fn send_forget(&self, msg: Outbound) -> Result<(), SendError> {
68+
self.session_handle.send_forget(msg).await
69+
}
70+
71+
#[deprecated(since = "0.7.2", note = "use `send` or `send_forget` instead")]
72+
#[allow(deprecated)]
5373
pub async fn send_message(&self, msg: Outbound) -> Result<()> {
5474
self.session_handle.send_message(msg).await?;
5575

crates/hotfix/src/session.rs

Lines changed: 79 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,14 @@ use crate::message::verification::verify_message;
3636
use crate::message::verification_error::{CompIdType, MessageVerificationError};
3737
use crate::message_utils::{is_admin, prepare_message_for_resend};
3838
use crate::session::admin_request::AdminRequest;
39+
pub use crate::session::error::{SendError, SendOutcome};
3940
pub use crate::session::info::{SessionInfo, Status};
4041
pub use crate::session::session_handle::SessionHandle;
4142
#[cfg(not(feature = "test-utils"))]
4243
pub(crate) use crate::session::session_ref::InternalSessionRef;
4344
#[cfg(feature = "test-utils")]
4445
pub use crate::session::session_ref::InternalSessionRef;
46+
use crate::session::session_ref::OutboundRequest;
4547
use crate::session::state::SessionState;
4648
use crate::session::state::{AwaitingResendTransitionOutcome, TestRequestId};
4749
use crate::session_schedule::SessionSchedule;
@@ -819,6 +821,66 @@ where
819821
Ok(())
820822
}
821823

824+
async fn send_app_message_with_confirmation(
825+
&mut self,
826+
message: Outbound,
827+
) -> Result<SendOutcome, SendError> {
828+
if !self.state.is_connected() {
829+
return Err(SendError::Disconnected);
830+
}
831+
832+
match self.application.on_outbound_message(&message).await {
833+
OutboundDecision::Send => {
834+
let sequence_number = self.send_message_returning_seq(message).await?;
835+
Ok(SendOutcome::Sent { sequence_number })
836+
}
837+
OutboundDecision::Drop => {
838+
debug!("dropped outbound message as instructed by the application");
839+
Ok(SendOutcome::Dropped)
840+
}
841+
OutboundDecision::TerminateSession => {
842+
warn!("the application indicated we should terminate the session");
843+
self.state.disconnect_writer().await;
844+
Err(SendError::SessionTerminated)
845+
}
846+
}
847+
}
848+
849+
async fn send_message_returning_seq(
850+
&mut self,
851+
message: impl OutboundMessage,
852+
) -> Result<u64, SendError> {
853+
let seq_num = self.store.next_sender_seq_number();
854+
let msg_type = message.message_type().as_bytes().to_vec();
855+
let msg = generate_message(
856+
&self.config.begin_string,
857+
&self.config.sender_comp_id,
858+
&self.config.target_comp_id,
859+
seq_num,
860+
message,
861+
)
862+
.map_err(|e| {
863+
SendError::Persist(crate::store::StoreError::PersistMessage {
864+
sequence_number: seq_num,
865+
source: e.into(),
866+
})
867+
})?;
868+
869+
self.store
870+
.increment_sender_seq_number()
871+
.await
872+
.map_err(SendError::SequenceNumber)?;
873+
874+
self.store
875+
.add(seq_num, &msg)
876+
.await
877+
.map_err(SendError::Persist)?;
878+
879+
self.send_raw(&msg_type, msg).await;
880+
881+
Ok(seq_num)
882+
}
883+
822884
async fn send_message(&mut self, message: impl OutboundMessage) -> Result<()> {
823885
let seq_num = self.store.next_sender_seq_number();
824886
let msg_type = message.message_type().as_bytes().to_vec();
@@ -957,9 +1019,19 @@ where
9571019
}
9581020
}
9591021

960-
async fn handle_outbound_message(&mut self, message: Outbound) {
961-
if let Err(err) = self.send_app_message(message).await {
962-
error!(err = ?err, "failed to send app message: {err}");
1022+
async fn handle_outbound_message(&mut self, request: OutboundRequest<Outbound>) {
1023+
let OutboundRequest { message, confirm } = request;
1024+
match confirm {
1025+
Some(tx) => {
1026+
let result = self.send_app_message_with_confirmation(message).await;
1027+
// Ignore send errors - receiver may have been dropped
1028+
let _ = tx.send(result);
1029+
}
1030+
None => {
1031+
if let Err(err) = self.send_app_message(message).await {
1032+
error!(err = ?err, "failed to send app message: {err}");
1033+
}
1034+
}
9631035
}
9641036
}
9651037

@@ -1069,7 +1141,7 @@ where
10691141
async fn run_session<App, Inbound, Outbound, Store>(
10701142
mut session: Session<App, Inbound, Outbound, Store>,
10711143
mut event_receiver: mpsc::Receiver<SessionEvent>,
1072-
mut outbound_message_receiver: mpsc::Receiver<Outbound>,
1144+
mut outbound_message_receiver: mpsc::Receiver<OutboundRequest<Outbound>>,
10731145
mut admin_request_receiver: mpsc::Receiver<AdminRequest>,
10741146
) where
10751147
App: Application<Inbound, Outbound>,
@@ -1094,9 +1166,9 @@ async fn run_session<App, Inbound, Outbound, Store>(
10941166
None => break,
10951167
}
10961168
}
1097-
next_outbound_message = outbound_message_receiver.recv() => {
1098-
match next_outbound_message {
1099-
Some(message) => session.handle_outbound_message(message).await,
1169+
next_outbound_request = outbound_message_receiver.recv() => {
1170+
match next_outbound_request {
1171+
Some(request) => session.handle_outbound_message(request).await,
11001172
None => break,
11011173
}
11021174
}

crates/hotfix/src/session/error.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,31 @@ pub enum SessionError {
1111
}
1212

1313
pub type Result<T> = std::result::Result<T, SessionError>;
14+
15+
/// Outcome of a successful message send operation.
16+
#[derive(Debug, Clone, PartialEq, Eq)]
17+
pub enum SendOutcome {
18+
/// Message was persisted and sent with the given sequence number.
19+
Sent { sequence_number: u64 },
20+
/// Message was dropped by the application callback.
21+
Dropped,
22+
}
23+
24+
/// Error that can occur when sending a message.
25+
#[derive(Debug, Error)]
26+
pub enum SendError {
27+
#[error("session is disconnected")]
28+
Disconnected,
29+
30+
#[error("failed to persist message")]
31+
Persist(#[source] StoreError),
32+
33+
#[error("failed to update sequence number")]
34+
SequenceNumber(#[source] StoreError),
35+
36+
#[error("session terminated by application")]
37+
SessionTerminated,
38+
39+
#[error("confirmation channel closed")]
40+
ConfirmationLost,
41+
}

crates/hotfix/src/session/session_handle.rs

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
use crate::session::admin_request::AdminRequest;
2+
use crate::session::error::{SendError, SendOutcome};
3+
use crate::session::session_ref::OutboundRequest;
24
use crate::session::{InternalSessionRef, SessionInfo};
35
use anyhow::anyhow;
46
use tokio::sync::{mpsc, oneshot};
@@ -11,7 +13,7 @@ use tokio::sync::{mpsc, oneshot};
1113
/// and only exposes APIs intended for consumers of the engine.
1214
#[derive(Clone, Debug)]
1315
pub struct SessionHandle<Outbound> {
14-
outbound_message_sender: mpsc::Sender<Outbound>,
16+
outbound_message_sender: mpsc::Sender<OutboundRequest<Outbound>>,
1517
admin_request_sender: mpsc::Sender<AdminRequest>,
1618
}
1719

@@ -24,9 +26,49 @@ impl<Outbound> SessionHandle<Outbound> {
2426
Ok(receiver.await?)
2527
}
2628

29+
/// Sends a message and waits for confirmation that it was persisted.
30+
///
31+
/// Returns `SendOutcome::Sent` with the sequence number if the message was
32+
/// successfully persisted and sent, or `SendOutcome::Dropped` if the application
33+
/// callback chose to drop the message.
34+
pub async fn send(&self, msg: Outbound) -> Result<SendOutcome, SendError> {
35+
let (tx, rx) = oneshot::channel();
36+
let request = OutboundRequest {
37+
message: msg,
38+
confirm: Some(tx),
39+
};
40+
self.outbound_message_sender
41+
.send(request)
42+
.await
43+
.map_err(|_| SendError::Disconnected)?;
44+
45+
rx.await.map_err(|_| SendError::ConfirmationLost)?
46+
}
47+
48+
/// Sends a message without waiting for confirmation.
49+
///
50+
/// This is a fire-and-forget operation. The message will be queued for sending
51+
/// but no confirmation is provided about whether it was actually sent.
52+
pub async fn send_forget(&self, msg: Outbound) -> Result<(), SendError> {
53+
let request = OutboundRequest {
54+
message: msg,
55+
confirm: None,
56+
};
57+
self.outbound_message_sender
58+
.send(request)
59+
.await
60+
.map_err(|_| SendError::Disconnected)?;
61+
62+
Ok(())
63+
}
64+
65+
#[deprecated(since = "0.5.0", note = "use `send` or `send_forget` instead")]
2766
pub async fn send_message(&self, msg: Outbound) -> anyhow::Result<()> {
2867
self.outbound_message_sender
29-
.send(msg)
68+
.send(OutboundRequest {
69+
message: msg,
70+
confirm: None,
71+
})
3072
.await
3173
.map_err(|_| anyhow!("failed to send message"))?;
3274

crates/hotfix/src/session/session_ref.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,22 @@ use crate::config::SessionConfig;
77
use crate::message::{InboundMessage, OutboundMessage, RawFixMessage};
88
use crate::session::Session;
99
use crate::session::admin_request::AdminRequest;
10+
use crate::session::error::{SendError, SendOutcome};
1011
use crate::session::event::{AwaitingActiveSessionResponse, SessionEvent};
1112
use crate::store::MessageStore;
1213
use crate::transport::writer::WriterRef;
1314
use crate::{Application, session};
1415

16+
/// A request to send an outbound message, optionally with confirmation.
17+
pub(crate) struct OutboundRequest<M> {
18+
pub message: M,
19+
pub confirm: Option<oneshot::Sender<Result<SendOutcome, SendError>>>,
20+
}
21+
1522
#[derive(Clone)]
1623
pub struct InternalSessionRef<Outbound> {
1724
pub(crate) event_sender: mpsc::Sender<SessionEvent>,
18-
pub(crate) outbound_message_sender: mpsc::Sender<Outbound>,
25+
pub(crate) outbound_message_sender: mpsc::Sender<OutboundRequest<Outbound>>,
1926
pub(crate) admin_request_sender: mpsc::Sender<AdminRequest>,
2027
}
2128

@@ -26,7 +33,8 @@ impl<Outbound: OutboundMessage> InternalSessionRef<Outbound> {
2633
store: impl MessageStore + 'static,
2734
) -> Result<Self> {
2835
let (event_sender, event_receiver) = mpsc::channel::<SessionEvent>(100);
29-
let (outbound_message_sender, outbound_message_receiver) = mpsc::channel::<Outbound>(10);
36+
let (outbound_message_sender, outbound_message_receiver) =
37+
mpsc::channel::<OutboundRequest<Outbound>>(10);
3038
let (admin_request_sender, admin_request_receiver) = mpsc::channel::<AdminRequest>(10);
3139
let session = Session::new(config, application, store)?;
3240
tokio::spawn(session::run_session(

crates/hotfix/src/transport/socket/socket_reader.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ mod tests {
8787
use crate::message::Message;
8888
use crate::session::admin_request::AdminRequest;
8989
use crate::session::event::SessionEvent;
90+
use crate::session::session_ref::OutboundRequest;
9091
use tokio::io::{AsyncWriteExt, duplex};
9192
use tokio::sync::mpsc;
9293

@@ -107,7 +108,8 @@ mod tests {
107108
mpsc::Receiver<SessionEvent>,
108109
) {
109110
let (event_sender, event_receiver) = mpsc::channel::<SessionEvent>(100);
110-
let (outbound_message_sender, _outbound_receiver) = mpsc::channel::<TestMessage>(10);
111+
let (outbound_message_sender, _outbound_receiver) =
112+
mpsc::channel::<OutboundRequest<TestMessage>>(10);
111113
let (admin_request_sender, _admin_receiver) = mpsc::channel::<AdminRequest>(10);
112114

113115
let session_ref = InternalSessionRef {

crates/hotfix/tests/session_test_cases/common/actions.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::common::fakes::{FakeCounterparty, SessionSpy};
22
use crate::common::test_messages::TestMessage;
33
use hotfix::message::OutboundMessage;
4+
use hotfix::session::{SendError, SendOutcome};
45
use std::time::Duration;
56

67
pub struct When<T> {
@@ -19,10 +20,17 @@ impl When<&SessionSpy> {
1920
pub async fn sends_message(self, message: TestMessage) {
2021
self.target
2122
.session_handle()
22-
.send_message(message)
23+
.send(message)
2324
.await
2425
.expect("message to be sent successfully");
2526
}
27+
28+
pub async fn sends_message_with_confirmation(
29+
self,
30+
message: TestMessage,
31+
) -> Result<SendOutcome, SendError> {
32+
self.target.session_handle().send(message).await
33+
}
2634
}
2735

2836
impl When<&mut FakeCounterparty<TestMessage>> {

crates/hotfix/tests/session_test_cases/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,4 @@ mod logout_tests;
88
mod received_gap_fill_tests;
99
mod received_reset_tests;
1010
mod resend_tests;
11+
mod send_confirmation_tests;

0 commit comments

Comments
 (0)