Skip to content

Commit eb9bfe9

Browse files
committed
Introduce load-balanced channel for OpenTelemetry exporters
Add client-side load balancing to OTLP gRPC connections using ginepro. When NL_OTEL_ENDPOINT is set, the telemetry system creates a load-balanced channel that is shared across the log, trace, and metric exporters. This enables better distribution of telemetry traffic across multiple OTLP collector instances and improves overall system resilience.

- Add ginepro dependency for gRPC load balancing
- Upgrade OpenTelemetry dependencies from 0.29 to 0.30
- Change init_tracing() to async to support channel initialization
- Add NL_OTEL_ENDPOINT environment variable for configuration
- Update all OTLP exporters to use shared load-balanced channel
1 parent 23611ca commit eb9bfe9

File tree

10 files changed

+341
-112
lines changed

10 files changed

+341
-112
lines changed

Cargo.lock

Lines changed: 266 additions & 85 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

nativelink-scheduler/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ bytes = { version = "1.10.1", default-features = false }
2020
futures = { version = "0.3.31", default-features = false }
2121
lru = { version = "0.16.0", default-features = false }
2222
mock_instant = { version = "0.5.3", default-features = false }
23-
opentelemetry = { version = "0.29.1", default-features = false }
24-
opentelemetry-semantic-conventions = { version = "0.29.0", default-features = false, features = [
23+
opentelemetry = { version = "0.30.0", default-features = false }
24+
opentelemetry-semantic-conventions = { version = "0.30.0", default-features = false, features = [
2525
"default",
2626
"semconv_experimental",
2727
] }

nativelink-service/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ bytes = { version = "1.10.1", default-features = false }
2020
futures = { version = "0.3.31", default-features = false }
2121
http-body-util = { version = "0.1.3", default-features = false }
2222
hyper = { version = "1.6.0", default-features = false }
23-
opentelemetry = { version = "0.29.1", default-features = false }
24-
opentelemetry-semantic-conventions = { version = "0.29.0", default-features = false, features = [
23+
opentelemetry = { version = "0.30.0", default-features = false }
24+
opentelemetry-semantic-conventions = { version = "0.30.0", default-features = false, features = [
2525
"default",
2626
"semconv_experimental",
2727
] }

nativelink-store/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ mongodb = { version = "3", features = [
6464
"compat-3-0-0",
6565
"rustls-tls",
6666
], default-features = false }
67-
opentelemetry = { version = "0.29.1", default-features = false }
67+
opentelemetry = { version = "0.30.0", default-features = false }
6868
parking_lot = { version = "0.12.3", features = [
6969
"arc_lock",
7070
"send_guard",

nativelink-util/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ rust_library(
5555
"@crates//:blake3",
5656
"@crates//:bytes",
5757
"@crates//:futures",
58+
"@crates//:ginepro",
5859
"@crates//:hex",
5960
"@crates//:humantime",
6061
"@crates//:hyper-1.7.0",
@@ -84,6 +85,7 @@ rust_library(
8485
"@crates//:tracing",
8586
"@crates//:tracing-opentelemetry",
8687
"@crates//:tracing-subscriber",
88+
"@crates//:url",
8789
"@crates//:uuid",
8890
"@crates//:walkdir",
8991
],

nativelink-util/Cargo.toml

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,21 +27,21 @@ hyper-util = { version = "0.1.11", default-features = false }
2727
libc = { version = "0.2.177", default-features = false }
2828
lru = { version = "0.16.0", default-features = false }
2929
mock_instant = { version = "0.5.3", default-features = false }
30-
opentelemetry = { version = "0.29.0", default-features = false }
31-
opentelemetry-appender-tracing = { version = "0.29.1", default-features = false }
32-
opentelemetry-http = { version = "0.29.0", default-features = false }
33-
opentelemetry-otlp = { version = "0.29.0", default-features = false, features = [
30+
opentelemetry = { version = "0.30.0", default-features = false }
31+
opentelemetry-appender-tracing = { version = "0.30.0", default-features = false }
32+
opentelemetry-http = { version = "0.30.0", default-features = false }
33+
opentelemetry-otlp = { version = "0.30.0", default-features = false, features = [
3434
"grpc-tonic",
3535
"logs",
3636
"metrics",
3737
"trace",
3838
"zstd-tonic",
3939
] }
40-
opentelemetry-semantic-conventions = { version = "0.29.0", default-features = false, features = [
40+
opentelemetry-semantic-conventions = { version = "0.30.0", default-features = false, features = [
4141
"default",
4242
"semconv_experimental",
4343
] }
44-
opentelemetry_sdk = { version = "0.29.0", default-features = false }
44+
opentelemetry_sdk = { version = "0.30.0", default-features = false }
4545
parking_lot = { version = "0.12.3", features = [
4646
"arc_lock",
4747
"send_guard",
@@ -76,7 +76,7 @@ tonic = { version = "0.13.0", features = [
7676
], default-features = false }
7777
tower = { version = "0.5.2", default-features = false }
7878
tracing = { version = "0.1.41", default-features = false }
79-
tracing-opentelemetry = { version = "0.30.0", default-features = false, features = [
79+
tracing-opentelemetry = { version = "0.31.0", default-features = false, features = [
8080
"metrics",
8181
] }
8282
tracing-subscriber = { version = "0.3.19", features = [
@@ -92,6 +92,8 @@ uuid = { version = "1.16.0", default-features = false, features = [
9292
"v6",
9393
] }
9494
walkdir = { version = "2.5.0", default-features = false }
95+
ginepro = "0.9.0"
96+
url = "2.5.7"
9597

9698
[dev-dependencies]
9799
nativelink-macro = { path = "../nativelink-macro" }

nativelink-util/src/telemetry.rs

Lines changed: 56 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,10 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use core::default::Default;
16-
use std::env;
17-
use std::sync::OnceLock;
18-
1915
use base64::Engine;
2016
use base64::prelude::BASE64_STANDARD_NO_PAD;
17+
use core::default::Default;
18+
use ginepro::LoadBalancedChannel;
2119
use hyper::http::Response;
2220
use nativelink_error::{Code, ResultExt, make_err};
2321
use nativelink_proto::build::bazel::remote::execution::v2::RequestMetadata;
@@ -26,14 +24,18 @@ use opentelemetry::trace::{TraceContextExt, Tracer, TracerProvider};
2624
use opentelemetry::{KeyValue, global};
2725
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
2826
use opentelemetry_http::HeaderExtractor;
29-
use opentelemetry_otlp::{LogExporter, MetricExporter, Protocol, SpanExporter, WithExportConfig};
27+
use opentelemetry_otlp::{
28+
LogExporter, MetricExporter, Protocol, SpanExporter, WithExportConfig, WithTonicConfig,
29+
};
3030
use opentelemetry_sdk::Resource;
3131
use opentelemetry_sdk::logs::SdkLoggerProvider;
3232
use opentelemetry_sdk::metrics::SdkMeterProvider;
3333
use opentelemetry_sdk::propagation::{BaggagePropagator, TraceContextPropagator};
3434
use opentelemetry_sdk::trace::SdkTracerProvider;
3535
use opentelemetry_semantic_conventions::attribute::ENDUSER_ID;
3636
use prost::Message;
37+
use std::env;
38+
use std::sync::{OnceLock};
3739
use tracing::debug;
3840
use tracing::metadata::LevelFilter;
3941
use tracing_opentelemetry::{MetricsLayer, layer};
@@ -103,7 +105,7 @@ fn tracing_stdout_layer() -> impl Layer<Registry> {
103105
///
104106
/// Returns `Err` if logging was already initialized or if the exporters can't
105107
/// be initialized.
106-
pub fn init_tracing() -> Result<(), nativelink_error::Error> {
108+
pub async fn init_tracing() -> Result<(), nativelink_error::Error> {
107109
static INITIALIZED: OnceLock<()> = OnceLock::new();
108110

109111
if INITIALIZED.get().is_some() {
@@ -128,13 +130,18 @@ pub fn init_tracing() -> Result<(), nativelink_error::Error> {
128130
]);
129131
global::set_text_map_propagator(propagator);
130132

133+
let maybe_channel = maybe_load_balanced_channel().await;
134+
131135
// Logs
136+
let mut log_exporter_builder = LogExporter::builder().with_tonic();
137+
if let Some(channel) = maybe_channel.clone() {
138+
log_exporter_builder = log_exporter_builder.with_channel(channel.into());
139+
}
132140
let otlp_log_layer = OpenTelemetryTracingBridge::new(
133141
&SdkLoggerProvider::builder()
134142
.with_resource(resource.clone())
135143
.with_batch_exporter(
136-
LogExporter::builder()
137-
.with_tonic()
144+
log_exporter_builder
138145
.with_protocol(Protocol::Grpc)
139146
.build()
140147
.map_err(|e| make_err!(Code::Internal, "{e}"))
@@ -145,13 +152,16 @@ pub fn init_tracing() -> Result<(), nativelink_error::Error> {
145152
.with_filter(otlp_filter());
146153

147154
// Traces
155+
let mut span_exporter_builder = SpanExporter::builder().with_tonic();
156+
if let Some(channel) = maybe_channel.clone() {
157+
span_exporter_builder = span_exporter_builder.with_channel(channel.into());
158+
}
148159
let otlp_trace_layer = layer()
149160
.with_tracer(
150161
SdkTracerProvider::builder()
151162
.with_resource(resource.clone())
152163
.with_batch_exporter(
153-
SpanExporter::builder()
154-
.with_tonic()
164+
span_exporter_builder
155165
.with_protocol(Protocol::Grpc)
156166
.build()
157167
.map_err(|e| make_err!(Code::Internal, "{e}"))
@@ -163,11 +173,14 @@ pub fn init_tracing() -> Result<(), nativelink_error::Error> {
163173
.with_filter(otlp_filter());
164174

165175
// Metrics
176+
let mut metric_exporter_builder = MetricExporter::builder().with_tonic();
177+
if let Some(channel) = maybe_channel {
178+
metric_exporter_builder = metric_exporter_builder.with_channel(channel.into());
179+
}
166180
let meter_provider = SdkMeterProvider::builder()
167181
.with_resource(resource)
168182
.with_periodic_exporter(
169-
MetricExporter::builder()
170-
.with_tonic()
183+
metric_exporter_builder
171184
.with_protocol(Protocol::Grpc)
172185
.build()
173186
.map_err(|e| make_err!(Code::Internal, "{e}"))
@@ -191,6 +204,36 @@ pub fn init_tracing() -> Result<(), nativelink_error::Error> {
191204
Ok(())
192205
}
193206

207+
const NL_OTEL_ENDPOINT: &str = "NL_OTEL_ENDPOINT";
208+
209+
async fn maybe_load_balanced_channel() -> Option<LoadBalancedChannel> {
210+
match env::var(NL_OTEL_ENDPOINT) {
211+
Ok(endpoint) => {
212+
let url = Url::parse(endpoint.as_str()).map_err(|e| {
213+
make_err!(Code::Internal, "Unable to parse endpoint {endpoint}: {e:?}")
214+
}).unwrap();
215+
216+
let host = url
217+
.host()
218+
.err_tip(|| format!("Unable to get host from endpoint {endpoint}"))
219+
.unwrap();
220+
let port = url
221+
.port()
222+
.err_tip(|| format!("Unable to get port from endpoint {endpoint}"))
223+
.unwrap();
224+
225+
Some(
226+
LoadBalancedChannel::builder((host.to_string(), port))
227+
.channel()
228+
.await
229+
.map_err(|e| make_err!(Code::Internal, "Invalid hostname '{endpoint}': {e}"))
230+
.unwrap(),
231+
)
232+
}
233+
Err(_) => None,
234+
}
235+
}
236+
194237
/// Custom metadata key field for Bazel metadata.
195238
const BAZEL_METADATA_KEY: &str = "bazel.metadata";
196239

@@ -201,6 +244,7 @@ const BAZEL_REQUESTMETADATA_HEADER: &str = "build.bazel.remote.execution.v2.requ
201244

202245
use opentelemetry::baggage::BaggageExt;
203246
use opentelemetry::context::FutureExt;
247+
use url::Url;
204248

205249
#[derive(Debug, Clone)]
206250
pub struct OtlpMiddleware<S> {

nativelink-worker/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ bytes = { version = "1.10.1", default-features = false }
2222
filetime = { version = "0.2.25", default-features = false }
2323
formatx = { version = "0.2.3", default-features = false }
2424
futures = { version = "0.3.31", default-features = false }
25-
opentelemetry = { version = "0.29.1", default-features = false }
25+
opentelemetry = { version = "0.30.0", default-features = false }
2626
parking_lot = { version = "0.12.3", default-features = false }
2727
prost = { version = "0.13.5", default-features = false }
2828
relative-path = { version = "2.0.0", default-features = false, features = [

src/bin/nativelink.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -719,7 +719,7 @@ fn main() -> Result<(), Box<dyn core::error::Error>> {
719719
// The OTLP exporters need to run in a Tokio context
720720
// Do this first so all the other logging works
721721
#[expect(clippy::disallowed_methods, reason = "tracing init on main runtime")]
722-
runtime.block_on(async { tokio::spawn(async { init_tracing() }).await? })?;
722+
runtime.block_on(async { tokio::spawn(async { init_tracing().await }).await? })?;
723723

724724
let mut cfg = get_config()?;
725725

src/bin/redis_store_tester.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,7 @@ fn main() -> Result<(), Box<dyn core::error::Error>> {
305305
.unwrap()
306306
.block_on(async {
307307
// The OTLP exporters need to run in a Tokio context.
308-
spawn!("init tracing", async { init_tracing() })
308+
spawn!("init tracing", async { init_tracing().await })
309309
.await?
310310
.expect("Init tracing should work");
311311

0 commit comments

Comments
 (0)