diff --git a/Cargo.lock b/Cargo.lock index d76dd4a99..0b38f441b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6524,6 +6524,7 @@ dependencies = [ "rustls 0.23.31", "serde", "thiserror 1.0.69", + "tokio", "tower-http", "tracing", "tracing-log 0.1.4", diff --git a/crates/cli/src/options.rs b/crates/cli/src/options.rs index ee5b33885..10d2db095 100644 --- a/crates/cli/src/options.rs +++ b/crates/cli/src/options.rs @@ -628,7 +628,7 @@ impl TracerOptions { /// Get the tracer configuration based on the options pub fn config(&self) -> Option { if self.tracer_gcloud { - Some(TracerConfig::Gcloud(gcloud::GcloudConfig { + Some(TracerConfig::GCloud(gcloud::GcloudConfig { project_id: self.gcloud_project_id.clone(), })) } else if self.tracer_otlp { diff --git a/crates/tracing/Cargo.toml b/crates/tracing/Cargo.toml index d28bd662f..1b6f157d2 100644 --- a/crates/tracing/Cargo.toml +++ b/crates/tracing/Cargo.toml @@ -29,3 +29,6 @@ tracing-opentelemetry.workspace = true bytes.workspace = true chrono.workspace = true http-body-util = "0.1.3" + +[dev-dependencies] +tokio = { workspace = true, features = [ "macros", "rt-multi-thread" ] } diff --git a/crates/tracing/src/builder.rs b/crates/tracing/src/builder.rs new file mode 100644 index 000000000..a63fb5d68 --- /dev/null +++ b/crates/tracing/src/builder.rs @@ -0,0 +1,125 @@ +use std::fmt::Debug; + +use tracing_subscriber::fmt::format::{DefaultFields, Format, Full, Json, JsonFields}; +use tracing_subscriber::layer::{Layered, SubscriberExt}; +use tracing_subscriber::util::SubscriberInitExt; +use tracing_subscriber::{fmt, EnvFilter, Registry}; + +use crate::fmt::{FmtLayer, LocalTime}; +use crate::{Error, LogFormat, TelemetryTracer}; + +const DEFAULT_LOG_FILTER: &str = "katana_db::mdbx=trace,cairo_native::compiler=off,pipeline=debug,\ + stage=debug,tasks=debug,executor=trace,forking::backend=trace,\ + blockifier=off,jsonrpsee_server=off,hyper=off,messaging=debug,\ + node=error,explorer=info,rpc=trace,pool=trace,info"; + +pub type NoopTracer = opentelemetry::trace::noop::NoopTracer; + +#[derive(Debug)] +pub struct TracingBuilder { + filter: Option, + log_format: LogFormat, + tracer: Telemetry, +} + +impl TracingBuilder { + /// Create a new tracing builder + pub fn new() -> Self { + Self { filter: None, log_format: LogFormat::Full, tracer: NoopTracer::new() } + } +} + +impl TracingBuilder { + pub fn with_telemetry(self, tracer: T) -> TracingBuilder { + TracingBuilder { filter: self.filter, log_format: self.log_format, tracer } + } +} + +impl TracingBuilder { + /// Set the log format to JSON + pub fn json(self) -> TracingBuilder { + TracingBuilder { log_format: LogFormat::Json, tracer: self.tracer, filter: self.filter } + } + + /// Set a custom filter from a string + pub fn with_filter(mut self, filter: &str) -> Result { + self.filter = Some(EnvFilter::try_new(filter)?); + Ok(self) + } + + /// Use the default filter + pub fn with_default_filter(mut self) -> Result { + self.filter = Some(EnvFilter::try_new(DEFAULT_LOG_FILTER)?); + Ok(self) + } + + /// Use filter from environment variable (RUST_LOG) + pub fn with_env_filter(mut self) -> Result { + self.filter = Some(EnvFilter::try_from_default_env()?); + Ok(self) + } + + /// Use filter from environment with fallback to default + pub fn with_env_filter_or_default(mut self) -> Result { + let default_filter = EnvFilter::try_new(DEFAULT_LOG_FILTER); + self.filter = Some(EnvFilter::try_from_default_env().or(default_filter)?); + Ok(self) + } +} + +impl TracingBuilder { + /// Try to initialize the tracing subscriber without telemetry + pub fn build(self) -> Result, Error> { + let filter = self.filter.unwrap_or_else(|| { + EnvFilter::try_new(DEFAULT_LOG_FILTER).expect("default filter should be valid") + }); + + let base_layer = fmt::layer().with_timer(LocalTime::new()); + + let fmt_layer = match self.log_format { + LogFormat::Full => FmtLayer::Full(base_layer), + LogFormat::Json => FmtLayer::Json(base_layer.json()), + }; + + Ok(TracingSubscriber { + tracer: self.tracer, + subscriber: tracing_subscriber::registry().with(filter).with(fmt_layer), + }) + } +} + +impl Default for TracingBuilder { + fn default() -> Self { + Self::new() + } +} + +/// The base subscribe type created by [`TracingBuilder`] and used by [`TracingSubscriber`]. +type BaseSubscriber = Layered< + FmtLayer< + fmt::Layer, DefaultFields, Format>, + fmt::Layer, JsonFields, Format>, + >, + Layered, +>; + +pub struct TracingSubscriber { + subscriber: BaseSubscriber, + tracer: Telemetry, +} + +impl TracingSubscriber { + pub fn init(self) { + self.tracer.init().unwrap(); + self.subscriber.with(tracing_opentelemetry::layer().with_tracer(self.tracer)).init(); + } +} + +impl Debug for TracingSubscriber { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TracingSubscriber") + .field("subscriber", &"..") + .field("tracer", &self.tracer) + .finish() + } +} diff --git a/crates/tracing/src/fmt.rs b/crates/tracing/src/fmt.rs index da1a3a7f8..fe00c17cd 100644 --- a/crates/tracing/src/fmt.rs +++ b/crates/tracing/src/fmt.rs @@ -3,6 +3,7 @@ use std::fmt::Display; use serde::{Deserialize, Serialize}; use tracing_subscriber::fmt::format::Writer; use tracing_subscriber::fmt::time::{self}; +use tracing_subscriber::Layer; /// Format for logging output. #[derive(Debug, Copy, Clone, PartialEq, Deserialize, Serialize, Default)] @@ -66,3 +67,23 @@ impl time::FormatTime for LocalTime { write!(w, "{}", time.format(DEFAULT_TIMESTAMP_FORMAT)) } } + +// Use an enum to preserve type information instead of Box +pub enum FmtLayer { + Full(F), + Json(J), +} + +impl Layer for FmtLayer +where + S: tracing::Subscriber, + F: Layer, + J: Layer, +{ + fn on_layer(&mut self, subscriber: &mut S) { + match self { + FmtLayer::Full(layer) => layer.on_layer(subscriber), + FmtLayer::Json(layer) => layer.on_layer(subscriber), + } + } +} diff --git a/crates/tracing/src/gcloud.rs b/crates/tracing/src/gcloud.rs index eb93a3aa2..5281d9424 100644 --- a/crates/tracing/src/gcloud.rs +++ b/crates/tracing/src/gcloud.rs @@ -1,19 +1,20 @@ use http::Request; +use opentelemetry::trace::Tracer; use opentelemetry_gcloud_trace::{GcpCloudTraceExporterBuilder, SdkTracer}; use opentelemetry_http::HeaderExtractor; +use opentelemetry_sdk::trace::SdkTracerProvider; use opentelemetry_sdk::Resource; use opentelemetry_stackdriver::google_trace_context_propagator::GoogleTraceContextPropagator; use tower_http::trace::MakeSpan; -use tracing::Span; -use tracing_opentelemetry::OpenTelemetrySpanExt; +use tracing_opentelemetry::{OpenTelemetrySpanExt, PreSampledTracer}; -use crate::Error; +use crate::{Error, TelemetryTracer}; #[derive(Debug, Clone, Default)] pub struct GoogleStackDriverMakeSpan; impl MakeSpan for GoogleStackDriverMakeSpan { - fn make_span(&mut self, request: &Request) -> Span { + fn make_span(&mut self, request: &Request) -> tracing::Span { // Extract trace context from HTTP headers let cx = opentelemetry::global::get_text_map_propagator(|propagator| { propagator.extract(&HeaderExtractor(request.headers())) @@ -36,37 +37,140 @@ pub struct GcloudConfig { pub project_id: Option, } -/// Initialize Google Cloud Trace exporter and OpenTelemetry propagators for Google Cloud trace -/// context support. -/// -/// Make sure to set `GOOGLE_APPLICATION_CREDENTIALS` env var to authenticate to gcloud -pub(crate) async fn init_tracer(gcloud_config: &GcloudConfig) -> Result { - rustls::crypto::ring::default_provider() - .install_default() - .map_err(|_| Error::InstallCryptoFailed)?; - - let resource = Resource::builder().with_service_name("katana").build(); - - let mut trace_exporter = if let Some(project_id) = &gcloud_config.project_id { - GcpCloudTraceExporterBuilder::new(project_id.clone()) - } else { - // Default will attempt to find project ID from environment variables in the following - // order: - // - GCP_PROJECT - // - PROJECT_ID - // - GCP_PROJECT_ID - GcpCloudTraceExporterBuilder::for_default_project_id().await? - }; - - trace_exporter = trace_exporter.with_resource(resource); - - let tracer_provider = trace_exporter.create_provider().await?; - let tracer = trace_exporter.install(&tracer_provider).await?; - - // Set the Google Cloud trace context propagator globally - // This will handle both extraction and injection of X-Cloud-Trace-Context headers - opentelemetry::global::set_text_map_propagator(GoogleTraceContextPropagator::default()); - opentelemetry::global::set_tracer_provider(tracer_provider.clone()); - - Ok(tracer) +/// Builder for creating an OpenTelemetry layer with Google Cloud Trace exporter +#[derive(Debug, Clone)] +pub struct GCloudTracerBuilder { + service_name: String, + project_id: Option, + resource: Option, +} + +///////////////////////////////////////////////////////////////////////////////// +// GCloudTracerBuilder implementations +///////////////////////////////////////////////////////////////////////////////// + +impl GCloudTracerBuilder { + /// Create a new Google Cloud tracing builder + pub fn new() -> Self { + Self { service_name: "katana".to_string(), project_id: None, resource: None } + } + + /// Set the service name + pub fn service_name(mut self, name: impl Into) -> Self { + self.service_name = name.into(); + self + } + + /// Set the Google Cloud project ID + pub fn project_id(mut self, project_id: impl Into) -> Self { + self.project_id = Some(project_id.into()); + self + } + + /// Set a custom resource + pub fn resource(mut self, resource: Resource) -> Self { + self.resource = Some(resource); + self + } + + /// Build the OpenTelemetry layer (async because GCloud SDK requires it) + pub async fn build(self) -> Result { + // Install crypto provider + rustls::crypto::ring::default_provider() + .install_default() + .map_err(|_| Error::InstallCryptoFailed)?; + + // Build resource with service name + let resource = self.resource.unwrap_or_else(|| { + Resource::builder().with_service_name(self.service_name.clone()).build() + }); + + // Create trace exporter + let mut trace_exporter = if let Some(project_id) = self.project_id { + GcpCloudTraceExporterBuilder::new(project_id) + } else { + // Default will attempt to find project ID from environment variables in the following + // order: + // - GCP_PROJECT + // - PROJECT_ID + // - GCP_PROJECT_ID + GcpCloudTraceExporterBuilder::for_default_project_id().await? + }; + + trace_exporter = trace_exporter.with_resource(resource); + + // Create provider and install + let tracer_provider = trace_exporter.create_provider().await?; + let tracer = trace_exporter.install(&tracer_provider).await?; + + Ok(GCloudTracer { tracer, tracer_provider }) + } +} + +impl Default for GCloudTracerBuilder { + fn default() -> Self { + Self::new() + } } + +/// Wrapper type for SdkTracer that implements the Tracer trait +#[derive(Debug, Clone)] +pub struct GCloudTracer { + tracer: SdkTracer, + tracer_provider: SdkTracerProvider, +} + +///////////////////////////////////////////////////////////////////////////////// +// GCloudTracer implementations +///////////////////////////////////////////////////////////////////////////////// + +impl GCloudTracer { + pub fn builder() -> GCloudTracerBuilder { + GCloudTracerBuilder::new() + } +} + +impl Tracer for GCloudTracer { + type Span = ::Span; + + #[inline] + fn build_with_context( + &self, + builder: opentelemetry::trace::SpanBuilder, + parent_cx: &opentelemetry::Context, + ) -> Self::Span { + self.tracer.build_with_context(builder, parent_cx) + } +} + +impl PreSampledTracer for GCloudTracer { + #[inline] + fn new_span_id(&self) -> opentelemetry::trace::SpanId { + self.tracer.new_span_id() + } + + #[inline] + fn new_trace_id(&self) -> opentelemetry::trace::TraceId { + self.tracer.new_trace_id() + } + + #[inline] + fn sampled_context( + &self, + data: &mut tracing_opentelemetry::OtelData, + ) -> opentelemetry::Context { + self.tracer.sampled_context(data) + } +} + +impl TelemetryTracer for GCloudTracer { + fn init(&self) -> Result<(), Error> { + // Set the Google Cloud trace context propagator globally + // This will handle both extraction and injection of X-Cloud-Trace-Context headers + opentelemetry::global::set_text_map_propagator(GoogleTraceContextPropagator::default()); + opentelemetry::global::set_tracer_provider(self.tracer_provider.clone()); + Ok(()) + } +} + +impl crate::__private::Sealed for GCloudTracer {} diff --git a/crates/tracing/src/lib.rs b/crates/tracing/src/lib.rs index 3ce83f193..7929e8c18 100644 --- a/crates/tracing/src/lib.rs +++ b/crates/tracing/src/lib.rs @@ -1,22 +1,44 @@ +use opentelemetry::trace::Tracer; use opentelemetry_gcloud_trace::errors::GcloudTraceError; use tracing::subscriber::SetGlobalDefaultError; use tracing_log::log::SetLoggerError; -use tracing_subscriber::layer::SubscriberExt; -use tracing_subscriber::util::SubscriberInitExt; -use tracing_subscriber::{filter, EnvFilter, Layer}; +use tracing_opentelemetry::PreSampledTracer; +use tracing_subscriber::filter; +mod builder; mod fmt; pub mod gcloud; pub mod otlp; +pub use builder::TracingBuilder; pub use fmt::LogFormat; +pub use gcloud::{GCloudTracerBuilder, GcloudConfig}; +pub use otlp::{OtlpConfig, OtlpTracerBuilder}; -use crate::fmt::LocalTime; +use crate::builder::NoopTracer; + +mod __private { + pub trait Sealed {} +} + +pub trait TelemetryTracer: + Tracer + PreSampledTracer + Send + Sync + __private::Sealed + 'static +{ + fn init(&self) -> Result<(), Error>; +} + +impl __private::Sealed for NoopTracer {} + +impl TelemetryTracer for NoopTracer { + fn init(&self) -> Result<(), Error> { + Ok(()) + } +} #[derive(Debug, Clone)] pub enum TracerConfig { Otlp(otlp::OtlpConfig), - Gcloud(gcloud::GcloudConfig), + GCloud(gcloud::GcloudConfig), } #[derive(Debug, thiserror::Error)] @@ -27,6 +49,9 @@ pub enum Error { #[error("failed to parse environment filter: {0}")] EnvFilterParse(#[from] filter::ParseError), + #[error("failed to parse environment filter from env: {0}")] + EnvFilterFromEnv(#[from] filter::FromEnvError), + #[error("failed to set global dispatcher: {0}")] SetGlobalDefault(#[from] SetGlobalDefaultError), @@ -44,49 +69,38 @@ pub enum Error { } pub async fn init(format: LogFormat, telemetry_config: Option) -> Result<(), Error> { - const DEFAULT_LOG_FILTER: &str = - "katana_db::mdbx=trace,cairo_native::compiler=off,pipeline=debug,stage=debug,tasks=debug,\ - executor=trace,forking::backend=trace,blockifier=off,jsonrpsee_server=off,hyper=off,\ - messaging=debug,node=error,explorer=info,rpc=trace,pool=trace,info"; - - let default_filter = EnvFilter::try_new(DEFAULT_LOG_FILTER); - let filter = EnvFilter::try_from_default_env().or(default_filter)?; - - // Initialize tracing subscriber with optional telemetry - if let Some(telemetry_config) = telemetry_config { - // Initialize telemetry layer based on exporter type - let telemetry = match telemetry_config { - TracerConfig::Gcloud(cfg) => { - let tracer = gcloud::init_tracer(&cfg).await?; - tracing_opentelemetry::layer().with_tracer(tracer) - } - TracerConfig::Otlp(cfg) => { - let tracer = otlp::init_tracer(&cfg)?; - tracing_opentelemetry::layer().with_tracer(tracer) - } - }; - - let fmt = match format { - LogFormat::Full => { - tracing_subscriber::fmt::layer().with_timer(LocalTime::new()).boxed() - } - LogFormat::Json => { - tracing_subscriber::fmt::layer().json().with_timer(LocalTime::new()).boxed() - } - }; - - tracing_subscriber::registry().with(filter).with(telemetry).with(fmt).init(); - } else { - let fmt = match format { - LogFormat::Full => { - tracing_subscriber::fmt::layer().with_timer(LocalTime::new()).boxed() + let builder = match format { + LogFormat::Full => TracingBuilder::new(), + LogFormat::Json => TracingBuilder::new().json(), + }; + + let builder = builder.with_env_filter_or_default()?; + + // Build telemetry layer and initialize based on config type + match telemetry_config { + Some(TracerConfig::Otlp(cfg)) => { + // OTLP is synchronous + let mut otlp_builder = OtlpTracerBuilder::new().service_name("katana"); + if let Some(endpoint) = cfg.endpoint { + otlp_builder = otlp_builder.endpoint(endpoint); } - LogFormat::Json => { - tracing_subscriber::fmt::layer().json().with_timer(LocalTime::new()).boxed() + let layer = otlp_builder.build()?; + builder.with_telemetry(layer).build()?.init(); + } + + Some(TracerConfig::GCloud(cfg)) => { + // GCloud is async + let mut gcloud_builder = GCloudTracerBuilder::new().service_name("katana"); + if let Some(project_id) = cfg.project_id { + gcloud_builder = gcloud_builder.project_id(project_id); } - }; + let layer = gcloud_builder.build().await?; + builder.with_telemetry(layer).build()?.init(); + } - tracing_subscriber::registry().with(filter).with(fmt).init(); + None => { + builder.build()?.init(); + } } Ok(()) diff --git a/crates/tracing/src/otlp.rs b/crates/tracing/src/otlp.rs index 7582c3ac4..53ca31299 100644 --- a/crates/tracing/src/otlp.rs +++ b/crates/tracing/src/otlp.rs @@ -1,37 +1,137 @@ -use anyhow::Result; -use opentelemetry::trace::TracerProvider; -use opentelemetry_otlp::SpanExporterBuilder; +use opentelemetry::trace::{Tracer, TracerProvider}; +use opentelemetry_otlp::{SpanExporterBuilder, WithExportConfig}; use opentelemetry_sdk::trace::{RandomIdGenerator, SdkTracerProvider}; use opentelemetry_sdk::Resource; +use tracing_opentelemetry::PreSampledTracer; -use crate::Error; +use crate::{Error, TelemetryTracer}; #[derive(Debug, Clone)] pub struct OtlpConfig { pub endpoint: Option, } -/// Initialize OTLP tracer -pub fn init_tracer(otlp_config: &OtlpConfig) -> Result { - use opentelemetry_otlp::WithExportConfig; +/// Builder for creating an OpenTelemetry layer with OTLP exporter +#[derive(Debug, Clone)] +pub struct OtlpTracerBuilder { + service_name: String, + endpoint: Option, + resource: Option, +} + +impl OtlpTracerBuilder { + /// Create a new OTLP tracing builder + pub fn new() -> Self { + Self { service_name: "katana".to_string(), endpoint: None, resource: None } + } + + /// Set the service name + pub fn service_name(mut self, name: impl Into) -> Self { + self.service_name = name.into(); + self + } + + /// Set the OTLP endpoint + pub fn endpoint(mut self, endpoint: impl Into) -> Self { + self.endpoint = Some(endpoint.into()); + self + } + + /// Set a custom resource + pub fn resource(mut self, resource: Resource) -> Self { + self.resource = Some(resource); + self + } + + /// Build the OpenTelemetry layer + pub fn build(self) -> Result { + // Build resource with service name + let resource = self.resource.unwrap_or_else(|| { + Resource::builder().with_service_name(self.service_name.clone()).build() + }); + + // Configure exporter + let mut exporter_builder = SpanExporterBuilder::new().with_tonic(); + + if let Some(endpoint) = self.endpoint { + exporter_builder = exporter_builder.with_endpoint(endpoint); + } + + let exporter = exporter_builder.build()?; + + // Build provider + let tracer_provider = SdkTracerProvider::builder() + .with_id_generator(RandomIdGenerator::default()) + .with_batch_exporter(exporter) + .with_resource(resource) + .build(); + + // // Set global provider + // opentelemetry::global::set_tracer_provider(tracer_provider.clone()); + let tracer = tracer_provider.tracer(self.service_name); + + Ok(OtlpTracer { tracer, tracer_provider }) + } +} - let resource = Resource::builder().with_service_name("katana").build(); +impl Default for OtlpTracerBuilder { + fn default() -> Self { + Self::new() + } +} - let mut exporter_builder = SpanExporterBuilder::new().with_tonic(); +/// Wrapper type for SdkTracer that implements the Tracer trait +#[derive(Debug, Clone)] +pub struct OtlpTracer { + tracer: opentelemetry_sdk::trace::Tracer, + tracer_provider: SdkTracerProvider, +} - if let Some(endpoint) = &otlp_config.endpoint { - exporter_builder = exporter_builder.with_endpoint(endpoint); +impl OtlpTracer { + pub fn builder() -> OtlpTracerBuilder { + OtlpTracerBuilder::new() } +} - let exporter = exporter_builder.build()?; +impl Tracer for OtlpTracer { + type Span = ::Span; - let provider = SdkTracerProvider::builder() - .with_id_generator(RandomIdGenerator::default()) - .with_batch_exporter(exporter) - .with_resource(resource) - .build(); + #[inline] + fn build_with_context( + &self, + builder: opentelemetry::trace::SpanBuilder, + parent_cx: &opentelemetry::Context, + ) -> Self::Span { + self.tracer.build_with_context(builder, parent_cx) + } +} - opentelemetry::global::set_tracer_provider(provider.clone()); +impl PreSampledTracer for OtlpTracer { + #[inline] + fn new_span_id(&self) -> opentelemetry::trace::SpanId { + self.tracer.new_span_id() + } + + #[inline] + fn new_trace_id(&self) -> opentelemetry::trace::TraceId { + self.tracer.new_trace_id() + } + + #[inline] + fn sampled_context( + &self, + data: &mut tracing_opentelemetry::OtelData, + ) -> opentelemetry::Context { + self.tracer.sampled_context(data) + } +} - Ok(provider.tracer("katana")) +impl TelemetryTracer for OtlpTracer { + fn init(&self) -> Result<(), Error> { + // Set global provider + opentelemetry::global::set_tracer_provider(self.tracer_provider.clone()); + Ok(()) + } } + +impl crate::__private::Sealed for OtlpTracer {}