From bebc8acbfc47ed05dd44b1865d586772e8634074 Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Sun, 12 Apr 2026 12:41:36 -0700 Subject: [PATCH] fix(pb): optimize actor key allocation --- engine/packages/api-peer/src/internal.rs | 1 + .../epoxy/src/ops/kv/get_optimistic.rs | 14 +- engine/packages/epoxy/src/ops/propose.rs | 119 +--------- engine/packages/epoxy/src/utils.rs | 112 ++++++++++ .../packages/epoxy/tests/kv_get_optimistic.rs | 1 + .../src/ops/actor/get_reservation_for_key.rs | 1 + .../runner/list_runner_config_enabled_dcs.rs | 1 + .../pegboard/src/workflows/actor/keys.rs | 1 + .../pegboard/src/workflows/actor2/keys.rs | 209 ++++++++---------- 9 files changed, 228 insertions(+), 231 deletions(-) diff --git a/engine/packages/api-peer/src/internal.rs b/engine/packages/api-peer/src/internal.rs index 880b20b11c..4e3d31191f 100644 --- a/engine/packages/api-peer/src/internal.rs +++ b/engine/packages/api-peer/src/internal.rs @@ -523,6 +523,7 @@ pub async fn get_epoxy_kv_optimistic( replica_id, key: key_bytes, caching_behavior: epoxy::protocol::CachingBehavior::Optimistic, + target_replicas: None, save_empty: false, }) .await?; diff --git a/engine/packages/epoxy/src/ops/kv/get_optimistic.rs b/engine/packages/epoxy/src/ops/kv/get_optimistic.rs index d7ecac71f0..82e3c34557 100644 --- a/engine/packages/epoxy/src/ops/kv/get_optimistic.rs +++ b/engine/packages/epoxy/src/ops/kv/get_optimistic.rs @@ -18,6 +18,13 @@ pub struct Input { pub replica_id: ReplicaId, pub key: Vec, pub caching_behavior: protocol::CachingBehavior, + /// Optional active-replica scope for this proposal. + /// + /// Epoxy only validates that the supplied replicas are active and include the local + /// replica. Callers are responsible for ensuring a given key stays on a stable scope over + /// time, or that any scope change is handled as an explicit reconfiguration at a higher + /// layer. + pub target_replicas: Option>, // Whether or not to write an empty value into cache if it did not exist on read. pub save_empty: bool, } @@ -73,7 +80,12 @@ pub async fn epoxy_kv_get_optimistic(ctx: &OperationCtx, input: &Input) -> Resul .await? .config; - let quorum_members: Vec = utils::get_quorum_members(&config); + let replica_id = ctx.config().epoxy_replica_id(); + let quorum_members = utils::resolve_active_quorum_members( + &config, + replica_id, + input.target_replicas.as_deref(), + )?; if quorum_members.len() == 1 { return Ok(Output { value: None }); diff --git a/engine/packages/epoxy/src/ops/propose.rs b/engine/packages/epoxy/src/ops/propose.rs index badf62aabe..b578d11127 100644 --- a/engine/packages/epoxy/src/ops/propose.rs +++ b/engine/packages/epoxy/src/ops/propose.rs @@ -5,7 +5,6 @@ use gas::prelude::*; use rand::Rng; use rivet_api_builder::ApiCtx; use serde::{Deserialize, Serialize}; -use std::collections::{BTreeSet, HashSet}; use std::time::{Duration, Instant}; use crate::{ @@ -218,8 +217,11 @@ pub async fn epoxy_propose(ctx: &OperationCtx, input: &Input) -> Result u64 { .min(PREPARE_RETRY_MAX_DELAY_MS) } -/// Returns the quorum members to use for this proposal. This supports scoped proposals because -/// runner configs are often only enabled in a couple of explicitly coupled regions. If -/// `target_replicas` is provided, it validates that the scope is non-empty, includes the local -/// replica, and only contains active replicas. Otherwise it falls back to the full active quorum -/// from the cluster config. -/// -/// Scoped proposals rely on a caller-side invariant: the same key must continue using the same -/// replica scope unless some higher-level reconfiguration step coordinates the membership change. -/// This function does not persist or enforce that per-key scope stability. -fn resolve_quorum_members( - config: &protocol::ClusterConfig, - replica_id: ReplicaId, - target_replicas: Option<&[ReplicaId]>, -) -> Result> { - match target_replicas { - Some(target_replicas) => { - let active = utils::get_quorum_members(config) - .into_iter() - .collect::>(); - let validated = target_replicas.iter().copied().collect::>(); - - if validated.is_empty() { - bail!("target_replicas cannot be empty"); - } - - if !validated.contains(&replica_id) { - bail!("target_replicas must include the local replica"); - } - - if !validated.iter().all(|replica| active.contains(replica)) { - bail!("target_replicas contains an inactive or unknown replica"); - } - - Ok(validated.into_iter().collect()) - } - None => { - let replicas = utils::get_quorum_members(config); - if !replicas.contains(&replica_id) { - bail!("local replica is not active in the current epoxy config"); - } - Ok(replicas) - } - } -} - #[cfg(test)] mod tests { use super::*; use epoxy_protocol::protocol::{ClusterConfig, ReplicaConfig, ReplicaStatus}; use rand::{SeedableRng, rngs::StdRng}; - fn make_config(replicas: &[(ReplicaId, ReplicaStatus)]) -> ClusterConfig { - ClusterConfig { - coordinator_replica_id: replicas[0].0, - epoch: 1, - replicas: replicas - .iter() - .map(|(replica_id, status)| ReplicaConfig { - replica_id: *replica_id, - status: status.clone(), - api_peer_url: String::new(), - guard_url: String::new(), - }) - .collect(), - } - } - - #[test] - fn resolve_quorum_members_none_uses_all_active() { - let config = make_config(&[ - (1, ReplicaStatus::Active), - (2, ReplicaStatus::Active), - (3, ReplicaStatus::Joining), - ]); - let result = resolve_quorum_members(&config, 1, None).unwrap(); - assert_eq!(result, vec![1, 2]); - } - - #[test] - fn resolve_quorum_members_requires_local_replica_to_be_active() { - let config = make_config(&[(1, ReplicaStatus::Learning), (2, ReplicaStatus::Active)]); - let result = resolve_quorum_members(&config, 1, None); - assert!(result.is_err()); - } - - #[test] - fn resolve_quorum_members_scoped_subset() { - let config = make_config(&[ - (1, ReplicaStatus::Active), - (2, ReplicaStatus::Active), - (3, ReplicaStatus::Active), - ]); - let result = resolve_quorum_members(&config, 1, Some(&[1, 2])).unwrap(); - assert_eq!(result, vec![1, 2]); - } - - #[test] - fn resolve_quorum_members_empty_target_errors() { - let config = make_config(&[(1, ReplicaStatus::Active)]); - let result = resolve_quorum_members(&config, 1, Some(&[])); - assert!(result.is_err()); - } - - #[test] - fn resolve_quorum_members_missing_local_errors() { - let config = make_config(&[(1, ReplicaStatus::Active), (2, ReplicaStatus::Active)]); - let result = resolve_quorum_members(&config, 1, Some(&[2])); - assert!(result.is_err()); - } - - #[test] - fn resolve_quorum_members_inactive_replica_errors() { - let config = make_config(&[(1, ReplicaStatus::Active), (2, ReplicaStatus::Learning)]); - let result = resolve_quorum_members(&config, 1, Some(&[1, 2])); - assert!(result.is_err()); - } - #[test] fn parses_set_command_as_set_proposal() { let proposal = Proposal { diff --git a/engine/packages/epoxy/src/utils.rs b/engine/packages/epoxy/src/utils.rs index 6c7ba3227e..8f50048925 100644 --- a/engine/packages/epoxy/src/utils.rs +++ b/engine/packages/epoxy/src/utils.rs @@ -1,5 +1,6 @@ use anyhow::{Result, bail}; use epoxy_protocol::protocol::{self, ReplicaId}; +use std::collections::{BTreeSet, HashSet}; use std::fmt; use universaldb::{Transaction, utils::IsolationLevel::*}; @@ -32,6 +33,51 @@ pub fn get_quorum_members(config: &protocol::ClusterConfig) -> Vec { .collect() } +/// Returns the quorum members to use for a given kv operation. This supports scoped operations because, for +/// example, runner configs are often only enabled in a couple of explicitly coupled regions. If +/// `target_replicas` is provided, it validates that the scope is non-empty, includes the local +/// replica, and only contains active replicas. Otherwise it falls back to the full active quorum +/// from the cluster config. +/// +/// Scoped proposals rely on a caller-side invariant: the same key must continue using the same +/// replica scope unless some higher-level reconfiguration step coordinates the membership change. +/// This function does not persist or enforce that per-key scope stability. +pub fn resolve_active_quorum_members( + config: &protocol::ClusterConfig, + replica_id: ReplicaId, + target_replicas: Option<&[ReplicaId]>, +) -> Result> { + match target_replicas { + Some(target_replicas) => { + let active = get_quorum_members(config) + .into_iter() + .collect::>(); + let validated = target_replicas.iter().copied().collect::>(); + + if validated.is_empty() { + bail!("target_replicas cannot be empty"); + } + + if !validated.contains(&replica_id) { + bail!("target_replicas must include the local replica"); + } + + if !validated.iter().all(|replica| active.contains(replica)) { + bail!("target_replicas contains an inactive or unknown replica"); + } + + Ok(validated.into_iter().collect()) + } + None => { + let replicas = get_quorum_members(config); + if !replicas.contains(&replica_id) { + bail!("local replica is not active in the current epoxy config"); + } + Ok(replicas) + } + } +} + /// Use this replica list for any action that should still be sent to joining replicas. pub fn get_all_replicas(config: &protocol::ClusterConfig) -> Vec { config.replicas.iter().map(|r| r.replica_id).collect() @@ -195,4 +241,70 @@ mod tests { } } } + + fn make_config(replicas: &[(ReplicaId, ReplicaStatus)]) -> ClusterConfig { + ClusterConfig { + coordinator_replica_id: replicas[0].0, + epoch: 1, + replicas: replicas + .iter() + .map(|(replica_id, status)| ReplicaConfig { + replica_id: *replica_id, + status: status.clone(), + api_peer_url: String::new(), + guard_url: String::new(), + }) + .collect(), + } + } + + #[test] + fn resolve_active_quorum_members_none_uses_all_active() { + let config = make_config(&[ + (1, ReplicaStatus::Active), + (2, ReplicaStatus::Active), + (3, ReplicaStatus::Joining), + ]); + let result = resolve_active_quorum_members(&config, 1, None).unwrap(); + assert_eq!(result, vec![1, 2]); + } + + #[test] + fn resolve_active_quorum_members_requires_local_replica_to_be_active() { + let config = make_config(&[(1, ReplicaStatus::Learning), (2, ReplicaStatus::Active)]); + let result = resolve_active_quorum_members(&config, 1, None); + assert!(result.is_err()); + } + + #[test] + fn resolve_active_quorum_members_scoped_subset() { + let config = make_config(&[ + (1, ReplicaStatus::Active), + (2, ReplicaStatus::Active), + (3, ReplicaStatus::Active), + ]); + let result = resolve_active_quorum_members(&config, 1, Some(&[1, 2])).unwrap(); + assert_eq!(result, vec![1, 2]); + } + + #[test] + fn resolve_active_quorum_members_empty_target_errors() { + let config = make_config(&[(1, ReplicaStatus::Active)]); + let result = resolve_active_quorum_members(&config, 1, Some(&[])); + assert!(result.is_err()); + } + + #[test] + fn resolve_active_quorum_members_missing_local_errors() { + let config = make_config(&[(1, ReplicaStatus::Active), (2, ReplicaStatus::Active)]); + let result = resolve_active_quorum_members(&config, 1, Some(&[2])); + assert!(result.is_err()); + } + + #[test] + fn resolve_active_quorum_members_inactive_replica_errors() { + let config = make_config(&[(1, ReplicaStatus::Active), (2, ReplicaStatus::Learning)]); + let result = resolve_active_quorum_members(&config, 1, Some(&[1, 2])); + assert!(result.is_err()); + } } diff --git a/engine/packages/epoxy/tests/kv_get_optimistic.rs b/engine/packages/epoxy/tests/kv_get_optimistic.rs index e13171a127..6a4e5f91c0 100644 --- a/engine/packages/epoxy/tests/kv_get_optimistic.rs +++ b/engine/packages/epoxy/tests/kv_get_optimistic.rs @@ -31,6 +31,7 @@ async fn get_with_behavior( replica_id, key: key.to_vec(), caching_behavior, + target_replicas: None, save_empty: false, }) .await diff --git a/engine/packages/pegboard/src/ops/actor/get_reservation_for_key.rs b/engine/packages/pegboard/src/ops/actor/get_reservation_for_key.rs index 77b28682cd..2dea8c10ec 100644 --- a/engine/packages/pegboard/src/ops/actor/get_reservation_for_key.rs +++ b/engine/packages/pegboard/src/ops/actor/get_reservation_for_key.rs @@ -33,6 +33,7 @@ pub async fn pegboard_actor_get_reservation_for_key( replica_id: ctx.config().epoxy_replica_id(), key: keys::subspace().pack(&reservation_key), caching_behavior: epoxy::protocol::CachingBehavior::Optimistic, + target_replicas: None, save_empty: false, }) .await? diff --git a/engine/packages/pegboard/src/ops/runner/list_runner_config_enabled_dcs.rs b/engine/packages/pegboard/src/ops/runner/list_runner_config_enabled_dcs.rs index dda5ab3db9..5b19d8a266 100644 --- a/engine/packages/pegboard/src/ops/runner/list_runner_config_enabled_dcs.rs +++ b/engine/packages/pegboard/src/ops/runner/list_runner_config_enabled_dcs.rs @@ -71,6 +71,7 @@ async fn list_runner_config_enabled_dcs_inner( replica_id: ctx.config().epoxy_replica_id(), key: namespace::keys::subspace().pack(&runner_config_key), caching_behavior: CachingBehavior::Optimistic, + target_replicas: None, save_empty: true, }) .await?; diff --git a/engine/packages/pegboard/src/workflows/actor/keys.rs b/engine/packages/pegboard/src/workflows/actor/keys.rs index 30236bc4af..05aec479ab 100644 --- a/engine/packages/pegboard/src/workflows/actor/keys.rs +++ b/engine/packages/pegboard/src/workflows/actor/keys.rs @@ -174,6 +174,7 @@ pub async fn lookup_key_optimistic( replica_id: ctx.config().epoxy_replica_id(), key: keys::subspace().pack(&reservation_key), caching_behavior: epoxy::protocol::CachingBehavior::Optimistic, + target_replicas: None, save_empty: false, }) .await? diff --git a/engine/packages/pegboard/src/workflows/actor2/keys.rs b/engine/packages/pegboard/src/workflows/actor2/keys.rs index ee1f95151e..6b68a0eaa2 100644 --- a/engine/packages/pegboard/src/workflows/actor2/keys.rs +++ b/engine/packages/pegboard/src/workflows/actor2/keys.rs @@ -27,94 +27,88 @@ pub async fn reserve_key( actor_id: Id, pool_name: &str, ) -> Result { - let optimistic_reservation_id = ctx + let optimistic_reservation = ctx .activity(LookupKeyOptimisticInput { namespace_id, name: name.to_string(), key: key.to_string(), + pool_name: pool_name.to_string(), }) .await?; - if let Some(reservation_id) = optimistic_reservation_id { + match optimistic_reservation { // Key found optimistically - - handle_existing_reservation(ctx, reservation_id, namespace_id, name, key, actor_id).await - } else { + LookupKeyOptimisticOutput::Found(reservation_id) => { + handle_existing_reservation(ctx, reservation_id, namespace_id, name, key, actor_id) + .await + } // Key not found optimistically + LookupKeyOptimisticOutput::NotFound(new_reservation_id, target_replicas) => { + if !target_replicas.contains(&ctx.config().epoxy_replica_id()) { + let replica_id = target_replicas + .into_iter() + .next() + .context("target_replicas is empty")?; + let dc_label = u16::try_from(replica_id)?; - let new_reservation_id = ctx.activity(GenerateReservationIdInput {}).await?; - let target_replicas = ctx - .v(2) - .activity(ResolveTargetReplicasInput { - namespace_id, - pool_name: pool_name.to_string(), - }) - .await?; + return Ok(ReserveKeyOutput::ForwardToDatacenter { dc_label }); + } - if !target_replicas.contains(&ctx.config().epoxy_replica_id()) { - let replica_id = target_replicas - .first() - .copied() - .ok_or_else(|| anyhow::anyhow!("target_replicas is empty"))?; - let dc_label = u16::try_from(replica_id)?; + let proposal_result = ctx + .activity(ProposeInput { + namespace_id, + name: name.to_string(), + key: key.to_string(), + new_reservation_id, + actor_id, + target_replicas, + }) + .await?; - return Ok(ReserveKeyOutput::ForwardToDatacenter { dc_label }); - } - - let proposal_result = ctx - .activity(ProposeInput { - namespace_id, - name: name.to_string(), - key: key.to_string(), - new_reservation_id, - actor_id, - target_replicas, - }) - .await?; - - match proposal_result { - ProposalResult::Committed => { - let output = ctx - .activity(ReserveActorKeyInput { - namespace_id, - name: name.to_string(), - key: key.to_string(), - actor_id, - create_ts: ctx.create_ts(), - }) - .await?; - match output { - ReserveActorKeyOutput::Success => Ok(ReserveKeyOutput::Success), - ReserveActorKeyOutput::ExistingActor { existing_actor_id } => { - Ok(ReserveKeyOutput::KeyExists { existing_actor_id }) + match proposal_result { + ProposalResult::Committed => { + let output = ctx + .activity(ReserveActorKeyInput { + namespace_id, + name: name.to_string(), + key: key.to_string(), + actor_id, + create_ts: ctx.create_ts(), + }) + .await?; + match output { + ReserveActorKeyOutput::Success => Ok(ReserveKeyOutput::Success), + ReserveActorKeyOutput::ExistingActor { existing_actor_id } => { + Ok(ReserveKeyOutput::KeyExists { existing_actor_id }) + } } } - } - ProposalResult::ConsensusFailed => { - bail!("consensus failed") - } - ProposalResult::CommandError(CommandError::ExpectedValueDoesNotMatch { - current_value, - }) => { - if let Some(current_value) = current_value { - let existing_reservation_id = keys::epoxy::ns::ReservationByKeyKey::new( - namespace_id, - name.to_string(), - key.to_string(), - ) - .deserialize(¤t_value)?; + ProposalResult::ConsensusFailed => { + bail!("consensus failed") + } + ProposalResult::CommandError(CommandError::ExpectedValueDoesNotMatch { + current_value, + }) => { + if let Some(current_value) = current_value { + let existing_reservation_id = keys::epoxy::ns::ReservationByKeyKey::new( + namespace_id, + name.to_string(), + key.to_string(), + ) + .deserialize(¤t_value)?; - handle_existing_reservation( - ctx, - existing_reservation_id, - namespace_id, - name, - key, - actor_id, - ) - .await - } else { - bail!("unreachable: current_value should exist") + handle_existing_reservation( + ctx, + existing_reservation_id, + namespace_id, + name, + key, + actor_id, + ) + .await + } else { + bail!("unreachable: current_value should exist") + } } } } @@ -157,13 +151,31 @@ pub struct LookupKeyOptimisticInput { namespace_id: Id, name: String, key: String, + pool_name: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize, Hash)] +#[serde(rename_all = "snake_case")] +pub enum LookupKeyOptimisticOutput { + Found(Id), + NotFound(Id, Vec), } #[activity(LookupKeyOptimistic)] pub async fn lookup_key_optimistic( ctx: &ActivityCtx, input: &LookupKeyOptimisticInput, -) -> Result> { +) -> Result { + let replicas = ctx + .op( + crate::ops::runner::list_runner_config_epoxy_replica_ids::Input { + namespace_id: input.namespace_id, + runner_name: input.pool_name.clone(), + }, + ) + .await? + .replicas; + let reservation_key = keys::epoxy::ns::ReservationByKeyKey::new( input.namespace_id, input.name.clone(), @@ -174,58 +186,23 @@ pub async fn lookup_key_optimistic( replica_id: ctx.config().epoxy_replica_id(), key: keys::subspace().pack(&reservation_key), caching_behavior: epoxy::protocol::CachingBehavior::Optimistic, + target_replicas: Some(replicas.clone()), save_empty: false, }) .await? .value; if let Some(value) = value { let reservation_id = reservation_key.deserialize(&value)?; - Ok(Some(reservation_id)) + Ok(LookupKeyOptimisticOutput::Found(reservation_id)) } else { - Ok(None) + let new_reservation_id = Id::new_v1(ctx.config().dc_label()); + Ok(LookupKeyOptimisticOutput::NotFound( + new_reservation_id, + replicas, + )) } } -#[derive(Debug, Clone, Serialize, Deserialize, Hash)] -pub struct GenerateReservationIdInput {} - -#[activity(GenerateReservationId)] -pub async fn generate_reservation_id( - ctx: &ActivityCtx, - input: &GenerateReservationIdInput, -) -> Result { - Ok(Id::new_v1(ctx.config().dc_label())) -} - -#[derive(Debug, Clone, Serialize, Deserialize, Hash)] -pub struct ResolveTargetReplicasInput { - namespace_id: Id, - pool_name: String, -} - -#[activity(ResolveTargetReplicas)] -pub async fn resolve_target_replicas( - ctx: &ActivityCtx, - input: &ResolveTargetReplicasInput, -) -> Result> { - let start = std::time::Instant::now(); - let replicas = ctx - .op( - crate::ops::runner::list_runner_config_epoxy_replica_ids::Input { - namespace_id: input.namespace_id, - runner_name: input.pool_name.clone(), - }, - ) - .await? - .replicas; - tracing::debug!( - op_duration_ms = %start.elapsed().as_millis(), - ?replicas, - "resolve_target_replicas op completed" - ); - Ok(replicas) -} - #[derive(Debug, Clone, Serialize, Deserialize, Hash)] pub struct ProposeInput { namespace_id: Id,