Skip to content

Commit c008cb0

Browse files
committed
fix(pegboard): route scoped actor key reads by runner dc
1 parent c2b3075 commit c008cb0

6 files changed

Lines changed: 167 additions & 41 deletions

File tree

engine/CLAUDE.md

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -39,6 +39,7 @@ When changing a versioned VBARE schema, follow the existing migration pattern.
3939
- UniversalDB simulated latency for benchmarks comes from `UDB_SIMULATED_LATENCY_MS`, which `Database::txn(...)` reads once via `OnceLock`, so set it before process startup.
4040
- When adding fields to epoxy workflow state structs, mark them `#[serde(default)]` so Gasoline can replay older serialized state.
4141
- Epoxy integration tests that spin up `tests/common::TestCtx` must call `shutdown()` before returning.
42+
- Before issuing an Epoxy operation with scoped `target_replicas`, validate the local replica is in scope or forward to an in-scope datacenter first.
4243

4344
## Test snapshots
4445

@@ -71,7 +72,12 @@ Use `test-snapshot-gen` to generate and load RocksDB snapshots of the full UDB K
7172

7273
## Pegboard Envoy
7374

75+
- Write new actor-hosting engine tests under `engine/packages/engine/tests/envoy/`; do not add new legacy runner tests under `engine/packages/engine/tests/runner/`.
7476
- `PegboardEnvoyWs::new(...)` is constructed per websocket request, so shared sqlite dispatch state such as the `SqliteEngine` and `CompactionCoordinator` must live behind a process-wide `OnceCell` instead of per-connection fields.
7577
- Restored hibernatable WebSockets must rebuild runtime WebSocket handlers from callbacks and call `on_open`; pre-sleep NAPI callbacks are not reusable after actor wake.
7678
- `pegboard-envoy` SQLite websocket handlers must validate page numbers, page sizes, and duplicate dirty pages at the websocket trust boundary and return `SqliteErrorResponse` for unexpected failures instead of bubbling them through the shared connection task.
7779
- SQLite start-command schema dispatch should probe actor KV prefix `0x08` at startup instead of persisting a schema version in pegboard config or actor workflow state.
80+
81+
## API routing
82+
83+
- `api-public` owns cross-datacenter forwarding for external requests; `api-peer` handlers should be local datacenter operations and must not add forwarding requirements.

engine/packages/api-peer/src/actors/get_or_create.rs

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -106,17 +106,11 @@ pub async fn get_or_create(
106106
}
107107
}
108108
}
109-
// Make request to remote datacenter
110109
pegboard::ops::actor::get_for_key::Output::Forward { dc_label } => {
111-
rivet_api_util::request_remote_datacenter(
112-
ctx.config(),
113-
dc_label,
114-
"/actors",
115-
rivet_api_util::Method::PUT,
116-
Some(&query),
117-
Some(&body),
118-
)
119-
.await
110+
Err(pegboard::errors::Actor::KeyReservedInDifferentDatacenter {
111+
datacenter_label: dc_label,
112+
}
113+
.build())
120114
}
121115
}
122116
}

engine/packages/api-public/src/actors/utils.rs

Lines changed: 28 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -139,32 +139,38 @@ pub async fn find_dc_for_actor_creation(
139139
runner_name: &str,
140140
dc_name: Option<&str>,
141141
) -> Result<u16> {
142-
let target_dc_label = if let Some(dc_name) = &dc_name {
142+
let requested_dc_label = if let Some(dc_name) = &dc_name {
143143
// Use user-configured DC
144-
ctx.config()
144+
Some(ctx.config()
145145
.dc_for_name(dc_name)
146146
.ok_or_else(|| rivet_api_util::errors::Datacenter::NotFound.build())?
147-
.datacenter_label
147+
.datacenter_label)
148148
} else {
149-
// Find the nearest DC with runners
150-
let res = ctx
151-
.op(
152-
pegboard::ops::runner::list_runner_config_enabled_dcs::Input {
153-
namespace_id,
154-
runner_name: runner_name.into(),
155-
},
156-
)
157-
.await?;
158-
if let Some(dc_label) = res.dc_labels.into_iter().next() {
159-
dc_label
160-
} else {
161-
return Err(pegboard::errors::Actor::NoRunnerConfigConfigured {
162-
namespace: namespace_name.into(),
163-
pool_name: runner_name.into(),
164-
}
165-
.build());
166-
}
149+
None
150+
};
151+
152+
let res = ctx
153+
.op(
154+
pegboard::ops::runner::list_runner_config_enabled_dcs::Input {
155+
namespace_id,
156+
runner_name: runner_name.into(),
157+
},
158+
)
159+
.await?;
160+
161+
let target_dc_label = if let Some(requested_dc_label) = requested_dc_label {
162+
res.dc_labels
163+
.into_iter()
164+
.find(|dc_label| *dc_label == requested_dc_label)
165+
} else {
166+
res.dc_labels.into_iter().next()
167167
};
168168

169-
Ok(target_dc_label)
169+
target_dc_label.ok_or_else(|| {
170+
pegboard::errors::Actor::NoRunnerConfigConfigured {
171+
namespace: namespace_name.into(),
172+
pool_name: runner_name.into(),
173+
}
174+
.build()
175+
})
170176
}

engine/packages/engine/tests/envoy/actors_lifecycle.rs

Lines changed: 100 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,106 @@ fn envoy_actor_connectable_via_guard_websocket() {
359359
});
360360
}
361361

362+
/// Sends a `getOrCreate` gateway query to the guard of the second test datacenter
/// (`ctx.get_dc(2)`) while the envoy is set up against the leader datacenter, and
/// checks that the request succeeds and the resulting actor is placed in the leader
/// datacenter.
#[test]
fn query_get_or_create_from_dc_without_runner_forwards_to_runner_dc() {
    common::run(common::TestOpts::new(2).with_timeout(45), |ctx| async move {
        // The namespace and envoy are created against the leader datacenter only.
        let (namespace, _, _envoy) =
            common::setup_test_namespace_with_envoy(ctx.leader_dc()).await;
        let entry_dc = ctx.get_dc(2);

        // Target the guard of the non-leader datacenter on purpose.
        let gateway_url = format!(
            "http://127.0.0.1:{}/gateway/test-actor/ping",
            entry_dc.guard_port()
        );
        let query_params = [
            ("rvt-namespace", namespace.as_str()),
            ("rvt-method", "getOrCreate"),
            ("rvt-runner", common::TEST_RUNNER_NAME),
            ("rvt-key", "geo-routed-key"),
        ];

        let http = reqwest::Client::new();
        let response = http
            .get(gateway_url)
            .query(&query_params)
            .send()
            .await
            .expect("failed to send query gateway request");

        assert_eq!(
            response.status(),
            reqwest::StatusCode::OK,
            "query gateway should forward to the runner dc"
        );

        let payload: serde_json::Value = response.json().await.expect("invalid ping response");
        let actor_id = payload["actorId"].as_str().expect("missing actor id");
        assert_eq!(payload["status"], "ok");

        // The actor must live in the datacenter the envoy was set up against.
        let runner_dc_label = ctx.leader_dc().config.dc_label();
        common::assert_actor_in_dc(actor_id, runner_dc_label).await;
    });
}
397+
398+
/// Calls the public `getOrCreate` actors endpoint on the leader datacenter while
/// explicitly requesting the second test datacenter (`ctx.get_dc(2)`), where the
/// envoy was not set up, and expects an HTTP 400 carrying the typed
/// `actor` / `no_runner_config_configured` error.
#[test]
fn public_get_or_create_with_unavailable_datacenter_returns_typed_error() {
    common::run(common::TestOpts::new(2).with_timeout(45), |ctx| async move {
        // The envoy is only set up against the leader datacenter.
        let (namespace, _, _envoy) =
            common::setup_test_namespace_with_envoy(ctx.leader_dc()).await;
        let requested_dc = ctx.get_dc(2);

        let query = common::api::public::GetOrCreateQuery {
            namespace: namespace.clone(),
        };
        let payload = common::api::public::GetOrCreateRequest {
            // Explicitly pin the request to the datacenter the envoy was not set up in.
            datacenter: Some(requested_dc.config.dc_name().unwrap().to_string()),
            name: "test-actor".to_string(),
            key: "public-explicit-wrong-dc-key".to_string(),
            input: None,
            runner_name_selector: common::TEST_RUNNER_NAME.to_string(),
            crash_policy: rivet_types::actors::CrashPolicy::Sleep,
        };

        let response = common::api::public::build_actors_get_or_create_request(
            ctx.leader_dc().guard_port(),
            query,
            payload,
        )
        .await
        .expect("failed to build request")
        .send()
        .await
        .expect("failed to send request");

        assert_eq!(response.status(), reqwest::StatusCode::BAD_REQUEST);

        // The error must be the typed actor error, not a generic failure.
        let error_body: serde_json::Value =
            response.json().await.expect("invalid error response");
        assert_eq!(error_body["group"], "actor");
        assert_eq!(error_body["code"], "no_runner_config_configured");
    });
}
429+
430+
/// Calls the public actor `create` endpoint on the leader datacenter while
/// explicitly requesting the second test datacenter (`ctx.get_dc(2)`), where the
/// envoy was not set up, and expects an HTTP 400 carrying the typed
/// `actor` / `no_runner_config_configured` error.
#[test]
fn public_create_with_unavailable_datacenter_returns_typed_error() {
    common::run(common::TestOpts::new(2).with_timeout(45), |ctx| async move {
        // The envoy is only set up against the leader datacenter.
        let (namespace, _, _envoy) =
            common::setup_test_namespace_with_envoy(ctx.leader_dc()).await;
        let requested_dc = ctx.get_dc(2);

        let query = common::api_types::actors::create::CreateQuery {
            namespace: namespace.clone(),
        };
        let payload = common::api_types::actors::create::CreateRequest {
            // Explicitly pin the request to the datacenter the envoy was not set up in.
            datacenter: Some(requested_dc.config.dc_name().unwrap().to_string()),
            name: "test-actor".to_string(),
            key: Some("public-create-explicit-wrong-dc-key".to_string()),
            input: None,
            runner_name_selector: common::TEST_RUNNER_NAME.to_string(),
            crash_policy: rivet_types::actors::CrashPolicy::Sleep,
        };

        let response = common::api::public::build_actors_create_request(
            ctx.leader_dc().guard_port(),
            query,
            payload,
        )
        .await
        .expect("failed to build request")
        .send()
        .await
        .expect("failed to send request");

        assert_eq!(response.status(), reqwest::StatusCode::BAD_REQUEST);

        // The error must be the typed actor error, not a generic failure.
        let error_body: serde_json::Value =
            response.json().await.expect("invalid error response");
        assert_eq!(error_body["group"], "actor");
        assert_eq!(error_body["code"], "no_runner_config_configured");
    });
}
461+
362462
#[test]
363463
fn envoy_websocket_actor_close_round_trip() {
364464
common::run(common::TestOpts::new(1).with_timeout(20), |ctx| async move {
@@ -1129,4 +1229,3 @@ fn envoy_normal_pool_does_not_apply_legacy_runner_slot_capacity() {
11291229
}
11301230
});
11311231
}
1132-

engine/packages/guard/src/routing/pegboard_gateway/resolve_actor_query.rs

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -132,14 +132,20 @@ async fn resolve_query_get_or_create(
132132
let namespace_id = resolve_namespace_id(ctx, namespace_name).await?;
133133
let serialized_key = serialize_actor_key(key)?;
134134

135+
let target_dc_label =
136+
resolve_query_target_dc_label(ctx, namespace_id, namespace_name, pool_name, region).await?;
137+
if target_dc_label != ctx.config().dc_label() {
138+
return Ok(ResolveQueryActorResult::Forward {
139+
dc_label: target_dc_label,
140+
});
141+
}
142+
135143
if let Some(res) =
136144
get_actor_for_key(ctx, namespace_id, name, &serialized_key, Some(pool_name)).await?
137145
{
138146
return Ok(res);
139147
}
140148

141-
let target_dc_label =
142-
resolve_query_target_dc_label(ctx, namespace_id, namespace_name, pool_name, region).await?;
143149
let encoded_input = input.map(|input| STANDARD.encode(input));
144150

145151
if target_dc_label == ctx.config().dc_label() {
@@ -185,13 +191,15 @@ async fn resolve_query_target_dc_label(
185191
runner_name_selector: &str,
186192
region: Option<&str>,
187193
) -> Result<u16> {
188-
if let Some(region) = region {
189-
return Ok(ctx
194+
let requested_dc_label = if let Some(region) = region {
195+
Some(ctx
190196
.config()
191197
.dc_for_name(region)
192198
.ok_or_else(|| rivet_api_util::errors::Datacenter::NotFound.build())?
193-
.datacenter_label);
194-
}
199+
.datacenter_label)
200+
} else {
201+
None
202+
};
195203

196204
let res = ctx
197205
.op(
@@ -202,8 +210,15 @@ async fn resolve_query_target_dc_label(
202210
)
203211
.await?;
204212

205-
// Return nearest enabled dc
206-
if let Some(dc_label) = res.dc_labels.into_iter().next() {
213+
let target_dc_label = if let Some(requested_dc_label) = requested_dc_label {
214+
res.dc_labels
215+
.into_iter()
216+
.find(|dc_label| *dc_label == requested_dc_label)
217+
} else {
218+
res.dc_labels.into_iter().next()
219+
};
220+
221+
if let Some(dc_label) = target_dc_label {
207222
Ok(dc_label)
208223
} else {
209224
Err(pegboard::errors::Actor::NoRunnerConfigConfigured {

engine/packages/pegboard/src/ops/actor/get_reservation_for_key.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use anyhow::ensure;
12
use gas::prelude::*;
23
use universaldb::utils::FormalKey;
34

@@ -33,6 +34,11 @@ pub async fn pegboard_actor_get_reservation_for_key(
3334
)
3435
.await?;
3536

37+
ensure!(
38+
res.replicas.contains(&ctx.config().epoxy_replica_id()),
39+
"get_reservation_for_key called outside the scoped runner replica set"
40+
);
41+
3642
Some(res.replicas)
3743
} else {
3844
None

0 commit comments

Comments
 (0)