Skip to content

Commit ce13cf6

Browse files
jchrostek-ddclaude
andcommitted
feat(otlp): add gRPC protocol support for trace ingestion
Add gRPC support for OTLP trace ingestion on port 4317, complementing the existing HTTP endpoint on port 4318. Changes: - Add tonic dependency with transport/server features - Implement TraceService trait in new grpc_agent.rs module - Refactor OtlpProcessor to accept ExportTraceServiceRequest directly - Update enablement logic to check both HTTP and gRPC endpoints - Wire up max message size config from DD_OTLP_CONFIG_RECEIVER_PROTOCOLS_GRPC_MAX_RECV_MSG_SIZE_MIB - Add integration tests for gRPC protocol Users can now configure gRPC OTLP export by setting: DD_OTLP_CONFIG_RECEIVER_PROTOCOLS_GRPC_ENDPOINT=localhost:4317 Binary size impact: +16 bytes (negligible - tonic transport was already pulled in transitively). 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 07e4bb5 commit ce13cf6

10 files changed

Lines changed: 502 additions & 24 deletions

File tree

bottlecap/Cargo.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bottlecap/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ rustls-pki-types = { version = "1.0", default-features = false }
4545
hyper-rustls = { version = "0.27.7", default-features = false }
4646
rand = { version = "0.8", default-features = false }
4747
prost = { version = "0.14", default-features = false }
48+
tonic = { version = "0.14", features = ["transport", "codegen", "server", "channel", "router"], default-features = false }
4849
tonic-types = { version = "0.14", default-features = false }
4950
zstd = { version = "0.13.3", default-features = false }
5051
futures = { version = "0.3.31", default-features = false }

bottlecap/src/bin/bottlecap/main.rs

Lines changed: 45 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,10 @@ use bottlecap::{
5252
},
5353
flusher::LogsFlusher,
5454
},
55-
otlp::{agent::Agent as OtlpAgent, should_enable_otlp_agent},
55+
otlp::{
56+
agent::Agent as OtlpAgent, grpc_agent::GrpcAgent as OtlpGrpcAgent, should_enable_grpc,
57+
should_enable_http, should_enable_otlp_agent,
58+
},
5659
proxy::{interceptor, should_start_proxy},
5760
secrets::decrypt,
5861
tags::{
@@ -1358,22 +1361,49 @@ fn start_otlp_agent(
13581361
return None;
13591362
}
13601363
let stats_generator = Arc::new(StatsGenerator::new(stats_concentrator));
1361-
let agent = OtlpAgent::new(
1362-
config.clone(),
1363-
tags_provider,
1364-
trace_processor,
1365-
trace_tx,
1366-
stats_generator,
1367-
);
1368-
let cancel_token = agent.cancel_token();
13691364

1370-
tokio::spawn(async move {
1371-
if let Err(e) = agent.start().await {
1372-
error!("Error starting OTLP agent: {e:?}");
1373-
}
1374-
});
1365+
// Start HTTP agent if configured
1366+
if should_enable_http(config) {
1367+
let agent = OtlpAgent::new(
1368+
config.clone(),
1369+
tags_provider.clone(),
1370+
trace_processor.clone(),
1371+
trace_tx.clone(),
1372+
stats_generator.clone(),
1373+
);
1374+
1375+
tokio::spawn(async move {
1376+
if let Err(e) = agent.start().await {
1377+
error!("Error starting OTLP HTTP agent: {e:?}");
1378+
}
1379+
});
1380+
}
1381+
1382+
// Start gRPC agent if configured
1383+
let grpc_cancel_token = if should_enable_grpc(config) {
1384+
let grpc_agent = OtlpGrpcAgent::new(
1385+
config.clone(),
1386+
tags_provider,
1387+
trace_processor,
1388+
trace_tx,
1389+
stats_generator,
1390+
);
1391+
let cancel_token = grpc_agent.cancel_token();
1392+
1393+
tokio::spawn(async move {
1394+
if let Err(e) = grpc_agent.start().await {
1395+
error!("Error starting OTLP gRPC agent: {e:?}");
1396+
}
1397+
});
1398+
1399+
Some(cancel_token)
1400+
} else {
1401+
None
1402+
};
13751403

1376-
Some(cancel_token)
1404+
// Return the gRPC cancel token if available, or create a dummy one
1405+
// We need to return a token for the caller to cancel OTLP agents on shutdown
1406+
grpc_cancel_token.or_else(|| Some(CancellationToken::new()))
13771407
}
13781408

13791409
fn start_api_runtime_proxy(

bottlecap/src/otlp/grpc_agent.rs

Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
use libdd_trace_utils::trace_utils::TracerHeaderTags as DatadogTracerHeaderTags;
2+
use opentelemetry_proto::tonic::collector::trace::v1::{
3+
ExportTraceServiceRequest, ExportTraceServiceResponse,
4+
trace_service_server::{TraceService, TraceServiceServer},
5+
};
6+
use std::mem::size_of_val;
7+
use std::net::SocketAddr;
8+
use std::sync::Arc;
9+
use tokio::sync::mpsc::Sender;
10+
use tokio_util::sync::CancellationToken;
11+
use tonic::{Request, Response, Status};
12+
use tracing::{debug, error};
13+
14+
use crate::{
15+
config::Config,
16+
otlp::processor::Processor as OtlpProcessor,
17+
tags::provider,
18+
traces::{
19+
stats_generator::StatsGenerator, trace_aggregator::SendDataBuilderInfo,
20+
trace_processor::TraceProcessor,
21+
},
22+
};
23+
24+
const OTLP_AGENT_GRPC_PORT: u16 = 4317;
25+
const DEFAULT_MAX_RECV_MSG_SIZE: usize = 4 * 1024 * 1024; // 4MB default
26+
27+
struct OtlpGrpcService {
28+
config: Arc<Config>,
29+
tags_provider: Arc<provider::Provider>,
30+
processor: OtlpProcessor,
31+
trace_processor: Arc<dyn TraceProcessor + Send + Sync>,
32+
trace_tx: Sender<SendDataBuilderInfo>,
33+
stats_generator: Arc<StatsGenerator>,
34+
}
35+
36+
#[tonic::async_trait]
37+
impl TraceService for OtlpGrpcService {
38+
async fn export(
39+
&self,
40+
request: Request<ExportTraceServiceRequest>,
41+
) -> Result<Response<ExportTraceServiceResponse>, Status> {
42+
let inner_request = request.into_inner();
43+
44+
let traces = match self.processor.process_request(inner_request) {
45+
Ok(traces) => traces,
46+
Err(e) => {
47+
error!("OTLP gRPC | Failed to process request: {:?}", e);
48+
return Err(Status::internal(format!("Failed to process request: {e}")));
49+
}
50+
};
51+
52+
let tracer_header_tags = DatadogTracerHeaderTags::default();
53+
let body_size = size_of_val(&traces);
54+
if body_size == 0 {
55+
error!("OTLP gRPC | Not sending traces, processor returned empty data");
56+
return Err(Status::internal(
57+
"Not sending traces, processor returned empty data",
58+
));
59+
}
60+
61+
let compute_trace_stats_on_extension = self.config.compute_trace_stats_on_extension;
62+
let (send_data_builder, processed_traces) = self.trace_processor.process_traces(
63+
self.config.clone(),
64+
self.tags_provider.clone(),
65+
tracer_header_tags,
66+
traces,
67+
body_size,
68+
None,
69+
);
70+
71+
if let Some(send_data_builder) = send_data_builder {
72+
if let Err(err) = self.trace_tx.send(send_data_builder).await {
73+
error!("OTLP gRPC | Error sending traces to the trace aggregator: {err}");
74+
return Err(Status::internal(format!(
75+
"Error sending traces to the trace aggregator: {err}"
76+
)));
77+
}
78+
debug!("OTLP gRPC | Successfully buffered traces to be aggregated.");
79+
}
80+
81+
// Compute trace stats after process_traces() which performs obfuscation
82+
if compute_trace_stats_on_extension
83+
&& let Err(err) = self.stats_generator.send(&processed_traces)
84+
{
85+
// Just log the error. Stats are not critical.
86+
error!("OTLP gRPC | Error sending traces to the stats concentrator: {err}");
87+
}
88+
89+
Ok(Response::new(ExportTraceServiceResponse {
90+
partial_success: None,
91+
}))
92+
}
93+
}
94+
95+
pub struct GrpcAgent {
96+
config: Arc<Config>,
97+
tags_provider: Arc<provider::Provider>,
98+
processor: OtlpProcessor,
99+
trace_processor: Arc<dyn TraceProcessor + Send + Sync>,
100+
trace_tx: Sender<SendDataBuilderInfo>,
101+
stats_generator: Arc<StatsGenerator>,
102+
port: u16,
103+
cancel_token: CancellationToken,
104+
}
105+
106+
impl GrpcAgent {
107+
pub fn new(
108+
config: Arc<Config>,
109+
tags_provider: Arc<provider::Provider>,
110+
trace_processor: Arc<dyn TraceProcessor + Send + Sync>,
111+
trace_tx: Sender<SendDataBuilderInfo>,
112+
stats_generator: Arc<StatsGenerator>,
113+
) -> Self {
114+
let port = Self::parse_port(
115+
config.otlp_config_receiver_protocols_grpc_endpoint.as_ref(),
116+
OTLP_AGENT_GRPC_PORT,
117+
);
118+
let cancel_token = CancellationToken::new();
119+
120+
Self {
121+
config: Arc::clone(&config),
122+
tags_provider: Arc::clone(&tags_provider),
123+
processor: OtlpProcessor::new(Arc::clone(&config)),
124+
trace_processor,
125+
trace_tx,
126+
stats_generator,
127+
port,
128+
cancel_token,
129+
}
130+
}
131+
132+
#[must_use]
133+
pub fn cancel_token(&self) -> CancellationToken {
134+
self.cancel_token.clone()
135+
}
136+
137+
fn parse_port(endpoint: Option<&String>, default_port: u16) -> u16 {
138+
if let Some(endpoint) = endpoint {
139+
let port = endpoint.split(':').nth(1);
140+
if let Some(port) = port {
141+
return port.parse::<u16>().unwrap_or_else(|_| {
142+
error!("Invalid OTLP gRPC port, using default port {default_port}");
143+
default_port
144+
});
145+
}
146+
147+
error!("Invalid OTLP gRPC endpoint format, using default port {default_port}");
148+
}
149+
150+
default_port
151+
}
152+
153+
pub async fn start(&self) -> Result<(), Box<dyn std::error::Error>> {
154+
let socket = SocketAddr::from(([127, 0, 0, 1], self.port));
155+
156+
let max_recv_msg_size = self
157+
.config
158+
.otlp_config_receiver_protocols_grpc_max_recv_msg_size_mib
159+
.map_or(DEFAULT_MAX_RECV_MSG_SIZE, |mib| {
160+
mib.unsigned_abs() as usize * 1024 * 1024
161+
});
162+
163+
let service = OtlpGrpcService {
164+
config: Arc::clone(&self.config),
165+
tags_provider: Arc::clone(&self.tags_provider),
166+
processor: self.processor.clone(),
167+
trace_processor: Arc::clone(&self.trace_processor),
168+
trace_tx: self.trace_tx.clone(),
169+
stats_generator: Arc::clone(&self.stats_generator),
170+
};
171+
172+
let cancel_token = self.cancel_token.clone();
173+
174+
debug!(
175+
"OTLP gRPC | Starting collector on {} with max message size {} bytes",
176+
socket, max_recv_msg_size
177+
);
178+
179+
tonic::transport::Server::builder()
180+
.add_service(
181+
TraceServiceServer::new(service).max_decoding_message_size(max_recv_msg_size),
182+
)
183+
.serve_with_shutdown(socket, async move {
184+
cancel_token.cancelled().await;
185+
debug!("OTLP gRPC | Shutdown signal received, shutting down");
186+
})
187+
.await?;
188+
189+
Ok(())
190+
}
191+
}
192+
193+
#[cfg(test)]
194+
mod tests {
195+
use super::*;
196+
197+
#[test]
198+
fn test_parse_port_with_valid_endpoint() {
199+
let endpoint = Some("localhost:4317".to_string());
200+
assert_eq!(
201+
GrpcAgent::parse_port(endpoint.as_ref(), OTLP_AGENT_GRPC_PORT),
202+
4317
203+
);
204+
}
205+
206+
#[test]
207+
fn test_parse_port_with_custom_port() {
208+
let endpoint = Some("0.0.0.0:9999".to_string());
209+
assert_eq!(
210+
GrpcAgent::parse_port(endpoint.as_ref(), OTLP_AGENT_GRPC_PORT),
211+
9999
212+
);
213+
}
214+
215+
#[test]
216+
fn test_parse_port_with_invalid_port_format() {
217+
let endpoint = Some("localhost:invalid".to_string());
218+
assert_eq!(
219+
GrpcAgent::parse_port(endpoint.as_ref(), OTLP_AGENT_GRPC_PORT),
220+
OTLP_AGENT_GRPC_PORT
221+
);
222+
}
223+
224+
#[test]
225+
fn test_parse_port_with_missing_port() {
226+
let endpoint = Some("localhost".to_string());
227+
assert_eq!(
228+
GrpcAgent::parse_port(endpoint.as_ref(), OTLP_AGENT_GRPC_PORT),
229+
OTLP_AGENT_GRPC_PORT
230+
);
231+
}
232+
233+
#[test]
234+
fn test_parse_port_with_none_endpoint() {
235+
let endpoint: Option<String> = None;
236+
assert_eq!(
237+
GrpcAgent::parse_port(endpoint.as_ref(), OTLP_AGENT_GRPC_PORT),
238+
OTLP_AGENT_GRPC_PORT
239+
);
240+
}
241+
}

0 commit comments

Comments
 (0)