diff --git a/Cargo.lock b/Cargo.lock index 79f2c9b1..494d140d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -258,10 +258,12 @@ dependencies = [ name = "confidence-cloudflare-resolver" version = "0.9.0" dependencies = [ + "arc-swap", "base64 0.22.1", "bytes", "confidence_resolver", "getrandom 0.3.3", + "js-sys", "once_cell", "prost 0.13.5", "serde", diff --git a/confidence-cloudflare-resolver/Cargo.toml b/confidence-cloudflare-resolver/Cargo.toml index 7a55790a..67a7915d 100644 --- a/confidence-cloudflare-resolver/Cargo.toml +++ b/confidence-cloudflare-resolver/Cargo.toml @@ -28,5 +28,7 @@ worker = { version= "0.6.1", features=['queue'] } base64 = "0.22.1" once_cell = "1.19" prost = "0.13" +arc-swap = "1" +js-sys = "0.3" serde = { version = "1.0.219" } serde_json = "1.0.85" \ No newline at end of file diff --git a/confidence-cloudflare-resolver/deployer/README.md b/confidence-cloudflare-resolver/deployer/README.md index f3592282..1c2b4c08 100644 --- a/confidence-cloudflare-resolver/deployer/README.md +++ b/confidence-cloudflare-resolver/deployer/README.md @@ -63,6 +63,7 @@ The deployer automatically: | `WRANGLER_DEPLOY_MESSAGE` | Value passed to `wrangler deploy --message` | | `WRANGLER_DEPLOY_ARGS` | Additional newline-separated arguments passed to `wrangler deploy` | | `WRANGLER_DEPLOY_ARGS_FILE` | Path to a file containing additional `wrangler deploy` arguments, one argument per line | +| `ENABLE_METRICS` | Set to create a KV namespace and enable the `/metrics` Prometheus endpoint. Requires a [KV store](https://developers.cloudflare.com/kv/platform/pricing/) | ### Extending Wrangler Configuration @@ -120,6 +121,33 @@ When integrating with the Cloudflare resolver, you have two options: For more details on integration, including code examples using the [`@spotify-confidence/sdk`](https://github.com/spotify/confidence-sdk-js), see the [Confidence documentation](https://confidence.spotify.com/docs/sdks/edge/cloudflare#cloudflare-workers). 
+## Telemetry & Metrics + +The resolver collects telemetry and exposes a Prometheus-compatible `/metrics` endpoint using the same metric names as all other Confidence providers (`confidence_resolve_latency_microseconds`, `confidence_resolves_total`). + +### How latency is measured + +Cloudflare Workers freeze `Date.now()` and `performance.now()` during synchronous CPU work (Spectre mitigation). The resolver uses `scheduler.wait(0)` — a zero-delay yield to the runtime — to unfreeze the clock after each resolve. This provides 1ms resolution with no measurable overhead. + +### `/metrics` endpoint + +Requires authentication: + +```bash +curl -H "Authorization: ClientSecret <client-secret>" \ + https://<worker-name>.workers.dev/metrics +``` + +Returns Prometheus exposition format with: +- `confidence_resolve_latency_microseconds` — histogram (sum, count, cumulative `le` buckets) +- `confidence_resolves_total` — counter by resolve reason + +Metrics are accumulated in a [KV namespace](https://developers.cloudflare.com/kv/platform/pricing/) (`CONFIDENCE_METRICS_KV`). Set `ENABLE_METRICS` to have the deployer create the KV namespace and bind it to the Worker. Without it, the `/metrics` endpoint returns empty and no KV writes occur. + +### Backend telemetry + +Resolve rates and latency are always sent to the Confidence backend via `WriteFlagLogsRequest`, regardless of the `ENABLE_METRICS` setting. The `/metrics` endpoint and KV store are only needed for direct Prometheus scraping — backend telemetry flows through the queue consumer independently. + ## Limitations * **Sticky assignments**: Not currently supported with the Cloudflare resolver. Flags with sticky assignment rules will return "flag not found". 
diff --git a/confidence-cloudflare-resolver/deployer/script.sh b/confidence-cloudflare-resolver/deployer/script.sh index 42d18ea8..b9017cd5 100755 --- a/confidence-cloudflare-resolver/deployer/script.sh +++ b/confidence-cloudflare-resolver/deployer/script.sh @@ -366,6 +366,63 @@ else echo "⚠️ Could not check queue status (HTTP $QUEUE_STATUS)" fi +# Create KV namespace for /metrics endpoint if it doesn't exist +if [ -n "$WORKER_NAME_PREFIX" ]; then + KV_NAMESPACE_TITLE="${WORKER_NAME_PREFIX}-resolver-metrics" +else + KV_NAMESPACE_TITLE="resolver-metrics" +fi + +ENABLE_METRICS=${ENABLE_METRICS:=} +if [ -z "$ENABLE_METRICS" ]; then + echo "ℹ️ ENABLE_METRICS not set; skipping KV namespace creation (/metrics endpoint disabled)" + KV_NAMESPACE_ID="" +else + +echo "🔍 Checking if KV namespace '$KV_NAMESPACE_TITLE' exists..." +KV_LIST=$(curl -sS -w "%{http_code}" \ + -H "Authorization: Bearer ${CLOUDFLARE_API_TOKEN}" \ + "https://api.cloudflare.com/client/v4/accounts/${CLOUDFLARE_ACCOUNT_ID}/storage/kv/namespaces?per_page=100") +KV_LIST_STATUS="${KV_LIST: -3}" +KV_LIST_BODY="${KV_LIST%???}" + +KV_NAMESPACE_ID="" +if [ "$KV_LIST_STATUS" = "200" ]; then + KV_NAMESPACE_ID=$(printf "%s" "$KV_LIST_BODY" | jq -r ".result[] | select(.title == \"${KV_NAMESPACE_TITLE}\") | .id" 2>/dev/null || true) +fi + +if [ -z "$KV_NAMESPACE_ID" ]; then + echo "📦 KV namespace '$KV_NAMESPACE_TITLE' not found, creating..." 
+ KV_CREATE_RESP=$(curl -sS -w "%{http_code}" -X POST \ + -H "Authorization: Bearer ${CLOUDFLARE_API_TOKEN}" \ + -H "Content-Type: application/json" \ + -d "{\"title\": \"${KV_NAMESPACE_TITLE}\"}" \ + "https://api.cloudflare.com/client/v4/accounts/${CLOUDFLARE_ACCOUNT_ID}/storage/kv/namespaces") + KV_CREATE_STATUS="${KV_CREATE_RESP: -3}" + KV_CREATE_BODY="${KV_CREATE_RESP%???}" + if [ "$KV_CREATE_STATUS" = "200" ] || [ "$KV_CREATE_STATUS" = "201" ]; then + KV_NAMESPACE_ID=$(printf "%s" "$KV_CREATE_BODY" | jq -r '.result.id') + echo "✅ KV namespace '$KV_NAMESPACE_TITLE' created (id: $KV_NAMESPACE_ID)" + else + echo "⚠️ Failed to create KV namespace (HTTP $KV_CREATE_STATUS), /metrics will be unavailable" + fi +else + echo "✅ KV namespace '$KV_NAMESPACE_TITLE' already exists (id: $KV_NAMESPACE_ID)" +fi + +# Append KV binding to wrangler.toml if namespace was created +if [ -n "$KV_NAMESPACE_ID" ]; then + cat >> wrangler.toml <> = LazyLock::new(ResolveLogger::new); -static ASSIGN_LOGGER: LazyLock = LazyLock::new(AssignLogger::new); +use confidence_resolver::proto::confidence::flags::resolver::v1::{ + ResolveProcessRequest, ResolveReason, +}; use confidence_resolver::Client; use once_cell::sync::Lazy; @@ -38,8 +39,14 @@ const ENCRYPTION_KEY_BASE64: &str = include_str!("../../data/encryption_key"); use confidence::flags::resolver::v1::Sdk; use confidence_resolver::proto::confidence::flags::resolver::v1::WriteFlagLogsRequest; -use confidence_resolver::resolve_logger::ResolveLogger; -use std::sync::{LazyLock, OnceLock}; +use std::sync::OnceLock; + +thread_local! { + static FLAG_LOG: RefCell> = const { RefCell::new(None) }; +} + +/// Prometheus exposition format content type (version 0.0.4). 
+const PROMETHEUS_CONTENT_TYPE: &str = "text/plain; version=0.0.4; charset=utf-8"; static FLAGS_LOGS_QUEUE: OnceLock = OnceLock::new(); @@ -71,18 +78,22 @@ struct H {} impl Host for H { fn log_resolve( - resolve_id: &str, + _resolve_id: &str, evaluation_context: &Struct, values: &[ResolvedValue<'_>], client: &Client, ) { - RESOLVE_LOGGER.log_resolve( - resolve_id, - evaluation_context, - client.client_credential_name.as_str(), - values, - client, - ); + FLAG_LOG.with(|f| { + if let Some(req) = f.borrow_mut().as_mut() { + let (flag_infos, client_info) = resolve_logger::build_resolve_log( + evaluation_context, + client.client_credential_name.as_str(), + values, + ); + req.flag_resolve_info.extend(flag_infos); + req.client_resolve_info.push(client_info); + } + }); } fn log_assign( @@ -91,7 +102,17 @@ impl Host for H { client: &Client, sdk: &Option, ) { - ASSIGN_LOGGER.log_assigns(resolve_id, assigned_flags, client, sdk); + FLAG_LOG.with(|f| { + if let Some(req) = f.borrow_mut().as_mut() { + req.flag_assigned + .push(assign_logger::build_flag_assigned( + resolve_id, + assigned_flags, + client, + sdk, + )); + } + }); } } @@ -103,6 +124,15 @@ fn set_client_secret(env: &Env) { } } +fn sdk_info() -> Sdk { + Sdk { + sdk: Some(confidence::flags::resolver::v1::sdk::Sdk::Id( + confidence::flags::resolver::v1::SdkId::CloudflareResolver as i32, + )), + version: env!("CARGO_PKG_VERSION").to_string(), + } +} + #[event(fetch)] pub async fn main(req: Request, env: Env, ctx: Context) -> Result { match env.queue("flag_logs_queue") { @@ -137,10 +167,36 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result { return Response::ok("")?.with_cors_headers(&allowed_origin_env); } + FLAG_LOG.with(|f| *f.borrow_mut() = Some(WriteFlagLogsRequest::default())); + let state = &RESOLVER_STATE; let router = Router::new(); let response = router + .get_async("/metrics", |req, ctx| { + let allowed_origin = allowed_origin_env.clone(); + async move { + // Require client secret — metrics are 
not public. + if let Some(expected) = CONFIDENCE_CLIENT_SECRET.get() { + let authorized = req.headers().get("Authorization").ok().flatten() + .map(|v| v.strip_prefix("ClientSecret ").unwrap_or("") == expected.as_str()) + .unwrap_or(false); + if !authorized { + return Response::error("Unauthorized", 401)? + .with_cors_headers(&allowed_origin); + } + } + let text = match ctx.env.kv("CONFIDENCE_METRICS_KV") { + Ok(kv) => kv.get("prometheus").text().await.unwrap_or(None), + Err(_) => None, + }; + let body = text.unwrap_or_default(); + let headers = Headers::new(); + headers.set("Content-Type", PROMETHEUS_CONTENT_TYPE)?; + headers.set("Cache-Control", "no-store")?; + Response::ok(body)?.with_headers(headers).with_cors_headers(&allowed_origin) + } + }) // GET endpoint to expose the current deployment state etag and resolver version .get_async("/v1/state:etag", |_req, _ctx| { let allowed_origin = allowed_origin_env.clone(); @@ -180,7 +236,12 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result { .evaluation_context .clone() .unwrap_or_default(); - match state.get_resolver::( + + // Start timer before resolve. CF Workers freeze timers + // during sync CPU, but scheduler.wait(0) unfreezes them. + let t0 = js_sys::Date::now(); + + let (reasons, resp) = match state.get_resolver::( &resolver_request.client_secret, evaluation_context, &Bytes::from(STANDARD.decode(ENCRYPTION_KEY_BASE64).unwrap()), @@ -194,24 +255,72 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result { Ok(process_response) => { match process_response.into_resolved() { Some((response, _writes)) => { - Response::from_json(&response)? - .with_cors_headers(&allowed_origin) + let reasons: Vec = response + .resolved_flags + .iter() + .map(|f| f.reason()) + .collect(); + (reasons, Response::from_json(&response)? + .with_cors_headers(&allowed_origin)) + } + None => { + (vec![ResolveReason::Error], + Response::error( + "Unexpected suspended response", + 500, + )? 
+ .with_cors_headers(&allowed_origin)) } - None => Response::error( - "Unexpected suspended response", - 500, - )? - .with_cors_headers(&allowed_origin), } } - Err(msg) => Response::error(msg, 500)? - .with_cors_headers(&allowed_origin), + Err(msg) => { + (vec![ResolveReason::Error], + Response::error(msg, 500)? + .with_cors_headers(&allowed_origin)) + } } } Err(msg) => { - Response::error(msg, 500)?.with_cors_headers(&allowed_origin) + (vec![ResolveReason::Error], + Response::error(msg, 500)?.with_cors_headers(&allowed_origin)) } - } + }; + + // Unfreeze timer: scheduler.wait(0) yields to the + // runtime with zero delay, advancing the clock. + // If CF removes or changes the scheduler API, all + // lookups fall through gracefully: elapsed_us is None, + // the resolve still succeeds, we just lose latency data. + let elapsed_us = { + let scheduler = js_sys::Reflect::get( + &js_sys::global(), &wasm_bindgen::JsValue::from_str("scheduler") + ).unwrap_or(wasm_bindgen::JsValue::UNDEFINED); + if !scheduler.is_undefined() { + let wait = js_sys::Reflect::get( + &scheduler, &wasm_bindgen::JsValue::from_str("wait") + ).unwrap_or(wasm_bindgen::JsValue::UNDEFINED); + if let Ok(promise) = js_sys::Function::from(wait) + .call1(&scheduler, &wasm_bindgen::JsValue::from(0)) + { + let _ = wasm_bindgen_futures::JsFuture::from( + js_sys::Promise::from(promise) + ).await; + } + Some(((js_sys::Date::now() - t0) * 1000.0).max(0.0) as u32) + } else { + None + } + }; + + let mut td = telemetry::build_request_telemetry(elapsed_us, &reasons); + td.sdk = Some(sdk_info()); + FLAG_LOG.with(|f| { + if let Some(req) = f.borrow_mut().as_mut() { + req.telemetry_data = Some(td); + } + }); + + resp } "flags:apply" => { let body_bytes: Vec = req.bytes().await?; @@ -249,13 +358,14 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result { .run(req, env) .await; - // Use ctx.waitUntil to run logging after response is returned + // Use ctx.waitUntil to run logging and telemetry after response 
is returned. + let flag_log = FLAG_LOG.with(|f| f.borrow_mut().take()); ctx.wait_until(async move { - let aggregated: confidence_resolver::proto::confidence::flags::resolver::v1::WriteFlagLogsRequest - = checkpoint(); - if let Ok(converted) = serde_json::to_string(&aggregated) { - if let Some(queue) = FLAGS_LOGS_QUEUE.get() { - let _ = queue.send(converted).await; + if let Some(req) = flag_log { + if let Ok(json) = serde_json::to_string(&req) { + if let Some(queue) = FLAGS_LOGS_QUEUE.get() { + let _ = queue.send(json).await; + } } } }); @@ -275,25 +385,49 @@ pub async fn consume_flag_logs_queue( let logs: Vec = messages .iter() .map(|m| m.body().clone()) - .map(|s| serde_json::from_str::(s.as_str()).unwrap()) - .map(|v| WriteFlagLogsRequest { - telemetry_data: v.telemetry_data, - flag_resolve_info: v.flag_resolve_info, - flag_assigned: v.flag_assigned, - client_resolve_info: v.client_resolve_info, - }) + .map(|s| serde_json::from_str::(s.as_str()).unwrap()) .collect(); + let req = flag_logger::aggregate_batch(logs); + + // Accumulate telemetry deltas into KV-backed cumulative snapshot for /metrics. + if let Ok(kv) = env.kv("CONFIDENCE_METRICS_KV") { + update_prometheus_kv(&kv, &req).await; + } + send_flags_logs(CONFIDENCE_CLIENT_SECRET.get().unwrap().as_str(), req).await?; } Ok(()) } -fn checkpoint() -> WriteFlagLogsRequest { - let mut req = RESOLVE_LOGGER.checkpoint(); - ASSIGN_LOGGER.checkpoint_fill(&mut req); - req +/// Accumulate telemetry deltas from all isolates into a cumulative +/// `TelemetrySnapshot` stored in KV, then write its Prometheus text +/// representation for the /metrics endpoint. +/// +/// Note: concurrent queue consumer invocations can race on KV read-modify-write. +/// Acceptable for metrics — at worst one batch's deltas are lost, not cumulative state. 
+async fn update_prometheus_kv(kv: &kv::KvStore, req: &WriteFlagLogsRequest) { + let mut cumulative = match kv.get("snapshot").text().await { + Ok(Some(text)) => serde_json::from_str::(&text).unwrap_or_default(), + _ => TelemetrySnapshot::default(), + }; + + if let Some(td) = &req.telemetry_data { + cumulative.accumulate_delta(td); + } + + let prom_text = cumulative.to_prometheus( + "cf-resolver", + &confidence_resolver::telemetry::PrometheusConfig::default(), + ); + + if let Ok(builder) = kv.put("snapshot", serde_json::to_string(&cumulative).unwrap_or_default()) { + let _ = builder.execute().await; + } + if let Ok(builder) = kv.put("prometheus", prom_text) { + let _ = builder.execute().await; + } } async fn send_flags_logs(client_secret: &str, message: WriteFlagLogsRequest) -> Result { diff --git a/confidence-cloudflare-resolver/wrangler.toml b/confidence-cloudflare-resolver/wrangler.toml index 47660cce..76bbdcaa 100644 --- a/confidence-cloudflare-resolver/wrangler.toml +++ b/confidence-cloudflare-resolver/wrangler.toml @@ -14,5 +14,8 @@ max_batch_timeout = 10 # seconds queue = "flag-logs-queue" binding = "flag_logs_queue" +# KV namespace for /metrics endpoint is created and injected by the deployer. +# See deployer/script.sh for auto-creation of the CONFIDENCE_METRICS_KV binding. 
+ [vars] CONFIDENCE_CLIENT_SECRET = "SECRET" diff --git a/confidence-resolver/protos/confidence/flags/resolver/v1/types.proto b/confidence-resolver/protos/confidence/flags/resolver/v1/types.proto index bc1ffc0d..60091464 100644 --- a/confidence-resolver/protos/confidence/flags/resolver/v1/types.proto +++ b/confidence-resolver/protos/confidence/flags/resolver/v1/types.proto @@ -75,4 +75,7 @@ enum SdkId { SDK_ID_GO_LOCAL_PROVIDER = 20; SDK_ID_JAVA_LOCAL_PROVIDER = 21; SDK_ID_JS_LOCAL_SERVER_PROVIDER = 22; + SDK_ID_PYTHON_LOCAL_PROVIDER = 23; + SDK_ID_RUST_LOCAL_PROVIDER = 24; + SDK_ID_CLOUDFLARE_RESOLVER = 25; } diff --git a/confidence-resolver/src/assign_logger.rs b/confidence-resolver/src/assign_logger.rs index 06f602b4..74634ea1 100644 --- a/confidence-resolver/src/assign_logger.rs +++ b/confidence-resolver/src/assign_logger.rs @@ -42,61 +42,8 @@ impl AssignLogger { client: &crate::Client, sdk: &Option, ) { - let client_info = Some(pb::ClientInfo { - client: client.client_name.to_string(), - client_credential: client.client_credential_name.to_string(), - sdk: sdk.clone(), - }); - let flags = assigned_flags - .iter() - .map( - |FlagToApply { - assigned_flag: f, - skew_adjusted_applied_time, - }| { - let assignment = if !f.variant.is_empty() { - let assignment_info = pb::AssignmentInfo { - segment: f.segment.clone(), - variant: f.variant.clone(), - }; - Some(pb::Assignment::AssignmentInfo(assignment_info)) - } else { - let default_reason: pb::DefaultAssignmentReason = - match pb::ResolveReason::try_from(f.reason) { - Ok(pb::ResolveReason::NoSegmentMatch) => { - pb::DefaultAssignmentReason::NoSegmentMatch - } - Ok(pb::ResolveReason::NoTreatmentMatch) => { - pb::DefaultAssignmentReason::NoTreatmentMatch - } - Ok(pb::ResolveReason::FlagArchived) => { - pb::DefaultAssignmentReason::FlagArchived - } - _ => pb::DefaultAssignmentReason::Unspecified, - }; - Some(pb::Assignment::DefaultAssignment(pb::DefaultAssignment { - reason: default_reason.into(), - })) - }; - 
pb::AppliedFlag { - flag: f.flag.clone(), - targeting_key: f.targeting_key.clone(), - targeting_key_selector: f.targeting_key_selector.clone(), - assignment_id: f.assignment_id.clone(), - rule: f.rule.clone(), - fallthrough_assignments: f.fallthrough_assignments.clone(), - apply_time: Some(skew_adjusted_applied_time.clone()), - assignment, - } - }, - ) - .collect(); - - self.assigned.push(pb::FlagAssigned { - resolve_id: resolve_id.to_string(), - client_info, - flags, - }); + self.assigned + .push(build_flag_assigned(resolve_id, assigned_flags, client, sdk)); } pub fn checkpoint(&self) -> WriteFlagLogsRequest { @@ -164,6 +111,69 @@ impl AssignLogger { } } +pub fn build_flag_assigned( + resolve_id: &str, + assigned_flags: &[FlagToApply], + client: &crate::Client, + sdk: &Option, +) -> pb::FlagAssigned { + let client_info = Some(pb::ClientInfo { + client: client.client_name.to_string(), + client_credential: client.client_credential_name.to_string(), + sdk: sdk.clone(), + }); + let flags = assigned_flags + .iter() + .map( + |FlagToApply { + assigned_flag: f, + skew_adjusted_applied_time, + }| { + let assignment = if !f.variant.is_empty() { + let assignment_info = pb::AssignmentInfo { + segment: f.segment.clone(), + variant: f.variant.clone(), + }; + Some(pb::Assignment::AssignmentInfo(assignment_info)) + } else { + let default_reason: pb::DefaultAssignmentReason = + match pb::ResolveReason::try_from(f.reason) { + Ok(pb::ResolveReason::NoSegmentMatch) => { + pb::DefaultAssignmentReason::NoSegmentMatch + } + Ok(pb::ResolveReason::NoTreatmentMatch) => { + pb::DefaultAssignmentReason::NoTreatmentMatch + } + Ok(pb::ResolveReason::FlagArchived) => { + pb::DefaultAssignmentReason::FlagArchived + } + _ => pb::DefaultAssignmentReason::Unspecified, + }; + Some(pb::Assignment::DefaultAssignment(pb::DefaultAssignment { + reason: default_reason.into(), + })) + }; + pb::AppliedFlag { + flag: f.flag.clone(), + targeting_key: f.targeting_key.clone(), + targeting_key_selector: 
f.targeting_key_selector.clone(), + assignment_id: f.assignment_id.clone(), + rule: f.rule.clone(), + fallthrough_assignments: f.fallthrough_assignments.clone(), + apply_time: Some(skew_adjusted_applied_time.clone()), + assignment, + } + }, + ) + .collect(); + + pb::FlagAssigned { + resolve_id: resolve_id.to_string(), + client_info, + flags, + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/confidence-resolver/src/flag_logger.rs b/confidence-resolver/src/flag_logger.rs index 307653f9..04acfd5a 100644 --- a/confidence-resolver/src/flag_logger.rs +++ b/confidence-resolver/src/flag_logger.rs @@ -4,6 +4,7 @@ use crate::proto::confidence::flags::admin::v1::flag_resolve_info::{ }; use crate::proto::confidence::flags::admin::v1::{ClientResolveInfo, FlagResolveInfo}; use crate::proto::confidence::flags::resolver::v1::events::FlagAssigned; +use crate::proto::confidence::flags::resolver::v1::telemetry_data::ResolveRate; use crate::proto::confidence::flags::resolver::v1::{TelemetryData, WriteFlagLogsRequest}; use std::collections::{HashMap, HashSet}; @@ -14,12 +15,14 @@ pub fn aggregate_batch(message_batch: Vec) -> WriteFlagLog let mut flag_resolve_map: HashMap = HashMap::new(); let mut flag_assigned: Vec = vec![]; let mut first_sdk: Option = None; + let mut agg_telemetry: Option = None; for flag_logs_message in message_batch { if let Some(td) = &flag_logs_message.telemetry_data { if first_sdk.is_none() && td.sdk.is_some() { first_sdk = td.sdk.clone(); } + agg_telemetry = Some(merge_telemetry(agg_telemetry.take(), td)); } for c in &flag_logs_message.client_resolve_info { @@ -98,10 +101,20 @@ pub fn aggregate_batch(message_batch: Vec) -> WriteFlagLog }) } - let telemetry_data = first_sdk.map(|sdk| TelemetryData { - sdk: Some(sdk), - ..Default::default() - }); + // Attach SDK info to the aggregated telemetry + let telemetry_data = match (agg_telemetry, first_sdk) { + (Some(mut td), sdk) => { + if td.sdk.is_none() { + td.sdk = sdk; + } + Some(td) + } + (None, 
Some(sdk)) => Some(TelemetryData { + sdk: Some(sdk), + ..Default::default() + }), + (None, None) => None, + }; WriteFlagLogsRequest { telemetry_data, @@ -111,6 +124,53 @@ pub fn aggregate_batch(message_batch: Vec) -> WriteFlagLog } } +/// Merge a telemetry delta into an accumulator. +/// Both are deltas, so counters are summed and gauges take the latest non-zero value. +fn merge_telemetry(acc: Option, delta: &TelemetryData) -> TelemetryData { + let mut acc = acc.unwrap_or_default(); + + // Merge resolve latency + match (&mut acc.resolve_latency, &delta.resolve_latency) { + (Some(a), Some(d)) => { + a.sum = a.sum.wrapping_add(d.sum); + a.count = a.count.wrapping_add(d.count); + a.buckets.extend(d.buckets.iter().cloned()); + if a.ln_ratio == 0.0 { + a.ln_ratio = d.ln_ratio; + } + } + (None, Some(d)) => { + acc.resolve_latency = Some(d.clone()); + } + _ => {} + } + + // Merge resolve rates by reason + for dr in &delta.resolve_rate { + if let Some(ar) = acc.resolve_rate.iter_mut().find(|r| r.reason == dr.reason) { + ar.count = ar.count.wrapping_add(dr.count); + } else { + acc.resolve_rate.push(ResolveRate { + count: dr.count, + reason: dr.reason, + }); + } + } + + // Gauges: take latest non-zero + if let Some(sa) = &delta.state_age { + acc.state_age = Some(sa.clone()); + } + if delta.memory_bytes > 0 { + acc.memory_bytes = delta.memory_bytes; + } + if !delta.resolver_version.is_empty() { + acc.resolver_version = delta.resolver_version.clone(); + } + + acc +} + struct SchemaItem { pub client: String, pub schemas: HashSet, diff --git a/confidence-resolver/src/resolve_logger.rs b/confidence-resolver/src/resolve_logger.rs index b3d8d986..d461e092 100644 --- a/confidence-resolver/src/resolve_logger.rs +++ b/confidence-resolver/src/resolve_logger.rs @@ -145,6 +145,72 @@ impl ResolveLogger { } } +pub fn build_resolve_log( + evaluation_context: &pb::Struct, + client_credential: &str, + values: &[crate::ResolvedValue<'_>], +) -> (Vec, pb::ClientResolveInfo) { + let schema = 
SchemaFromEvaluationContext::get_schema(evaluation_context); + let client_info = pb::ClientResolveInfo { + client: extract_client(client_credential), + client_credential: client_credential.to_string(), + schema: vec![to_pb_schema_instance(&schema)], + }; + + let flag_infos = values + .iter() + .map(|value| { + let af = &value.inner; + let mut variant_resolve_info = Vec::new(); + let mut rule_resolve_info: Vec = Vec::new(); + + for fallthrough in &af.fallthrough_assignments { + rule_resolve_info.push(pb::flag_resolve_info::RuleResolveInfo { + rule: fallthrough.rule.clone(), + count: 1, + assignment_resolve_info: vec![pb::flag_resolve_info::AssignmentResolveInfo { + assignment_id: fallthrough.assignment_id.clone(), + count: 1, + }], + }); + } + + if !af.rule.is_empty() { + let variant_key = if af.variant.is_empty() { + String::new() + } else { + af.variant.clone() + }; + variant_resolve_info.push(pb::flag_resolve_info::VariantResolveInfo { + variant: variant_key, + count: 1, + }); + rule_resolve_info.push(pb::flag_resolve_info::RuleResolveInfo { + rule: af.rule.clone(), + count: 1, + assignment_resolve_info: vec![pb::flag_resolve_info::AssignmentResolveInfo { + assignment_id: af.assignment_id.clone(), + count: 1, + }], + }); + } else { + variant_resolve_info.push(pb::flag_resolve_info::VariantResolveInfo { + variant: String::new(), + count: 1, + }); + } + + pb::FlagResolveInfo { + flag: af.flag.clone(), + variant_resolve_info, + rule_resolve_info, + } + }) + .collect(); + + (flag_infos, client_info) +} + #[derive(Debug, Default)] struct RuleResolveInfo { count: AtomicU32, diff --git a/confidence-resolver/src/telemetry.rs b/confidence-resolver/src/telemetry.rs index 4d9e658c..e075953c 100644 --- a/confidence-resolver/src/telemetry.rs +++ b/confidence-resolver/src/telemetry.rs @@ -106,6 +106,7 @@ impl Histogram { /// Used for delta computation between flushes and as the future intermediate /// representation for Prometheus text format serialization. 
#[derive(Clone, Default)] +#[cfg_attr(feature = "json", derive(serde::Serialize, serde::Deserialize))] pub struct TelemetrySnapshot { pub latency: HistogramSnapshot, pub resolve_rates: Vec, @@ -113,6 +114,7 @@ pub struct TelemetrySnapshot { } #[derive(Clone, Default)] +#[cfg_attr(feature = "json", derive(serde::Serialize, serde::Deserialize))] pub struct HistogramSnapshot { pub sum: u64, pub count: u64, @@ -142,6 +144,52 @@ impl Default for PrometheusConfig { } impl TelemetrySnapshot { + /// Accumulate a `TelemetryData` delta into this cumulative snapshot. + /// + /// Expands compressed `BucketSpan`s back into the flat bucket array and + /// adds all counters. Gauge fields (memory_bytes) are replaced with the + /// latest value. + #[allow(clippy::indexing_slicing, clippy::arithmetic_side_effects)] + pub fn accumulate_delta(&mut self, td: &pb::TelemetryData) { + if let Some(latency) = &td.resolve_latency { + self.latency.sum = self.latency.sum.wrapping_add(latency.sum as u64); + self.latency.count = self.latency.count.wrapping_add(latency.count as u64); + + // Expand BucketSpans into flat bucket array + for span in &latency.buckets { + let base = match usize::try_from(span.offset) { + Ok(b) if b < BUCKET_COUNT => b, + _ => continue, // skip malformed span + }; + for (i, &count) in span.counts.iter().enumerate() { + let idx = base.saturating_add(i); + if idx >= BUCKET_COUNT { + break; + } + if idx >= self.latency.buckets.len() { + self.latency.buckets.resize(idx.saturating_add(1), 0); + } + // Safety: idx < BUCKET_COUNT and we just resized to at least idx+1 + self.latency.buckets[idx] = + self.latency.buckets[idx].wrapping_add(count as u64); + } + } + } + + for rate in &td.resolve_rate { + let idx = rate.reason as usize; + if idx >= self.resolve_rates.len() { + self.resolve_rates.resize(idx.saturating_add(1), 0); + } + // Safety: we just resized to at least idx+1 + self.resolve_rates[idx] = self.resolve_rates[idx].wrapping_add(rate.count as u64); + } + + if 
td.memory_bytes > 0 { + self.memory_bytes = td.memory_bytes; + } + } + /// Format the snapshot as Prometheus exposition text. /// /// All values are cumulative counters, matching what Prometheus expects. @@ -460,6 +508,49 @@ impl Default for Telemetry { } } +pub fn build_request_telemetry( + latency_us: Option, + reasons: &[ResolveReason], +) -> pb::TelemetryData { + let resolve_latency = latency_us.map(|us| { + let idx = if us == 0 { + 0 + } else { + let k = ((us as f64).ln() / LN_RATIO).floor() as usize; + k.min(BUCKET_COUNT.saturating_sub(1)) + }; + pb::ResolveLatency { + sum: us, + count: 1, + buckets: vec![pb::BucketSpan { + offset: idx as i32, + counts: vec![1], + }], + ln_ratio: LN_RATIO, + } + }); + + let mut reason_counts: Vec = Vec::new(); + for reason in reasons { + let r = *reason as i32; + if let Some(entry) = reason_counts.iter_mut().find(|e| e.reason == r) { + entry.count = entry.count.saturating_add(1); + } else { + reason_counts.push(pb::ResolveRate { + count: 1, + reason: r, + }); + } + } + + pb::TelemetryData { + resolve_latency, + resolve_rate: reason_counts, + resolver_version: crate::version::VERSION.to_string(), + ..Default::default() + } +} + #[cfg(test)] mod tests { use super::*; @@ -939,4 +1030,97 @@ mod tests { ); assert_eq!(default_out, zero_out); } + + #[test] + fn accumulate_delta_basic() { + let mut snap = TelemetrySnapshot::default(); + let td = pb::TelemetryData { + resolve_latency: Some(pb::ResolveLatency { + sum: 500, + count: 2, + buckets: vec![pb::BucketSpan { + offset: 5, + counts: vec![1, 1], + }], + ln_ratio: LN_RATIO, + }), + resolve_rate: vec![pb::ResolveRate { + reason: ResolveReason::Match as i32, + count: 3, + }], + memory_bytes: 4096, + ..Default::default() + }; + + snap.accumulate_delta(&td); + assert_eq!(snap.latency.sum, 500); + assert_eq!(snap.latency.count, 2); + assert_eq!(snap.latency.buckets[5], 1); + assert_eq!(snap.latency.buckets[6], 1); + assert_eq!(snap.resolve_rates[ResolveReason::Match as usize], 3); + 
assert_eq!(snap.memory_bytes, 4096); + + // Second delta accumulates counters, replaces gauge + snap.accumulate_delta(&td); + assert_eq!(snap.latency.sum, 1000); + assert_eq!(snap.latency.count, 4); + assert_eq!(snap.latency.buckets[5], 2); + assert_eq!(snap.resolve_rates[ResolveReason::Match as usize], 6); + assert_eq!(snap.memory_bytes, 4096); + } + + #[test] + fn accumulate_delta_negative_offset_skipped() { + let mut snap = TelemetrySnapshot::default(); + let td = pb::TelemetryData { + resolve_latency: Some(pb::ResolveLatency { + sum: 100, + count: 1, + buckets: vec![ + pb::BucketSpan { + offset: -1, + counts: vec![99], + }, + pb::BucketSpan { + offset: 3, + counts: vec![1], + }, + ], + ln_ratio: LN_RATIO, + }), + ..Default::default() + }; + + snap.accumulate_delta(&td); + // Negative offset span skipped, valid span applied + assert_eq!(snap.latency.count, 1); + assert_eq!(snap.latency.buckets.get(3).copied().unwrap_or(0), 1); + // Bucket from negative offset should not exist + let total: u64 = snap.latency.buckets.iter().sum(); + assert_eq!(total, 1); + } + + #[test] + fn accumulate_delta_oversized_offset_skipped() { + let mut snap = TelemetrySnapshot::default(); + let td = pb::TelemetryData { + resolve_latency: Some(pb::ResolveLatency { + sum: 100, + count: 1, + buckets: vec![pb::BucketSpan { + offset: BUCKET_COUNT as i32 + 10, + counts: vec![1], + }], + ln_ratio: LN_RATIO, + }), + ..Default::default() + }; + + snap.accumulate_delta(&td); + // Oversized offset span skipped, sum/count still accumulated + assert_eq!(snap.latency.count, 1); + assert_eq!(snap.latency.sum, 100); + let total: u64 = snap.latency.buckets.iter().sum(); + assert_eq!(total, 0); + } } diff --git a/openfeature-provider/go/confidence/internal/local_resolver/assets/confidence_resolver.wasm b/openfeature-provider/go/confidence/internal/local_resolver/assets/confidence_resolver.wasm index 5a5ad27b..dea9f7b0 100755 Binary files 
a/openfeature-provider/go/confidence/internal/local_resolver/assets/confidence_resolver.wasm and b/openfeature-provider/go/confidence/internal/local_resolver/assets/confidence_resolver.wasm differ diff --git a/openfeature-provider/js/proto/test-only.proto b/openfeature-provider/js/proto/test-only.proto index 12774a8f..99f26668 100644 --- a/openfeature-provider/js/proto/test-only.proto +++ b/openfeature-provider/js/proto/test-only.proto @@ -20,6 +20,7 @@ message Sdk { enum SdkId { SDK_ID_UNSPECIFIED = 0; SDK_ID_JS_LOCAL_SERVER_PROVIDER = 22; + SDK_ID_CLOUDFLARE_RESOLVER = 25; } message TelemetryData { diff --git a/openfeature-provider/proto/confidence/flags/resolver/v1/types.proto b/openfeature-provider/proto/confidence/flags/resolver/v1/types.proto index e831bc4a..bec0aae5 100644 --- a/openfeature-provider/proto/confidence/flags/resolver/v1/types.proto +++ b/openfeature-provider/proto/confidence/flags/resolver/v1/types.proto @@ -73,4 +73,7 @@ enum SdkId { SDK_ID_GO_LOCAL_PROVIDER = 20; SDK_ID_JAVA_LOCAL_PROVIDER = 21; SDK_ID_JS_LOCAL_SERVER_PROVIDER = 22; + SDK_ID_PYTHON_LOCAL_PROVIDER = 23; + SDK_ID_RUST_LOCAL_PROVIDER = 24; + SDK_ID_CLOUDFLARE_RESOLVER = 25; } diff --git a/wasm/proto/types.proto b/wasm/proto/types.proto index 31ae0765..6aeeec62 100644 --- a/wasm/proto/types.proto +++ b/wasm/proto/types.proto @@ -42,6 +42,7 @@ message Sdk { // receives and passes through Sdk messages from the main confidence-resolver. enum SdkId { SDK_ID_UNSPECIFIED = 0; + SDK_ID_CLOUDFLARE_RESOLVER = 25; } message ResolvedValue {