Skip to content

Commit 6e18bf9

Browse files
committed
wip: fix whatever is going on with envoys
1 parent 9f06a14 commit 6e18bf9

55 files changed

Lines changed: 2720 additions & 758 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

engine/packages/pegboard-envoy/src/conn.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ pub async fn handle_init(
147147
// Read existing data
148148
let (create_ts_entry, old_last_ping_ts_entry, version_entry) = tokio::try_join!(
149149
tx.read_opt(&create_ts_key, Serializable),
150-
tx.read_opt(&create_ts_key, Serializable),
150+
tx.read_opt(&last_ping_ts_key, Serializable),
151151
tx.read_opt(&version_key, Serializable),
152152
)?;
153153

@@ -228,6 +228,7 @@ pub async fn handle_init(
228228
);
229229

230230
tx.add_conflict_key(&old_lb_key, ConflictRangeType::Read)?;
231+
tx.delete(&old_lb_key);
231232
}
232233

233234
// Insert into LB

engine/packages/pegboard-kv-channel/src/lib.rs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -434,9 +434,8 @@ async fn actor_request_task(
434434
open_actors: Arc<Mutex<HashSet<String>>>,
435435
mut rx: mpsc::Receiver<protocol::ToServerRequest>,
436436
) {
437-
// Cached actor resolution. Populated on first KV request, reused for all
438-
// subsequent requests. Actor name is immutable so this never goes stale.
439-
let mut cached_actor: Option<(Id, String)> = None;
437+
// Cache keyed by actor id since a single connection multiplexes many actors.
438+
let mut cached_actors: HashMap<String, (Id, String)> = HashMap::new();
440439

441440
while let Some(req) = rx.recv().await {
442441
let is_close = matches!(req.data, protocol::RequestData::ActorCloseRequest);
@@ -464,11 +463,10 @@ async fn actor_request_task(
464463
)
465464
}
466465
} else {
467-
// Lazy-resolve and cache.
468-
if cached_actor.is_none() {
466+
if !cached_actors.contains_key(&req.actor_id) {
469467
match resolve_actor(&ctx, &req.actor_id, namespace_id).await {
470468
Ok(v) => {
471-
cached_actor = Some(v);
469+
cached_actors.insert(req.actor_id.clone(), v);
472470
}
473471
Err(resp) => {
474472
// Don't cache failures. Next request will retry.
@@ -480,7 +478,8 @@ async fn actor_request_task(
480478
}
481479
}
482480
}
483-
let (parsed_id, actor_name) = cached_actor.as_ref().unwrap();
481+
let (parsed_id, actor_name) =
482+
cached_actors.get(&req.actor_id).unwrap();
484483

485484
let recipient = actor_kv::Recipient {
486485
actor_id: *parsed_id,
@@ -795,10 +794,7 @@ async fn handle_kv_delete_range(
795794

796795
/// Look up an actor by ID and return the parsed ID and actor name.
797796
///
798-
/// Defense-in-depth: verifies the actor belongs to the authenticated namespace.
799-
/// The admin_token is a global credential, so this is not strictly necessary
800-
/// today, but prevents cross-namespace access if a less-privileged auth
801-
/// mechanism is introduced in the future.
797+
/// Verifies the actor belongs to the authenticated namespace.
802798
async fn resolve_actor(
803799
ctx: &StandaloneCtx,
804800
actor_id: &str,
@@ -812,11 +808,15 @@ async fn resolve_actor(
812808
})?;
813809

814810
let actor = ctx
815-
.op(pegboard::ops::actor::get_for_runner::Input {
816-
actor_id: parsed_id,
811+
.op(pegboard::ops::actor::get::Input {
812+
actor_ids: vec![parsed_id],
813+
fetch_error: false,
817814
})
818815
.await
819-
.map_err(|err| internal_error(&err))?;
816+
.map_err(|err| internal_error(&err))?
817+
.actors
818+
.into_iter()
819+
.next();
820820

821821
match actor {
822822
Some(actor) => {

engine/packages/pegboard/src/ops/actor/get_for_key.rs

Lines changed: 62 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
11
use anyhow::Result;
2+
use futures_util::TryStreamExt;
23
use gas::prelude::*;
34
use rivet_api_util::{Method, request_remote_datacenter};
45
use rivet_types::actors::Actor;
6+
use universaldb::options::StreamingMode;
7+
use universaldb::utils::IsolationLevel::Serializable;
8+
9+
use crate::keys;
510

611
#[derive(Debug)]
712
pub struct Input {
@@ -34,21 +39,67 @@ pub async fn pegboard_actor_get_for_key(ctx: &OperationCtx, input: &Input) -> Re
3439

3540
// Check if the actor is in the current datacenter
3641
if reservation_id.label() == ctx.config().dc_label() {
37-
// Local datacenter - get the actor directly
38-
let actors_res = ctx
39-
.op(crate::ops::actor::list_for_ns::Input {
40-
namespace_id: input.namespace_id,
41-
name: input.name.clone(),
42-
key: Some(input.key.clone()),
43-
include_destroyed: false,
44-
created_before: None,
45-
limit: 1,
46-
fetch_error: input.fetch_error,
42+
// Local datacenter - resolve with a serializable read so actors that are
43+
// already tearing down do not leak through as a valid get/getOrCreate
44+
// target.
45+
let actors_with_wf_ids = ctx
46+
.udb()?
47+
.run(|tx| async move {
48+
let tx = tx.with_subspace(keys::subspace());
49+
50+
let actor_subspace = keys::subspace().subspace(&keys::ns::ActorByKeyKey::subspace(
51+
input.namespace_id,
52+
input.name.clone(),
53+
input.key.clone(),
54+
));
55+
let (start, end) = actor_subspace.range();
56+
57+
let mut stream = tx.get_ranges_keyvalues(
58+
universaldb::RangeOption {
59+
mode: StreamingMode::Iterator,
60+
reverse: true,
61+
..(start, end).into()
62+
},
63+
Serializable,
64+
);
65+
66+
let mut results = Vec::new();
67+
68+
while let Some(entry) = stream.try_next().await? {
69+
let (idx_key, data) = tx.read_entry::<keys::ns::ActorByKeyKey>(&entry)?;
70+
71+
if !data.is_destroyed {
72+
results.push((idx_key.actor_id, data.workflow_id));
73+
break;
74+
}
75+
}
76+
77+
Ok(results)
4778
})
4879
.await?;
4980

81+
let wfs = ctx
82+
.get_workflows(
83+
actors_with_wf_ids
84+
.iter()
85+
.map(|(_, workflow_id)| *workflow_id)
86+
.collect(),
87+
)
88+
.await?;
89+
90+
let dc_name = ctx.config().dc_name()?.to_string();
91+
92+
let actors = super::util::build_actors_from_workflows(
93+
ctx,
94+
actors_with_wf_ids,
95+
wfs,
96+
&dc_name,
97+
input.fetch_error,
98+
)
99+
.await?;
100+
50101
Ok(Output {
51-
actor: actors_res.actors.into_iter().next(),
102+
actor: actors.into_iter().next(),
52103
})
53104
} else {
54105
// Remote datacenter - request from the correct datacenter

engine/packages/pegboard/src/workflows/actor2/mod.rs

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -672,22 +672,30 @@ async fn process_signal(
672672
} = &mut state.transition
673673
{
674674
if *destroy_after_start {
675-
// Transition to destroying
676-
state.transition = Transition::Destroying {
677-
envoy: runtime::EnvoyState::new(sig.envoy_key.clone()),
678-
lost_timeout_ts: now
679-
+ ctx.config().pegboard().actor_stop_threshold(),
675+
state.transition = Transition::Running {
676+
envoy: runtime::EnvoyState::new(
677+
sig.envoy_key.clone(),
678+
),
679+
last_liveness_check_ts: now,
680680
};
681681

682-
ctx.activity(runtime::InsertAndSendCommandsInput {
682+
ctx.activity(runtime::SetConnectableInput {
683+
envoy_key: sig.envoy_key.clone(),
683684
generation: state.generation,
685+
})
686+
.await?;
687+
688+
ctx.msg(Ready {
684689
envoy_key: sig.envoy_key.clone(),
685-
commands: vec![protocol::Command::CommandStopActor(
686-
protocol::CommandStopActor {
687-
reason: protocol::StopActorReason::Destroy,
688-
},
689-
)],
690690
})
691+
.topic(("actor_id", input.actor_id))
692+
.send()
693+
.await?;
694+
695+
let workflow_id = ctx.workflow_id();
696+
ctx.signal(Destroy {})
697+
.to_workflow_id(workflow_id)
698+
.send()
691699
.await?;
692700
} else {
693701
// Transition to starting
@@ -804,10 +812,15 @@ async fn process_signal(
804812
runtime::reschedule_actor(ctx, &input, state, metrics_workflow_id).await?;
805813
}
806814
Transition::SleepIntent {
807-
rewake_after_stop, ..
815+
lost_timeout_ts,
816+
rewake_after_stop,
817+
..
808818
} => {
809819
if !*rewake_after_stop {
810820
*rewake_after_stop = true;
821+
let now = ctx.activity(runtime::GetTsInput {}).await?;
822+
*lost_timeout_ts =
823+
now + ctx.config().pegboard().actor_stop_threshold();
811824

812825
tracing::debug!(
813826
actor_id=?input.actor_id,
@@ -983,6 +996,8 @@ async fn process_signal(
983996
lost_timeout_ts: now + ctx.config().pegboard().actor_stop_threshold(),
984997
};
985998

999+
ctx.activity(runtime::SetNotConnectableInput {}).await?;
1000+
9861001
ctx.activity(runtime::InsertAndSendCommandsInput {
9871002
generation: state.generation,
9881003
envoy_key,
@@ -1004,6 +1019,8 @@ async fn process_signal(
10041019
lost_timeout_ts: now + ctx.config().pegboard().actor_stop_threshold(),
10051020
};
10061021

1022+
ctx.activity(runtime::SetNotConnectableInput {}).await?;
1023+
10071024
// Stop command was already sent
10081025
}
10091026
Transition::Allocating {

0 commit comments

Comments
 (0)