Skip to content

Commit fe14683

Browse files
committed
Make application take unparsed messages
1 parent a4ec943 commit fe14683

23 files changed

Lines changed: 161 additions & 212 deletions

File tree

crates/hotfix-message/src/message.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use crate::parts::{Body, Header, Part, RepeatingGroup, Trailer};
1010
use crate::session_fields::{BEGIN_STRING, BODY_LENGTH, CHECK_SUM, MSG_TYPE};
1111
use hotfix_dictionary::{FieldLocation, IsFieldDefinition};
1212

13+
#[derive(Clone)]
1314
pub struct Message {
1415
pub(crate) header: Header,
1516
pub(crate) body: Body,

crates/hotfix-message/src/parts/body.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::field_map::FieldMap;
22
use crate::parts::Part;
33

4-
#[derive(Default)]
4+
#[derive(Clone, Default)]
55
pub struct Body {
66
pub(crate) fields: FieldMap,
77
}

crates/hotfix-message/src/parts/header.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use crate::field_map::FieldMap;
44
use crate::parts::Part;
55
use crate::session_fields;
66

7-
#[derive(Default)]
7+
#[derive(Clone, Default)]
88
pub struct Header {
99
pub fields: FieldMap,
1010
}

crates/hotfix-message/src/parts/trailer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use crate::parts::Part;
33
use crate::session_fields;
44
use hotfix_dictionary::IsFieldDefinition;
55

6-
#[derive(Default)]
6+
#[derive(Clone, Default)]
77
pub struct Trailer {
88
pub(crate) fields: FieldMap,
99
}

crates/hotfix/src/application.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
1+
use hotfix_message::message::Message;
2+
13
#[async_trait::async_trait]
24
/// The application users of HotFIX can implement to hook into the engine.
3-
pub trait Application<Inbound, Outbound>: Send + Sync + 'static {
5+
pub trait Application<Outbound>: Send + Sync + 'static {
46
/// Called when a message is sent to the engine to be sent to the counterparty.
57
///
68
/// This is invoked before the raw message is persisted in the message store.
79
async fn on_outbound_message(&self, msg: &Outbound) -> OutboundDecision;
810
/// Called when a message is received from the counterparty.
911
///
10-
/// This is invoked after the message is verified and parsed into a typed message.
11-
async fn on_inbound_message(&self, msg: Inbound) -> InboundDecision;
12+
/// This is invoked after the message is verified by the session layer.
13+
async fn on_inbound_message(&self, msg: &Message) -> InboundDecision;
1214
/// Called when the session is logged out.
1315
async fn on_logout(&mut self, reason: &str);
1416
/// Called when the session is logged on.

crates/hotfix/src/initiator.rs

Lines changed: 10 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use tracing::{debug, warn};
1313

1414
use crate::application::Application;
1515
use crate::config::SessionConfig;
16-
use crate::message::{InboundMessage, OutboundMessage};
16+
use crate::message::OutboundMessage;
1717
use crate::session::error::{SendError, SendOutcome, SessionCreationError};
1818
use crate::session::{InternalSessionRef, SessionHandle};
1919
use crate::store::MessageStore;
@@ -27,9 +27,9 @@ pub struct Initiator<Outbound> {
2727
}
2828

2929
impl<Outbound: OutboundMessage> Initiator<Outbound> {
30-
pub async fn start<Inbound: InboundMessage>(
30+
pub async fn start(
3131
config: SessionConfig,
32-
application: impl Application<Inbound, Outbound>,
32+
application: impl Application<Outbound>,
3333
store: impl MessageStore + 'static,
3434
) -> Result<Self, SessionCreationError> {
3535
let session_ref = InternalSessionRef::new(config.clone(), application, store)?;
@@ -157,10 +157,10 @@ async fn establish_connection<Outbound: OutboundMessage>(
157157
mod tests {
158158
use super::*;
159159
use crate::application::{Application, InboundDecision, OutboundDecision};
160+
use crate::message::generate_message;
160161
use crate::message::logon::{Logon, ResetSeqNumConfig};
161162
use crate::message::logout::Logout;
162163
use crate::message::parser::Parser;
163-
use crate::message::{InboundMessage, generate_message};
164164
use crate::store::in_memory::InMemoryMessageStore;
165165
use hotfix_message::Part;
166166
use hotfix_message::message::Message;
@@ -180,21 +180,15 @@ mod tests {
180180
}
181181
}
182182

183-
impl InboundMessage for DummyMessage {
184-
fn parse(_message: &Message) -> Self {
185-
DummyMessage
186-
}
187-
}
188-
189183
// No-op application
190184
struct NoOpApp;
191185

192186
#[async_trait::async_trait]
193-
impl Application<DummyMessage, DummyMessage> for NoOpApp {
187+
impl Application<DummyMessage> for NoOpApp {
194188
async fn on_outbound_message(&self, _msg: &DummyMessage) -> OutboundDecision {
195189
OutboundDecision::Send
196190
}
197-
async fn on_inbound_message(&self, _msg: DummyMessage) -> InboundDecision {
191+
async fn on_inbound_message(&self, _msg: &Message) -> InboundDecision {
198192
InboundDecision::Accept
199193
}
200194
async fn on_logout(&mut self, _reason: &str) {}
@@ -366,13 +360,10 @@ mod tests {
366360
let mut config = create_test_config("127.0.0.1", port);
367361
config.reconnect_interval = 1; // Short interval for test
368362

369-
let _initiator = Initiator::<DummyMessage>::start::<DummyMessage>(
370-
config,
371-
NoOpApp,
372-
InMemoryMessageStore::default(),
373-
)
374-
.await
375-
.unwrap();
363+
let _initiator =
364+
Initiator::<DummyMessage>::start(config, NoOpApp, InMemoryMessageStore::default())
365+
.await
366+
.unwrap();
376367

377368
// Accept first connection
378369
let (conn1, _) = tokio::time::timeout(Duration::from_secs(2), listener.accept())

crates/hotfix/src/message.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,6 @@ pub trait OutboundMessage: Clone + Send + 'static {
2525
fn message_type(&self) -> &str;
2626
}
2727

28-
pub trait InboundMessage: Clone + Send + 'static {
29-
fn parse(message: &Message) -> Self;
30-
}
31-
3228
pub fn generate_message(
3329
begin_string: &str,
3430
sender_comp_id: &str,

crates/hotfix/src/message/heartbeat.rs

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::message::{InboundMessage, OutboundMessage};
1+
use crate::message::OutboundMessage;
22
use hotfix_message::Part;
33
use hotfix_message::message::Message;
44
use hotfix_message::session_fields::TEST_REQ_ID;
@@ -27,10 +27,3 @@ impl OutboundMessage for Heartbeat {
2727
"0"
2828
}
2929
}
30-
31-
impl InboundMessage for Heartbeat {
32-
fn parse(_message: &Message) -> Self {
33-
// TODO: this needs to be implemented properly when we're implementing Test Requests
34-
Heartbeat { test_req_id: None }
35-
}
36-
}

crates/hotfix/src/message/reject.rs

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::message::{InboundMessage, OutboundMessage};
1+
use crate::message::OutboundMessage;
22
use hotfix_message::Part;
33
use hotfix_message::message::Message;
44
use hotfix_message::session_fields::{
@@ -50,6 +50,20 @@ impl Reject {
5050
self.text = Some(text.to_string());
5151
self
5252
}
53+
54+
#[cfg(test)]
55+
fn parse(message: &Message) -> Self {
56+
Self {
57+
#[allow(clippy::expect_used)]
58+
ref_seq_num: message
59+
.get(REF_SEQ_NUM)
60+
.expect("ref_seq_num should be present"),
61+
ref_tag_id: message.get(REF_TAG_ID).ok(),
62+
ref_msg_type: message.get(REF_MSG_TYPE).ok(),
63+
session_reject_reason: message.get(SESSION_REJECT_REASON).ok(),
64+
text: message.get::<&str>(TEXT).ok().map(|s| s.to_string()),
65+
}
66+
}
5367
}
5468

5569
impl OutboundMessage for Reject {
@@ -75,22 +89,6 @@ impl OutboundMessage for Reject {
7589
}
7690
}
7791

78-
impl InboundMessage for Reject {
79-
fn parse(message: &Message) -> Self {
80-
Self {
81-
// TODO: how do we handle errors in parsing messages?
82-
#[allow(clippy::expect_used)]
83-
ref_seq_num: message
84-
.get(REF_SEQ_NUM)
85-
.expect("ref_seq_num should be present"),
86-
ref_tag_id: message.get(REF_TAG_ID).ok(),
87-
ref_msg_type: message.get(REF_MSG_TYPE).ok(),
88-
session_reject_reason: message.get(SESSION_REJECT_REASON).ok(),
89-
text: message.get::<&str>(TEXT).ok().map(|s| s.to_string()),
90-
}
91-
}
92-
}
93-
9492
#[cfg(test)]
9593
mod tests {
9694
use super::*;

crates/hotfix/src/session.rs

Lines changed: 14 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use crate::Application;
2020
use crate::application::{InboundDecision, OutboundDecision};
2121
use crate::config::SessionConfig;
2222
use crate::message::OutboundMessage;
23+
use crate::message::generate_message;
2324
use crate::message::heartbeat::Heartbeat;
2425
use crate::message::logon::{Logon, ResetSeqNumConfig};
2526
use crate::message::logout::Logout;
@@ -30,7 +31,6 @@ use crate::message::sequence_reset::SequenceReset;
3031
use crate::message::test_request::TestRequest;
3132
use crate::message::verification::verify_message;
3233
use crate::message::verification_error::{CompIdType, MessageVerificationError};
33-
use crate::message::{InboundMessage, generate_message};
3434
use crate::message_utils::{is_admin, prepare_message_for_resend};
3535
use crate::session::admin_request::AdminRequest;
3636
use crate::session::error::SessionCreationError;
@@ -57,7 +57,7 @@ use hotfix_message::session_fields::{
5757

5858
const SCHEDULE_CHECK_INTERVAL: u64 = 1;
5959

60-
struct Session<A, I, O, S> {
60+
struct Session<A, O, S> {
6161
message_config: MessageConfig,
6262
config: SessionConfig,
6363
schedule: SessionSchedule,
@@ -67,21 +67,20 @@ struct Session<A, I, O, S> {
6767
store: S,
6868
schedule_check_timer: Pin<Box<Sleep>>,
6969
reset_on_next_logon: bool,
70-
_phantom: std::marker::PhantomData<fn() -> (I, O)>,
70+
_phantom: std::marker::PhantomData<fn() -> O>,
7171
}
7272

73-
impl<App, Inbound, Outbound, Store> Session<App, Inbound, Outbound, Store>
73+
impl<App, Outbound, Store> Session<App, Outbound, Store>
7474
where
75-
App: Application<Inbound, Outbound>,
76-
Inbound: InboundMessage,
75+
App: Application<Outbound>,
7776
Outbound: OutboundMessage,
7877
Store: MessageStore,
7978
{
8079
fn new(
8180
config: SessionConfig,
8281
application: App,
8382
store: Store,
84-
) -> Result<Session<App, Inbound, Outbound, Store>, SessionCreationError> {
83+
) -> Result<Session<App, Outbound, Store>, SessionCreationError> {
8584
let schedule_check_timer = sleep(Duration::from_secs(SCHEDULE_CHECK_INTERVAL));
8685

8786
let dictionary = Self::get_data_dictionary(&config)?;
@@ -245,9 +244,8 @@ where
245244
) -> Result<(), SessionOperationError> {
246245
match self.verify_message(message, true) {
247246
Ok(_) => {
248-
let parsed_message = Inbound::parse(message);
249247
if matches!(
250-
self.application.on_inbound_message(parsed_message).await,
248+
self.application.on_inbound_message(message).await,
251249
InboundDecision::TerminateSession
252250
) {
253251
error!("failed to send inbound message to application");
@@ -1131,14 +1129,13 @@ fn get_msg_seq_num(message: &Message) -> u64 {
11311129
.expect("MsgSeqNum missing from validated message - parser bug")
11321130
}
11331131

1134-
async fn run_session<App, Inbound, Outbound, Store>(
1135-
mut session: Session<App, Inbound, Outbound, Store>,
1132+
async fn run_session<App, Outbound, Store>(
1133+
mut session: Session<App, Outbound, Store>,
11361134
mut event_receiver: mpsc::Receiver<SessionEvent>,
11371135
mut outbound_message_receiver: mpsc::Receiver<OutboundRequest<Outbound>>,
11381136
mut admin_request_receiver: mpsc::Receiver<AdminRequest>,
11391137
) where
1140-
App: Application<Inbound, Outbound>,
1141-
Inbound: InboundMessage,
1138+
App: Application<Outbound>,
11421139
Outbound: OutboundMessage,
11431140
Store: MessageStore + Send + 'static,
11441141
{
@@ -1196,7 +1193,7 @@ async fn run_session<App, Inbound, Outbound, Store>(
11961193
mod tests {
11971194
use super::*;
11981195
use crate::application::{InboundDecision, OutboundDecision};
1199-
use crate::message::{InboundMessage, OutboundMessage};
1196+
use crate::message::OutboundMessage;
12001197
use crate::store::{Result as StoreResult, StoreError};
12011198
use chrono::{DateTime, Datelike, NaiveDate, NaiveTime, TimeDelta, Timelike};
12021199
use chrono_tz::Tz;
@@ -1293,21 +1290,15 @@ mod tests {
12931290
}
12941291
}
12951292

1296-
impl InboundMessage for DummyMessage {
1297-
fn parse(_message: &Message) -> Self {
1298-
DummyMessage
1299-
}
1300-
}
1301-
13021293
/// Minimal no-op application for testing
13031294
struct NoOpApp;
13041295

13051296
#[async_trait::async_trait]
1306-
impl Application<DummyMessage, DummyMessage> for NoOpApp {
1297+
impl Application<DummyMessage> for NoOpApp {
13071298
async fn on_outbound_message(&self, _: &DummyMessage) -> OutboundDecision {
13081299
OutboundDecision::Send
13091300
}
1310-
async fn on_inbound_message(&self, _: DummyMessage) -> InboundDecision {
1301+
async fn on_inbound_message(&self, _: &Message) -> InboundDecision {
13111302
InboundDecision::Accept
13121303
}
13131304
async fn on_logout(&mut self, _: &str) {}
@@ -1341,7 +1332,7 @@ mod tests {
13411332
schedule: SessionSchedule,
13421333
state: SessionState,
13431334
store: TestStore,
1344-
) -> Session<NoOpApp, DummyMessage, DummyMessage, TestStore> {
1335+
) -> Session<NoOpApp, DummyMessage, TestStore> {
13451336
let config = create_test_config();
13461337
let message_config = MessageConfig::default();
13471338
let dictionary = Dictionary::fix44();

0 commit comments

Comments
 (0)