Skip to content

Commit 9537b07

Browse files
committed
Implement admin action to set target sequence number
1 parent f682603 commit 9537b07

4 files changed

Lines changed: 73 additions & 1 deletion

File tree

crates/hotfix/src/session.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -568,6 +568,27 @@ where
568568
warn!("resetting sequence numbers on next logon");
569569
self.reset_on_next_logon = true;
570570
}
571+
AdminRequest::SetNextTargetSeqNum { seq_num, responder } => {
572+
let current = self.state.as_status();
573+
let response = if matches!(self.state, SessionState::Disconnected(_)) {
574+
self.ctx
575+
.store
576+
.set_target_seq_number(seq_num.get() - 1)
577+
.await
578+
.map_err(SetNextTargetSeqNumError::from)
579+
} else {
580+
warn!(
581+
?current,
582+
seq_num = seq_num.get(),
583+
"rejecting SetNextTargetSeqNum outside Disconnected state"
584+
);
585+
Err(SetNextTargetSeqNumError::InvalidState { current })
586+
};
587+
588+
if responder.send(response).is_err() {
589+
error!("failed to respond to SetNextTargetSeqNum request");
590+
}
591+
}
571592
}
572593
}
573594

crates/hotfix/src/session/admin_request.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
use crate::session::SessionInfo;
2+
use crate::session::error::SetNextTargetSeqNumError;
3+
use std::num::NonZeroU64;
24
use tokio::sync::oneshot;
35

46
/// Administrative actions exposed to users of the engine to control the session.
@@ -13,4 +15,10 @@ pub enum AdminRequest {
1315
/// which can be used to re-synchronise our state with the counterparty in
1416
/// unfortunate scenarios where such drastic recover is required.
1517
ResetSequenceNumbersOnNextLogon,
18+
/// Set the next expected target sequence number. Permitted only while the
19+
/// session is `Disconnected` — see `SetNextTargetSeqNumError::InvalidState`.
20+
SetNextTargetSeqNum {
21+
seq_num: NonZeroU64,
22+
responder: oneshot::Sender<Result<(), SetNextTargetSeqNumError>>,
23+
},
1624
}

crates/hotfix/src/session/session_handle.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
use crate::session::admin_request::AdminRequest;
2-
use crate::session::error::{SendError, SendOutcome};
2+
use crate::session::error::{SendError, SendOutcome, SetNextTargetSeqNumError};
33
use crate::session::session_ref::OutboundRequest;
44
use crate::session::{InternalSessionRef, SessionInfo};
5+
use std::num::NonZeroU64;
56
use tokio::sync::{mpsc, oneshot};
67

78
/// A public handle to the session that can be used to interact with the session.
@@ -66,6 +67,26 @@ impl<Outbound> SessionHandle<Outbound> {
6667
.await?;
6768
Ok(())
6869
}
70+
71+
/// Sets the next expected target sequence number.
72+
///
73+
/// Permitted only while the session is `Disconnected`. Use this to realign
74+
/// after a counterparty-initiated sequence reset without forcing a bilateral
75+
/// reset — the peer's subsequent `ResendRequest` is handled by the existing
76+
/// resend/gap-fill logic.
77+
pub async fn set_next_target_seq_num(
78+
&self,
79+
seq_num: NonZeroU64,
80+
) -> Result<(), SetNextTargetSeqNumError> {
81+
let (responder, receiver) = oneshot::channel();
82+
self.admin_request_sender
83+
.send(AdminRequest::SetNextTargetSeqNum { seq_num, responder })
84+
.await
85+
.map_err(|_| SetNextTargetSeqNumError::Send(SendError::SessionGone))?;
86+
receiver
87+
.await
88+
.map_err(|_| SetNextTargetSeqNumError::Send(SendError::SessionGone))?
89+
}
6990
}
7091

7192
impl<M> From<InternalSessionRef<M>> for SessionHandle<M> {

crates/hotfix/tests/session_test_cases/admin_request_tests.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use crate::common::setup::given_an_active_session;
55
use hotfix::session::Status;
66
use hotfix_message::Part;
77
use hotfix_message::fix44::{MsgType, RESET_SEQ_NUM_FLAG};
8+
use std::num::NonZeroU64;
89

910
/// Tests that we can request the session to reset sequence numbers once.
1011
///
@@ -72,3 +73,24 @@ async fn test_reset_sequence_numbers_once() {
7273

7374
finally(&session, &mut counterparty).disconnect().await;
7475
}
76+
77+
/// Happy path: while `Disconnected`, setting the next expected target sequence
78+
/// number succeeds and the new value is visible via session info.
79+
#[tokio::test]
80+
async fn test_set_next_target_seq_num_while_disconnected() {
81+
let session = crate::common::setup::given_a_disconnected_session();
82+
83+
let new_target = NonZeroU64::new(42).expect("42 is non-zero");
84+
session
85+
.session_handle()
86+
.set_next_target_seq_num(new_target)
87+
.await
88+
.expect("set_next_target_seq_num to succeed");
89+
90+
let info = session
91+
.session_handle()
92+
.get_session_info()
93+
.await
94+
.expect("session info");
95+
assert_eq!(info.next_target_seq_number, 42);
96+
}

0 commit comments

Comments
 (0)