Skip to content

Commit f7c0ddb

Browse files
authored
chore: add test case for processing correct duplicate message (#235)
* Introduce SessionSpy for smarter session assertions * Rename mock counterparty and mock application to fake counterparty and application * Support assertions for messages received by the session * Add test case for message received again with dup flag
1 parent 34fce5f commit f7c0ddb

16 files changed

Lines changed: 309 additions & 194 deletions

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ reaching full support of FIX 4.4 and 5.0 workflows as soon as possible.
3434
- [x] Session-layer supporting the core flows, such as logins, resends, etc.
3535
- [x] Built-in message stores
3636
- [x] in-memory
37+
- [x] file-system
3738
- [x] [mongodb](https://www.mongodb.com/docs/drivers/rust/current/)
3839
- [x] [redb](https://www.redb.org/)
3940
- [x] Code-generation for FIX fields from XML specifications

crates/hotfix/tests/common/actions.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::common::mock_counterparty::MockCounterparty;
1+
use crate::common::fakes::{FakeCounterparty, SessionSpy};
22
use crate::common::test_messages::TestMessage;
33
use hotfix::message::FixMessage;
44
use hotfix::session::SessionRef;
@@ -12,19 +12,23 @@ pub fn when<T>(target: T) -> When<T> {
1212
When { target }
1313
}
1414

15-
impl When<&SessionRef<TestMessage>> {
15+
impl When<&SessionSpy> {
16+
fn session(&self) -> &SessionRef<TestMessage> {
17+
self.target.session_ref()
18+
}
19+
1620
pub async fn requests_disconnect(self) {
17-
self.target
21+
self.session()
1822
.disconnect("Test Session Finished".to_string())
1923
.await;
2024
}
2125

2226
pub async fn sends_message(self, message: TestMessage) {
23-
self.target.send_message(message).await;
27+
self.session().send_message(message).await;
2428
}
2529
}
2630

27-
impl When<&mut MockCounterparty<TestMessage>> {
31+
impl When<&mut FakeCounterparty<TestMessage>> {
2832
pub async fn has_previously_sent(&mut self, message: impl FixMessage) {
2933
self.target.push_previously_sent_message(message).await;
3034
}

crates/hotfix/tests/common/assertions.rs

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::common::mock_counterparty::MockCounterparty;
1+
use crate::common::fakes::{FakeCounterparty, SessionSpy};
22
use crate::common::test_messages::TestMessage;
33
use hotfix::session::SessionRef;
44
use hotfix::session::Status;
@@ -17,19 +17,32 @@ pub fn then<T>(target: T) -> Then<T> {
1717
Then { target }
1818
}
1919

20-
impl Then<&SessionRef<TestMessage>> {
20+
impl Then<&mut SessionSpy> {
21+
fn session(&self) -> &SessionRef<TestMessage> {
22+
self.target.session_ref()
23+
}
24+
25+
pub async fn receives<F>(self, assertion: F)
26+
where
27+
F: FnOnce(&TestMessage),
28+
{
29+
self.target
30+
.assert_next_with_timeout(assertion, DEFAULT_TIMEOUT)
31+
.await;
32+
}
33+
2134
pub async fn target_sequence_number_reaches(self, expected_target_sequence_number: u64) {
2235
let timeout = DEFAULT_TIMEOUT;
2336
let deadline = tokio::time::Instant::now() + timeout;
2437
let retry_interval = Duration::from_millis(1);
2538

26-
let mut session_info = self.target.get_session_info().await;
39+
let mut session_info = self.session().get_session_info().await;
2740
while tokio::time::Instant::now() < deadline {
2841
if session_info.next_target_seq_number - 1 == expected_target_sequence_number {
2942
return;
3043
}
3144
tokio::time::sleep(retry_interval).await;
32-
session_info = self.target.get_session_info().await;
45+
session_info = self.session().get_session_info().await;
3346
}
3447

3548
let actual_target_seq_number = session_info.next_target_seq_number - 1;
@@ -47,13 +60,13 @@ impl Then<&SessionRef<TestMessage>> {
4760
let deadline = tokio::time::Instant::now() + timeout;
4861
let retry_interval = Duration::from_millis(1);
4962

50-
let mut session_info = self.target.get_session_info().await;
63+
let mut session_info = self.session().get_session_info().await;
5164
while tokio::time::Instant::now() < deadline {
5265
if session_info.status == expected_status {
5366
return;
5467
}
5568
tokio::time::sleep(retry_interval).await;
56-
session_info = self.target.get_session_info().await;
69+
session_info = self.session().get_session_info().await;
5770
}
5871

5972
let actual_status = session_info.status;
@@ -63,7 +76,7 @@ impl Then<&SessionRef<TestMessage>> {
6376
}
6477
}
6578

66-
impl Then<&mut MockCounterparty<TestMessage>> {
79+
impl Then<&mut FakeCounterparty<TestMessage>> {
6780
pub async fn receives<F>(self, assertion: F)
6881
where
6982
F: FnOnce(&Message),
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
use crate::common::test_messages::TestMessage;
2+
use hotfix::Application;
3+
use hotfix::application::{InboundDecision, OutboundDecision};
4+
5+
pub struct FakeApplication {
6+
message_sender: tokio::sync::mpsc::UnboundedSender<TestMessage>,
7+
}
8+
9+
impl FakeApplication {
10+
pub fn new(message_sender: tokio::sync::mpsc::UnboundedSender<TestMessage>) -> Self {
11+
Self { message_sender }
12+
}
13+
}
14+
15+
#[async_trait::async_trait]
16+
impl Application<TestMessage> for FakeApplication {
17+
async fn on_outbound_message(&self, _msg: &TestMessage) -> OutboundDecision {
18+
OutboundDecision::Send
19+
}
20+
21+
async fn on_inbound_message(&self, msg: TestMessage) -> InboundDecision {
22+
self.message_sender.send(msg).unwrap();
23+
InboundDecision::Accept
24+
}
25+
26+
async fn on_logout(&mut self, _reason: &str) {}
27+
28+
async fn on_logon(&mut self) {}
29+
}

crates/hotfix/tests/common/mock_counterparty.rs renamed to crates/hotfix/tests/common/fakes/fake_counterparty.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use std::time::Duration;
1414
use tokio::sync::mpsc::Receiver;
1515
use tokio::sync::{mpsc, oneshot};
1616

17-
pub struct MockCounterparty<M> {
17+
pub struct FakeCounterparty<M> {
1818
receiver: Receiver<WriterMessage>,
1919
received_messages: Vec<Message>,
2020
sent_messages: Vec<Vec<u8>>,
@@ -26,7 +26,7 @@ pub struct MockCounterparty<M> {
2626
_dc_sender: oneshot::Sender<()>,
2727
}
2828

29-
impl<M> MockCounterparty<M>
29+
impl<M> FakeCounterparty<M>
3030
where
3131
M: FixMessage,
3232
{
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
mod fake_application;
2+
mod fake_counterparty;
3+
mod session_spy;
4+
5+
pub use fake_application::FakeApplication;
6+
pub use fake_counterparty::FakeCounterparty;
7+
pub use session_spy::SessionSpy;
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
use crate::common::test_messages::TestMessage;
2+
use hotfix::session::SessionRef;
3+
4+
pub struct SessionSpy {
5+
session: SessionRef<TestMessage>,
6+
message_receiver: tokio::sync::mpsc::UnboundedReceiver<TestMessage>,
7+
}
8+
9+
impl SessionSpy {
10+
pub fn new(
11+
session: SessionRef<TestMessage>,
12+
message_receiver: tokio::sync::mpsc::UnboundedReceiver<TestMessage>,
13+
) -> Self {
14+
Self {
15+
session,
16+
message_receiver,
17+
}
18+
}
19+
20+
pub fn session_ref(&self) -> &SessionRef<TestMessage> {
21+
&self.session
22+
}
23+
24+
pub async fn assert_next_with_timeout<F>(&mut self, assertion: F, timeout: std::time::Duration)
25+
where
26+
F: FnOnce(&TestMessage),
27+
{
28+
match tokio::time::timeout(timeout, self.message_receiver.recv()).await {
29+
Ok(Some(message)) => {
30+
assertion(&message);
31+
}
32+
Ok(None) => {
33+
panic!("disconnected before receiving any message");
34+
}
35+
Err(_) => {
36+
panic!("timeout expired before receiving any message");
37+
}
38+
}
39+
}
40+
}

crates/hotfix/tests/common/mock_application.rs

Lines changed: 0 additions & 20 deletions
This file was deleted.

crates/hotfix/tests/common/mod.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
pub mod actions;
22
pub mod assertions;
3-
pub mod mock_application;
4-
pub mod mock_counterparty;
3+
pub mod fakes;
54
pub mod setup;
65
pub mod test_messages;

crates/hotfix/tests/common/setup.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use crate::common::actions::when;
22
use crate::common::assertions::then;
3-
use crate::common::mock_application::MockApplication;
4-
use crate::common::mock_counterparty::MockCounterparty;
3+
use crate::common::fakes::{FakeApplication, FakeCounterparty, SessionSpy};
54
use crate::common::test_messages::TestMessage;
65
use hotfix::config::SessionConfig;
76
use hotfix::session::SessionRef;
@@ -16,32 +15,34 @@ pub const LOGON_TIMEOUT: u64 = 10;
1615
pub const COUNTERPARTY_COMP_ID: &str = "dummy-acceptor";
1716
pub const OUR_COMP_ID: &str = "dummy-initiator";
1817

19-
pub async fn given_a_connected_session() -> (SessionRef<TestMessage>, MockCounterparty<TestMessage>)
20-
{
18+
pub async fn given_a_connected_session() -> (SessionSpy, FakeCounterparty<TestMessage>) {
2119
let message_store = InMemoryMessageStore::default();
2220
given_a_connected_session_with_store(message_store).await
2321
}
2422

2523
pub async fn given_a_connected_session_with_store(
2624
message_store: InMemoryMessageStore,
27-
) -> (SessionRef<TestMessage>, MockCounterparty<TestMessage>) {
25+
) -> (SessionSpy, FakeCounterparty<TestMessage>) {
2826
let config = create_session_config();
2927
let counterparty_config = create_counterparty_session_config(config.clone());
3028

31-
let session = SessionRef::new(config, MockApplication {}, message_store);
32-
let mock_counterparty = MockCounterparty::start(session.clone(), counterparty_config).await;
29+
let (message_tx, message_rx) = tokio::sync::mpsc::unbounded_channel();
30+
let session = SessionRef::new(config, FakeApplication::new(message_tx), message_store);
3331

34-
(session, mock_counterparty)
32+
let session_spy = SessionSpy::new(session.clone(), message_rx);
33+
let mock_counterparty = FakeCounterparty::start(session.clone(), counterparty_config).await;
34+
35+
(session_spy, mock_counterparty)
3536
}
3637

37-
pub async fn given_an_active_session() -> (SessionRef<TestMessage>, MockCounterparty<TestMessage>) {
38-
let (session, mut mock_counterparty) = given_a_connected_session().await;
38+
pub async fn given_an_active_session() -> (SessionSpy, FakeCounterparty<TestMessage>) {
39+
let (mut session, mut mock_counterparty) = given_a_connected_session().await;
3940

4041
then(&mut mock_counterparty)
4142
.receives(|msg| assert_eq!(msg.header().get::<&str>(MSG_TYPE).unwrap(), "A"))
4243
.await;
4344
when(&mut mock_counterparty).sends_logon().await;
44-
then(&session).status_changes_to(Status::Active).await;
45+
then(&mut session).status_changes_to(Status::Active).await;
4546

4647
(session, mock_counterparty)
4748
}

0 commit comments

Comments
 (0)