Skip to content

Commit 5b8d114

Browse files
committed
chore: fix merge issues
1 parent d2b9b9f commit 5b8d114

6 files changed

Lines changed: 50 additions & 36 deletions

File tree

libdd-data-pipeline/examples/send-traces-with-stats.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
use clap::Parser;
5-
use libdd_data_pipeline::trace_exporter::TelemetryConfig;
65
use libdd_capabilities_impl::NativeCapabilities;
76
use libdd_data_pipeline::trace_exporter::{
87
TraceExporter, TraceExporterInputFormat, TraceExporterOutputFormat,
@@ -70,7 +69,7 @@ fn main() {
7069
.set_output_format(TraceExporterOutputFormat::V04)
7170
.enable_stats(Duration::from_secs(10));
7271
#[cfg(feature = "telemetry")]
73-
builder.enable_telemetry(TelemetryConfig::default());
72+
builder.enable_telemetry(libdd_data_pipeline::telemetry::TelemetryConfig::default());
7473
let exporter = builder
7574
.build::<NativeCapabilities>()
7675
.expect("Failed to build TraceExporter");

libdd-data-pipeline/src/lib.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ pub(crate) mod otlp;
1717
mod pausable_worker;
1818
#[allow(missing_docs)]
1919
pub mod stats_exporter;
20-
#[cfg(feature = "telemetry")]
21-
pub(crate) mod telemetry;
20+
pub mod telemetry;
2221
#[allow(missing_docs)]
2322
pub mod trace_exporter;

libdd-data-pipeline/src/telemetry/builder.rs

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
22
// SPDX-License-Identifier: Apache-2.0
33

4+
use super::error::TelemetryError;
45
#[cfg(feature = "telemetry")]
56
use super::metrics::Metrics;
67
use super::worker;
@@ -9,7 +10,6 @@ use super::TelemetryClient;
910
use libdd_telemetry::worker::{TelemetryWorkerBuilder, TelemetryWorkerFlavor};
1011
use std::time::Duration;
1112
use tokio::runtime::Handle;
12-
use super::error::TelemetryError;
1313

1414
/// Structure to build a Telemetry client.
1515
///
@@ -96,12 +96,20 @@ impl TelemetryClientBuilder {
9696
#[cfg(feature = "telemetry")]
9797
impl TelemetryClientBuilder {
9898
/// Builds the telemetry client.
99-
pub fn build(self, runtime: Handle) -> Result<(TelemetryClient, worker::TelemetryWorker), TelemetryError> {
99+
pub fn build(
100+
self,
101+
runtime: Handle,
102+
) -> Result<(TelemetryClient, worker::TelemetryWorker), TelemetryError> {
100103
let mut builder = TelemetryWorkerBuilder::new_fetch_host(
101-
self.service_name.ok_or_else(|| TelemetryError::Builder("service_name is required".to_string()))?,
102-
self.language.ok_or_else(|| TelemetryError::Builder("language is required".to_string()))?,
103-
self.language_version.ok_or_else(|| TelemetryError::Builder("language_version is required".to_string()))?,
104-
self.tracer_version.ok_or_else(|| TelemetryError::Builder("tracer_version is required".to_string()))?,
104+
self.service_name
105+
.ok_or_else(|| TelemetryError::Builder("service_name is required".to_string()))?,
106+
self.language
107+
.ok_or_else(|| TelemetryError::Builder("language is required".to_string()))?,
108+
self.language_version.ok_or_else(|| {
109+
TelemetryError::Builder("language_version is required".to_string())
110+
})?,
111+
self.tracer_version
112+
.ok_or_else(|| TelemetryError::Builder("tracer_version is required".to_string()))?,
105113
);
106114
if let Some(url) = self.url {
107115
builder
@@ -137,7 +145,10 @@ impl TelemetryClientBuilder {
137145
#[cfg(not(feature = "telemetry"))]
138146
impl TelemetryClientBuilder {
139147
/// Builds a no-op telemetry client.
140-
pub fn build(self, _runtime: Handle) -> Result<(TelemetryClient, worker::TelemetryWorker), TelemetryError> {
148+
pub fn build(
149+
self,
150+
_runtime: Handle,
151+
) -> Result<(TelemetryClient, worker::TelemetryWorker), TelemetryError> {
141152
Ok((TelemetryClient {}, worker::TelemetryWorker {}))
142153
}
143154
}

libdd-data-pipeline/src/telemetry/mod.rs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,11 @@ pub struct TelemetryConfig {
1616
pub debug_enabled: bool,
1717
}
1818

19-
2019
#[cfg(not(feature = "telemetry"))]
2120
mod inner {
2221
use super::*;
23-
use libdd_trace_utils::trace_utils::SendDataResult;
2422
use libdd_trace_utils::send_with_retry::SendWithRetryResult;
23+
use libdd_trace_utils::trace_utils::SendDataResult;
2524

2625
#[derive(Debug)]
2726
pub struct TelemetryClient {}
@@ -70,7 +69,6 @@ mod inner {
7069
use libdd_trace_utils::send_with_retry::SendWithRetryResult;
7170
use libdd_trace_utils::trace_utils::SendDataResult;
7271

73-
7472
/// Telemetry handle used to send metrics to the agent
7573
#[derive(Debug)]
7674
pub struct TelemetryClient {
@@ -123,8 +121,11 @@ mod inner {
123121
let key = self
124122
.metrics
125123
.get(metrics::MetricKind::ChunksDroppedSerializationError);
126-
self.worker
127-
.add_point(data.chunks_dropped_serialization_error as f64, key, vec![])?;
124+
self.worker.add_point(
125+
data.chunks_dropped_serialization_error as f64,
126+
key,
127+
vec![],
128+
)?;
128129
}
129130
if data.chunks_dropped_send_failure > 0 {
130131
let key = self
@@ -160,7 +161,6 @@ mod inner {
160161
}
161162
}
162163

163-
164164
/// Telemetry describing the sending of a trace payload
165165
/// It can be produced from a [`SendWithRetryResult`] or from a [`SendDataResult`].
166166
#[derive(PartialEq, Debug, Default)]
@@ -223,6 +223,11 @@ mod inner {
223223
telemetry.errors_network = 1;
224224
telemetry.requests_count = *attempts as u64;
225225
}
226+
SendWithRetryError::ResponseBody(attempts) => {
227+
telemetry.chunks_dropped_send_failure = chunks;
228+
telemetry.errors_network = 1;
229+
telemetry.requests_count = *attempts as u64;
230+
}
226231
SendWithRetryError::Build(attempts) => {
227232
telemetry.chunks_dropped_serialization_error = chunks;
228233
telemetry.requests_count = *attempts as u64;
@@ -248,12 +253,11 @@ mod inner {
248253
}
249254
}
250255
}
251-
252256
}
253257

254-
pub use inner::*;
255258
pub(crate) use builder::TelemetryClientBuilder;
256259
pub(crate) use error::TelemetryError;
260+
pub use inner::*;
257261
pub(crate) use worker::TelemetryWorker;
258262

259263
#[cfg(test)]
@@ -262,9 +266,11 @@ mod tests {
262266
use bytes::Bytes;
263267
use httpmock::Method::POST;
264268
use httpmock::MockServer;
265-
use libdd_trace_utils::{send_data::send_data_result::SendDataResult, send_with_retry::SendWithRetryError};
266269
use libdd_capabilities::HttpError;
267270
use libdd_common::worker::Worker;
271+
use libdd_trace_utils::{
272+
send_data::send_data_result::SendDataResult, send_with_retry::SendWithRetryError,
273+
};
268274
use regex::Regex;
269275
use std::collections::HashMap;
270276
use std::time::Duration;

libdd-data-pipeline/src/trace_exporter/builder.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ use crate::pausable_worker::PausableWorker;
99
use crate::telemetry::{TelemetryClientBuilder, TelemetryConfig};
1010
use crate::trace_exporter::agent_response::AgentResponsePayloadVersion;
1111
use crate::trace_exporter::error::BuilderErrorKind;
12-
use crate::trace_exporter::TelemetryConfig;
1312
#[cfg(not(target_arch = "wasm32"))]
1413
use crate::trace_exporter::TraceExporterWorkers;
1514
use crate::trace_exporter::{
@@ -296,7 +295,8 @@ impl TraceExporterBuilder {
296295
stats = StatsComputationStatus::DisabledByAgent { bucket_size };
297296
}
298297

299-
let (telemetry_client, telemetry_worker) = self.telemetry
298+
let (telemetry_client, telemetry_worker) = self
299+
.telemetry
300300
.map(|telemetry_config| -> Result<_, TraceExporterError> {
301301
let mut builder = TelemetryClientBuilder::default()
302302
.set_language(&self.language)
@@ -312,17 +312,19 @@ impl TraceExporterBuilder {
312312
builder = builder.set_runtime_id(&id);
313313
}
314314

315-
let (client, worker) = builder.build(runtime.handle().clone()).map_err(TraceExporterError::from)?;
315+
let (client, worker) = builder
316+
.build(runtime.handle().clone())
317+
.map_err(TraceExporterError::from)?;
316318
let mut telemetry_worker = PausableWorker::new(worker);
317319
telemetry_worker.start(&runtime).map_err(|e| {
318320
TraceExporterError::Builder(BuilderErrorKind::InvalidConfiguration(
319-
e.to_string(),
321+
e.to_string(),
320322
))
321323
})?;
322324
runtime.block_on(client.start());
323325
Ok((client, telemetry_worker))
324326
})
325-
.transpose()?
327+
.transpose()?
326328
.map_or((None, None), |(c, w)| (Some(c), Some(w)));
327329

328330
Ok(TraceExporter {

libdd-data-pipeline/src/trace_exporter/mod.rs

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,10 @@ use self::agent_response::AgentResponse;
1414
use self::metrics::MetricsEmitter;
1515
use self::stats::StatsComputationStatus;
1616
use self::trace_serializer::TraceSerializer;
17-
#[cfg(not(target_arch = "wasm32"))]
18-
use crate::agent_info::AgentInfoFetcher;
1917
use crate::agent_info::ResponseObserver;
2018
use crate::otlp::{map_traces_to_otlp, send_otlp_traces_http, OtlpResourceInfo, OtlpTraceConfig};
2119
#[cfg(not(target_arch = "wasm32"))]
2220
use crate::pausable_worker::PausableWorker;
23-
#[cfg(not(target_arch = "wasm32"))]
24-
use crate::stats_exporter::StatsExporter;
2521
use crate::telemetry::{SendPayloadTelemetry, TelemetryClient};
2622
use crate::trace_exporter::agent_response::{
2723
AgentResponsePayloadVersion, DATADOG_RATES_PAYLOAD_VERSION,
@@ -42,8 +38,6 @@ use libdd_capabilities::{HttpClientTrait, MaybeSend};
4238
use libdd_common::tag::Tag;
4339
use libdd_common::{Endpoint, MutexExt};
4440
use libdd_dogstatsd_client::Client;
45-
#[cfg(not(target_arch = "wasm32"))]
46-
use libdd_telemetry::worker::TelemetryWorker;
4741
use libdd_trace_utils::msgpack_decoder;
4842
use libdd_trace_utils::send_with_retry::{
4943
send_with_retry, RetryStrategy, SendWithRetryError, SendWithRetryResult,
@@ -182,9 +176,9 @@ impl<'a> From<&'a TracerMetadata> for HeaderMap {
182176
/// `H` is the HTTP client implementation, see [`HttpClientTrait`].
183177
#[derive(Debug)]
184178
pub(crate) struct TraceExporterWorkers<H: HttpClientTrait + MaybeSend + Sync + 'static> {
185-
pub info: PausableWorker<AgentInfoFetcher<H>>,
186-
pub stats: Option<PausableWorker<StatsExporter<H>>>,
187-
pub telemetry: Option<PausableWorker<TelemetryWorker>>,
179+
pub info: PausableWorker<crate::agent_info::AgentInfoFetcher<H>>,
180+
pub stats: Option<PausableWorker<crate::stats_exporter::StatsExporter<H>>>,
181+
pub telemetry: Option<PausableWorker<crate::telemetry::TelemetryWorker>>,
188182
}
189183

190184
/// The TraceExporter ingest traces from the tracers serialized as messagepack and forward them to
@@ -962,6 +956,7 @@ pub trait ResponseCallback {
962956
mod tests {
963957
use self::error::AgentErrorKind;
964958
use super::*;
959+
use crate::telemetry::TelemetryConfig;
965960
use httpmock::prelude::*;
966961
use httpmock::MockServer;
967962
use libdd_capabilities_impl::NativeCapabilities;
@@ -972,7 +967,6 @@ mod tests {
972967
use std::time::Duration;
973968
use tokio::time::sleep;
974969

975-
976970
#[test]
977971
fn test_from_tracer_tags_to_tracer_header_tags() {
978972
let tracer_tags = TracerMetadata {
@@ -1661,7 +1655,10 @@ mod tests {
16611655
true,
16621656
);
16631657

1664-
let v5: (Vec<BytesString>, Vec<Vec<libdd_trace_utils::span::v05::Span>>) = (vec![], vec![]);
1658+
let v5: (
1659+
Vec<BytesString>,
1660+
Vec<Vec<libdd_trace_utils::span::v05::Span>>,
1661+
) = (vec![], vec![]);
16651662
let traces = rmp_serde::to_vec(&v5).unwrap();
16661663
let result = exporter.send(traces.as_ref()).unwrap();
16671664
let AgentResponse::Changed { body } = result else {

0 commit comments

Comments
 (0)