Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ reaching full support of FIX 4.4 and 5.0 workflows as soon as possible.
- [x] Session-layer supporting the core flows, such as logins, resends, etc.
- [x] Built-in message stores
- [x] in-memory
- [x] file-system
- [x] [mongodb](https://www.mongodb.com/docs/drivers/rust/current/)
- [x] [redb](https://www.redb.org/)
- [x] Code-generation for FIX fields from XML specifications
Expand Down
14 changes: 9 additions & 5 deletions crates/hotfix/tests/common/actions.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::common::mock_counterparty::MockCounterparty;
use crate::common::fakes::{FakeCounterparty, SessionSpy};
use crate::common::test_messages::TestMessage;
use hotfix::message::FixMessage;
use hotfix::session::SessionRef;
Expand All @@ -12,19 +12,23 @@ pub fn when<T>(target: T) -> When<T> {
When { target }
}

impl When<&SessionRef<TestMessage>> {
impl When<&SessionSpy> {
fn session(&self) -> &SessionRef<TestMessage> {
self.target.session_ref()
}

pub async fn requests_disconnect(self) {
self.target
self.session()
.disconnect("Test Session Finished".to_string())
.await;
}

pub async fn sends_message(self, message: TestMessage) {
self.target.send_message(message).await;
self.session().send_message(message).await;
}
}

impl When<&mut MockCounterparty<TestMessage>> {
impl When<&mut FakeCounterparty<TestMessage>> {
pub async fn has_previously_sent(&mut self, message: impl FixMessage) {
self.target.push_previously_sent_message(message).await;
}
Expand Down
27 changes: 20 additions & 7 deletions crates/hotfix/tests/common/assertions.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::common::mock_counterparty::MockCounterparty;
use crate::common::fakes::{FakeCounterparty, SessionSpy};
use crate::common::test_messages::TestMessage;
use hotfix::session::SessionRef;
use hotfix::session::Status;
Expand All @@ -17,19 +17,32 @@ pub fn then<T>(target: T) -> Then<T> {
Then { target }
}

impl Then<&SessionRef<TestMessage>> {
impl Then<&mut SessionSpy> {
fn session(&self) -> &SessionRef<TestMessage> {
self.target.session_ref()
}

pub async fn receives<F>(self, assertion: F)
where
F: FnOnce(&TestMessage),
{
self.target
.assert_next_with_timeout(assertion, DEFAULT_TIMEOUT)
.await;
}

pub async fn target_sequence_number_reaches(self, expected_target_sequence_number: u64) {
let timeout = DEFAULT_TIMEOUT;
let deadline = tokio::time::Instant::now() + timeout;
let retry_interval = Duration::from_millis(1);

let mut session_info = self.target.get_session_info().await;
let mut session_info = self.session().get_session_info().await;
while tokio::time::Instant::now() < deadline {
if session_info.next_target_seq_number - 1 == expected_target_sequence_number {
return;
}
tokio::time::sleep(retry_interval).await;
session_info = self.target.get_session_info().await;
session_info = self.session().get_session_info().await;
}

let actual_target_seq_number = session_info.next_target_seq_number - 1;
Expand All @@ -47,13 +60,13 @@ impl Then<&SessionRef<TestMessage>> {
let deadline = tokio::time::Instant::now() + timeout;
let retry_interval = Duration::from_millis(1);

let mut session_info = self.target.get_session_info().await;
let mut session_info = self.session().get_session_info().await;
while tokio::time::Instant::now() < deadline {
if session_info.status == expected_status {
return;
}
tokio::time::sleep(retry_interval).await;
session_info = self.target.get_session_info().await;
session_info = self.session().get_session_info().await;
}

let actual_status = session_info.status;
Expand All @@ -63,7 +76,7 @@ impl Then<&SessionRef<TestMessage>> {
}
}

impl Then<&mut MockCounterparty<TestMessage>> {
impl Then<&mut FakeCounterparty<TestMessage>> {
pub async fn receives<F>(self, assertion: F)
where
F: FnOnce(&Message),
Expand Down
29 changes: 29 additions & 0 deletions crates/hotfix/tests/common/fakes/fake_application.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use crate::common::test_messages::TestMessage;
use hotfix::Application;
use hotfix::application::{InboundDecision, OutboundDecision};

pub struct FakeApplication {
message_sender: tokio::sync::mpsc::UnboundedSender<TestMessage>,
}

impl FakeApplication {
pub fn new(message_sender: tokio::sync::mpsc::UnboundedSender<TestMessage>) -> Self {
Self { message_sender }
}
}

#[async_trait::async_trait]
impl Application<TestMessage> for FakeApplication {
async fn on_outbound_message(&self, _msg: &TestMessage) -> OutboundDecision {
OutboundDecision::Send
}

async fn on_inbound_message(&self, msg: TestMessage) -> InboundDecision {
self.message_sender.send(msg).unwrap();
InboundDecision::Accept
}

async fn on_logout(&mut self, _reason: &str) {}

async fn on_logon(&mut self) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::time::Duration;
use tokio::sync::mpsc::Receiver;
use tokio::sync::{mpsc, oneshot};

pub struct MockCounterparty<M> {
pub struct FakeCounterparty<M> {
receiver: Receiver<WriterMessage>,
received_messages: Vec<Message>,
sent_messages: Vec<Vec<u8>>,
Expand All @@ -26,7 +26,7 @@ pub struct MockCounterparty<M> {
_dc_sender: oneshot::Sender<()>,
}

impl<M> MockCounterparty<M>
impl<M> FakeCounterparty<M>
where
M: FixMessage,
{
Expand Down
7 changes: 7 additions & 0 deletions crates/hotfix/tests/common/fakes/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
mod fake_application;
mod fake_counterparty;
mod session_spy;

pub use fake_application::FakeApplication;
pub use fake_counterparty::FakeCounterparty;
pub use session_spy::SessionSpy;
40 changes: 40 additions & 0 deletions crates/hotfix/tests/common/fakes/session_spy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
use crate::common::test_messages::TestMessage;
use hotfix::session::SessionRef;

pub struct SessionSpy {
session: SessionRef<TestMessage>,
message_receiver: tokio::sync::mpsc::UnboundedReceiver<TestMessage>,
}

impl SessionSpy {
pub fn new(
session: SessionRef<TestMessage>,
message_receiver: tokio::sync::mpsc::UnboundedReceiver<TestMessage>,
) -> Self {
Self {
session,
message_receiver,
}
}

pub fn session_ref(&self) -> &SessionRef<TestMessage> {
&self.session
}

pub async fn assert_next_with_timeout<F>(&mut self, assertion: F, timeout: std::time::Duration)
where
F: FnOnce(&TestMessage),
{
match tokio::time::timeout(timeout, self.message_receiver.recv()).await {
Ok(Some(message)) => {
assertion(&message);
}
Ok(None) => {
panic!("disconnected before receiving any message");
}
Err(_) => {
panic!("timeout expired before receiving any message");
}
}
}
}
20 changes: 0 additions & 20 deletions crates/hotfix/tests/common/mock_application.rs

This file was deleted.

3 changes: 1 addition & 2 deletions crates/hotfix/tests/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
pub mod actions;
pub mod assertions;
pub mod mock_application;
pub mod mock_counterparty;
pub mod fakes;
pub mod setup;
pub mod test_messages;
23 changes: 12 additions & 11 deletions crates/hotfix/tests/common/setup.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::common::actions::when;
use crate::common::assertions::then;
use crate::common::mock_application::MockApplication;
use crate::common::mock_counterparty::MockCounterparty;
use crate::common::fakes::{FakeApplication, FakeCounterparty, SessionSpy};
use crate::common::test_messages::TestMessage;
use hotfix::config::SessionConfig;
use hotfix::session::SessionRef;
Expand All @@ -16,32 +15,34 @@ pub const LOGON_TIMEOUT: u64 = 10;
pub const COUNTERPARTY_COMP_ID: &str = "dummy-acceptor";
pub const OUR_COMP_ID: &str = "dummy-initiator";

pub async fn given_a_connected_session() -> (SessionRef<TestMessage>, MockCounterparty<TestMessage>)
{
pub async fn given_a_connected_session() -> (SessionSpy, FakeCounterparty<TestMessage>) {
let message_store = InMemoryMessageStore::default();
given_a_connected_session_with_store(message_store).await
}

pub async fn given_a_connected_session_with_store(
message_store: InMemoryMessageStore,
) -> (SessionRef<TestMessage>, MockCounterparty<TestMessage>) {
) -> (SessionSpy, FakeCounterparty<TestMessage>) {
let config = create_session_config();
let counterparty_config = create_counterparty_session_config(config.clone());

let session = SessionRef::new(config, MockApplication {}, message_store);
let mock_counterparty = MockCounterparty::start(session.clone(), counterparty_config).await;
let (message_tx, message_rx) = tokio::sync::mpsc::unbounded_channel();
let session = SessionRef::new(config, FakeApplication::new(message_tx), message_store);

(session, mock_counterparty)
let session_spy = SessionSpy::new(session.clone(), message_rx);
let mock_counterparty = FakeCounterparty::start(session.clone(), counterparty_config).await;

(session_spy, mock_counterparty)
}

pub async fn given_an_active_session() -> (SessionRef<TestMessage>, MockCounterparty<TestMessage>) {
let (session, mut mock_counterparty) = given_a_connected_session().await;
pub async fn given_an_active_session() -> (SessionSpy, FakeCounterparty<TestMessage>) {
Comment thread
davidsteiner marked this conversation as resolved.
let (mut session, mut mock_counterparty) = given_a_connected_session().await;

then(&mut mock_counterparty)
.receives(|msg| assert_eq!(msg.header().get::<&str>(MSG_TYPE).unwrap(), "A"))
.await;
when(&mut mock_counterparty).sends_logon().await;
then(&session).status_changes_to(Status::Active).await;
then(&mut session).status_changes_to(Status::Active).await;

(session, mock_counterparty)
}
Expand Down
6 changes: 5 additions & 1 deletion crates/hotfix/tests/common/test_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,12 @@ pub enum TestMessage {

impl TestMessage {
pub fn dummy_execution_report() -> Self {
Self::dummy_execution_report_with_order_id("123456789".to_string())
}

pub fn dummy_execution_report_with_order_id(order_id: String) -> Self {
Self::ExecutionReport {
order_id: "123456789".to_string(),
order_id,
exec_id: "123456789".to_string(),
exec_type: fix44::ExecType::New,
ord_status: fix44::OrdStatus::New,
Expand Down
12 changes: 7 additions & 5 deletions crates/hotfix/tests/session_test_cases/business_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,26 @@ use hotfix_message::{FieldType, fix44::MsgType};

#[tokio::test]
async fn test_new_order_single() {
let (session, mut mock_counterparty) = given_an_active_session().await;
let (mut session, mut counterparty) = given_an_active_session().await;

// we send a new order to the counterparty and they receive it successfully
when(&session)
.sends_message(TestMessage::dummy_new_order_single())
.await;
then(&mut mock_counterparty)
then(&mut counterparty)
.receives(|msg| {
let parsed = TestMessage::parse(msg);
assert_eq!(parsed.message_type(), MsgType::OrderSingle.to_string());
})
.await;

when(&mut mock_counterparty)
when(&mut counterparty)
.sends_message(TestMessage::dummy_execution_report())
.await;
// TODO: we currently have no good way of asserting this message was received
then(&mut session)
.receives(|msg| assert_eq!(msg.message_type(), MsgType::ExecutionReport.to_string()))
.await;

when(&session).requests_disconnect().await;
then(&mut mock_counterparty).gets_disconnected().await;
then(&mut counterparty).gets_disconnected().await;
}
Loading