Skip to content

Commit 25dd07c

Browse files
committed
fix(rivetkit): route raw request fetches to actors
1 parent 3e2c4f5 commit 25dd07c

4 files changed

Lines changed: 327 additions & 251 deletions

File tree

engine/packages/pegboard-envoy/src/sqlite_runtime.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -676,9 +676,9 @@ mod tests {
676676
use universaldb::driver::RocksDbDatabaseDriver;
677677

678678
use super::{
679-
FILE_TAG_JOURNAL, FILE_TAG_MAIN, FILE_TAG_SHM, FILE_TAG_WAL,
680-
SQLITE_V1_CHUNK_SIZE, SQLITE_V1_MAX_MIGRATION_BYTES, SQLITE_V1_MIGRATION_LEASE_MS,
681-
maybe_migrate_v1_to_v2, read_v1_file, sqlite_subspace, v1_chunk_key, v1_meta_key,
679+
FILE_TAG_JOURNAL, FILE_TAG_MAIN, FILE_TAG_SHM, FILE_TAG_WAL, SQLITE_V1_CHUNK_SIZE,
680+
SQLITE_V1_MAX_MIGRATION_BYTES, SQLITE_V1_MIGRATION_LEASE_MS, maybe_migrate_v1_to_v2,
681+
read_v1_file, sqlite_subspace, v1_chunk_key, v1_meta_key,
682682
};
683683

684684
fn recipient(actor_id: Id) -> Recipient {
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
22
"code": "incoming_too_long",
33
"group": "message",
4-
"message": "Incoming message too long."
4+
"message": "Incoming message too long"
55
}

rivetkit-rust/packages/rivetkit-core/src/registry/http.rs

Lines changed: 68 additions & 247 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,13 @@ impl RegistryDispatcher {
1515
return self.handle_metrics_fetch(&instance, &request);
1616
}
1717

18+
let original_path = request.path.clone();
1819
let request = build_http_request(request).await?;
19-
let framework_route = framework_http_route(request.uri().path())?;
20+
let route = RegistryHttpRoute::from_paths(&original_path, request.uri().path())?;
2021
let instance = match self.active_actor(actor_id).await {
2122
Ok(instance) => instance,
2223
Err(error) => {
23-
if framework_route.is_some() {
24+
if matches!(route, RegistryHttpRoute::Framework(_)) {
2425
return message_boundary_error_response(
2526
request_encoding(request.headers()),
2627
framework_anyhow_status(&error),
@@ -36,20 +37,24 @@ impl RegistryDispatcher {
3637

3738
instance.ctx.cancel_sleep_timer();
3839

39-
let rearm_sleep_after_request = |ctx: ActorContext| {
40-
let sleep_ctx = ctx.clone();
41-
ctx.wait_until(async move {
42-
sleep_ctx.wait_for_http_requests_idle().await;
43-
sleep_ctx.reset_sleep_timer();
44-
});
40+
let response = match route {
41+
RegistryHttpRoute::Framework(route) => {
42+
let response = self.handle_framework_fetch(&instance, request, route).await;
43+
rearm_sleep_after_request(instance.ctx.clone());
44+
response
45+
}
46+
RegistryHttpRoute::UserRawRequest => {
47+
self.handle_user_request_fetch(&instance, request).await
48+
}
4549
};
50+
response
51+
}
4652

47-
if let Some(route) = framework_route {
48-
let response = self.handle_framework_fetch(&instance, request, route).await;
49-
rearm_sleep_after_request(instance.ctx.clone());
50-
return response;
51-
}
52-
53+
async fn handle_user_request_fetch(
54+
&self,
55+
instance: &ActorTaskHandle,
56+
request: Request,
57+
) -> Result<HttpResponse> {
5358
let (reply_tx, reply_rx) = oneshot::channel();
5459
try_send_dispatch_command(
5560
&instance.dispatch,
@@ -72,7 +77,11 @@ impl RegistryDispatcher {
7277
build_envoy_response(response)
7378
}
7479
Err(error) => {
75-
tracing::error!(actor_id, ?error, "actor request callback failed");
80+
tracing::error!(
81+
actor_id = instance.actor_id,
82+
?error,
83+
"actor request callback failed"
84+
);
7685
rearm_sleep_after_request(instance.ctx.clone());
7786
Ok(inspector_anyhow_response(error))
7887
}
@@ -373,6 +382,40 @@ impl RegistryDispatcher {
373382
}
374383
}
375384

385+
enum RegistryHttpRoute {
386+
Framework(FrameworkHttpRoute),
387+
UserRawRequest,
388+
}
389+
390+
impl RegistryHttpRoute {
391+
fn from_paths(original_path: &str, normalized_path: &str) -> Result<Self> {
392+
if let Some(stripped) = original_path.strip_prefix("/request") {
393+
if stripped.is_empty() || matches!(stripped.as_bytes().first(), Some(b'/') | Some(b'?'))
394+
{
395+
return Ok(Self::UserRawRequest);
396+
}
397+
}
398+
399+
if let Some(segment) = single_path_segment(normalized_path, "/action/") {
400+
return Ok(Self::Framework(FrameworkHttpRoute::Action(
401+
percent_decode_path_segment(segment)?,
402+
)));
403+
}
404+
if let Some(segment) = single_path_segment(normalized_path, "/queue/") {
405+
return Ok(Self::Framework(FrameworkHttpRoute::Queue(
406+
percent_decode_path_segment(segment)?,
407+
)));
408+
}
409+
410+
match normalized_path {
411+
"/metadata" => Ok(Self::Framework(FrameworkHttpRoute::Metadata)),
412+
"/health" => Ok(Self::Framework(FrameworkHttpRoute::Health)),
413+
"/" => Ok(Self::Framework(FrameworkHttpRoute::Root)),
414+
_ => Ok(Self::UserRawRequest),
415+
}
416+
}
417+
}
418+
376419
pub(super) enum FrameworkHttpRoute {
377420
Action(String),
378421
Queue(String),
@@ -387,25 +430,6 @@ pub(super) struct DecodedHttpQueueRequest {
387430
timeout: Option<u64>,
388431
}
389432

390-
pub(super) fn framework_http_route(path: &str) -> Result<Option<FrameworkHttpRoute>> {
391-
if let Some(segment) = single_path_segment(path, "/action/") {
392-
return Ok(Some(FrameworkHttpRoute::Action(
393-
percent_decode_path_segment(segment)?,
394-
)));
395-
}
396-
if let Some(segment) = single_path_segment(path, "/queue/") {
397-
return Ok(Some(FrameworkHttpRoute::Queue(
398-
percent_decode_path_segment(segment)?,
399-
)));
400-
}
401-
match path {
402-
"/metadata" => Ok(Some(FrameworkHttpRoute::Metadata)),
403-
"/health" => Ok(Some(FrameworkHttpRoute::Health)),
404-
"/" => Ok(Some(FrameworkHttpRoute::Root)),
405-
_ => Ok(None),
406-
}
407-
}
408-
409433
fn handle_metadata_fetch(request: &Request) -> Result<HttpResponse> {
410434
if request.method() != http::Method::GET {
411435
return method_not_allowed_response(request);
@@ -469,6 +493,14 @@ fn method_not_allowed_response(request: &Request) -> Result<HttpResponse> {
469493
)
470494
}
471495

496+
fn rearm_sleep_after_request(ctx: ActorContext) {
497+
let sleep_ctx = ctx.clone();
498+
ctx.wait_until(async move {
499+
sleep_ctx.wait_for_http_requests_idle().await;
500+
sleep_ctx.reset_sleep_timer();
501+
});
502+
}
503+
472504
pub(super) fn single_path_segment<'a>(path: &'a str, prefix: &str) -> Option<&'a str> {
473505
let segment = path.strip_prefix(prefix)?;
474506
(!segment.is_empty() && !segment.contains('/')).then_some(segment)
@@ -899,216 +931,5 @@ fn bearer_token_from_authorization(value: &str) -> Option<&str> {
899931
}
900932

901933
#[cfg(test)]
902-
mod tests {
903-
use std::collections::HashMap;
904-
use std::time::Duration;
905-
906-
use super::{
907-
HttpRequest, HttpResponseEncoding, authorization_bearer_token,
908-
authorization_bearer_token_map, framework_action_error_response,
909-
message_boundary_error_response, request_encoding, request_has_bearer_token,
910-
workflow_dispatch_result,
911-
};
912-
use crate::actor::action::ActionDispatchError;
913-
use crate::error::ActorLifecycle as ActorLifecycleError;
914-
use http::StatusCode;
915-
use rivet_error::RivetError;
916-
use serde_json::json;
917-
use vbare::OwnedVersionedData;
918-
919-
#[derive(RivetError)]
920-
#[error("message", "incoming_too_long", "Incoming message too long")]
921-
struct IncomingMessageTooLong;
922-
923-
#[derive(RivetError)]
924-
#[error("message", "outgoing_too_long", "Outgoing message too long")]
925-
struct OutgoingMessageTooLong;
926-
927-
#[test]
928-
fn workflow_dispatch_result_marks_handled_workflow_as_enabled() {
929-
assert_eq!(
930-
workflow_dispatch_result(Ok(Some(vec![1, 2, 3])))
931-
.expect("workflow dispatch should succeed"),
932-
(true, Some(vec![1, 2, 3])),
933-
);
934-
assert_eq!(
935-
workflow_dispatch_result(Ok(None)).expect("workflow dispatch should succeed"),
936-
(true, None),
937-
);
938-
}
939-
940-
#[test]
941-
fn workflow_dispatch_result_treats_dropped_reply_as_disabled() {
942-
assert_eq!(
943-
workflow_dispatch_result(Err(ActorLifecycleError::DroppedReply.build()))
944-
.expect("dropped reply should map to workflow disabled"),
945-
(false, None),
946-
);
947-
}
948-
949-
#[test]
950-
fn workflow_dispatch_result_preserves_non_dropped_reply_errors() {
951-
let error = workflow_dispatch_result(Err(ActorLifecycleError::Destroying.build()))
952-
.expect_err("non-dropped reply errors should be preserved");
953-
let error = rivet_error::RivetError::extract(&error);
954-
assert_eq!(error.group(), "actor");
955-
assert_eq!(error.code(), "destroying");
956-
}
957-
958-
#[test]
959-
fn inspector_error_status_maps_action_timeout_to_408() {
960-
assert_eq!(
961-
super::inspector_error_status("actor", "action_timed_out"),
962-
StatusCode::REQUEST_TIMEOUT,
963-
);
964-
}
965-
966-
#[test]
967-
fn authorization_bearer_token_accepts_case_insensitive_scheme_and_whitespace() {
968-
let mut headers = http::HeaderMap::new();
969-
headers.insert(
970-
http::header::AUTHORIZATION,
971-
"bearer test-token".parse().unwrap(),
972-
);
973-
974-
assert_eq!(authorization_bearer_token(&headers), Some("test-token"));
975-
976-
let map = HashMap::from([(
977-
http::header::AUTHORIZATION.as_str().to_owned(),
978-
"BEARER\ttest-token".to_owned(),
979-
)]);
980-
assert_eq!(authorization_bearer_token_map(&map), Some("test-token"));
981-
}
982-
983-
#[test]
984-
fn request_has_bearer_token_uses_same_authorization_parser() {
985-
let request = HttpRequest {
986-
method: "GET".to_owned(),
987-
path: "/metrics".to_owned(),
988-
headers: HashMap::from([(
989-
http::header::AUTHORIZATION.as_str().to_owned(),
990-
"Bearer configured".to_owned(),
991-
)]),
992-
body: Some(Vec::new()),
993-
body_stream: None,
994-
};
995-
996-
assert!(request_has_bearer_token(&request, Some("configured")));
997-
assert!(!request_has_bearer_token(&request, Some("other")));
998-
}
999-
1000-
#[tokio::test]
1001-
async fn action_dispatch_timeout_returns_structured_error() {
1002-
let error = super::with_action_dispatch_timeout(Duration::from_millis(1), async {
1003-
tokio::time::sleep(Duration::from_secs(60)).await;
1004-
Ok::<Vec<u8>, ActionDispatchError>(Vec::new())
1005-
})
1006-
.await
1007-
.expect_err("timeout should return an action dispatch error");
1008-
1009-
assert_eq!(error.group, "actor");
1010-
assert_eq!(error.code, "action_timed_out");
1011-
assert_eq!(error.message, "Action timed out");
1012-
}
1013-
1014-
#[tokio::test]
1015-
async fn framework_action_timeout_returns_structured_error() {
1016-
let error = super::with_framework_action_timeout(Duration::from_millis(1), async {
1017-
tokio::time::sleep(Duration::from_secs(60)).await;
1018-
Ok::<(), anyhow::Error>(())
1019-
})
1020-
.await
1021-
.expect_err("timeout should return a framework error");
1022-
let error = RivetError::extract(&error);
1023-
1024-
assert_eq!(error.group(), "actor");
1025-
assert_eq!(error.code(), "action_timed_out");
1026-
assert_eq!(error.message(), "Action timed out");
1027-
}
1028-
1029-
#[test]
1030-
fn framework_action_error_response_maps_timeout_to_408() {
1031-
let response = framework_action_error_response(
1032-
HttpResponseEncoding::Json,
1033-
ActionDispatchError {
1034-
group: "actor".to_owned(),
1035-
code: "action_timed_out".to_owned(),
1036-
message: "Action timed out".to_owned(),
1037-
metadata: None,
1038-
},
1039-
)
1040-
.expect("timeout error response should serialize");
1041-
1042-
assert_eq!(response.status, StatusCode::REQUEST_TIMEOUT.as_u16());
1043-
assert_eq!(
1044-
response.body,
1045-
Some(
1046-
serde_json::to_vec(&json!({
1047-
"group": "actor",
1048-
"code": "action_timed_out",
1049-
"message": "Action timed out",
1050-
}))
1051-
.expect("json body should encode")
1052-
)
1053-
);
1054-
}
1055-
1056-
#[test]
1057-
fn message_boundary_error_response_defaults_to_json() {
1058-
let response = message_boundary_error_response(
1059-
HttpResponseEncoding::Json,
1060-
StatusCode::BAD_REQUEST,
1061-
IncomingMessageTooLong.build(),
1062-
)
1063-
.expect("json response should serialize");
1064-
1065-
assert_eq!(response.status, StatusCode::BAD_REQUEST.as_u16());
1066-
assert_eq!(
1067-
response.headers.get(http::header::CONTENT_TYPE.as_str()),
1068-
Some(&"application/json".to_owned())
1069-
);
1070-
assert_eq!(
1071-
response.body,
1072-
Some(
1073-
serde_json::to_vec(&json!({
1074-
"group": "message",
1075-
"code": "incoming_too_long",
1076-
"message": "Incoming message too long",
1077-
}))
1078-
.expect("json body should encode")
1079-
)
1080-
);
1081-
}
1082-
1083-
#[test]
1084-
fn request_encoding_reads_cbor_header() {
1085-
let mut headers = http::HeaderMap::new();
1086-
headers.insert("x-rivet-encoding", "cbor".parse().unwrap());
1087-
1088-
assert_eq!(request_encoding(&headers), HttpResponseEncoding::Cbor);
1089-
}
1090-
1091-
#[test]
1092-
fn message_boundary_error_response_serializes_bare_v3() {
1093-
let response = message_boundary_error_response(
1094-
HttpResponseEncoding::Bare,
1095-
StatusCode::BAD_REQUEST,
1096-
OutgoingMessageTooLong.build(),
1097-
)
1098-
.expect("bare response should serialize");
1099-
1100-
assert_eq!(
1101-
response.headers.get(http::header::CONTENT_TYPE.as_str()),
1102-
Some(&"application/octet-stream".to_owned())
1103-
);
1104-
1105-
let body = response.body.expect("bare response should include body");
1106-
let decoded =
1107-
<rivetkit_client_protocol::versioned::HttpResponseError as OwnedVersionedData>::deserialize_with_embedded_version(&body)
1108-
.expect("bare error should decode");
1109-
assert_eq!(decoded.group, "message");
1110-
assert_eq!(decoded.code, "outgoing_too_long");
1111-
assert_eq!(decoded.message, "Outgoing message too long");
1112-
assert_eq!(decoded.metadata, None);
1113-
}
1114-
}
934+
#[path = "../../tests/modules/registry_http.rs"]
935+
mod tests;

0 commit comments

Comments
 (0)