Skip to content

Commit d31fd40

Browse files
apollo_propeller: add nonce per peer to prevent replays (#13264)
1 parent ec263b0 commit d31fd40

5 files changed

Lines changed: 208 additions & 2 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/apollo_propeller/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ apollo_protobuf.workspace = true
1616
asynchronous-codec.workspace = true
1717
futures.workspace = true
1818
libp2p.workspace = true
19+
lru.workspace = true
1920
prost.workspace = true
2021
rand.workspace = true
2122
reed-solomon-simd.workspace = true

crates/apollo_propeller/src/engine.rs

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,12 @@
55
//! `NetworkBehaviour` adapter in `behaviour.rs` via command/output channels.
66
77
use std::collections::{HashMap, HashSet};
8+
use std::num::NonZeroUsize;
89
use std::sync::Arc;
910

11+
use apollo_infra_utils::warn_every_n_ms;
1012
use libp2p::identity::{Keypair, PeerId, PublicKey};
13+
use lru::LruCache;
1114
use starknet_api::staking::StakingWeight;
1215
use tokio::sync::{mpsc, oneshot};
1316
use tracing::{debug, error, trace, warn};
@@ -23,6 +26,14 @@ use crate::tree::PropellerScheduleManager;
2326
use crate::types::{CommitteeId, CommitteeSetupError, Event, MessageRoot, ShardPublishError};
2427
use crate::unit::PropellerUnit;
2528

29+
#[cfg(test)]
30+
#[path = "engine_test.rs"]
31+
mod engine_test;
32+
33+
// TODO(guyn): move this to the propeller Config.
34+
// Must be much bigger than the number of peers we expect to work with (2*committee_size).
35+
const PEER_NONCE_CACHE_SIZE: usize = 1000;
36+
2637
type BroadcastResponseTx = oneshot::Sender<Result<(), ShardPublishError>>;
2738
type BroadcastResult = (Result<Vec<PropellerUnit>, ShardPublishError>, BroadcastResponseTx);
2839

@@ -93,6 +104,9 @@ pub struct Engine {
93104
message_to_unit_tx: HashMap<MessageKey, mpsc::UnboundedSender<UnitToValidate>>,
94105
/// Messages that have already been encountered and processed (for deduplication).
95106
messages_to_ignore_shards_from: TimeCache<MessageKey>,
107+
// TODO(guyn): track nonces separately for each committee.
108+
/// Nonce per peer. LRU cache is used as a passive garbage collection mechanism.
109+
peer_nonce: LruCache<PeerId, u64>,
96110
/// Receiver for messages from state manager tasks.
97111
state_manager_rx: mpsc::UnboundedReceiver<EventStateManagerToEngine>,
98112
state_manager_tx: mpsc::UnboundedSender<EventStateManagerToEngine>,
@@ -126,6 +140,9 @@ impl Engine {
126140
local_peer_id,
127141
message_to_unit_tx: HashMap::new(),
128142
messages_to_ignore_shards_from,
143+
peer_nonce: LruCache::new(
144+
NonZeroUsize::new(PEER_NONCE_CACHE_SIZE).expect("Cache size must be non-zero"),
145+
),
129146
state_manager_rx,
130147
state_manager_tx,
131148
prepared_units_rx: broadcaster_results_rx,
@@ -243,15 +260,22 @@ impl Engine {
243260
root: claimed_root,
244261
};
245262

246-
// TODO(guyn): Add timestamps to message key to avoid replay attacks or issues with very
247-
// late shards.
248263
if self.messages_to_ignore_shards_from.contains(&message_key) {
249264
trace!(?message_key, "Message already finalized, dropping unit");
250265
return;
251266
}
252267

253268
// Spawn tasks if this is a new message.
254269
if !self.message_to_unit_tx.contains_key(&message_key) {
270+
let nonce = self.peer_nonce.get(&claimed_publisher).copied().unwrap_or(0);
271+
if nonce >= claimed_nonce {
272+
warn_every_n_ms!(
273+
2000,
274+
"Message nonce is too old, dropping unit to prevent replay attacks"
275+
);
276+
return;
277+
}
278+
255279
debug!(?message_key, "[ENGINE] Spawning new message processor");
256280

257281
let schedule_manager = committee_data.schedule_manager.clone();
@@ -376,6 +400,16 @@ impl Engine {
376400

377401
if !expired_keys.is_empty() {
378402
trace!(?expired_keys, "[ENGINE] Removed expired messages from TTL cache");
403+
for key in expired_keys {
404+
// Raise the nonce stored for this publisher to the expired message's nonce if that is bigger.
405+
let new_nonce = self
406+
.peer_nonce
407+
.peek(&key.publisher)
408+
.copied()
409+
.unwrap_or(0)
410+
.max(key.nonce);
411+
self.peer_nonce.put(key.publisher, new_nonce);
412+
}
379413
}
380414

381415
// Clean up task handles
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
// TODO(andrew): test non-nonce scenarios.
2+
use std::time::Duration;
3+
4+
use libp2p::identity::Keypair;
5+
use libp2p::PeerId;
6+
use starknet_api::staking::StakingWeight;
7+
use tokio::sync::mpsc;
8+
9+
use crate::config::Config;
10+
use crate::engine::{Engine, EngineCommand, EngineOutput, MessageKey};
11+
use crate::message_processor::EventStateManagerToEngine;
12+
use crate::types::{CommitteeId, MessageRoot};
13+
use crate::{MerkleProof, PropellerUnit, Shard, ShardIndex, ShardsOfPeer};
14+
15+
const TEST_COMMITTEE_ID: CommitteeId = CommitteeId([1; 32]);
16+
const BASE_NONCE: u64 = 1_000_000;
17+
18+
/// Returns the default `Config` with `stale_message_timeout` shortened to 200 ms, so tests can
/// step past it with small `tokio::time::advance` calls.
fn test_config() -> Config {
19+
Config { stale_message_timeout: Duration::from_millis(200), ..Config::default() }
20+
}
21+
22+
/// Everything `setup()` produces for a single test.
struct TestSetup {
23+
/// The engine under test.
engine: Engine,
24+
/// Peer ID of the non-local committee member registered by `setup()`; tests use it as the
/// publisher of the units they inject.
publisher: PeerId,
25+
/// Receiving end of the output channel whose sender is passed to `Engine::new`. Never read by
/// the tests; presumably held only so the channel stays open — TODO confirm.
_output_rx: mpsc::UnboundedReceiver<EngineOutput>,
26+
/// Sending end of the command channel whose receiver is passed to `Engine::new`. Never used by
/// the tests; presumably held only so the channel stays open — TODO confirm.
_cmd_tx: mpsc::UnboundedSender<EngineCommand>,
27+
}
28+
29+
/// Builds an `Engine` from freshly generated ed25519 keypairs and `test_config()`, then registers
/// a two-member committee under `TEST_COMMITTEE_ID`: the local peer and a separate publisher, each
/// with `StakingWeight(10)`.
///
/// Panics if `register_committee` returns an error.
fn setup() -> TestSetup {
30+
let local_keypair = Keypair::generate_ed25519();
31+
let publisher_keypair = Keypair::generate_ed25519();
32+
let publisher = PeerId::from(publisher_keypair.public());
33+
34+
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
35+
let (output_tx, output_rx) = mpsc::unbounded_channel();
36+
37+
let mut engine = Engine::new(local_keypair, test_config(), cmd_rx, output_tx, None);
38+
engine
39+
.register_committee(
40+
TEST_COMMITTEE_ID,
41+
vec![
42+
// The local peer is registered with `None` as the third element, the publisher with its
// public key (presumably an optional explicit public key per member — TODO confirm).
(engine.local_peer_id, StakingWeight(10), None),
43+
(publisher, StakingWeight(10), Some(publisher_keypair.public())),
44+
],
45+
)
46+
.unwrap();
47+
48+
TestSetup { engine, publisher, _output_rx: output_rx, _cmd_tx: cmd_tx }
49+
}
50+
51+
/// Builds a `PropellerUnit` for `TEST_COMMITTEE_ID` with the given `publisher`, `nonce` and `root`.
///
/// All other constructor arguments are fixed dummy values: a 64-byte zero vector (presumably the
/// signature — TODO confirm), shard index 0, a single three-byte shard and a Merkle proof with no
/// siblings.
fn make_unit(publisher: PeerId, nonce: u64, root: MessageRoot) -> PropellerUnit {
52+
PropellerUnit::new(
53+
TEST_COMMITTEE_ID,
54+
publisher,
55+
root,
56+
vec![0; 64],
57+
ShardIndex(0),
58+
ShardsOfPeer(vec![Shard(vec![1, 2, 3])]),
59+
MerkleProof { siblings: vec![] },
60+
nonce,
61+
)
62+
}
63+
64+
/// Builds the `MessageKey` for `TEST_COMMITTEE_ID` with the given `publisher`, `nonce` and `root`,
/// for looking up entries in the engine's `message_to_unit_tx` map.
fn message_key(publisher: PeerId, nonce: u64, root: MessageRoot) -> MessageKey {
65+
MessageKey { committee_id: TEST_COMMITTEE_ID, publisher, nonce, root }
66+
}
67+
68+
/// Delivers a `Finalized` event for `TEST_COMMITTEE_ID` with the given `publisher`, `nonce` and
/// `root` to the engine through `handle_state_manager_message`.
///
/// NOTE(review): the tests rely on this both recording the message as finalized and triggering
/// cleanup of expired entries; that handler is not shown here — confirm against `engine.rs`.
fn finalize_message(engine: &mut Engine, publisher: PeerId, nonce: u64, root: MessageRoot) {
69+
engine.handle_state_manager_message(EventStateManagerToEngine::Finalized {
70+
committee_id: TEST_COMMITTEE_ID,
71+
publisher,
72+
nonce,
73+
message_root: root,
74+
});
75+
}
76+
77+
/// With `BASE_NONCE` stored for the publisher, a unit that would start a new message is dropped
/// when its nonce is lower than or equal to the stored value. Rejection is observed as the
/// absence of the message key from `message_to_unit_tx`.
#[tokio::test]
78+
async fn reject_unit_of_new_message_with_old_nonce() {
79+
let mut s = setup();
80+
s.engine.peer_nonce.put(s.publisher, BASE_NONCE);
81+
82+
// Nonces earlier than BASE_NONCE are rejected.
83+
let root_old = MessageRoot([1u8; 32]);
84+
s.engine.handle_unit(PeerId::random(), make_unit(s.publisher, BASE_NONCE - 1, root_old));
85+
assert!(
86+
!s.engine.message_to_unit_tx.contains_key(&message_key(
87+
s.publisher,
88+
BASE_NONCE - 1,
89+
root_old
90+
)),
91+
"unit with nonce < BASE_NONCE must be rejected"
92+
);
93+
94+
// Nonce equal to BASE_NONCE: rejected.
95+
let root_eq = MessageRoot([0u8; 32]);
96+
s.engine.handle_unit(PeerId::random(), make_unit(s.publisher, BASE_NONCE, root_eq));
97+
assert!(
98+
!s.engine.message_to_unit_tx.contains_key(&message_key(s.publisher, BASE_NONCE, root_eq)),
99+
"unit with nonce == BASE_NONCE must be rejected"
100+
);
101+
}
102+
103+
/// With `BASE_NONCE` stored for the publisher, a unit carrying `BASE_NONCE + 1` is accepted.
/// Acceptance is observed as the message key being present in `message_to_unit_tx`.
#[tokio::test]
104+
async fn allow_unit_of_new_message_with_fresh_nonce() {
105+
let mut s = setup();
106+
s.engine.peer_nonce.put(s.publisher, BASE_NONCE);
107+
108+
let root = MessageRoot([0u8; 32]);
109+
s.engine.handle_unit(PeerId::random(), make_unit(s.publisher, BASE_NONCE + 1, root));
110+
111+
assert!(
112+
s.engine.message_to_unit_tx.contains_key(&message_key(s.publisher, BASE_NONCE + 1, root)),
113+
"unit with nonce > BASE_NONCE must be accepted"
114+
);
115+
}
116+
117+
/// Checks that the per-publisher nonce is only raised once finalized messages expire, and that it
/// is raised to the largest expired nonce regardless of the order in which the messages were
/// finalized. Runs with tokio time paused so expiry is driven by explicit `advance` calls.
#[tokio::test]
118+
async fn nonce_updated_on_cache_expiry() {
119+
tokio::time::pause();
120+
let mut s = setup();
121+
122+
// Finalize two messages from the same publisher with different nonces.
123+
finalize_message(&mut s.engine, s.publisher, BASE_NONCE + 100, MessageRoot([2u8; 32]));
124+
tokio::time::advance(Duration::from_millis(100)).await;
125+
finalize_message(&mut s.engine, s.publisher, BASE_NONCE, MessageRoot([1u8; 32]));
126+
127+
// Before expiry: no nonce is cached (effective nonce = 0), so a unit with any non-zero nonce
128+
// should be accepted.
129+
let root_before = MessageRoot([10u8; 32]);
130+
s.engine.handle_unit(PeerId::random(), make_unit(s.publisher, BASE_NONCE - 50, root_before));
131+
assert!(
132+
s.engine.message_to_unit_tx.contains_key(&message_key(
133+
s.publisher,
134+
BASE_NONCE - 50,
135+
root_before
136+
)),
137+
"unit must be accepted when nonce cache has no entry for this publisher"
138+
);
139+
140+
// Advance past TTL so both messages expire, then trigger cleanup via a third finalization.
141+
tokio::time::advance(test_config().stale_message_timeout + Duration::from_millis(50)).await;
142+
finalize_message(&mut s.engine, s.publisher, BASE_NONCE + 200, MessageRoot([3u8; 32]));
143+
144+
// After expiry: nonce advances to max(BASE_NONCE, BASE_NONCE+100) = BASE_NONCE+100.
145+
// A unit with nonce <= BASE_NONCE+100 must now be rejected.
146+
let root_stale = MessageRoot([11u8; 32]);
147+
s.engine.handle_unit(PeerId::random(), make_unit(s.publisher, BASE_NONCE + 50, root_stale));
148+
assert!(
149+
!s.engine.message_to_unit_tx.contains_key(&message_key(
150+
s.publisher,
151+
BASE_NONCE + 50,
152+
root_stale
153+
)),
154+
"unit with nonce <= max expired nonce must be rejected after cache expiry"
155+
);
156+
157+
// A unit with nonce strictly above BASE_NONCE+100 must still be accepted.
158+
let root_fresh = MessageRoot([12u8; 32]);
159+
s.engine.handle_unit(PeerId::random(), make_unit(s.publisher, BASE_NONCE + 101, root_fresh));
160+
assert!(
161+
s.engine.message_to_unit_tx.contains_key(&message_key(
162+
s.publisher,
163+
BASE_NONCE + 101,
164+
root_fresh
165+
)),
166+
"unit with nonce > max expired nonce must be accepted after cache expiry"
167+
);
168+
}

crates/apollo_propeller/src/types.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ pub enum Event {
3535
MessageTimeout { committee_id: CommitteeId, publisher: PeerId, message_root: MessageRoot },
3636
}
3737

38+
// TODO(andrew): add the epoch number to committee ID, so it doesn't repeat if the same members are
39+
// in different epochs.
3840
#[derive(Debug, Default, PartialEq, Clone, Copy, Ord, PartialOrd, Eq, Hash)]
3941
pub struct CommitteeId(pub [u8; 32]);
4042

0 commit comments

Comments
 (0)