@@ -3,7 +3,7 @@ use opentelemetry_proto::tonic::collector::trace::v1::{
33 ExportTraceServiceRequest , ExportTraceServiceResponse ,
44 trace_service_server:: { TraceService , TraceServiceServer } ,
55} ;
6- use std :: mem :: size_of_val ;
6+ use prost :: Message ;
77use std:: net:: SocketAddr ;
88use std:: sync:: Arc ;
99use tokio:: sync:: mpsc:: Sender ;
@@ -23,6 +23,7 @@ use crate::{
2323
2424const OTLP_AGENT_GRPC_PORT : u16 = 4317 ;
2525const DEFAULT_MAX_RECV_MSG_SIZE : usize = 4 * 1024 * 1024 ; // 4MB default
26+ const MAX_RECV_MSG_SIZE_CAP : usize = 64 * 1024 * 1024 ; // 64MB cap to prevent DoS
2627
2728struct OtlpGrpcService {
2829 config : Arc < Config > ,
@@ -41,6 +42,9 @@ impl TraceService for OtlpGrpcService {
4142 ) -> Result < Response < ExportTraceServiceResponse > , Status > {
4243 let inner_request = request. into_inner ( ) ;
4344
45+ // Capture encoded size before processing for metrics
46+ let body_size = inner_request. encoded_len ( ) ;
47+
4448 let traces = match self . processor . process_request ( inner_request) {
4549 Ok ( traces) => traces,
4650 Err ( e) => {
@@ -49,15 +53,16 @@ impl TraceService for OtlpGrpcService {
4953 }
5054 } ;
5155
52- let tracer_header_tags = DatadogTracerHeaderTags :: default ( ) ;
53- let body_size = size_of_val ( & traces) ;
54- if body_size == 0 {
56+ // Check if processor returned any actual traces
57+ if traces. iter ( ) . all ( Vec :: is_empty) {
5558 error ! ( "OTLP gRPC | Not sending traces, processor returned empty data" ) ;
5659 return Err ( Status :: internal (
5760 "Not sending traces, processor returned empty data" ,
5861 ) ) ;
5962 }
6063
64+ let tracer_header_tags = DatadogTracerHeaderTags :: default ( ) ;
65+
6166 let compute_trace_stats_on_extension = self . config . compute_trace_stats_on_extension ;
6267 let ( send_data_builder, processed_traces) = self . trace_processor . process_traces (
6368 self . config . clone ( ) ,
@@ -136,15 +141,24 @@ impl GrpcAgent {
136141
137142 fn parse_port ( endpoint : Option < & String > , default_port : u16 ) -> u16 {
138143 if let Some ( endpoint) = endpoint {
139- let port = endpoint. split ( ':' ) . nth ( 1 ) ;
140- if let Some ( port) = port {
141- return port. parse :: < u16 > ( ) . unwrap_or_else ( |_| {
142- error ! ( "Invalid OTLP gRPC port, using default port {default_port}" ) ;
143- default_port
144- } ) ;
144+ // Strip scheme if present (e.g., "http://localhost:4317" -> "localhost:4317")
145+ let without_scheme = endpoint
146+ . strip_prefix ( "http://" )
147+ . or_else ( || endpoint. strip_prefix ( "https://" ) )
148+ . unwrap_or ( endpoint) ;
149+
150+ // Use rsplit to get port from the last colon (handles IPv6 like [::1]:4317)
151+ if let Some ( port_str) = without_scheme. rsplit ( ':' ) . next ( ) {
152+ // Ensure we got a port, not part of IPv6 address
153+ if let Ok ( port) = port_str. parse :: < u16 > ( ) {
154+ return port;
155+ }
145156 }
146157
147- error ! ( "Invalid OTLP gRPC endpoint format, using default port {default_port}" ) ;
158+ error ! (
159+ "Invalid OTLP gRPC endpoint format '{}', using default port {}" ,
160+ endpoint, default_port
161+ ) ;
148162 }
149163
150164 default_port
@@ -157,7 +171,26 @@ impl GrpcAgent {
157171 . config
158172 . otlp_config_receiver_protocols_grpc_max_recv_msg_size_mib
159173 . map_or ( DEFAULT_MAX_RECV_MSG_SIZE , |mib| {
160- mib. unsigned_abs ( ) as usize * 1024 * 1024
174+ if mib <= 0 {
175+ error ! (
176+ "Invalid gRPC max message size {}MiB, using default {}MiB" ,
177+ mib,
178+ DEFAULT_MAX_RECV_MSG_SIZE / ( 1024 * 1024 )
179+ ) ;
180+ return DEFAULT_MAX_RECV_MSG_SIZE ;
181+ }
182+ // Safe: we validated mib > 0 above
183+ #[ allow( clippy:: cast_sign_loss) ]
184+ let size = ( mib as usize ) * 1024 * 1024 ;
185+ if size > MAX_RECV_MSG_SIZE_CAP {
186+ error ! (
187+ "gRPC max message size {}MiB exceeds cap, limiting to {}MiB" ,
188+ mib,
189+ MAX_RECV_MSG_SIZE_CAP / ( 1024 * 1024 )
190+ ) ;
191+ return MAX_RECV_MSG_SIZE_CAP ;
192+ }
193+ size
161194 } ) ;
162195
163196 let service = OtlpGrpcService {
@@ -212,6 +245,24 @@ mod tests {
212245 ) ;
213246 }
214247
248+ #[ test]
249+ fn test_parse_port_with_http_scheme ( ) {
250+ let endpoint = Some ( "http://localhost:4317" . to_string ( ) ) ;
251+ assert_eq ! (
252+ GrpcAgent :: parse_port( endpoint. as_ref( ) , OTLP_AGENT_GRPC_PORT ) ,
253+ 4317
254+ ) ;
255+ }
256+
257+ #[ test]
258+ fn test_parse_port_with_https_scheme ( ) {
259+ let endpoint = Some ( "https://localhost:4317" . to_string ( ) ) ;
260+ assert_eq ! (
261+ GrpcAgent :: parse_port( endpoint. as_ref( ) , OTLP_AGENT_GRPC_PORT ) ,
262+ 4317
263+ ) ;
264+ }
265+
215266 #[ test]
216267 fn test_parse_port_with_invalid_port_format ( ) {
217268 let endpoint = Some ( "localhost:invalid" . to_string ( ) ) ;
0 commit comments