Skip to content

Commit 6be56bf

Browse files
NathanFlurryMasterPtato
authored andcommitted
feat(rivetkit-core): expose metrics endpoint
1 parent 9d66ffc commit 6be56bf

4 files changed

Lines changed: 156 additions & 2 deletions

File tree

rivetkit-rust/packages/rivetkit-core/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ pub mod actor;
1111
pub mod engine_process;
1212
pub mod error;
1313
pub mod inspector;
14+
pub(crate) mod metrics_endpoint;
1415
pub mod registry;
1516
pub mod runtime;
1617
pub mod serverless;
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
use std::collections::HashMap;
2+
3+
use anyhow::{Context, Result};
4+
use rivet_metrics::prometheus::{Encoder, TextEncoder};
5+
use subtle::ConstantTimeEq;
6+
7+
const METRICS_ENABLED_ENV: &str = "RIVETKIT_METRICS_ENABLED";
8+
const METRICS_TOKEN_ENV: &str = "RIVETKIT_METRICS_TOKEN";
9+
10+
pub(crate) struct RenderedMetrics {
11+
pub(crate) content_type: String,
12+
pub(crate) body: Vec<u8>,
13+
}
14+
15+
pub(crate) enum MetricsAccessError {
16+
NotEnabled,
17+
Unauthorized,
18+
}
19+
20+
pub(crate) fn authorize_metrics_request(
21+
bearer_token: Option<&str>,
22+
) -> std::result::Result<(), MetricsAccessError> {
23+
let Some(configured_token) = configured_metrics_token() else {
24+
return Err(MetricsAccessError::NotEnabled);
25+
};
26+
27+
let Some(bearer_token) = bearer_token.filter(|token| !token.is_empty()) else {
28+
return Err(MetricsAccessError::Unauthorized);
29+
};
30+
31+
if bearer_token.as_bytes().ct_eq(configured_token.as_bytes()).into() {
32+
Ok(())
33+
} else {
34+
Err(MetricsAccessError::Unauthorized)
35+
}
36+
}
37+
38+
pub(crate) fn render_prometheus_metrics() -> Result<RenderedMetrics> {
39+
let encoder = TextEncoder::new();
40+
let metric_families = rivet_metrics::REGISTRY.gather();
41+
let mut body = Vec::new();
42+
encoder
43+
.encode(&metric_families, &mut body)
44+
.context("encode prometheus metrics")?;
45+
46+
Ok(RenderedMetrics {
47+
content_type: encoder.format_type().to_owned(),
48+
body,
49+
})
50+
}
51+
52+
pub(crate) fn authorization_bearer_token(headers: &http::HeaderMap) -> Option<&str> {
53+
headers
54+
.get(http::header::AUTHORIZATION)
55+
.and_then(|value| value.to_str().ok())
56+
.and_then(bearer_token_from_authorization)
57+
}
58+
59+
pub(crate) fn authorization_bearer_token_map(headers: &HashMap<String, String>) -> Option<&str> {
60+
headers
61+
.iter()
62+
.find(|(name, _)| name.eq_ignore_ascii_case(http::header::AUTHORIZATION.as_str()))
63+
.and_then(|(_, value)| bearer_token_from_authorization(value))
64+
}
65+
66+
fn configured_metrics_token() -> Option<String> {
67+
let enabled = std::env::var(METRICS_ENABLED_ENV).ok()?;
68+
if enabled != "1" {
69+
return None;
70+
}
71+
72+
std::env::var(METRICS_TOKEN_ENV)
73+
.ok()
74+
.filter(|token| !token.is_empty())
75+
}
76+
77+
fn bearer_token_from_authorization(value: &str) -> Option<&str> {
78+
let value = value.trim_start();
79+
let scheme = value.get(..6)?;
80+
if !scheme.eq_ignore_ascii_case("bearer") {
81+
return None;
82+
}
83+
84+
let rest = value.get(6..)?;
85+
if !rest.chars().next().is_some_and(char::is_whitespace) {
86+
return None;
87+
}
88+
89+
let token = rest.trim_start();
90+
if token.is_empty() { None } else { Some(token) }
91+
}

rivetkit-rust/packages/rivetkit-core/src/registry/http.rs

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ impl RegistryDispatcher {
2121
request.uri().path(),
2222
self.handle_inspector_http_in_runtime,
2323
)?;
24+
if matches!(route, RegistryHttpRoute::Framework(FrameworkHttpRoute::Metrics)) {
25+
return handle_metrics_fetch(&request);
26+
}
2427
let instance = match self.active_actor(actor_id).await {
2528
Ok(instance) => instance,
2629
Err(error) => {
@@ -114,6 +117,7 @@ impl RegistryDispatcher {
114117
}
115118
FrameworkHttpRoute::Metadata => handle_metadata_fetch(&request, Some(&actor)),
116119
FrameworkHttpRoute::Health => handle_health_fetch(&request, Some(&actor)),
120+
FrameworkHttpRoute::Metrics => handle_metrics_fetch(&request),
117121
FrameworkHttpRoute::Root => handle_root_fetch(&request, Some(&actor)),
118122
FrameworkHttpRoute::NotFound => handle_not_found_fetch(&request, Some(&actor)),
119123
}
@@ -416,6 +420,7 @@ impl RegistryHttpRoute {
416420
match normalized_path {
417421
"/metadata" => Ok(Self::Framework(FrameworkHttpRoute::Metadata)),
418422
"/health" => Ok(Self::Framework(FrameworkHttpRoute::Health)),
423+
"/metrics" => Ok(Self::Framework(FrameworkHttpRoute::Metrics)),
419424
"/" => Ok(Self::Framework(FrameworkHttpRoute::Root)),
420425
_ => Ok(Self::Framework(FrameworkHttpRoute::NotFound)),
421426
}
@@ -427,6 +432,7 @@ pub(super) enum FrameworkHttpRoute {
427432
Queue(String),
428433
Metadata,
429434
Health,
435+
Metrics,
430436
Root,
431437
NotFound,
432438
}
@@ -466,6 +472,29 @@ fn handle_health_fetch(request: &Request, actor: Option<&ActorSpecifier>) -> Res
466472
text_response(StatusCode::OK, "ok")
467473
}
468474

475+
fn handle_metrics_fetch(request: &Request) -> Result<HttpResponse> {
476+
if request.method() != http::Method::GET {
477+
return method_not_allowed_response(request, None);
478+
}
479+
480+
let bearer_token = crate::metrics_endpoint::authorization_bearer_token(request.headers());
481+
match crate::metrics_endpoint::authorize_metrics_request(bearer_token) {
482+
Ok(()) => {
483+
let metrics = crate::metrics_endpoint::render_prometheus_metrics()?;
484+
bytes_response(StatusCode::OK, &metrics.content_type, metrics.body)
485+
}
486+
Err(crate::metrics_endpoint::MetricsAccessError::NotEnabled) => {
487+
text_response(StatusCode::FORBIDDEN, "metrics not enabled\n")
488+
}
489+
Err(crate::metrics_endpoint::MetricsAccessError::Unauthorized) => {
490+
text_response(
491+
StatusCode::UNAUTHORIZED,
492+
"metrics request requires a valid bearer token\n",
493+
)
494+
}
495+
}
496+
}
497+
469498
fn handle_root_fetch(request: &Request, actor: Option<&ActorSpecifier>) -> Result<HttpResponse> {
470499
if request.method() != http::Method::GET {
471500
return method_not_allowed_response(request, actor);
@@ -494,15 +523,23 @@ fn handle_not_found_fetch(
494523
}
495524

496525
fn text_response(status: StatusCode, body: &str) -> Result<HttpResponse> {
526+
bytes_response(
527+
status,
528+
"text/plain; charset=utf-8",
529+
body.as_bytes().to_vec(),
530+
)
531+
}
532+
533+
fn bytes_response(status: StatusCode, content_type: &str, body: Vec<u8>) -> Result<HttpResponse> {
497534
let mut headers = HashMap::new();
498535
headers.insert(
499536
http::header::CONTENT_TYPE.to_string(),
500-
"text/plain; charset=utf-8".to_owned(),
537+
content_type.to_owned(),
501538
);
502539
Ok(HttpResponse {
503540
status: status.as_u16(),
504541
headers,
505-
body: Some(body.as_bytes().to_vec()),
542+
body: Some(body),
506543
body_stream: None,
507544
})
508545
}

rivetkit-rust/packages/rivetkit-core/src/serverless.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,7 @@ impl CoreServerlessRuntime {
279279
}
280280
}
281281
("GET", "/metadata") => Ok(self.metadata_response()),
282+
("GET", "/metrics") => Ok(metrics_response(&req.headers)),
282283
("GET", "/start") | ("POST", "/start") => self.start_response(req).await,
283284
("OPTIONS", _) => Ok(bytes_response(
284285
StatusCode::NO_CONTENT,
@@ -625,6 +626,30 @@ fn json_response(status: StatusCode, body: serde_json::Value) -> ServerlessRespo
625626
)
626627
}
627628

629+
fn metrics_response(headers: &HashMap<String, String>) -> ServerlessResponse {
630+
let bearer_token = crate::metrics_endpoint::authorization_bearer_token_map(headers);
631+
match crate::metrics_endpoint::authorize_metrics_request(bearer_token) {
632+
Ok(()) => match crate::metrics_endpoint::render_prometheus_metrics() {
633+
Ok(metrics) => bytes_response(
634+
StatusCode::OK,
635+
HashMap::from([("content-type".to_owned(), metrics.content_type)]),
636+
metrics.body,
637+
),
638+
Err(error) => error_response(error),
639+
},
640+
Err(crate::metrics_endpoint::MetricsAccessError::NotEnabled) => text_response(
641+
StatusCode::FORBIDDEN,
642+
"text/plain; charset=utf-8",
643+
"metrics not enabled\n",
644+
),
645+
Err(crate::metrics_endpoint::MetricsAccessError::Unauthorized) => text_response(
646+
StatusCode::UNAUTHORIZED,
647+
"text/plain; charset=utf-8",
648+
"metrics request requires a valid bearer token\n",
649+
),
650+
}
651+
}
652+
628653
fn bytes_response(
629654
status: StatusCode,
630655
headers: HashMap<String, String>,

0 commit comments

Comments
 (0)