Skip to content

Commit b8143ad

Browse files
committed
wip: fix whatever is going on with envoys
1 parent 9f06a14 commit b8143ad

File tree

56 files changed

+2753
-752
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

56 files changed

+2753
-752
lines changed

engine/packages/pegboard-envoy/src/conn.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ pub async fn handle_init(
147147
// Read existing data
148148
let (create_ts_entry, old_last_ping_ts_entry, version_entry) = tokio::try_join!(
149149
tx.read_opt(&create_ts_key, Serializable),
150-
tx.read_opt(&create_ts_key, Serializable),
150+
tx.read_opt(&last_ping_ts_key, Serializable),
151151
tx.read_opt(&version_key, Serializable),
152152
)?;
153153

@@ -227,7 +227,7 @@ pub async fn handle_init(
227227
envoy_key.clone(),
228228
);
229229

230-
tx.add_conflict_key(&old_lb_key, ConflictRangeType::Read)?;
230+
tx.delete(&old_lb_key);
231231
}
232232

233233
// Insert into LB

engine/packages/pegboard-gateway2/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -832,7 +832,7 @@ impl PegboardGateway2 {
832832
})
833833
.await?
834834
{
835-
if actor.envoy_key.is_some() {
835+
if actor.envoy_key.is_some() && actor.connectable {
836836
tracing::debug!("actor became ready during hibernation");
837837

838838
return Ok(HibernationResult::Continue);

engine/packages/pegboard-kv-channel/src/lib.rs

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -434,9 +434,10 @@ async fn actor_request_task(
434434
open_actors: Arc<Mutex<HashSet<String>>>,
435435
mut rx: mpsc::Receiver<protocol::ToServerRequest>,
436436
) {
437-
// Cached actor resolution. Populated on first KV request, reused for all
438-
// subsequent requests. Actor name is immutable so this never goes stale.
439-
let mut cached_actor: Option<(Id, String)> = None;
437+
// Cached actor resolution per actor id. A single KV channel connection can
438+
// multiplex requests for many actors, so the cache must be keyed by actor id.
439+
// Actor name is immutable so each cached entry does not go stale.
440+
let mut cached_actors: HashMap<String, (Id, String)> = HashMap::new();
440441

441442
while let Some(req) = rx.recv().await {
442443
let is_close = matches!(req.data, protocol::RequestData::ActorCloseRequest);
@@ -464,11 +465,11 @@ async fn actor_request_task(
464465
)
465466
}
466467
} else {
467-
// Lazy-resolve and cache.
468-
if cached_actor.is_none() {
468+
// Lazy-resolve and cache per actor id.
469+
if !cached_actors.contains_key(&req.actor_id) {
469470
match resolve_actor(&ctx, &req.actor_id, namespace_id).await {
470471
Ok(v) => {
471-
cached_actor = Some(v);
472+
cached_actors.insert(req.actor_id.clone(), v);
472473
}
473474
Err(resp) => {
474475
// Don't cache failures. Next request will retry.
@@ -480,7 +481,8 @@ async fn actor_request_task(
480481
}
481482
}
482483
}
483-
let (parsed_id, actor_name) = cached_actor.as_ref().unwrap();
484+
let (parsed_id, actor_name) =
485+
cached_actors.get(&req.actor_id).unwrap();
484486

485487
let recipient = actor_kv::Recipient {
486488
actor_id: *parsed_id,
@@ -811,12 +813,21 @@ async fn resolve_actor(
811813
)
812814
})?;
813815

816+
// Resolve the actor from the general actor record rather than the
817+
// runner-specific lookup. SQLite opens happen during actor startup
818+
// before runner-specific fields are guaranteed to exist for v2 actors,
819+
// so requiring RunnerIdKey here can race the database bootstrap path and
820+
// reject perfectly valid opens with SQLITE_CANTOPEN.
814821
let actor = ctx
815-
.op(pegboard::ops::actor::get_for_runner::Input {
816-
actor_id: parsed_id,
822+
.op(pegboard::ops::actor::get::Input {
823+
actor_ids: vec![parsed_id],
824+
fetch_error: false,
817825
})
818826
.await
819-
.map_err(|err| internal_error(&err))?;
827+
.map_err(|err| internal_error(&err))?
828+
.actors
829+
.into_iter()
830+
.next();
820831

821832
match actor {
822833
Some(actor) => {

engine/packages/pegboard/src/ops/actor/create.rs

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::time::Duration;
2+
13
use anyhow::{Context, Result};
24
use gas::prelude::*;
35
use rivet_api_util::{Method, request_remote_datacenter};
@@ -47,7 +49,11 @@ pub async fn pegboard_actor_create(ctx: &OperationCtx, input: &Input) -> Result<
4749
ctx.subscribe::<crate::workflows::actor2::DestroyStarted>(("actor_id", input.actor_id)),
4850
ctx.op(crate::ops::runner_config::get::Input {
4951
runners: vec![(input.namespace_id, input.runner_name_selector.clone())],
50-
bypass_cache: false,
52+
// This decides whether we dispatch the legacy runner workflow or the
53+
// envoy-backed actor v2 workflow. The protocol version is updated
54+
// during runner/envoy connect, so a cached read can lag behind and
55+
// misclassify a freshly connected pool as legacy.
56+
bypass_cache: true,
5157
}),
5258
)?;
5359

@@ -75,6 +81,7 @@ pub async fn pegboard_actor_create(ctx: &OperationCtx, input: &Input) -> Result<
7581

7682
// Wait for actor creation to complete, fail, or be destroyed
7783
tokio::select! {
84+
biased;
7885
res = create_sub2.next() => { res?; },
7986
res = fail_sub2.next() => {
8087
let msg = res?;
@@ -105,6 +112,14 @@ pub async fn pegboard_actor_create(ctx: &OperationCtx, input: &Input) -> Result<
105112
}
106113
res = destroy_sub2.next() => {
107114
res?;
115+
if let Ok(Ok(msg)) = tokio::time::timeout(
116+
Duration::from_millis(50),
117+
fail_sub2.next(),
118+
)
119+
.await
120+
{
121+
return Err(msg.error.clone().build());
122+
}
108123
return Err(crate::errors::Actor::DestroyedDuringCreation.build());
109124
}
110125
}
@@ -125,6 +140,7 @@ pub async fn pegboard_actor_create(ctx: &OperationCtx, input: &Input) -> Result<
125140

126141
// Wait for actor creation to complete, fail, or be destroyed
127142
tokio::select! {
143+
biased;
128144
res = create_sub.next() => { res?; },
129145
res = fail_sub.next() => {
130146
let msg = res?;
@@ -155,6 +171,14 @@ pub async fn pegboard_actor_create(ctx: &OperationCtx, input: &Input) -> Result<
155171
}
156172
res = destroy_sub.next() => {
157173
res?;
174+
if let Ok(Ok(msg)) = tokio::time::timeout(
175+
Duration::from_millis(50),
176+
fail_sub.next(),
177+
)
178+
.await
179+
{
180+
return Err(msg.error.clone().build());
181+
}
158182
return Err(crate::errors::Actor::DestroyedDuringCreation.build());
159183
}
160184
}

engine/packages/pegboard/src/ops/actor/get_for_key.rs

Lines changed: 62 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
11
use anyhow::Result;
2+
use futures_util::TryStreamExt;
23
use gas::prelude::*;
34
use rivet_api_util::{Method, request_remote_datacenter};
45
use rivet_types::actors::Actor;
6+
use universaldb::options::StreamingMode;
7+
use universaldb::utils::IsolationLevel::Serializable;
8+
9+
use crate::keys;
510

611
#[derive(Debug)]
712
pub struct Input {
@@ -34,21 +39,67 @@ pub async fn pegboard_actor_get_for_key(ctx: &OperationCtx, input: &Input) -> Re
3439

3540
// Check if the actor is in the current datacenter
3641
if reservation_id.label() == ctx.config().dc_label() {
37-
// Local datacenter - get the actor directly
38-
let actors_res = ctx
39-
.op(crate::ops::actor::list_for_ns::Input {
40-
namespace_id: input.namespace_id,
41-
name: input.name.clone(),
42-
key: Some(input.key.clone()),
43-
include_destroyed: false,
44-
created_before: None,
45-
limit: 1,
46-
fetch_error: input.fetch_error,
42+
// Local datacenter - resolve with a serializable read so actors that are
43+
// already tearing down do not leak through as a valid get/getOrCreate
44+
// target.
45+
let actors_with_wf_ids = ctx
46+
.udb()?
47+
.run(|tx| async move {
48+
let tx = tx.with_subspace(keys::subspace());
49+
50+
let actor_subspace = keys::subspace().subspace(&keys::ns::ActorByKeyKey::subspace(
51+
input.namespace_id,
52+
input.name.clone(),
53+
input.key.clone(),
54+
));
55+
let (start, end) = actor_subspace.range();
56+
57+
let mut stream = tx.get_ranges_keyvalues(
58+
universaldb::RangeOption {
59+
mode: StreamingMode::Iterator,
60+
reverse: true,
61+
..(start, end).into()
62+
},
63+
Serializable,
64+
);
65+
66+
let mut results = Vec::new();
67+
68+
while let Some(entry) = stream.try_next().await? {
69+
let (idx_key, data) = tx.read_entry::<keys::ns::ActorByKeyKey>(&entry)?;
70+
71+
if !data.is_destroyed {
72+
results.push((idx_key.actor_id, data.workflow_id));
73+
break;
74+
}
75+
}
76+
77+
Ok(results)
4778
})
4879
.await?;
4980

81+
let wfs = ctx
82+
.get_workflows(
83+
actors_with_wf_ids
84+
.iter()
85+
.map(|(_, workflow_id)| *workflow_id)
86+
.collect(),
87+
)
88+
.await?;
89+
90+
let dc_name = ctx.config().dc_name()?.to_string();
91+
92+
let actors = super::util::build_actors_from_workflows(
93+
ctx,
94+
actors_with_wf_ids,
95+
wfs,
96+
&dc_name,
97+
input.fetch_error,
98+
)
99+
.await?;
100+
50101
Ok(Output {
51-
actor: actors_res.actors.into_iter().next(),
102+
actor: actors.into_iter().next(),
52103
})
53104
} else {
54105
// Remote datacenter - request from the correct datacenter

engine/packages/pegboard/src/workflows/actor2/mod.rs

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -672,22 +672,30 @@ async fn process_signal(
672672
} = &mut state.transition
673673
{
674674
if *destroy_after_start {
675-
// Transition to destroying
676-
state.transition = Transition::Destroying {
677-
envoy: runtime::EnvoyState::new(sig.envoy_key.clone()),
678-
lost_timeout_ts: now
679-
+ ctx.config().pegboard().actor_stop_threshold(),
675+
state.transition = Transition::Running {
676+
envoy: runtime::EnvoyState::new(
677+
sig.envoy_key.clone(),
678+
),
679+
last_liveness_check_ts: now,
680680
};
681681

682-
ctx.activity(runtime::InsertAndSendCommandsInput {
682+
ctx.activity(runtime::SetConnectableInput {
683+
envoy_key: sig.envoy_key.clone(),
683684
generation: state.generation,
685+
})
686+
.await?;
687+
688+
ctx.msg(Ready {
684689
envoy_key: sig.envoy_key.clone(),
685-
commands: vec![protocol::Command::CommandStopActor(
686-
protocol::CommandStopActor {
687-
reason: protocol::StopActorReason::Destroy,
688-
},
689-
)],
690690
})
691+
.topic(("actor_id", input.actor_id))
692+
.send()
693+
.await?;
694+
695+
let workflow_id = ctx.workflow_id();
696+
ctx.signal(Destroy {})
697+
.to_workflow_id(workflow_id)
698+
.send()
691699
.await?;
692700
} else {
693701
// Transition to starting
@@ -804,10 +812,15 @@ async fn process_signal(
804812
runtime::reschedule_actor(ctx, &input, state, metrics_workflow_id).await?;
805813
}
806814
Transition::SleepIntent {
807-
rewake_after_stop, ..
815+
lost_timeout_ts,
816+
rewake_after_stop,
817+
..
808818
} => {
809819
if !*rewake_after_stop {
810820
*rewake_after_stop = true;
821+
let now = ctx.activity(runtime::GetTsInput {}).await?;
822+
*lost_timeout_ts =
823+
now + ctx.config().pegboard().actor_stop_threshold();
811824

812825
tracing::debug!(
813826
actor_id=?input.actor_id,
@@ -983,6 +996,8 @@ async fn process_signal(
983996
lost_timeout_ts: now + ctx.config().pegboard().actor_stop_threshold(),
984997
};
985998

999+
ctx.activity(runtime::SetNotConnectableInput {}).await?;
1000+
9861001
ctx.activity(runtime::InsertAndSendCommandsInput {
9871002
generation: state.generation,
9881003
envoy_key,
@@ -1004,6 +1019,8 @@ async fn process_signal(
10041019
lost_timeout_ts: now + ctx.config().pegboard().actor_stop_threshold(),
10051020
};
10061021

1022+
ctx.activity(runtime::SetNotConnectableInput {}).await?;
1023+
10071024
// Stop command was already sent
10081025
}
10091026
Transition::Allocating {

0 commit comments

Comments (0)