Skip to content

Commit d7b0668

Browse files
NathanFlurryMasterPtato
authored andcommitted
chore: tunnel auth
1 parent ac386ed commit d7b0668

File tree

13 files changed

+342
-168
lines changed

13 files changed

+342
-168
lines changed

CLAUDE.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,14 @@ When the user asks to track something in a note, store it in `.agent/notes/` by
216216
- Any behavior, protocol handling, or test coverage added to one runner should be mirrored in the other runner in the same change whenever possible.
217217
- When parity cannot be completed in the same change, explicitly document the gap and add a follow-up task.
218218

219+
### Trust Boundaries
220+
- Treat `client <-> engine` as untrusted.
221+
- Treat `envoy <-> pegboard-envoy` as untrusted.
222+
- Treat traffic inside the engine over `nats`, `fdb`, and other internal backends as trusted.
223+
- Treat `gateway`, `api`, `pegboard-envoy`, `nats`, `fdb`, and similar engine-internal services as one trusted internal boundary once traffic is inside the engine.
224+
- Validate and authorize all client-originated data at the engine edge before it reaches trusted internal systems.
225+
- Validate and authorize all envoy-originated data at `pegboard-envoy` before it reaches trusted internal systems.
226+
219227
### Important Patterns
220228

221229
**Error Handling**

Cargo.lock

Lines changed: 2 additions & 124 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ members = [
3434
"engine/packages/pegboard-runner",
3535
"engine/packages/pools",
3636
"engine/packages/postgres-util",
37+
"engine/packages/runner-protocol",
3738
"engine/packages/runtime",
3839
"engine/packages/service-manager",
3940
"engine/packages/telemetry",
@@ -53,11 +54,8 @@ members = [
5354
"engine/sdks/rust/envoy-client",
5455
"engine/sdks/rust/envoy-protocol",
5556
"engine/sdks/rust/epoxy-protocol",
56-
"engine/packages/runner-protocol",
5757
"engine/sdks/rust/test-envoy",
58-
"engine/sdks/rust/ups-protocol",
59-
"rivetkit-typescript/packages/sqlite-native",
60-
"rivetkit-typescript/packages/rivetkit-native"
58+
"engine/sdks/rust/ups-protocol"
6159
]
6260

6361
[workspace.package]
@@ -448,6 +446,9 @@ members = [
448446
[workspace.dependencies.rivet-postgres-util]
449447
path = "engine/packages/postgres-util"
450448

449+
[workspace.dependencies.rivet-runner-protocol]
450+
path = "engine/packages/runner-protocol"
451+
451452
[workspace.dependencies.rivet-runtime]
452453
path = "engine/packages/runtime"
453454

@@ -503,17 +504,11 @@ members = [
503504
[workspace.dependencies.rivet-envoy-client]
504505
path = "engine/sdks/rust/envoy-client"
505506

506-
[workspace.dependencies.epoxy-protocol]
507-
path = "engine/sdks/rust/epoxy-protocol"
508-
509507
[workspace.dependencies.rivet-envoy-protocol]
510508
path = "engine/sdks/rust/envoy-protocol"
511509

512-
[workspace.dependencies.rivetkit-sqlite-native]
513-
path = "rivetkit-typescript/packages/sqlite-native"
514-
515-
[workspace.dependencies.rivet-runner-protocol]
516-
path = "engine/packages/runner-protocol"
510+
[workspace.dependencies.epoxy-protocol]
511+
path = "engine/sdks/rust/epoxy-protocol"
517512

518513
[workspace.dependencies.rivet-test-envoy]
519514
path = "engine/sdks/rust/test-envoy"

engine/packages/pegboard-envoy/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ rivet-metrics.workspace = true
2727
rivet-envoy-protocol.workspace = true
2828
rivet-runtime.workspace = true
2929
rivet-types.workspace = true
30+
scc.workspace = true
3031
serde_bare.workspace = true
3132
serde_json.workspace = true
3233
serde.workspace = true

engine/packages/pegboard-envoy/src/conn.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use rivet_data::converted::{ActorNameKeyData, MetadataKeyData};
1515
use rivet_envoy_protocol::{self as protocol, versioned};
1616
use rivet_guard_core::WebSocketHandle;
1717
use rivet_types::runner_configs::RunnerConfigKind;
18+
use scc::HashMap;
1819
use universaldb::prelude::*;
1920
use vbare::OwnedVersionedData;
2021

@@ -26,6 +27,7 @@ pub struct Conn {
2627
pub envoy_key: String,
2728
pub protocol_version: u16,
2829
pub ws_handle: WebSocketHandle,
30+
pub authorized_tunnel_routes: HashMap<(protocol::GatewayId, protocol::RequestId), ()>,
2931
pub is_serverless: bool,
3032
pub last_rtt: AtomicU32,
3133
/// Timestamp (epoch ms) of the last pong received from the envoy.
@@ -101,6 +103,7 @@ pub async fn init_conn(
101103
envoy_key,
102104
protocol_version,
103105
ws_handle,
106+
authorized_tunnel_routes: HashMap::new(),
104107
is_serverless: false,
105108
last_rtt: AtomicU32::new(0),
106109
last_ping_ts: AtomicI64::new(util::timestamp::now()),
@@ -114,7 +117,6 @@ pub async fn init_conn(
114117

115118
Ok(Arc::new(conn))
116119
}
117-
118120
#[tracing::instrument(skip_all)]
119121
pub async fn handle_init(
120122
ctx: &StandaloneCtx,

engine/packages/pegboard-envoy/src/tunnel_to_ws_task.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,10 @@ async fn handle_message(
162162
}
163163
protocol::ToEnvoyConn::ToEnvoyAckEvents(x) => protocol::ToEnvoy::ToEnvoyAckEvents(x),
164164
protocol::ToEnvoyConn::ToEnvoyTunnelMessage(x) => {
165+
let _ = conn
166+
.authorized_tunnel_routes
167+
.insert_async((x.message_id.gateway_id, x.message_id.request_id), ())
168+
.await;
165169
protocol::ToEnvoy::ToEnvoyTunnelMessage(x)
166170
}
167171
};

engine/packages/pegboard-envoy/src/ws_to_tunnel_task.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use pegboard::actor_kv;
88
use pegboard::pubsub_subjects::GatewayReceiverSubject;
99
use rivet_envoy_protocol::{self as protocol, PROTOCOL_VERSION, versioned};
1010
use rivet_guard_core::websocket_handle::WebSocketReceiver;
11+
use scc::HashMap;
1112
use std::sync::{Arc, atomic::Ordering};
1213
use tokio::sync::{Mutex, MutexGuard, watch};
1314
use universaldb::utils::end_of_key_range;
@@ -366,7 +367,7 @@ async fn handle_message(
366367
}
367368
}
368369
protocol::ToRivet::ToRivetTunnelMessage(tunnel_msg) => {
369-
handle_tunnel_message(&ctx, tunnel_msg)
370+
handle_tunnel_message(ctx, &conn.authorized_tunnel_routes, tunnel_msg)
370371
.await
371372
.context("failed to handle tunnel message")?;
372373
}
@@ -447,6 +448,7 @@ async fn ack_commands(
447448
#[tracing::instrument(skip_all)]
448449
async fn handle_tunnel_message(
449450
ctx: &StandaloneCtx,
451+
authorized_tunnel_routes: &HashMap<(protocol::GatewayId, protocol::RequestId), ()>,
450452
msg: protocol::ToRivetTunnelMessage,
451453
) -> Result<()> {
452454
// Extract inner data length before consuming msg
@@ -457,6 +459,15 @@ async fn handle_tunnel_message(
457459
return Err(errors::WsError::InvalidPacket("payload too large".to_string()).build());
458460
}
459461

462+
if !authorized_tunnel_routes
463+
.contains_async(&(msg.message_id.gateway_id, msg.message_id.request_id))
464+
.await
465+
{
466+
return Err(
467+
errors::WsError::InvalidPacket("unauthorized tunnel message".to_string()).build(),
468+
);
469+
}
470+
460471
let gateway_reply_to = GatewayReceiverSubject::new(msg.message_id.gateway_id).to_string();
461472
let msg_serialized =
462473
versioned::ToGateway::wrap_latest(protocol::ToGateway::ToRivetTunnelMessage(msg))
@@ -470,8 +481,7 @@ async fn handle_tunnel_message(
470481
);
471482

472483
// Publish message to UPS
473-
ctx.ups()
474-
.context("failed to get UPS instance for tunnel message")?
484+
ctx.ups()?
475485
.publish(&gateway_reply_to, &msg_serialized, PublishOpts::one())
476486
.await
477487
.with_context(|| {
@@ -500,6 +510,10 @@ fn tunnel_message_inner_data_len(kind: &protocol::ToRivetTunnelMessageKind) -> u
500510
}
501511
}
502512

513+
#[cfg(test)]
514+
#[path = "../tests/support/ws_to_tunnel_task.rs"]
515+
mod tests;
516+
503517
async fn send_actor_kv_error(conn: &Conn, request_id: u32, message: &str) -> Result<()> {
504518
let res_msg = versioned::ToEnvoy::wrap_latest(protocol::ToEnvoy::ToEnvoyKvResponse(
505519
protocol::ToEnvoyKvResponse {

0 commit comments

Comments
 (0)