diff --git a/Cargo.lock b/Cargo.lock index 3f294cab6f..fd6bc5ab83 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6061,6 +6061,7 @@ dependencies = [ "google-cloud-lro", "google-cloud-monitoring-v3", "google-cloud-showcase-v1beta1", + "google-cloud-spanner", "google-cloud-storage", "google-cloud-test-utils", "google-cloud-trace-v1", diff --git a/src/gax-internal/src/observability/grpc_tracing.rs b/src/gax-internal/src/observability/grpc_tracing.rs index f9727936e9..acec9d827c 100644 --- a/src/gax-internal/src/observability/grpc_tracing.rs +++ b/src/gax-internal/src/observability/grpc_tracing.rs @@ -188,6 +188,21 @@ where let resource_name = req.extensions().get::().map(|r| r.as_str()); let span = create_grpc_span(req.uri(), &self.layer.inner, attempt_count, resource_name); crate::observability::propagation::inject_context(&span, req.headers_mut()); + + #[cfg(google_cloud_unstable_tracing)] + if let Ok(headers) = std::env::var("GOOGLE_CLOUD_TEST_EXTRA_HEADERS") { + for header in headers.split(',') { + if let Some((k, v)) = header.split_once('=') { + if let (Ok(name), Ok(val)) = ( + http::header::HeaderName::from_bytes(k.trim().as_bytes()), + http::header::HeaderValue::from_str(v.trim()), + ) { + req.headers_mut().insert(name, val); + } + } + } + } + let future = self.inner.call(req); ResponseFuture { inner: future, @@ -244,6 +259,21 @@ where &tracing::Span::current(), req.headers_mut(), ); + + #[cfg(google_cloud_unstable_tracing)] + if let Ok(headers) = std::env::var("GOOGLE_CLOUD_TEST_EXTRA_HEADERS") { + for header in headers.split(',') { + if let Some((k, v)) = header.split_once('=') { + if let (Ok(name), Ok(val)) = ( + http::header::HeaderName::from_bytes(k.trim().as_bytes()), + http::header::HeaderValue::from_str(v.trim()), + ) { + req.headers_mut().insert(name, val); + } + } + } + } + NoTracingFuture { inner: self.inner.call(req), _phantom: std::marker::PhantomData, diff --git a/src/spanner/src/client.rs b/src/spanner/src/client.rs index abc5c36b63..6f0a877f8c 100644 --- a/src/spanner/src/client.rs +++ b/src/spanner/src/client.rs @@ -140,7 +140,7 @@ impl Spanner { } } - pub(crate) async fn create_session( + pub async fn create_session( &self, request: crate::model::CreateSessionRequest, options: crate::RequestOptions, @@ -153,7 +153,7 @@ impl Spanner { .await } - pub(crate) async fn execute_sql( + pub async fn execute_sql( &self, request: crate::model::ExecuteSqlRequest, options: crate::RequestOptions, @@ -179,7 +179,7 @@ impl Spanner { .await } - pub(crate) async fn read( + pub async fn read( &self, request: crate::model::ReadRequest, options: crate::RequestOptions, @@ -192,7 +192,7 @@ impl Spanner { .await } - pub(crate) async fn begin_transaction( + pub async fn begin_transaction( &self, request: crate::model::BeginTransactionRequest, options: crate::RequestOptions, @@ -205,7 +205,7 @@ impl Spanner { .await } - pub(crate) async fn commit( + pub async fn commit( &self, request: crate::model::CommitRequest, options: crate::RequestOptions, @@ -218,7 +218,7 @@ impl Spanner { .await } - pub(crate) async fn rollback( + pub async fn rollback( &self, request: crate::model::RollbackRequest, options: crate::RequestOptions, @@ -235,7 +235,7 @@ impl Spanner { /// /// This is a custom streaming implementation over the underlying Spanner gRPC /// transport, since streaming responses are not yet auto-generated here. - pub(crate) fn execute_streaming_sql( + pub fn execute_streaming_sql( &self, request: crate::model::ExecuteSqlRequest, options: crate::RequestOptions, @@ -253,7 +253,7 @@ impl Spanner { /// /// This is a custom streaming implementation over the underlying Spanner gRPC /// transport, since streaming responses are not yet auto-generated here. - pub(crate) fn streaming_read( + pub fn streaming_read( &self, request: crate::model::ReadRequest, options: crate::RequestOptions, diff --git a/src/spanner/src/database_client.rs b/src/spanner/src/database_client.rs index 548ab75d30..7ce15ca749 100644 --- a/src/spanner/src/database_client.rs +++ b/src/spanner/src/database_client.rs @@ -44,9 +44,9 @@ use std::sync::Arc; #[derive(Clone, Debug)] pub struct DatabaseClient { #[allow(dead_code)] - pub(crate) spanner: Spanner, + pub spanner: Spanner, #[allow(dead_code)] - pub(crate) session: Arc, + pub session: Arc, } impl DatabaseClient { diff --git a/src/spanner/src/lib.rs b/src/spanner/src/lib.rs index 63d31a1c33..25dedf97f7 100644 --- a/src/spanner/src/lib.rs +++ b/src/spanner/src/lib.rs @@ -35,7 +35,7 @@ pub mod builder { pub use crate::database_client::DatabaseClientBuilder; } pub(crate) mod database_client; -pub(crate) mod model { +pub mod model { pub use crate::generated::gapic_dataplane::model::*; } pub(crate) mod from_value; diff --git a/src/spanner/src/server_streaming/builder.rs b/src/spanner/src/server_streaming/builder.rs index 692a055bd1..f1eedb591e 100644 --- a/src/spanner/src/server_streaming/builder.rs +++ b/src/spanner/src/server_streaming/builder.rs @@ -30,7 +30,7 @@ use prost::Message; /// The request builder for [SpannerImpl::execute_streaming_sql][crate::client::SpannerImpl::execute_streaming_sql] calls. #[derive(Clone, Debug)] -pub(crate) struct ExecuteStreamingSql { +pub struct ExecuteStreamingSql { grpc_client: gaxi::grpc::Client, request: ExecuteSqlRequest, options: crate::RequestOptions, @@ -82,7 +82,7 @@ impl crate::RequestBuilder for ExecuteStreamingSql { /// The request builder for [SpannerImpl::streaming_read][crate::client::SpannerImpl::streaming_read] calls. #[derive(Clone, Debug)] -pub(crate) struct StreamingRead { +pub struct StreamingRead { grpc_client: gaxi::grpc::Client, request: ReadRequest, options: crate::RequestOptions, diff --git a/tests/o11y/Cargo.toml b/tests/o11y/Cargo.toml index 55aaa2a40b..2ec9997293 100644 --- a/tests/o11y/Cargo.toml +++ b/tests/o11y/Cargo.toml @@ -37,6 +37,7 @@ google-cloud-lro.workspace = true google-cloud-test-utils = { workspace = true } google-cloud-showcase-v1beta1 = { workspace = true, features = ["default"] } google-cloud-storage = { workspace = true, features = ["default"] } +google-cloud-spanner = { workspace = true } google-cloud-wkt = { workspace = true } storage-samples = { workspace = true } google-cloud-trace-v1 = { workspace = true, features = ["default"] } diff --git a/tests/o11y/src/lib.rs b/tests/o11y/src/lib.rs index 9509766342..4203086911 100644 --- a/tests/o11y/src/lib.rs +++ b/tests/o11y/src/lib.rs @@ -28,6 +28,8 @@ pub mod mock_collector; #[cfg(google_cloud_unstable_tracing)] pub mod otlp; #[cfg(google_cloud_unstable_tracing)] +pub mod spanner_tracing; +#[cfg(google_cloud_unstable_tracing)] pub mod storage_tracing; #[cfg(google_cloud_unstable_tracing)] pub mod tracing; diff --git a/tests/o11y/src/spanner_tracing.rs b/tests/o11y/src/spanner_tracing.rs new file mode 100644 index 0000000000..07bd169a62 --- /dev/null +++ b/tests/o11y/src/spanner_tracing.rs @@ -0,0 +1,138 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::e2e::wait_for_trace; +use google_cloud_spanner::client::Spanner; +use google_cloud_test_utils::runtime_config::project_id; +use opentelemetry::trace::TraceContextExt; +use std::collections::BTreeSet; +use tracing_opentelemetry::OpenTelemetrySpanExt; + +const ROOT_SPAN_NAME: &str = "e2e-spanner-test"; + +pub async fn spanner_e2e_tracing() -> anyhow::Result<()> { + let project_id = project_id()?; + // Create a trace with a number of interesting spans from the + // `google-cloud-spanner` client. + let trace_id = send_trace(&project_id).await?; + let required = BTreeSet::from_iter([ + ROOT_SPAN_NAME, + "google.spanner.v1.Spanner/CreateSession", + "google.spanner.v1.Spanner/BeginTransaction", + "Spanner.CreateSession", + "Spanner.BeginTransaction", + ]); + let trace = wait_for_trace(&project_id, &trace_id, &required).await?; + + println!("TRACE SPANS DUMP:"); + for span in &trace.spans { + println!("Span: {:?}", span); + } + + // Verify the expected spans appear in the trace: + let span_names = trace + .spans + .iter() + .map(|s| s.name.as_str()) + .collect::>(); + let missing = required.difference(&span_names).collect::>(); + assert!(missing.is_empty(), "missing={missing:?}\n\n{trace:?}"); + + Ok(()) +} + +async fn send_trace(project_id: &str) -> anyhow::Result { + // 1. Setup Telemetry (Google Cloud Destination) + let creds = google_cloud_auth::credentials::Builder::default().build()?; + let (provider, _, _) = crate::e2e::set_up_providers( + project_id, + "e2e-telemetry-test", + "spanner-test".to_string(), + creds, + ) + .await?; + + // 2. Generate Trace + // Start a root span + let root_span = tracing::info_span!("e2e_root", { "otel.name" } = ROOT_SPAN_NAME); + let trace_id = root_span + .context() + .span() + .span_context() + .trace_id() + .to_string(); + + use tracing::Instrument; + let _ = client_library_operations(project_id) + .instrument(root_span) + .await; + + println!( + "View generated trace in Console: https://console.cloud.google.com/traces/explorer;traceId={}?project={}", + trace_id, project_id + ); + + // 4. Force flush to ensure spans are sent. + if let Err(e) = provider.force_flush() { + tracing::error!("error flushing provider: {e:}"); + } + Ok(trace_id) +} + +async fn client_library_operations(project: &str) -> anyhow::Result<()> { + // Explicitly opt-in to E2E tracing headers for the test + unsafe { + std::env::set_var( + "GOOGLE_CLOUD_TEST_EXTRA_HEADERS", + "x-goog-spanner-end-to-end-tracing=true", + ); + } + let instance = std::env::var("GOOGLE_CLOUD_SPANNER_TEST_INSTANCE") + .unwrap_or_else(|_| "trace-propagation-test-instance".to_string()); + let db_id = std::env::var("GOOGLE_CLOUD_SPANNER_TEST_DATABASE") + .unwrap_or_else(|_| "test-database".to_string()); + + let db_path = format!( + "projects/{}/instances/{}/databases/{}", + project, instance, db_id + ); + + use google_cloud_auth::credentials::Builder as CredentialsBuilder; + let creds = CredentialsBuilder::default().build()?; + let spanner_client = Spanner::builder() + .with_credentials(creds.clone()) + .with_tracing() + .build() + .await?; + + // Calling `build()` on the database client triggers a `CreateSession` RPC + let db_client = spanner_client.database_client(db_path).build().await?; + + use google_cloud_spanner::model::{ + BeginTransactionRequest, TransactionOptions, transaction_options, + }; + let mut req = BeginTransactionRequest::default(); + req.session = db_client.session.name.clone(); + + let mut options = TransactionOptions::default(); + options.mode = Some(transaction_options::Mode::ReadOnly(Box::default())); + req.options = Some(options); + + let _ = db_client + .spanner + .begin_transaction(req, google_cloud_gax::options::RequestOptions::default()) + .await; + + Ok(()) +} diff --git a/tests/o11y/tests/spanner_tracing.rs b/tests/o11y/tests/spanner_tracing.rs new file mode 100644 index 0000000000..3a08704e1e --- /dev/null +++ b/tests/o11y/tests/spanner_tracing.rs @@ -0,0 +1,25 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#[cfg(all(google_cloud_unstable_tracing, feature = "run-integration-tests"))] +mod spanner_tracing { + use google_cloud_test_utils::errors::anydump; + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn spanner_e2e_tracing() -> anyhow::Result<()> { + integration_tests_o11y::spanner_tracing::spanner_e2e_tracing() + .await + .inspect_err(anydump) + } +}