@@ -19,7 +19,7 @@ use tracing::{debug, error};
1919use crate :: {
2020 config:: Config ,
2121 http:: { extract_request_body, handler_not_found} ,
22- otlp:: processor:: Processor as OtlpProcessor ,
22+ otlp:: processor:: { OtlpEncoding , Processor as OtlpProcessor } ,
2323 tags:: provider,
2424 traces:: {
2525 stats_generator:: StatsGenerator , trace_aggregator:: SendDataBuilderInfo ,
@@ -141,24 +141,33 @@ impl Agent {
141141 > ,
142142 request : Request ,
143143 ) -> Response {
144+ let content_type = request
145+ . headers ( )
146+ . get ( header:: CONTENT_TYPE )
147+ . and_then ( |v| v. to_str ( ) . ok ( ) )
148+ . map ( String :: from) ;
149+ let encoding = OtlpEncoding :: from_content_type ( content_type. as_deref ( ) ) ;
150+
144151 let ( parts, body) = match extract_request_body ( request) . await {
145152 Ok ( r) => r,
146153 Err ( e) => {
147154 error ! ( "OTLP | Failed to extract request body: {e}" ) ;
148155 return Self :: otlp_error_response (
149156 StatusCode :: BAD_REQUEST ,
150157 format ! ( "Failed to extract request body: {e}" ) ,
158+ encoding,
151159 ) ;
152160 }
153161 } ;
154162
155- let traces = match processor. process ( & body) {
163+ let traces = match processor. process ( & body, encoding ) {
156164 Ok ( traces) => traces,
157165 Err ( e) => {
158166 error ! ( "OTLP | Failed to process request: {:?}" , e) ;
159167 return Self :: otlp_error_response (
160168 StatusCode :: INTERNAL_SERVER_ERROR ,
161169 format ! ( "Failed to process request: {e}" ) ,
170+ encoding,
162171 ) ;
163172 }
164173 } ;
@@ -170,6 +179,7 @@ impl Agent {
170179 return Self :: otlp_error_response (
171180 StatusCode :: INTERNAL_SERVER_ERROR ,
172181 "Not sending traces, processor returned empty data" . to_string ( ) ,
182+ encoding,
173183 ) ;
174184 }
175185
@@ -192,6 +202,7 @@ impl Agent {
192202 return Self :: otlp_error_response (
193203 StatusCode :: INTERNAL_SERVER_ERROR ,
194204 format ! ( "Error sending traces to the trace aggregator: {err}" ) ,
205+ encoding,
195206 ) ;
196207 }
197208 }
@@ -206,49 +217,88 @@ impl Agent {
206217 error ! ( "OTLP | Error sending traces to the stats concentrator: {err}" ) ;
207218 }
208219
209- Self :: otlp_success_response ( )
220+ Self :: otlp_success_response ( encoding )
210221 }
211222
212- fn otlp_error_response ( status_code : StatusCode , message : String ) -> Response {
213- let status = Status {
214- code : i32:: from ( status_code. as_u16 ( ) ) ,
215- message : message. clone ( ) ,
216- details : vec ! [ ] ,
223+ fn otlp_error_response (
224+ status_code : StatusCode ,
225+ message : String ,
226+ encoding : OtlpEncoding ,
227+ ) -> Response {
228+ let body = match encoding {
229+ OtlpEncoding :: Json => {
230+ let json_error = serde_json:: json!( {
231+ "code" : status_code. as_u16( ) ,
232+ "message" : message
233+ } ) ;
234+ match serde_json:: to_vec ( & json_error) {
235+ Ok ( buf) => buf,
236+ Err ( e) => {
237+ error ! ( "OTLP | Failed to encode error response as JSON: {e}" ) ;
238+ return ( status_code, [ ( header:: CONTENT_TYPE , "text/plain" ) ] , message)
239+ . into_response ( ) ;
240+ }
241+ }
242+ }
243+ OtlpEncoding :: Protobuf => {
244+ let status = Status {
245+ code : i32:: from ( status_code. as_u16 ( ) ) ,
246+ message : message. clone ( ) ,
247+ details : vec ! [ ] ,
248+ } ;
249+ let mut buf = Vec :: new ( ) ;
250+ if let Err ( e) = status. encode ( & mut buf) {
251+ error ! ( "OTLP | Failed to encode error response as Protobuf: {e}" ) ;
252+ return ( status_code, [ ( header:: CONTENT_TYPE , "text/plain" ) ] , message)
253+ . into_response ( ) ;
254+ }
255+ buf
256+ }
217257 } ;
218258
219- let mut buf = Vec :: new ( ) ;
220- if let Err ( e) = status. encode ( & mut buf) {
221- error ! ( "OTLP | Failed to encode error response: {e}" ) ;
222- return ( status_code, [ ( header:: CONTENT_TYPE , "text/plain" ) ] , message) . into_response ( ) ;
223- }
224-
225259 (
226260 status_code,
227- [ ( header:: CONTENT_TYPE , "application/x-protobuf" ) ] ,
228- buf ,
261+ [ ( header:: CONTENT_TYPE , encoding . content_type ( ) ) ] ,
262+ body ,
229263 )
230264 . into_response ( )
231265 }
232266
233- fn otlp_success_response ( ) -> Response {
267+ fn otlp_success_response ( encoding : OtlpEncoding ) -> Response {
234268 let response = ExportTraceServiceResponse {
235269 partial_success : None ,
236270 } ;
237271
238- let mut buf = Vec :: new ( ) ;
239- if let Err ( e) = response. encode ( & mut buf) {
240- error ! ( "OTLP | Failed to encode success response: {e}" ) ;
241- return (
242- StatusCode :: INTERNAL_SERVER_ERROR ,
243- "Failed to encode response" . to_string ( ) ,
244- )
245- . into_response ( ) ;
246- }
272+ let body = match encoding {
273+ OtlpEncoding :: Json => match serde_json:: to_vec ( & response) {
274+ Ok ( buf) => buf,
275+ Err ( e) => {
276+ error ! ( "OTLP | Failed to encode success response as JSON: {e}" ) ;
277+ return (
278+ StatusCode :: INTERNAL_SERVER_ERROR ,
279+ "Failed to encode response" . to_string ( ) ,
280+ )
281+ . into_response ( ) ;
282+ }
283+ } ,
284+ OtlpEncoding :: Protobuf => {
285+ let mut buf = Vec :: new ( ) ;
286+ if let Err ( e) = response. encode ( & mut buf) {
287+ error ! ( "OTLP | Failed to encode success response as Protobuf: {e}" ) ;
288+ return (
289+ StatusCode :: INTERNAL_SERVER_ERROR ,
290+ "Failed to encode response" . to_string ( ) ,
291+ )
292+ . into_response ( ) ;
293+ }
294+ buf
295+ }
296+ } ;
247297
248298 (
249299 StatusCode :: OK ,
250- [ ( header:: CONTENT_TYPE , "application/x-protobuf" ) ] ,
251- buf ,
300+ [ ( header:: CONTENT_TYPE , encoding . content_type ( ) ) ] ,
301+ body ,
252302 )
253303 . into_response ( )
254304 }
0 commit comments