1111//! triggers creation for a given (env, version, service) is used as the runtime_id in the
1212//! stats payload for that key.
1313
14+ use base64:: prelude:: BASE64_URL_SAFE_NO_PAD ;
15+ use base64:: Engine ;
1416use datadog_ipc:: shm_stats:: {
1517 ShmSpanConcentrator , DEFAULT_SLOT_COUNT , DEFAULT_STRING_POOL_BYTES , RELOAD_FILL_RATIO ,
1618} ;
@@ -27,8 +29,6 @@ use std::hash::{Hash, Hasher};
2729use std:: sync:: atomic:: { AtomicU64 , AtomicUsize , Ordering :: * } ;
2830use std:: sync:: { Arc , Mutex , Weak } ;
2931use std:: time:: { Duration , SystemTime , UNIX_EPOCH } ;
30- use base64:: Engine ;
31- use base64:: prelude:: BASE64_URL_SAFE_NO_PAD ;
3232use tracing:: { error, info, warn} ;
3333use zwohash:: ZwoHasher ;
3434
@@ -55,7 +55,9 @@ pub(crate) fn stats_endpoint(endpoint: &Endpoint) -> Option<Endpoint> {
5555 return None ;
5656 }
5757 let mut parts = endpoint. url . clone ( ) . into_parts ( ) ;
58- parts. path_and_query = Some ( PathAndQuery :: from_static ( libdd_trace_stats:: stats_exporter:: STATS_ENDPOINT_PATH ) ) ;
58+ parts. path_and_query = Some ( PathAndQuery :: from_static (
59+ libdd_trace_stats:: stats_exporter:: STATS_ENDPOINT_PATH ,
60+ ) ) ;
5961 Some ( Endpoint {
6062 url : http:: Uri :: from_parts ( parts) . ok ( ) ?,
6163 ..endpoint. clone ( )
@@ -66,7 +68,7 @@ pub(crate) fn stats_endpoint(endpoint: &Endpoint) -> Option<Endpoint> {
6668#[ derive( Clone ) ]
6769pub ( crate ) struct StatsConfig {
6870 /// Stats endpoint with final path already baked in.
69- pub endpoint : Endpoint ,
71+ pub endpoint : Endpoint ,
7072 pub flush_interval : Duration ,
7173 /// Machine hostname, forwarded to the stats payload `hostname` field.
7274 pub hostname : String ,
@@ -149,20 +151,36 @@ impl SpanConcentratorState {
149151 /// Used for one-shot flushes (idle-removal, `flush_all_stats_now`). The retry-accumulator
150152 /// path in `run_stats_flush_loop` has its own send loop and does not use this.
151153 async fn send_and_emit ( & self , client : & HttpClient , payload : pb:: ClientStatsPayload ) {
152- let endpoint = self . endpoint . lock ( ) . unwrap_or_else ( |e| e. into_inner ( ) ) . clone ( ) ;
154+ let endpoint = self
155+ . endpoint
156+ . lock ( )
157+ . unwrap_or_else ( |e| e. into_inner ( ) )
158+ . clone ( ) ;
153159 let spans = spans_in_payload ( & payload) ;
154160 let buckets = payload. stats . len ( ) as i64 ;
155- match send_stats ( client, & endpoint, & payload, self . language . clone ( ) , self . tracer_version . clone ( ) ) . await {
156- StatsSendResult :: Sent => emit_flush_metrics ( & self . dogstatsd , & self . base_tags , spans, 1 , buckets, 0 ) ,
157- StatsSendResult :: Error | StatsSendResult :: Network => emit_flush_metrics ( & self . dogstatsd , & self . base_tags , 0 , 0 , 0 , 1 ) ,
161+ match send_stats (
162+ client,
163+ & endpoint,
164+ & payload,
165+ self . language . clone ( ) ,
166+ self . tracer_version . clone ( ) ,
167+ )
168+ . await
169+ {
170+ StatsSendResult :: Sent => {
171+ emit_flush_metrics ( & self . dogstatsd , & self . base_tags , spans, 1 , buckets, 0 )
172+ }
173+ StatsSendResult :: Error | StatsSendResult :: Network => {
174+ emit_flush_metrics ( & self . dogstatsd , & self . base_tags , 0 , 0 , 0 , 1 )
175+ }
158176 }
159177 }
160178}
161179
162180/// RAII guard that keeps an (env, version, root-service) concentrator alive.
163181///
164- /// Stored in `ActiveApplication`. When the last guard for a given (env, version, root-service) is dropped,
165- /// the flush loop will remove the concentrator after `IDLE_REMOVE_SECS` seconds.
182+ /// Stored in `ActiveApplication`. When the last guard for a given (env, version, root-service) is
183+ /// dropped, the flush loop will remove the concentrator after `IDLE_REMOVE_SECS` seconds.
166184pub struct SpanConcentratorGuard {
167185 ref_count : Arc < AtomicUsize > ,
168186 last_zero_secs : Arc < AtomicU64 > ,
@@ -198,7 +216,6 @@ pub fn env_stats_shm_path(env: &str, version: &str, service: &str) -> CString {
198216 CString :: new ( path) . unwrap ( )
199217}
200218
201-
202219/// Result of a single stats payload send attempt.
203220#[ must_use]
204221enum StatsSendResult {
@@ -395,7 +412,15 @@ pub async fn run_stats_flush_loop(
395412 let mut errors = 0i64 ;
396413 let endpoint = s. endpoint . lock ( ) . unwrap_or_else ( |e| e. into_inner ( ) ) . clone ( ) ;
397414 for p in & pending {
398- match send_stats ( & client, & endpoint, & p, s. language . to_owned ( ) , s. tracer_version . to_owned ( ) ) . await {
415+ match send_stats (
416+ & client,
417+ & endpoint,
418+ & p,
419+ s. language . to_owned ( ) ,
420+ s. tracer_version . to_owned ( ) ,
421+ )
422+ . await
423+ {
399424 StatsSendResult :: Sent => {
400425 to_drain += 1 ;
401426 payloads_sent += 1 ;
@@ -413,7 +438,14 @@ pub async fn run_stats_flush_loop(
413438 }
414439 }
415440 pending. drain ( ..to_drain) ;
416- emit_flush_metrics ( & s. dogstatsd , & s. base_tags , spans_sent, payloads_sent, buckets_sent, errors) ;
441+ emit_flush_metrics (
442+ & s. dogstatsd ,
443+ & s. base_tags ,
444+ spans_sent,
445+ payloads_sent,
446+ buckets_sent,
447+ errors,
448+ ) ;
417449
418450 // Idle-removal check: if no app has held a guard for >= IDLE_REMOVE_SECS, retire this
419451 // concentrator with a final force-flush.
@@ -428,7 +460,11 @@ pub async fn run_stats_flush_loop(
428460 } ;
429461 let idle_secs = if s. ref_count . load ( Acquire ) == 0 {
430462 let last_zero = s. last_zero_secs . load ( Acquire ) ;
431- if last_zero != u64:: MAX { now_secs ( ) . saturating_sub ( last_zero) } else { 0 }
463+ if last_zero != u64:: MAX {
464+ now_secs ( ) . saturating_sub ( last_zero)
465+ } else {
466+ 0
467+ }
432468 } else {
433469 0
434470 } ;
@@ -455,13 +491,14 @@ pub async fn run_stats_flush_loop(
455491/// Create (or look up) the SHM span concentrator for an (env, service, version) pair, increment its
456492/// reference count, and return a guard.
457493///
458- /// Idempotent with respect to SHM creation: if a concentrator for this (env, service, version) already
459- /// exists, only the reference count is incremented.
494+ /// Idempotent with respect to SHM creation: if a concentrator for this (env, service, version)
495+ /// already exists, only the reference count is incremented.
460496///
461497/// Returns `None` when no `SessionConfig` has been set yet for the calling session (caller should
462498/// retry later) or when SHM creation fails.
463499///
464- /// - `concentrators`: the global per-(env,version,service) map from `SidecarServer::span_concentrators`
500+ /// - `concentrators`: the global per-(env,version,service) map from
501+ /// `SidecarServer::span_concentrators`
465502/// - `env`: the environment name
466503/// - `version`: the application version
467504/// - `service_name`: the root service name reported by `set_universal_service_tags`
0 commit comments