Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,14 @@ When the user asks to track something in a note, store it in `.agent/notes/` by
- Any behavior, protocol handling, or test coverage added to one runner should be mirrored in the other runner in the same change whenever possible.
- When parity cannot be completed in the same change, explicitly document the gap and add a follow-up task.

### Trust Boundaries
- Treat `client <-> engine` as untrusted.
- Treat `envoy <-> pegboard-envoy` as untrusted.
- Treat traffic inside the engine over `nats`, `fdb`, and other internal backends as trusted.
- Treat `gateway`, `api`, `pegboard-envoy`, `nats`, `fdb`, and similar engine-internal services as one trusted internal boundary once traffic is inside the engine.
- Validate and authorize all client-originated data at the engine edge before it reaches trusted internal systems.
- Validate and authorize all envoy-originated data at `pegboard-envoy` before it reaches trusted internal systems.

### Important Patterns

**Error Handling**
Expand Down
126 changes: 2 additions & 124 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 7 additions & 12 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ members = [
"engine/packages/pegboard-runner",
"engine/packages/pools",
"engine/packages/postgres-util",
"engine/packages/runner-protocol",
"engine/packages/runtime",
"engine/packages/service-manager",
"engine/packages/telemetry",
Expand All @@ -53,11 +54,8 @@ members = [
"engine/sdks/rust/envoy-client",
"engine/sdks/rust/envoy-protocol",
"engine/sdks/rust/epoxy-protocol",
"engine/packages/runner-protocol",
"engine/sdks/rust/test-envoy",
"engine/sdks/rust/ups-protocol",
"rivetkit-typescript/packages/sqlite-native",
"rivetkit-typescript/packages/rivetkit-native"
"engine/sdks/rust/ups-protocol"
]

[workspace.package]
Expand Down Expand Up @@ -448,6 +446,9 @@ members = [
[workspace.dependencies.rivet-postgres-util]
path = "engine/packages/postgres-util"

[workspace.dependencies.rivet-runner-protocol]
path = "engine/packages/runner-protocol"

[workspace.dependencies.rivet-runtime]
path = "engine/packages/runtime"

Expand Down Expand Up @@ -503,17 +504,11 @@ members = [
[workspace.dependencies.rivet-envoy-client]
path = "engine/sdks/rust/envoy-client"

[workspace.dependencies.epoxy-protocol]
path = "engine/sdks/rust/epoxy-protocol"

[workspace.dependencies.rivet-envoy-protocol]
path = "engine/sdks/rust/envoy-protocol"

[workspace.dependencies.rivetkit-sqlite-native]
path = "rivetkit-typescript/packages/sqlite-native"

[workspace.dependencies.rivet-runner-protocol]
path = "engine/packages/runner-protocol"
[workspace.dependencies.epoxy-protocol]
path = "engine/sdks/rust/epoxy-protocol"

[workspace.dependencies.rivet-test-envoy]
path = "engine/sdks/rust/test-envoy"
Expand Down
1 change: 1 addition & 0 deletions engine/packages/pegboard-envoy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ rivet-metrics.workspace = true
rivet-envoy-protocol.workspace = true
rivet-runtime.workspace = true
rivet-types.workspace = true
scc.workspace = true
serde_bare.workspace = true
serde_json.workspace = true
serde.workspace = true
Expand Down
4 changes: 3 additions & 1 deletion engine/packages/pegboard-envoy/src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use rivet_data::converted::{ActorNameKeyData, MetadataKeyData};
use rivet_envoy_protocol::{self as protocol, versioned};
use rivet_guard_core::WebSocketHandle;
use rivet_types::runner_configs::RunnerConfigKind;
use scc::HashMap;
use universaldb::prelude::*;
use vbare::OwnedVersionedData;

Expand All @@ -26,6 +27,7 @@ pub struct Conn {
pub envoy_key: String,
pub protocol_version: u16,
pub ws_handle: WebSocketHandle,
pub authorized_tunnel_routes: HashMap<(protocol::GatewayId, protocol::RequestId), ()>,
pub is_serverless: bool,
pub last_rtt: AtomicU32,
/// Timestamp (epoch ms) of the last pong received from the envoy.
Expand Down Expand Up @@ -101,6 +103,7 @@ pub async fn init_conn(
envoy_key,
protocol_version,
ws_handle,
authorized_tunnel_routes: HashMap::new(),
is_serverless: false,
last_rtt: AtomicU32::new(0),
last_ping_ts: AtomicI64::new(util::timestamp::now()),
Expand All @@ -114,7 +117,6 @@ pub async fn init_conn(

Ok(Arc::new(conn))
}

#[tracing::instrument(skip_all)]
pub async fn handle_init(
ctx: &StandaloneCtx,
Expand Down
4 changes: 4 additions & 0 deletions engine/packages/pegboard-envoy/src/tunnel_to_ws_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ async fn handle_message(
}
protocol::ToEnvoyConn::ToEnvoyAckEvents(x) => protocol::ToEnvoy::ToEnvoyAckEvents(x),
protocol::ToEnvoyConn::ToEnvoyTunnelMessage(x) => {
let _ = conn
.authorized_tunnel_routes
.insert_async((x.message_id.gateway_id, x.message_id.request_id), ())
.await;
protocol::ToEnvoy::ToEnvoyTunnelMessage(x)
}
};
Expand Down
20 changes: 17 additions & 3 deletions engine/packages/pegboard-envoy/src/ws_to_tunnel_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use pegboard::actor_kv;
use pegboard::pubsub_subjects::GatewayReceiverSubject;
use rivet_envoy_protocol::{self as protocol, PROTOCOL_VERSION, versioned};
use rivet_guard_core::websocket_handle::WebSocketReceiver;
use scc::HashMap;
use std::sync::{Arc, atomic::Ordering};
use tokio::sync::{Mutex, MutexGuard, watch};
use universaldb::utils::end_of_key_range;
Expand Down Expand Up @@ -366,7 +367,7 @@ async fn handle_message(
}
}
protocol::ToRivet::ToRivetTunnelMessage(tunnel_msg) => {
handle_tunnel_message(&ctx, tunnel_msg)
handle_tunnel_message(ctx, &conn.authorized_tunnel_routes, tunnel_msg)
.await
.context("failed to handle tunnel message")?;
}
Expand Down Expand Up @@ -447,6 +448,7 @@ async fn ack_commands(
#[tracing::instrument(skip_all)]
async fn handle_tunnel_message(
ctx: &StandaloneCtx,
authorized_tunnel_routes: &HashMap<(protocol::GatewayId, protocol::RequestId), ()>,
msg: protocol::ToRivetTunnelMessage,
) -> Result<()> {
// Extract inner data length before consuming msg
Expand All @@ -457,6 +459,15 @@ async fn handle_tunnel_message(
return Err(errors::WsError::InvalidPacket("payload too large".to_string()).build());
}

if !authorized_tunnel_routes
.contains_async(&(msg.message_id.gateway_id, msg.message_id.request_id))
.await
{
return Err(
errors::WsError::InvalidPacket("unauthorized tunnel message".to_string()).build(),
);
}

let gateway_reply_to = GatewayReceiverSubject::new(msg.message_id.gateway_id).to_string();
let msg_serialized =
versioned::ToGateway::wrap_latest(protocol::ToGateway::ToRivetTunnelMessage(msg))
Expand All @@ -470,8 +481,7 @@ async fn handle_tunnel_message(
);

// Publish message to UPS
ctx.ups()
.context("failed to get UPS instance for tunnel message")?
ctx.ups()?
.publish(&gateway_reply_to, &msg_serialized, PublishOpts::one())
.await
.with_context(|| {
Expand Down Expand Up @@ -500,6 +510,10 @@ fn tunnel_message_inner_data_len(kind: &protocol::ToRivetTunnelMessageKind) -> u
}
}

#[cfg(test)]
#[path = "../tests/support/ws_to_tunnel_task.rs"]
mod tests;

async fn send_actor_kv_error(conn: &Conn, request_id: u32, message: &str) -> Result<()> {
let res_msg = versioned::ToEnvoy::wrap_latest(protocol::ToEnvoy::ToEnvoyKvResponse(
protocol::ToEnvoyKvResponse {
Expand Down
Loading
Loading