Skip to content

Commit 0b43aa9

Browse files
committed
fix(epoxy): add list operation
1 parent f4762f2 commit 0b43aa9

File tree

5 files changed

+117
-142
lines changed

5 files changed

+117
-142
lines changed
Lines changed: 96 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,110 @@
1-
use anyhow::*;
1+
use anyhow::Result;
22
use epoxy_protocol::protocol::ReplicaId;
33
use gas::prelude::*;
4+
use universaldb::utils::{FormalKey, IsolationLevel::Serializable};
45

5-
use super::read_value;
6+
use crate::keys::{
7+
self, CommittedValue, KvOptimisticCacheKey, KvValueKey, LegacyCommittedValueKey,
8+
};
69

710
#[derive(Debug)]
811
pub struct Input {
912
pub replica_id: ReplicaId,
1013
pub key: Vec<u8>,
1114
}
1215

16+
#[operation]
17+
pub async fn epoxy_kv_get_local(
18+
ctx: &OperationCtx,
19+
input: &Input,
20+
) -> Result<Option<CommittedValue>> {
21+
Ok(read_local_value(ctx, input.replica_id, &input.key, false)
22+
.await?
23+
.value)
24+
}
25+
1326
#[derive(Debug)]
14-
pub struct Output {
15-
pub value: Option<Vec<u8>>,
16-
pub version: Option<u64>,
17-
pub mutable: bool,
27+
pub(crate) struct LocalValueRead {
28+
pub value: Option<CommittedValue>,
29+
pub cache_value: Option<CommittedValue>,
1830
}
1931

20-
#[operation]
21-
pub async fn epoxy_kv_get_local(ctx: &OperationCtx, input: &Input) -> Result<Output> {
22-
let committed_value =
23-
read_value::read_local_value(ctx, input.replica_id, input.key.clone(), false)
24-
.await?
25-
.value;
26-
27-
Ok(Output {
28-
value: committed_value.as_ref().map(|value| value.value.clone()),
29-
version: committed_value.as_ref().map(|value| value.version),
30-
mutable: committed_value
31-
.as_ref()
32-
.map(|value| value.mutable)
33-
.unwrap_or(false),
34-
})
32+
/// Reads a committed value from the local replica with dual-read fallback.
33+
///
34+
/// This performs a cascading lookup across storage generations so that values written
35+
/// before the v2 migration remain readable without a full data migration:
36+
///
37+
/// 1. **V2 value** (`EPOXY_V2/replica/{id}/kv/{key}/value`). The current write path.
38+
/// 2. **Legacy committed value** (`EPOXY_V1/replica/{id}/kv/{key}/committed_value`). Written by
39+
/// the original EPaxos protocol. Deserialized as raw bytes with version 0 and mutable=false.
40+
/// 3. **Optimistic cache** (`EPOXY_V2/replica/{id}/kv/{key}/cache`). Only checked when
41+
/// `include_cache` is true. Contains values fetched from remote replicas for the optimistic
42+
/// read path.
43+
///
44+
/// The first path that returns a value wins. This lets the background backfill migrate data
45+
/// at its own pace without blocking reads.
46+
pub(crate) async fn read_local_value(
47+
ctx: &OperationCtx,
48+
replica_id: ReplicaId,
49+
key: &[u8],
50+
include_cache: bool,
51+
) -> Result<LocalValueRead> {
52+
ctx.udb()?
53+
.run(|tx| {
54+
async move {
55+
let value_key = KvValueKey::new(key.to_vec());
56+
let legacy_value_key = LegacyCommittedValueKey::new(key.to_vec());
57+
let cache_key = KvOptimisticCacheKey::new(key.to_vec());
58+
let packed_value_key = keys::subspace(replica_id).pack(&value_key);
59+
let packed_legacy_value_key =
60+
keys::legacy_subspace(replica_id).pack(&legacy_value_key);
61+
let packed_cache_key = keys::subspace(replica_id).pack(&cache_key);
62+
63+
let (local_value, legacy_value, cache_value) = tokio::try_join!(
64+
tx.get(&packed_value_key, Serializable),
65+
tx.get(&packed_legacy_value_key, Serializable),
66+
async {
67+
if include_cache {
68+
tx.get(&packed_cache_key, Serializable).await
69+
} else {
70+
Ok(None)
71+
}
72+
},
73+
)?;
74+
75+
// V2 committed value (current write path)
76+
if let Some(value) = local_value {
77+
return Ok(LocalValueRead {
78+
value: Some(value_key.deserialize(&value)?),
79+
cache_value: None,
80+
});
81+
}
82+
83+
// Legacy committed value (original EPaxos raw bytes)
84+
if let Some(value) = legacy_value {
85+
return Ok(LocalValueRead {
86+
value: Some(CommittedValue {
87+
value: legacy_value_key.deserialize(&value)?,
88+
version: 0,
89+
mutable: false,
90+
}),
91+
cache_value: None,
92+
});
93+
}
94+
95+
if let Some(value) = cache_value {
96+
return Ok(LocalValueRead {
97+
value: None,
98+
cache_value: Some(cache_key.deserialize(&value)?),
99+
});
100+
}
101+
102+
Ok(LocalValueRead {
103+
value: None,
104+
cache_value: None,
105+
})
106+
}
107+
})
108+
.custom_instrument(tracing::info_span!("read_local_value_tx"))
109+
.await
35110
}

engine/packages/epoxy/src/ops/kv/get_optimistic.rs

Lines changed: 17 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use crate::{
99
utils,
1010
};
1111

12-
use super::read_value;
12+
use super::get_local::read_local_value;
1313

1414
#[derive(Debug)]
1515
pub struct Input {
@@ -50,20 +50,14 @@ pub struct Output {
5050
/// best-effort lookup.
5151
#[operation]
5252
pub async fn epoxy_kv_get_optimistic(ctx: &OperationCtx, input: &Input) -> Result<Output> {
53-
let local_read = read_value::read_local_value(
53+
let local_read = read_local_value(
5454
ctx,
5555
input.replica_id,
56-
input.key.clone(),
56+
&input.key,
5757
input.caching_behavior == protocol::CachingBehavior::Optimistic,
5858
)
5959
.await?;
60-
if local_read.value.is_some() {
61-
return Ok(Output {
62-
value: local_read.value.map(|value| value.value),
63-
});
64-
}
65-
66-
if let Some(value) = local_read.cache_value {
60+
if let Some(value) = local_read.value.or(local_read.cache_value) {
6761
return Ok(Output {
6862
value: Some(value.value),
6963
});
@@ -114,22 +108,21 @@ pub async fn epoxy_kv_get_optimistic(ctx: &OperationCtx, input: &Input) -> Resul
114108
)
115109
.await?;
116110

117-
for response in responses {
118-
if let Some(value) = response {
119-
let value = CommittedValue {
120-
value: value.value,
121-
version: value.version,
122-
mutable: value.mutable,
123-
};
124-
125-
if input.caching_behavior == protocol::CachingBehavior::Optimistic {
126-
cache_fanout_value(ctx, input.replica_id, input.key.clone(), value.clone()).await?;
127-
}
111+
// Should only have 1 response
112+
if let Some(value) = responses.first().and_then(|r| r.response) {
113+
let value = CommittedValue {
114+
value: value.value,
115+
version: value.version,
116+
mutable: value.mutable,
117+
};
128118

129-
return Ok(Output {
130-
value: Some(value.value),
131-
});
119+
if input.caching_behavior == protocol::CachingBehavior::Optimistic {
120+
cache_fanout_value(ctx, input.replica_id, input.key.clone(), value.clone()).await?;
132121
}
122+
123+
return Ok(Output {
124+
value: Some(value.value),
125+
});
133126
}
134127

135128
// No value found in any datacenter
Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
11
pub mod get_local;
22
pub mod get_optimistic;
33
pub mod purge_local;
4-
pub(crate) mod read_value;

engine/packages/epoxy/src/ops/kv/read_value.rs

Lines changed: 0 additions & 92 deletions
This file was deleted.

engine/packages/epoxy/src/replica/message_request.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -129,10 +129,10 @@ async fn message_request_inner(
129129
.await?;
130130

131131
protocol::ResponseKind::KvGetResponse(protocol::KvGetResponse {
132-
value: result.value.map(|value| protocol::CommittedValue {
133-
value,
134-
version: result.version.unwrap_or(0),
135-
mutable: result.mutable,
132+
value: result.map(|value| protocol::CommittedValue {
133+
value: value.value,
134+
version: value.version,
135+
mutable: value.mutable,
136136
}),
137137
})
138138
}

0 commit comments

Comments
 (0)