Skip to content

Commit 7597a94

Browse files
committed
wip: fix whatever is going on with envoys
1 parent 9f06a14 commit 7597a94

54 files changed

Lines changed: 2658 additions & 747 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

engine/packages/pegboard-envoy/src/conn.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ pub async fn handle_init(
147147
// Read existing data
148148
let (create_ts_entry, old_last_ping_ts_entry, version_entry) = tokio::try_join!(
149149
tx.read_opt(&create_ts_key, Serializable),
150-
tx.read_opt(&create_ts_key, Serializable),
150+
tx.read_opt(&last_ping_ts_key, Serializable),
151151
tx.read_opt(&version_key, Serializable),
152152
)?;
153153

@@ -228,6 +228,7 @@ pub async fn handle_init(
228228
);
229229

230230
tx.add_conflict_key(&old_lb_key, ConflictRangeType::Read)?;
231+
tx.delete(&old_lb_key);
231232
}
232233

233234
// Insert into LB

engine/packages/pegboard-kv-channel/src/lib.rs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -434,9 +434,8 @@ async fn actor_request_task(
434434
open_actors: Arc<Mutex<HashSet<String>>>,
435435
mut rx: mpsc::Receiver<protocol::ToServerRequest>,
436436
) {
437-
// Cached actor resolution. Populated on first KV request, reused for all
438-
// subsequent requests. Actor name is immutable so this never goes stale.
439-
let mut cached_actor: Option<(Id, String)> = None;
437+
// Cache keyed by actor id since a single connection multiplexes many actors.
438+
let mut cached_actors: HashMap<String, (Id, String)> = HashMap::new();
440439

441440
while let Some(req) = rx.recv().await {
442441
let is_close = matches!(req.data, protocol::RequestData::ActorCloseRequest);
@@ -464,11 +463,10 @@ async fn actor_request_task(
464463
)
465464
}
466465
} else {
467-
// Lazy-resolve and cache.
468-
if cached_actor.is_none() {
466+
if !cached_actors.contains_key(&req.actor_id) {
469467
match resolve_actor(&ctx, &req.actor_id, namespace_id).await {
470468
Ok(v) => {
471-
cached_actor = Some(v);
469+
cached_actors.insert(req.actor_id.clone(), v);
472470
}
473471
Err(resp) => {
474472
// Don't cache failures. Next request will retry.
@@ -480,7 +478,8 @@ async fn actor_request_task(
480478
}
481479
}
482480
}
483-
let (parsed_id, actor_name) = cached_actor.as_ref().unwrap();
481+
let (parsed_id, actor_name) =
482+
cached_actors.get(&req.actor_id).unwrap();
484483

485484
let recipient = actor_kv::Recipient {
486485
actor_id: *parsed_id,
@@ -795,10 +794,7 @@ async fn handle_kv_delete_range(
795794

796795
/// Look up an actor by ID and return the parsed ID and actor name.
797796
///
798-
/// Defense-in-depth: verifies the actor belongs to the authenticated namespace.
799-
/// The admin_token is a global credential, so this is not strictly necessary
800-
/// today, but prevents cross-namespace access if a less-privileged auth
801-
/// mechanism is introduced in the future.
797+
/// Verifies the actor belongs to the authenticated namespace.
802798
async fn resolve_actor(
803799
ctx: &StandaloneCtx,
804800
actor_id: &str,
@@ -812,11 +808,15 @@ async fn resolve_actor(
812808
})?;
813809

814810
let actor = ctx
815-
.op(pegboard::ops::actor::get_for_runner::Input {
816-
actor_id: parsed_id,
811+
.op(pegboard::ops::actor::get::Input {
812+
actor_ids: vec![parsed_id],
813+
fetch_error: false,
817814
})
818815
.await
819-
.map_err(|err| internal_error(&err))?;
816+
.map_err(|err| internal_error(&err))?
817+
.actors
818+
.into_iter()
819+
.next();
820820

821821
match actor {
822822
Some(actor) => {

engine/packages/pegboard/src/workflows/actor2/mod.rs

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -672,22 +672,30 @@ async fn process_signal(
672672
} = &mut state.transition
673673
{
674674
if *destroy_after_start {
675-
// Transition to destroying
676-
state.transition = Transition::Destroying {
677-
envoy: runtime::EnvoyState::new(sig.envoy_key.clone()),
678-
lost_timeout_ts: now
679-
+ ctx.config().pegboard().actor_stop_threshold(),
675+
state.transition = Transition::Running {
676+
envoy: runtime::EnvoyState::new(
677+
sig.envoy_key.clone(),
678+
),
679+
last_liveness_check_ts: now,
680680
};
681681

682-
ctx.activity(runtime::InsertAndSendCommandsInput {
682+
ctx.activity(runtime::SetConnectableInput {
683+
envoy_key: sig.envoy_key.clone(),
683684
generation: state.generation,
685+
})
686+
.await?;
687+
688+
ctx.msg(Ready {
684689
envoy_key: sig.envoy_key.clone(),
685-
commands: vec![protocol::Command::CommandStopActor(
686-
protocol::CommandStopActor {
687-
reason: protocol::StopActorReason::Destroy,
688-
},
689-
)],
690690
})
691+
.topic(("actor_id", input.actor_id))
692+
.send()
693+
.await?;
694+
695+
let workflow_id = ctx.workflow_id();
696+
ctx.signal(Destroy {})
697+
.to_workflow_id(workflow_id)
698+
.send()
691699
.await?;
692700
} else {
693701
// Transition to starting
@@ -804,10 +812,15 @@ async fn process_signal(
804812
runtime::reschedule_actor(ctx, &input, state, metrics_workflow_id).await?;
805813
}
806814
Transition::SleepIntent {
807-
rewake_after_stop, ..
815+
lost_timeout_ts,
816+
rewake_after_stop,
817+
..
808818
} => {
809819
if !*rewake_after_stop {
810820
*rewake_after_stop = true;
821+
let now = ctx.activity(runtime::GetTsInput {}).await?;
822+
*lost_timeout_ts =
823+
now + ctx.config().pegboard().actor_stop_threshold();
811824

812825
tracing::debug!(
813826
actor_id=?input.actor_id,
@@ -983,6 +996,8 @@ async fn process_signal(
983996
lost_timeout_ts: now + ctx.config().pegboard().actor_stop_threshold(),
984997
};
985998

999+
ctx.activity(runtime::SetNotConnectableInput {}).await?;
1000+
9861001
ctx.activity(runtime::InsertAndSendCommandsInput {
9871002
generation: state.generation,
9881003
envoy_key,
@@ -1004,6 +1019,8 @@ async fn process_signal(
10041019
lost_timeout_ts: now + ctx.config().pegboard().actor_stop_threshold(),
10051020
};
10061021

1022+
ctx.activity(runtime::SetNotConnectableInput {}).await?;
1023+
10071024
// Stop command was already sent
10081025
}
10091026
Transition::Allocating {

engine/packages/pegboard/src/workflows/actor2/runtime.rs

Lines changed: 83 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -565,6 +565,40 @@ pub async fn handle_stopped(
565565

566566
StoppedResult::Continue
567567
} else if try_reallocate {
568+
// A wake can arrive after the actor has already committed to sleeping
569+
// but before the stop completes. In that case, mirror the v1 workflow
570+
// and immediately reschedule after the stop instead of treating a
571+
// graceful sleep exit as terminal destruction.
572+
if let Transition::SleepIntent {
573+
rewake_after_stop: true,
574+
..
575+
} = state.transition
576+
{
577+
let allocate_res = ctx.activity(AllocateInput {}).await?;
578+
579+
if let Some(allocation) = allocate_res.allocation {
580+
state.generation += 1;
581+
582+
ctx.activity(SendOutboundInput {
583+
generation: state.generation,
584+
input: input.input.clone(),
585+
allocation,
586+
})
587+
.await?;
588+
589+
state.transition = Transition::Allocating {
590+
destroy_after_start: false,
591+
lost_timeout_ts: allocate_res.now
592+
+ ctx.config().pegboard().actor_allocation_threshold(),
593+
};
594+
} else {
595+
state.transition = Transition::Reallocating {
596+
since_ts: allocate_res.now,
597+
};
598+
}
599+
600+
StoppedResult::Continue
601+
} else {
568602
// An actor stopping with `StopCode::Ok` indicates a graceful exit
569603
let graceful_exit = matches!(
570604
variant,
@@ -574,63 +608,64 @@ pub async fn handle_stopped(
574608
}
575609
);
576610

577-
match (input.crash_policy, graceful_exit) {
578-
(CrashPolicy::Restart, false) => {
579-
let allocate_res = ctx.activity(AllocateInput {}).await?;
580-
581-
if let Some(allocation) = allocate_res.allocation {
582-
state.generation += 1;
611+
match (input.crash_policy, graceful_exit) {
612+
(CrashPolicy::Restart, false) => {
613+
let allocate_res = ctx.activity(AllocateInput {}).await?;
614+
615+
if let Some(allocation) = allocate_res.allocation {
616+
state.generation += 1;
617+
618+
ctx.activity(SendOutboundInput {
619+
generation: state.generation,
620+
input: input.input.clone(),
621+
allocation,
622+
})
623+
.await?;
624+
625+
// Transition to allocating
626+
state.transition = Transition::Allocating {
627+
destroy_after_start: false,
628+
lost_timeout_ts: allocate_res.now
629+
+ ctx.config().pegboard().actor_allocation_threshold(),
630+
};
631+
} else {
632+
// Transition to retry backoff
633+
state.transition = Transition::Reallocating {
634+
since_ts: allocate_res.now,
635+
};
636+
}
583637

584-
ctx.activity(SendOutboundInput {
585-
generation: state.generation,
586-
input: input.input.clone(),
587-
allocation,
588-
})
589-
.await?;
590-
591-
// Transition to allocating
592-
state.transition = Transition::Allocating {
593-
destroy_after_start: false,
594-
lost_timeout_ts: allocate_res.now
595-
+ ctx.config().pegboard().actor_allocation_threshold(),
596-
};
597-
} else {
598-
// Transition to retry backoff
599-
state.transition = Transition::Reallocating {
600-
since_ts: allocate_res.now,
601-
};
638+
StoppedResult::Continue
602639
}
640+
(CrashPolicy::Sleep, false) => {
641+
tracing::debug!(actor_id=?input.actor_id, "actor sleeping due to ungraceful exit");
603642

604-
StoppedResult::Continue
605-
}
606-
(CrashPolicy::Sleep, false) => {
607-
tracing::debug!(actor_id=?input.actor_id, "actor sleeping due to ungraceful exit");
608-
609-
// Clear alarm
610-
if let Some(alarm_ts) = state.alarm_ts {
611-
let now = ctx.activity(GetTsInput {}).await?;
643+
// Clear alarm
644+
if let Some(alarm_ts) = state.alarm_ts {
645+
let now = ctx.activity(GetTsInput {}).await?;
612646

613-
if now >= alarm_ts {
614-
state.alarm_ts = None;
647+
if now >= alarm_ts {
648+
state.alarm_ts = None;
649+
}
615650
}
616-
}
617651

618-
// Transition to sleeping
619-
state.transition = Transition::Sleeping;
652+
// Transition to sleeping
653+
state.transition = Transition::Sleeping;
620654

621-
StoppedResult::Continue
622-
}
623-
_ => {
624-
let now = ctx.activity(GetTsInput {}).await?;
655+
StoppedResult::Continue
656+
}
657+
_ => {
658+
let now = ctx.activity(GetTsInput {}).await?;
625659

626-
// Don't destroy on failed allocation, retry instead
627-
if let StoppedVariant::FailedAllocation { .. } = &variant {
628-
// Transition to retry backoff
629-
state.transition = Transition::Reallocating { since_ts: now };
660+
// Don't destroy on failed allocation, retry instead
661+
if let StoppedVariant::FailedAllocation { .. } = &variant {
662+
// Transition to retry backoff
663+
state.transition = Transition::Reallocating { since_ts: now };
630664

631-
StoppedResult::Continue
632-
} else {
633-
StoppedResult::Destroy
665+
StoppedResult::Continue
666+
} else {
667+
StoppedResult::Destroy
668+
}
634669
}
635670
}
636671
}

0 commit comments

Comments
 (0)