Commit c6d8ae5 (parent 9f06a14)

wip: fix whatever is going on with envoys

43 files changed: +1962 / -583 lines
Lines changed: 56 additions & 0 deletions
@@ -0,0 +1,56 @@
# Driver Test Fixes Summary

This was not one bug. It was a stack of separate issues that all showed up in the file-system and engine driver suites.

## Main fixes

1. Raw and Drizzle SQLite were not choosing the same backend path.

Raw `rivetkit/db` could use the native addon while `rivetkit/db/drizzle` always went through the WASM VFS. I unified backend selection behind shared open logic in [open-database.ts](/home/nathan/r2/rivetkit-typescript/packages/rivetkit/src/db/open-database.ts), with a native `IDatabase` adapter in [native-adapter.ts](/home/nathan/r2/rivetkit-typescript/packages/rivetkit/src/db/native-adapter.ts). Both providers now reuse that in [mod.ts](/home/nathan/r2/rivetkit-typescript/packages/rivetkit/src/db/mod.ts) and [mod.ts](/home/nathan/r2/rivetkit-typescript/packages/rivetkit/src/db/drizzle/mod.ts). This removed a class of raw-path versus drizzle-path inconsistencies.
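The shape of that unification can be sketched as a single decision point both entry points share. This is a minimal illustration, not the actual rivetkit API: `selectBackend`, `openNative`, and `openWasmVfs` are hypothetical names, and the real `IDatabase` interface is larger.

```typescript
// Hypothetical sketch: both the raw and the Drizzle providers call one shared
// chooser instead of each picking a backend themselves, so they can no longer
// diverge on which SQLite implementation they open.

interface IDatabase {
  execute(sql: string): void;
  close(): void;
}

type Backend = "native" | "wasm";

// One decision point shared by every provider.
function selectBackend(nativeAvailable: boolean): Backend {
  return nativeAvailable ? "native" : "wasm";
}

function openDatabase(
  nativeAvailable: boolean,
  openNative: () => IDatabase,
  openWasmVfs: () => IDatabase,
): IDatabase {
  return selectBackend(nativeAvailable) === "native"
    ? openNative()
    : openWasmVfs();
}
```

With this structure, a raw-versus-Drizzle inconsistency would require two call sites to disagree on a boolean, rather than two independently maintained selection blocks.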
2. The native SQLite KV channel had actor-resolution bugs.

The KV channel was caching actor resolution too coarsely, which is wrong when one connection multiplexes many actors. That was fixed in [lib.rs](/home/nathan/r2/engine/packages/pegboard-kv-channel/src/lib.rs). I also fixed the open race where native SQLite could attempt `sqlite3_open_v2` before the actor was actually resolvable on the engine side, which was the source of the earlier `SQLITE_CANTOPEN` and internal-error failures. Related engine-side actor lifecycle and lookup changes were needed in [create.rs](/home/nathan/r2/engine/packages/pegboard/src/ops/actor/create.rs), [get_for_key.rs](/home/nathan/r2/engine/packages/pegboard/src/ops/actor/get_for_key.rs), [mod.rs](/home/nathan/r2/engine/packages/pegboard/src/workflows/actor2/mod.rs), and [runtime.rs](/home/nathan/r2/engine/packages/pegboard/src/workflows/actor2/runtime.rs).
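The caching fix (shown as a Rust diff below) follows a general pattern that can be illustrated in TypeScript: key the cache by actor id because one connection serves many actors, and never cache failures so the next request retries. The class and names here are illustrative, not the actual pegboard code.

```typescript
// Illustrative per-actor resolution cache. The old code cached a single
// resolution for the whole connection; keying by actor id fixes that, and
// letting lookup errors propagate uncached means the next request retries.

type Resolved = { parsedId: string; actorName: string };

class ActorResolutionCache {
  private cache = new Map<string, Resolved>();

  async resolve(
    actorId: string,
    lookup: (id: string) => Promise<Resolved>,
  ): Promise<Resolved> {
    const hit = this.cache.get(actorId);
    if (hit) return hit;
    // A thrown lookup error is not cached; the caller's next request retries.
    const resolved = await lookup(actorId);
    this.cache.set(actorId, resolved);
    return resolved;
  }
}
```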
3. The native client path was not resilient to channel loss.

When the native KV channel went stale, the old code surfaced connection-closed errors instead of reopening and retrying. I fixed that in [native-sqlite.ts](/home/nathan/r2/rivetkit-typescript/packages/rivetkit/src/db/native-sqlite.ts) and [native-adapter.ts](/home/nathan/r2/rivetkit-typescript/packages/rivetkit/src/db/native-adapter.ts). I also fixed crash cleanup ordering so hard-crash tests did not abort DB cleanup too early in [mod.ts](/home/nathan/r2/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts).
4. Query-backed actor handles and gateway routing had mismatches.

Some engine failures occurred because `.get()` / `.getOrCreate()` / `.getForId()` were not consistently preserving the query-backed gateway path the tests expected. That was fixed in [actor-handle.ts](/home/nathan/r2/rivetkit-typescript/packages/rivetkit/src/client/actor-handle.ts), [actor-query.ts](/home/nathan/r2/rivetkit-typescript/packages/rivetkit/src/client/actor-query.ts), [resolve-gateway-target.ts](/home/nathan/r2/rivetkit-typescript/packages/rivetkit/src/driver-helpers/resolve-gateway-target.ts), [mod.ts](/home/nathan/r2/rivetkit-typescript/packages/rivetkit/src/remote-manager-driver/mod.ts), and [mod.rs](/home/nathan/r2/engine/packages/guard/src/routing/pegboard_gateway/mod.rs). That cleared the `.get().connect()` and `.getForId().connect()` engine proxy failures.
5. Raw websocket lifecycle handling had real race bugs.

There were two separate websocket problems:

- `Conn.disconnect()` was re-entrant, so a local close and the resulting close event could double-run cleanup. I made it single-flight in [mod.ts](/home/nathan/r2/rivetkit-typescript/packages/rivetkit/src/actor/conn/mod.ts).
- Actors could auto-sleep before a wake-triggered raw websocket open had actually been delivered into RivetKit. I first fixed the local `prepareConn -> connectConn` gap in [connection-manager.ts](/home/nathan/r2/rivetkit-typescript/packages/rivetkit/src/actor/instance/connection-manager.ts), then fixed the earlier engine-only wake-to-open gap by adding a driver-level initial sleep override in [driver.ts](/home/nathan/r2/rivetkit-typescript/packages/rivetkit/src/actor/driver.ts), consumed in [mod.ts](/home/nathan/r2/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts) and implemented in [actor-driver.ts](/home/nathan/r2/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts). That was the last real engine flake.
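The single-flight pattern from the first bullet is worth spelling out, since it is the standard fix for re-entrant cleanup. This is a minimal sketch with illustrative names, not the actual `Conn` class: all callers share one in-flight promise, so cleanup runs exactly once no matter how many paths trigger it.

```typescript
// Minimal single-flight disconnect: a local close and the resulting close
// event may both call disconnect(), but cleanup must run exactly once.
// Every caller awaits the same promise.

class Conn {
  private disconnectPromise: Promise<void> | null = null;
  cleanupRuns = 0; // exposed here only so the sketch is observable

  disconnect(): Promise<void> {
    // Re-entrant calls reuse the in-flight (or completed) promise.
    if (!this.disconnectPromise) {
      this.disconnectPromise = this.runCleanup();
    }
    return this.disconnectPromise;
  }

  private async runCleanup(): Promise<void> {
    this.cleanupRuns++;
    // ...close the socket, remove from the connection map, fire handlers...
  }
}
```

Storing the promise (rather than a boolean flag) has the added benefit that late callers still get to await the completion of the original cleanup.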
6. AgentOS process callbacks were leaking shutdown-time errors into Vitest.

The file-system suite ended up green on assertions but still failed because background process stdout and stderr callbacks were calling `broadcast()` after the actor had already shut down. I made those shutdown-safe in [process.ts](/home/nathan/r2/rivetkit-typescript/packages/rivetkit/src/agent-os/actor/process.ts). That removed the final unhandled rejections from the file-system run.
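The shutdown-safety change amounts to guarding the callback on actor liveness. A sketch, assuming a hypothetical `ActorLike` shape (`stopped` flag and `broadcast` method stand in for the real actor instance API):

```typescript
// Illustrative shutdown-safe output handler: a background process can outlive
// its actor by a tick, so late stdout/stderr chunks are dropped instead of
// being broadcast into a torn-down instance (which surfaced as unhandled
// rejections in Vitest).

interface ActorLike {
  stopped: boolean;
  broadcast(event: string, data: string): void;
}

function makeStdoutHandler(actor: ActorLike): (chunk: string) => void {
  return (chunk) => {
    // Guard first: broadcasting after shutdown is what leaked errors.
    if (actor.stopped) return;
    actor.broadcast("stdout", chunk);
  };
}
```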
## Supporting fixes

There were also supporting changes needed to make the suites reliably green:

- Better raw websocket open ordering and route handling in [router-websocket-endpoints.ts](/home/nathan/r2/rivetkit-typescript/packages/rivetkit/src/actor/router-websocket-endpoints.ts) and [actor-driver.ts](/home/nathan/r2/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts).
- File-system driver and global-state cleanup and sleep behavior fixes in [global-state.ts](/home/nathan/r2/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/global-state.ts), [actor.ts](/home/nathan/r2/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/actor.ts), and [manager.ts](/home/nathan/r2/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/manager.ts).
- Remote websocket client and proxy fixes in [actor-websocket-client.ts](/home/nathan/r2/rivetkit-typescript/packages/rivetkit/src/remote-manager-driver/actor-websocket-client.ts) and [actor-conn.ts](/home/nathan/r2/rivetkit-typescript/packages/rivetkit/src/client/actor-conn.ts).
- Test harness changes in [driver-engine.test.ts](/home/nathan/r2/rivetkit-typescript/packages/rivetkit/tests/driver-engine.test.ts), [utils.ts](/home/nathan/r2/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/utils.ts), and several driver-test files so the engine suite stopped fighting its own setup and teardown.
## Net result

The important root causes were:

- backend split between raw and drizzle,
- native KV and open races,
- query-backed gateway mismatches,
- raw-websocket sleep and disconnect races,
- shutdown-time async callbacks escaping actor lifetime.
Once those were fixed, both suites passed cleanly:

- [driver-file-system-full-after-process-fix.log](/tmp/driver-file-system-full-after-process-fix.log)
- [driver-engine-full-after-initial-sleep-fix.log](/tmp/driver-engine-full-after-initial-sleep-fix.log)

engine/packages/pegboard-envoy/src/conn.rs

Lines changed: 2 additions & 1 deletion

@@ -147,7 +147,7 @@ pub async fn handle_init(
 	// Read existing data
 	let (create_ts_entry, old_last_ping_ts_entry, version_entry) = tokio::try_join!(
 		tx.read_opt(&create_ts_key, Serializable),
-		tx.read_opt(&create_ts_key, Serializable),
+		tx.read_opt(&last_ping_ts_key, Serializable),
 		tx.read_opt(&version_key, Serializable),
 	)?;

@@ -228,6 +228,7 @@ pub async fn handle_init(
 	);

 	tx.add_conflict_key(&old_lb_key, ConflictRangeType::Read)?;
+	tx.delete(&old_lb_key);
 }

 // Insert into LB

engine/packages/pegboard-kv-channel/src/lib.rs

Lines changed: 14 additions & 14 deletions

@@ -434,9 +434,8 @@ async fn actor_request_task(
 	open_actors: Arc<Mutex<HashSet<String>>>,
 	mut rx: mpsc::Receiver<protocol::ToServerRequest>,
 ) {
-	// Cached actor resolution. Populated on first KV request, reused for all
-	// subsequent requests. Actor name is immutable so this never goes stale.
-	let mut cached_actor: Option<(Id, String)> = None;
+	// Cache keyed by actor id since a single connection multiplexes many actors.
+	let mut cached_actors: HashMap<String, (Id, String)> = HashMap::new();

 	while let Some(req) = rx.recv().await {
 		let is_close = matches!(req.data, protocol::RequestData::ActorCloseRequest);
@@ -464,11 +463,10 @@ async fn actor_request_task(
 			)
 		}
 	} else {
-		// Lazy-resolve and cache.
-		if cached_actor.is_none() {
+		if !cached_actors.contains_key(&req.actor_id) {
 			match resolve_actor(&ctx, &req.actor_id, namespace_id).await {
 				Ok(v) => {
-					cached_actor = Some(v);
+					cached_actors.insert(req.actor_id.clone(), v);
 				}
 				Err(resp) => {
 					// Don't cache failures. Next request will retry.
@@ -480,7 +478,8 @@ async fn actor_request_task(
 			}
 		}
 	}
-	let (parsed_id, actor_name) = cached_actor.as_ref().unwrap();
+	let (parsed_id, actor_name) =
+		cached_actors.get(&req.actor_id).unwrap();

 	let recipient = actor_kv::Recipient {
 		actor_id: *parsed_id,
@@ -795,10 +794,7 @@ async fn handle_kv_delete_range(

 /// Look up an actor by ID and return the parsed ID and actor name.
 ///
-/// Defense-in-depth: verifies the actor belongs to the authenticated namespace.
-/// The admin_token is a global credential, so this is not strictly necessary
-/// today, but prevents cross-namespace access if a less-privileged auth
-/// mechanism is introduced in the future.
+/// Verifies the actor belongs to the authenticated namespace.
 async fn resolve_actor(
 	ctx: &StandaloneCtx,
 	actor_id: &str,
@@ -812,11 +808,15 @@ async fn resolve_actor(
 	})?;

 	let actor = ctx
-		.op(pegboard::ops::actor::get_for_runner::Input {
-			actor_id: parsed_id,
+		.op(pegboard::ops::actor::get::Input {
+			actor_ids: vec![parsed_id],
+			fetch_error: false,
 		})
 		.await
-		.map_err(|err| internal_error(&err))?;
+		.map_err(|err| internal_error(&err))?
+		.actors
+		.into_iter()
+		.next();

 	match actor {
 		Some(actor) => {

engine/packages/pegboard/src/workflows/actor2/mod.rs

Lines changed: 6 additions & 1 deletion

@@ -804,10 +804,15 @@ async fn process_signal(
 		runtime::reschedule_actor(ctx, &input, state, metrics_workflow_id).await?;
 	}
 	Transition::SleepIntent {
-		rewake_after_stop, ..
+		lost_timeout_ts,
+		rewake_after_stop,
+		..
 	} => {
 		if !*rewake_after_stop {
 			*rewake_after_stop = true;
+			let now = ctx.activity(runtime::GetTsInput {}).await?;
+			*lost_timeout_ts =
+				now + ctx.config().pegboard().actor_stop_threshold();

 			tracing::debug!(
 				actor_id=?input.actor_id,

engine/packages/pegboard/src/workflows/actor2/runtime.rs

Lines changed: 83 additions & 48 deletions

@@ -565,6 +565,40 @@ pub async fn handle_stopped(

 		StoppedResult::Continue
 	} else if try_reallocate {
+		// A wake can arrive after the actor has already committed to sleeping
+		// but before the stop completes. In that case, mirror the v1 workflow
+		// and immediately reschedule after the stop instead of treating a
+		// graceful sleep exit as terminal destruction.
+		if let Transition::SleepIntent {
+			rewake_after_stop: true,
+			..
+		} = state.transition
+		{
+			let allocate_res = ctx.activity(AllocateInput {}).await?;
+
+			if let Some(allocation) = allocate_res.allocation {
+				state.generation += 1;
+
+				ctx.activity(SendOutboundInput {
+					generation: state.generation,
+					input: input.input.clone(),
+					allocation,
+				})
+				.await?;
+
+				state.transition = Transition::Allocating {
+					destroy_after_start: false,
+					lost_timeout_ts: allocate_res.now
+						+ ctx.config().pegboard().actor_allocation_threshold(),
+				};
+			} else {
+				state.transition = Transition::Reallocating {
+					since_ts: allocate_res.now,
+				};
+			}
+
+			StoppedResult::Continue
+		} else {
 		// An actor stopping with `StopCode::Ok` indicates a graceful exit
 		let graceful_exit = matches!(
 			variant,
@@ -574,63 +608,64 @@ pub async fn handle_stopped(
 			}
 		);

-		match (input.crash_policy, graceful_exit) {
-			(CrashPolicy::Restart, false) => {
-				let allocate_res = ctx.activity(AllocateInput {}).await?;
-
-				if let Some(allocation) = allocate_res.allocation {
-					state.generation += 1;
-
-					ctx.activity(SendOutboundInput {
-						generation: state.generation,
-						input: input.input.clone(),
-						allocation,
-					})
-					.await?;
-
-					// Transition to allocating
-					state.transition = Transition::Allocating {
-						destroy_after_start: false,
-						lost_timeout_ts: allocate_res.now
-							+ ctx.config().pegboard().actor_allocation_threshold(),
-					};
-				} else {
-					// Transition to retry backoff
-					state.transition = Transition::Reallocating {
-						since_ts: allocate_res.now,
-					};
-				}
-
-				StoppedResult::Continue
-			}
-			(CrashPolicy::Sleep, false) => {
-				tracing::debug!(actor_id=?input.actor_id, "actor sleeping due to ungraceful exit");
-
-				// Clear alarm
-				if let Some(alarm_ts) = state.alarm_ts {
-					let now = ctx.activity(GetTsInput {}).await?;
-
-					if now >= alarm_ts {
-						state.alarm_ts = None;
-					}
-				}
-
-				// Transition to sleeping
-				state.transition = Transition::Sleeping;
-
-				StoppedResult::Continue
-			}
-			_ => {
-				let now = ctx.activity(GetTsInput {}).await?;
-
-				// Don't destroy on failed allocation, retry instead
-				if let StoppedVariant::FailedAllocation { .. } = &variant {
-					// Transition to retry backoff
-					state.transition = Transition::Reallocating { since_ts: now };
-
-					StoppedResult::Continue
-				} else {
-					StoppedResult::Destroy
-				}
-			}
-		}
+			match (input.crash_policy, graceful_exit) {
+				(CrashPolicy::Restart, false) => {
+					let allocate_res = ctx.activity(AllocateInput {}).await?;
+
+					if let Some(allocation) = allocate_res.allocation {
+						state.generation += 1;
+
+						ctx.activity(SendOutboundInput {
+							generation: state.generation,
+							input: input.input.clone(),
+							allocation,
+						})
+						.await?;
+
+						// Transition to allocating
+						state.transition = Transition::Allocating {
+							destroy_after_start: false,
+							lost_timeout_ts: allocate_res.now
+								+ ctx.config().pegboard().actor_allocation_threshold(),
+						};
+					} else {
+						// Transition to retry backoff
+						state.transition = Transition::Reallocating {
+							since_ts: allocate_res.now,
+						};
+					}
+
+					StoppedResult::Continue
+				}
+				(CrashPolicy::Sleep, false) => {
+					tracing::debug!(actor_id=?input.actor_id, "actor sleeping due to ungraceful exit");
+
+					// Clear alarm
+					if let Some(alarm_ts) = state.alarm_ts {
+						let now = ctx.activity(GetTsInput {}).await?;
+
+						if now >= alarm_ts {
+							state.alarm_ts = None;
+						}
+					}
+
+					// Transition to sleeping
+					state.transition = Transition::Sleeping;
+
+					StoppedResult::Continue
+				}
+				_ => {
+					let now = ctx.activity(GetTsInput {}).await?;
+
+					// Don't destroy on failed allocation, retry instead
+					if let StoppedVariant::FailedAllocation { .. } = &variant {
+						// Transition to retry backoff
+						state.transition = Transition::Reallocating { since_ts: now };
+
+						StoppedResult::Continue
+					} else {
+						StoppedResult::Destroy
+					}
+				}
+			}
+		}
 	}
