Skip to content

Commit 0bb7235

Browse files
jchrostek-ddclaude
andcommitted
[SVLS-8626] Add JSON encoding support for OTLP trace payloads
The OTLP receiver was unconditionally decoding trace payloads as Protobuf, causing "invalid wire type" errors for clients that use JSON encoding (e.g., Node.js dd-trace with default settings). Changes: - Add OtlpEncoding enum to detect encoding from Content-Type header - Support both application/json and application/x-protobuf - Return correct Content-Type in responses per OTLP spec - Add integration test with Lambda that sends both encodings 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent db59a28 commit 0bb7235

File tree

6 files changed

+173
-169
lines changed

6 files changed

+173
-169
lines changed

bottlecap/src/otlp/agent.rs

Lines changed: 79 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use tracing::{debug, error};
1919
use crate::{
2020
config::Config,
2121
http::{extract_request_body, handler_not_found},
22-
otlp::processor::Processor as OtlpProcessor,
22+
otlp::processor::{OtlpEncoding, Processor as OtlpProcessor},
2323
tags::provider,
2424
traces::{
2525
stats_generator::StatsGenerator, trace_aggregator::SendDataBuilderInfo,
@@ -141,24 +141,33 @@ impl Agent {
141141
>,
142142
request: Request,
143143
) -> Response {
144+
let content_type = request
145+
.headers()
146+
.get(header::CONTENT_TYPE)
147+
.and_then(|v| v.to_str().ok())
148+
.map(String::from);
149+
let encoding = OtlpEncoding::from_content_type(content_type.as_deref());
150+
144151
let (parts, body) = match extract_request_body(request).await {
145152
Ok(r) => r,
146153
Err(e) => {
147154
error!("OTLP | Failed to extract request body: {e}");
148155
return Self::otlp_error_response(
149156
StatusCode::BAD_REQUEST,
150157
format!("Failed to extract request body: {e}"),
158+
encoding,
151159
);
152160
}
153161
};
154162

155-
let traces = match processor.process(&body) {
163+
let traces = match processor.process(&body, encoding) {
156164
Ok(traces) => traces,
157165
Err(e) => {
158166
error!("OTLP | Failed to process request: {:?}", e);
159167
return Self::otlp_error_response(
160-
StatusCode::INTERNAL_SERVER_ERROR,
168+
StatusCode::BAD_REQUEST,
161169
format!("Failed to process request: {e}"),
170+
encoding,
162171
);
163172
}
164173
};
@@ -170,6 +179,7 @@ impl Agent {
170179
return Self::otlp_error_response(
171180
StatusCode::INTERNAL_SERVER_ERROR,
172181
"Not sending traces, processor returned empty data".to_string(),
182+
encoding,
173183
);
174184
}
175185

@@ -192,6 +202,7 @@ impl Agent {
192202
return Self::otlp_error_response(
193203
StatusCode::INTERNAL_SERVER_ERROR,
194204
format!("Error sending traces to the trace aggregator: {err}"),
205+
encoding,
195206
);
196207
}
197208
}
@@ -206,49 +217,88 @@ impl Agent {
206217
error!("OTLP | Error sending traces to the stats concentrator: {err}");
207218
}
208219

209-
Self::otlp_success_response()
220+
Self::otlp_success_response(encoding)
210221
}
211222

212-
fn otlp_error_response(status_code: StatusCode, message: String) -> Response {
213-
let status = Status {
214-
code: i32::from(status_code.as_u16()),
215-
message: message.clone(),
216-
details: vec![],
223+
fn otlp_error_response(
224+
status_code: StatusCode,
225+
message: String,
226+
encoding: OtlpEncoding,
227+
) -> Response {
228+
let body = match encoding {
229+
OtlpEncoding::Json => {
230+
let json_error = serde_json::json!({
231+
"code": status_code.as_u16(),
232+
"message": message
233+
});
234+
match serde_json::to_vec(&json_error) {
235+
Ok(buf) => buf,
236+
Err(e) => {
237+
error!("OTLP | Failed to encode error response as JSON: {e}");
238+
return (status_code, [(header::CONTENT_TYPE, "text/plain")], message)
239+
.into_response();
240+
}
241+
}
242+
}
243+
OtlpEncoding::Protobuf => {
244+
let status = Status {
245+
code: i32::from(status_code.as_u16()),
246+
message: message.clone(),
247+
details: vec![],
248+
};
249+
let mut buf = Vec::new();
250+
if let Err(e) = status.encode(&mut buf) {
251+
error!("OTLP | Failed to encode error response: {e}");
252+
return (status_code, [(header::CONTENT_TYPE, "text/plain")], message)
253+
.into_response();
254+
}
255+
buf
256+
}
217257
};
218258

219-
let mut buf = Vec::new();
220-
if let Err(e) = status.encode(&mut buf) {
221-
error!("OTLP | Failed to encode error response: {e}");
222-
return (status_code, [(header::CONTENT_TYPE, "text/plain")], message).into_response();
223-
}
224-
225259
(
226260
status_code,
227-
[(header::CONTENT_TYPE, "application/x-protobuf")],
228-
buf,
261+
[(header::CONTENT_TYPE, encoding.content_type())],
262+
body,
229263
)
230264
.into_response()
231265
}
232266

233-
fn otlp_success_response() -> Response {
267+
fn otlp_success_response(encoding: OtlpEncoding) -> Response {
234268
let response = ExportTraceServiceResponse {
235269
partial_success: None,
236270
};
237271

238-
let mut buf = Vec::new();
239-
if let Err(e) = response.encode(&mut buf) {
240-
error!("OTLP | Failed to encode success response: {e}");
241-
return (
242-
StatusCode::INTERNAL_SERVER_ERROR,
243-
"Failed to encode response".to_string(),
244-
)
245-
.into_response();
246-
}
272+
let body = match encoding {
273+
OtlpEncoding::Json => match serde_json::to_vec(&response) {
274+
Ok(buf) => buf,
275+
Err(e) => {
276+
error!("OTLP | Failed to encode success response as JSON: {e}");
277+
return (
278+
StatusCode::INTERNAL_SERVER_ERROR,
279+
"Failed to encode response".to_string(),
280+
)
281+
.into_response();
282+
}
283+
},
284+
OtlpEncoding::Protobuf => {
285+
let mut buf = Vec::new();
286+
if let Err(e) = response.encode(&mut buf) {
287+
error!("OTLP | Failed to encode success response: {e}");
288+
return (
289+
StatusCode::INTERNAL_SERVER_ERROR,
290+
"Failed to encode response".to_string(),
291+
)
292+
.into_response();
293+
}
294+
buf
295+
}
296+
};
247297

248298
(
249299
StatusCode::OK,
250-
[(header::CONTENT_TYPE, "application/x-protobuf")],
251-
buf,
300+
[(header::CONTENT_TYPE, encoding.content_type())],
301+
body,
252302
)
253303
.into_response()
254304
}

bottlecap/src/otlp/processor.rs

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,30 @@ use std::{error::Error, sync::Arc};
55

66
use crate::{config::Config, otlp::transform::otel_resource_spans_to_dd_spans};
77

8+
#[derive(Debug, Clone, Copy, PartialEq)]
9+
pub enum OtlpEncoding {
10+
Protobuf,
11+
Json,
12+
}
13+
14+
impl OtlpEncoding {
15+
#[must_use]
16+
pub fn from_content_type(content_type: Option<&str>) -> Self {
17+
match content_type {
18+
Some(ct) if ct.starts_with("application/json") => OtlpEncoding::Json,
19+
_ => OtlpEncoding::Protobuf,
20+
}
21+
}
22+
23+
#[must_use]
24+
pub fn content_type(&self) -> &'static str {
25+
match self {
26+
OtlpEncoding::Json => "application/json",
27+
OtlpEncoding::Protobuf => "application/x-protobuf",
28+
}
29+
}
30+
}
31+
832
#[derive(Clone)]
933
pub struct Processor {
1034
config: Arc<Config>,
@@ -16,9 +40,21 @@ impl Processor {
1640
Self { config }
1741
}
1842

19-
pub fn process(&self, body: &[u8]) -> Result<Vec<Vec<DatadogSpan>>, Box<dyn Error>> {
20-
// Decode the OTLP HTTP request
21-
let request = ExportTraceServiceRequest::decode(body)?;
43+
pub fn process(
44+
&self,
45+
body: &[u8],
46+
encoding: OtlpEncoding,
47+
) -> Result<Vec<Vec<DatadogSpan>>, Box<dyn Error>> {
48+
let request = match encoding {
49+
OtlpEncoding::Json => {
50+
tracing::debug!("Decoding OTLP traces as JSON");
51+
serde_json::from_slice::<ExportTraceServiceRequest>(body)?
52+
}
53+
OtlpEncoding::Protobuf => {
54+
tracing::debug!("Decoding OTLP traces as Protobuf");
55+
ExportTraceServiceRequest::decode(body)?
56+
}
57+
};
2258

2359
let mut spans: Vec<Vec<DatadogSpan>> = Vec::new();
2460
for resource_spans in &request.resource_spans {

integration-tests/lambda/otlp-node/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
"main": "index.js",
66
"dependencies": {
77
"@opentelemetry/api": "^1.9.0",
8+
"@opentelemetry/exporter-trace-otlp-http": "^0.54.2",
89
"@opentelemetry/exporter-trace-otlp-proto": "^0.54.2",
910
"@opentelemetry/otlp-transformer": "^0.54.2",
1011
"@opentelemetry/resources": "^1.28.0",
Lines changed: 36 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -1,122 +1,48 @@
1+
const { NodeTracerProvider } = require('@opentelemetry/sdk-trace-node');
2+
const { SimpleSpanProcessor } = require('@opentelemetry/sdk-trace-base');
3+
const { OTLPTraceExporter: OTLPTraceExporterHttp } = require('@opentelemetry/exporter-trace-otlp-http');
4+
const { OTLPTraceExporter: OTLPTraceExporterProto } = require('@opentelemetry/exporter-trace-otlp-proto');
15
const { Resource } = require('@opentelemetry/resources');
2-
const { BasicTracerProvider, InMemorySpanExporter } = require('@opentelemetry/sdk-trace-base');
3-
const { createExportTraceServiceRequest } = require('@opentelemetry/otlp-transformer');
4-
const { trace, SpanKind, SpanStatusCode } = require('@opentelemetry/api');
5-
6-
const root = require('@opentelemetry/otlp-transformer/build/esm/generated/root');
7-
const ExportTraceServiceRequest = root.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
8-
const ExportTraceServiceResponse = root.opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse;
6+
const { ATTR_SERVICE_NAME } = require('@opentelemetry/semantic-conventions');
97

108
exports.handler = async (event, context) => {
11-
console.log('Starting OTLP response validation...');
12-
const result = await validateOtlpResponse();
13-
return {
14-
statusCode: result.success ? 200 : 500,
15-
body: JSON.stringify(result, null, 2),
16-
};
17-
};
18-
19-
async function validateOtlpResponse() {
20-
try {
21-
const requestBuffer = createOtlpTraceRequest();
22-
const response = await fetch('http://localhost:4318/v1/traces', {
23-
method: 'POST',
24-
headers: {
25-
'Content-Type': 'application/x-protobuf',
26-
},
27-
body: requestBuffer,
28-
});
29-
30-
if (response.status !== 200) {
31-
return {
32-
success: false,
33-
error: `Expected status 200, got ${response.status}`,
34-
statusCode: response.status,
35-
};
36-
}
37-
38-
const responseBuffer = Buffer.from(await response.arrayBuffer());
39-
40-
const contentType = response.headers.get('content-type');
41-
if (contentType !== 'application/x-protobuf') {
42-
return {
43-
success: false,
44-
error: `Expected Content-Type 'application/x-protobuf', got '${contentType}'`,
45-
statusCode: response.status,
46-
contentType,
47-
};
48-
}
49-
50-
let decodedResponse;
51-
try {
52-
decodedResponse = ExportTraceServiceResponse.decode(responseBuffer);
53-
} catch (decodeError) {
54-
return {
55-
success: false,
56-
error: `Failed to decode response as ExportTraceServiceResponse: ${decodeError.message}`,
57-
statusCode: response.status,
58-
contentType,
59-
decodeError: decodeError.message,
60-
};
61-
}
9+
const serviceName = context.functionName;
10+
const requestId = context.awsRequestId;
11+
const endpoint = process.env.OTEL_EXPORTER_OTLP_ENDPOINT || 'http://localhost:4318';
6212

63-
// Verify that partial_success is not set (should be None for success)
64-
if (decodedResponse.partialSuccess) {
65-
return {
66-
success: false,
67-
error: 'Unexpected partial_success in response - should only be success or failure',
68-
statusCode: response.status,
69-
contentType,
70-
partialSuccess: decodedResponse.partialSuccess,
71-
};
72-
}
13+
const jsonProvider = new NodeTracerProvider({
14+
resource: new Resource({ [ATTR_SERVICE_NAME]: serviceName }),
15+
});
16+
const jsonExporter = new OTLPTraceExporterHttp({
17+
url: endpoint + '/v1/traces',
18+
headers: { 'Content-Type': 'application/json' },
19+
});
20+
jsonProvider.addSpanProcessor(new SimpleSpanProcessor(jsonExporter));
21+
jsonProvider.register();
7322

74-
return {
75-
success: true,
76-
statusCode: response.status,
77-
contentType,
78-
response: decodedResponse,
79-
message: 'OTLP response is properly formatted and decodable',
80-
};
23+
const jsonSpan = jsonProvider.getTracer('json-test').startSpan('test-span-json');
24+
jsonSpan.setAttribute('request_id', requestId);
25+
jsonSpan.setAttribute('encoding', 'json');
26+
jsonSpan.end();
8127

82-
} catch (error) {
83-
console.error('Validation error:', error);
84-
return {
85-
success: false,
86-
error: `Unexpected error: ${error.message}`,
87-
stack: error.stack,
88-
};
89-
}
90-
}
28+
await jsonProvider.forceFlush();
29+
await jsonProvider.shutdown();
9130

92-
function createOtlpTraceRequest() {
93-
const resource = new Resource({
94-
'service.name': process.env.DD_SERVICE,
31+
const protoProvider = new NodeTracerProvider({
32+
resource: new Resource({ [ATTR_SERVICE_NAME]: serviceName }),
9533
});
96-
97-
const provider = new BasicTracerProvider({ resource });
98-
const exporter = new InMemorySpanExporter();
99-
provider.register();
100-
101-
const tracer = provider.getTracer('validation-tracer', '1.0.0');
102-
const span = tracer.startSpan('test-span', {
103-
kind: SpanKind.INTERNAL,
104-
attributes: {
105-
'test': 'validation',
106-
},
34+
const protoExporter = new OTLPTraceExporterProto({
35+
url: endpoint + '/v1/traces',
10736
});
108-
span.setStatus({ code: SpanStatusCode.OK });
109-
span.end();
37+
protoProvider.addSpanProcessor(new SimpleSpanProcessor(protoExporter));
11038

111-
provider.forceFlush();
39+
const protoSpan = protoProvider.getTracer('proto-test').startSpan('test-span-protobuf');
40+
protoSpan.setAttribute('request_id', requestId);
41+
protoSpan.setAttribute('encoding', 'protobuf');
42+
protoSpan.end();
11243

113-
const finishedSpans = exporter.getFinishedSpans();
114-
const otlpRequest = createExportTraceServiceRequest(finishedSpans, {
115-
useHex: true,
116-
useLongBits: false,
117-
});
44+
await protoProvider.forceFlush();
45+
await protoProvider.shutdown();
11846

119-
const message = ExportTraceServiceRequest.create(otlpRequest);
120-
const buffer = ExportTraceServiceRequest.encode(message).finish();
121-
return Buffer.from(buffer);
122-
}
47+
return { statusCode: 200, body: JSON.stringify({ message: 'Success' }) };
48+
};

0 commit comments

Comments
 (0)