Skip to content

Commit 974d690

Browse files
authored
feat(sidecar)!: Allow for targeted telemetry flush (#1898)
Specifically allowing to chose what to flush. Co-authored-by: bob.weinand <bob.weinand@datadoghq.com>
1 parent 555a22e commit 974d690

6 files changed

Lines changed: 63 additions & 19 deletions

File tree

datadog-sidecar-ffi/src/lib.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use datadog_sidecar::service::telemetry::InternalTelemetryAction;
2828
use datadog_sidecar::service::{
2929
blocking::{self, SidecarTransport},
3030
DynamicInstrumentationConfigState, InstanceId, QueueId, RuntimeMetadata,
31-
SerializedTracerHeaderTags, SessionConfig, SidecarAction,
31+
SerializedTracerHeaderTags, SessionConfig, SidecarAction, SidecarFlushOptions,
3232
};
3333
use datadog_sidecar::service::{get_telemetry_action_sender, InternalTelemetryActions};
3434
use datadog_sidecar::shm_remote_config::{path_for_remote_config, RemoteConfigReader};
@@ -361,8 +361,11 @@ pub extern "C" fn ddog_sidecar_ping(transport: &mut Box<SidecarTransport>) -> Ma
361361
}
362362

363363
#[no_mangle]
364-
pub extern "C" fn ddog_sidecar_flush_traces(transport: &mut Box<SidecarTransport>) -> MaybeError {
365-
try_c!(blocking::flush_traces(transport));
364+
pub extern "C" fn ddog_sidecar_flush(
365+
transport: &mut Box<SidecarTransport>,
366+
options: SidecarFlushOptions,
367+
) -> MaybeError {
368+
try_c!(blocking::flush(transport, options));
366369

367370
MaybeError::None
368371
}

datadog-sidecar/src/service/blocking.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
use super::{
55
DynamicInstrumentationConfigState, InstanceId, QueueId, SerializedTracerHeaderTags,
6-
SessionConfig, SidecarAction,
6+
SessionConfig, SidecarAction, SidecarFlushOptions,
77
};
88
use crate::service::sender::SidecarSender;
99
use crate::service::sidecar_interface::SidecarInterfaceChannel;
@@ -440,9 +440,9 @@ pub fn stats(transport: &mut SidecarTransport) -> io::Result<String> {
440440
transport.with_retry(|s| s.stats().map_err(|e| io::Error::other(e.to_string())))
441441
}
442442

443-
/// Flushes the outstanding traces.
444-
pub fn flush_traces(transport: &mut SidecarTransport) -> io::Result<()> {
445-
transport.with_retry(|s| s.flush_traces())
443+
/// Flushes traces/stats and/or telemetry, as specified by options.
444+
pub fn flush(transport: &mut SidecarTransport, options: SidecarFlushOptions) -> io::Result<()> {
445+
transport.with_retry(|s| s.flush(options))
446446
}
447447

448448
/// Sends a ping to the service.

datadog-sidecar/src/service/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ pub(crate) mod tracing;
4444

4545
#[cfg(windows)]
4646
pub use remote_configs::RemoteConfigNotifyFunction;
47-
pub use sidecar_interface::DynamicInstrumentationConfigState;
47+
pub use sidecar_interface::{DynamicInstrumentationConfigState, SidecarFlushOptions};
4848
pub use telemetry::{get_telemetry_action_sender, InternalTelemetryActions};
4949
pub(crate) use telemetry::{init_telemetry_sender, telemetry_action_receiver_task};
5050

datadog-sidecar/src/service/sender.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@
1313
1414
use crate::service::{
1515
sidecar_interface::{
16-
DynamicInstrumentationConfigState, SidecarInterfaceChannel, SidecarInterfaceRequest,
16+
DynamicInstrumentationConfigState, SidecarFlushOptions, SidecarInterfaceChannel,
17+
SidecarInterfaceRequest,
1718
},
1819
InstanceId, QueueId, SerializedTracerHeaderTags, SessionConfig, SidecarAction,
1920
};
@@ -456,9 +457,9 @@ impl SidecarSender {
456457
self.channel.0.set_write_timeout(d)
457458
}
458459

459-
pub fn flush_traces(&mut self) -> io::Result<()> {
460+
pub fn flush(&mut self, options: SidecarFlushOptions) -> io::Result<()> {
460461
self.drain_outbox_blocking();
461-
self.channel.call_flush_traces()
462+
self.channel.call_flush(options)
462463
}
463464

464465
pub fn ping(&mut self) -> io::Result<()> {

datadog-sidecar/src/service/sidecar_interface.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,13 @@ pub enum DynamicInstrumentationConfigState {
2222
NotSet,
2323
}
2424

25+
#[repr(C)]
26+
#[derive(Debug, Default, Copy, Clone, Serialize, Deserialize)]
27+
pub struct SidecarFlushOptions {
28+
pub traces_and_stats: bool,
29+
pub telemetry: bool,
30+
}
31+
2532
/// The `SidecarInterface` trait defines the necessary methods for the sidecar service.
2633
///
2734
/// These methods include operations such as enqueueing actions, registering services, setting
@@ -201,9 +208,9 @@ pub trait SidecarInterface {
201208
/// * `actions` - The DogStatsD actions to send.
202209
async fn send_dogstatsd_actions(instance_id: InstanceId, actions: Vec<DogStatsDActionOwned>);
203210

204-
/// Flushes any outstanding traces queued for sending.
211+
/// Flushes outstanding traces/stats and/or telemetry, as specified by options.
205212
#[blocking]
206-
async fn flush_traces();
213+
async fn flush(options: SidecarFlushOptions);
207214

208215
/// Sets x-datadog-test-session-token on all requests for the given session.
209216
///

datadog-sidecar/src/service/sidecar_server.rs

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ use crate::service::{
77
telemetry::{TelemetryCachedClient, TelemetryCachedClientSet},
88
tracing::TraceFlusher,
99
DynamicInstrumentationConfigState, InstanceId, QueueId, RuntimeInfo, RuntimeMetadata,
10-
SerializedTracerHeaderTags, SessionConfig, SessionInfo, SidecarAction, SidecarInterface,
10+
SerializedTracerHeaderTags, SessionConfig, SessionInfo, SidecarAction, SidecarFlushOptions,
11+
SidecarInterface,
1112
};
1213
use datadog_ipc::platform::{FileBackedHandle, ShmHandle};
1314
use datadog_ipc::{PeerCredentials, SeqpacketConn};
@@ -991,12 +992,44 @@ impl SidecarInterface for ConnectionSidecarHandler {
991992
}
992993
}
993994

994-
async fn flush_traces(&self, _peer: PeerCredentials) {
995-
let flusher = self.server.trace_flusher.clone();
996-
if let Err(e) = tokio::spawn(async move { flusher.flush().await }).await {
997-
error!("Failed flushing traces: {e:?}");
995+
async fn flush(&self, _peer: PeerCredentials, options: SidecarFlushOptions) {
996+
if options.traces_and_stats {
997+
let flusher = self.server.trace_flusher.clone();
998+
if let Err(e) = tokio::spawn(async move { flusher.flush().await }).await {
999+
error!("Failed flushing traces: {e:?}");
1000+
}
1001+
flush_all_stats_now(&self.server.span_concentrators).await;
1002+
}
1003+
if options.telemetry {
1004+
let workers: Vec<_> = {
1005+
let clients = self.server.telemetry_clients.inner.lock_or_panic();
1006+
clients
1007+
.values()
1008+
.filter_map(|entry| {
1009+
entry
1010+
.client
1011+
.lock_or_panic()
1012+
.as_ref()
1013+
.map(|c| c.worker.clone())
1014+
})
1015+
.collect()
1016+
};
1017+
futures::future::join_all(workers.into_iter().map(|worker| async move {
1018+
let _ = worker
1019+
.send_msg(TelemetryActions::Lifecycle(
1020+
LifecycleAction::FlushMetricAggr,
1021+
))
1022+
.await;
1023+
let _ = worker
1024+
.send_msg(TelemetryActions::Lifecycle(LifecycleAction::FlushData))
1025+
.await;
1026+
// now await completion
1027+
let (tx, rx) = futures::channel::oneshot::channel();
1028+
let _ = worker.send_msg(TelemetryActions::CollectStats(tx)).await;
1029+
let _ = rx.await;
1030+
}))
1031+
.await;
9981032
}
999-
flush_all_stats_now(&self.server.span_concentrators).await;
10001033
}
10011034

10021035
async fn set_test_session_token(&self, _peer: PeerCredentials, token: String) {

0 commit comments

Comments
 (0)