Skip to content

Commit 2508b8e

Browse files
authored
feat: admin operation to set the next expected target sequence number (#342)
* Add error type for action to set target sequence number * Implement admin action to set target sequence number * Refactor to push action to set target sequence number into state which knows what state it's valid for * Add rejection tests for set target sequence number action
1 parent 3d1eef7 commit 2508b8e

6 files changed

Lines changed: 336 additions & 4 deletions

File tree

crates/hotfix/src/session.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use crate::session::admin_request::AdminRequest;
3939
use crate::session::ctx::{PreProcessDecision, SessionCtx, TransitionResult, VerificationResult};
4040
use crate::session::error::SessionCreationError;
4141
use crate::session::error::{InternalSendError, InternalSendResultExt, SessionOperationError};
42-
pub use crate::session::error::{SendError, SendOutcome};
42+
pub use crate::session::error::{SendError, SendOutcome, SetNextTargetSeqNumError};
4343
pub use crate::session::info::{SessionInfo, Status};
4444
pub use crate::session::session_handle::SessionHandle;
4545
#[cfg(not(feature = "test-utils"))]
@@ -568,6 +568,22 @@ where
568568
warn!("resetting sequence numbers on next logon");
569569
self.reset_on_next_logon = true;
570570
}
571+
AdminRequest::SetNextTargetSeqNum { seq_num, responder } => {
572+
let response = self
573+
.state
574+
.try_set_next_target_seq_num(&mut self.ctx, seq_num)
575+
.await;
576+
if let Err(ref err) = response {
577+
warn!(
578+
?err,
579+
seq_num = seq_num.get(),
580+
"SetNextTargetSeqNum rejected"
581+
);
582+
}
583+
if responder.send(response).is_err() {
584+
error!("failed to respond to SetNextTargetSeqNum request");
585+
}
586+
}
571587
}
572588
}
573589

crates/hotfix/src/session/admin_request.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
use crate::session::SessionInfo;
2+
use crate::session::error::SetNextTargetSeqNumError;
3+
use std::num::NonZeroU64;
24
use tokio::sync::oneshot;
35

46
/// Administrative actions exposed to users of the engine to control the session.
@@ -13,4 +15,10 @@ pub enum AdminRequest {
1315
/// which can be used to re-synchronise our state with the counterparty in
1416
/// unfortunate scenarios where such drastic recover is required.
1517
ResetSequenceNumbersOnNextLogon,
18+
/// Set the next expected target sequence number. Permitted only while the
19+
/// session is `Disconnected` — see `SetNextTargetSeqNumError::InvalidState`.
20+
SetNextTargetSeqNum {
21+
seq_num: NonZeroU64,
22+
responder: oneshot::Sender<Result<(), SetNextTargetSeqNumError>>,
23+
},
1624
}

crates/hotfix/src/session/error.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,23 @@ impl<T> InternalSendResultExt<T> for Result<T, InternalSendError> {
128128
}
129129
}
130130

131+
/// Error returned when setting the next expected target sequence number via admin.
132+
#[derive(Debug, Error)]
133+
pub enum SetNextTargetSeqNumError {
134+
/// The session was not in a state where the target sequence number can be
135+
/// safely adjusted. Only permitted while `Disconnected`.
136+
#[error("operation rejected in state {current:?}; only permitted while disconnected")]
137+
InvalidState { current: crate::session::Status },
138+
139+
/// Channel-level failure — the session task is gone or the responder was dropped.
140+
#[error(transparent)]
141+
Send(#[from] SendError),
142+
143+
/// Underlying store write failed.
144+
#[error(transparent)]
145+
Store(#[from] StoreError),
146+
}
147+
131148
#[cfg(test)]
132149
mod tests {
133150
use super::*;

crates/hotfix/src/session/session_handle.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
use crate::session::admin_request::AdminRequest;
2-
use crate::session::error::{SendError, SendOutcome};
2+
use crate::session::error::{SendError, SendOutcome, SetNextTargetSeqNumError};
33
use crate::session::session_ref::OutboundRequest;
44
use crate::session::{InternalSessionRef, SessionInfo};
5+
use std::num::NonZeroU64;
56
use tokio::sync::{mpsc, oneshot};
67

78
/// A public handle to the session that can be used to interact with the session.
@@ -66,6 +67,26 @@ impl<Outbound> SessionHandle<Outbound> {
6667
.await?;
6768
Ok(())
6869
}
70+
71+
/// Sets the next expected target sequence number.
72+
///
73+
/// Permitted only while the session is `Disconnected`. Use this to realign
74+
/// after a counterparty-initiated sequence reset without forcing a bilateral
75+
/// reset — the peer's subsequent `ResendRequest` is handled by the existing
76+
/// resend/gap-fill logic.
77+
pub async fn set_next_target_seq_num(
78+
&self,
79+
seq_num: NonZeroU64,
80+
) -> Result<(), SetNextTargetSeqNumError> {
81+
let (responder, receiver) = oneshot::channel();
82+
self.admin_request_sender
83+
.send(AdminRequest::SetNextTargetSeqNum { seq_num, responder })
84+
.await
85+
.map_err(|_| SetNextTargetSeqNumError::Send(SendError::SessionGone))?;
86+
receiver
87+
.await
88+
.map_err(|_| SetNextTargetSeqNumError::Send(SendError::SessionGone))?
89+
}
6990
}
7091

7192
impl<M> From<InternalSessionRef<M>> for SessionHandle<M> {

crates/hotfix/src/session/state.rs

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,15 @@ use crate::message::logon::Logon;
1616
use crate::message::logout::Logout;
1717
use crate::message::verification::VerificationFlags;
1818
use crate::session::ctx::{PreProcessDecision, SessionCtx, TransitionResult, VerificationResult};
19-
use crate::session::error::{InternalSendError, InternalSendResultExt, SessionOperationError};
19+
use crate::session::error::{
20+
InternalSendError, InternalSendResultExt, SessionOperationError, SetNextTargetSeqNumError,
21+
};
2022
use crate::session::event::ScheduleResponse;
2123
use crate::session::info::Status as SessionInfoStatus;
2224
use crate::transport::writer::WriterRef;
2325
use hotfix_message::message::Message;
2426
use hotfix_store::MessageStore;
27+
use std::num::NonZeroU64;
2528
use std::time::Duration;
2629
use tokio::sync::oneshot;
2730
use tokio::time::Instant;
@@ -247,6 +250,34 @@ impl SessionState {
247250
}
248251
}
249252

253+
/// Set the next expected target sequence number.
254+
pub(crate) async fn try_set_next_target_seq_num<A, S>(
255+
&self,
256+
ctx: &mut SessionCtx<A, S>,
257+
seq_num: NonZeroU64,
258+
) -> Result<(), SetNextTargetSeqNumError>
259+
where
260+
A: Application,
261+
S: MessageStore,
262+
{
263+
// Only permitted while `Disconnected` — any other state returns `InvalidState`.
264+
match self {
265+
SessionState::Disconnected(_) => {
266+
// The store stores "last seen" (see `inbound::on_sequence_reset` passing `end - 1`),
267+
// so we subtract 1 to make `next_target_seq_number()` return `seq_num`.
268+
let target_seq_num = seq_num.get() - 1;
269+
270+
ctx.store
271+
.set_target_seq_number(target_seq_num)
272+
.await
273+
.map_err(SetNextTargetSeqNumError::from)
274+
}
275+
_ => Err(SetNextTargetSeqNumError::InvalidState {
276+
current: self.as_status(),
277+
}),
278+
}
279+
}
280+
250281
/// Sends a logout message and puts the session state into an [`AwaitingLogout`] state.
251282
///
252283
/// The session waits for a configurable timeout period for the counterparty to

0 commit comments

Comments
 (0)