Skip to content

Commit 7a6dd6f

Browse files
authored
feat(o11y): implement trace context propagation for HTTP and gRPC (#5018)
Implement W3C Trace Context propagation (`traceparent` and `tracestate`) across both `reqwest` and `tonic` transports. Bump `google-cloud-gax-internal` version from 0.7.10 to 0.7.11.
1 parent e3b0626 commit 7a6dd6f

7 files changed

Lines changed: 121 additions & 10 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -458,7 +458,7 @@ tokio-test = { default-features = false, version = "0.4" }
458458
# Local packages used as dependencies.
459459
google-cloud-auth = { default-features = false, version = "1.7.0", path = "src/auth" }
460460
google-cloud-gax = { default-features = false, version = "1.8.0", path = "src/gax" }
461-
gaxi = { default-features = false, version = "0.7.10", path = "src/gax-internal", package = "google-cloud-gax-internal" }
461+
gaxi = { default-features = false, version = "0.7.11", path = "src/gax-internal", package = "google-cloud-gax-internal" }
462462
wkt = { default-features = false, version = "1", path = "src/wkt", package = "google-cloud-wkt" }
463463
google-cloud-wkt = { default-features = false, version = "1", path = "src/wkt", package = "google-cloud-wkt" }
464464
google-cloud-api = { default-features = false, version = "1.3.0", path = "src/generated/api/types" }

src/gax-internal/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
[package]
1616
name = "google-cloud-gax-internal"
17-
version = "0.7.10"
17+
version = "0.7.11"
1818
description = "Google Cloud Client Libraries for Rust - Implementation Details"
1919
build = "build.rs"
2020
# Inherit other attributes from the workspace.

src/gax-internal/src/http.rs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,13 @@ impl ReqwestClient {
275275
.record_http(&span, attempt_info.attempt_count, method, url)
276276
}
277277

278-
async fn execute_http_inner(&self, request: reqwest::Request) -> Result<reqwest::Response> {
278+
#[cfg_attr(not(google_cloud_unstable_tracing), allow(unused_mut))]
279+
async fn execute_http_inner(&self, mut request: reqwest::Request) -> Result<reqwest::Response> {
280+
#[cfg(google_cloud_unstable_tracing)]
281+
crate::observability::propagation::inject_context(
282+
&tracing::Span::current(),
283+
request.headers_mut(),
284+
);
279285
self.inner.execute(request).await.map_err(map_send_error)
280286
}
281287

@@ -413,7 +419,16 @@ impl ReqwestClient {
413419
.record_http(&span, attempt_count, method, url)
414420
}
415421

416-
async fn request_attempt_inner(&self, request: reqwest::Request) -> Result<reqwest::Response> {
422+
#[cfg_attr(not(google_cloud_unstable_tracing), allow(unused_mut))]
423+
async fn request_attempt_inner(
424+
&self,
425+
mut request: reqwest::Request,
426+
) -> Result<reqwest::Response> {
427+
#[cfg(google_cloud_unstable_tracing)]
428+
crate::observability::propagation::inject_context(
429+
&tracing::Span::current(),
430+
request.headers_mut(),
431+
);
417432
let response = self.inner.execute(request).await.map_err(map_send_error)?;
418433
if !response.status().is_success() {
419434
return self::to_http_error(response).await;

src/gax-internal/src/observability/grpc_tracing.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -183,10 +183,11 @@ where
183183
self.inner.poll_ready(cx)
184184
}
185185

186-
fn call(&mut self, req: http::Request<B>) -> Self::Future {
186+
fn call(&mut self, mut req: http::Request<B>) -> Self::Future {
187187
let attempt_count = req.extensions().get::<AttemptCount>().map(|a| a.as_i64());
188188
let resource_name = req.extensions().get::<ResourceName>().map(|r| r.as_str());
189189
let span = create_grpc_span(req.uri(), &self.layer.inner, attempt_count, resource_name);
190+
crate::observability::propagation::inject_context(&span, req.headers_mut());
190191
let future = self.inner.call(req);
191192
ResponseFuture {
192193
inner: future,
@@ -220,9 +221,9 @@ impl<S> NoTracingTowerService<S> {
220221
}
221222
}
222223

223-
impl<S, Req, ResBody> Service<Req> for NoTracingTowerService<S>
224+
impl<S, B, ResBody> Service<http::Request<B>> for NoTracingTowerService<S>
224225
where
225-
S: Service<Req, Response = http::Response<ResBody>>,
226+
S: Service<http::Request<B>, Response = http::Response<ResBody>>,
226227
S::Future: Send + 'static,
227228
ResBody: http_body::Body<Data = bytes::Bytes, Error = tonic::Status> + Send + 'static,
228229
{
@@ -238,7 +239,11 @@ where
238239
self.inner.poll_ready(cx)
239240
}
240241

241-
fn call(&mut self, req: Req) -> Self::Future {
242+
fn call(&mut self, mut req: http::Request<B>) -> Self::Future {
243+
crate::observability::propagation::inject_context(
244+
&tracing::Span::current(),
245+
req.headers_mut(),
246+
);
242247
NoTracingFuture {
243248
inner: self.inner.call(req),
244249
_phantom: std::marker::PhantomData,

src/gax-internal/tests/grpc_observability.rs

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -696,4 +696,57 @@ mod tests {
696696

697697
Ok(())
698698
}
699+
700+
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
701+
async fn propagate_trace_context() -> anyhow::Result<()> {
702+
let (endpoint, _server) = start_echo_server().await?;
703+
704+
let mut config = google_cloud_gax_internal::options::ClientConfig::default();
705+
config.tracing = true;
706+
config.cred = Some(test_credentials());
707+
let client = grpc::Client::new(config, &endpoint).await?;
708+
709+
let tracer_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder().build();
710+
let tracer = opentelemetry::trace::TracerProvider::tracer(&tracer_provider, "test");
711+
let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);
712+
use tracing_subscriber::layer::SubscriberExt;
713+
let subscriber = tracing_subscriber::registry().with(telemetry);
714+
let _guard = tracing::subscriber::set_default(subscriber);
715+
716+
let extensions = {
717+
let mut e = tonic::Extensions::new();
718+
e.insert(tonic::GrpcMethod::new(
719+
"google.test.v1.EchoServices",
720+
"Echo",
721+
));
722+
e
723+
};
724+
let request = google::test::v1::EchoRequest {
725+
message: "test message".into(),
726+
..Default::default()
727+
};
728+
729+
use tracing::Instrument;
730+
let span = tracing::info_span!("parent_span");
731+
let response = client
732+
.execute::<_, google::test::v1::EchoResponse>(
733+
extensions,
734+
http::uri::PathAndQuery::from_static("/google.test.v1.EchoService/Echo"),
735+
request,
736+
RequestOptions::default(),
737+
"test-client",
738+
"",
739+
)
740+
.instrument(span)
741+
.await?;
742+
743+
let inner = response.into_inner();
744+
assert!(
745+
inner.metadata.contains_key("traceparent"),
746+
"Metadata should contain traceparent. Metadata: {:?}",
747+
inner.metadata
748+
);
749+
750+
Ok(())
751+
}
699752
}

src/gax-internal/tests/http_observability.rs

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ mod tests {
2626
use google_cloud_gax_internal::options::{ClientConfig, InstrumentationClientInfo};
2727
use google_cloud_test_utils::test_layer::{AttributeValue, TestLayer};
2828
use http::{Method, StatusCode};
29-
use httptest::matchers::request::{body, method, path};
29+
use httptest::matchers::request::{body, headers, method, path};
3030
use httptest::{Expectation, Server, all_of, responders::*};
3131
use opentelemetry_semantic_conventions::{attribute as otel_attr, trace as otel_trace};
3232
use serde::Deserialize;
@@ -524,4 +524,42 @@ mod tests {
524524

525525
assert_eq!(got, want);
526526
}
527+
528+
#[tokio::test]
529+
async fn propagate_trace_context() {
530+
let server = Server::run();
531+
let server_addr = server.addr();
532+
let server_url = format!("http://{}", server_addr);
533+
server.expect(
534+
Expectation::matching(all_of![
535+
method("GET"),
536+
path("/test"),
537+
headers(httptest::matchers::contains((
538+
"traceparent",
539+
httptest::matchers::any()
540+
))),
541+
])
542+
.respond_with(status_code(200).body("{\"hello\": \"world\"}")),
543+
);
544+
545+
let client = create_client(true, server_url.clone()).await;
546+
547+
let tracer_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder().build();
548+
let tracer = opentelemetry::trace::TracerProvider::tracer(&tracer_provider, "test");
549+
let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);
550+
use tracing_subscriber::layer::SubscriberExt;
551+
let subscriber = tracing_subscriber::registry().with(telemetry);
552+
let _guard = tracing::subscriber::set_default(subscriber);
553+
554+
let options = RequestOptions::default().insert_extension(PathTemplate("/test"));
555+
let request = client.builder(Method::GET, "/test".to_string());
556+
557+
let span = tracing::info_span!("parent_span");
558+
let result: Result<Response<TestResponse>> = client
559+
.execute(request, None::<NoBody>, options)
560+
.instrument(span)
561+
.await;
562+
563+
assert!(result.is_ok(), "{result:?}");
564+
}
527565
}

0 commit comments

Comments
 (0)