Skip to content

Commit 3fb6247

Browse files
authored
feat: better handling of resend requests (#237)
* Improve logging during resend flows * Correctly send rejects when a resend request is missing required tags * Prevent resend attempts when the session is not connected * Add test case for resend request without begin sequence number * Add test case for invalid EndSeqNo in resend requests * Add test case for happy resend flow where counterparty requests resend
1 parent f7c0ddb commit 3fb6247

5 files changed

Lines changed: 170 additions & 34 deletions

File tree

crates/hotfix/src/message.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ pub mod test_request;
1616
pub mod verification;
1717

1818
pub use parser::RawFixMessage;
19+
pub use resend_request::ResendRequest;
1920

2021
pub trait FixMessage: Clone + Send + 'static {
2122
fn write(&self, msg: &mut Message);

crates/hotfix/src/message/resend_request.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,13 @@ use hotfix_message::message::Message;
33
use hotfix_message::{Part, fix44};
44

55
#[derive(Clone, Copy)]
6-
pub(crate) struct ResendRequest {
6+
pub struct ResendRequest {
77
begin_seq_no: u64,
88
end_seq_no: u64,
99
}
1010

1111
impl ResendRequest {
12-
pub(crate) fn new(begin: u64, end: u64) -> Self {
12+
pub fn new(begin: u64, end: u64) -> Self {
1313
Self {
1414
begin_seq_no: begin,
1515
end_seq_no: end,

crates/hotfix/src/session.rs

Lines changed: 61 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,14 @@ mod info;
33
mod session_ref;
44
mod state;
55

6+
use crate::config::SessionConfig;
7+
use crate::message::FixMessage;
8+
use crate::message::generate_message;
9+
use crate::message::heartbeat::Heartbeat;
10+
use crate::message::logon::{Logon, ResetSeqNumConfig};
11+
use crate::message::parser::RawFixMessage;
12+
use crate::store::MessageStore;
13+
use crate::transport::writer::WriterRef;
614
use anyhow::{Result, anyhow};
715
use chrono::Utc;
816
use hotfix_message::dict::Dictionary;
@@ -12,16 +20,7 @@ use std::pin::Pin;
1220
use tokio::select;
1321
use tokio::sync::mpsc;
1422
use tokio::time::{Duration, Instant, Sleep, sleep, sleep_until};
15-
use tracing::{debug, error, info, warn};
16-
17-
use crate::config::SessionConfig;
18-
use crate::message::FixMessage;
19-
use crate::message::generate_message;
20-
use crate::message::heartbeat::Heartbeat;
21-
use crate::message::logon::{Logon, ResetSeqNumConfig};
22-
use crate::message::parser::RawFixMessage;
23-
use crate::store::MessageStore;
24-
use crate::transport::writer::WriterRef;
23+
use tracing::{debug, enabled, error, info, warn};
2524

2625
use crate::error::{CompIdType, MessageVerificationError};
2726
use crate::message::logout::Logout;
@@ -362,25 +361,46 @@ impl<A: Application<M>, M: FixMessage, S: MessageStore> Session<A, M, S> {
362361
}
363362

364363
async fn on_resend_request(&mut self, message: &Message) -> Result<()> {
365-
// TODO: verify message and send reject as necessary
364+
if !self.state.is_connected() {
365+
warn!("received resend request while disconnected, ignoring");
366+
}
366367

367-
let begin_seq_number: usize = message.get(fix44::BEGIN_SEQ_NO).unwrap_or_else(|_| {
368-
// TODO: send reject if there is no valid begin number
369-
todo!()
370-
});
368+
let begin_seq_number: u64 = match message.get(fix44::BEGIN_SEQ_NO) {
369+
Ok(seq_number) => seq_number,
370+
Err(_) => {
371+
let reject = Reject::new(
372+
message
373+
.header()
374+
.get(fix44::MSG_SEQ_NUM)
375+
.map_err(|_| anyhow!("failed to get seq number"))?,
376+
)
377+
.session_reject_reason(SessionRejectReason::RequiredTagMissing)
378+
.text("missing begin sequence number for resend request");
379+
self.send_message(reject).await;
380+
return Ok(());
381+
}
382+
};
371383

372-
let end_seq_number: usize = match message.get(fix44::END_SEQ_NO) {
384+
let end_seq_number: u64 = match message.get(fix44::END_SEQ_NO) {
373385
Ok(seq_number) => {
374-
let last_seq_number = self.store.next_sender_seq_number() as usize - 1;
386+
let last_seq_number = self.store.next_sender_seq_number() - 1;
375387
if seq_number == 0 {
376388
last_seq_number
377389
} else {
378390
std::cmp::min(seq_number, last_seq_number)
379391
}
380392
}
381393
Err(_) => {
382-
// send reject if there is no valid end number
383-
todo!()
394+
let reject = Reject::new(
395+
message
396+
.header()
397+
.get(fix44::MSG_SEQ_NUM)
398+
.map_err(|_| anyhow!("failed to get seq number"))?,
399+
)
400+
.session_reject_reason(SessionRejectReason::RequiredTagMissing)
401+
.text("missing end sequence number for resend request");
402+
self.send_message(reject).await;
403+
return Ok(());
384404
}
385405
};
386406

@@ -583,19 +603,21 @@ impl<A: Application<M>, M: FixMessage, S: MessageStore> Session<A, M, S> {
583603
};
584604
}
585605

586-
async fn resend_messages(&mut self, begin: usize, end: usize, _message: &Message) {
587-
debug!(begin, end, "resending messages as requested");
588-
let messages = self.store.get_slice(begin, end).await.unwrap();
606+
async fn resend_messages(&mut self, begin: u64, end: u64, _message: &Message) {
607+
info!(begin, end, "resending messages as requested");
608+
let messages = self
609+
.store
610+
.get_slice(begin as usize, end as usize)
611+
.await
612+
.unwrap();
589613

590614
let no = messages.len();
591-
debug!(no, "number of messages");
615+
debug!(number_of_messages = no, "number of messages");
592616

593617
let mut reset_start: Option<u64> = None;
594618
let mut sequence_number = 0;
595619

596620
for msg in messages {
597-
let m = String::from_utf8(msg.clone()).unwrap();
598-
debug!(m, "resending message");
599621
let mut message = self
600622
.message_builder
601623
.build(msg.as_slice())
@@ -609,7 +631,6 @@ impl<A: Application<M>, M: FixMessage, S: MessageStore> Session<A, M, S> {
609631
.to_string();
610632

611633
if is_admin(message_type.as_str()) {
612-
debug!("skipping message as it's an admin message");
613634
if reset_start.is_none() {
614635
reset_start = Some(sequence_number);
615636
}
@@ -618,6 +639,7 @@ impl<A: Application<M>, M: FixMessage, S: MessageStore> Session<A, M, S> {
618639

619640
if let Some(begin) = reset_start {
620641
let end = sequence_number;
642+
Self::log_skipped_admin_messages(begin, end);
621643
self.send_sequence_reset(begin, end).await;
622644
reset_start = None;
623645
}
@@ -633,16 +655,28 @@ impl<A: Application<M>, M: FixMessage, S: MessageStore> Session<A, M, S> {
633655
message.encode(&self.message_config).unwrap(),
634656
)
635657
.await;
636-
debug!(sequence_number, "resent message");
658+
659+
if enabled!(tracing::Level::DEBUG) {
660+
let m = String::from_utf8(msg.clone()).unwrap();
661+
debug!(sequence_number, message = m, "resent message");
662+
}
637663
}
638664

639665
if let Some(begin) = reset_start {
640666
// the final reset if needed
641667
let end = sequence_number;
668+
Self::log_skipped_admin_messages(begin, end);
642669
self.send_sequence_reset(begin, end).await;
643670
}
644671
}
645672

673+
fn log_skipped_admin_messages(begin: u64, end: u64) {
674+
info!(
675+
begin,
676+
end, "skipped admin message(s) during resend, requesting reset for these"
677+
);
678+
}
679+
646680
fn reset_heartbeat_timer(&mut self) {
647681
self.state
648682
.reset_heartbeat_timer(self.config.heartbeat_interval);

crates/hotfix/tests/common/test_messages.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,3 +373,25 @@ pub fn replace_field_value(raw_message: &mut Vec<u8>, tag: u32, new_value: &[u8]
373373
}
374374
}
375375
}
376+
377+
/// Builds a resend request message without the required BeginSeqNo field.
378+
pub fn build_invalid_resend_request(
379+
msg_seq_num: u64,
380+
begin_seq_no: Option<u64>,
381+
end_seq_no: Option<u64>,
382+
) -> Vec<u8> {
383+
let mut msg = Message::new("FIX.4.4", "2"); // MsgType 2 = ResendRequest
384+
msg.set(fix44::SENDER_COMP_ID, COUNTERPARTY_COMP_ID);
385+
msg.set(fix44::TARGET_COMP_ID, OUR_COMP_ID);
386+
msg.set(fix44::MSG_SEQ_NUM, msg_seq_num);
387+
msg.set(fix44::SENDING_TIME, Timestamp::utc_now());
388+
389+
if let Some(begin_seq_no) = begin_seq_no {
390+
msg.set(fix44::BEGIN_SEQ_NO, begin_seq_no);
391+
}
392+
if let Some(end_seq_no) = end_seq_no {
393+
msg.set(fix44::END_SEQ_NO, end_seq_no);
394+
}
395+
396+
msg.encode(&Config::default()).unwrap()
397+
}

crates/hotfix/tests/session_test_cases/resend_tests.rs

Lines changed: 84 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
use crate::common::actions::when;
22
use crate::common::assertions::{assert_msg_type, then};
3-
use crate::common::setup::given_an_active_session;
3+
use crate::common::setup::{HEARTBEAT_INTERVAL, given_an_active_session};
44
use crate::common::test_messages::{
5-
TestMessage, build_execution_report_with_incorrect_body_length,
5+
TestMessage, build_execution_report_with_incorrect_body_length, build_invalid_resend_request,
66
};
7-
use hotfix::message::FixMessage;
7+
use hotfix::message::{FixMessage, ResendRequest};
88
use hotfix::session::Status;
9-
use hotfix_message::FieldType;
10-
use hotfix_message::fix44::MsgType;
9+
use hotfix_message::fix44::{GAP_FILL_FLAG, MsgType, NEW_SEQ_NO};
10+
use hotfix_message::{FieldType, Part};
11+
use std::time::Duration;
1112

1213
#[tokio::test]
1314
async fn test_message_sequence_number_too_high() {
@@ -142,3 +143,81 @@ async fn test_resent_message_previously_received_is_ignored() {
142143
when(&session).requests_disconnect().await;
143144
then(&mut counterparty).gets_disconnected().await;
144145
}
146+
147+
/// Tests that when a counterparty sends a resend request without the required field,
148+
/// the session rejects the invalid message.
149+
#[tokio::test]
150+
async fn test_invalid_resend_request_gets_rejected() {
151+
// We run the test twice - once with an invalid BeginSeqNo and once with an invalid EndSeqNo.
152+
for (begin_seq_no, end_seq_no) in [(None, Some(2)), (Some(1), None)] {
153+
let (session, mut counterparty) = given_an_active_session().await;
154+
155+
// build a resend request message missing the required BeginSeqNo (tag 7)
156+
let seq_num = counterparty.next_target_sequence_number();
157+
let invalid_resend_request =
158+
build_invalid_resend_request(seq_num, begin_seq_no, end_seq_no);
159+
when(&mut counterparty)
160+
.sends_raw_message(invalid_resend_request)
161+
.await;
162+
163+
// the session should reject this invalid resend request
164+
then(&mut counterparty)
165+
.receives(|msg| assert_msg_type(msg, MsgType::Reject))
166+
.await;
167+
168+
when(&session).requests_disconnect().await;
169+
then(&mut counterparty).gets_disconnected().await;
170+
}
171+
}
172+
173+
/// Tests that when a counterparty requests a resend of both admin and business messages,
174+
/// the session gap fills admin messages and resends business messages as expected.
175+
#[tokio::test(start_paused = true)]
176+
async fn test_resend_request_with_gap_fill_for_admin_messages() {
177+
let (session, mut counterparty) = given_an_active_session().await;
178+
179+
// wait for a heartbeat to be sent automatically (this will be message sequence number 2)
180+
when(Duration::from_secs(HEARTBEAT_INTERVAL + 1))
181+
.elapses()
182+
.await;
183+
then(&mut counterparty)
184+
.receives(|msg| assert_msg_type(msg, MsgType::Heartbeat))
185+
.await;
186+
187+
// send an execution report from the session (this will be message sequence number 3)
188+
when(&session)
189+
.sends_message(TestMessage::dummy_execution_report())
190+
.await;
191+
then(&mut counterparty)
192+
.receives(|msg| assert_msg_type(msg, MsgType::ExecutionReport))
193+
.await;
194+
195+
// counterparty requests a resend of messages 2 and 3
196+
let resend_request = ResendRequest::new(2, 3);
197+
when(&mut counterparty).sends_message(resend_request).await;
198+
199+
// the session should send a SequenceReset-GapFill for the heartbeat (message 2)
200+
then(&mut counterparty)
201+
.receives(|msg| {
202+
assert_msg_type(msg, MsgType::SequenceReset);
203+
assert_eq!(msg.get::<&str>(GAP_FILL_FLAG).unwrap(), "Y");
204+
// the gap fill's MsgSeqNum indicates the beginning of the gap
205+
assert_eq!(
206+
msg.header()
207+
.get::<u64>(hotfix_message::fix44::MSG_SEQ_NUM)
208+
.unwrap(),
209+
2
210+
);
211+
// NewSeqNo indicates the next sequence number after the gap
212+
assert_eq!(msg.get::<u64>(NEW_SEQ_NO).unwrap(), 3);
213+
})
214+
.await;
215+
216+
// the session should resend the execution report (message 3)
217+
then(&mut counterparty)
218+
.receives(|msg| assert_msg_type(msg, MsgType::ExecutionReport))
219+
.await;
220+
221+
when(&session).requests_disconnect().await;
222+
then(&mut counterparty).gets_disconnected().await;
223+
}

0 commit comments

Comments
 (0)