Skip to content

Commit b69ce64

Browse files
committed
fix(serverless): flatten metadata error to message/details/metadata
1 parent fc09194 commit b69ce64

30 files changed

Lines changed: 956 additions & 15 deletions

File tree

CLAUDE.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ docker-compose up -d
9090
- Prefer the Tokio-shaped APIs from `antiox` (`antiox/sync/mpsc`, `antiox/task`, etc.) over ad hoc Promise queues, custom channel wrappers, or event-emitter coordination.
9191
- The high-level `rivetkit` crate stays a thin typed wrapper over `rivetkit-core` and re-exports shared transport/config types instead of redefining them.
9292
- When `rivetkit` needs ergonomic helpers on a `rivetkit-core` type it re-exports, prefer an extension trait plus `prelude` re-export instead of wrapping and replacing the core type.
93+
- `engine/sdks/*/api-*` are auto-generated SDK outputs; update the source API schema and regenerate them instead of editing them by hand.
9394

9495
### SQLite Package
9596

engine/artifacts/openapi.json

Lines changed: 7 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/packages/api-public/src/runner_configs/utils.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ use gas::prelude::*;
55

66
use crate::ctx::ApiCtx;
77

8-
// Re-export types from pegboard for API schema
98
pub use pegboard::ops::serverless_metadata::fetch::ServerlessMetadataError;
109

1110
/// Serverless metadata returned from a runner.
Lines changed: 345 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,345 @@
1+
#[path = "common/api/mod.rs"]
2+
mod api;
3+
#[path = "common/ctx.rs"]
4+
mod ctx;
5+
6+
use axum::{
7+
Json, Router,
8+
body::Bytes,
9+
extract::State,
10+
http::StatusCode,
11+
response::{
12+
IntoResponse, Response, Sse,
13+
sse::{Event, KeepAlive},
14+
},
15+
routing::{get, post},
16+
};
17+
use futures_util::stream;
18+
use serde_json::json;
19+
use std::collections::HashMap;
20+
use std::convert::Infallible;
21+
use std::future::Future;
22+
use std::sync::{
23+
Arc,
24+
atomic::{AtomicBool, Ordering},
25+
};
26+
use std::time::Duration;
27+
use tokio::sync::mpsc;
28+
29+
struct MockServerlessState {
30+
expose_protocol_version: AtomicBool,
31+
start_tx: mpsc::UnboundedSender<()>,
32+
}
33+
34+
/// State for a mock `/metadata` endpoint that replies with a fixed, caller-supplied body.
struct MockMetadataErrorState {
    /// Raw response body returned verbatim by `metadata_error_handler`.
    body: String,
}
37+
38+
async fn metadata_handler(
39+
State(state): State<Arc<MockServerlessState>>,
40+
) -> Json<serde_json::Value> {
41+
let mut response = json!({
42+
"runtime": "rivetkit",
43+
"version": "1",
44+
});
45+
46+
if state.expose_protocol_version.load(Ordering::SeqCst) {
47+
response["envoyProtocolVersion"] = json!(rivet_envoy_protocol::PROTOCOL_VERSION);
48+
}
49+
50+
Json(response)
51+
}
52+
53+
async fn metadata_error_handler(State(state): State<Arc<MockMetadataErrorState>>) -> Response {
54+
(
55+
StatusCode::OK,
56+
[(axum::http::header::CONTENT_TYPE, "application/json")],
57+
state.body.clone(),
58+
)
59+
.into_response()
60+
}
61+
62+
async fn start_handler(
63+
State(state): State<Arc<MockServerlessState>>,
64+
_body: Bytes,
65+
) -> impl IntoResponse {
66+
let _ = state.start_tx.send(());
67+
let events =
68+
stream::once(async { Ok::<Event, Infallible>(Event::default().event("ping").data("")) });
69+
70+
Sse::new(events)
71+
.keep_alive(KeepAlive::default())
72+
.into_response()
73+
}
74+
75+
fn run<F, Fut>(opts: ctx::TestOpts, test_fn: F)
76+
where
77+
F: FnOnce(ctx::TestCtx) -> Fut,
78+
Fut: Future<Output = ()>,
79+
{
80+
let runtime = tokio::runtime::Runtime::new().expect("failed to build runtime");
81+
runtime.block_on(async {
82+
let timeout = Duration::from_secs(opts.timeout_secs);
83+
let ctx = ctx::TestCtx::new_with_opts(opts)
84+
.await
85+
.expect("failed to build test ctx");
86+
tokio::time::timeout(timeout, test_fn(ctx))
87+
.await
88+
.expect("test timed out");
89+
});
90+
}
91+
92+
async fn setup_test_namespace(leader_dc: &ctx::TestDatacenter) -> (String, rivet_util::Id) {
93+
let random_suffix = rand::random::<u16>();
94+
let namespace_name = format!("test-{random_suffix}");
95+
let response = api::public::namespaces_create(
96+
leader_dc.guard_port(),
97+
rivet_api_peer::namespaces::CreateRequest {
98+
name: namespace_name,
99+
display_name: "Test Namespace".to_string(),
100+
},
101+
)
102+
.await
103+
.expect("failed to set up test namespace");
104+
105+
(response.namespace.name, response.namespace.namespace_id)
106+
}
107+
108+
/// Checks that refreshing runner metadata makes the new protocol version visible
/// through the cached runner-config read right away, and that creating an actor
/// afterwards hits the mock `/start` endpoint.
///
/// Flow:
/// 1. Start a mock serverless endpoint whose `/metadata` initially omits
///    `envoyProtocolVersion`, and upsert a serverless runner config pointing at it.
/// 2. Confirm the cached runner config has no protocol version.
/// 3. Flip the mock to expose the protocol version and call refresh-metadata.
/// 4. Confirm a cached read (`bypass_cache: false`) returns the new version within 100ms.
/// 5. Create an actor and expect the mock `/start` handler to fire within 2s.
#[test]
fn refresh_metadata_invalidates_protocol_cache_before_v2_dispatch() {
    run(
        ctx::TestOpts::new(1)
            .with_timeout(30)
            .with_pegboard_outbound(),
        |ctx| async move {
            let (namespace, namespace_id) = setup_test_namespace(ctx.leader_dc()).await;
            let runner_name = "metadata-refresh-v2-dispatch";

            let (start_tx, mut start_rx) = mpsc::unbounded_channel();
            let mock_state = Arc::new(MockServerlessState {
                expose_protocol_version: AtomicBool::new(false),
                start_tx,
            });
            let app = Router::new()
                .route("/metadata", get(metadata_handler))
                .route("/start", post(start_handler))
                .with_state(mock_state.clone());

            // Bind to port 0 so the OS assigns a free port atomically. Picking a port
            // first and binding afterwards leaves a window where another process can
            // take it.
            let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
                .await
                .expect("failed to bind mock serverless endpoint");
            let mock_port = listener
                .local_addr()
                .expect("failed to read mock serverless endpoint address")
                .port();
            let server_handle = tokio::spawn(async move {
                axum::serve(listener, app).await.expect("server error");
            });

            let mut datacenters = HashMap::new();
            datacenters.insert(
                "dc-1".to_string(),
                rivet_api_types::namespaces::runner_configs::RunnerConfig {
                    kind:
                        rivet_api_types::namespaces::runner_configs::RunnerConfigKind::Serverless {
                            url: format!("http://127.0.0.1:{mock_port}"),
                            headers: None,
                            request_lifespan: 30,
                            max_concurrent_actors: Some(10),
                            drain_grace_period: None,
                            slots_per_runner: 1,
                            min_runners: Some(0),
                            max_runners: 0,
                            runners_margin: Some(0),
                            metadata_poll_interval: None,
                        },
                    metadata: None,
                    drain_on_version_upgrade: true,
                },
            );

            api::public::runner_configs_upsert(
                ctx.leader_dc().guard_port(),
                rivet_api_peer::runner_configs::UpsertPath {
                    runner_name: runner_name.to_string(),
                },
                rivet_api_peer::runner_configs::UpsertQuery {
                    namespace: namespace.clone(),
                },
                rivet_api_public::runner_configs::upsert::UpsertRequest { datacenters },
            )
            .await
            .expect("failed to upsert serverless runner config");

            // Baseline: the mock has not advertised a protocol version yet.
            let cached_before_refresh = ctx
                .leader_dc()
                .workflow_ctx
                .op(pegboard::ops::runner_config::get::Input {
                    runners: vec![(namespace_id, runner_name.to_string())],
                    bypass_cache: false,
                })
                .await
                .expect("failed to read cached runner config");
            assert_eq!(cached_before_refresh[0].protocol_version, None);

            mock_state
                .expose_protocol_version
                .store(true, Ordering::SeqCst);

            api::public::runner_configs_refresh_metadata(
                ctx.leader_dc().guard_port(),
                runner_name.to_string(),
                api::public::RefreshMetadataQuery {
                    namespace: namespace.clone(),
                },
                api::public::RefreshMetadataRequest {},
            )
            .await
            .expect("failed to refresh metadata");

            // The read still goes through the cache; the short timeout guards against
            // the value only appearing after a cache TTL expires.
            tokio::time::timeout(Duration::from_millis(100), async {
                let cached_after_refresh = ctx
                    .leader_dc()
                    .workflow_ctx
                    .op(pegboard::ops::runner_config::get::Input {
                        runners: vec![(namespace_id, runner_name.to_string())],
                        bypass_cache: false,
                    })
                    .await
                    .expect("failed to read refreshed runner config");
                assert_eq!(
                    cached_after_refresh[0].protocol_version,
                    Some(rivet_envoy_protocol::PROTOCOL_VERSION)
                );
            })
            .await
            .expect("refreshed protocol version should bypass the old 5s cache TTL");

            api::public::actors_create(
                ctx.leader_dc().guard_port(),
                rivet_api_types::actors::create::CreateQuery {
                    namespace: namespace.clone(),
                },
                rivet_api_types::actors::create::CreateRequest {
                    datacenter: None,
                    name: "test-actor".to_string(),
                    key: Some(format!("key-{}", rand::random::<u64>())),
                    input: None,
                    runner_name_selector: runner_name.to_string(),
                    crash_policy: rivet_types::actors::CrashPolicy::Sleep,
                },
            )
            .await
            .expect("failed to create actor after metadata refresh");

            // `start_handler` sends on `start_tx` when the mock `/start` route is hit.
            tokio::time::timeout(Duration::from_secs(2), start_rx.recv())
                .await
                .expect("v2 serverless dispatch should start immediately after refresh")
                .expect("mock serverless start channel closed");

            server_handle.abort();
        },
    );
}
241+
242+
/// Checks that refresh-metadata fails with a structured error when the serverless
/// endpoint advertises an envoy protocol version one above `PROTOCOL_VERSION`.
///
/// The error body is expected to carry group `serverless_runner_pool`, code
/// `failed_to_fetch_metadata`, and under `metadata.reason` both a message mentioning
/// "envoy protocol" and a `metadata` object with the offending and the maximum
/// supported versions.
#[test]
fn refresh_metadata_surfaces_invalid_envoy_protocol_version_in_error_metadata() {
    run(ctx::TestOpts::new(1), |ctx| async move {
        let (namespace, _) = setup_test_namespace(ctx.leader_dc()).await;
        let runner_name = "metadata-refresh-invalid-envoy-protocol";
        let invalid_version = rivet_envoy_protocol::PROTOCOL_VERSION + 1;

        let app = Router::new()
            .route("/metadata", get(metadata_error_handler))
            .with_state(Arc::new(MockMetadataErrorState {
                body: format!(
                    r#"{{"runtime":"rivetkit","version":"1","envoyProtocolVersion":{invalid_version}}}"#
                ),
            }));

        // Bind to port 0 so the OS assigns a free port atomically. Picking a port
        // first and binding afterwards leaves a window where another process can
        // take it.
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
            .await
            .expect("failed to bind mock serverless endpoint");
        let mock_port = listener
            .local_addr()
            .expect("failed to read mock serverless endpoint address")
            .port();
        let server_handle = tokio::spawn(async move {
            axum::serve(listener, app).await.expect("server error");
        });

        let mut datacenters = HashMap::new();
        datacenters.insert(
            "dc-1".to_string(),
            rivet_api_types::namespaces::runner_configs::RunnerConfig {
                kind: rivet_api_types::namespaces::runner_configs::RunnerConfigKind::Serverless {
                    url: format!("http://127.0.0.1:{mock_port}"),
                    headers: None,
                    request_lifespan: 30,
                    max_concurrent_actors: Some(10),
                    drain_grace_period: None,
                    slots_per_runner: 1,
                    min_runners: Some(0),
                    max_runners: 0,
                    runners_margin: Some(0),
                    metadata_poll_interval: None,
                },
                metadata: None,
                drain_on_version_upgrade: true,
            },
        );

        api::public::runner_configs_upsert(
            ctx.leader_dc().guard_port(),
            rivet_api_peer::runner_configs::UpsertPath {
                runner_name: runner_name.to_string(),
            },
            rivet_api_peer::runner_configs::UpsertQuery {
                namespace: namespace.clone(),
            },
            rivet_api_public::runner_configs::upsert::UpsertRequest { datacenters },
        )
        .await
        .expect("failed to upsert serverless runner config");

        // Send the raw request so a non-success response can be inspected instead of
        // being turned into an error by a typed helper.
        let response = api::public::build_runner_configs_refresh_metadata_request(
            ctx.leader_dc().guard_port(),
            runner_name.to_string(),
            api::public::RefreshMetadataQuery {
                namespace: namespace.clone(),
            },
            api::public::RefreshMetadataRequest {},
        )
        .await
        .expect("failed to build refresh request")
        .send()
        .await
        .expect("failed to send refresh request");

        // The mock is no longer needed once the refresh response has arrived.
        server_handle.abort();

        assert!(
            !response.status().is_success(),
            "refresh metadata should fail for an unsupported envoy protocol version"
        );

        let body: serde_json::Value = response
            .json()
            .await
            .expect("failed to decode refresh-metadata error body");

        assert_eq!(body["group"], "serverless_runner_pool");
        assert_eq!(body["code"], "failed_to_fetch_metadata");
        assert_eq!(
            body["metadata"]["reason"]["metadata"]["envoy_protocol_version"],
            invalid_version
        );
        assert_eq!(
            body["metadata"]["reason"]["metadata"]["max_supported_envoy_protocol_version"],
            rivet_envoy_protocol::PROTOCOL_VERSION
        );
        assert!(
            body["metadata"]["reason"]["message"]
                .as_str()
                .unwrap_or("")
                .to_ascii_lowercase()
                .contains("envoy protocol"),
            "message should mention envoy protocol version, got {:?}",
            body["metadata"]["reason"]["message"]
        );
    });
}

0 commit comments

Comments
 (0)