Skip to content

Commit 9243f2e

Browse files
committed
Add OTel metrics dual export alongside Prometheus
1 parent cfb53e9 commit 9243f2e

File tree

11 files changed

+1021
-132
lines changed

11 files changed

+1021
-132
lines changed

quickwit/Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

quickwit/quickwit-actors/src/mailbox.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -521,7 +521,7 @@ mod tests {
521521
.unwrap();
522522
// At this point the actor was started and even processed a message entirely.
523523
let backpressure_micros_counter =
524-
IntCounter::new("test_counter", "help for test_counter").unwrap();
524+
IntCounter::new("test_counter", "help for test_counter", "", &[]);
525525
let wait_duration = Duration::from_millis(1);
526526
let processed = mailbox
527527
.send_message_with_backpressure_counter(
@@ -548,7 +548,7 @@ mod tests {
548548
.await
549549
.unwrap();
550550
let backpressure_micros_counter =
551-
IntCounter::new("test_counter", "help for test_counter").unwrap();
551+
IntCounter::new("test_counter", "help for test_counter", "", &[]);
552552
let wait_duration = Duration::from_millis(1);
553553
mailbox
554554
.send_message_with_backpressure_counter(
@@ -580,7 +580,7 @@ mod tests {
580580
.await
581581
.unwrap();
582582
let backpressure_micros_counter =
583-
IntCounter::new("test_counter", "help for test_counter").unwrap();
583+
IntCounter::new("test_counter", "help for test_counter", "", &[]);
584584
let start = Instant::now();
585585
mailbox
586586
.ask_with_backpressure_counter(Duration::from_millis(1), None)

quickwit/quickwit-cli/src/logger.rs

Lines changed: 107 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,15 @@ use std::sync::Arc;
1717
use std::{env, fmt};
1818

1919
use anyhow::Context;
20+
use opentelemetry::metrics::MeterProvider;
2021
use opentelemetry::trace::TracerProvider;
2122
use opentelemetry::{KeyValue, global};
2223
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
2324
use opentelemetry_otlp::{
24-
LogExporter, Protocol as OtlpWireProtocol, SpanExporter, WithExportConfig,
25+
LogExporter, MetricExporter, Protocol as OtlpWireProtocol, SpanExporter, WithExportConfig,
2526
};
2627
use opentelemetry_sdk::logs::SdkLoggerProvider;
28+
use opentelemetry_sdk::metrics::{SdkMeterProvider, Temporality};
2729
use opentelemetry_sdk::propagation::TraceContextPropagator;
2830
use opentelemetry_sdk::trace::{BatchConfigBuilder, SdkTracerProvider};
2931
use opentelemetry_sdk::{Resource, trace};
@@ -81,6 +83,26 @@ impl OtlpProtocol {
8183
}
8284
.context("failed to initialize OTLP traces exporter")
8385
}
86+
87+
fn metric_exporter(&self, temporality: Temporality) -> anyhow::Result<MetricExporter> {
88+
match self {
89+
OtlpProtocol::Grpc => MetricExporter::builder()
90+
.with_tonic()
91+
.with_temporality(temporality)
92+
.build(),
93+
OtlpProtocol::HttpProtobuf => MetricExporter::builder()
94+
.with_http()
95+
.with_temporality(temporality)
96+
.with_protocol(OtlpWireProtocol::HttpBinary)
97+
.build(),
98+
OtlpProtocol::HttpJson => MetricExporter::builder()
99+
.with_http()
100+
.with_temporality(temporality)
101+
.with_protocol(OtlpWireProtocol::HttpJson)
102+
.build(),
103+
}
104+
.context("failed to initialize OTLP metrics exporter")
105+
}
84106
}
85107

86108
impl FromStr for OtlpProtocol {
@@ -104,11 +126,60 @@ impl FromStr for OtlpProtocol {
104126
}
105127
}
106128

129+
struct OtlpMetricsTemporality(Temporality);
130+
131+
impl FromStr for OtlpMetricsTemporality {
132+
type Err = anyhow::Error;
133+
134+
fn from_str(temporality_str: &str) -> anyhow::Result<Self> {
135+
const TEMPORALITY_DELTA: &str = "delta";
136+
const TEMPORALITY_LOWMEMORY: &str = "lowmemory";
137+
const TEMPORALITY_CUMULATIVE: &str = "cumulative";
138+
139+
match temporality_str {
140+
TEMPORALITY_DELTA => Ok(OtlpMetricsTemporality(Temporality::Delta)),
141+
TEMPORALITY_LOWMEMORY => Ok(OtlpMetricsTemporality(Temporality::LowMemory)),
142+
TEMPORALITY_CUMULATIVE => Ok(OtlpMetricsTemporality(Temporality::Cumulative)),
143+
other => anyhow::bail!(
144+
"unsupported OTLP metrics temporality `{other}`, supported values are \
145+
`{TEMPORALITY_DELTA}`, `{TEMPORALITY_LOWMEMORY}` and `{TEMPORALITY_CUMULATIVE}`"
146+
),
147+
}
148+
}
149+
}
150+
151+
impl From<OtlpMetricsTemporality> for Temporality {
152+
fn from(t: OtlpMetricsTemporality) -> Self {
153+
t.0
154+
}
155+
}
156+
157+
pub struct TelemetryProviders {
158+
tracer_provider: SdkTracerProvider,
159+
logger_provider: SdkLoggerProvider,
160+
meter_provider: SdkMeterProvider,
161+
}
162+
163+
impl TelemetryProviders {
164+
pub fn shutdown(self) -> anyhow::Result<()> {
165+
self.tracer_provider
166+
.shutdown()
167+
.context("failed to shutdown OpenTelemetry tracer provider")?;
168+
self.logger_provider
169+
.shutdown()
170+
.context("failed to shutdown OpenTelemetry logger provider")?;
171+
self.meter_provider
172+
.shutdown()
173+
.context("failed to shutdown OpenTelemetry meter provider")?;
174+
Ok(())
175+
}
176+
}
177+
107178
#[cfg(feature = "tokio-console")]
108179
use crate::QW_ENABLE_TOKIO_CONSOLE_ENV_KEY;
109180

110181
/// Load the default logging filter from the environment. The filter can later
111-
/// be updated using the result callback of [setup_logging_and_tracing].
182+
/// be updated using the result callback of [init_telemetry_providers].
112183
fn startup_env_filter(level: Level) -> anyhow::Result<EnvFilter> {
113184
let env_filter = env::var("RUST_LOG")
114185
.map(|_| EnvFilter::from_default_env())
@@ -119,14 +190,11 @@ fn startup_env_filter(level: Level) -> anyhow::Result<EnvFilter> {
119190

120191
type ReloadLayer = tracing_subscriber::reload::Layer<EnvFilter, tracing_subscriber::Registry>;
121192

122-
pub fn setup_logging_and_tracing(
193+
pub fn init_telemetry_providers(
123194
level: Level,
124195
ansi_colors: bool,
125196
build_info: &BuildInfo,
126-
) -> anyhow::Result<(
127-
EnvFilterReloadFn,
128-
Option<(SdkTracerProvider, SdkLoggerProvider)>,
129-
)> {
197+
) -> anyhow::Result<(EnvFilterReloadFn, Option<TelemetryProviders>)> {
130198
#[cfg(feature = "tokio-console")]
131199
{
132200
if get_bool_from_env(QW_ENABLE_TOKIO_CONSOLE_ENV_KEY, false) {
@@ -204,12 +272,37 @@ pub fn setup_logging_and_tracing(
204272
.with_batch_exporter(log_exporter)
205273
.build();
206274

207-
let tracing_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
275+
let metrics_protocol_opt =
276+
get_from_env_opt::<String>("OTEL_EXPORTER_OTLP_METRICS_PROTOCOL", false);
277+
let metrics_protocol = metrics_protocol_opt
278+
.as_deref()
279+
.map(OtlpProtocol::from_str)
280+
.transpose()?
281+
.unwrap_or(global_protocol);
282+
let temporality_opt =
283+
get_from_env_opt::<String>("OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE", false);
284+
let temporality = temporality_opt
285+
.as_deref()
286+
.map(OtlpMetricsTemporality::from_str)
287+
.transpose()?
288+
.map(Temporality::from)
289+
.unwrap_or(Temporality::Cumulative);
290+
let metric_exporter = metrics_protocol.metric_exporter(temporality)?;
291+
292+
let meter_provider = SdkMeterProvider::builder()
293+
.with_resource(resource.clone())
294+
.with_periodic_exporter(metric_exporter)
295+
.build();
296+
297+
let meter = meter_provider.meter("quickwit");
298+
quickwit_common::metrics::install_otel_meter(meter);
299+
300+
let tracer_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
208301
.with_span_processor(span_processor)
209302
.with_resource(resource)
210303
.build();
211304

212-
let tracer = tracing_provider.tracer("quickwit");
305+
let tracer = tracer_provider.tracer("quickwit");
213306
let telemetry_layer = tracing_opentelemetry::layer().with_tracer(tracer);
214307

215308
// Bridge between tracing logs and otel tracing events
@@ -220,7 +313,11 @@ pub fn setup_logging_and_tracing(
220313
.with(logs_otel_layer)
221314
.try_init()
222315
.context("failed to register tracing subscriber")?;
223-
Some((tracing_provider, logger_provider))
316+
Some(TelemetryProviders {
317+
tracer_provider,
318+
logger_provider,
319+
meter_provider,
320+
})
224321
} else {
225322
registry
226323
.try_init()

quickwit/quickwit-cli/src/main.rs

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use quickwit_cli::checklist::RED_COLOR;
2222
use quickwit_cli::cli::{CliCommand, build_cli};
2323
#[cfg(feature = "jemalloc")]
2424
use quickwit_cli::jemalloc::start_jemalloc_metrics_loop;
25-
use quickwit_cli::logger::setup_logging_and_tracing;
25+
use quickwit_cli::logger::init_telemetry_providers;
2626
use quickwit_cli::{busy_detector, install_default_crypto_ring_provider};
2727
use quickwit_common::runtimes::scrape_tokio_runtime_metrics;
2828
use quickwit_serve::BuildInfo;
@@ -98,8 +98,8 @@ async fn main_impl() -> anyhow::Result<()> {
9898
start_jemalloc_metrics_loop();
9999

100100
let build_info = BuildInfo::get();
101-
let (env_filter_reload_fn, tracer_provider_opt) =
102-
setup_logging_and_tracing(command.default_log_level(), ansi_colors, build_info)?;
101+
let (env_filter_reload_fn, telemetry_providers) =
102+
init_telemetry_providers(command.default_log_level(), ansi_colors, build_info)?;
103103

104104
let return_code: i32 = if let Err(command_error) = command.execute(env_filter_reload_fn).await {
105105
error!(error=%command_error, "command failed");
@@ -113,13 +113,8 @@ async fn main_impl() -> anyhow::Result<()> {
113113
0
114114
};
115115

116-
if let Some((trace_provider, logs_provider)) = tracer_provider_opt {
117-
trace_provider
118-
.shutdown()
119-
.context("failed to shutdown OpenTelemetry tracer provider")?;
120-
logs_provider
121-
.shutdown()
122-
.context("failed to shutdown OpenTelemetry logs provider")?;
116+
if let Some(providers) = telemetry_providers {
117+
providers.shutdown()?;
123118
}
124119

125120
std::process::exit(return_code)

quickwit/quickwit-common/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ hyper = { workspace = true }
2929
hyper-util = { workspace = true, optional = true }
3030
itertools = { workspace = true }
3131
once_cell = { workspace = true }
32+
opentelemetry = { workspace = true }
3233
pin-project = { workspace = true }
3334
pnet = { workspace = true }
3435
prometheus = { workspace = true }
@@ -63,6 +64,7 @@ jemalloc-profiled = [
6364

6465
[dev-dependencies]
6566
hyper-util = { workspace = true }
67+
opentelemetry_sdk = { workspace = true, features = ["testing"] }
6668
proptest = { workspace = true }
6769
serde_json = { workspace = true }
6870
serial_test = { workspace = true }

quickwit/quickwit-common/src/io.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,9 @@ use async_speed_limit::limiter::Consume;
3434
use bytesize::ByteSize;
3535
use once_cell::sync::Lazy;
3636
use pin_project::pin_project;
37-
use prometheus::IntCounter;
3837
use tokio::io::AsyncWrite;
3938

40-
use crate::metrics::{IntCounterVec, new_counter_vec};
39+
use crate::metrics::{IntCounter, IntCounterVec, new_counter_vec};
4140
use crate::{KillSwitch, Progress, ProtectedZoneGuard};
4241

4342
// Max 1MB at a time.
@@ -99,7 +98,7 @@ pub struct IoControls {
9998
impl Default for IoControls {
10099
fn default() -> Self {
101100
let default_bytes_counter =
102-
IntCounter::new("default_write_num_bytes", "Default write counter.").unwrap();
101+
IntCounter::new("default_write_num_bytes", "Default write counter.", "", &[]);
103102
IoControls {
104103
throughput_limiter_opt: None,
105104
progress: Progress::default(),

0 commit comments

Comments
 (0)