Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
203 changes: 162 additions & 41 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ description = "Reserved future DataDog package, contact info@datadoghq.com if yo
authors = ["Datadog Inc. <info@datadoghq.com>"]

[workspace.dependencies]
# Libdatadog dependencies - change to a stable version once we release
libdd-data-pipeline = { version = "3.0.1", default-features = false }
# Libdatadog dependencies
libdd-data-pipeline = { version = "3.1.0", default-features = false, features = ["telemetry"] }
libdd-trace-utils = { version = "3.0.0", default-features = false }
libdd-telemetry = { version = "4.0.0", default-features = false }
libdd-common = { version = "3.0.1", default-features = false }
libdd-telemetry = { version = "5.0.0", default-features = false }
libdd-common = { version = "4.0.0", default-features = false }
libdd-capabilities-impl = { version = "1.1.0", default-features = false }
libdd-tinybytes = { version = "1.1.0", default-features = false }
libdd-library-config = { version = "1.1.0", default-features = false }
opentelemetry_sdk = { version = "0.31.0", features = [
Expand Down Expand Up @@ -55,7 +56,6 @@ serde_json = { version = "1.0.140" }
hashbrown = { version = "0.15.5" }
foldhash = { version = "0.1.5" }


[profile.dev]
debug = 2 # full debug info

Expand Down
1 change: 1 addition & 0 deletions datadog-opentelemetry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ criterion = { version = "0.5.1", optional = true }

# Libdatadog dependencies
libdd-data-pipeline = { workspace = true }
libdd-capabilities-impl = { workspace = true }
libdd-trace-utils = { workspace = true }
libdd-telemetry = { workspace = true }
libdd-common = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1963,6 +1963,8 @@ impl ConfigBuilder {
config.dogstatsd_agent_url.set_calculated(url);
}

crate::core::telemetry_session::install_process_lineage_env(config.runtime_id);

config
}

Expand Down
1 change: 1 addition & 0 deletions datadog-opentelemetry/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ mod error;

pub mod log;
pub(crate) mod telemetry;
pub mod telemetry_session;
pub(crate) mod utils;

#[cfg(feature = "test-utils")]
Expand Down
5 changes: 5 additions & 0 deletions datadog-opentelemetry/src/core/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use libdd_telemetry::{
};

use super::configuration::{Config, ConfigurationProvider};
use super::telemetry_session;
use crate::{dd_debug, dd_error, dd_warn};

static TELEMETRY: TelemetryCell = OnceLock::new();
Expand Down Expand Up @@ -249,6 +250,10 @@ fn make_telemetry_worker(
builder.config = libdd_telemetry::config::Config::from_env();
builder.config.telemetry_heartbeat_interval =
Duration::from_secs_f64(config.telemetry_heartbeat_interval());
let inst = telemetry_session::sessions_from_runtime_id(config.runtime_id());
builder.config.session_id = inst.session_id;
builder.config.root_session_id = inst.root_session_id;
builder.config.parent_session_id = inst.parent_session_id;
// builder.config.debug_enabled = true;

builder.run().map(|handle| {
Expand Down
140 changes: 140 additions & 0 deletions datadog-opentelemetry/src/core/telemetry_session.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

//! Instrumentation session ids from env (`dd-session-id` family).
//! Here `runtime_id` is unchanged across `fork()` (unlike some Datadog SDKs); set parent for a new
//! process with [`lineage_env_for_spawn`] or [`extend_command_env_with_lineage`].

pub use libdd_data_pipeline::trace_exporter::TelemetryInstrumentationSessions;

/// Root lineage env key.
pub const ENV_ROOT_RS_SESSION_ID: &str = "_DD_ROOT_RS_SESSION_ID";
/// Parent lineage env key.
pub const ENV_PARENT_RS_SESSION_ID: &str = "_DD_PARENT_RS_SESSION_ID";

/// Builds [`TelemetryInstrumentationSessions`] from `runtime_id` plus lineage env keys.
#[allow(clippy::disallowed_methods)]
pub fn sessions_from_runtime_id(runtime_id: &str) -> TelemetryInstrumentationSessions {
let session_id = runtime_id.to_owned();
let root_session_id = std::env::var(ENV_ROOT_RS_SESSION_ID)
.ok()
.filter(|r| r != &session_id);
let parent_session_id = std::env::var(ENV_PARENT_RS_SESSION_ID)
.ok()
.filter(|p| p != &session_id);
TelemetryInstrumentationSessions {
session_id: Some(session_id),
root_session_id,
parent_session_id,
}
}

/// Sets [`ENV_ROOT_RS_SESSION_ID`] if unset (does not replace an existing value).
///
/// Must be called before spawning threads to avoid races on the process environment.
#[allow(clippy::disallowed_methods)]
pub fn install_process_lineage_env(runtime_id: &str) {
if std::env::var_os(ENV_ROOT_RS_SESSION_ID).is_none() {
std::env::set_var(ENV_ROOT_RS_SESSION_ID, runtime_id);
}
}

/// Env pairs for [`std::process::Command`] (root + parent = this `runtime_id`).
#[allow(clippy::disallowed_methods)]
pub fn lineage_env_for_spawn(runtime_id: &str) -> [(&'static str, String); 2] {
let root = std::env::var(ENV_ROOT_RS_SESSION_ID).unwrap_or_else(|_| runtime_id.to_string());
[
(ENV_ROOT_RS_SESSION_ID, root),
(ENV_PARENT_RS_SESSION_ID, runtime_id.to_string()),
]
}

/// Merges [`lineage_env_for_spawn`] into `cmd`.
pub fn extend_command_env_with_lineage(cmd: &mut std::process::Command, runtime_id: &str) {
for (key, val) in lineage_env_for_spawn(runtime_id) {
cmd.env(key, val);
}
}

#[cfg(test)]
mod tests {
use super::*;

use std::sync::{Mutex, OnceLock};

fn env_mutation_lock() -> std::sync::MutexGuard<'static, ()> {
static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
LOCK.get_or_init(|| Mutex::new(())).lock().unwrap()
}

#[test]
fn from_runtime_id_omits_root_parent_when_missing_env() {
let _l = env_mutation_lock();
let root_before = std::env::var_os(ENV_ROOT_RS_SESSION_ID);
let parent_before = std::env::var_os(ENV_PARENT_RS_SESSION_ID);
std::env::remove_var(ENV_ROOT_RS_SESSION_ID);
std::env::remove_var(ENV_PARENT_RS_SESSION_ID);
let s = sessions_from_runtime_id("rid-a");
assert_eq!(s.session_id.as_deref(), Some("rid-a"));
assert!(s.root_session_id.is_none());
assert!(s.parent_session_id.is_none());
restore_env_opt(ENV_ROOT_RS_SESSION_ID, root_before);
restore_env_opt(ENV_PARENT_RS_SESSION_ID, parent_before);
}

#[test]
fn lineage_env_parent_is_current_runtime_id() {
let _l = env_mutation_lock();
let before = std::env::var_os(ENV_ROOT_RS_SESSION_ID);
std::env::remove_var(ENV_ROOT_RS_SESSION_ID);
let env = lineage_env_for_spawn("run-1");
assert_eq!(env[1].0, ENV_PARENT_RS_SESSION_ID);
assert_eq!(env[1].1, "run-1");
restore_env_opt(ENV_ROOT_RS_SESSION_ID, before);
}

#[test]
fn lineage_env_for_spawn_preserves_existing_root() {
let _l = env_mutation_lock();
let before = std::env::var_os(ENV_ROOT_RS_SESSION_ID);
std::env::set_var(ENV_ROOT_RS_SESSION_ID, "existing-root");
let env = lineage_env_for_spawn("run-2");
assert_eq!(env[0].1, "existing-root");
assert_eq!(env[1].1, "run-2");
restore_env_opt(ENV_ROOT_RS_SESSION_ID, before);
}

#[test]
fn from_runtime_id_captures_env_root_and_parent() {
let _l = env_mutation_lock();
let root_before = std::env::var_os(ENV_ROOT_RS_SESSION_ID);
let parent_before = std::env::var_os(ENV_PARENT_RS_SESSION_ID);
std::env::set_var(ENV_ROOT_RS_SESSION_ID, "root-x");
std::env::set_var(ENV_PARENT_RS_SESSION_ID, "parent-x");
let s = sessions_from_runtime_id("current");
assert_eq!(s.session_id.as_deref(), Some("current"));
assert_eq!(s.root_session_id.as_deref(), Some("root-x"));
assert_eq!(s.parent_session_id.as_deref(), Some("parent-x"));
restore_env_opt(ENV_ROOT_RS_SESSION_ID, root_before);
restore_env_opt(ENV_PARENT_RS_SESSION_ID, parent_before);
}

#[test]
fn install_process_lineage_env_sets_root_when_missing() {
let _l = env_mutation_lock();
let before = std::env::var_os(ENV_ROOT_RS_SESSION_ID);
std::env::remove_var(ENV_ROOT_RS_SESSION_ID);
install_process_lineage_env("root-a");
assert_eq!(std::env::var(ENV_ROOT_RS_SESSION_ID).unwrap(), "root-a");
install_process_lineage_env("root-b");
assert_eq!(std::env::var(ENV_ROOT_RS_SESSION_ID).unwrap(), "root-a");
restore_env_opt(ENV_ROOT_RS_SESSION_ID, before);
}

fn restore_env_opt(key: &str, before: Option<std::ffi::OsString>) {
match before {
None => std::env::remove_var(key),
Some(v) => std::env::set_var(key, v),
}
}
}
22 changes: 16 additions & 6 deletions datadog-opentelemetry/src/exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@ use std::{
};

use crate::{dd_debug, dd_error};
use libdd_capabilities_impl::NativeCapabilities;
use libdd_data_pipeline::trace_exporter::{
agent_response::AgentResponse,
error::{self as trace_exporter_error, TraceExporterError},
TraceExporter, TraceExporterBuilder,
TraceExporter as LibddTraceExporter, TraceExporterBuilder,
};

pub type TraceExporter = LibddTraceExporter<NativeCapabilities>;

#[derive(Clone, Copy)]
pub struct AsyncExporterConfig {
/// Whether the async exporter waits for the trace chunks to be exported before returning from
Expand Down Expand Up @@ -583,7 +586,7 @@ impl<T: Send + 'static> TraceExporterWorker<T> {
) -> TraceExporterHandle {
let handle = thread::spawn({
move || {
let trace_exporter = match builder.build() {
let trace_exporter = match builder.build::<NativeCapabilities>() {
Ok(exporter) => exporter,
Err(e) => {
return Err(e);
Expand All @@ -607,10 +610,17 @@ impl<T: Send + 'static> TraceExporterWorker<T> {
fn run(mut self) -> Result<(), TraceExporterError> {
#[cfg(feature = "test-utils")]
{
// Wait for the agent info to be fetched to get deterministic output when deciding
// to drop traces or not
self.trace_exporter
.wait_agent_info_ready(Duration::from_secs(5))
// Wait for agent info before first export for deterministic sampling decisions.
// `wait_agent_info_ready` is async; a dedicated runtime is required because this
// runs inside a plain `thread::spawn` with no surrounding executor.
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("tokio runtime for wait_agent_info_ready")
.block_on(
self.trace_exporter
.wait_agent_info_ready(Duration::from_secs(5)),
)
.unwrap();
}
while let Ok((message, data)) = self.rx.receive(self.config.max_flush_interval) {
Expand Down
5 changes: 5 additions & 0 deletions datadog-opentelemetry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,11 @@ pub(crate) mod core;
// Public re-exports
pub use core::configuration;
pub use core::log;
pub use core::telemetry_session::{
extend_command_env_with_lineage, install_process_lineage_env, lineage_env_for_spawn,
sessions_from_runtime_id, TelemetryInstrumentationSessions, ENV_PARENT_RS_SESSION_ID,
ENV_ROOT_RS_SESSION_ID,
};

#[cfg(feature = "test-utils")]
#[allow(missing_docs)]
Expand Down
9 changes: 7 additions & 2 deletions datadog-opentelemetry/src/span_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,17 @@ use std::{sync::Arc, time::Duration};

use arc_swap::ArcSwap;
use libdd_data_pipeline::trace_exporter::{
agent_response::AgentResponse, error::TraceExporterError, TelemetryConfig, TraceExporter,
agent_response::AgentResponse, error::TraceExporterError, TelemetryConfig,
TraceExporterOutputFormat,
};

use opentelemetry_sdk::{trace::SpanData, Resource};

use crate::{
configuration::Config,
core::telemetry_session,
ddtrace_transform,
exporter::{AsyncExporterError, AsyncTraceExporter, Exporter, TraceChunk},
exporter::{AsyncExporterError, AsyncTraceExporter, Exporter, TraceChunk, TraceExporter},
mappings::CachedConfig,
};

Expand Down Expand Up @@ -61,6 +63,9 @@ impl DatadogExporter {
runtime_id: Some(config.runtime_id().to_string()),
debug_enabled: false,
});
builder.set_telemetry_instrumentation_sessions(
telemetry_session::sessions_from_runtime_id(config.runtime_id()),
);
}
DatadogExporter {
exporter: AsyncTraceExporter::new(
Expand Down
Loading
Loading