Skip to content

Commit c7eecd0

Browse files
authored
feat: allow errors in message parsing and support business reject outcomes (#304)
* Fix flaky message verification test by addressing race condition in original sending time * Remove unnecessary fully qualified paths for Result * Make application take unparsed messages * Convert outbound message type to associated type * Add reject as inbound decision variant * Add test case for business rejects coming from the application * Remove dead code
1 parent 8c31640 commit c7eecd0

28 files changed

Lines changed: 601 additions & 255 deletions

crates/hotfix-message/src/message.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use crate::parts::{Body, Header, Part, RepeatingGroup, Trailer};
1010
use crate::session_fields::{BEGIN_STRING, BODY_LENGTH, CHECK_SUM, MSG_TYPE};
1111
use hotfix_dictionary::{FieldLocation, IsFieldDefinition};
1212

13+
#[derive(Clone)]
1314
pub struct Message {
1415
pub(crate) header: Header,
1516
pub(crate) body: Body,

crates/hotfix-message/src/parts/body.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::field_map::FieldMap;
22
use crate::parts::Part;
33

4-
#[derive(Default)]
4+
#[derive(Clone, Default)]
55
pub struct Body {
66
pub(crate) fields: FieldMap,
77
}

crates/hotfix-message/src/parts/header.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use crate::field_map::FieldMap;
44
use crate::parts::Part;
55
use crate::session_fields;
66

7-
#[derive(Default)]
7+
#[derive(Clone, Default)]
88
pub struct Header {
99
pub fields: FieldMap,
1010
}

crates/hotfix-message/src/parts/trailer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use crate::parts::Part;
33
use crate::session_fields;
44
use hotfix_dictionary::IsFieldDefinition;
55

6-
#[derive(Default)]
6+
#[derive(Clone, Default)]
77
pub struct Trailer {
88
pub(crate) fields: FieldMap,
99
}

crates/hotfix/src/application.rs

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,45 @@
1+
use crate::message::OutboundMessage;
2+
use hotfix_message::message::Message;
3+
14
#[async_trait::async_trait]
25
/// The application users of HotFIX can implement to hook into the engine.
3-
pub trait Application<Inbound, Outbound>: Send + Sync + 'static {
6+
pub trait Application: Send + Sync + 'static {
7+
type Outbound: OutboundMessage;
8+
49
/// Called when a message is sent to the engine to be sent to the counterparty.
510
///
611
/// This is invoked before the raw message is persisted in the message store.
7-
async fn on_outbound_message(&self, msg: &Outbound) -> OutboundDecision;
12+
async fn on_outbound_message(&self, msg: &Self::Outbound) -> OutboundDecision;
813
/// Called when a message is received from the counterparty.
914
///
10-
/// This is invoked after the message is verified and parsed into a typed message.
11-
async fn on_inbound_message(&self, msg: Inbound) -> InboundDecision;
15+
/// This is invoked after the message is verified by the session layer.
16+
async fn on_inbound_message(&self, msg: &Message) -> InboundDecision;
1217
/// Called when the session is logged out.
1318
async fn on_logout(&mut self, reason: &str);
1419
/// Called when the session is logged on.
1520
async fn on_logon(&mut self);
1621
}
1722

23+
/// Standard FIX Business Reject Reason values (tag 380).
24+
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
25+
#[repr(u32)]
26+
pub enum BusinessRejectReason {
27+
Other = 0,
28+
UnknownId = 1,
29+
UnknownSecurity = 2,
30+
UnsupportedMessageType = 3,
31+
ApplicationNotAvailable = 4,
32+
ConditionallyRequiredFieldMissing = 5,
33+
NotAuthorized = 6,
34+
DeliverToFirmNotAvailable = 7,
35+
}
36+
1837
pub enum InboundDecision {
1938
Accept,
39+
Reject {
40+
reason: BusinessRejectReason,
41+
text: Option<String>,
42+
},
2043
TerminateSession,
2144
}
2245

crates/hotfix/src/initiator.rs

Lines changed: 12 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use tracing::{debug, warn};
1313

1414
use crate::application::Application;
1515
use crate::config::SessionConfig;
16-
use crate::message::{InboundMessage, OutboundMessage};
16+
use crate::message::OutboundMessage;
1717
use crate::session::error::{SendError, SendOutcome, SessionCreationError};
1818
use crate::session::{InternalSessionRef, SessionHandle};
1919
use crate::store::MessageStore;
@@ -27,9 +27,9 @@ pub struct Initiator<Outbound> {
2727
}
2828

2929
impl<Outbound: OutboundMessage> Initiator<Outbound> {
30-
pub async fn start<Inbound: InboundMessage>(
30+
pub async fn start(
3131
config: SessionConfig,
32-
application: impl Application<Inbound, Outbound>,
32+
application: impl Application<Outbound = Outbound>,
3333
store: impl MessageStore + 'static,
3434
) -> Result<Self, SessionCreationError> {
3535
let session_ref = InternalSessionRef::new(config.clone(), application, store)?;
@@ -157,10 +157,10 @@ async fn establish_connection<Outbound: OutboundMessage>(
157157
mod tests {
158158
use super::*;
159159
use crate::application::{Application, InboundDecision, OutboundDecision};
160+
use crate::message::generate_message;
160161
use crate::message::logon::{Logon, ResetSeqNumConfig};
161162
use crate::message::logout::Logout;
162163
use crate::message::parser::Parser;
163-
use crate::message::{InboundMessage, generate_message};
164164
use crate::store::in_memory::InMemoryMessageStore;
165165
use hotfix_message::Part;
166166
use hotfix_message::message::Message;
@@ -180,21 +180,17 @@ mod tests {
180180
}
181181
}
182182

183-
impl InboundMessage for DummyMessage {
184-
fn parse(_message: &Message) -> Self {
185-
DummyMessage
186-
}
187-
}
188-
189183
// No-op application
190184
struct NoOpApp;
191185

192186
#[async_trait::async_trait]
193-
impl Application<DummyMessage, DummyMessage> for NoOpApp {
187+
impl Application for NoOpApp {
188+
type Outbound = DummyMessage;
189+
194190
async fn on_outbound_message(&self, _msg: &DummyMessage) -> OutboundDecision {
195191
OutboundDecision::Send
196192
}
197-
async fn on_inbound_message(&self, _msg: DummyMessage) -> InboundDecision {
193+
async fn on_inbound_message(&self, _msg: &Message) -> InboundDecision {
198194
InboundDecision::Accept
199195
}
200196
async fn on_logout(&mut self, _reason: &str) {}
@@ -366,13 +362,10 @@ mod tests {
366362
let mut config = create_test_config("127.0.0.1", port);
367363
config.reconnect_interval = 1; // Short interval for test
368364

369-
let _initiator = Initiator::<DummyMessage>::start::<DummyMessage>(
370-
config,
371-
NoOpApp,
372-
InMemoryMessageStore::default(),
373-
)
374-
.await
375-
.unwrap();
365+
let _initiator =
366+
Initiator::<DummyMessage>::start(config, NoOpApp, InMemoryMessageStore::default())
367+
.await
368+
.unwrap();
376369

377370
// Accept first connection
378371
let (conn1, _) = tokio::time::timeout(Duration::from_secs(2), listener.accept())

crates/hotfix/src/message.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ pub(crate) use hotfix_message::message::{Config, Message};
55
use hotfix_message::session_fields::{MSG_SEQ_NUM, SENDER_COMP_ID, SENDING_TIME, TARGET_COMP_ID};
66
pub use hotfix_message::{Part, RepeatingGroup};
77

8+
pub mod business_reject;
89
pub mod heartbeat;
910
pub mod logon;
1011
pub mod logout;
@@ -25,10 +26,6 @@ pub trait OutboundMessage: Clone + Send + 'static {
2526
fn message_type(&self) -> &str;
2627
}
2728

28-
pub trait InboundMessage: Clone + Send + 'static {
29-
fn parse(message: &Message) -> Self;
30-
}
31-
3229
pub fn generate_message(
3330
begin_string: &str,
3431
sender_comp_id: &str,
Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
use crate::application::BusinessRejectReason;
2+
use crate::message::OutboundMessage;
3+
use hotfix_message::dict::{FieldLocation, FixDatatype};
4+
use hotfix_message::message::Message;
5+
use hotfix_message::session_fields::{REF_MSG_TYPE, REF_SEQ_NUM, TEXT};
6+
use hotfix_message::{Buffer, FieldType, HardCodedFixFieldDefinition, Part};
7+
8+
const BUSINESS_REJECT_REASON: &HardCodedFixFieldDefinition = &HardCodedFixFieldDefinition {
9+
name: "BusinessRejectReason",
10+
tag: 380,
11+
data_type: FixDatatype::Int,
12+
location: FieldLocation::Body,
13+
};
14+
15+
impl<'a> FieldType<'a> for BusinessRejectReason {
16+
type Error = ();
17+
type SerializeSettings = ();
18+
19+
fn serialize_with<B>(&self, buffer: &mut B, _settings: Self::SerializeSettings) -> usize
20+
where
21+
B: Buffer,
22+
{
23+
let value = *self as u32;
24+
value.serialize(buffer)
25+
}
26+
27+
fn deserialize(data: &'a [u8]) -> Result<Self, Self::Error> {
28+
let value = u32::deserialize(data).map_err(|_| ())?;
29+
match value {
30+
0 => Ok(Self::Other),
31+
1 => Ok(Self::UnknownId),
32+
2 => Ok(Self::UnknownSecurity),
33+
3 => Ok(Self::UnsupportedMessageType),
34+
4 => Ok(Self::ApplicationNotAvailable),
35+
5 => Ok(Self::ConditionallyRequiredFieldMissing),
36+
6 => Ok(Self::NotAuthorized),
37+
7 => Ok(Self::DeliverToFirmNotAvailable),
38+
_ => Err(()),
39+
}
40+
}
41+
}
42+
43+
#[derive(Clone, Debug)]
44+
pub(crate) struct BusinessReject {
45+
ref_msg_type: String,
46+
reason: BusinessRejectReason,
47+
ref_seq_num: Option<u64>,
48+
text: Option<String>,
49+
}
50+
51+
impl BusinessReject {
52+
pub(crate) fn new(ref_msg_type: &str, reason: BusinessRejectReason) -> Self {
53+
Self {
54+
ref_msg_type: ref_msg_type.to_string(),
55+
reason,
56+
ref_seq_num: None,
57+
text: None,
58+
}
59+
}
60+
61+
pub(crate) fn ref_seq_num(mut self, ref_seq_num: u64) -> Self {
62+
self.ref_seq_num = Some(ref_seq_num);
63+
self
64+
}
65+
66+
pub(crate) fn text(mut self, text: &str) -> Self {
67+
self.text = Some(text.to_string());
68+
self
69+
}
70+
71+
#[cfg(test)]
72+
fn parse(message: &Message) -> Self {
73+
Self {
74+
#[allow(clippy::expect_used)]
75+
ref_msg_type: message
76+
.get::<&str>(REF_MSG_TYPE)
77+
.expect("ref_msg_type should be present")
78+
.to_string(),
79+
#[allow(clippy::expect_used)]
80+
reason: message
81+
.get(BUSINESS_REJECT_REASON)
82+
.expect("reason should be present"),
83+
ref_seq_num: message.get(REF_SEQ_NUM).ok(),
84+
text: message.get::<&str>(TEXT).ok().map(|s| s.to_string()),
85+
}
86+
}
87+
}
88+
89+
impl OutboundMessage for BusinessReject {
90+
fn write(&self, msg: &mut Message) {
91+
msg.set(REF_MSG_TYPE, self.ref_msg_type.as_str());
92+
msg.set(BUSINESS_REJECT_REASON, self.reason);
93+
94+
if let Some(ref_seq_num) = self.ref_seq_num {
95+
msg.set(REF_SEQ_NUM, ref_seq_num);
96+
}
97+
if let Some(text) = &self.text {
98+
msg.set(TEXT, text.as_str());
99+
}
100+
}
101+
102+
fn message_type(&self) -> &str {
103+
"j"
104+
}
105+
}
106+
107+
#[cfg(test)]
108+
mod tests {
109+
use super::*;
110+
use hotfix_message::message::Message;
111+
112+
#[test]
113+
fn test_write_business_reject_with_required_fields_only() {
114+
let reject = BusinessReject::new("D", BusinessRejectReason::UnsupportedMessageType);
115+
116+
let mut msg = Message::new("FIX.4.4", "j");
117+
reject.write(&mut msg);
118+
119+
assert_eq!(msg.get::<&str>(REF_MSG_TYPE).unwrap(), "D");
120+
assert_eq!(
121+
msg.get::<BusinessRejectReason>(BUSINESS_REJECT_REASON)
122+
.unwrap(),
123+
BusinessRejectReason::UnsupportedMessageType
124+
);
125+
assert!(msg.get::<u64>(REF_SEQ_NUM).is_err());
126+
assert!(msg.get::<&str>(TEXT).is_err());
127+
}
128+
129+
#[test]
130+
fn test_write_business_reject_with_all_fields() {
131+
let reject = BusinessReject::new("8", BusinessRejectReason::NotAuthorized)
132+
.ref_seq_num(456)
133+
.text("Not authorized for execution reports");
134+
135+
let mut msg = Message::new("FIX.4.4", "j");
136+
reject.write(&mut msg);
137+
138+
assert_eq!(msg.get::<&str>(REF_MSG_TYPE).unwrap(), "8");
139+
assert_eq!(
140+
msg.get::<BusinessRejectReason>(BUSINESS_REJECT_REASON)
141+
.unwrap(),
142+
BusinessRejectReason::NotAuthorized
143+
);
144+
assert_eq!(msg.get::<u64>(REF_SEQ_NUM).unwrap(), 456);
145+
assert_eq!(
146+
msg.get::<&str>(TEXT).unwrap(),
147+
"Not authorized for execution reports"
148+
);
149+
}
150+
151+
#[test]
152+
fn test_round_trip_serialization() {
153+
let original =
154+
BusinessReject::new("D", BusinessRejectReason::ConditionallyRequiredFieldMissing)
155+
.ref_seq_num(789)
156+
.text("ClOrdID is required");
157+
158+
let mut msg = Message::new("FIX.4.4", "j");
159+
original.write(&mut msg);
160+
161+
let parsed = BusinessReject::parse(&msg);
162+
163+
assert_eq!(parsed.ref_msg_type, original.ref_msg_type);
164+
assert_eq!(parsed.reason, original.reason);
165+
assert_eq!(parsed.ref_seq_num, original.ref_seq_num);
166+
assert_eq!(parsed.text, original.text);
167+
}
168+
169+
#[test]
170+
fn test_round_trip_with_minimal_fields() {
171+
let original = BusinessReject::new("0", BusinessRejectReason::Other);
172+
173+
let mut msg = Message::new("FIX.4.4", "j");
174+
original.write(&mut msg);
175+
176+
let parsed = BusinessReject::parse(&msg);
177+
178+
assert_eq!(parsed.ref_msg_type, original.ref_msg_type);
179+
assert_eq!(parsed.reason, original.reason);
180+
assert_eq!(parsed.ref_seq_num, original.ref_seq_num);
181+
assert_eq!(parsed.text, original.text);
182+
}
183+
184+
#[test]
185+
fn test_message_type() {
186+
let reject = BusinessReject::new("D", BusinessRejectReason::Other);
187+
assert_eq!(reject.message_type(), "j");
188+
}
189+
190+
#[test]
191+
fn test_all_reject_reasons_round_trip() {
192+
let reasons = [
193+
BusinessRejectReason::Other,
194+
BusinessRejectReason::UnknownId,
195+
BusinessRejectReason::UnknownSecurity,
196+
BusinessRejectReason::UnsupportedMessageType,
197+
BusinessRejectReason::ApplicationNotAvailable,
198+
BusinessRejectReason::ConditionallyRequiredFieldMissing,
199+
BusinessRejectReason::NotAuthorized,
200+
BusinessRejectReason::DeliverToFirmNotAvailable,
201+
];
202+
203+
for reason in reasons {
204+
let reject = BusinessReject::new("D", reason);
205+
let mut msg = Message::new("FIX.4.4", "j");
206+
reject.write(&mut msg);
207+
208+
let parsed = BusinessReject::parse(&msg);
209+
assert_eq!(parsed.reason, reason, "Round-trip failed for {reason:?}");
210+
}
211+
}
212+
}

0 commit comments

Comments
 (0)