Skip to content

Commit dd17781

Browse files
jchrostek-ddclaude
andauthored
feat(otlp): add gRPC protocol support for trace ingestion (#1105)
## Summary - Add gRPC support for OTLP trace ingestion on port 4317, complementing the existing HTTP endpoint on port 4318 - Implement `TraceService` trait using tonic for gRPC server - Refactor `OtlpProcessor` to accept `ExportTraceServiceRequest` directly (avoids re-encoding for gRPC) - Wire up existing config: `DD_OTLP_CONFIG_RECEIVER_PROTOCOLS_GRPC_ENDPOINT` and `DD_OTLP_CONFIG_RECEIVER_PROTOCOLS_GRPC_MAX_RECV_MSG_SIZE_MIB` ## Motivation Users were configuring gRPC OTLP endpoints (`DD_OTLP_CONFIG_RECEIVER_PROTOCOLS_GRPC_ENDPOINT`) but the feature was not implemented, resulting in silent failures. This PR enables gRPC support. This was for an old ticket, but we should go ahead and add this, especially since OTLP is getting more popular. ## Changes | File | Change | |------|--------| | `bottlecap/Cargo.toml` | Add tonic with transport/server features | | `bottlecap/src/otlp/grpc_agent.rs` | NEW - gRPC server implementing TraceService | | `bottlecap/src/otlp/processor.rs` | Refactor to accept ExportTraceServiceRequest directly | | `bottlecap/src/otlp/mod.rs` | Update enablement logic for HTTP/gRPC | | `bottlecap/src/bin/bottlecap/main.rs` | Start both HTTP and gRPC servers | ## Binary Size Impact Measured from CI layer size checks (arm64): | Metric | Main | PR | Difference | |--------|------|-----|------------| | Zipped | 5,074 KB | 5,164 KB | **+90 KB** | | Unzipped | 11,183 KB | 11,376 KB | **+193 KB** | This is a ~1.7% increase, well under historical concerns (OTLP was removed from Go agent for adding ~1.15MB). ## Test plan - [x] New integration tests for gRPC protocol (4 tests) - [x] All existing OTLP tests continue to pass - [x] `cargo fmt`, `cargo clippy`, `cargo test` pass - [x] All CI checks passed (47 checks including integration suites) 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
1 parent a69ae63 commit dd17781

File tree

11 files changed

+537
-48
lines changed

11 files changed

+537
-48
lines changed

bottlecap/Cargo.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bottlecap/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ rustls-pki-types = { version = "1.0", default-features = false }
4545
hyper-rustls = { version = "0.27.7", default-features = false }
4646
rand = { version = "0.8", default-features = false }
4747
prost = { version = "0.14", default-features = false }
48+
tonic = { version = "0.14", features = ["transport", "codegen", "server", "channel", "router"], default-features = false }
4849
tonic-types = { version = "0.14", default-features = false }
4950
zstd = { version = "0.13.3", default-features = false }
5051
futures = { version = "0.3.31", default-features = false }

bottlecap/src/bin/bottlecap/main.rs

Lines changed: 43 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,10 @@ use bottlecap::{
5252
},
5353
flusher::LogsFlusher,
5454
},
55-
otlp::{agent::Agent as OtlpAgent, should_enable_otlp_agent},
55+
otlp::{
56+
agent::Agent as OtlpAgent, grpc_agent::GrpcAgent as OtlpGrpcAgent, should_enable_otlp_grpc,
57+
should_enable_otlp_http,
58+
},
5659
proxy::{interceptor, should_start_proxy},
5760
secrets::decrypt,
5861
tags::{
@@ -1354,24 +1357,49 @@ fn start_otlp_agent(
13541357
trace_tx: Sender<SendDataBuilderInfo>,
13551358
stats_concentrator: StatsConcentratorHandle,
13561359
) -> Option<CancellationToken> {
1357-
if !should_enable_otlp_agent(config) {
1360+
let http_enabled = should_enable_otlp_http(config);
1361+
let grpc_enabled = should_enable_otlp_grpc(config);
1362+
1363+
if !http_enabled && !grpc_enabled {
13581364
return None;
13591365
}
1366+
13601367
let stats_generator = Arc::new(StatsGenerator::new(stats_concentrator));
1361-
let agent = OtlpAgent::new(
1362-
config.clone(),
1363-
tags_provider,
1364-
trace_processor,
1365-
trace_tx,
1366-
stats_generator,
1367-
);
1368-
let cancel_token = agent.cancel_token();
1368+
let cancel_token = CancellationToken::new();
13691369

1370-
tokio::spawn(async move {
1371-
if let Err(e) = agent.start().await {
1372-
error!("Error starting OTLP agent: {e:?}");
1373-
}
1374-
});
1370+
if http_enabled {
1371+
let agent = OtlpAgent::new(
1372+
config.clone(),
1373+
tags_provider.clone(),
1374+
trace_processor.clone(),
1375+
trace_tx.clone(),
1376+
stats_generator.clone(),
1377+
cancel_token.clone(),
1378+
);
1379+
1380+
tokio::spawn(async move {
1381+
if let Err(e) = agent.start().await {
1382+
error!("Error starting OTLP HTTP agent: {e:?}");
1383+
}
1384+
});
1385+
}
1386+
1387+
if grpc_enabled {
1388+
let grpc_agent = OtlpGrpcAgent::new(
1389+
config.clone(),
1390+
tags_provider,
1391+
trace_processor,
1392+
trace_tx,
1393+
stats_generator,
1394+
cancel_token.clone(),
1395+
);
1396+
1397+
tokio::spawn(async move {
1398+
if let Err(e) = grpc_agent.start().await {
1399+
error!("Error starting OTLP gRPC agent: {e:?}");
1400+
}
1401+
});
1402+
}
13751403

13761404
Some(cancel_token)
13771405
}

bottlecap/src/otlp/agent.rs

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -56,12 +56,12 @@ impl Agent {
5656
trace_processor: Arc<dyn TraceProcessor + Send + Sync>,
5757
trace_tx: Sender<SendDataBuilderInfo>,
5858
stats_generator: Arc<StatsGenerator>,
59+
cancel_token: CancellationToken,
5960
) -> Self {
6061
let port = Self::parse_port(
6162
config.otlp_config_receiver_protocols_http_endpoint.as_ref(),
6263
OTLP_AGENT_HTTP_PORT,
6364
);
64-
let cancel_token = CancellationToken::new();
6565

6666
Self {
6767
config: Arc::clone(&config),
@@ -75,11 +75,6 @@ impl Agent {
7575
}
7676
}
7777

78-
#[must_use]
79-
pub fn cancel_token(&self) -> CancellationToken {
80-
self.cancel_token.clone()
81-
}
82-
8378
fn parse_port(endpoint: Option<&String>, default_port: u16) -> u16 {
8479
if let Some(endpoint) = endpoint {
8580
let port = endpoint.split(':').nth(1);
@@ -312,7 +307,6 @@ mod tests {
312307

313308
#[test]
314309
fn test_parse_port_with_valid_endpoint() {
315-
// Test with a valid endpoint containing a port
316310
let endpoint = Some("localhost:8080".to_string());
317311
assert_eq!(
318312
Agent::parse_port(endpoint.as_ref(), OTLP_AGENT_HTTP_PORT),
@@ -322,7 +316,6 @@ mod tests {
322316

323317
#[test]
324318
fn test_parse_port_with_invalid_port_format() {
325-
// Test with an endpoint containing an invalid port format
326319
let endpoint = Some("localhost:invalid".to_string());
327320
assert_eq!(
328321
Agent::parse_port(endpoint.as_ref(), OTLP_AGENT_HTTP_PORT),
@@ -332,7 +325,6 @@ mod tests {
332325

333326
#[test]
334327
fn test_parse_port_with_missing_port() {
335-
// Test with an endpoint missing a port
336328
let endpoint = Some("localhost".to_string());
337329
assert_eq!(
338330
Agent::parse_port(endpoint.as_ref(), OTLP_AGENT_HTTP_PORT),
@@ -342,7 +334,6 @@ mod tests {
342334

343335
#[test]
344336
fn test_parse_port_with_none_endpoint() {
345-
// Test with None endpoint
346337
let endpoint: Option<String> = None;
347338
assert_eq!(
348339
Agent::parse_port(endpoint.as_ref(), OTLP_AGENT_HTTP_PORT),
@@ -352,7 +343,6 @@ mod tests {
352343

353344
#[test]
354345
fn test_parse_port_with_empty_endpoint() {
355-
// Test with an empty endpoint
356346
let endpoint = Some(String::new());
357347
assert_eq!(
358348
Agent::parse_port(endpoint.as_ref(), OTLP_AGENT_HTTP_PORT),

0 commit comments

Comments
 (0)