Skip to content

Commit 4573f6b

Browse files
committed
fix(pb): optimize actor key allocation
1 parent bd6ef15 commit 4573f6b

File tree

9 files changed

+228
-231
lines changed

9 files changed

+228
-231
lines changed

engine/packages/api-peer/src/internal.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -523,6 +523,7 @@ pub async fn get_epoxy_kv_optimistic(
523523
replica_id,
524524
key: key_bytes,
525525
caching_behavior: epoxy::protocol::CachingBehavior::Optimistic,
526+
target_replicas: None,
526527
save_empty: false,
527528
})
528529
.await?;

engine/packages/epoxy/src/ops/kv/get_optimistic.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,13 @@ pub struct Input {
1818
pub replica_id: ReplicaId,
1919
pub key: Vec<u8>,
2020
pub caching_behavior: protocol::CachingBehavior,
21+
/// Optional active-replica scope for this proposal.
22+
///
23+
/// Epoxy only validates that the supplied replicas are active and include the local
24+
/// replica. Callers are responsible for ensuring a given key stays on a stable scope over
25+
/// time, or that any scope change is handled as an explicit reconfiguration at a higher
26+
/// layer.
27+
pub target_replicas: Option<Vec<ReplicaId>>,
2128
// Whether or not to write an empty value into cache if it did not exist on read.
2229
pub save_empty: bool,
2330
}
@@ -73,7 +80,12 @@ pub async fn epoxy_kv_get_optimistic(ctx: &OperationCtx, input: &Input) -> Resul
7380
.await?
7481
.config;
7582

76-
let quorum_members: Vec<ReplicaId> = utils::get_quorum_members(&config);
83+
let replica_id = ctx.config().epoxy_replica_id();
84+
let quorum_members = utils::resolve_active_quorum_members(
85+
&config,
86+
replica_id,
87+
input.target_replicas.as_deref(),
88+
)?;
7789

7890
if quorum_members.len() == 1 {
7991
return Ok(Output { value: None });

engine/packages/epoxy/src/ops/propose.rs

Lines changed: 5 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ use gas::prelude::*;
55
use rand::Rng;
66
use rivet_api_builder::ApiCtx;
77
use serde::{Deserialize, Serialize};
8-
use std::collections::{BTreeSet, HashSet};
98
use std::time::{Duration, Instant};
109

1110
use crate::{
@@ -218,8 +217,11 @@ pub async fn epoxy_propose(ctx: &OperationCtx, input: &Input) -> Result<Proposal
218217
.await
219218
.context("failed reading config")?;
220219

221-
let quorum_members =
222-
resolve_quorum_members(&config, replica_id, input.target_replicas.as_deref())?;
220+
let quorum_members = utils::resolve_active_quorum_members(
221+
&config,
222+
replica_id,
223+
input.target_replicas.as_deref(),
224+
)?;
223225
tracing::debug!(
224226
?quorum_members,
225227
quorum_size = quorum_members.len(),
@@ -1005,123 +1007,12 @@ fn prepare_retry_base_delay_ms(retry_count: usize) -> u64 {
10051007
.min(PREPARE_RETRY_MAX_DELAY_MS)
10061008
}
10071009

1008-
/// Returns the quorum members to use for this proposal. This supports scoped proposals because
1009-
/// runner configs are often only enabled in a couple of explicitly coupled regions. If
1010-
/// `target_replicas` is provided, it validates that the scope is non-empty, includes the local
1011-
/// replica, and only contains active replicas. Otherwise it falls back to the full active quorum
1012-
/// from the cluster config.
1013-
///
1014-
/// Scoped proposals rely on a caller-side invariant: the same key must continue using the same
1015-
/// replica scope unless some higher-level reconfiguration step coordinates the membership change.
1016-
/// This function does not persist or enforce that per-key scope stability.
1017-
fn resolve_quorum_members(
1018-
config: &protocol::ClusterConfig,
1019-
replica_id: ReplicaId,
1020-
target_replicas: Option<&[ReplicaId]>,
1021-
) -> Result<Vec<ReplicaId>> {
1022-
match target_replicas {
1023-
Some(target_replicas) => {
1024-
let active = utils::get_quorum_members(config)
1025-
.into_iter()
1026-
.collect::<HashSet<_>>();
1027-
let validated = target_replicas.iter().copied().collect::<BTreeSet<_>>();
1028-
1029-
if validated.is_empty() {
1030-
bail!("target_replicas cannot be empty");
1031-
}
1032-
1033-
if !validated.contains(&replica_id) {
1034-
bail!("target_replicas must include the local replica");
1035-
}
1036-
1037-
if !validated.iter().all(|replica| active.contains(replica)) {
1038-
bail!("target_replicas contains an inactive or unknown replica");
1039-
}
1040-
1041-
Ok(validated.into_iter().collect())
1042-
}
1043-
None => {
1044-
let replicas = utils::get_quorum_members(config);
1045-
if !replicas.contains(&replica_id) {
1046-
bail!("local replica is not active in the current epoxy config");
1047-
}
1048-
Ok(replicas)
1049-
}
1050-
}
1051-
}
1052-
10531010
#[cfg(test)]
10541011
mod tests {
10551012
use super::*;
10561013
use epoxy_protocol::protocol::{ClusterConfig, ReplicaConfig, ReplicaStatus};
10571014
use rand::{SeedableRng, rngs::StdRng};
10581015

1059-
fn make_config(replicas: &[(ReplicaId, ReplicaStatus)]) -> ClusterConfig {
1060-
ClusterConfig {
1061-
coordinator_replica_id: replicas[0].0,
1062-
epoch: 1,
1063-
replicas: replicas
1064-
.iter()
1065-
.map(|(replica_id, status)| ReplicaConfig {
1066-
replica_id: *replica_id,
1067-
status: status.clone(),
1068-
api_peer_url: String::new(),
1069-
guard_url: String::new(),
1070-
})
1071-
.collect(),
1072-
}
1073-
}
1074-
1075-
#[test]
1076-
fn resolve_quorum_members_none_uses_all_active() {
1077-
let config = make_config(&[
1078-
(1, ReplicaStatus::Active),
1079-
(2, ReplicaStatus::Active),
1080-
(3, ReplicaStatus::Joining),
1081-
]);
1082-
let result = resolve_quorum_members(&config, 1, None).unwrap();
1083-
assert_eq!(result, vec![1, 2]);
1084-
}
1085-
1086-
#[test]
1087-
fn resolve_quorum_members_requires_local_replica_to_be_active() {
1088-
let config = make_config(&[(1, ReplicaStatus::Learning), (2, ReplicaStatus::Active)]);
1089-
let result = resolve_quorum_members(&config, 1, None);
1090-
assert!(result.is_err());
1091-
}
1092-
1093-
#[test]
1094-
fn resolve_quorum_members_scoped_subset() {
1095-
let config = make_config(&[
1096-
(1, ReplicaStatus::Active),
1097-
(2, ReplicaStatus::Active),
1098-
(3, ReplicaStatus::Active),
1099-
]);
1100-
let result = resolve_quorum_members(&config, 1, Some(&[1, 2])).unwrap();
1101-
assert_eq!(result, vec![1, 2]);
1102-
}
1103-
1104-
#[test]
1105-
fn resolve_quorum_members_empty_target_errors() {
1106-
let config = make_config(&[(1, ReplicaStatus::Active)]);
1107-
let result = resolve_quorum_members(&config, 1, Some(&[]));
1108-
assert!(result.is_err());
1109-
}
1110-
1111-
#[test]
1112-
fn resolve_quorum_members_missing_local_errors() {
1113-
let config = make_config(&[(1, ReplicaStatus::Active), (2, ReplicaStatus::Active)]);
1114-
let result = resolve_quorum_members(&config, 1, Some(&[2]));
1115-
assert!(result.is_err());
1116-
}
1117-
1118-
#[test]
1119-
fn resolve_quorum_members_inactive_replica_errors() {
1120-
let config = make_config(&[(1, ReplicaStatus::Active), (2, ReplicaStatus::Learning)]);
1121-
let result = resolve_quorum_members(&config, 1, Some(&[1, 2]));
1122-
assert!(result.is_err());
1123-
}
1124-
11251016
#[test]
11261017
fn parses_set_command_as_set_proposal() {
11271018
let proposal = Proposal {

engine/packages/epoxy/src/utils.rs

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use anyhow::{Result, bail};
22
use epoxy_protocol::protocol::{self, ReplicaId};
3+
use std::collections::{BTreeSet, HashSet};
34
use std::fmt;
45
use universaldb::{Transaction, utils::IsolationLevel::*};
56

@@ -32,6 +33,51 @@ pub fn get_quorum_members(config: &protocol::ClusterConfig) -> Vec<ReplicaId> {
3233
.collect()
3334
}
3435

36+
/// Returns the quorum members to use for this proposal. This supports scoped proposals because
37+
/// runner configs are often only enabled in a couple of explicitly coupled regions. If
38+
/// `target_replicas` is provided, it validates that the scope is non-empty, includes the local
39+
/// replica, and only contains active replicas. Otherwise it falls back to the full active quorum
40+
/// from the cluster config.
41+
///
42+
/// Scoped proposals rely on a caller-side invariant: the same key must continue using the same
43+
/// replica scope unless some higher-level reconfiguration step coordinates the membership change.
44+
/// This function does not persist or enforce that per-key scope stability.
45+
pub fn resolve_active_quorum_members(
46+
config: &protocol::ClusterConfig,
47+
replica_id: ReplicaId,
48+
target_replicas: Option<&[ReplicaId]>,
49+
) -> Result<Vec<ReplicaId>> {
50+
match target_replicas {
51+
Some(target_replicas) => {
52+
let active = get_quorum_members(config)
53+
.into_iter()
54+
.collect::<HashSet<_>>();
55+
let validated = target_replicas.iter().copied().collect::<BTreeSet<_>>();
56+
57+
if validated.is_empty() {
58+
bail!("target_replicas cannot be empty");
59+
}
60+
61+
if !validated.contains(&replica_id) {
62+
bail!("target_replicas must include the local replica");
63+
}
64+
65+
if !validated.iter().all(|replica| active.contains(replica)) {
66+
bail!("target_replicas contains an inactive or unknown replica");
67+
}
68+
69+
Ok(validated.into_iter().collect())
70+
}
71+
None => {
72+
let replicas = get_quorum_members(config);
73+
if !replicas.contains(&replica_id) {
74+
bail!("local replica is not active in the current epoxy config");
75+
}
76+
Ok(replicas)
77+
}
78+
}
79+
}
80+
3581
/// Use this replica list for any action that should still be sent to joining replicas.
3682
pub fn get_all_replicas(config: &protocol::ClusterConfig) -> Vec<ReplicaId> {
3783
config.replicas.iter().map(|r| r.replica_id).collect()
@@ -195,4 +241,70 @@ mod tests {
195241
}
196242
}
197243
}
244+
245+
fn make_config(replicas: &[(ReplicaId, ReplicaStatus)]) -> ClusterConfig {
246+
ClusterConfig {
247+
coordinator_replica_id: replicas[0].0,
248+
epoch: 1,
249+
replicas: replicas
250+
.iter()
251+
.map(|(replica_id, status)| ReplicaConfig {
252+
replica_id: *replica_id,
253+
status: status.clone(),
254+
api_peer_url: String::new(),
255+
guard_url: String::new(),
256+
})
257+
.collect(),
258+
}
259+
}
260+
261+
#[test]
262+
fn resolve_active_quorum_members_none_uses_all_active() {
263+
let config = make_config(&[
264+
(1, ReplicaStatus::Active),
265+
(2, ReplicaStatus::Active),
266+
(3, ReplicaStatus::Joining),
267+
]);
268+
let result = resolve_active_quorum_members(&config, 1, None).unwrap();
269+
assert_eq!(result, vec![1, 2]);
270+
}
271+
272+
#[test]
273+
fn resolve_active_quorum_members_requires_local_replica_to_be_active() {
274+
let config = make_config(&[(1, ReplicaStatus::Learning), (2, ReplicaStatus::Active)]);
275+
let result = resolve_active_quorum_members(&config, 1, None);
276+
assert!(result.is_err());
277+
}
278+
279+
#[test]
280+
fn resolve_active_quorum_members_scoped_subset() {
281+
let config = make_config(&[
282+
(1, ReplicaStatus::Active),
283+
(2, ReplicaStatus::Active),
284+
(3, ReplicaStatus::Active),
285+
]);
286+
let result = resolve_active_quorum_members(&config, 1, Some(&[1, 2])).unwrap();
287+
assert_eq!(result, vec![1, 2]);
288+
}
289+
290+
#[test]
291+
fn resolve_active_quorum_members_empty_target_errors() {
292+
let config = make_config(&[(1, ReplicaStatus::Active)]);
293+
let result = resolve_active_quorum_members(&config, 1, Some(&[]));
294+
assert!(result.is_err());
295+
}
296+
297+
#[test]
298+
fn resolve_active_quorum_members_missing_local_errors() {
299+
let config = make_config(&[(1, ReplicaStatus::Active), (2, ReplicaStatus::Active)]);
300+
let result = resolve_active_quorum_members(&config, 1, Some(&[2]));
301+
assert!(result.is_err());
302+
}
303+
304+
#[test]
305+
fn resolve_active_quorum_members_inactive_replica_errors() {
306+
let config = make_config(&[(1, ReplicaStatus::Active), (2, ReplicaStatus::Learning)]);
307+
let result = resolve_active_quorum_members(&config, 1, Some(&[1, 2]));
308+
assert!(result.is_err());
309+
}
198310
}

engine/packages/epoxy/tests/kv_get_optimistic.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ async fn get_with_behavior(
3131
replica_id,
3232
key: key.to_vec(),
3333
caching_behavior,
34+
target_replicas: None,
3435
save_empty: false,
3536
})
3637
.await

engine/packages/pegboard/src/ops/actor/get_reservation_for_key.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ pub async fn pegboard_actor_get_reservation_for_key(
3333
replica_id: ctx.config().epoxy_replica_id(),
3434
key: keys::subspace().pack(&reservation_key),
3535
caching_behavior: epoxy::protocol::CachingBehavior::Optimistic,
36+
target_replicas: None,
3637
save_empty: false,
3738
})
3839
.await?

engine/packages/pegboard/src/ops/runner/list_runner_config_enabled_dcs.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ async fn list_runner_config_enabled_dcs_inner(
7171
replica_id: ctx.config().epoxy_replica_id(),
7272
key: namespace::keys::subspace().pack(&runner_config_key),
7373
caching_behavior: CachingBehavior::Optimistic,
74+
target_replicas: None,
7475
save_empty: true,
7576
})
7677
.await?;

engine/packages/pegboard/src/workflows/actor/keys.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ pub async fn lookup_key_optimistic(
174174
replica_id: ctx.config().epoxy_replica_id(),
175175
key: keys::subspace().pack(&reservation_key),
176176
caching_behavior: epoxy::protocol::CachingBehavior::Optimistic,
177+
target_replicas: None,
177178
save_empty: false,
178179
})
179180
.await?

0 commit comments

Comments
 (0)