Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion opentelemetry-otlp/src/exporter/http/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ impl LogExporter for OtlpHttpClient {
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
self.resource = resource.into();
self.resource =
crate::transform::common::tonic::resource_to_attributes_with_schema(resource);
}
}

Expand Down
16 changes: 7 additions & 9 deletions opentelemetry-otlp/src/exporter/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,17 @@ use super::{
default_headers, parse_header_string, resolve_timeout, ExporterBuildError,
OTEL_EXPORTER_OTLP_HTTP_ENDPOINT_DEFAULT,
};
use crate::transform::common::tonic::ResourceAttributesWithSchema;
#[cfg(feature = "logs")]
use crate::transform::logs::tonic::group_logs_by_resource_and_scope;
#[cfg(feature = "trace")]
use crate::transform::trace::tonic::group_spans_by_resource_and_scope;
use crate::{
exporter::ExportConfig, Protocol, OTEL_EXPORTER_OTLP_ENDPOINT, OTEL_EXPORTER_OTLP_HEADERS,
};
use http::{HeaderName, HeaderValue, Uri};
use opentelemetry::otel_debug;
use opentelemetry_http::{Bytes, HttpClient};
use opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema;
#[cfg(feature = "logs")]
use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope;
#[cfg(feature = "trace")]
use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope;
#[cfg(feature = "logs")]
use opentelemetry_sdk::logs::LogBatch;
#[cfg(feature = "trace")]
Expand Down Expand Up @@ -387,7 +387,7 @@ pub(crate) struct OtlpHttpClient {
retry_policy: RetryPolicy,
#[allow(dead_code)]
// <allow dead> would be removed once we support set_resource for metrics and traces.
resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
resource: crate::transform::common::tonic::ResourceAttributesWithSchema,
}

impl OtlpHttpClient {
Expand Down Expand Up @@ -671,9 +671,7 @@ impl OtlpHttpClient {
&self,
metrics: &ResourceMetrics,
) -> Option<(Vec<u8>, &'static str, Option<&'static str>)> {
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;

let req: ExportMetricsServiceRequest = metrics.into();
let req = crate::transform::metrics::tonic::resource_metrics_to_export_request(metrics);

let (body, content_type) = match self.protocol {
#[cfg(feature = "http-json")]
Expand Down
3 changes: 2 additions & 1 deletion opentelemetry-otlp/src/exporter/http/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ impl SpanExporter for OtlpHttpClient {
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
self.resource = resource.into();
self.resource =
crate::transform::common::tonic::resource_to_attributes_with_schema(resource);
}
}

Expand Down
7 changes: 4 additions & 3 deletions opentelemetry-otlp/src/exporter/tonic/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::sync::{Arc, Mutex};
use std::time;
use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};

use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope;
use crate::transform::logs::tonic::group_logs_by_resource_and_scope;

use super::BoxInterceptor;

Expand All @@ -22,7 +22,7 @@ pub(crate) struct TonicLogsClient {
retry_policy: RetryPolicy,
#[allow(dead_code)]
// <allow dead> would be removed once we support set_resource for metrics.
resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
resource: crate::transform::common::tonic::ResourceAttributesWithSchema,
}

struct ClientInner {
Expand Down Expand Up @@ -152,6 +152,7 @@ impl LogExporter for TonicLogsClient {
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
self.resource = resource.into();
self.resource =
crate::transform::common::tonic::resource_to_attributes_with_schema(resource);
}
}
8 changes: 4 additions & 4 deletions opentelemetry-otlp/src/exporter/tonic/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ use core::fmt;
use std::sync::Mutex;

use opentelemetry::{otel_debug, otel_warn};
use opentelemetry_proto::tonic::collector::metrics::v1::{
metrics_service_client::MetricsServiceClient, ExportMetricsServiceRequest,
};
use opentelemetry_proto::tonic::collector::metrics::v1::metrics_service_client::MetricsServiceClient;
use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
use opentelemetry_sdk::metrics::data::ResourceMetrics;
use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};
Expand Down Expand Up @@ -101,7 +99,9 @@ impl MetricsClient for TonicMetricsClient {
.export(Request::from_parts(
metadata,
extensions,
ExportMetricsServiceRequest::from(metrics),
crate::transform::metrics::tonic::resource_metrics_to_export_request(
metrics,
),
))
.await
.map(|response| {
Expand Down
7 changes: 4 additions & 3 deletions opentelemetry-otlp/src/exporter/tonic/trace.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use core::fmt;
use std::sync::{Arc, Mutex};

use crate::transform::trace::tonic::group_spans_by_resource_and_scope;
use opentelemetry::{otel_debug, otel_warn};
use opentelemetry_proto::tonic::collector::trace::v1::{
trace_service_client::TraceServiceClient, ExportTraceServiceRequest,
};
use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope;
use opentelemetry_sdk::error::OTelSdkError;
use opentelemetry_sdk::{
error::OTelSdkResult,
Expand All @@ -24,7 +24,7 @@ pub(crate) struct TonicTracesClient {
retry_policy: RetryPolicy,
#[allow(dead_code)]
// <allow dead> would be removed once we support set_resource for metrics.
resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
resource: crate::transform::common::tonic::ResourceAttributesWithSchema,
}

struct ClientInner {
Expand Down Expand Up @@ -157,6 +157,7 @@ impl SpanExporter for TonicTracesClient {
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
self.resource = resource.into();
self.resource =
crate::transform::common::tonic::resource_to_attributes_with_schema(resource);
}
}
1 change: 1 addition & 0 deletions opentelemetry-otlp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,7 @@ mod metric;
#[cfg(feature = "trace")]
#[cfg(any(feature = "http-proto", feature = "http-json", feature = "grpc-tonic"))]
mod span;
mod transform;

#[cfg(any(feature = "grpc-tonic", feature = "experimental-http-retry"))]
pub mod retry_classification;
Expand Down
204 changes: 204 additions & 0 deletions opentelemetry-otlp/src/transform/common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
#[cfg(all(
any(feature = "trace", feature = "metrics", feature = "logs"),
any(feature = "http-proto", feature = "http-json", feature = "grpc-tonic")
))]
use std::time::{Duration, SystemTime, UNIX_EPOCH};

#[cfg(all(
any(feature = "trace", feature = "metrics", feature = "logs"),
any(feature = "http-proto", feature = "http-json", feature = "grpc-tonic")
))]
pub(crate) fn to_nanos(time: SystemTime) -> u64 {
time.duration_since(UNIX_EPOCH)
.unwrap_or_else(|_| Duration::from_secs(0))
.as_nanos() as u64
}

#[cfg(all(
any(feature = "trace", feature = "metrics", feature = "logs"),
any(feature = "http-proto", feature = "http-json", feature = "grpc-tonic")
))]
pub(crate) mod tonic {
use opentelemetry::{Array, Value};
use opentelemetry_proto::tonic::common::v1::{
any_value, AnyValue, ArrayValue, InstrumentationScope, KeyValue,
};
use std::borrow::Cow;

#[cfg(any(feature = "trace", feature = "logs"))]
#[derive(Debug, Default)]
pub(crate) struct ResourceAttributesWithSchema {
pub(crate) attributes: Attributes,
pub(crate) schema_url: Option<String>,
}

#[cfg(any(feature = "trace", feature = "logs"))]
use opentelemetry_sdk::Resource;

#[cfg(any(feature = "trace", feature = "logs"))]
pub(crate) fn resource_to_attributes_with_schema(
resource: &Resource,
) -> ResourceAttributesWithSchema {
ResourceAttributesWithSchema {
attributes: resource_attributes(resource),
schema_url: resource.schema_url().map(ToString::to_string),
}
}

#[cfg(any(feature = "trace", feature = "logs"))]
impl From<&Resource> for ResourceAttributesWithSchema {
fn from(resource: &Resource) -> Self {
resource_to_attributes_with_schema(resource)
}
}

pub(crate) fn instrumentation_scope_to_proto(
library: opentelemetry::InstrumentationScope,
target: Option<Cow<'static, str>>,
) -> InstrumentationScope {
InstrumentationScope {
name: target.map_or_else(|| library.name().to_owned(), Cow::into_owned),
version: library.version().unwrap_or_default().to_owned(),
attributes: keyvalues_to_proto(library.attributes().cloned()),
..Default::default()
}
}

pub(crate) fn instrumentation_scope_ref_to_proto(
library: &opentelemetry::InstrumentationScope,
target: Option<Cow<'static, str>>,
) -> InstrumentationScope {
InstrumentationScope {
name: target.map_or_else(|| library.name().to_owned(), Cow::into_owned),
version: library.version().unwrap_or_default().to_owned(),
attributes: keyvalues_to_proto(library.attributes().cloned()),
..Default::default()
}
}

/// Wrapper type for Vec<`KeyValue`>
#[derive(Default, Debug)]
pub(crate) struct Attributes(pub(crate) ::std::vec::Vec<KeyValue>);

pub(crate) fn keyvalues_to_proto<I: IntoIterator<Item = opentelemetry::KeyValue>>(
kvs: I,
) -> Vec<KeyValue> {
kvs.into_iter()
.map(|api_kv| KeyValue {
key: api_kv.key.as_str().to_string(),
value: Some(value_to_any_value(api_kv.value)),
key_strindex: 0,
})
.collect()
}

// Kept as a `From` impl since `Attributes` is a local type, so orphan rule is satisfied.
impl<I: IntoIterator<Item = opentelemetry::KeyValue>> From<I> for Attributes {
fn from(kvs: I) -> Self {
Attributes(keyvalues_to_proto(kvs))
}
}

#[cfg(feature = "logs")]
impl<K: Into<String>, V: Into<AnyValue>> FromIterator<(K, V)> for Attributes {
fn from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self {
Attributes(
iter.into_iter()
.map(|(k, v)| KeyValue {
key: k.into(),
value: Some(v.into()),
key_strindex: 0,
})
.collect(),
)
}
}

pub(crate) fn value_to_any_value(value: Value) -> AnyValue {
AnyValue {
value: match value {
Value::Bool(val) => Some(any_value::Value::BoolValue(val)),
Value::I64(val) => Some(any_value::Value::IntValue(val)),
Value::F64(val) => Some(any_value::Value::DoubleValue(val)),
Value::String(val) => Some(any_value::Value::StringValue(val.to_string())),
Value::Array(array) => Some(any_value::Value::ArrayValue(match array {
Array::Bool(vals) => array_into_proto(vals),
Array::I64(vals) => array_into_proto(vals),
Array::F64(vals) => array_into_proto(vals),
Array::String(vals) => array_into_proto(vals),
_ => unreachable!("Nonexistent array type"),
})),
_ => unreachable!("Nonexistent value type"),
},
}
}

fn array_into_proto<T>(vals: Vec<T>) -> ArrayValue
where
Value: From<T>,
{
let values = vals
.into_iter()
.map(|val| value_to_any_value(Value::from(val)))
.collect();

ArrayValue { values }
}

#[cfg(any(feature = "trace", feature = "logs"))]
pub(crate) fn resource_attributes(resource: &Resource) -> Attributes {
Attributes(keyvalues_to_proto(
resource
.iter()
.map(|(k, v)| opentelemetry::KeyValue::new(k.clone(), v.clone())),
))
}

#[cfg(test)]
mod tests {
use super::*;
use opentelemetry::KeyValue;

fn assert_scope_fields(
proto_scope: &InstrumentationScope,
expected_name: &str,
expected_version: &str,
expected_attr_key: &str,
) {
assert_eq!(proto_scope.name, expected_name);
assert_eq!(proto_scope.version, expected_version);
assert_eq!(proto_scope.attributes.len(), 1);
assert_eq!(proto_scope.attributes[0].key, expected_attr_key);
}

#[test]
fn instrumentation_scope_with_target_overrides_name_but_preserves_version_and_attributes() {
let scope = opentelemetry::InstrumentationScope::builder("my-lib")
.with_version("1.0.0")
.with_attributes([KeyValue::new("feature", "metrics")])
.build();
let target: Option<Cow<'static, str>> = Some(Cow::Borrowed("my_app::handlers"));

let from_owned = instrumentation_scope_to_proto(scope.clone(), target.clone());
let from_ref = instrumentation_scope_ref_to_proto(&scope, target);

assert_scope_fields(&from_owned, "my_app::handlers", "1.0.0", "feature");
assert_scope_fields(&from_ref, "my_app::handlers", "1.0.0", "feature");
}

#[test]
fn instrumentation_scope_without_target_preserves_all_fields() {
let scope = opentelemetry::InstrumentationScope::builder("my-lib")
.with_version("1.0.0")
.with_attributes([KeyValue::new("feature", "metrics")])
.build();
let target: Option<Cow<'static, str>> = None;

let from_owned = instrumentation_scope_to_proto(scope.clone(), target.clone());
let from_ref = instrumentation_scope_ref_to_proto(&scope, target);

assert_scope_fields(&from_owned, "my-lib", "1.0.0", "feature");
assert_scope_fields(&from_ref, "my-lib", "1.0.0", "feature");
}
}
}
Loading
Loading