Skip to content

Commit 32bfed2

Browse files
committed
chore: implement null struct pattern to reduce clutter
1 parent c225085 commit 32bfed2

File tree

6 files changed

+231
-179
lines changed

6 files changed

+231
-179
lines changed

libdd-data-pipeline/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ mod health_metrics;
1515
mod pausable_worker;
1616
#[allow(missing_docs)]
1717
pub mod stats_exporter;
18-
#[cfg(feature = "telemetry")]
1918
pub(crate) mod telemetry;
2019
#[allow(missing_docs)]
2120
pub mod trace_exporter;

libdd-data-pipeline/src/telemetry/mod.rs

Lines changed: 188 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,37 @@
33

44
//! Telemetry provides a client to send results accumulated in 'Metrics'.
55
pub mod error;
6+
#[cfg(feature = "telemetry")]
67
pub mod metrics;
8+
pub mod worker;
9+
710
use crate::telemetry::error::TelemetryError;
11+
#[cfg(feature = "telemetry")]
812
use crate::telemetry::metrics::Metrics;
13+
#[cfg(feature = "telemetry")]
914
use libdd_common::tag::Tag;
15+
#[cfg(feature = "telemetry")]
1016
use libdd_telemetry::worker::{
11-
LifecycleAction, TelemetryActions, TelemetryWorker, TelemetryWorkerBuilder,
12-
TelemetryWorkerFlavor, TelemetryWorkerHandle,
13-
};
14-
use libdd_trace_utils::{
15-
send_with_retry::{SendWithRetryError, SendWithRetryResult},
16-
trace_utils::SendDataResult,
17+
LifecycleAction, TelemetryActions, TelemetryWorkerBuilder, TelemetryWorkerFlavor,
18+
TelemetryWorkerHandle,
1719
};
18-
use std::{collections::HashMap, time::Duration};
20+
#[cfg(feature = "telemetry")]
21+
use libdd_trace_utils::send_with_retry::SendWithRetryError;
22+
use libdd_trace_utils::send_with_retry::SendWithRetryResult;
23+
use libdd_trace_utils::trace_utils::SendDataResult;
24+
#[cfg(feature = "telemetry")]
25+
use std::collections::HashMap;
26+
use std::time::Duration;
1927
use tokio::runtime::Handle;
2028

29+
/// Configuration for telemetry reporting.
30+
#[derive(Debug, Default, Clone)]
31+
pub struct TelemetryConfig {
32+
pub heartbeat: u64,
33+
pub runtime_id: Option<String>,
34+
pub debug_enabled: bool,
35+
}
36+
2137
/// Structure to build a Telemetry client.
2238
///
2339
/// Holds partial data until the `build` method is called which results in a new
@@ -30,7 +46,9 @@ pub struct TelemetryClientBuilder {
3046
language: Option<String>,
3147
language_version: Option<String>,
3248
tracer_version: Option<String>,
33-
config: libdd_telemetry::config::Config,
49+
url: Option<String>,
50+
heartbeat: Option<Duration>,
51+
debug_enabled: bool,
3452
runtime_id: Option<String>,
3553
}
3654

@@ -73,16 +91,14 @@ impl TelemetryClientBuilder {
7391

7492
/// Sets the url where the metrics will be sent.
7593
pub fn set_url(mut self, url: &str) -> Self {
76-
let _ = self
77-
.config
78-
.set_endpoint(libdd_common::Endpoint::from_slice(url));
94+
self.url = Some(url.to_string());
7995
self
8096
}
8197

8298
/// Sets the heartbeat notification interval in millis.
8399
pub fn set_heartbeat(mut self, interval: u64) -> Self {
84100
if interval > 0 {
85-
self.config.telemetry_heartbeat_interval = Duration::from_millis(interval);
101+
self.heartbeat = Some(Duration::from_millis(interval));
86102
}
87103
self
88104
}
@@ -95,20 +111,31 @@ impl TelemetryClientBuilder {
95111

96112
/// Sets the debug enabled flag for the telemetry client.
97113
pub fn set_debug_enabled(mut self, debug: bool) -> Self {
98-
self.config.debug_enabled = debug;
114+
self.debug_enabled = debug;
99115
self
100116
}
117+
}
101118

119+
#[cfg(feature = "telemetry")]
120+
impl TelemetryClientBuilder {
102121
/// Builds the telemetry client.
103-
pub fn build(self, runtime: Handle) -> (TelemetryClient, TelemetryWorker) {
122+
pub fn build(self, runtime: Handle) -> (TelemetryClient, worker::TelemetryWorker) {
104123
#[allow(clippy::unwrap_used)]
105124
let mut builder = TelemetryWorkerBuilder::new_fetch_host(
106125
self.service_name.unwrap(),
107126
self.language.unwrap(),
108127
self.language_version.unwrap(),
109128
self.tracer_version.unwrap(),
110129
);
111-
builder.config = self.config;
130+
if let Some(url) = self.url {
131+
let _ = builder
132+
.config
133+
.set_endpoint(libdd_common::Endpoint::from_slice(&url));
134+
}
135+
if let Some(heartbeat) = self.heartbeat {
136+
builder.config.telemetry_heartbeat_interval = heartbeat;
137+
}
138+
builder.config.debug_enabled = self.debug_enabled;
112139
// Send only metrics and logs and drop lifecycle events
113140
builder.flavor = TelemetryWorkerFlavor::MetricsLogs;
114141
builder.application.env = self.env;
@@ -130,101 +157,27 @@ impl TelemetryClientBuilder {
130157
}
131158
}
132159

160+
#[cfg(not(feature = "telemetry"))]
161+
impl TelemetryClientBuilder {
162+
/// Builds a no-op telemetry client.
163+
pub fn build(self, _runtime: Handle) -> (TelemetryClient, worker::TelemetryWorker) {
164+
(TelemetryClient {}, worker::TelemetryWorker {})
165+
}
166+
}
167+
168+
#[cfg(feature = "telemetry")]
133169
/// Telemetry handle used to send metrics to the agent
134170
#[derive(Debug)]
135171
pub struct TelemetryClient {
136172
metrics: Metrics,
137173
worker: TelemetryWorkerHandle,
138174
}
139175

140-
/// Telemetry describing the sending of a trace payload
141-
/// It can be produced from a [`SendWithRetryResult`] or from a [`SendDataResult`].
142-
#[derive(PartialEq, Debug, Default)]
143-
pub struct SendPayloadTelemetry {
144-
requests_count: u64,
145-
errors_network: u64,
146-
errors_timeout: u64,
147-
errors_status_code: u64,
148-
bytes_sent: u64,
149-
chunks_sent: u64,
150-
chunks_dropped_p0: u64,
151-
chunks_dropped_serialization_error: u64,
152-
chunks_dropped_send_failure: u64,
153-
responses_count_per_code: HashMap<u16, u64>,
154-
}
155-
156-
impl From<&SendDataResult> for SendPayloadTelemetry {
157-
fn from(value: &SendDataResult) -> Self {
158-
Self {
159-
requests_count: value.requests_count,
160-
errors_network: value.errors_network,
161-
errors_timeout: value.errors_timeout,
162-
errors_status_code: value.errors_status_code,
163-
bytes_sent: value.bytes_sent,
164-
chunks_sent: value.chunks_sent,
165-
chunks_dropped_send_failure: value.chunks_dropped,
166-
responses_count_per_code: value.responses_count_per_code.clone(),
167-
..Default::default()
168-
}
169-
}
170-
}
171-
172-
impl SendPayloadTelemetry {
173-
/// Create a [`SendPayloadTelemetry`] from a [`SendWithRetryResult`].
174-
///
175-
/// # Arguments
176-
/// * `value` - The result of sending traces with retry
177-
/// * `bytes_sent` - The number of bytes in the payload
178-
/// * `chunks` - The number of trace chunks in the payload
179-
/// * `chunks_dropped_p0` - The number of P0 trace chunks dropped due to sampling
180-
pub fn from_retry_result(
181-
value: &SendWithRetryResult,
182-
bytes_sent: u64,
183-
chunks: u64,
184-
chunks_dropped_p0: u64,
185-
) -> Self {
186-
let mut telemetry = Self {
187-
chunks_dropped_p0,
188-
..Default::default()
189-
};
190-
match value {
191-
Ok((response, attempts)) => {
192-
telemetry.chunks_sent = chunks;
193-
telemetry.bytes_sent = bytes_sent;
194-
telemetry
195-
.responses_count_per_code
196-
.insert(response.status().into(), 1);
197-
telemetry.requests_count = *attempts as u64;
198-
}
199-
Err(err) => match err {
200-
SendWithRetryError::Http(response, attempts) => {
201-
telemetry.chunks_dropped_send_failure = chunks;
202-
telemetry.errors_status_code = 1;
203-
telemetry
204-
.responses_count_per_code
205-
.insert(response.status().into(), 1);
206-
telemetry.requests_count = *attempts as u64;
207-
}
208-
SendWithRetryError::Timeout(attempts) => {
209-
telemetry.chunks_dropped_send_failure = chunks;
210-
telemetry.errors_timeout = 1;
211-
telemetry.requests_count = *attempts as u64;
212-
}
213-
SendWithRetryError::Network(_, attempts) => {
214-
telemetry.chunks_dropped_send_failure = chunks;
215-
telemetry.errors_network = 1;
216-
telemetry.requests_count = *attempts as u64;
217-
}
218-
SendWithRetryError::Build(attempts) => {
219-
telemetry.chunks_dropped_serialization_error = chunks;
220-
telemetry.requests_count = *attempts as u64;
221-
}
222-
},
223-
};
224-
telemetry
225-
}
226-
}
176+
#[cfg(not(feature = "telemetry"))]
177+
#[derive(Debug)]
178+
pub struct TelemetryClient {}
227179

180+
#[cfg(feature = "telemetry")]
228181
impl TelemetryClient {
229182
/// Sends metrics to the agent using a telemetry worker handle.
230183
///
@@ -307,7 +260,137 @@ impl TelemetryClient {
307260
}
308261
}
309262

263+
#[cfg(not(feature = "telemetry"))]
264+
impl TelemetryClient {
265+
/// No-op: telemetry is disabled.
266+
pub fn send(&self, _data: &SendPayloadTelemetry) -> Result<(), TelemetryError> {
267+
Ok(())
268+
}
269+
270+
/// No-op: telemetry is disabled.
271+
pub async fn start(&self) {}
272+
273+
/// No-op: telemetry is disabled.
274+
pub async fn shutdown(self) {}
275+
}
276+
277+
#[cfg(feature = "telemetry")]
278+
/// Telemetry describing the sending of a trace payload
279+
/// It can be produced from a [`SendWithRetryResult`] or from a [`SendDataResult`].
280+
#[derive(PartialEq, Debug, Default)]
281+
pub struct SendPayloadTelemetry {
282+
requests_count: u64,
283+
errors_network: u64,
284+
errors_timeout: u64,
285+
errors_status_code: u64,
286+
bytes_sent: u64,
287+
chunks_sent: u64,
288+
chunks_dropped_p0: u64,
289+
chunks_dropped_serialization_error: u64,
290+
chunks_dropped_send_failure: u64,
291+
responses_count_per_code: HashMap<u16, u64>,
292+
}
293+
294+
#[cfg(not(feature = "telemetry"))]
295+
#[derive(Debug)]
296+
pub struct SendPayloadTelemetry {}
297+
298+
#[cfg(feature = "telemetry")]
299+
impl SendPayloadTelemetry {
300+
/// Create a [`SendPayloadTelemetry`] from a [`SendWithRetryResult`].
301+
///
302+
/// # Arguments
303+
/// * `value` - The result of sending traces with retry
304+
/// * `bytes_sent` - The number of bytes in the payload
305+
/// * `chunks` - The number of trace chunks in the payload
306+
/// * `chunks_dropped_p0` - The number of P0 trace chunks dropped due to sampling
307+
pub fn from_retry_result(
308+
value: &SendWithRetryResult,
309+
bytes_sent: u64,
310+
chunks: u64,
311+
chunks_dropped_p0: u64,
312+
) -> Self {
313+
let mut telemetry = Self {
314+
chunks_dropped_p0,
315+
..Default::default()
316+
};
317+
match value {
318+
Ok((response, attempts)) => {
319+
telemetry.chunks_sent = chunks;
320+
telemetry.bytes_sent = bytes_sent;
321+
telemetry
322+
.responses_count_per_code
323+
.insert(response.status().into(), 1);
324+
telemetry.requests_count = *attempts as u64;
325+
}
326+
Err(err) => match err {
327+
SendWithRetryError::Http(response, attempts) => {
328+
telemetry.chunks_dropped_send_failure = chunks;
329+
telemetry.errors_status_code = 1;
330+
telemetry
331+
.responses_count_per_code
332+
.insert(response.status().into(), 1);
333+
telemetry.requests_count = *attempts as u64;
334+
}
335+
SendWithRetryError::Timeout(attempts) => {
336+
telemetry.chunks_dropped_send_failure = chunks;
337+
telemetry.errors_timeout = 1;
338+
telemetry.requests_count = *attempts as u64;
339+
}
340+
SendWithRetryError::Network(_, attempts) => {
341+
telemetry.chunks_dropped_send_failure = chunks;
342+
telemetry.errors_network = 1;
343+
telemetry.requests_count = *attempts as u64;
344+
}
345+
SendWithRetryError::Build(attempts) => {
346+
telemetry.chunks_dropped_serialization_error = chunks;
347+
telemetry.requests_count = *attempts as u64;
348+
}
349+
},
350+
};
351+
telemetry
352+
}
353+
}
354+
355+
#[cfg(not(feature = "telemetry"))]
356+
impl SendPayloadTelemetry {
357+
/// No-op: telemetry is disabled.
358+
pub fn from_retry_result(
359+
_value: &SendWithRetryResult,
360+
_bytes_sent: u64,
361+
_chunks: u64,
362+
_chunks_dropped_p0: u64,
363+
) -> Self {
364+
Self {}
365+
}
366+
}
367+
368+
#[cfg(feature = "telemetry")]
369+
impl From<&SendDataResult> for SendPayloadTelemetry {
370+
fn from(value: &SendDataResult) -> Self {
371+
Self {
372+
requests_count: value.requests_count,
373+
errors_network: value.errors_network,
374+
errors_timeout: value.errors_timeout,
375+
errors_status_code: value.errors_status_code,
376+
bytes_sent: value.bytes_sent,
377+
chunks_sent: value.chunks_sent,
378+
chunks_dropped_send_failure: value.chunks_dropped,
379+
responses_count_per_code: value.responses_count_per_code.clone(),
380+
..Default::default()
381+
}
382+
}
383+
}
384+
385+
#[cfg(not(feature = "telemetry"))]
386+
impl From<&SendDataResult> for SendPayloadTelemetry {
387+
fn from(_value: &SendDataResult) -> Self {
388+
Self {}
389+
}
390+
}
391+
310392
#[cfg(test)]
393+
#[cfg(feature = "telemetry")]
311394
mod tests {
312395
use http::{Response, StatusCode};
313396
use httpmock::Method::POST;
@@ -353,15 +436,9 @@ mod tests {
353436
assert_eq!(&builder.language.unwrap(), "test_language");
354437
assert_eq!(&builder.language_version.unwrap(), "test_language_version");
355438
assert_eq!(&builder.tracer_version.unwrap(), "test_tracer_version");
356-
assert!(builder.config.debug_enabled);
357-
assert_eq!(
358-
<String as AsRef<str>>::as_ref(&builder.config.endpoint().unwrap().url.to_string()),
359-
"http://localhost/telemetry/proxy/api/v2/apmtelemetry"
360-
);
361-
assert_eq!(
362-
builder.config.telemetry_heartbeat_interval,
363-
Duration::from_millis(30)
364-
);
439+
assert!(builder.debug_enabled);
440+
assert_eq!(builder.url.as_deref(), Some("http://localhost"));
441+
assert_eq!(builder.heartbeat.unwrap(), Duration::from_millis(30));
365442
}
366443

367444
#[cfg_attr(miri, ignore)]

0 commit comments

Comments
 (0)