diff --git a/opentelemetry-otlp/src/exporter/http/logs.rs b/opentelemetry-otlp/src/exporter/http/logs.rs index 8c4c16aa6e..df0cf18f94 100644 --- a/opentelemetry-otlp/src/exporter/http/logs.rs +++ b/opentelemetry-otlp/src/exporter/http/logs.rs @@ -34,7 +34,8 @@ impl LogExporter for OtlpHttpClient { } fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) { - self.resource = resource.into(); + self.resource = + crate::transform::common::tonic::resource_to_attributes_with_schema(resource); } } diff --git a/opentelemetry-otlp/src/exporter/http/mod.rs b/opentelemetry-otlp/src/exporter/http/mod.rs index 944b87f3a0..3e752cd525 100644 --- a/opentelemetry-otlp/src/exporter/http/mod.rs +++ b/opentelemetry-otlp/src/exporter/http/mod.rs @@ -2,17 +2,17 @@ use super::{ default_headers, parse_header_string, resolve_timeout, ExporterBuildError, OTEL_EXPORTER_OTLP_HTTP_ENDPOINT_DEFAULT, }; +use crate::transform::common::tonic::ResourceAttributesWithSchema; +#[cfg(feature = "logs")] +use crate::transform::logs::tonic::group_logs_by_resource_and_scope; +#[cfg(feature = "trace")] +use crate::transform::trace::tonic::group_spans_by_resource_and_scope; use crate::{ exporter::ExportConfig, Protocol, OTEL_EXPORTER_OTLP_ENDPOINT, OTEL_EXPORTER_OTLP_HEADERS, }; use http::{HeaderName, HeaderValue, Uri}; use opentelemetry::otel_debug; use opentelemetry_http::{Bytes, HttpClient}; -use opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema; -#[cfg(feature = "logs")] -use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope; -#[cfg(feature = "trace")] -use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope; #[cfg(feature = "logs")] use opentelemetry_sdk::logs::LogBatch; #[cfg(feature = "trace")] @@ -387,7 +387,7 @@ pub(crate) struct OtlpHttpClient { retry_policy: RetryPolicy, #[allow(dead_code)] // would be removed once we support set_resource for metrics and traces. - resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema, + resource: crate::transform::common::tonic::ResourceAttributesWithSchema, } impl OtlpHttpClient { @@ -671,9 +671,7 @@ impl OtlpHttpClient { &self, metrics: &ResourceMetrics, ) -> Option<(Vec, &'static str, Option<&'static str>)> { - use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; - - let req: ExportMetricsServiceRequest = metrics.into(); + let req = crate::transform::metrics::tonic::resource_metrics_to_export_request(metrics); let (body, content_type) = match self.protocol { #[cfg(feature = "http-json")] diff --git a/opentelemetry-otlp/src/exporter/http/trace.rs b/opentelemetry-otlp/src/exporter/http/trace.rs index 7027668625..e101df8778 100644 --- a/opentelemetry-otlp/src/exporter/http/trace.rs +++ b/opentelemetry-otlp/src/exporter/http/trace.rs @@ -35,7 +35,8 @@ impl SpanExporter for OtlpHttpClient { } fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) { - self.resource = resource.into(); + self.resource = + crate::transform::common::tonic::resource_to_attributes_with_schema(resource); } } diff --git a/opentelemetry-otlp/src/exporter/tonic/logs.rs b/opentelemetry-otlp/src/exporter/tonic/logs.rs index 6704a2a2c5..95857c259c 100644 --- a/opentelemetry-otlp/src/exporter/tonic/logs.rs +++ b/opentelemetry-otlp/src/exporter/tonic/logs.rs @@ -9,7 +9,7 @@ use std::sync::{Arc, Mutex}; use std::time; use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request}; -use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope; +use crate::transform::logs::tonic::group_logs_by_resource_and_scope; use super::BoxInterceptor; @@ -22,7 +22,7 @@ pub(crate) struct TonicLogsClient { retry_policy: RetryPolicy, #[allow(dead_code)] // would be removed once we support set_resource for metrics. - resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema, + resource: crate::transform::common::tonic::ResourceAttributesWithSchema, } struct ClientInner { @@ -152,6 +152,7 @@ impl LogExporter for TonicLogsClient { } fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) { - self.resource = resource.into(); + self.resource = + crate::transform::common::tonic::resource_to_attributes_with_schema(resource); } } diff --git a/opentelemetry-otlp/src/exporter/tonic/metrics.rs b/opentelemetry-otlp/src/exporter/tonic/metrics.rs index c29cc6d460..94d93267c5 100644 --- a/opentelemetry-otlp/src/exporter/tonic/metrics.rs +++ b/opentelemetry-otlp/src/exporter/tonic/metrics.rs @@ -2,9 +2,7 @@ use core::fmt; use std::sync::Mutex; use opentelemetry::{otel_debug, otel_warn}; -use opentelemetry_proto::tonic::collector::metrics::v1::{ - metrics_service_client::MetricsServiceClient, ExportMetricsServiceRequest, -}; +use opentelemetry_proto::tonic::collector::metrics::v1::metrics_service_client::MetricsServiceClient; use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult}; use opentelemetry_sdk::metrics::data::ResourceMetrics; use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request}; @@ -101,7 +99,9 @@ impl MetricsClient for TonicMetricsClient { .export(Request::from_parts( metadata, extensions, - ExportMetricsServiceRequest::from(metrics), + crate::transform::metrics::tonic::resource_metrics_to_export_request( + metrics, + ), )) .await .map(|response| { diff --git a/opentelemetry-otlp/src/exporter/tonic/trace.rs b/opentelemetry-otlp/src/exporter/tonic/trace.rs index f29a705b33..a2aa42bf9a 100644 --- a/opentelemetry-otlp/src/exporter/tonic/trace.rs +++ b/opentelemetry-otlp/src/exporter/tonic/trace.rs @@ -1,11 +1,11 @@ use core::fmt; use std::sync::{Arc, Mutex}; +use crate::transform::trace::tonic::group_spans_by_resource_and_scope; use opentelemetry::{otel_debug, otel_warn}; use opentelemetry_proto::tonic::collector::trace::v1::{ trace_service_client::TraceServiceClient, ExportTraceServiceRequest, }; -use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope; use opentelemetry_sdk::error::OTelSdkError; use opentelemetry_sdk::{ error::OTelSdkResult, @@ -24,7 +24,7 @@ pub(crate) struct TonicTracesClient { retry_policy: RetryPolicy, #[allow(dead_code)] // would be removed once we support set_resource for metrics. - resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema, + resource: crate::transform::common::tonic::ResourceAttributesWithSchema, } struct ClientInner { @@ -157,6 +157,7 @@ impl SpanExporter for TonicTracesClient { } fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) { - self.resource = resource.into(); + self.resource = + crate::transform::common::tonic::resource_to_attributes_with_schema(resource); } } diff --git a/opentelemetry-otlp/src/lib.rs b/opentelemetry-otlp/src/lib.rs index a78643b616..cd97ed2ff3 100644 --- a/opentelemetry-otlp/src/lib.rs +++ b/opentelemetry-otlp/src/lib.rs @@ -636,6 +636,7 @@ mod metric; #[cfg(feature = "trace")] #[cfg(any(feature = "http-proto", feature = "http-json", feature = "grpc-tonic"))] mod span; +mod transform; #[cfg(any(feature = "grpc-tonic", feature = "experimental-http-retry"))] pub mod retry_classification; diff --git a/opentelemetry-otlp/src/transform/common.rs b/opentelemetry-otlp/src/transform/common.rs new file mode 100644 index 0000000000..845bf8dd78 --- /dev/null +++ b/opentelemetry-otlp/src/transform/common.rs @@ -0,0 +1,204 @@ +#[cfg(all( + any(feature = "trace", feature = "metrics", feature = "logs"), + any(feature = "http-proto", feature = "http-json", feature = "grpc-tonic") +))] +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +#[cfg(all( + any(feature = "trace", feature = "metrics", feature = "logs"), + any(feature = "http-proto", feature = "http-json", feature = "grpc-tonic") +))] +pub(crate) fn to_nanos(time: SystemTime) -> u64 { + time.duration_since(UNIX_EPOCH) + .unwrap_or_else(|_| Duration::from_secs(0)) + .as_nanos() as u64 +} + +#[cfg(all( + any(feature = "trace", feature = "metrics", feature = "logs"), + any(feature = "http-proto", feature = "http-json", feature = "grpc-tonic") +))] +pub(crate) mod tonic { + use opentelemetry::{Array, Value}; + use opentelemetry_proto::tonic::common::v1::{ + any_value, AnyValue, ArrayValue, InstrumentationScope, KeyValue, + }; + use std::borrow::Cow; + + #[cfg(any(feature = "trace", feature = "logs"))] + #[derive(Debug, Default)] + pub(crate) struct ResourceAttributesWithSchema { + pub(crate) attributes: Attributes, + pub(crate) schema_url: Option, + } + + #[cfg(any(feature = "trace", feature = "logs"))] + use opentelemetry_sdk::Resource; + + #[cfg(any(feature = "trace", feature = "logs"))] + pub(crate) fn resource_to_attributes_with_schema( + resource: &Resource, + ) -> ResourceAttributesWithSchema { + ResourceAttributesWithSchema { + attributes: resource_attributes(resource), + schema_url: resource.schema_url().map(ToString::to_string), + } + } + + #[cfg(any(feature = "trace", feature = "logs"))] + impl From<&Resource> for ResourceAttributesWithSchema { + fn from(resource: &Resource) -> Self { + resource_to_attributes_with_schema(resource) + } + } + + pub(crate) fn instrumentation_scope_to_proto( + library: opentelemetry::InstrumentationScope, + target: Option>, + ) -> InstrumentationScope { + InstrumentationScope { + name: target.map_or_else(|| library.name().to_owned(), Cow::into_owned), + version: library.version().unwrap_or_default().to_owned(), + attributes: keyvalues_to_proto(library.attributes().cloned()), + ..Default::default() + } + } + + pub(crate) fn instrumentation_scope_ref_to_proto( + library: &opentelemetry::InstrumentationScope, + target: Option>, + ) -> InstrumentationScope { + InstrumentationScope { + name: target.map_or_else(|| library.name().to_owned(), Cow::into_owned), + version: library.version().unwrap_or_default().to_owned(), + attributes: keyvalues_to_proto(library.attributes().cloned()), + ..Default::default() + } + } + + /// Wrapper type for Vec<`KeyValue`> + #[derive(Default, Debug)] + pub(crate) struct Attributes(pub(crate) ::std::vec::Vec); + + pub(crate) fn keyvalues_to_proto>( + kvs: I, + ) -> Vec { + kvs.into_iter() + .map(|api_kv| KeyValue { + key: api_kv.key.as_str().to_string(), + value: Some(value_to_any_value(api_kv.value)), + key_strindex: 0, + }) + .collect() + } + + // Kept as a `From` impl since `Attributes` is a local type, so orphan rule is satisfied. + impl> From for Attributes { + fn from(kvs: I) -> Self { + Attributes(keyvalues_to_proto(kvs)) + } + } + + #[cfg(feature = "logs")] + impl, V: Into> FromIterator<(K, V)> for Attributes { + fn from_iter>(iter: T) -> Self { + Attributes( + iter.into_iter() + .map(|(k, v)| KeyValue { + key: k.into(), + value: Some(v.into()), + key_strindex: 0, + }) + .collect(), + ) + } + } + + pub(crate) fn value_to_any_value(value: Value) -> AnyValue { + AnyValue { + value: match value { + Value::Bool(val) => Some(any_value::Value::BoolValue(val)), + Value::I64(val) => Some(any_value::Value::IntValue(val)), + Value::F64(val) => Some(any_value::Value::DoubleValue(val)), + Value::String(val) => Some(any_value::Value::StringValue(val.to_string())), + Value::Array(array) => Some(any_value::Value::ArrayValue(match array { + Array::Bool(vals) => array_into_proto(vals), + Array::I64(vals) => array_into_proto(vals), + Array::F64(vals) => array_into_proto(vals), + Array::String(vals) => array_into_proto(vals), + _ => unreachable!("Nonexistent array type"), + })), + _ => unreachable!("Nonexistent value type"), + }, + } + } + + fn array_into_proto(vals: Vec) -> ArrayValue + where + Value: From, + { + let values = vals + .into_iter() + .map(|val| value_to_any_value(Value::from(val))) + .collect(); + + ArrayValue { values } + } + + #[cfg(any(feature = "trace", feature = "logs"))] + pub(crate) fn resource_attributes(resource: &Resource) -> Attributes { + Attributes(keyvalues_to_proto( + resource + .iter() + .map(|(k, v)| opentelemetry::KeyValue::new(k.clone(), v.clone())), + )) + } + + #[cfg(test)] + mod tests { + use super::*; + use opentelemetry::KeyValue; + + fn assert_scope_fields( + proto_scope: &InstrumentationScope, + expected_name: &str, + expected_version: &str, + expected_attr_key: &str, + ) { + assert_eq!(proto_scope.name, expected_name); + assert_eq!(proto_scope.version, expected_version); + assert_eq!(proto_scope.attributes.len(), 1); + assert_eq!(proto_scope.attributes[0].key, expected_attr_key); + } + + #[test] + fn instrumentation_scope_with_target_overrides_name_but_preserves_version_and_attributes() { + let scope = opentelemetry::InstrumentationScope::builder("my-lib") + .with_version("1.0.0") + .with_attributes([KeyValue::new("feature", "metrics")]) + .build(); + let target: Option> = Some(Cow::Borrowed("my_app::handlers")); + + let from_owned = instrumentation_scope_to_proto(scope.clone(), target.clone()); + let from_ref = instrumentation_scope_ref_to_proto(&scope, target); + + assert_scope_fields(&from_owned, "my_app::handlers", "1.0.0", "feature"); + assert_scope_fields(&from_ref, "my_app::handlers", "1.0.0", "feature"); + } + + #[test] + fn instrumentation_scope_without_target_preserves_all_fields() { + let scope = opentelemetry::InstrumentationScope::builder("my-lib") + .with_version("1.0.0") + .with_attributes([KeyValue::new("feature", "metrics")]) + .build(); + let target: Option> = None; + + let from_owned = instrumentation_scope_to_proto(scope.clone(), target.clone()); + let from_ref = instrumentation_scope_ref_to_proto(&scope, target); + + assert_scope_fields(&from_owned, "my-lib", "1.0.0", "feature"); + assert_scope_fields(&from_ref, "my-lib", "1.0.0", "feature"); + } + } +} diff --git a/opentelemetry-otlp/src/transform/logs.rs b/opentelemetry-otlp/src/transform/logs.rs new file mode 100644 index 0000000000..12ad49edc6 --- /dev/null +++ b/opentelemetry-otlp/src/transform/logs.rs @@ -0,0 +1,312 @@ +pub(crate) mod tonic { + use crate::transform::common::{ + to_nanos, + tonic::{instrumentation_scope_ref_to_proto, ResourceAttributesWithSchema}, + }; + use opentelemetry::logs::{AnyValue as LogsAnyValue, Severity}; + use opentelemetry_proto::tonic::{ + common::v1::{any_value::Value, AnyValue, ArrayValue, KeyValue, KeyValueList}, + logs::v1::{LogRecord, ResourceLogs, ScopeLogs, SeverityNumber}, + resource::v1::Resource, + }; + use opentelemetry_sdk::logs::LogBatch; + use std::borrow::Cow; + use std::collections::HashMap; + + pub(crate) fn logs_any_value_to_any_value(value: LogsAnyValue) -> AnyValue { + AnyValue { + value: Some(logs_any_value_to_value(value)), + } + } + + pub(crate) fn logs_any_value_to_value(value: LogsAnyValue) -> Value { + match value { + LogsAnyValue::Double(f) => Value::DoubleValue(f), + LogsAnyValue::Int(i) => Value::IntValue(i), + LogsAnyValue::String(s) => Value::StringValue(s.into()), + LogsAnyValue::Boolean(b) => Value::BoolValue(b), + LogsAnyValue::ListAny(v) => Value::ArrayValue(ArrayValue { + values: v + .into_iter() + .map(|v| AnyValue { + value: Some(logs_any_value_to_value(v)), + }) + .collect(), + }), + LogsAnyValue::Map(m) => Value::KvlistValue(KeyValueList { + values: m + .into_iter() + .map(|(key, value)| KeyValue { + key: key.into(), + value: Some(AnyValue { + value: Some(logs_any_value_to_value(value)), + }), + key_strindex: 0, + }) + .collect(), + }), + LogsAnyValue::Bytes(v) => Value::BytesValue(*v), + _ => unreachable!("Nonexistent value type"), + } + } + + pub(crate) fn log_record_to_proto( + log_record: &opentelemetry_sdk::logs::SdkLogRecord, + ) -> LogRecord { + let trace_context = log_record.trace_context(); + let severity_number = match log_record.severity_number() { + Some(Severity::Trace) => SeverityNumber::Trace, + Some(Severity::Trace2) => SeverityNumber::Trace2, + Some(Severity::Trace3) => SeverityNumber::Trace3, + Some(Severity::Trace4) => SeverityNumber::Trace4, + Some(Severity::Debug) => SeverityNumber::Debug, + Some(Severity::Debug2) => SeverityNumber::Debug2, + Some(Severity::Debug3) => SeverityNumber::Debug3, + Some(Severity::Debug4) => SeverityNumber::Debug4, + Some(Severity::Info) => SeverityNumber::Info, + Some(Severity::Info2) => SeverityNumber::Info2, + Some(Severity::Info3) => SeverityNumber::Info3, + Some(Severity::Info4) => SeverityNumber::Info4, + Some(Severity::Warn) => SeverityNumber::Warn, + Some(Severity::Warn2) => SeverityNumber::Warn2, + Some(Severity::Warn3) => SeverityNumber::Warn3, + Some(Severity::Warn4) => SeverityNumber::Warn4, + Some(Severity::Error) => SeverityNumber::Error, + Some(Severity::Error2) => SeverityNumber::Error2, + Some(Severity::Error3) => SeverityNumber::Error3, + Some(Severity::Error4) => SeverityNumber::Error4, + Some(Severity::Fatal) => SeverityNumber::Fatal, + Some(Severity::Fatal2) => SeverityNumber::Fatal2, + Some(Severity::Fatal3) => SeverityNumber::Fatal3, + Some(Severity::Fatal4) => SeverityNumber::Fatal4, + None => SeverityNumber::Unspecified, + }; + + LogRecord { + time_unix_nano: log_record.timestamp().map(to_nanos).unwrap_or_default(), + observed_time_unix_nano: to_nanos(log_record.observed_timestamp().unwrap()), + attributes: { + log_record + .attributes_iter() + .map(|kv| KeyValue { + key: kv.0.to_string(), + value: Some(AnyValue { + value: Some(logs_any_value_to_value(kv.1.clone())), + }), + key_strindex: 0, + }) + .collect() + }, + event_name: log_record.event_name().unwrap_or_default().into(), + severity_number: severity_number.into(), + severity_text: log_record + .severity_text() + .map(Into::into) + .unwrap_or_default(), + body: log_record.body().cloned().map(logs_any_value_to_any_value), + dropped_attributes_count: 0, + flags: trace_context + .map(|ctx| { + ctx.trace_flags + .map(|flags| flags.to_u8() as u32) + .unwrap_or_default() + }) + .unwrap_or_default(), + span_id: trace_context + .map(|ctx| ctx.span_id.to_bytes().to_vec()) + .unwrap_or_default(), + trace_id: trace_context + .map(|ctx| ctx.trace_id.to_bytes().to_vec()) + .unwrap_or_default(), + } + } + + pub(crate) fn group_logs_by_resource_and_scope<'a>( + logs: &'a LogBatch<'a>, + resource: &ResourceAttributesWithSchema, + ) -> Vec { + // Group logs by target or instrumentation name + let scope_map = logs.iter().fold( + HashMap::new(), + |mut scope_map: HashMap< + Cow<'static, str>, + Vec<( + &opentelemetry_sdk::logs::SdkLogRecord, + &opentelemetry::InstrumentationScope, + )>, + >, + (log_record, instrumentation)| { + let key = log_record + .target() + .cloned() + .unwrap_or_else(|| Cow::Owned(instrumentation.name().to_owned())); + scope_map + .entry(key) + .or_default() + .push((log_record, instrumentation)); + scope_map + }, + ); + + let scope_logs = scope_map + .into_iter() + .map(|(key, log_data)| ScopeLogs { + scope: Some(instrumentation_scope_ref_to_proto( + log_data.first().unwrap().1, + Some(key.into_owned().into()), + )), + schema_url: resource.schema_url.clone().unwrap_or_default(), + log_records: log_data + .into_iter() + .map(|(log_record, _)| log_record_to_proto(log_record)) + .collect(), + }) + .collect(); + + vec![ResourceLogs { + resource: Some(Resource { + attributes: resource.attributes.0.clone(), + dropped_attributes_count: 0, + entity_refs: vec![], + }), + scope_logs, + schema_url: resource.schema_url.clone().unwrap_or_default(), + }] + } +} + +#[cfg(test)] +mod tests { + use crate::transform::common::tonic::ResourceAttributesWithSchema; + use opentelemetry::logs::LogRecord as _; + use opentelemetry::logs::Logger; + use opentelemetry::logs::LoggerProvider; + use opentelemetry::time::now; + use opentelemetry::{InstrumentationScope, KeyValue}; + use opentelemetry_sdk::error::OTelSdkResult; + use opentelemetry_sdk::logs::LogProcessor; + use opentelemetry_sdk::logs::SdkLoggerProvider; + use opentelemetry_sdk::{logs::LogBatch, logs::SdkLogRecord, Resource}; + use std::borrow::Cow; + + #[derive(Debug)] + struct MockProcessor; + + impl LogProcessor for MockProcessor { + fn emit(&self, _record: &mut SdkLogRecord, _instrumentation: &InstrumentationScope) {} + + fn force_flush(&self) -> OTelSdkResult { + Ok(()) + } + + fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult { + Ok(()) + } + } + + fn create_test_log_data( + instrumentation_name: &str, + _message: &str, + ) -> (SdkLogRecord, InstrumentationScope) { + let processor = MockProcessor {}; + let logger = SdkLoggerProvider::builder() + .with_log_processor(processor) + .build() + .logger("test"); + let mut logrecord = logger.create_log_record(); + logrecord.set_timestamp(now()); + logrecord.set_observed_timestamp(now()); + let instrumentation = + InstrumentationScope::builder(instrumentation_name.to_string()).build(); + (logrecord, instrumentation) + } + + #[test] + fn test_group_logs_by_resource_and_scope_single_scope() { + let resource = Resource::builder().build(); + let (log_record1, instrum_lib1) = create_test_log_data("test-lib", "Log 1"); + let (log_record2, instrum_lib2) = create_test_log_data("test-lib", "Log 2"); + + let logs = [(&log_record1, &instrum_lib1), (&log_record2, &instrum_lib2)]; + let log_batch = LogBatch::new(&logs); + let resource: ResourceAttributesWithSchema = (&resource).into(); + + let grouped_logs = + crate::transform::logs::tonic::group_logs_by_resource_and_scope(&log_batch, &resource); + + assert_eq!(grouped_logs.len(), 1); + let resource_logs = &grouped_logs[0]; + assert_eq!(resource_logs.scope_logs.len(), 1); + + let scope_logs = &resource_logs.scope_logs[0]; + assert_eq!(scope_logs.log_records.len(), 2); + } + + #[test] + fn test_group_logs_by_resource_and_scope_multiple_scopes() { + let resource = Resource::builder().build(); + let (log_record1, instrum_lib1) = create_test_log_data("lib1", "Log 1"); + let (log_record2, instrum_lib2) = create_test_log_data("lib2", "Log 2"); + + let logs = [(&log_record1, &instrum_lib1), (&log_record2, &instrum_lib2)]; + let log_batch = LogBatch::new(&logs); + let resource: ResourceAttributesWithSchema = (&resource).into(); + let grouped_logs = + crate::transform::logs::tonic::group_logs_by_resource_and_scope(&log_batch, &resource); + + assert_eq!(grouped_logs.len(), 1); + let resource_logs = &grouped_logs[0]; + assert_eq!(resource_logs.scope_logs.len(), 2); + + let scope_logs_1 = &resource_logs + .scope_logs + .iter() + .find(|scope| scope.scope.as_ref().unwrap().name == "lib1") + .unwrap(); + let scope_logs_2 = &resource_logs + .scope_logs + .iter() + .find(|scope| scope.scope.as_ref().unwrap().name == "lib2") + .unwrap(); + + assert_eq!(scope_logs_1.log_records.len(), 1); + assert_eq!(scope_logs_2.log_records.len(), 1); + } + + #[test] + fn test_group_logs_preserves_scope_version_and_attributes_when_target_set() { + let resource = Resource::builder().build(); + let processor = MockProcessor {}; + let logger = SdkLoggerProvider::builder() + .with_log_processor(processor) + .build() + .logger("test"); + + let mut logrecord = logger.create_log_record(); + logrecord.set_timestamp(now()); + logrecord.set_observed_timestamp(now()); + logrecord.set_target(Cow::Borrowed("my_app::handlers")); + + let instrumentation = InstrumentationScope::builder("my-lib") + .with_version("1.0.0") + .with_attributes([KeyValue::new("feature", "metrics")]) + .build(); + + let logs = [(&logrecord, &instrumentation)]; + let log_batch = LogBatch::new(&logs); + let resource: ResourceAttributesWithSchema = (&resource).into(); + + let grouped_logs = + crate::transform::logs::tonic::group_logs_by_resource_and_scope(&log_batch, &resource); + + assert_eq!(grouped_logs.len(), 1); + let resource_logs = &grouped_logs[0]; + assert_eq!(resource_logs.scope_logs.len(), 1); + + let scope = resource_logs.scope_logs[0].scope.as_ref().unwrap(); + assert_eq!(scope.name, "my_app::handlers"); + assert_eq!(scope.version, "1.0.0"); + assert_eq!(scope.attributes.len(), 1); + assert_eq!(scope.attributes[0].key, "feature"); + } +} diff --git a/opentelemetry-otlp/src/transform/metrics.rs b/opentelemetry-otlp/src/transform/metrics.rs new file mode 100644 index 0000000000..e732d46d2b --- /dev/null +++ b/opentelemetry-otlp/src/transform/metrics.rs @@ -0,0 +1,321 @@ +// The prost currently will generate a non optional deprecated field for labels. +// We cannot assign value to it otherwise clippy will complain. +// We cannot ignore it as it's not an optional field. +// We can remove this after we removed the labels field from proto. +#[allow(deprecated)] +pub(crate) mod tonic { + use std::fmt::Debug; + + use opentelemetry::{otel_debug, Key, Value}; + use opentelemetry_sdk::metrics::data::{ + AggregatedMetrics, Exemplar as SdkExemplar, + ExponentialHistogram as SdkExponentialHistogram, Gauge as SdkGauge, + Histogram as SdkHistogram, Metric as SdkMetric, MetricData, ResourceMetrics, + ScopeMetrics as SdkScopeMetrics, Sum as SdkSum, + }; + use opentelemetry_sdk::metrics::Temporality; + use opentelemetry_sdk::Resource as SdkResource; + + use crate::transform::common::{to_nanos, tonic::value_to_any_value}; + use opentelemetry_proto::tonic::{ + collector::metrics::v1::ExportMetricsServiceRequest, + common::v1::KeyValue, + metrics::v1::{ + exemplar, exponential_histogram_data_point::Buckets as TonicBuckets, + metric::Data as TonicMetricData, number_data_point, AggregationTemporality, + DataPointFlags as TonicDataPointFlags, Exemplar as TonicExemplar, + ExponentialHistogram as TonicExponentialHistogram, + ExponentialHistogramDataPoint as TonicExponentialHistogramDataPoint, + Gauge as TonicGauge, Histogram as TonicHistogram, + HistogramDataPoint as TonicHistogramDataPoint, Metric as TonicMetric, + NumberDataPoint as TonicNumberDataPoint, ResourceMetrics as TonicResourceMetrics, + ScopeMetrics as TonicScopeMetrics, Sum as TonicSum, + }, + resource::v1::Resource as TonicResource, + }; + + pub(crate) fn u64_to_exemplar_value(value: u64) -> exemplar::Value { + exemplar::Value::AsInt(i64::try_from(value).unwrap_or_default()) + } + + pub(crate) fn i64_to_exemplar_value(value: i64) -> exemplar::Value { + exemplar::Value::AsInt(value) + } + + pub(crate) fn f64_to_exemplar_value(value: f64) -> exemplar::Value { + exemplar::Value::AsDouble(value) + } + + pub(crate) fn u64_to_data_point_value(value: u64) -> number_data_point::Value { + number_data_point::Value::AsInt(i64::try_from(value).unwrap_or_default()) + } + + pub(crate) fn i64_to_data_point_value(value: i64) -> number_data_point::Value { + number_data_point::Value::AsInt(value) + } + + pub(crate) fn f64_to_data_point_value(value: f64) -> number_data_point::Value { + number_data_point::Value::AsDouble(value) + } + + pub(crate) fn key_value_ref_to_proto(kv: (&Key, &Value)) -> KeyValue { + KeyValue { + key: kv.0.to_string(), + value: Some(value_to_any_value(kv.1.clone())), + key_strindex: 0, + } + } + + pub(crate) fn api_key_value_ref_to_proto(kv: &opentelemetry::KeyValue) -> KeyValue { + KeyValue { + key: kv.key.to_string(), + value: Some(value_to_any_value(kv.value.clone())), + key_strindex: 0, + } + } + + pub(crate) fn temporality_to_proto(temporality: Temporality) -> AggregationTemporality { + match temporality { + Temporality::Cumulative => AggregationTemporality::Cumulative, + Temporality::Delta => AggregationTemporality::Delta, + other => { + otel_debug!( + name: "AggregationTemporality::Unknown", + message = "Unknown temporality,using default instead.", + unknown_temporality = format!("{:?}", other), + default_temporality = format!("{:?}", Temporality::Cumulative) + ); + AggregationTemporality::Cumulative + } + } + } + + pub(crate) fn resource_metrics_to_export_request( + rm: &ResourceMetrics, + ) -> ExportMetricsServiceRequest { + ExportMetricsServiceRequest { + resource_metrics: vec![TonicResourceMetrics { + resource: Some(sdk_resource_to_proto(rm.resource())), + scope_metrics: rm.scope_metrics().map(scope_metrics_to_proto).collect(), + schema_url: rm + .resource() + .schema_url() + .map(Into::into) + .unwrap_or_default(), + }], + } + } + + pub(crate) fn sdk_resource_to_proto(resource: &SdkResource) -> TonicResource { + TonicResource { + attributes: resource.iter().map(key_value_ref_to_proto).collect(), + dropped_attributes_count: 0, + entity_refs: vec![], + } + } + + pub(crate) fn scope_metrics_to_proto(sm: &SdkScopeMetrics) -> TonicScopeMetrics { + TonicScopeMetrics { + scope: Some( + crate::transform::common::tonic::instrumentation_scope_ref_to_proto( + sm.scope(), + None, + ), + ), + metrics: sm.metrics().map(metric_to_proto).collect(), + schema_url: sm + .scope() + .schema_url() + .map(ToOwned::to_owned) + .unwrap_or_default(), + } + } + + pub(crate) fn metric_to_proto(metric: &SdkMetric) -> TonicMetric { + TonicMetric { + name: metric.name().to_string(), + description: metric.description().to_string(), + unit: metric.unit().to_string(), + metadata: vec![], + data: Some(match metric.data() { + AggregatedMetrics::F64(data) => metric_data_to_proto(data), + AggregatedMetrics::U64(data) => metric_data_to_proto(data), + AggregatedMetrics::I64(data) => metric_data_to_proto(data), + }), + } + } + + pub(crate) fn metric_data_to_proto(data: &MetricData) -> TonicMetricData + where + T: Numeric + Debug, + { + match data { + MetricData::Gauge(gauge) => TonicMetricData::Gauge(gauge_to_proto(gauge)), + MetricData::Sum(sum) => TonicMetricData::Sum(sum_to_proto(sum)), + MetricData::Histogram(hist) => TonicMetricData::Histogram(histogram_to_proto(hist)), + MetricData::ExponentialHistogram(hist) => { + TonicMetricData::ExponentialHistogram(exponential_histogram_to_proto(hist)) + } + } + } + + pub(crate) trait Numeric: Copy { + fn to_exemplar_value(self) -> exemplar::Value; + fn to_data_point_value(self) -> number_data_point::Value; + // lossy at large values for u64 and i64 but otlp histograms only handle float values + fn into_f64(self) -> f64; + } + + impl Numeric for u64 { + fn to_exemplar_value(self) -> exemplar::Value { + u64_to_exemplar_value(self) + } + fn to_data_point_value(self) -> number_data_point::Value { + u64_to_data_point_value(self) + } + fn into_f64(self) -> f64 { + self as f64 + } + } + + impl Numeric for i64 { + fn to_exemplar_value(self) -> exemplar::Value { + i64_to_exemplar_value(self) + } + fn to_data_point_value(self) -> number_data_point::Value { + i64_to_data_point_value(self) + } + fn into_f64(self) -> f64 { + self as f64 + } + } + + impl Numeric for f64 { + fn to_exemplar_value(self) -> exemplar::Value { + f64_to_exemplar_value(self) + } + fn to_data_point_value(self) -> number_data_point::Value { + f64_to_data_point_value(self) + } + fn into_f64(self) -> f64 { + self + } + } + + pub(crate) fn histogram_to_proto(hist: &SdkHistogram) -> TonicHistogram + where + T: Numeric, + { + TonicHistogram { + data_points: hist + .data_points() + .map(|dp| TonicHistogramDataPoint { + attributes: dp.attributes().map(api_key_value_ref_to_proto).collect(), + start_time_unix_nano: to_nanos(hist.start_time()), + time_unix_nano: to_nanos(hist.time()), + count: dp.count(), + sum: Some(dp.sum().into_f64()), + bucket_counts: dp.bucket_counts().collect(), + explicit_bounds: dp.bounds().collect(), + exemplars: dp.exemplars().map(exemplar_to_proto).collect(), + flags: TonicDataPointFlags::default() as u32, + min: dp.min().map(Numeric::into_f64), + max: dp.max().map(Numeric::into_f64), + }) + .collect(), + aggregation_temporality: temporality_to_proto(hist.temporality()).into(), + } + } + + pub(crate) fn exponential_histogram_to_proto( + hist: &SdkExponentialHistogram, + ) -> TonicExponentialHistogram + where + T: Numeric, + { + TonicExponentialHistogram { + data_points: hist + .data_points() + .map(|dp| TonicExponentialHistogramDataPoint { + attributes: dp.attributes().map(api_key_value_ref_to_proto).collect(), + start_time_unix_nano: to_nanos(hist.start_time()), + time_unix_nano: to_nanos(hist.time()), + count: dp.count() as u64, + sum: Some(dp.sum().into_f64()), + scale: dp.scale().into(), + zero_count: dp.zero_count(), + positive: Some(TonicBuckets { + offset: dp.positive_bucket().offset(), + bucket_counts: dp.positive_bucket().counts().collect(), + }), + negative: Some(TonicBuckets { + offset: dp.negative_bucket().offset(), + bucket_counts: dp.negative_bucket().counts().collect(), + }), + flags: TonicDataPointFlags::default() as u32, + exemplars: dp.exemplars().map(exemplar_to_proto).collect(), + min: dp.min().map(Numeric::into_f64), + max: dp.max().map(Numeric::into_f64), + zero_threshold: dp.zero_threshold(), + }) + .collect(), + aggregation_temporality: temporality_to_proto(hist.temporality()).into(), + } + } + + pub(crate) fn sum_to_proto(sum: &SdkSum) -> TonicSum + where + T: Numeric + Debug, + { + TonicSum { + data_points: sum + .data_points() + .map(|dp| TonicNumberDataPoint { + attributes: dp.attributes().map(api_key_value_ref_to_proto).collect(), + start_time_unix_nano: to_nanos(sum.start_time()), + time_unix_nano: to_nanos(sum.time()), + exemplars: dp.exemplars().map(exemplar_to_proto).collect(), + flags: TonicDataPointFlags::default() as u32, + value: Some(dp.value().to_data_point_value()), + }) + .collect(), + aggregation_temporality: temporality_to_proto(sum.temporality()).into(), + is_monotonic: sum.is_monotonic(), + } + } + + pub(crate) fn gauge_to_proto(gauge: &SdkGauge) -> TonicGauge + where + T: Numeric + Debug, + { + TonicGauge { + data_points: gauge + .data_points() + .map(|dp| TonicNumberDataPoint { + attributes: dp.attributes().map(api_key_value_ref_to_proto).collect(), + start_time_unix_nano: gauge.start_time().map(to_nanos).unwrap_or_default(), + time_unix_nano: to_nanos(gauge.time()), + exemplars: dp.exemplars().map(exemplar_to_proto).collect(), + flags: TonicDataPointFlags::default() as u32, + value: Some(dp.value().to_data_point_value()), + }) + .collect(), + } + } + + pub(crate) fn exemplar_to_proto(ex: &SdkExemplar) -> TonicExemplar + where + T: Numeric, + { + TonicExemplar { + filtered_attributes: ex + .filtered_attributes() + .map(|kv| key_value_ref_to_proto((&kv.key, &kv.value))) + .collect(), + time_unix_nano: to_nanos(ex.time()), + span_id: ex.span_id().into(), + trace_id: ex.trace_id().into(), + value: Some(ex.value.to_exemplar_value()), + } + } +} diff --git a/opentelemetry-otlp/src/transform/mod.rs b/opentelemetry-otlp/src/transform/mod.rs new file mode 100644 index 0000000000..e5cb50a80c --- /dev/null +++ b/opentelemetry-otlp/src/transform/mod.rs @@ -0,0 +1,13 @@ +pub(crate) mod common; + +#[cfg(feature = "metrics")] +#[cfg(any(feature = "http-proto", feature = "http-json", feature = "grpc-tonic"))] +pub(crate) mod metrics; + +#[cfg(feature = "trace")] +#[cfg(any(feature = "http-proto", feature = "http-json", feature = "grpc-tonic"))] +pub(crate) mod trace; + +#[cfg(feature = "logs")] +#[cfg(any(feature = "http-proto", feature = "http-json", feature = "grpc-tonic"))] +pub(crate) mod logs; diff --git a/opentelemetry-proto/src/transform/trace.rs b/opentelemetry-otlp/src/transform/trace.rs similarity index 58% rename from opentelemetry-proto/src/transform/trace.rs rename to opentelemetry-otlp/src/transform/trace.rs index 52e61ed901..0725392165 100644 --- a/opentelemetry-proto/src/transform/trace.rs +++ b/opentelemetry-otlp/src/transform/trace.rs @@ -1,8 +1,7 @@ -#[cfg(feature = "gen-tonic-messages")] /// Builds span flags based on the parent span's remote property. /// This follows the OTLP specification for span flags. pub(crate) fn build_span_flags(parent_span_is_remote: bool, base_flags: u32) -> u32 { - use crate::proto::tonic::trace::v1::SpanFlags; + use opentelemetry_proto::tonic::trace::v1::SpanFlags; let mut flags = base_flags; flags |= SpanFlags::ContextHasIsRemoteMask as u32; if parent_span_is_remote { @@ -11,169 +10,128 @@ pub(crate) fn build_span_flags(parent_span_is_remote: bool, base_flags: u32) -> flags } -#[cfg(feature = "gen-tonic-messages")] -pub mod tonic { - use crate::proto::tonic::resource::v1::Resource; - use crate::proto::tonic::trace::v1::{span, status, ResourceSpans, ScopeSpans, Span, Status}; +pub(crate) mod tonic { use crate::transform::common::{ to_nanos, - tonic::{Attributes, ResourceAttributesWithSchema}, + tonic::{ + instrumentation_scope_ref_to_proto, instrumentation_scope_to_proto, keyvalues_to_proto, + ResourceAttributesWithSchema, + }, }; use opentelemetry::trace; use opentelemetry::trace::{Link, SpanId, SpanKind}; + use opentelemetry_proto::tonic::resource::v1::Resource; + use opentelemetry_proto::tonic::trace::v1::{ + span, status, ResourceSpans, ScopeSpans, Span, Status, + }; use opentelemetry_sdk::trace::SpanData; use std::collections::HashMap; - impl From for span::SpanKind { - fn from(span_kind: SpanKind) -> Self { - match span_kind { - SpanKind::Client => span::SpanKind::Client, - SpanKind::Consumer => span::SpanKind::Consumer, - SpanKind::Internal => span::SpanKind::Internal, - SpanKind::Producer => span::SpanKind::Producer, - SpanKind::Server => span::SpanKind::Server, - } + pub(crate) fn span_kind_to_proto(span_kind: SpanKind) -> span::SpanKind { + match span_kind { + SpanKind::Client => span::SpanKind::Client, + SpanKind::Consumer => span::SpanKind::Consumer, + SpanKind::Internal => span::SpanKind::Internal, + SpanKind::Producer => span::SpanKind::Producer, + SpanKind::Server => span::SpanKind::Server, } } - impl From<&trace::Status> for status::StatusCode { - fn from(status: &trace::Status) -> Self { - match status { - trace::Status::Ok => status::StatusCode::Ok, - trace::Status::Unset => status::StatusCode::Unset, - trace::Status::Error { .. } => status::StatusCode::Error, - } + pub(crate) fn status_to_proto_code(status: &trace::Status) -> status::StatusCode { + match status { + trace::Status::Ok => status::StatusCode::Ok, + trace::Status::Unset => status::StatusCode::Unset, + trace::Status::Error { .. } => status::StatusCode::Error, } } - impl From for span::Link { - fn from(link: Link) -> Self { - span::Link { - trace_id: link.span_context.trace_id().to_bytes().to_vec(), - span_id: link.span_context.span_id().to_bytes().to_vec(), - trace_state: link.span_context.trace_state().header(), - attributes: Attributes::from(link.attributes).0, - dropped_attributes_count: link.dropped_attributes_count, - flags: super::build_span_flags( - link.span_context.is_remote(), - link.span_context.trace_flags().to_u8() as u32, - ), - } + pub(crate) fn link_to_proto(link: Link) -> span::Link { + span::Link { + trace_id: link.span_context.trace_id().to_bytes().to_vec(), + span_id: link.span_context.span_id().to_bytes().to_vec(), + trace_state: link.span_context.trace_state().header(), + attributes: keyvalues_to_proto(link.attributes), + dropped_attributes_count: link.dropped_attributes_count, + flags: super::build_span_flags( + link.span_context.is_remote(), + link.span_context.trace_flags().to_u8() as u32, + ), } } - impl From for Span { - fn from(source_span: opentelemetry_sdk::trace::SpanData) -> Self { - let span_kind: span::SpanKind = source_span.span_kind.into(); - Span { - trace_id: source_span.span_context.trace_id().to_bytes().to_vec(), - span_id: source_span.span_context.span_id().to_bytes().to_vec(), - trace_state: source_span.span_context.trace_state().header(), - parent_span_id: { - if source_span.parent_span_id != SpanId::INVALID { - source_span.parent_span_id.to_bytes().to_vec() - } else { - vec![] - } + + pub(crate) fn span_data_to_proto(source_span: SpanData) -> Span { + let span_kind = span_kind_to_proto(source_span.span_kind); + Span { + trace_id: source_span.span_context.trace_id().to_bytes().to_vec(), + span_id: source_span.span_context.span_id().to_bytes().to_vec(), + trace_state: source_span.span_context.trace_state().header(), + parent_span_id: { + if source_span.parent_span_id != SpanId::INVALID { + source_span.parent_span_id.to_bytes().to_vec() + } else { + vec![] + } + }, + flags: super::build_span_flags( + source_span.parent_span_is_remote, + source_span.span_context.trace_flags().to_u8() as u32, + ), + name: source_span.name.into_owned(), + kind: span_kind as i32, + start_time_unix_nano: to_nanos(source_span.start_time), + end_time_unix_nano: to_nanos(source_span.end_time), + dropped_attributes_count: source_span.dropped_attributes_count, + attributes: keyvalues_to_proto(source_span.attributes), + dropped_events_count: source_span.events.dropped_count, + events: source_span + .events + .into_iter() + .map(|event| span::Event { + time_unix_nano: to_nanos(event.timestamp), + name: event.name.into(), + attributes: keyvalues_to_proto(event.attributes), + dropped_attributes_count: event.dropped_attributes_count, + }) + .collect(), + dropped_links_count: source_span.links.dropped_count, + links: source_span.links.into_iter().map(link_to_proto).collect(), + status: Some(Status { + code: status_to_proto_code(&source_span.status).into(), + message: match source_span.status { + trace::Status::Error { description } => description.to_string(), + _ => Default::default(), }, - flags: super::build_span_flags( - source_span.parent_span_is_remote, - source_span.span_context.trace_flags().to_u8() as u32, - ), - name: source_span.name.into_owned(), - kind: span_kind as i32, - start_time_unix_nano: to_nanos(source_span.start_time), - end_time_unix_nano: to_nanos(source_span.end_time), - dropped_attributes_count: source_span.dropped_attributes_count, - attributes: Attributes::from(source_span.attributes).0, - dropped_events_count: source_span.events.dropped_count, - events: source_span - .events - .into_iter() - .map(|event| span::Event { - time_unix_nano: to_nanos(event.timestamp), - name: event.name.into(), - attributes: Attributes::from(event.attributes).0, - dropped_attributes_count: event.dropped_attributes_count, - }) - .collect(), - dropped_links_count: source_span.links.dropped_count, - links: source_span.links.into_iter().map(Into::into).collect(), - status: Some(Status { - code: status::StatusCode::from(&source_span.status).into(), - message: match source_span.status { - trace::Status::Error { description } => description.to_string(), - _ => Default::default(), - }, - }), - } + }), } } - impl ResourceSpans { - pub fn new(source_span: SpanData, resource: &ResourceAttributesWithSchema) -> Self { - let span_kind: span::SpanKind = source_span.span_kind.into(); - ResourceSpans { - resource: Some(Resource { - attributes: resource.attributes.0.clone(), - dropped_attributes_count: 0, - entity_refs: vec![], - }), - schema_url: resource.schema_url.clone().unwrap_or_default(), - scope_spans: vec![ScopeSpans { - schema_url: source_span - .instrumentation_scope - .schema_url() - .map(ToOwned::to_owned) - .unwrap_or_default(), - scope: Some((source_span.instrumentation_scope, None).into()), - spans: vec![Span { - trace_id: source_span.span_context.trace_id().to_bytes().to_vec(), - span_id: source_span.span_context.span_id().to_bytes().to_vec(), - trace_state: source_span.span_context.trace_state().header(), - parent_span_id: { - if source_span.parent_span_id != SpanId::INVALID { - source_span.parent_span_id.to_bytes().to_vec() - } else { - vec![] - } - }, - flags: super::build_span_flags( - source_span.parent_span_is_remote, - source_span.span_context.trace_flags().to_u8() as u32, - ), - name: source_span.name.into_owned(), - kind: span_kind as i32, - start_time_unix_nano: to_nanos(source_span.start_time), - end_time_unix_nano: to_nanos(source_span.end_time), - dropped_attributes_count: source_span.dropped_attributes_count, - attributes: Attributes::from(source_span.attributes).0, - dropped_events_count: source_span.events.dropped_count, - events: source_span - .events - .into_iter() - .map(|event| span::Event { - time_unix_nano: to_nanos(event.timestamp), - name: event.name.into(), - attributes: Attributes::from(event.attributes).0, - dropped_attributes_count: event.dropped_attributes_count, - }) - .collect(), - dropped_links_count: source_span.links.dropped_count, - links: source_span.links.into_iter().map(Into::into).collect(), - status: Some(Status { - code: status::StatusCode::from(&source_span.status).into(), - message: match source_span.status { - trace::Status::Error { description } => description.to_string(), - _ => Default::default(), - }, - }), - }], - }], - } + #[allow(dead_code)] + pub(crate) fn span_data_to_resource_spans( + source_span: SpanData, + resource: &ResourceAttributesWithSchema, + ) -> ResourceSpans { + let schema_url = source_span + .instrumentation_scope + .schema_url() + .map(ToOwned::to_owned) + .unwrap_or_default(); + let scope = instrumentation_scope_to_proto(source_span.instrumentation_scope.clone(), None); + ResourceSpans { + resource: Some(Resource { + attributes: resource.attributes.0.clone(), + dropped_attributes_count: 0, + entity_refs: vec![], + }), + schema_url: resource.schema_url.clone().unwrap_or_default(), + scope_spans: vec![ScopeSpans { + schema_url, + scope: Some(scope), + spans: vec![span_data_to_proto(source_span)], + }], } } - pub fn group_spans_by_resource_and_scope( + pub(crate) fn group_spans_by_resource_and_scope( spans: Vec, resource: &ResourceAttributesWithSchema, ) -> Vec { @@ -191,14 +149,14 @@ pub mod tonic { let scope_spans = scope_map .into_iter() .map(|(instrumentation, span_records)| ScopeSpans { - scope: Some((instrumentation, None).into()), + scope: Some(instrumentation_scope_ref_to_proto(instrumentation, None)), schema_url: instrumentation .schema_url() .map(ToOwned::to_owned) .unwrap_or_default(), spans: span_records .into_iter() - .map(|span_data| span_data.clone().into()) + .map(|span_data| span_data_to_proto(span_data.clone())) .collect(), }) .collect(); @@ -216,11 +174,12 @@ pub mod tonic { } } -#[cfg(all(test, feature = "gen-tonic-messages"))] +#[cfg(test)] mod span_flags_tests { - use crate::proto::tonic::trace::v1::{Span, SpanFlags}; + use crate::transform::trace::tonic::span_data_to_proto; use opentelemetry::trace::{SpanContext, SpanId, TraceFlags, TraceId, TraceState}; use opentelemetry::InstrumentationScope; + use opentelemetry_proto::tonic::trace::v1::{Span, SpanFlags}; use opentelemetry_sdk::trace::SpanData; use std::borrow::Cow; @@ -275,7 +234,7 @@ mod span_flags_tests { instrumentation_scope: InstrumentationScope::builder("test").build(), }; - let otlp_span: Span = span_data.into(); + let otlp_span: Span = span_data_to_proto(span_data); assert_eq!(otlp_span.flags, SpanFlags::ContextHasIsRemoteMask as u32); // 0x100 } @@ -303,7 +262,7 @@ mod span_flags_tests { instrumentation_scope: InstrumentationScope::builder("test").build(), }; - let otlp_span: Span = span_data.into(); + let otlp_span: Span = span_data_to_proto(span_data); assert_eq!( otlp_span.flags, (SpanFlags::ContextHasIsRemoteMask as u32) | (SpanFlags::ContextIsRemoteMask as u32) @@ -313,7 +272,6 @@ mod span_flags_tests { #[cfg(test)] mod tests { - use crate::tonic::common::v1::any_value::Value; use crate::transform::common::tonic::ResourceAttributesWithSchema; use opentelemetry::time::now; use opentelemetry::trace::{ @@ -321,6 +279,7 @@ mod tests { }; use opentelemetry::InstrumentationScope; use opentelemetry::KeyValue; + use opentelemetry_proto::tonic::common::v1::any_value::Value; use opentelemetry_sdk::resource::Resource; use opentelemetry_sdk::trace::SpanData; use opentelemetry_sdk::trace::{SpanEvents, SpanLinks}; @@ -361,7 +320,7 @@ mod tests { let span_data = create_test_span_data("lib1"); let spans = vec![span_data.clone()]; - let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema + let resource: ResourceAttributesWithSchema = (&resource).into(); let grouped_spans = crate::transform::trace::tonic::group_spans_by_resource_and_scope(spans, &resource); @@ -410,7 +369,7 @@ mod tests { let span_data3 = create_test_span_data("lib2"); let spans = vec![span_data1.clone(), span_data2.clone(), span_data3.clone()]; - let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema + let resource: ResourceAttributesWithSchema = (&resource).into(); let grouped_spans = crate::transform::trace::tonic::group_spans_by_resource_and_scope(spans, &resource); @@ -422,24 +381,10 @@ mod tests { resource_spans.resource.as_ref().unwrap().attributes.len(), 1 ); - assert_eq!( - resource_spans.resource.as_ref().unwrap().attributes[0].key, - "resource_key" - ); - assert_eq!( - resource_spans.resource.as_ref().unwrap().attributes[0] - .value - .clone() - .unwrap() - .value - .unwrap(), - Value::StringValue("resource_value".to_string()) - ); let scope_spans = &resource_spans.scope_spans; assert_eq!(scope_spans.len(), 2); - // Check the scope spans for both lib1 and lib2 let mut lib1_scope_span = None; let mut lib2_scope_span = None; @@ -454,9 +399,6 @@ mod tests { let lib1_scope_span = lib1_scope_span.expect("lib1 scope span not found"); let lib2_scope_span = lib2_scope_span.expect("lib2 scope span not found"); - assert_eq!(lib1_scope_span.scope.as_ref().unwrap().name, "lib1"); - assert_eq!(lib2_scope_span.scope.as_ref().unwrap().name, "lib2"); - assert_eq!(lib1_scope_span.spans.len(), 2); assert_eq!(lib2_scope_span.spans.len(), 1); diff --git a/opentelemetry-otlp/tests/integration_test/Cargo.toml b/opentelemetry-otlp/tests/integration_test/Cargo.toml index d6231f6d2d..84f38b6ce2 100644 --- a/opentelemetry-otlp/tests/integration_test/Cargo.toml +++ b/opentelemetry-otlp/tests/integration_test/Cargo.toml @@ -26,7 +26,7 @@ ctor = { workspace = true } uuid = { workspace = true, features = ["v4"] } [target.'cfg(unix)'.dev-dependencies] -opentelemetry-appender-tracing = { path = "../../../opentelemetry-appender-tracing", default-features = false} +opentelemetry-appender-tracing = { path = "../../../opentelemetry-appender-tracing", default-features = false } [features] hyper-client = ["opentelemetry-otlp/hyper-client", "opentelemetry-otlp/http-proto", "opentelemetry-otlp/trace", "opentelemetry-otlp/logs", "opentelemetry-otlp/metrics", "internal-logs"] diff --git a/opentelemetry-proto/CHANGELOG.md b/opentelemetry-proto/CHANGELOG.md index 25a10484d8..41d805ef32 100644 --- a/opentelemetry-proto/CHANGELOG.md +++ b/opentelemetry-proto/CHANGELOG.md @@ -2,6 +2,8 @@ ## vNext +- **Breaking change**: Removed the `opentelemetry_proto::transform` module and the direct dependencies on `opentelemetry` and `opentelemetry_sdk`. The transformation helpers were intended for exporter internals and now live in `opentelemetry-otlp`. Applications should use the OTLP exporters from `opentelemetry-otlp`; custom exporters should convert SDK data to the generated protobuf types directly. + ## 0.32.0 Released 2026-May-08 diff --git a/opentelemetry-proto/Cargo.toml b/opentelemetry-proto/Cargo.toml index 47523cf84e..6d22d4daeb 100644 --- a/opentelemetry-proto/Cargo.toml +++ b/opentelemetry-proto/Cargo.toml @@ -39,15 +39,15 @@ gen-tonic = ["gen-tonic-messages", "tonic", "tonic-prost", "tonic/channel"] gen-tonic-messages = ["prost"] # telemetry pillars and functions -trace = ["opentelemetry/trace", "opentelemetry_sdk/trace"] -metrics = ["opentelemetry/metrics", "opentelemetry_sdk/metrics"] -logs = ["opentelemetry/logs", "opentelemetry_sdk/logs"] +trace = [] +metrics = [] +logs = [] zpages = ["trace"] profiles = [] -testing = ["opentelemetry/testing"] +testing = [] # add ons -internal-logs = ["opentelemetry/internal-logs"] +internal-logs = [] with-schemars = ["schemars"] with-serde = ["serde", "const-hex", "base64"] @@ -55,15 +55,12 @@ with-serde = ["serde", "const-hex", "base64"] tonic = { workspace = true, optional = true, features = ["codegen"] } tonic-prost = { workspace = true, optional = true } prost = { workspace = true, optional = true } -opentelemetry = { workspace = true, default-features = false } -opentelemetry_sdk = { workspace = true, default-features = false } schemars = { workspace = true, optional = true } serde = { workspace = true, optional = true, features = ["serde_derive", "std"] } const-hex = { workspace = true, optional = true } base64 = { workspace = true, optional = true } [dev-dependencies] -opentelemetry = { workspace = true, features = ["testing"] } tonic-prost-build = { workspace = true } tempfile = { workspace = true } serde_json = { workspace = true } diff --git a/opentelemetry-proto/README.md b/opentelemetry-proto/README.md index fa66141b41..f68447a343 100644 --- a/opentelemetry-proto/README.md +++ b/opentelemetry-proto/README.md @@ -4,11 +4,9 @@ [splash]: https://raw.githubusercontent.com/open-telemetry/opentelemetry-rust/main/assets/logo-text.png -This crate contains generated files from +This crate contains generated OTLP protobuf types from the [opentelemetry-proto](https://github.com/open-telemetry/opentelemetry-proto) -repository and transformations between types from generated files and types -defined in -[opentelemetry](https://github.com/open-telemetry/opentelemetry-rust/tree/main/opentelemetry). +repository. [![Crates.io: opentelemetry-proto](https://img.shields.io/crates/v/opentelemetry-proto.svg)](https://crates.io/crates/opentelemetry-proto) [![Documentation](https://docs.rs/opentelemetry-proto/badge.svg)](https://docs.rs/opentelemetry-proto) @@ -40,11 +38,8 @@ of telemetry is intentionally left to other tools. ### What does this crate contain? This crate provides auto-generated Protobuf types from the [OpenTelemetry -protocol specification](https://github.com/open-telemetry/opentelemetry-proto), -along with conversion implementations between these generated types and the -types defined in the -[opentelemetry](https://crates.io/crates/opentelemetry) crate. It is used -internally by exporters such as +protocol specification](https://github.com/open-telemetry/opentelemetry-proto). +It is used internally by exporters such as [opentelemetry-otlp](https://crates.io/crates/opentelemetry-otlp) to serialize and deserialize telemetry data. diff --git a/opentelemetry-proto/src/lib.rs b/opentelemetry-proto/src/lib.rs index bd6ef1ecba..99de3dd183 100644 --- a/opentelemetry-proto/src/lib.rs +++ b/opentelemetry-proto/src/lib.rs @@ -1,9 +1,11 @@ #![cfg_attr(docsrs, feature(doc_cfg))] -//! This crate contains generated files from [opentelemetry-proto](https://github.com/open-telemetry/opentelemetry-proto) -//! repository and transformation between types from generated files and types defined in [opentelemetry](https://github.com/open-telemetry/opentelemetry-rust/tree/main/opentelemetry) +//! This crate contains generated OTLP protobuf types from the +//! [opentelemetry-proto](https://github.com/open-telemetry/opentelemetry-proto) +//! repository. //! -//! Based on the build tool needed, users can choose to generate files using [tonic](https://github.com/hyperium/tonic) -//! or [grpcio](https://github.com/tikv/grpc-rs). +//! Enable `gen-tonic-messages` to generate message types using +//! [prost](https://github.com/tokio-rs/prost), or `gen-tonic` to also include +//! [tonic](https://github.com/hyperium/tonic) gRPC client/server transport support. //! //! //! # Feature flags @@ -34,5 +36,3 @@ mod proto; #[cfg(feature = "gen-tonic-messages")] pub use proto::tonic; - -pub mod transform; diff --git a/opentelemetry-proto/src/proto.rs b/opentelemetry-proto/src/proto.rs index 892d8f10c2..7bc42248c9 100644 --- a/opentelemetry-proto/src/proto.rs +++ b/opentelemetry-proto/src/proto.rs @@ -508,5 +508,4 @@ pub mod tonic { pub mod v1development; } - pub use crate::transform::common::tonic::Attributes; } diff --git a/opentelemetry-proto/src/transform/common.rs b/opentelemetry-proto/src/transform/common.rs deleted file mode 100644 index d9f14e3b84..0000000000 --- a/opentelemetry-proto/src/transform/common.rs +++ /dev/null @@ -1,212 +0,0 @@ -#[cfg(all( - feature = "gen-tonic-messages", - any(feature = "trace", feature = "metrics", feature = "logs") -))] -use std::time::{Duration, SystemTime, UNIX_EPOCH}; - -#[cfg(all( - feature = "gen-tonic-messages", - any(feature = "trace", feature = "metrics", feature = "logs") -))] -pub(crate) fn to_nanos(time: SystemTime) -> u64 { - time.duration_since(UNIX_EPOCH) - .unwrap_or_else(|_| Duration::from_secs(0)) - .as_nanos() as u64 -} - -#[cfg(feature = "gen-tonic-messages")] -pub mod tonic { - use crate::proto::tonic::common::v1::{ - any_value, AnyValue, ArrayValue, InstrumentationScope, KeyValue, - }; - use opentelemetry::{Array, Value}; - use std::borrow::Cow; - - #[cfg(any(feature = "trace", feature = "logs"))] - #[derive(Debug, Default)] - pub struct ResourceAttributesWithSchema { - pub attributes: Attributes, - pub schema_url: Option, - } - - #[cfg(any(feature = "trace", feature = "logs"))] - impl From<&opentelemetry_sdk::Resource> for ResourceAttributesWithSchema { - fn from(resource: &opentelemetry_sdk::Resource) -> Self { - ResourceAttributesWithSchema { - attributes: resource_attributes(resource), - schema_url: resource.schema_url().map(ToString::to_string), - } - } - } - - #[cfg(any(feature = "trace", feature = "logs"))] - use opentelemetry_sdk::Resource; - - impl - From<( - opentelemetry::InstrumentationScope, - Option>, - )> for InstrumentationScope - { - fn from( - data: ( - opentelemetry::InstrumentationScope, - Option>, - ), - ) -> Self { - let (library, target) = data; - InstrumentationScope { - name: target.map_or_else(|| library.name().to_owned(), Cow::into_owned), - version: library.version().unwrap_or_default().to_owned(), - attributes: Attributes::from(library.attributes().cloned()).0, - ..Default::default() - } - } - } - - impl - From<( - &opentelemetry::InstrumentationScope, - Option>, - )> for InstrumentationScope - { - fn from( - data: ( - &opentelemetry::InstrumentationScope, - Option>, - ), - ) -> Self { - let (library, target) = data; - InstrumentationScope { - name: target.map_or_else(|| library.name().to_owned(), Cow::into_owned), - version: library.version().unwrap_or_default().to_owned(), - attributes: Attributes::from(library.attributes().cloned()).0, - ..Default::default() - } - } - } - - /// Wrapper type for Vec<`KeyValue`> - #[derive(Default, Debug)] - pub struct Attributes(pub ::std::vec::Vec); - - impl> From for Attributes { - fn from(kvs: I) -> Self { - Attributes( - kvs.into_iter() - .map(|api_kv| KeyValue { - key: api_kv.key.as_str().to_string(), - value: Some(api_kv.value.into()), - key_strindex: 0, - }) - .collect(), - ) - } - } - - #[cfg(feature = "logs")] - impl, V: Into> FromIterator<(K, V)> for Attributes { - fn from_iter>(iter: T) -> Self { - Attributes( - iter.into_iter() - .map(|(k, v)| KeyValue { - key: k.into(), - value: Some(v.into()), - key_strindex: 0, - }) - .collect(), - ) - } - } - - impl From for AnyValue { - fn from(value: Value) -> Self { - AnyValue { - value: match value { - Value::Bool(val) => Some(any_value::Value::BoolValue(val)), - Value::I64(val) => Some(any_value::Value::IntValue(val)), - Value::F64(val) => Some(any_value::Value::DoubleValue(val)), - Value::String(val) => Some(any_value::Value::StringValue(val.to_string())), - Value::Array(array) => Some(any_value::Value::ArrayValue(match array { - Array::Bool(vals) => array_into_proto(vals), - Array::I64(vals) => array_into_proto(vals), - Array::F64(vals) => array_into_proto(vals), - Array::String(vals) => array_into_proto(vals), - _ => unreachable!("Nonexistent array type"), // Needs to be updated when new array types are added - })), - _ => unreachable!("Nonexistent value type"), // Needs to be updated when new value types are added - }, - } - } - } - - fn array_into_proto(vals: Vec) -> ArrayValue - where - Value: From, - { - let values = vals - .into_iter() - .map(|val| AnyValue::from(Value::from(val))) - .collect(); - - ArrayValue { values } - } - - #[cfg(any(feature = "trace", feature = "logs"))] - pub(crate) fn resource_attributes(resource: &Resource) -> Attributes { - resource - .iter() - .map(|(k, v)| opentelemetry::KeyValue::new(k.clone(), v.clone())) - .collect::>() - .into() - } - - #[cfg(test)] - mod tests { - use super::*; - use opentelemetry::KeyValue; - use std::borrow::Cow; - - fn assert_scope_fields( - proto_scope: &InstrumentationScope, - expected_name: &str, - expected_version: &str, - expected_attr_key: &str, - ) { - assert_eq!(proto_scope.name, expected_name); - assert_eq!(proto_scope.version, expected_version); - assert_eq!(proto_scope.attributes.len(), 1); - assert_eq!(proto_scope.attributes[0].key, expected_attr_key); - } - - #[test] - fn instrumentation_scope_with_target_overrides_name_but_preserves_version_and_attributes() { - let scope = opentelemetry::InstrumentationScope::builder("my-lib") - .with_version("1.0.0") - .with_attributes([KeyValue::new("feature", "metrics")]) - .build(); - let target: Option> = Some(Cow::Borrowed("my_app::handlers")); - - let from_owned = InstrumentationScope::from((scope.clone(), target.clone())); - let from_ref = InstrumentationScope::from((&scope, target)); - - assert_scope_fields(&from_owned, "my_app::handlers", "1.0.0", "feature"); - assert_scope_fields(&from_ref, "my_app::handlers", "1.0.0", "feature"); - } - - #[test] - fn instrumentation_scope_without_target_preserves_all_fields() { - let scope = opentelemetry::InstrumentationScope::builder("my-lib") - .with_version("1.0.0") - .with_attributes([KeyValue::new("feature", "metrics")]) - .build(); - let target: Option> = None; - - let from_owned = InstrumentationScope::from((scope.clone(), target.clone())); - let from_ref = InstrumentationScope::from((&scope, target)); - - assert_scope_fields(&from_owned, "my-lib", "1.0.0", "feature"); - assert_scope_fields(&from_ref, "my-lib", "1.0.0", "feature"); - } - } -} diff --git a/opentelemetry-proto/src/transform/logs.rs b/opentelemetry-proto/src/transform/logs.rs deleted file mode 100644 index a5739695ba..0000000000 --- a/opentelemetry-proto/src/transform/logs.rs +++ /dev/null @@ -1,358 +0,0 @@ -#[cfg(feature = "gen-tonic-messages")] -pub mod tonic { - use crate::{ - tonic::{ - common::v1::{ - any_value::Value, AnyValue, ArrayValue, InstrumentationScope, KeyValue, - KeyValueList, - }, - logs::v1::{LogRecord, ResourceLogs, ScopeLogs, SeverityNumber}, - resource::v1::Resource, - }, - transform::common::{to_nanos, tonic::ResourceAttributesWithSchema}, - }; - use opentelemetry::logs::{AnyValue as LogsAnyValue, Severity}; - use opentelemetry_sdk::logs::LogBatch; - use std::borrow::Cow; - use std::collections::HashMap; - - impl From for AnyValue { - fn from(value: LogsAnyValue) -> Self { - AnyValue { - value: Some(value.into()), - } - } - } - - impl From for Value { - fn from(value: LogsAnyValue) -> Self { - match value { - LogsAnyValue::Double(f) => Value::DoubleValue(f), - LogsAnyValue::Int(i) => Value::IntValue(i), - LogsAnyValue::String(s) => Value::StringValue(s.into()), - LogsAnyValue::Boolean(b) => Value::BoolValue(b), - LogsAnyValue::ListAny(v) => Value::ArrayValue(ArrayValue { - values: v - .into_iter() - .map(|v| AnyValue { - value: Some(v.into()), - }) - .collect(), - }), - LogsAnyValue::Map(m) => Value::KvlistValue(KeyValueList { - values: m - .into_iter() - .map(|(key, value)| KeyValue { - key: key.into(), - value: Some(AnyValue { - value: Some(value.into()), - }), - key_strindex: 0, - }) - .collect(), - }), - LogsAnyValue::Bytes(v) => Value::BytesValue(*v), - _ => unreachable!("Nonexistent value type"), - } - } - } - - impl From<&opentelemetry_sdk::logs::SdkLogRecord> for LogRecord { - fn from(log_record: &opentelemetry_sdk::logs::SdkLogRecord) -> Self { - let trace_context = log_record.trace_context(); - let severity_number = match log_record.severity_number() { - Some(Severity::Trace) => SeverityNumber::Trace, - Some(Severity::Trace2) => SeverityNumber::Trace2, - Some(Severity::Trace3) => SeverityNumber::Trace3, - Some(Severity::Trace4) => SeverityNumber::Trace4, - Some(Severity::Debug) => SeverityNumber::Debug, - Some(Severity::Debug2) => SeverityNumber::Debug2, - Some(Severity::Debug3) => SeverityNumber::Debug3, - Some(Severity::Debug4) => SeverityNumber::Debug4, - Some(Severity::Info) => SeverityNumber::Info, - Some(Severity::Info2) => SeverityNumber::Info2, - Some(Severity::Info3) => SeverityNumber::Info3, - Some(Severity::Info4) => SeverityNumber::Info4, - Some(Severity::Warn) => SeverityNumber::Warn, - Some(Severity::Warn2) => SeverityNumber::Warn2, - Some(Severity::Warn3) => SeverityNumber::Warn3, - Some(Severity::Warn4) => SeverityNumber::Warn4, - Some(Severity::Error) => SeverityNumber::Error, - Some(Severity::Error2) => SeverityNumber::Error2, - Some(Severity::Error3) => SeverityNumber::Error3, - Some(Severity::Error4) => SeverityNumber::Error4, - Some(Severity::Fatal) => SeverityNumber::Fatal, - Some(Severity::Fatal2) => SeverityNumber::Fatal2, - Some(Severity::Fatal3) => SeverityNumber::Fatal3, - Some(Severity::Fatal4) => SeverityNumber::Fatal4, - None => SeverityNumber::Unspecified, - }; - - LogRecord { - time_unix_nano: log_record.timestamp().map(to_nanos).unwrap_or_default(), - observed_time_unix_nano: to_nanos(log_record.observed_timestamp().unwrap()), - attributes: { - log_record - .attributes_iter() - .map(|kv| KeyValue { - key: kv.0.to_string(), - value: Some(AnyValue { - value: Some(kv.1.clone().into()), - }), - key_strindex: 0, - }) - .collect() - }, - event_name: log_record.event_name().unwrap_or_default().into(), - severity_number: severity_number.into(), - severity_text: log_record - .severity_text() - .map(Into::into) - .unwrap_or_default(), - body: log_record.body().cloned().map(Into::into), - dropped_attributes_count: 0, - flags: trace_context - .map(|ctx| { - ctx.trace_flags - .map(|flags| flags.to_u8() as u32) - .unwrap_or_default() - }) - .unwrap_or_default(), - span_id: trace_context - .map(|ctx| ctx.span_id.to_bytes().to_vec()) - .unwrap_or_default(), - trace_id: trace_context - .map(|ctx| ctx.trace_id.to_bytes().to_vec()) - .unwrap_or_default(), - } - } - } - - impl - From<( - ( - &opentelemetry_sdk::logs::SdkLogRecord, - &opentelemetry::InstrumentationScope, - ), - &ResourceAttributesWithSchema, - )> for ResourceLogs - { - fn from( - data: ( - ( - &opentelemetry_sdk::logs::SdkLogRecord, - &opentelemetry::InstrumentationScope, - ), - &ResourceAttributesWithSchema, - ), - ) -> Self { - let ((log_record, instrumentation), resource) = data; - - ResourceLogs { - resource: Some(Resource { - attributes: resource.attributes.0.clone(), - dropped_attributes_count: 0, - entity_refs: vec![], - }), - schema_url: resource.schema_url.clone().unwrap_or_default(), - scope_logs: vec![ScopeLogs { - schema_url: instrumentation - .schema_url() - .map(ToOwned::to_owned) - .unwrap_or_default(), - scope: Some((instrumentation, log_record.target().cloned()).into()), - log_records: vec![log_record.into()], - }], - } - } - } - - pub fn group_logs_by_resource_and_scope<'a>( - logs: &'a LogBatch<'a>, - resource: &ResourceAttributesWithSchema, - ) -> Vec { - // Group logs by target or instrumentation name - let scope_map = logs.iter().fold( - HashMap::new(), - |mut scope_map: HashMap< - Cow<'static, str>, - Vec<( - &opentelemetry_sdk::logs::SdkLogRecord, - &opentelemetry::InstrumentationScope, - )>, - >, - (log_record, instrumentation)| { - let key = log_record - .target() - .cloned() - .unwrap_or_else(|| Cow::Owned(instrumentation.name().to_owned())); - scope_map - .entry(key) - .or_default() - .push((log_record, instrumentation)); - scope_map - }, - ); - - let scope_logs = scope_map - .into_iter() - .map(|(key, log_data)| ScopeLogs { - scope: Some(InstrumentationScope::from(( - log_data.first().unwrap().1, - Some(key.into_owned().into()), - ))), - schema_url: resource.schema_url.clone().unwrap_or_default(), - log_records: log_data - .into_iter() - .map(|(log_record, _)| log_record.into()) - .collect(), - }) - .collect(); - - vec![ResourceLogs { - resource: Some(Resource { - attributes: resource.attributes.0.clone(), - dropped_attributes_count: 0, - entity_refs: vec![], - }), - scope_logs, - schema_url: resource.schema_url.clone().unwrap_or_default(), - }] - } -} - -#[cfg(test)] -mod tests { - use crate::transform::common::tonic::ResourceAttributesWithSchema; - use opentelemetry::logs::LogRecord as _; - use opentelemetry::logs::Logger; - use opentelemetry::logs::LoggerProvider; - use opentelemetry::time::now; - use opentelemetry::{InstrumentationScope, KeyValue}; - use opentelemetry_sdk::error::OTelSdkResult; - use opentelemetry_sdk::logs::LogProcessor; - use opentelemetry_sdk::logs::SdkLoggerProvider; - use opentelemetry_sdk::{logs::LogBatch, logs::SdkLogRecord, Resource}; - use std::borrow::Cow; - - #[derive(Debug)] - struct MockProcessor; - - impl LogProcessor for MockProcessor { - fn emit(&self, _record: &mut SdkLogRecord, _instrumentation: &InstrumentationScope) {} - - fn force_flush(&self) -> OTelSdkResult { - Ok(()) - } - - fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult { - Ok(()) - } - } - - fn create_test_log_data( - instrumentation_name: &str, - _message: &str, - ) -> (SdkLogRecord, InstrumentationScope) { - let processor = MockProcessor {}; - let logger = SdkLoggerProvider::builder() - .with_log_processor(processor) - .build() - .logger("test"); - let mut logrecord = logger.create_log_record(); - logrecord.set_timestamp(now()); - logrecord.set_observed_timestamp(now()); - let instrumentation = - InstrumentationScope::builder(instrumentation_name.to_string()).build(); - (logrecord, instrumentation) - } - - #[test] - fn test_group_logs_by_resource_and_scope_single_scope() { - let resource = Resource::builder().build(); - let (log_record1, instrum_lib1) = create_test_log_data("test-lib", "Log 1"); - let (log_record2, instrum_lib2) = create_test_log_data("test-lib", "Log 2"); - - let logs = [(&log_record1, &instrum_lib1), (&log_record2, &instrum_lib2)]; - let log_batch = LogBatch::new(&logs); - let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema - - let grouped_logs = - crate::transform::logs::tonic::group_logs_by_resource_and_scope(&log_batch, &resource); - - assert_eq!(grouped_logs.len(), 1); - let resource_logs = &grouped_logs[0]; - assert_eq!(resource_logs.scope_logs.len(), 1); - - let scope_logs = &resource_logs.scope_logs[0]; - assert_eq!(scope_logs.log_records.len(), 2); - } - - #[test] - fn test_group_logs_by_resource_and_scope_multiple_scopes() { - let resource = Resource::builder().build(); - let (log_record1, instrum_lib1) = create_test_log_data("lib1", "Log 1"); - let (log_record2, instrum_lib2) = create_test_log_data("lib2", "Log 2"); - - let logs = [(&log_record1, &instrum_lib1), (&log_record2, &instrum_lib2)]; - let log_batch = LogBatch::new(&logs); - let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema - let grouped_logs = - crate::transform::logs::tonic::group_logs_by_resource_and_scope(&log_batch, &resource); - - assert_eq!(grouped_logs.len(), 1); - let resource_logs = &grouped_logs[0]; - assert_eq!(resource_logs.scope_logs.len(), 2); - - let scope_logs_1 = &resource_logs - .scope_logs - .iter() - .find(|scope| scope.scope.as_ref().unwrap().name == "lib1") - .unwrap(); - let scope_logs_2 = &resource_logs - .scope_logs - .iter() - .find(|scope| scope.scope.as_ref().unwrap().name == "lib2") - .unwrap(); - - assert_eq!(scope_logs_1.log_records.len(), 1); - assert_eq!(scope_logs_2.log_records.len(), 1); - } - - #[test] - fn test_group_logs_preserves_scope_version_and_attributes_when_target_set() { - let resource = Resource::builder().build(); - let processor = MockProcessor {}; - let logger = SdkLoggerProvider::builder() - .with_log_processor(processor) - .build() - .logger("test"); - - let mut logrecord = logger.create_log_record(); - logrecord.set_timestamp(now()); - logrecord.set_observed_timestamp(now()); - logrecord.set_target(Cow::Borrowed("my_app::handlers")); - - let instrumentation = InstrumentationScope::builder("my-lib") - .with_version("1.0.0") - .with_attributes([KeyValue::new("feature", "metrics")]) - .build(); - - let logs = [(&logrecord, &instrumentation)]; - let log_batch = LogBatch::new(&logs); - let resource: ResourceAttributesWithSchema = (&resource).into(); - - let grouped_logs = - crate::transform::logs::tonic::group_logs_by_resource_and_scope(&log_batch, &resource); - - assert_eq!(grouped_logs.len(), 1); - let resource_logs = &grouped_logs[0]; - assert_eq!(resource_logs.scope_logs.len(), 1); - - let scope = resource_logs.scope_logs[0].scope.as_ref().unwrap(); - assert_eq!(scope.name, "my_app::handlers"); - assert_eq!(scope.version, "1.0.0"); - assert_eq!(scope.attributes.len(), 1); - assert_eq!(scope.attributes[0].key, "feature"); - } -} diff --git a/opentelemetry-proto/src/transform/metrics.rs b/opentelemetry-proto/src/transform/metrics.rs deleted file mode 100644 index a84e147ee8..0000000000 --- a/opentelemetry-proto/src/transform/metrics.rs +++ /dev/null @@ -1,334 +0,0 @@ -// The prost currently will generate a non optional deprecated field for labels. -// We cannot assign value to it otherwise clippy will complain. -// We cannot ignore it as it's not an optional field. -// We can remove this after we removed the labels field from proto. -#[allow(deprecated)] -#[cfg(feature = "gen-tonic-messages")] -pub mod tonic { - use std::fmt::Debug; - - use opentelemetry::{otel_debug, Key, Value}; - use opentelemetry_sdk::metrics::data::{ - AggregatedMetrics, Exemplar as SdkExemplar, - ExponentialHistogram as SdkExponentialHistogram, Gauge as SdkGauge, - Histogram as SdkHistogram, Metric as SdkMetric, MetricData, ResourceMetrics, - ScopeMetrics as SdkScopeMetrics, Sum as SdkSum, - }; - use opentelemetry_sdk::metrics::Temporality; - use opentelemetry_sdk::Resource as SdkResource; - - use crate::proto::tonic::{ - collector::metrics::v1::ExportMetricsServiceRequest, - common::v1::KeyValue, - metrics::v1::{ - exemplar, exemplar::Value as TonicExemplarValue, - exponential_histogram_data_point::Buckets as TonicBuckets, - metric::Data as TonicMetricData, number_data_point, - number_data_point::Value as TonicDataPointValue, - AggregationTemporality as TonicTemporality, AggregationTemporality, - DataPointFlags as TonicDataPointFlags, Exemplar as TonicExemplar, - ExponentialHistogram as TonicExponentialHistogram, - ExponentialHistogramDataPoint as TonicExponentialHistogramDataPoint, - Gauge as TonicGauge, Histogram as TonicHistogram, - HistogramDataPoint as TonicHistogramDataPoint, Metric as TonicMetric, - NumberDataPoint as TonicNumberDataPoint, ResourceMetrics as TonicResourceMetrics, - ScopeMetrics as TonicScopeMetrics, Sum as TonicSum, - }, - resource::v1::Resource as TonicResource, - }; - use crate::transform::common::to_nanos; - - impl From for exemplar::Value { - fn from(value: u64) -> Self { - exemplar::Value::AsInt(i64::try_from(value).unwrap_or_default()) - } - } - - impl From for exemplar::Value { - fn from(value: i64) -> Self { - exemplar::Value::AsInt(value) - } - } - - impl From for exemplar::Value { - fn from(value: f64) -> Self { - exemplar::Value::AsDouble(value) - } - } - - impl From for number_data_point::Value { - fn from(value: u64) -> Self { - number_data_point::Value::AsInt(i64::try_from(value).unwrap_or_default()) - } - } - - impl From for number_data_point::Value { - fn from(value: i64) -> Self { - number_data_point::Value::AsInt(value) - } - } - - impl From for number_data_point::Value { - fn from(value: f64) -> Self { - number_data_point::Value::AsDouble(value) - } - } - - impl From<(&Key, &Value)> for KeyValue { - fn from(kv: (&Key, &Value)) -> Self { - KeyValue { - key: kv.0.to_string(), - value: Some(kv.1.clone().into()), - key_strindex: 0, - } - } - } - - impl From<&opentelemetry::KeyValue> for KeyValue { - fn from(kv: &opentelemetry::KeyValue) -> Self { - KeyValue { - key: kv.key.to_string(), - value: Some(kv.value.clone().into()), - key_strindex: 0, - } - } - } - - impl From for AggregationTemporality { - fn from(temporality: Temporality) -> Self { - match temporality { - Temporality::Cumulative => AggregationTemporality::Cumulative, - Temporality::Delta => AggregationTemporality::Delta, - other => { - otel_debug!( - name: "AggregationTemporality::Unknown", - message = "Unknown temporality,using default instead.", - unknown_temporality = format!("{:?}", other), - default_temporality = format!("{:?}", Temporality::Cumulative) - ); - AggregationTemporality::Cumulative - } - } - } - } - - impl From<&ResourceMetrics> for ExportMetricsServiceRequest { - fn from(rm: &ResourceMetrics) -> Self { - ExportMetricsServiceRequest { - resource_metrics: vec![TonicResourceMetrics { - resource: Some((rm.resource()).into()), - scope_metrics: rm.scope_metrics().map(Into::into).collect(), - schema_url: rm - .resource() - .schema_url() - .map(Into::into) - .unwrap_or_default(), - }], - } - } - } - - impl From<&SdkResource> for TonicResource { - fn from(resource: &SdkResource) -> Self { - TonicResource { - attributes: resource.iter().map(Into::into).collect(), - dropped_attributes_count: 0, - entity_refs: vec![], // internal and currently unused - } - } - } - - impl From<&SdkScopeMetrics> for TonicScopeMetrics { - fn from(sm: &SdkScopeMetrics) -> Self { - TonicScopeMetrics { - scope: Some((sm.scope(), None).into()), - metrics: sm.metrics().map(Into::into).collect(), - schema_url: sm - .scope() - .schema_url() - .map(ToOwned::to_owned) - .unwrap_or_default(), - } - } - } - - impl From<&SdkMetric> for TonicMetric { - fn from(metric: &SdkMetric) -> Self { - TonicMetric { - name: metric.name().to_string(), - description: metric.description().to_string(), - unit: metric.unit().to_string(), - metadata: vec![], // internal and currently unused - data: Some(match metric.data() { - AggregatedMetrics::F64(data) => data.into(), - AggregatedMetrics::U64(data) => data.into(), - AggregatedMetrics::I64(data) => data.into(), - }), - } - } - } - - impl From<&MetricData> for TonicMetricData - where - T: Numeric + Debug, - { - fn from(data: &MetricData) -> Self { - match data { - MetricData::Gauge(gauge) => TonicMetricData::Gauge(gauge.into()), - MetricData::Sum(sum) => TonicMetricData::Sum(sum.into()), - MetricData::Histogram(hist) => TonicMetricData::Histogram(hist.into()), - MetricData::ExponentialHistogram(hist) => { - TonicMetricData::ExponentialHistogram(hist.into()) - } - } - } - } - - trait Numeric: Into + Into + Copy { - // lossy at large values for u64 and i64 but otlp histograms only handle float values - fn into_f64(self) -> f64; - } - - impl Numeric for u64 { - fn into_f64(self) -> f64 { - self as f64 - } - } - - impl Numeric for i64 { - fn into_f64(self) -> f64 { - self as f64 - } - } - - impl Numeric for f64 { - fn into_f64(self) -> f64 { - self - } - } - - impl From<&SdkHistogram> for TonicHistogram - where - T: Numeric, - { - fn from(hist: &SdkHistogram) -> Self { - TonicHistogram { - data_points: hist - .data_points() - .map(|dp| TonicHistogramDataPoint { - attributes: dp.attributes().map(Into::into).collect(), - start_time_unix_nano: to_nanos(hist.start_time()), - time_unix_nano: to_nanos(hist.time()), - count: dp.count(), - sum: Some(dp.sum().into_f64()), - bucket_counts: dp.bucket_counts().collect(), - explicit_bounds: dp.bounds().collect(), - exemplars: dp.exemplars().map(Into::into).collect(), - flags: TonicDataPointFlags::default() as u32, - min: dp.min().map(Numeric::into_f64), - max: dp.max().map(Numeric::into_f64), - }) - .collect(), - aggregation_temporality: TonicTemporality::from(hist.temporality()).into(), - } - } - } - - impl From<&SdkExponentialHistogram> for TonicExponentialHistogram - where - T: Numeric, - { - fn from(hist: &SdkExponentialHistogram) -> Self { - TonicExponentialHistogram { - data_points: hist - .data_points() - .map(|dp| TonicExponentialHistogramDataPoint { - attributes: dp.attributes().map(Into::into).collect(), - start_time_unix_nano: to_nanos(hist.start_time()), - time_unix_nano: to_nanos(hist.time()), - count: dp.count() as u64, - sum: Some(dp.sum().into_f64()), - scale: dp.scale().into(), - zero_count: dp.zero_count(), - positive: Some(TonicBuckets { - offset: dp.positive_bucket().offset(), - bucket_counts: dp.positive_bucket().counts().collect(), - }), - negative: Some(TonicBuckets { - offset: dp.negative_bucket().offset(), - bucket_counts: dp.negative_bucket().counts().collect(), - }), - flags: TonicDataPointFlags::default() as u32, - exemplars: dp.exemplars().map(Into::into).collect(), - min: dp.min().map(Numeric::into_f64), - max: dp.max().map(Numeric::into_f64), - zero_threshold: dp.zero_threshold(), - }) - .collect(), - aggregation_temporality: TonicTemporality::from(hist.temporality()).into(), - } - } - } - - impl From<&SdkSum> for TonicSum - where - T: Debug + Into + Into + Copy, - { - fn from(sum: &SdkSum) -> Self { - TonicSum { - data_points: sum - .data_points() - .map(|dp| TonicNumberDataPoint { - attributes: dp.attributes().map(Into::into).collect(), - start_time_unix_nano: to_nanos(sum.start_time()), - time_unix_nano: to_nanos(sum.time()), - exemplars: dp.exemplars().map(Into::into).collect(), - flags: TonicDataPointFlags::default() as u32, - value: Some(dp.value().into()), - }) - .collect(), - aggregation_temporality: TonicTemporality::from(sum.temporality()).into(), - is_monotonic: sum.is_monotonic(), - } - } - } - - impl From<&SdkGauge> for TonicGauge - where - T: Debug + Into + Into + Copy, - { - fn from(gauge: &SdkGauge) -> Self { - TonicGauge { - data_points: gauge - .data_points() - .map(|dp| TonicNumberDataPoint { - attributes: dp.attributes().map(Into::into).collect(), - start_time_unix_nano: gauge.start_time().map(to_nanos).unwrap_or_default(), - time_unix_nano: to_nanos(gauge.time()), - exemplars: dp.exemplars().map(Into::into).collect(), - flags: TonicDataPointFlags::default() as u32, - value: Some(dp.value().into()), - }) - .collect(), - } - } - } - - impl From<&SdkExemplar> for TonicExemplar - where - T: Into + Copy, - { - fn from(ex: &SdkExemplar) -> Self { - TonicExemplar { - filtered_attributes: ex - .filtered_attributes() - .map(|kv| (&kv.key, &kv.value).into()) - .collect(), - time_unix_nano: to_nanos(ex.time()), - span_id: ex.span_id().into(), - trace_id: ex.trace_id().into(), - value: Some(ex.value.into()), - } - } - } -} diff --git a/opentelemetry-proto/src/transform/mod.rs b/opentelemetry-proto/src/transform/mod.rs deleted file mode 100644 index f0b7b86d3d..0000000000 --- a/opentelemetry-proto/src/transform/mod.rs +++ /dev/null @@ -1,16 +0,0 @@ -pub mod common; - -#[cfg(feature = "metrics")] -pub mod metrics; - -#[cfg(feature = "trace")] -pub mod trace; - -#[cfg(feature = "logs")] -pub mod logs; - -#[cfg(feature = "zpages")] -pub mod tracez; - -#[cfg(feature = "profiles")] -pub mod profiles; diff --git a/opentelemetry-proto/src/transform/profiles.rs b/opentelemetry-proto/src/transform/profiles.rs deleted file mode 100644 index 8b13789179..0000000000 --- a/opentelemetry-proto/src/transform/profiles.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/opentelemetry-proto/src/transform/tracez.rs b/opentelemetry-proto/src/transform/tracez.rs deleted file mode 100644 index 3147a82cbb..0000000000 --- a/opentelemetry-proto/src/transform/tracez.rs +++ /dev/null @@ -1,72 +0,0 @@ -#[cfg(all(feature = "gen-tonic-messages", feature = "zpages"))] -mod tonic { - use opentelemetry::trace::{Event, Status}; - use opentelemetry_sdk::trace::SpanData; - - use crate::proto::tonic::{ - trace::v1::{span::Event as SpanEvent, Status as SpanStatus}, - tracez::v1::{ErrorData, LatencyData, RunningData}, - }; - use crate::transform::common::{to_nanos, tonic::Attributes}; - - impl From for LatencyData { - fn from(span_data: SpanData) -> Self { - LatencyData { - traceid: span_data.span_context.trace_id().to_bytes().to_vec(), - spanid: span_data.span_context.span_id().to_bytes().to_vec(), - parentid: span_data.parent_span_id.to_bytes().to_vec(), - starttime: to_nanos(span_data.start_time), - endtime: to_nanos(span_data.end_time), - attributes: Attributes::from(span_data.attributes).0, - events: span_data.events.iter().cloned().map(Into::into).collect(), - links: span_data.links.iter().cloned().map(Into::into).collect(), - } - } - } - - impl From for ErrorData { - fn from(span_data: SpanData) -> Self { - ErrorData { - traceid: span_data.span_context.trace_id().to_bytes().to_vec(), - spanid: span_data.span_context.span_id().to_bytes().to_vec(), - parentid: span_data.parent_span_id.to_bytes().to_vec(), - starttime: to_nanos(span_data.start_time), - attributes: Attributes::from(span_data.attributes).0, - events: span_data.events.iter().cloned().map(Into::into).collect(), - links: span_data.links.iter().cloned().map(Into::into).collect(), - status: match span_data.status { - Status::Error { description } => Some(SpanStatus { - message: description.to_string(), - code: 2, - }), - _ => None, - }, - } - } - } - - impl From for RunningData { - fn from(span_data: SpanData) -> Self { - RunningData { - traceid: span_data.span_context.trace_id().to_bytes().to_vec(), - spanid: span_data.span_context.span_id().to_bytes().to_vec(), - parentid: span_data.parent_span_id.to_bytes().to_vec(), - starttime: to_nanos(span_data.start_time), - attributes: Attributes::from(span_data.attributes).0, - events: span_data.events.iter().cloned().map(Into::into).collect(), - links: span_data.links.iter().cloned().map(Into::into).collect(), - } - } - } - - impl From for SpanEvent { - fn from(event: Event) -> Self { - SpanEvent { - time_unix_nano: to_nanos(event.timestamp), - name: event.name.to_string(), - attributes: Attributes::from(event.attributes).0, - dropped_attributes_count: event.dropped_attributes_count, - } - } - } -}