Skip to content

Commit 413e829

Browse files
committed
fix: misc token fixes
1 parent b8ea46f commit 413e829

File tree

10 files changed

+90
-53
lines changed

10 files changed

+90
-53
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/packages/api-public/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ rivet-types.workspace = true
2828
rivet-util.workspace = true
2929
serde_json.workspace = true
3030
serde.workspace = true
31+
subtle.workspace = true
3132
tokio.workspace = true
3233
tower-http.workspace = true
3334
tracing.workspace = true

engine/packages/api-public/src/ctx.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
1+
use anyhow::Result;
12
use std::{
23
ops::Deref,
34
sync::{
45
Arc,
56
atomic::{AtomicBool, Ordering},
67
},
78
};
8-
9-
use anyhow::Result;
9+
use subtle::ConstantTimeEq;
1010

1111
#[derive(Clone)]
1212
pub struct ApiCtx {
@@ -31,11 +31,19 @@ impl ApiCtx {
3131

3232
self.authentication_handled.store(true, Ordering::Relaxed);
3333

34-
if self.token.as_ref() == Some(auth.admin_token.read()) {
35-
Ok(())
36-
} else {
37-
Err(rivet_api_builder::ApiForbidden.build())
34+
let Some(token) = &self.token else {
35+
return Err(rivet_api_builder::ApiForbidden.build());
36+
};
37+
38+
if token
39+
.as_bytes()
40+
.ct_ne(auth.admin_token.read().as_bytes())
41+
.into()
42+
{
43+
return Err(rivet_api_builder::ApiForbidden.build());
3844
}
45+
46+
Ok(())
3947
}
4048

4149
pub fn skip_auth(&self) {

engine/packages/guard/src/routing/envoy.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use anyhow::Result;
22
use gas::prelude::*;
33
use rivet_guard_core::{RoutingOutput, request_context::RequestContext};
44
use std::sync::Arc;
5+
use subtle::ConstantTimeEq;
56

67
use super::{SEC_WEBSOCKET_PROTOCOL, WS_PROTOCOL_TOKEN, X_RIVET_TOKEN, validate_regional_host};
78

@@ -81,7 +82,11 @@ async fn route_envoy_internal(
8182
};
8283

8384
// Validate token
84-
if token != auth.admin_token.read() {
85+
if token
86+
.as_bytes()
87+
.ct_ne(auth.admin_token.read().as_bytes())
88+
.into()
89+
{
8590
return Err(rivet_api_builder::ApiForbidden.build());
8691
}
8792

engine/packages/pegboard-envoy/src/conn.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,7 @@ pub async fn handle_init(
228228
);
229229

230230
tx.add_conflict_key(&old_lb_key, ConflictRangeType::Read)?;
231+
tx.delete(&old_lb_key);
231232
}
232233

233234
// Insert into LB
@@ -258,6 +259,18 @@ pub async fn handle_init(
258259
}
259260
}
260261

262+
// Update the pool's protocol version. This is required for serverful pools because normally
263+
// the pool's protocol version is updated via the metadata_poller wf but that only runs for
264+
// serverless pools.
265+
tx.write(
266+
&pegboard::keys::runner_config::ProtocolVersionKey::new(
267+
namespace_id,
268+
pool_name.clone(),
269+
),
270+
protocol_version,
271+
)?;
272+
273+
// Write envoy metadata
261274
if let Some(metadata) = &init.metadata {
262275
let metadata = MetadataKeyData {
263276
metadata:

engine/packages/pegboard-kv-channel/src/lib.rs

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -94,14 +94,11 @@ impl CustomServeTrait for PegboardKvChannelCustomServe {
9494
// Parse URL params.
9595
let url = url::Url::parse(&format!("ws://placeholder{}", req_ctx.path()))
9696
.context("failed to parse WebSocket URL")?;
97-
let params: HashMap<String, String> = url
98-
.query_pairs()
99-
.map(|(k, v)| (k.to_string(), v.to_string()))
100-
.collect();
10197

10298
// Validate protocol version.
103-
let protocol_version: u32 = params
104-
.get("protocol_version")
99+
let protocol_version: u32 = url
100+
.query_pairs()
101+
.find_map(|(n, v)| (n == "protocol_version").then_some(v))
105102
.context("missing protocol_version query param")?
106103
.parse()
107104
.context("invalid protocol_version")?;
@@ -112,10 +109,11 @@ impl CustomServeTrait for PegboardKvChannelCustomServe {
112109
);
113110

114111
// Resolve namespace.
115-
let namespace_name = params
116-
.get("namespace")
112+
let namespace_name = url
113+
.query_pairs()
114+
.find_map(|(n, v)| (n == "namespace").then_some(v))
117115
.context("missing namespace query param")?
118-
.clone();
116+
.to_string();
119117
let namespace = ctx
120118
.op(namespace::ops::resolve_for_name_global::Input {
121119
name: namespace_name.clone(),
@@ -820,9 +818,6 @@ async fn handle_kv_delete_range(
820818
/// Look up an actor by ID and return the parsed ID and actor name.
821819
///
822820
/// Defense-in-depth: verifies the actor belongs to the authenticated namespace.
823-
/// The admin_token is a global credential, so this is not strictly necessary
824-
/// today, but prevents cross-namespace access if a less-privileged auth
825-
/// mechanism is introduced in the future.
826821
async fn resolve_actor(
827822
ctx: &StandaloneCtx,
828823
actor_id: &str,

engine/packages/pegboard-outbound/src/lib.rs

Lines changed: 29 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -166,16 +166,15 @@ async fn handle(ctx: &StandaloneCtx, packet: protocol::ToOutbound) -> Result<()>
166166

167167
tracing::debug!(?namespace_id, %pool_name, ?actor_id, ?generation, "received outbound request");
168168

169-
// Check pool
170169
let db = ctx.udb()?;
171-
let (pool_res, namespace_res, preloaded_kv) = tokio::try_join!(
170+
let (namespace_res, pool_res, preloaded_kv) = tokio::try_join!(
171+
ctx.op(namespace::ops::get_global::Input {
172+
namespace_ids: vec![namespace_id],
173+
}),
172174
ctx.op(pegboard::ops::runner_config::get::Input {
173175
runners: vec![(namespace_id, pool_name.clone())],
174176
bypass_cache: false,
175177
}),
176-
ctx.op(namespace::ops::get_global::Input {
177-
namespace_ids: vec![namespace_id],
178-
}),
179178
pegboard::actor_kv::preload::fetch_preloaded_kv(
180179
&db,
181180
ctx.config().pegboard(),
@@ -184,10 +183,6 @@ async fn handle(ctx: &StandaloneCtx, packet: protocol::ToOutbound) -> Result<()>
184183
&actor_config.name,
185184
),
186185
)?;
187-
let Some(pool) = pool_res.into_iter().next() else {
188-
tracing::debug!("pool does not exist, ending outbound handler");
189-
return Ok(());
190-
};
191186
let Some(namespace) = namespace_res.into_iter().next() else {
192187
tracing::error!("namespace not found, ending outbound handler");
193188
report_error(
@@ -199,20 +194,10 @@ async fn handle(ctx: &StandaloneCtx, packet: protocol::ToOutbound) -> Result<()>
199194
.await;
200195
return Ok(());
201196
};
202-
203-
let payload = versioned::ToEnvoy::wrap_latest(protocol::ToEnvoy::ToEnvoyCommands(vec![
204-
protocol::CommandWrapper {
205-
checkpoint,
206-
inner: protocol::Command::CommandStartActor(protocol::CommandStartActor {
207-
config: actor_config,
208-
// Empty because request ids are ephemeral. This is intercepted by guard and
209-
// populated before it reaches the envoy
210-
hibernating_requests: Vec::new(),
211-
preloaded_kv,
212-
}),
213-
},
214-
]))
215-
.serialize_with_embedded_version(pool.protocol_version.unwrap_or(PROTOCOL_VERSION))?;
197+
let Some(pool) = pool_res.into_iter().next() else {
198+
tracing::debug!("pool does not exist, ending outbound handler");
199+
return Ok(());
200+
};
216201

217202
let RunnerConfigKind::Serverless {
218203
url,
@@ -228,6 +213,20 @@ async fn handle(ctx: &StandaloneCtx, packet: protocol::ToOutbound) -> Result<()>
228213
return Ok(());
229214
};
230215

216+
let payload = versioned::ToEnvoy::wrap_latest(protocol::ToEnvoy::ToEnvoyCommands(vec![
217+
protocol::CommandWrapper {
218+
checkpoint,
219+
inner: protocol::Command::CommandStartActor(protocol::CommandStartActor {
220+
config: actor_config,
221+
// Empty because request ids are ephemeral. This is intercepted by guard and
222+
// populated before it reaches the envoy
223+
hibernating_requests: Vec::new(),
224+
preloaded_kv,
225+
}),
226+
},
227+
]))
228+
.serialize_with_embedded_version(pool.protocol_version.unwrap_or(PROTOCOL_VERSION))?;
229+
231230
// Send ack to actor wf before starting an outbound req
232231
ctx.signal(pegboard::workflows::actor2::Allocated { generation })
233232
.to_workflow::<pegboard::workflows::actor2::Workflow>()
@@ -250,6 +249,10 @@ async fn handle(ctx: &StandaloneCtx, packet: protocol::ToOutbound) -> Result<()>
250249
&url,
251250
headers,
252251
request_lifespan,
252+
ctx.config()
253+
.auth
254+
.as_ref()
255+
.map(|a| a.admin_token.read().as_str()),
253256
)
254257
.await;
255258

@@ -272,15 +275,13 @@ async fn serverless_outbound_req(
272275
url: &str,
273276
headers: HashMap<String, String>,
274277
request_lifespan: u32,
278+
token: Option<&str>,
275279
) -> Result<()> {
276280
let current_dc = ctx.config().topology().current_dc()?;
277281
let mut term_signal = TermSignal::get();
278282

279-
let token = if let Some(auth) = &ctx.config().auth {
280-
Some((
281-
X_RIVET_TOKEN,
282-
HeaderValue::try_from(auth.admin_token.read())?,
283-
))
283+
let token = if let Some(token) = token {
284+
Some((X_RIVET_TOKEN, HeaderValue::try_from(token)?))
284285
} else {
285286
None
286287
};

engine/packages/pegboard/src/ops/envoy/update_ping.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,8 @@ pub async fn pegboard_envoy_update_ping(ctx: &OperationCtx, input: &Input) -> Re
6868
input.envoy_key.clone(),
6969
);
7070

71-
// Add read conflict
72-
tx.add_conflict_key(&old_lb_key, ConflictRangeType::Read)?;
73-
7471
// Clear old key
72+
tx.add_conflict_key(&old_lb_key, ConflictRangeType::Read)?;
7573
tx.delete(&old_lb_key);
7674

7775
tx.write(

engine/sdks/typescript/envoy-client/src/tasks/envoy/index.ts

Lines changed: 17 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rivetkit-typescript/packages/sqlite-native/src/channel.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ impl std::error::Error for ChannelError {}
170170
pub struct KvChannelConfig {
171171
/// Base WebSocket endpoint URL (e.g., "ws://localhost:6420").
172172
pub url: String,
173-
/// Authentication token. Engine uses admin_token, manager uses config.token.
173+
/// Authentication token.
174174
pub token: Option<String>,
175175
/// Namespace for actor scoping.
176176
pub namespace: String,

0 commit comments

Comments
 (0)