Skip to content

Commit 876d71a

Browse files
committed
Add tests for edge cases
1 parent a28d02b commit 876d71a

5 files changed

Lines changed: 272 additions & 7 deletions

File tree

crates/hotfix/tests/session_test_cases/common/actions.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,13 @@ impl When<&SessionSpy> {
3131
) -> Result<SendOutcome, SendError> {
3232
self.target.session_handle().send(message).await
3333
}
34+
35+
pub async fn sends_message_without_confirmation(
36+
self,
37+
message: TestMessage,
38+
) -> Result<(), SendError> {
39+
self.target.session_handle().send_forget(message).await
40+
}
3441
}
3542

3643
impl When<&mut FakeCounterparty<TestMessage>> {

crates/hotfix/tests/session_test_cases/common/fakes/fake_application.rs

Lines changed: 80 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,99 @@
11
use crate::common::test_messages::TestMessage;
22
use hotfix::Application;
33
use hotfix::application::{InboundDecision, OutboundDecision};
4+
use std::sync::Arc;
5+
use std::sync::atomic::{AtomicU8, Ordering};
6+
7+
/// Represents the decision to make for outbound messages.
8+
/// Uses u8 for atomic storage.
9+
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
10+
#[repr(u8)]
11+
pub enum OutboundDecisionKind {
12+
Send = 0,
13+
Drop = 1,
14+
TerminateSession = 2,
15+
}
16+
17+
impl From<OutboundDecision> for OutboundDecisionKind {
18+
fn from(decision: OutboundDecision) -> Self {
19+
match decision {
20+
OutboundDecision::Send => OutboundDecisionKind::Send,
21+
OutboundDecision::Drop => OutboundDecisionKind::Drop,
22+
OutboundDecision::TerminateSession => OutboundDecisionKind::TerminateSession,
23+
}
24+
}
25+
}
26+
27+
impl From<OutboundDecisionKind> for OutboundDecision {
28+
fn from(kind: OutboundDecisionKind) -> Self {
29+
match kind {
30+
OutboundDecisionKind::Send => OutboundDecision::Send,
31+
OutboundDecisionKind::Drop => OutboundDecision::Drop,
32+
OutboundDecisionKind::TerminateSession => OutboundDecision::TerminateSession,
33+
}
34+
}
35+
}
36+
37+
/// A handle to change the outbound decision at runtime.
38+
#[derive(Clone)]
39+
pub struct OutboundDecisionHandle {
40+
inner: Arc<AtomicU8>,
41+
}
42+
43+
impl OutboundDecisionHandle {
44+
fn new(decision: OutboundDecisionKind) -> Self {
45+
Self {
46+
inner: Arc::new(AtomicU8::new(decision as u8)),
47+
}
48+
}
49+
50+
#[allow(dead_code)]
51+
pub fn set(&self, decision: OutboundDecisionKind) {
52+
self.inner.store(decision as u8, Ordering::SeqCst);
53+
}
54+
55+
fn get(&self) -> OutboundDecisionKind {
56+
match self.inner.load(Ordering::SeqCst) {
57+
0 => OutboundDecisionKind::Send,
58+
1 => OutboundDecisionKind::Drop,
59+
2 => OutboundDecisionKind::TerminateSession,
60+
_ => OutboundDecisionKind::Send,
61+
}
62+
}
63+
}
464

565
pub struct FakeApplication {
666
message_sender: tokio::sync::mpsc::UnboundedSender<TestMessage>,
67+
outbound_decision: OutboundDecisionHandle,
768
}
869

970
impl FakeApplication {
1071
pub fn new(message_sender: tokio::sync::mpsc::UnboundedSender<TestMessage>) -> Self {
11-
Self { message_sender }
72+
Self {
73+
message_sender,
74+
outbound_decision: OutboundDecisionHandle::new(OutboundDecisionKind::Send),
75+
}
76+
}
77+
78+
pub fn with_outbound_decision(
79+
message_sender: tokio::sync::mpsc::UnboundedSender<TestMessage>,
80+
decision: OutboundDecision,
81+
) -> Self {
82+
Self {
83+
message_sender,
84+
outbound_decision: OutboundDecisionHandle::new(decision.into()),
85+
}
86+
}
87+
88+
pub fn outbound_decision_handle(&self) -> OutboundDecisionHandle {
89+
self.outbound_decision.clone()
1290
}
1391
}
1492

1593
#[async_trait::async_trait]
1694
impl Application<TestMessage, TestMessage> for FakeApplication {
1795
async fn on_outbound_message(&self, _msg: &TestMessage) -> OutboundDecision {
18-
OutboundDecision::Send
96+
self.outbound_decision.get().into()
1997
}
2098

2199
async fn on_inbound_message(&self, msg: TestMessage) -> InboundDecision {

crates/hotfix/tests/session_test_cases/common/fakes/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,6 @@ mod fake_application;
22
mod fake_counterparty;
33
mod session_spy;
44

5-
pub use fake_application::FakeApplication;
5+
pub use fake_application::{FakeApplication, OutboundDecisionHandle};
66
pub use fake_counterparty::FakeCounterparty;
77
pub use session_spy::SessionSpy;

crates/hotfix/tests/session_test_cases/common/setup.rs

Lines changed: 84 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
use crate::common::actions::when;
22
use crate::common::assertions::then;
3-
use crate::common::fakes::{FakeApplication, FakeCounterparty, SessionSpy};
3+
use crate::common::fakes::{FakeApplication, FakeCounterparty, OutboundDecisionHandle, SessionSpy};
44
use crate::common::test_messages::TestMessage;
5+
use hotfix::application::OutboundDecision;
56
use hotfix::config::SessionConfig;
6-
use hotfix::session::InternalSessionRef;
77
use hotfix::session::Status;
8+
use hotfix::session::{InternalSessionRef, SessionHandle};
89
use hotfix::store::in_memory::InMemoryMessageStore;
910
use hotfix_message::Part;
1011
use hotfix_message::fix44::MSG_TYPE;
@@ -39,6 +40,87 @@ pub async fn given_a_connected_session_with_store(
3940
(session_spy, mock_counterparty)
4041
}
4142

43+
/// Creates a connected session with a configurable application.
44+
/// Returns the session spy, counterparty, and a handle to change the outbound decision at runtime.
45+
pub async fn given_a_connected_session_with_configurable_app(
46+
initial_decision: OutboundDecision,
47+
) -> (
48+
SessionSpy,
49+
FakeCounterparty<TestMessage>,
50+
OutboundDecisionHandle,
51+
) {
52+
let config = create_session_config();
53+
let counterparty_config = create_counterparty_session_config(config.clone());
54+
let message_store = InMemoryMessageStore::default();
55+
56+
let (message_tx, message_rx) = tokio::sync::mpsc::unbounded_channel();
57+
let app = FakeApplication::with_outbound_decision(message_tx, initial_decision);
58+
let decision_handle = app.outbound_decision_handle();
59+
let session = InternalSessionRef::new(config, app, message_store)
60+
.expect("session to be created successfully");
61+
62+
let session_spy = SessionSpy::new(session.clone().into(), message_rx);
63+
let mock_counterparty = FakeCounterparty::start(session.clone(), counterparty_config)
64+
.await
65+
.expect("failed to start FakeCounterparty");
66+
67+
(session_spy, mock_counterparty, decision_handle)
68+
}
69+
70+
/// Creates an active session with a configurable application.
71+
/// Returns the session spy, counterparty, and a handle to change the outbound decision at runtime.
72+
pub async fn given_an_active_session_with_configurable_app(
73+
initial_decision: OutboundDecision,
74+
) -> (
75+
SessionSpy,
76+
FakeCounterparty<TestMessage>,
77+
OutboundDecisionHandle,
78+
) {
79+
let (mut session, mut mock_counterparty, decision_handle) =
80+
given_a_connected_session_with_configurable_app(initial_decision).await;
81+
82+
then(&mut mock_counterparty)
83+
.receives(|msg| assert_eq!(msg.header().get::<&str>(MSG_TYPE).unwrap(), "A"))
84+
.await;
85+
when(&mut mock_counterparty).sends_logon().await;
86+
then(&mut session).status_changes_to(Status::Active).await;
87+
88+
(session, mock_counterparty, decision_handle)
89+
}
90+
91+
/// A disconnected session - has no transport connection established.
92+
/// Holds onto the InternalSessionRef to keep channels alive for the session task.
93+
pub struct DisconnectedSession {
94+
#[allow(dead_code)]
95+
session_ref: InternalSessionRef<TestMessage>,
96+
session_handle: SessionHandle<TestMessage>,
97+
}
98+
99+
impl DisconnectedSession {
100+
pub fn session_handle(&self) -> &SessionHandle<TestMessage> {
101+
&self.session_handle
102+
}
103+
}
104+
105+
/// Creates a session that has not yet established a transport connection.
106+
/// This is useful for testing the Disconnected error case.
107+
pub fn given_a_disconnected_session() -> DisconnectedSession {
108+
let config = create_session_config();
109+
let message_store = InMemoryMessageStore::default();
110+
111+
let (message_tx, _message_rx) = tokio::sync::mpsc::unbounded_channel();
112+
let session_ref =
113+
InternalSessionRef::new(config, FakeApplication::new(message_tx), message_store)
114+
.expect("session to be created successfully");
115+
116+
let session_handle = session_ref.clone().into();
117+
118+
DisconnectedSession {
119+
session_ref,
120+
session_handle,
121+
}
122+
}
123+
42124
pub async fn given_an_active_session() -> (SessionSpy, FakeCounterparty<TestMessage>) {
43125
let (mut session, mut mock_counterparty) = given_a_connected_session().await;
44126

crates/hotfix/tests/session_test_cases/send_confirmation_tests.rs

Lines changed: 100 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
use crate::common::actions::when;
22
use crate::common::assertions::then;
33
use crate::common::cleanup::finally;
4-
use crate::common::setup::given_an_active_session;
4+
use crate::common::setup::{
5+
given_a_disconnected_session, given_an_active_session,
6+
given_an_active_session_with_configurable_app,
7+
};
58
use crate::common::test_messages::TestMessage;
9+
use hotfix::application::OutboundDecision;
610
use hotfix::message::{InboundMessage, OutboundMessage};
7-
use hotfix::session::SendOutcome;
11+
use hotfix::session::{SendError, SendOutcome};
812

913
#[tokio::test]
1014
async fn test_send_returns_sequence_number() {
@@ -78,3 +82,97 @@ async fn test_send_multiple_messages_returns_sequential_sequence_numbers() {
7882

7983
finally(&session, &mut counterparty).disconnect().await;
8084
}
85+
86+
#[tokio::test]
87+
async fn test_send_forget_queues_message() {
88+
let (session, mut counterparty) = given_an_active_session().await;
89+
90+
// Send a message using send_forget (no confirmation)
91+
when(&session)
92+
.sends_message_without_confirmation(TestMessage::dummy_new_order_single())
93+
.await
94+
.expect("message should be queued successfully");
95+
96+
// Verify counterparty received the message
97+
then(&mut counterparty)
98+
.receives(|msg| {
99+
let parsed = TestMessage::parse(msg);
100+
assert_eq!(parsed.message_type(), "D");
101+
})
102+
.await;
103+
104+
finally(&session, &mut counterparty).disconnect().await;
105+
}
106+
107+
#[tokio::test]
108+
async fn test_send_returns_disconnected_when_not_connected() {
109+
// Create a session without establishing a transport connection
110+
let disconnected_session = given_a_disconnected_session();
111+
112+
// Try to send a message before any connection is established
113+
let result = disconnected_session
114+
.session_handle()
115+
.send(TestMessage::dummy_new_order_single())
116+
.await;
117+
118+
// Should return Disconnected error since no transport connection exists
119+
match result {
120+
Err(SendError::Disconnected) => {
121+
// Expected - session has no transport connection
122+
}
123+
other => {
124+
panic!("Expected SendError::Disconnected, got {:?}", other);
125+
}
126+
}
127+
}
128+
129+
#[tokio::test]
130+
async fn test_send_returns_dropped_when_app_drops_message() {
131+
// Create an active session with an application configured to drop messages
132+
let (session, mut counterparty, _decision_handle) =
133+
given_an_active_session_with_configurable_app(OutboundDecision::Drop).await;
134+
135+
// Send a message - should be dropped by the application
136+
let result = when(&session)
137+
.sends_message_with_confirmation(TestMessage::dummy_new_order_single())
138+
.await;
139+
140+
// Verify we get SendOutcome::Dropped
141+
match result {
142+
Ok(SendOutcome::Dropped) => {
143+
// Expected - application chose to drop the message
144+
}
145+
other => {
146+
panic!("Expected SendOutcome::Dropped, got {:?}", other);
147+
}
148+
}
149+
150+
finally(&session, &mut counterparty).disconnect().await;
151+
}
152+
153+
#[tokio::test]
154+
async fn test_send_returns_session_terminated_when_app_terminates() {
155+
// Create an active session with an application configured to terminate session
156+
let (session, mut counterparty, _decision_handle) =
157+
given_an_active_session_with_configurable_app(OutboundDecision::TerminateSession).await;
158+
159+
// Send a message - should cause session termination
160+
let result = when(&session)
161+
.sends_message_with_confirmation(TestMessage::dummy_new_order_single())
162+
.await;
163+
164+
// Verify we get SendError::SessionTerminated
165+
match result {
166+
Err(SendError::SessionTerminated) => {
167+
// Expected - application chose to terminate the session
168+
}
169+
other => {
170+
panic!("Expected SendError::SessionTerminated, got {:?}", other);
171+
}
172+
}
173+
174+
// Session is terminated, so we just need to wait for disconnect
175+
counterparty
176+
.assert_disconnected_with_timeout(std::time::Duration::from_secs(5))
177+
.await;
178+
}

0 commit comments

Comments
 (0)