diff --git a/Cargo.lock b/Cargo.lock index 224bd14398..e4a1905a4b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1484,6 +1484,7 @@ dependencies = [ "universaldb", "url", "uuid", + "vbare", ] [[package]] @@ -1494,6 +1495,7 @@ dependencies = [ "rivet-util", "serde", "serde_bare", + "vbare", "vbare-compiler", ] diff --git a/engine/packages/epoxy/Cargo.toml b/engine/packages/epoxy/Cargo.toml index 02c5c6fb06..3b2e53d59b 100644 --- a/engine/packages/epoxy/Cargo.toml +++ b/engine/packages/epoxy/Cargo.toml @@ -35,6 +35,7 @@ tracing.workspace = true universaldb.workspace = true url.workspace = true uuid.workspace = true +vbare.workspace = true [dev-dependencies] gas.workspace = true diff --git a/engine/packages/epoxy/src/keys/keys.rs b/engine/packages/epoxy/src/keys/keys.rs index 9701534a65..ef16effd94 100644 --- a/engine/packages/epoxy/src/keys/keys.rs +++ b/engine/packages/epoxy/src/keys/keys.rs @@ -1,17 +1,9 @@ use anyhow::Result; -use epoxy_protocol::protocol; +use epoxy_protocol::{PROTOCOL_VERSION, protocol, versioned}; use serde::{Deserialize, Serialize}; use universaldb::prelude::*; use universaldb::tuple::Versionstamp; - -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] -pub struct CommittedValue { - // NOTE: An empty value may exist for cached entries to denote the value was not found on any datacenter - // and cached as such. - pub value: Vec, - pub version: u64, - pub mutable: bool, -} +use vbare::OwnedVersionedData; /// In-flight accepted proposal state stored under `kv/{key}/accepted`. /// @@ -44,14 +36,15 @@ impl KvValueKey { } impl FormalKey for KvValueKey { - type Value = CommittedValue; + type Value = protocol::CommittedValue; fn deserialize(&self, raw: &[u8]) -> Result { - serde_bare::from_slice(raw).map_err(Into::into) + versioned::CommittedValue::deserialize_with_embedded_version(raw) } fn serialize(&self, value: Self::Value) -> Result> { - serde_bare::to_vec(&value).map_err(Into::into) + versioned::CommittedValue::wrap_latest(value) + .serialize_with_embedded_version(PROTOCOL_VERSION) } } @@ -260,14 +253,14 @@ impl KvOptimisticCacheKey { } impl FormalKey for KvOptimisticCacheKey { - type Value = CommittedValue; + type Value = protocol::CachedValue; fn deserialize(&self, raw: &[u8]) -> Result { - serde_bare::from_slice(raw).map_err(Into::into) + versioned::CachedValue::deserialize_with_embedded_version(raw) } fn serialize(&self, value: Self::Value) -> Result> { - serde_bare::to_vec(&value).map_err(Into::into) + versioned::CachedValue::wrap_latest(value).serialize_with_embedded_version(PROTOCOL_VERSION) } } diff --git a/engine/packages/epoxy/src/keys/mod.rs b/engine/packages/epoxy/src/keys/mod.rs index 26917d20e1..80d48a108b 100644 --- a/engine/packages/epoxy/src/keys/mod.rs +++ b/engine/packages/epoxy/src/keys/mod.rs @@ -5,8 +5,8 @@ pub mod keys; pub mod replica; pub use self::keys::{ - ChangelogKey, CommittedValue, KvAcceptedKey, KvAcceptedValue, KvBallotKey, - KvOptimisticCacheKey, KvValueKey, LegacyCommittedValueKey, + ChangelogKey, KvAcceptedKey, KvAcceptedValue, KvBallotKey, KvOptimisticCacheKey, KvValueKey, + LegacyCommittedValueKey, }; pub use self::replica::ConfigKey; diff --git a/engine/packages/epoxy/src/ops/kv/get_local.rs b/engine/packages/epoxy/src/ops/kv/get_local.rs index 3108785e00..c189b1b188 100644 --- a/engine/packages/epoxy/src/ops/kv/get_local.rs +++ b/engine/packages/epoxy/src/ops/kv/get_local.rs @@ -1,11 +1,9 @@ use anyhow::Result; -use epoxy_protocol::protocol::ReplicaId; +use epoxy_protocol::protocol::{CachedValue, CommittedValue, ReplicaId}; use gas::prelude::*; use universaldb::utils::{FormalKey, IsolationLevel::Serializable}; -use crate::keys::{ - self, CommittedValue, KvOptimisticCacheKey, KvValueKey, LegacyCommittedValueKey, -}; +use crate::keys::{self, KvOptimisticCacheKey, KvValueKey, LegacyCommittedValueKey}; #[derive(Debug)] pub struct Input { @@ -26,7 +24,7 @@ pub async fn epoxy_kv_get_local( #[derive(Debug)] pub(crate) struct LocalValueRead { pub value: Option, - pub cache_value: Option, + pub cache_value: Option, } /// Reads a committed value from the local replica with dual-read fallback. @@ -95,18 +93,12 @@ pub(crate) async fn read_local_value( if let Some(value) = cache_value { let cache_value = cache_key.deserialize(&value)?; - // Special case with empty values. These are inserted in kv_get_optimistic with `save_empty` - if cache_value.value.is_empty() { + if let Some(value) = cache_value.value { return Ok(LocalValueRead { value: None, - cache_value: None, + cache_value: Some(cache_key.deserialize(&value)?), }); } - - return Ok(LocalValueRead { - value: None, - cache_value: Some(cache_key.deserialize(&value)?), - }); } Ok(LocalValueRead { diff --git a/engine/packages/epoxy/src/ops/kv/get_optimistic.rs b/engine/packages/epoxy/src/ops/kv/get_optimistic.rs index 82e3c34557..4c491480ef 100644 --- a/engine/packages/epoxy/src/ops/kv/get_optimistic.rs +++ b/engine/packages/epoxy/src/ops/kv/get_optimistic.rs @@ -1,14 +1,10 @@ use anyhow::*; -use epoxy_protocol::protocol::{self, ReplicaId}; +use epoxy_protocol::protocol::{self, CachedValue, CommittedValue, ReplicaId}; use gas::prelude::*; use rivet_api_builder::ApiCtx; use universaldb::prelude::*; -use crate::{ - http_client, - keys::{self, CommittedValue}, - utils, -}; +use crate::{http_client, keys, utils}; use super::get_local::read_local_value; @@ -68,10 +64,12 @@ pub async fn epoxy_kv_get_optimistic(ctx: &OperationCtx, input: &Input) -> Resul input.caching_behavior == protocol::CachingBehavior::Optimistic, ) .await?; - if let Some(value) = local_read.value.or(local_read.cache_value) { - return Ok(Output { - value: Some(value.value), - }); + if let Some(value) = local_read + .value + .map(|v| v.value) + .or(local_read.cache_value.and_then(|v| v.value)) + { + return Ok(Output { value: Some(value) }); } // Request fanout to other datacenters, return first datacenter with any non-none value @@ -169,7 +167,13 @@ async fn cache_fanout_value( } } - tx.write(&cache_key, value_to_cache.clone())?; + tx.write( + &cache_key, + CachedValue { + value: Some(value_to_cache.value.clone()), + version: value_to_cache.version, + }, + )?; Ok(()) } }) @@ -189,11 +193,9 @@ async fn cache_empty_value(ctx: &OperationCtx, replica_id: ReplicaId, key: &[u8] tx.write( &cache_key, - CommittedValue { - value: Vec::new(), + CachedValue { + value: None, version: 0, - // TODO: What should this be set to? - mutable: true, }, )?; Ok(()) diff --git a/engine/packages/epoxy/src/ops/propose.rs b/engine/packages/epoxy/src/ops/propose.rs index b578d11127..c40b854fc0 100644 --- a/engine/packages/epoxy/src/ops/propose.rs +++ b/engine/packages/epoxy/src/ops/propose.rs @@ -1,5 +1,5 @@ use anyhow::{Context, Result, bail}; -use epoxy_protocol::protocol::{self, ReplicaId}; +use epoxy_protocol::protocol::{self, CommittedValue, ReplicaId}; use futures_util::{StreamExt, stream::FuturesUnordered}; use gas::prelude::*; use rand::Rng; @@ -8,9 +8,7 @@ use serde::{Deserialize, Serialize}; use std::time::{Duration, Instant}; use crate::{ - http_client, - keys::CommittedValue, - metrics, + http_client, metrics, replica::{ ballot::{self, Ballot, BallotSelection}, commit_kv::{self, CommitKvOutcome}, diff --git a/engine/packages/epoxy/src/replica/ballot.rs b/engine/packages/epoxy/src/replica/ballot.rs index 33e169178a..15abb0ea81 100644 --- a/engine/packages/epoxy/src/replica/ballot.rs +++ b/engine/packages/epoxy/src/replica/ballot.rs @@ -1,10 +1,10 @@ use anyhow::{Context, Result}; -use epoxy_protocol::protocol; +use epoxy_protocol::protocol::{self, CommittedValue}; use std::cmp::Ordering; use universaldb::Transaction; use universaldb::utils::{FormalKey, IsolationLevel::Serializable}; -use crate::keys::{self, CommittedValue, KvBallotKey, KvValueKey, LegacyCommittedValueKey}; +use crate::keys::{self, KvBallotKey, KvValueKey, LegacyCommittedValueKey}; #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub struct Ballot { diff --git a/engine/packages/epoxy/src/replica/changelog.rs b/engine/packages/epoxy/src/replica/changelog.rs index c86a654d77..d4e9c4588f 100644 --- a/engine/packages/epoxy/src/replica/changelog.rs +++ b/engine/packages/epoxy/src/replica/changelog.rs @@ -1,5 +1,5 @@ use anyhow::{Context, Result, bail}; -use epoxy_protocol::protocol; +use epoxy_protocol::protocol::{self, CommittedValue}; use futures_util::TryStreamExt; use universaldb::{ KeySelector, RangeOption, Transaction, @@ -10,8 +10,7 @@ use universaldb::{ }; use crate::keys::{ - self, ChangelogKey, CommittedValue, KvAcceptedKey, KvBallotKey, KvOptimisticCacheKey, - KvValueKey, + self, ChangelogKey, KvAcceptedKey, KvBallotKey, KvOptimisticCacheKey, KvValueKey, }; use crate::metrics; diff --git a/engine/packages/epoxy/src/replica/commit_kv.rs b/engine/packages/epoxy/src/replica/commit_kv.rs index 7bf98a66b3..c6191df454 100644 --- a/engine/packages/epoxy/src/replica/commit_kv.rs +++ b/engine/packages/epoxy/src/replica/commit_kv.rs @@ -1,9 +1,9 @@ use anyhow::Result; -use epoxy_protocol::protocol; +use epoxy_protocol::protocol::{self, CommittedValue}; use universaldb::{Transaction, utils::IsolationLevel::Serializable}; use crate::{ - keys::{self, CommittedValue, KvAcceptedKey, KvBallotKey, KvOptimisticCacheKey, KvValueKey}, + keys::{self, KvAcceptedKey, KvBallotKey, KvOptimisticCacheKey, KvValueKey}, replica::ballot::Ballot, }; diff --git a/engine/packages/epoxy/src/workflows/backfill.rs b/engine/packages/epoxy/src/workflows/backfill.rs index b0f86df8e8..d79a47a139 100644 --- a/engine/packages/epoxy/src/workflows/backfill.rs +++ b/engine/packages/epoxy/src/workflows/backfill.rs @@ -1,5 +1,5 @@ use anyhow::Result; -use epoxy_protocol::protocol; +use epoxy_protocol::protocol::{self, CommittedValue}; use futures_util::{FutureExt, TryStreamExt}; use gas::prelude::*; use serde::{Deserialize, Serialize}; @@ -13,7 +13,7 @@ use universaldb::{ }, }; -use crate::keys::{self, CommittedValue, KvValueKey, LegacyCommittedValueKey}; +use crate::keys::{self, KvValueKey, LegacyCommittedValueKey}; const DEFAULT_CHUNK_SIZE: usize = 500; diff --git a/engine/packages/epoxy/tests/kv.rs b/engine/packages/epoxy/tests/kv.rs index e0db4c4dcb..17c354b069 100644 --- a/engine/packages/epoxy/tests/kv.rs +++ b/engine/packages/epoxy/tests/kv.rs @@ -1,6 +1,7 @@ mod common; use epoxy::ops::propose::{CommandError, ProposalResult}; +use epoxy_protocol::protocol::CommittedValue; use common::{ THREE_REPLICAS, TestCtx, @@ -80,7 +81,7 @@ async fn test_kv_operations() { assert!(matches!(first_result, ProposalResult::Committed)); assert_eq!( read_v2_committed_value(ctx, replica_id, key).await.unwrap(), - Some(epoxy::keys::CommittedValue { + Some(CommittedValue { value: b"value1".to_vec(), version: 1, mutable: true, @@ -92,7 +93,7 @@ async fn test_kv_operations() { assert!(matches!(second_result, ProposalResult::Committed)); assert_eq!( read_v2_committed_value(ctx, replica_id, key).await.unwrap(), - Some(epoxy::keys::CommittedValue { + Some(CommittedValue { value: b"value2".to_vec(), version: 2, mutable: true, diff --git a/engine/packages/epoxy/tests/kv_get_optimistic.rs b/engine/packages/epoxy/tests/kv_get_optimistic.rs index 6a4e5f91c0..64c1a17d84 100644 --- a/engine/packages/epoxy/tests/kv_get_optimistic.rs +++ b/engine/packages/epoxy/tests/kv_get_optimistic.rs @@ -9,7 +9,7 @@ use common::{ }, }; use epoxy::ops::propose::ProposalResult; -use epoxy_protocol::protocol::{CachingBehavior, ReplicaId}; +use epoxy_protocol::protocol::{CachingBehavior, CommittedValue, ReplicaId}; static TEST_LOCK: tokio::sync::Mutex<()> = tokio::sync::Mutex::const_new(()); @@ -182,7 +182,7 @@ async fn test_kv_get_optimistic_paths() { test_ctx.get_ctx(writer_replica_id), writer_replica_id, key, - epoxy::keys::CommittedValue { + CommittedValue { value: b"remote-value".to_vec(), version: 2, mutable: true, @@ -194,7 +194,7 @@ async fn test_kv_get_optimistic_paths() { test_ctx.get_ctx(reader_replica_id), reader_replica_id, key, - epoxy::keys::CommittedValue { + CommittedValue { value: b"stale-cache".to_vec(), version: 1, mutable: true, @@ -221,7 +221,7 @@ async fn test_kv_get_optimistic_paths() { ) .await .unwrap(), - Some(epoxy::keys::CommittedValue { + Some(CommittedValue { value: b"stale-cache".to_vec(), version: 1, mutable: true, @@ -247,7 +247,7 @@ async fn test_kv_get_optimistic_paths() { follower_ctx, follower_replica_id, key, - epoxy::keys::CommittedValue { + CommittedValue { value: b"value1".to_vec(), version: 1, mutable: true, diff --git a/engine/packages/pegboard-envoy/src/conn.rs b/engine/packages/pegboard-envoy/src/conn.rs index c281b35502..f61dc406d6 100644 --- a/engine/packages/pegboard-envoy/src/conn.rs +++ b/engine/packages/pegboard-envoy/src/conn.rs @@ -67,6 +67,7 @@ pub async fn init_conn( } .build()); }; + let is_serverless = matches!(pool.config.kind, RunnerConfigKind::Serverless { .. }); tracing::debug!(namespace_id=?namespace.namespace_id, "new envoy connection"); @@ -81,16 +82,6 @@ pub async fn init_conn( .with_label_values(&[namespace.namespace_id.to_string().as_str(), &pool_name]) .observe(start.elapsed().as_secs_f64()); - let serverless_drain_grace_period = if let RunnerConfigKind::Serverless { - drain_grace_period, - .. - } = &pool.config.kind - { - Some(*drain_grace_period as i64) - } else { - None - }; - let udb = ctx.udb()?; let (_, mut missed_commands) = tokio::try_join!( // Send init packet as soon as possible @@ -103,7 +94,6 @@ pub async fn init_conn( metadata: protocol::ProtocolMetadata { envoy_lost_threshold: pb.envoy_lost_threshold(), actor_stop_threshold: pb.actor_stop_threshold(), - serverless_drain_grace_period, max_response_payload_size: pb.envoy_max_response_payload_size() as u64, }, }, @@ -317,8 +307,6 @@ pub async fn init_conn( .await?; } - let is_serverless = serverless_drain_grace_period.is_some(); - if is_serverless { report_success(ctx, namespace.namespace_id, &pool_name).await; } diff --git a/engine/packages/pegboard/src/ops/envoy/drain.rs b/engine/packages/pegboard/src/ops/envoy/drain.rs index 4489a08282..4ec43e59ae 100644 --- a/engine/packages/pegboard/src/ops/envoy/drain.rs +++ b/engine/packages/pegboard/src/ops/envoy/drain.rs @@ -16,6 +16,7 @@ pub struct Input { pub version: u32, } +// NOTE: Only applies to serverless #[operation] pub async fn pegboard_envoy_drain_older_versions(ctx: &OperationCtx, input: &Input) -> Result<()> { let pool_res = ctx diff --git a/engine/sdks/rust/epoxy-protocol/Cargo.toml b/engine/sdks/rust/epoxy-protocol/Cargo.toml index fa2ff9078a..4df187d039 100644 --- a/engine/sdks/rust/epoxy-protocol/Cargo.toml +++ b/engine/sdks/rust/epoxy-protocol/Cargo.toml @@ -10,6 +10,7 @@ anyhow.workspace = true rivet-util.workspace = true serde_bare.workspace = true serde.workspace = true +vbare.workspace = true [build-dependencies] vbare-compiler.workspace = true diff --git a/engine/sdks/rust/epoxy-protocol/src/lib.rs b/engine/sdks/rust/epoxy-protocol/src/lib.rs index 04bb4c39e7..0016517550 100644 --- a/engine/sdks/rust/epoxy-protocol/src/lib.rs +++ b/engine/sdks/rust/epoxy-protocol/src/lib.rs @@ -1,4 +1,5 @@ pub mod generated; pub mod protocol; +pub mod versioned; pub const PROTOCOL_VERSION: u16 = 2; diff --git a/engine/sdks/rust/epoxy-protocol/src/protocol.rs b/engine/sdks/rust/epoxy-protocol/src/protocol.rs index 33ba812261..767d385896 100644 --- a/engine/sdks/rust/epoxy-protocol/src/protocol.rs +++ b/engine/sdks/rust/epoxy-protocol/src/protocol.rs @@ -8,6 +8,7 @@ pub type ReplicaConfig = raw::ReplicaConfig; pub type ClusterConfig = raw::ClusterConfig; pub type Ballot = raw::Ballot; pub type CommittedValue = raw::CommittedValue; +pub type CachedValue = raw::CachedValue; pub type UpdateConfigRequest = raw::UpdateConfigRequest; pub type UpdateConfigResponse = raw::UpdateConfigResponse; diff --git a/engine/sdks/rust/epoxy-protocol/src/versioned.rs b/engine/sdks/rust/epoxy-protocol/src/versioned.rs new file mode 100644 index 0000000000..b2ae1d7e52 --- /dev/null +++ b/engine/sdks/rust/epoxy-protocol/src/versioned.rs @@ -0,0 +1,88 @@ +use anyhow::{Result, bail}; +use vbare::OwnedVersionedData; + +use crate::generated::v2; + +pub enum CommittedValue { + V2(v2::CommittedValue), +} + +impl OwnedVersionedData for CommittedValue { + type Latest = v2::CommittedValue; + + fn wrap_latest(latest: v2::CommittedValue) -> Self { + CommittedValue::V2(latest) + } + + fn unwrap_latest(self) -> Result { + #[allow(irrefutable_let_patterns)] + if let CommittedValue::V2(data) = self { + Ok(data) + } else { + bail!("version not latest"); + } + } + + fn deserialize_version(payload: &[u8], version: u16) -> Result { + match version { + 2 => Ok(CommittedValue::V2(serde_bare::from_slice(payload)?)), + _ => bail!("invalid version: {version}"), + } + } + + fn serialize_version(self, _version: u16) -> Result> { + match self { + CommittedValue::V2(data) => serde_bare::to_vec(&data).map_err(Into::into), + } + } + + fn deserialize_converters() -> Vec Result> { + vec![Ok] + } + + fn serialize_converters() -> Vec Result> { + vec![Ok] + } +} + +pub enum CachedValue { + V2(v2::CachedValue), +} + +impl OwnedVersionedData for CachedValue { + type Latest = v2::CachedValue; + + fn wrap_latest(latest: v2::CachedValue) -> Self { + CachedValue::V2(latest) + } + + fn unwrap_latest(self) -> Result { + #[allow(irrefutable_let_patterns)] + if let CachedValue::V2(data) = self { + Ok(data) + } else { + bail!("version not latest"); + } + } + + fn deserialize_version(payload: &[u8], version: u16) -> Result { + match version { + 2 => Ok(CachedValue::V2(serde_bare::from_slice(payload)?)), + _ => bail!("invalid version: {version}"), + } + } + + fn serialize_version(self, _version: u16) -> Result> { + match self { + CachedValue::V2(data) => serde_bare::to_vec(&data).map_err(Into::into), + } + } + + fn deserialize_converters() -> Vec Result> { + vec![Ok] + } + + fn serialize_converters() -> Vec Result> { + vec![Ok] + } +} diff --git a/engine/sdks/schemas/envoy-protocol/v1.bare b/engine/sdks/schemas/envoy-protocol/v1.bare index c08e30d5c9..b0f13b09fe 100644 --- a/engine/sdks/schemas/envoy-protocol/v1.bare +++ b/engine/sdks/schemas/envoy-protocol/v1.bare @@ -388,7 +388,6 @@ type ToRivet union { type ProtocolMetadata struct { envoyLostThreshold: i64 actorStopThreshold: i64 - serverlessDrainGracePeriod: optional maxResponsePayloadSize: u64 } diff --git a/engine/sdks/schemas/epoxy-protocol/v2.bare b/engine/sdks/schemas/epoxy-protocol/v2.bare index aad9173816..21f51dcba5 100644 --- a/engine/sdks/schemas/epoxy-protocol/v2.bare +++ b/engine/sdks/schemas/epoxy-protocol/v2.bare @@ -12,6 +12,11 @@ type CommittedValue struct { mutable: bool } +type CachedValue struct { + value: optional + version: u64 +} + # MARK: Cluster config type ReplicaStatus enum { # Receives nothing diff --git a/engine/sdks/typescript/envoy-protocol/src/index.ts b/engine/sdks/typescript/envoy-protocol/src/index.ts index 4b86eaf903..b919d069c8 100644 --- a/engine/sdks/typescript/envoy-protocol/src/index.ts +++ b/engine/sdks/typescript/envoy-protocol/src/index.ts @@ -1901,7 +1901,6 @@ export function decodeToRivet(bytes: Uint8Array): ToRivet { export type ProtocolMetadata = { readonly envoyLostThreshold: i64 readonly actorStopThreshold: i64 - readonly serverlessDrainGracePeriod: i64 | null readonly maxResponsePayloadSize: u64 } @@ -1909,7 +1908,6 @@ export function readProtocolMetadata(bc: bare.ByteCursor): ProtocolMetadata { return { envoyLostThreshold: bare.readI64(bc), actorStopThreshold: bare.readI64(bc), - serverlessDrainGracePeriod: read7(bc), maxResponsePayloadSize: bare.readU64(bc), } } @@ -1917,7 +1915,6 @@ export function readProtocolMetadata(bc: bare.ByteCursor): ProtocolMetadata { export function writeProtocolMetadata(bc: bare.ByteCursor, x: ProtocolMetadata): void { bare.writeI64(bc, x.envoyLostThreshold) bare.writeI64(bc, x.actorStopThreshold) - write7(bc, x.serverlessDrainGracePeriod) bare.writeU64(bc, x.maxResponsePayloadSize) }