Skip to content

Commit c0732d9

Browse files
committed
Add test case for business rejects coming from the application
1 parent cbdf260 commit c0732d9

6 files changed

Lines changed: 173 additions & 33 deletions

File tree

crates/hotfix/src/session.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,12 @@ use crate::Application;
2020
use crate::application::{InboundDecision, OutboundDecision};
2121
use crate::config::SessionConfig;
2222
use crate::message::OutboundMessage;
23+
use crate::message::business_reject::BusinessReject;
2324
use crate::message::generate_message;
2425
use crate::message::heartbeat::Heartbeat;
2526
use crate::message::logon::{Logon, ResetSeqNumConfig};
2627
use crate::message::logout::Logout;
2728
use crate::message::parser::RawFixMessage;
28-
use crate::message::business_reject::BusinessReject;
2929
use crate::message::reject::Reject;
3030
use crate::message::resend_request::ResendRequest;
3131
use crate::message::sequence_reset::SequenceReset;
@@ -249,9 +249,8 @@ where
249249
.header()
250250
.get(MSG_TYPE)
251251
.map_err(|_| SessionOperationError::MissingField("MSG_TYPE"))?;
252-
let mut reject =
253-
BusinessReject::new(msg_type, reason)
254-
.ref_seq_num(get_msg_seq_num(message));
252+
let mut reject = BusinessReject::new(msg_type, reason)
253+
.ref_seq_num(get_msg_seq_num(message));
255254
if let Some(text) = text {
256255
reject = reject.text(&text);
257256
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
use crate::common::actions::when;
2+
use crate::common::assertions::then;
3+
use crate::common::cleanup::finally;
4+
use crate::common::fakes::FakeApplication;
5+
use crate::common::setup::given_an_active_session_with_app;
6+
use crate::common::test_messages::TestMessage;
7+
use hotfix::application::{BusinessRejectReason, InboundDecision};
8+
use hotfix_message::Part;
9+
use hotfix_message::fix44::{MSG_TYPE, REF_MSG_TYPE, REF_SEQ_NUM, TEXT};
10+
11+
/// Tests that when the application returns InboundDecision::Reject,
12+
/// the session sends a Business Message Reject (MsgType "j") back to the counterparty.
13+
#[tokio::test]
14+
async fn test_inbound_reject_sends_business_message_reject() {
15+
let (message_tx, message_rx) = tokio::sync::mpsc::unbounded_channel();
16+
let app = FakeApplication::builder(message_tx)
17+
.inbound_decision_fn(|_| InboundDecision::Reject {
18+
reason: BusinessRejectReason::NotAuthorized,
19+
text: Some("Not authorized for this message".to_string()),
20+
})
21+
.build();
22+
let (session, mut counterparty) = given_an_active_session_with_app(app, message_rx).await;
23+
24+
// counterparty sends an execution report
25+
when(&mut counterparty)
26+
.sends_message(TestMessage::dummy_execution_report())
27+
.await;
28+
29+
// session should respond with a Business Message Reject (MsgType "j")
30+
then(&mut counterparty)
31+
.receives(|msg| {
32+
let msg_type: &str = msg.header().get(MSG_TYPE).unwrap();
33+
assert_eq!(msg_type, "j");
34+
35+
// RefMsgType should be the original message type ("8" for ExecutionReport)
36+
let ref_msg_type: &str = msg.get(REF_MSG_TYPE).unwrap();
37+
assert_eq!(ref_msg_type, "8");
38+
39+
// BusinessRejectReason (tag 380) should be 6 (NotAuthorized)
40+
let reason: u32 = msg.get(BUSINESS_REJECT_REASON).unwrap();
41+
assert_eq!(reason, 6);
42+
43+
// RefSeqNum should be the sequence number of the rejected message
44+
let ref_seq_num: u64 = msg.get(REF_SEQ_NUM).unwrap();
45+
assert!(ref_seq_num > 0);
46+
47+
// Text should contain our reject reason
48+
let text: &str = msg.get(TEXT).unwrap();
49+
assert_eq!(text, "Not authorized for this message");
50+
})
51+
.await;
52+
53+
finally(&session, &mut counterparty).disconnect().await;
54+
}
55+
56+
/// Tests that when the application returns InboundDecision::Reject without text,
57+
/// the Business Message Reject is sent without the Text field.
58+
#[tokio::test]
59+
async fn test_inbound_reject_without_text() {
60+
let (message_tx, message_rx) = tokio::sync::mpsc::unbounded_channel();
61+
let app = FakeApplication::builder(message_tx)
62+
.inbound_decision_fn(|_| InboundDecision::Reject {
63+
reason: BusinessRejectReason::UnsupportedMessageType,
64+
text: None,
65+
})
66+
.build();
67+
let (session, mut counterparty) = given_an_active_session_with_app(app, message_rx).await;
68+
69+
when(&mut counterparty)
70+
.sends_message(TestMessage::dummy_execution_report())
71+
.await;
72+
73+
then(&mut counterparty)
74+
.receives(|msg| {
75+
let msg_type: &str = msg.header().get(MSG_TYPE).unwrap();
76+
assert_eq!(msg_type, "j");
77+
78+
let reason: u32 = msg.get(BUSINESS_REJECT_REASON).unwrap();
79+
assert_eq!(reason, 3);
80+
81+
// Text field should not be present
82+
assert!(msg.get::<&str>(TEXT).is_err());
83+
})
84+
.await;
85+
86+
finally(&session, &mut counterparty).disconnect().await;
87+
}
88+
89+
/// Field definition for BusinessRejectReason (tag 380), used for assertions only.
90+
const BUSINESS_REJECT_REASON: &hotfix_message::HardCodedFixFieldDefinition =
91+
&hotfix_message::HardCodedFixFieldDefinition {
92+
name: "BusinessRejectReason",
93+
tag: 380,
94+
data_type: hotfix_message::dict::FixDatatype::Int,
95+
location: hotfix_message::dict::FieldLocation::Body,
96+
};

crates/hotfix/tests/session_test_cases/common/fakes/fake_application.rs

Lines changed: 45 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,27 +2,57 @@ use crate::common::test_messages::TestMessage;
22
use hotfix::Application;
33
use hotfix::application::{InboundDecision, OutboundDecision};
44
use hotfix_message::message::Message;
5+
use std::sync::Mutex;
6+
7+
type OutboundDecisionFn = Box<dyn Fn(&TestMessage) -> OutboundDecision + Send>;
8+
type InboundDecisionFn = Box<dyn Fn(&Message) -> InboundDecision + Send>;
59

610
pub struct FakeApplication {
711
message_sender: tokio::sync::mpsc::UnboundedSender<Message>,
8-
outbound_decision: OutboundDecision,
12+
outbound_decision_fn: Mutex<OutboundDecisionFn>,
13+
inbound_decision_fn: Mutex<InboundDecisionFn>,
914
}
1015

1116
impl FakeApplication {
12-
pub fn new(message_sender: tokio::sync::mpsc::UnboundedSender<Message>) -> Self {
13-
Self {
17+
pub fn builder(
18+
message_sender: tokio::sync::mpsc::UnboundedSender<Message>,
19+
) -> FakeApplicationBuilder {
20+
FakeApplicationBuilder {
1421
message_sender,
15-
outbound_decision: OutboundDecision::Send,
22+
outbound_decision_fn: Box::new(|_| OutboundDecision::Send),
23+
inbound_decision_fn: Box::new(|_| InboundDecision::Accept),
1624
}
1725
}
26+
}
1827

19-
pub fn with_outbound_decision(
20-
message_sender: tokio::sync::mpsc::UnboundedSender<Message>,
21-
decision: OutboundDecision,
28+
pub struct FakeApplicationBuilder {
29+
message_sender: tokio::sync::mpsc::UnboundedSender<Message>,
30+
outbound_decision_fn: OutboundDecisionFn,
31+
inbound_decision_fn: InboundDecisionFn,
32+
}
33+
34+
impl FakeApplicationBuilder {
35+
pub fn outbound_decision_fn(
36+
mut self,
37+
f: impl Fn(&TestMessage) -> OutboundDecision + Send + 'static,
2238
) -> Self {
23-
Self {
24-
message_sender,
25-
outbound_decision: decision,
39+
self.outbound_decision_fn = Box::new(f);
40+
self
41+
}
42+
43+
pub fn inbound_decision_fn(
44+
mut self,
45+
f: impl Fn(&Message) -> InboundDecision + Send + 'static,
46+
) -> Self {
47+
self.inbound_decision_fn = Box::new(f);
48+
self
49+
}
50+
51+
pub fn build(self) -> FakeApplication {
52+
FakeApplication {
53+
message_sender: self.message_sender,
54+
outbound_decision_fn: Mutex::new(self.outbound_decision_fn),
55+
inbound_decision_fn: Mutex::new(self.inbound_decision_fn),
2656
}
2757
}
2858
}
@@ -31,13 +61,15 @@ impl FakeApplication {
3161
impl Application for FakeApplication {
3262
type Outbound = TestMessage;
3363

34-
async fn on_outbound_message(&self, _msg: &TestMessage) -> OutboundDecision {
35-
self.outbound_decision
64+
async fn on_outbound_message(&self, msg: &TestMessage) -> OutboundDecision {
65+
let decision_fn = self.outbound_decision_fn.lock().unwrap();
66+
(decision_fn)(msg)
3667
}
3768

3869
async fn on_inbound_message(&self, msg: &Message) -> InboundDecision {
3970
self.message_sender.send(msg.clone()).unwrap();
40-
InboundDecision::Accept
71+
let decision_fn = self.inbound_decision_fn.lock().unwrap();
72+
(decision_fn)(msg)
4173
}
4274

4375
async fn on_logout(&mut self, _reason: &str) {}

crates/hotfix/tests/session_test_cases/common/setup.rs

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,13 @@ use crate::common::assertions::then;
33
use crate::common::fakes::{FakeApplication, FakeCounterparty, SessionSpy};
44
use crate::common::test_messages::TestMessage;
55
use crate::session_test_cases::common::fakes::DisconnectedSession;
6-
use hotfix::application::OutboundDecision;
76
use hotfix::config::SessionConfig;
87
use hotfix::session::InternalSessionRef;
98
use hotfix::session::Status;
109
use hotfix::store::in_memory::InMemoryMessageStore;
1110
use hotfix_message::Part;
1211
use hotfix_message::fix44::MSG_TYPE;
12+
use hotfix_message::message::Message;
1313

1414
pub const HEARTBEAT_INTERVAL: u64 = 30;
1515
pub const LOGON_TIMEOUT: u64 = 10;
@@ -30,8 +30,12 @@ pub async fn given_a_connected_session_with_store(
3030
let counterparty_config = create_counterparty_session_config(config.clone());
3131

3232
let (message_tx, message_rx) = tokio::sync::mpsc::unbounded_channel();
33-
let session = InternalSessionRef::new(config, FakeApplication::new(message_tx), message_store)
34-
.expect("session to be created successfully");
33+
let session = InternalSessionRef::new(
34+
config,
35+
FakeApplication::builder(message_tx).build(),
36+
message_store,
37+
)
38+
.expect("session to be created successfully");
3539

3640
let session_spy = SessionSpy::new(session.clone().into(), message_rx);
3741
let mock_counterparty = FakeCounterparty::start(session.clone(), counterparty_config)
@@ -42,15 +46,14 @@ pub async fn given_a_connected_session_with_store(
4246
}
4347

4448
/// Creates an active session with a configurable application.
45-
pub async fn given_an_active_session_with_outbound_decision(
46-
decision: OutboundDecision,
49+
pub async fn given_an_active_session_with_app(
50+
app: FakeApplication,
51+
message_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
4752
) -> (SessionSpy, FakeCounterparty<TestMessage>) {
4853
let config = create_session_config();
4954
let counterparty_config = create_counterparty_session_config(config.clone());
5055
let message_store = InMemoryMessageStore::default();
5156

52-
let (message_tx, message_rx) = tokio::sync::mpsc::unbounded_channel();
53-
let app = FakeApplication::with_outbound_decision(message_tx, decision);
5457
let session = InternalSessionRef::new(config, app, message_store)
5558
.expect("session to be created successfully");
5659

@@ -77,9 +80,12 @@ pub fn given_a_disconnected_session() -> DisconnectedSession {
7780
let message_store = InMemoryMessageStore::default();
7881

7982
let (message_tx, _message_rx) = tokio::sync::mpsc::unbounded_channel();
80-
let session_ref =
81-
InternalSessionRef::new(config, FakeApplication::new(message_tx), message_store)
82-
.expect("session to be created successfully");
83+
let session_ref = InternalSessionRef::new(
84+
config,
85+
FakeApplication::builder(message_tx).build(),
86+
message_store,
87+
)
88+
.expect("session to be created successfully");
8389

8490
let session_handle = session_ref.clone().into();
8591

crates/hotfix/tests/session_test_cases/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
mod admin_request_tests;
2+
mod business_reject_tests;
23
mod business_tests;
34
pub(crate) mod common;
45
mod heartbeat_tests;

crates/hotfix/tests/session_test_cases/send_confirmation_tests.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
use crate::common::actions::when;
22
use crate::common::assertions::then;
33
use crate::common::cleanup::finally;
4+
use crate::common::fakes::FakeApplication;
45
use crate::common::setup::{
5-
given_a_disconnected_session, given_an_active_session,
6-
given_an_active_session_with_outbound_decision,
6+
given_a_disconnected_session, given_an_active_session, given_an_active_session_with_app,
77
};
88
use crate::common::test_messages::TestMessage;
99
use hotfix::application::OutboundDecision;
@@ -129,8 +129,11 @@ async fn test_send_returns_disconnected_when_not_connected() {
129129
#[tokio::test]
130130
async fn test_send_returns_dropped_when_app_drops_message() {
131131
// Create an active session with an application configured to drop messages
132-
let (session, mut counterparty) =
133-
given_an_active_session_with_outbound_decision(OutboundDecision::Drop).await;
132+
let (message_tx, message_rx) = tokio::sync::mpsc::unbounded_channel();
133+
let app = FakeApplication::builder(message_tx)
134+
.outbound_decision_fn(|_| OutboundDecision::Drop)
135+
.build();
136+
let (session, mut counterparty) = given_an_active_session_with_app(app, message_rx).await;
134137

135138
// Send a message - should be dropped by the application
136139
let result = when(&session)
@@ -153,8 +156,11 @@ async fn test_send_returns_dropped_when_app_drops_message() {
153156
#[tokio::test]
154157
async fn test_send_returns_session_terminated_when_app_terminates() {
155158
// Create an active session with an application configured to terminate session
156-
let (session, mut counterparty) =
157-
given_an_active_session_with_outbound_decision(OutboundDecision::TerminateSession).await;
159+
let (message_tx, message_rx) = tokio::sync::mpsc::unbounded_channel();
160+
let app = FakeApplication::builder(message_tx)
161+
.outbound_decision_fn(|_| OutboundDecision::TerminateSession)
162+
.build();
163+
let (session, mut counterparty) = given_an_active_session_with_app(app, message_rx).await;
158164

159165
// Send a message - should cause session termination
160166
let result = when(&session)

0 commit comments

Comments
 (0)