Skip to content

Commit a14fd3f

Browse files
committed
chore: misc fixes, add pb snapshot test
1 parent 9f06a14 commit a14fd3f

File tree

82 files changed

+1184
-972
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

82 files changed

+1184
-972
lines changed

Cargo.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/packages/epoxy/src/consts.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,3 @@ pub const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
88
/// This keeps learner range reads bounded while still making steady progress through the
99
/// immutable per-key commit history.
1010
pub const CHANGELOG_READ_COUNT: u64 = 5_000;
11-

engine/packages/epoxy/src/http_routes.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@ pub fn mount_routes(
1616
) -> axum::Router<rivet_api_builder::GlobalApiCtx> {
1717
router
1818
.route("/v{version}/epoxy/message", bin::post(message))
19-
.route("/v{version}/epoxy/changelog-read", bin::post(changelog_read))
19+
.route(
20+
"/v{version}/epoxy/changelog-read",
21+
bin::post(changelog_read),
22+
)
2023
}
2124

2225
pub async fn message(ctx: ApiCtx, path: ProtocolPath, _query: (), body: Bytes) -> Result<Vec<u8>> {
@@ -66,7 +69,9 @@ fn request_kind_label(kind: &protocol::RequestKind) -> &'static str {
6669
protocol::RequestKind::CommitRequest(_) => "commit",
6770
protocol::RequestKind::ChangelogReadRequest(_) => "changelog_read",
6871
protocol::RequestKind::HealthCheckRequest => "health_check",
69-
protocol::RequestKind::CoordinatorUpdateReplicaStatusRequest(_) => "coordinator_update_replica_status",
72+
protocol::RequestKind::CoordinatorUpdateReplicaStatusRequest(_) => {
73+
"coordinator_update_replica_status"
74+
}
7075
protocol::RequestKind::BeginLearningRequest(_) => "begin_learning",
7176
protocol::RequestKind::KvGetRequest(_) => "kv_get",
7277
protocol::RequestKind::KvPurgeCacheRequest(_) => "kv_purge_cache",

engine/packages/epoxy/src/keys/keys.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -336,8 +336,7 @@ impl TuplePack for ChangelogKey {
336336

337337
impl<'de> TupleUnpack<'de> for ChangelogKey {
338338
fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> {
339-
let (input, (root, versionstamp)) =
340-
<(usize, Versionstamp)>::unpack(input, tuple_depth)?;
339+
let (input, (root, versionstamp)) = <(usize, Versionstamp)>::unpack(input, tuple_depth)?;
341340
if root != CHANGELOG {
342341
return Err(PackError::Message("expected CHANGELOG root".into()));
343342
}
@@ -347,4 +346,3 @@ impl<'de> TupleUnpack<'de> for ChangelogKey {
347346
Ok((input, v))
348347
}
349348
}
350-

engine/packages/epoxy/src/metrics.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,9 @@ pub fn record_changelog_append() {
9696
}
9797

9898
pub fn record_request(request_type: &str, result: &str, duration: std::time::Duration) {
99-
REQUEST_TOTAL.with_label_values(&[request_type, result]).inc();
99+
REQUEST_TOTAL
100+
.with_label_values(&[request_type, result])
101+
.inc();
100102
REQUEST_DURATION
101103
.with_label_values(&[request_type])
102104
.observe(duration.as_secs_f64());

engine/packages/epoxy/src/ops/kv/get_local.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,10 @@ pub struct Output {
1919

2020
#[operation]
2121
pub async fn epoxy_kv_get_local(ctx: &OperationCtx, input: &Input) -> Result<Output> {
22-
let committed_value = read_value::read_local_value(
23-
ctx,
24-
input.replica_id,
25-
input.key.clone(),
26-
false,
27-
)
28-
.await?
29-
.value;
22+
let committed_value =
23+
read_value::read_local_value(ctx, input.replica_id, input.key.clone(), false)
24+
.await?
25+
.value;
3026

3127
Ok(Output {
3228
value: committed_value.as_ref().map(|value| value.value.clone()),

engine/packages/epoxy/src/ops/kv/get_optimistic.rs

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -123,13 +123,7 @@ pub async fn epoxy_kv_get_optimistic(ctx: &OperationCtx, input: &Input) -> Resul
123123
};
124124

125125
if input.caching_behavior == protocol::CachingBehavior::Optimistic {
126-
cache_fanout_value(
127-
ctx,
128-
input.replica_id,
129-
input.key.clone(),
130-
value.clone(),
131-
)
132-
.await?;
126+
cache_fanout_value(ctx, input.replica_id, input.key.clone(), value.clone()).await?;
133127
}
134128

135129
return Ok(Output {
@@ -162,7 +156,10 @@ async fn cache_fanout_value(
162156
// This covers the race where a commit lands between the fanout read and
163157
// the cache write.
164158
if let Some(committed_value) = tx
165-
.read_opt(&committed_key, universaldb::utils::IsolationLevel::Serializable)
159+
.read_opt(
160+
&committed_key,
161+
universaldb::utils::IsolationLevel::Serializable,
162+
)
166163
.await?
167164
{
168165
if committed_value.version >= value_to_cache.version {
@@ -174,10 +171,7 @@ async fn cache_fanout_value(
174171
// prevents a slow fanout response from overwriting a fresher cache entry
175172
// written by a concurrent request.
176173
if let Some(existing_cache) = tx
177-
.read_opt(
178-
&cache_key,
179-
universaldb::utils::IsolationLevel::Serializable,
180-
)
174+
.read_opt(&cache_key, universaldb::utils::IsolationLevel::Serializable)
181175
.await?
182176
{
183177
if existing_cache.version > value_to_cache.version {

engine/packages/epoxy/src/ops/kv/read_value.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@ use epoxy_protocol::protocol::ReplicaId;
33
use gas::prelude::*;
44
use universaldb::utils::{FormalKey, IsolationLevel::Serializable};
55

6-
use crate::keys::{self, CommittedValue, KvOptimisticCacheKey, KvValueKey, LegacyCommittedValueKey};
6+
use crate::keys::{
7+
self, CommittedValue, KvOptimisticCacheKey, KvValueKey, LegacyCommittedValueKey,
8+
};
79

810
#[derive(Debug)]
911
pub(crate) struct LocalValueRead {

engine/packages/epoxy/src/ops/propose.rs

Lines changed: 81 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,9 @@ use std::collections::{BTreeSet, HashSet};
99
use std::time::{Duration, Instant};
1010

1111
use crate::{
12-
http_client, metrics,
12+
http_client,
1313
keys::CommittedValue,
14+
metrics,
1415
replica::{
1516
ballot::{self, Ballot, BallotSelection},
1617
commit_kv::{self, CommitKvOutcome},
@@ -101,13 +102,11 @@ impl SetProposal {
101102
key,
102103
expect_one_of,
103104
new_value: Some(value),
104-
}) if expect_one_of.len() == 1 && matches!(expect_one_of.first(), Some(None)) => {
105-
Ok(Self {
106-
key: key.clone(),
107-
value: value.clone(),
108-
mutable,
109-
})
110-
}
105+
}) if expect_one_of.len() == 1 && matches!(expect_one_of.first(), Some(None)) => Ok(Self {
106+
key: key.clone(),
107+
value: value.clone(),
108+
mutable,
109+
}),
111110
_ => bail!(
112111
"epoxy v2 only supports single-key set-if-absent proposals with a concrete value"
113112
),
@@ -314,14 +313,14 @@ pub async fn epoxy_propose(ctx: &OperationCtx, input: &Input) -> Result<Proposal
314313
replica_id,
315314
&quorum_members,
316315
proposal.key.clone(),
317-
CommittedValue {
318-
value: proposal.value.clone(),
319-
version: 1,
320-
mutable: proposal.mutable,
321-
},
322-
ballot,
323-
)
324-
.await?
316+
CommittedValue {
317+
value: proposal.value.clone(),
318+
version: 1,
319+
mutable: proposal.mutable,
320+
},
321+
ballot,
322+
)
323+
.await?
325324
{
326325
PreparePhaseOutcome::Prepared { ballot, value } => {
327326
run_slow_path(
@@ -444,16 +443,7 @@ async fn run_accept_path(
444443
version,
445444
mutable,
446445
} = value;
447-
commit_kv::commit_kv(
448-
&tx,
449-
replica_id,
450-
key,
451-
value,
452-
ballot,
453-
mutable,
454-
version,
455-
)
456-
.await
446+
commit_kv::commit_kv(&tx, replica_id, key, value, ballot, mutable, version).await
457447
}
458448
})
459449
.custom_instrument(tracing::info_span!("commit_kv_tx"))
@@ -472,16 +462,15 @@ async fn run_accept_path(
472462
let ballot = ballot;
473463
let purge_cache = purge_cache && proposal.mutable;
474464
async move {
475-
if let Err(err) =
476-
broadcast_commits(
477-
&ctx,
478-
&config,
479-
replica_id,
480-
key.clone(),
481-
chosen_value.clone(),
482-
ballot,
483-
)
484-
.await
465+
if let Err(err) = broadcast_commits(
466+
&ctx,
467+
&config,
468+
replica_id,
469+
key.clone(),
470+
chosen_value.clone(),
471+
ballot,
472+
)
473+
.await
485474
{
486475
tracing::warn!(?err, "commit broadcast failed after local commit");
487476
}
@@ -550,7 +539,8 @@ async fn run_prepare_phase(
550539
}
551540
PrepareRoundOutcome::Retry { next_ballot } => {
552541
store_prepare_ballot(ctx, replica_id, key.clone(), next_ballot).await?;
553-
let Some(retry_delay) = next_prepare_retry_delay(retry_count, &mut rand::thread_rng())
542+
let Some(retry_delay) =
543+
next_prepare_retry_delay(retry_count, &mut rand::thread_rng())
554544
else {
555545
tracing::warn!(
556546
%replica_id,
@@ -592,31 +582,32 @@ async fn send_prepare_round(
592582
ballot: protocol::Ballot,
593583
) -> Result<PrepareRoundOutcome> {
594584
let target = utils::calculate_quorum(replica_ids.len(), utils::QuorumType::Slow);
595-
let mut pending = futures_util::stream::iter(replica_ids.iter().copied().map(|to_replica_id| {
596-
let key = key.clone();
597-
let proposed_value = proposed_value.clone();
598-
let ballot = ballot.clone();
599-
async move {
600-
(
601-
to_replica_id,
602-
tokio::time::timeout(
603-
crate::consts::REQUEST_TIMEOUT,
604-
send_prepare_request(
605-
ctx,
606-
config,
607-
from_replica_id,
608-
to_replica_id,
609-
key,
610-
proposed_value,
611-
ballot,
612-
),
585+
let mut pending =
586+
futures_util::stream::iter(replica_ids.iter().copied().map(|to_replica_id| {
587+
let key = key.clone();
588+
let proposed_value = proposed_value.clone();
589+
let ballot = ballot.clone();
590+
async move {
591+
(
592+
to_replica_id,
593+
tokio::time::timeout(
594+
crate::consts::REQUEST_TIMEOUT,
595+
send_prepare_request(
596+
ctx,
597+
config,
598+
from_replica_id,
599+
to_replica_id,
600+
key,
601+
proposed_value,
602+
ballot,
603+
),
604+
)
605+
.await,
613606
)
614-
.await,
615-
)
616-
}
617-
}))
618-
.collect::<FuturesUnordered<_>>()
619-
.await;
607+
}
608+
}))
609+
.collect::<FuturesUnordered<_>>()
610+
.await;
620611

621612
let mut ok_responses = 0;
622613
let mut remaining = replica_ids.len();
@@ -655,7 +646,9 @@ async fn send_prepare_round(
655646
}
656647
(None, None) => {}
657648
_ => {
658-
bail!("prepare response from replica {to_replica_id} returned partial accepted state");
649+
bail!(
650+
"prepare response from replica {to_replica_id} returned partial accepted state"
651+
);
659652
}
660653
}
661654

@@ -720,31 +713,32 @@ async fn send_accept_round(
720713
accept_quorum: utils::QuorumType,
721714
) -> Result<AcceptPhaseOutcome> {
722715
let target = utils::calculate_quorum(replica_ids.len(), accept_quorum);
723-
let mut pending = futures_util::stream::iter(replica_ids.iter().copied().map(|to_replica_id| {
724-
let key = key.clone();
725-
let value = value.clone();
726-
let ballot = ballot.clone();
727-
async move {
728-
(
729-
to_replica_id,
730-
tokio::time::timeout(
731-
crate::consts::REQUEST_TIMEOUT,
732-
send_accept_request(
733-
ctx,
734-
config,
735-
from_replica_id,
736-
to_replica_id,
737-
key,
738-
value,
739-
ballot,
740-
),
716+
let mut pending =
717+
futures_util::stream::iter(replica_ids.iter().copied().map(|to_replica_id| {
718+
let key = key.clone();
719+
let value = value.clone();
720+
let ballot = ballot.clone();
721+
async move {
722+
(
723+
to_replica_id,
724+
tokio::time::timeout(
725+
crate::consts::REQUEST_TIMEOUT,
726+
send_accept_request(
727+
ctx,
728+
config,
729+
from_replica_id,
730+
to_replica_id,
731+
key,
732+
value,
733+
ballot,
734+
),
735+
)
736+
.await,
741737
)
742-
.await,
743-
)
744-
}
745-
}))
746-
.collect::<FuturesUnordered<_>>()
747-
.await;
738+
}
739+
}))
740+
.collect::<FuturesUnordered<_>>()
741+
.await;
748742

749743
let mut state = AcceptRoundState {
750744
target,
@@ -754,9 +748,7 @@ async fn send_accept_round(
754748

755749
while let Some((to_replica_id, response)) = pending.next().await {
756750
let observation = match response {
757-
Ok(Ok(protocol::AcceptResponse::AcceptResponseOk(_))) => {
758-
AcceptObservation::Ok
759-
}
751+
Ok(Ok(protocol::AcceptResponse::AcceptResponseOk(_))) => AcceptObservation::Ok,
760752
Ok(Ok(protocol::AcceptResponse::AcceptResponseAlreadyCommitted(committed))) => {
761753
AcceptObservation::AlreadyCommitted(committed.value)
762754
}

0 commit comments

Comments
 (0)