Skip to content

Commit e731d52

Browse files
authored
Merge branch 'main' into joey/handle_msk
2 parents 4683724 + 06947f1 commit e731d52

7 files changed

Lines changed: 169 additions & 166 deletions

File tree

bottlecap/src/otlp/agent.rs

Lines changed: 78 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use tracing::{debug, error};
1919
use crate::{
2020
config::Config,
2121
http::{extract_request_body, handler_not_found},
22-
otlp::processor::Processor as OtlpProcessor,
22+
otlp::processor::{OtlpEncoding, Processor as OtlpProcessor},
2323
tags::provider,
2424
traces::{
2525
stats_generator::StatsGenerator, trace_aggregator::SendDataBuilderInfo,
@@ -141,24 +141,33 @@ impl Agent {
141141
>,
142142
request: Request,
143143
) -> Response {
144+
let content_type = request
145+
.headers()
146+
.get(header::CONTENT_TYPE)
147+
.and_then(|v| v.to_str().ok())
148+
.map(String::from);
149+
let encoding = OtlpEncoding::from_content_type(content_type.as_deref());
150+
144151
let (parts, body) = match extract_request_body(request).await {
145152
Ok(r) => r,
146153
Err(e) => {
147154
error!("OTLP | Failed to extract request body: {e}");
148155
return Self::otlp_error_response(
149156
StatusCode::BAD_REQUEST,
150157
format!("Failed to extract request body: {e}"),
158+
encoding,
151159
);
152160
}
153161
};
154162

155-
let traces = match processor.process(&body) {
163+
let traces = match processor.process(&body, encoding) {
156164
Ok(traces) => traces,
157165
Err(e) => {
158166
error!("OTLP | Failed to process request: {:?}", e);
159167
return Self::otlp_error_response(
160168
StatusCode::INTERNAL_SERVER_ERROR,
161169
format!("Failed to process request: {e}"),
170+
encoding,
162171
);
163172
}
164173
};
@@ -170,6 +179,7 @@ impl Agent {
170179
return Self::otlp_error_response(
171180
StatusCode::INTERNAL_SERVER_ERROR,
172181
"Not sending traces, processor returned empty data".to_string(),
182+
encoding,
173183
);
174184
}
175185

@@ -193,6 +203,7 @@ impl Agent {
193203
return Self::otlp_error_response(
194204
StatusCode::INTERNAL_SERVER_ERROR,
195205
format!("Error sending traces to the trace aggregator: {err}"),
206+
encoding,
196207
);
197208
}
198209
}
@@ -208,49 +219,88 @@ impl Agent {
208219
error!("OTLP | Error sending traces to the stats concentrator: {err}");
209220
}
210221

211-
Self::otlp_success_response()
222+
Self::otlp_success_response(encoding)
212223
}
213224

214-
fn otlp_error_response(status_code: StatusCode, message: String) -> Response {
215-
let status = Status {
216-
code: i32::from(status_code.as_u16()),
217-
message: message.clone(),
218-
details: vec![],
225+
fn otlp_error_response(
226+
status_code: StatusCode,
227+
message: String,
228+
encoding: OtlpEncoding,
229+
) -> Response {
230+
let body = match encoding {
231+
OtlpEncoding::Json => {
232+
let json_error = serde_json::json!({
233+
"code": status_code.as_u16(),
234+
"message": message
235+
});
236+
match serde_json::to_vec(&json_error) {
237+
Ok(buf) => buf,
238+
Err(e) => {
239+
error!("OTLP | Failed to encode error response as JSON: {e}");
240+
return (status_code, [(header::CONTENT_TYPE, "text/plain")], message)
241+
.into_response();
242+
}
243+
}
244+
}
245+
OtlpEncoding::Protobuf => {
246+
let status = Status {
247+
code: i32::from(status_code.as_u16()),
248+
message: message.clone(),
249+
details: vec![],
250+
};
251+
let mut buf = Vec::new();
252+
if let Err(e) = status.encode(&mut buf) {
253+
error!("OTLP | Failed to encode error response as Protobuf: {e}");
254+
return (status_code, [(header::CONTENT_TYPE, "text/plain")], message)
255+
.into_response();
256+
}
257+
buf
258+
}
219259
};
220260

221-
let mut buf = Vec::new();
222-
if let Err(e) = status.encode(&mut buf) {
223-
error!("OTLP | Failed to encode error response: {e}");
224-
return (status_code, [(header::CONTENT_TYPE, "text/plain")], message).into_response();
225-
}
226-
227261
(
228262
status_code,
229-
[(header::CONTENT_TYPE, "application/x-protobuf")],
230-
buf,
263+
[(header::CONTENT_TYPE, encoding.content_type())],
264+
body,
231265
)
232266
.into_response()
233267
}
234268

235-
fn otlp_success_response() -> Response {
269+
fn otlp_success_response(encoding: OtlpEncoding) -> Response {
236270
let response = ExportTraceServiceResponse {
237271
partial_success: None,
238272
};
239273

240-
let mut buf = Vec::new();
241-
if let Err(e) = response.encode(&mut buf) {
242-
error!("OTLP | Failed to encode success response: {e}");
243-
return (
244-
StatusCode::INTERNAL_SERVER_ERROR,
245-
"Failed to encode response".to_string(),
246-
)
247-
.into_response();
248-
}
274+
let body = match encoding {
275+
OtlpEncoding::Json => match serde_json::to_vec(&response) {
276+
Ok(buf) => buf,
277+
Err(e) => {
278+
error!("OTLP | Failed to encode success response as JSON: {e}");
279+
return (
280+
StatusCode::INTERNAL_SERVER_ERROR,
281+
"Failed to encode response".to_string(),
282+
)
283+
.into_response();
284+
}
285+
},
286+
OtlpEncoding::Protobuf => {
287+
let mut buf = Vec::new();
288+
if let Err(e) = response.encode(&mut buf) {
289+
error!("OTLP | Failed to encode success response as Protobuf: {e}");
290+
return (
291+
StatusCode::INTERNAL_SERVER_ERROR,
292+
"Failed to encode response".to_string(),
293+
)
294+
.into_response();
295+
}
296+
buf
297+
}
298+
};
249299

250300
(
251301
StatusCode::OK,
252-
[(header::CONTENT_TYPE, "application/x-protobuf")],
253-
buf,
302+
[(header::CONTENT_TYPE, encoding.content_type())],
303+
body,
254304
)
255305
.into_response()
256306
}

bottlecap/src/otlp/processor.rs

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,30 @@ use std::{error::Error, sync::Arc};
55

66
use crate::{config::Config, otlp::transform::otel_resource_spans_to_dd_spans};
77

8+
#[derive(Debug, Clone, Copy, PartialEq)]
9+
pub enum OtlpEncoding {
10+
Protobuf,
11+
Json,
12+
}
13+
14+
impl OtlpEncoding {
15+
#[must_use]
16+
pub fn from_content_type(content_type: Option<&str>) -> Self {
17+
match content_type {
18+
Some(ct) if ct.starts_with("application/json") => OtlpEncoding::Json,
19+
_ => OtlpEncoding::Protobuf,
20+
}
21+
}
22+
23+
#[must_use]
24+
pub fn content_type(&self) -> &'static str {
25+
match self {
26+
OtlpEncoding::Json => "application/json",
27+
OtlpEncoding::Protobuf => "application/x-protobuf",
28+
}
29+
}
30+
}
31+
832
#[derive(Clone)]
933
pub struct Processor {
1034
config: Arc<Config>,
@@ -16,9 +40,15 @@ impl Processor {
1640
Self { config }
1741
}
1842

19-
pub fn process(&self, body: &[u8]) -> Result<Vec<Vec<DatadogSpan>>, Box<dyn Error>> {
20-
// Decode the OTLP HTTP request
21-
let request = ExportTraceServiceRequest::decode(body)?;
43+
pub fn process(
44+
&self,
45+
body: &[u8],
46+
encoding: OtlpEncoding,
47+
) -> Result<Vec<Vec<DatadogSpan>>, Box<dyn Error>> {
48+
let request = match encoding {
49+
OtlpEncoding::Json => serde_json::from_slice::<ExportTraceServiceRequest>(body)?,
50+
OtlpEncoding::Protobuf => ExportTraceServiceRequest::decode(body)?,
51+
};
2252

2353
let mut spans: Vec<Vec<DatadogSpan>> = Vec::new();
2454
for resource_spans in &request.resource_spans {

integration-tests/lambda/otlp-node/package-lock.json

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

integration-tests/lambda/otlp-node/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
"main": "index.js",
66
"dependencies": {
77
"@opentelemetry/api": "^1.9.0",
8+
"@opentelemetry/exporter-trace-otlp-http": "^0.54.2",
89
"@opentelemetry/exporter-trace-otlp-proto": "^0.54.2",
910
"@opentelemetry/otlp-transformer": "^0.54.2",
1011
"@opentelemetry/resources": "^1.28.0",
Lines changed: 36 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -1,122 +1,48 @@
1+
const { NodeTracerProvider } = require('@opentelemetry/sdk-trace-node');
2+
const { SimpleSpanProcessor } = require('@opentelemetry/sdk-trace-base');
3+
const { OTLPTraceExporter: OTLPTraceExporterHttp } = require('@opentelemetry/exporter-trace-otlp-http');
4+
const { OTLPTraceExporter: OTLPTraceExporterProto } = require('@opentelemetry/exporter-trace-otlp-proto');
15
const { Resource } = require('@opentelemetry/resources');
2-
const { BasicTracerProvider, InMemorySpanExporter } = require('@opentelemetry/sdk-trace-base');
3-
const { createExportTraceServiceRequest } = require('@opentelemetry/otlp-transformer');
4-
const { trace, SpanKind, SpanStatusCode } = require('@opentelemetry/api');
5-
6-
const root = require('@opentelemetry/otlp-transformer/build/esm/generated/root');
7-
const ExportTraceServiceRequest = root.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
8-
const ExportTraceServiceResponse = root.opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse;
6+
const { ATTR_SERVICE_NAME } = require('@opentelemetry/semantic-conventions');
97

108
exports.handler = async (event, context) => {
11-
console.log('Starting OTLP response validation...');
12-
const result = await validateOtlpResponse();
13-
return {
14-
statusCode: result.success ? 200 : 500,
15-
body: JSON.stringify(result, null, 2),
16-
};
17-
};
18-
19-
async function validateOtlpResponse() {
20-
try {
21-
const requestBuffer = createOtlpTraceRequest();
22-
const response = await fetch('http://localhost:4318/v1/traces', {
23-
method: 'POST',
24-
headers: {
25-
'Content-Type': 'application/x-protobuf',
26-
},
27-
body: requestBuffer,
28-
});
29-
30-
if (response.status !== 200) {
31-
return {
32-
success: false,
33-
error: `Expected status 200, got ${response.status}`,
34-
statusCode: response.status,
35-
};
36-
}
37-
38-
const responseBuffer = Buffer.from(await response.arrayBuffer());
39-
40-
const contentType = response.headers.get('content-type');
41-
if (contentType !== 'application/x-protobuf') {
42-
return {
43-
success: false,
44-
error: `Expected Content-Type 'application/x-protobuf', got '${contentType}'`,
45-
statusCode: response.status,
46-
contentType,
47-
};
48-
}
49-
50-
let decodedResponse;
51-
try {
52-
decodedResponse = ExportTraceServiceResponse.decode(responseBuffer);
53-
} catch (decodeError) {
54-
return {
55-
success: false,
56-
error: `Failed to decode response as ExportTraceServiceResponse: ${decodeError.message}`,
57-
statusCode: response.status,
58-
contentType,
59-
decodeError: decodeError.message,
60-
};
61-
}
9+
const serviceName = context.functionName;
10+
const requestId = context.awsRequestId;
11+
const endpoint = process.env.OTEL_EXPORTER_OTLP_ENDPOINT || 'http://localhost:4318';
6212

63-
// Verify that partial_success is not set (should be None for success)
64-
if (decodedResponse.partialSuccess) {
65-
return {
66-
success: false,
67-
error: 'Unexpected partial_success in response - should only be success or failure',
68-
statusCode: response.status,
69-
contentType,
70-
partialSuccess: decodedResponse.partialSuccess,
71-
};
72-
}
13+
const jsonProvider = new NodeTracerProvider({
14+
resource: new Resource({ [ATTR_SERVICE_NAME]: serviceName }),
15+
});
16+
const jsonExporter = new OTLPTraceExporterHttp({
17+
url: endpoint + '/v1/traces',
18+
headers: { 'Content-Type': 'application/json' },
19+
});
20+
jsonProvider.addSpanProcessor(new SimpleSpanProcessor(jsonExporter));
21+
jsonProvider.register();
7322

74-
return {
75-
success: true,
76-
statusCode: response.status,
77-
contentType,
78-
response: decodedResponse,
79-
message: 'OTLP response is properly formatted and decodable',
80-
};
23+
const jsonSpan = jsonProvider.getTracer('json-test').startSpan('test-span-json');
24+
jsonSpan.setAttribute('request_id', requestId);
25+
jsonSpan.setAttribute('encoding', 'json');
26+
jsonSpan.end();
8127

82-
} catch (error) {
83-
console.error('Validation error:', error);
84-
return {
85-
success: false,
86-
error: `Unexpected error: ${error.message}`,
87-
stack: error.stack,
88-
};
89-
}
90-
}
28+
await jsonProvider.forceFlush();
29+
await jsonProvider.shutdown();
9130

92-
function createOtlpTraceRequest() {
93-
const resource = new Resource({
94-
'service.name': process.env.DD_SERVICE,
31+
const protoProvider = new NodeTracerProvider({
32+
resource: new Resource({ [ATTR_SERVICE_NAME]: serviceName }),
9533
});
96-
97-
const provider = new BasicTracerProvider({ resource });
98-
const exporter = new InMemorySpanExporter();
99-
provider.register();
100-
101-
const tracer = provider.getTracer('validation-tracer', '1.0.0');
102-
const span = tracer.startSpan('test-span', {
103-
kind: SpanKind.INTERNAL,
104-
attributes: {
105-
'test': 'validation',
106-
},
34+
const protoExporter = new OTLPTraceExporterProto({
35+
url: endpoint + '/v1/traces',
10736
});
108-
span.setStatus({ code: SpanStatusCode.OK });
109-
span.end();
37+
protoProvider.addSpanProcessor(new SimpleSpanProcessor(protoExporter));
11038

111-
provider.forceFlush();
39+
const protoSpan = protoProvider.getTracer('proto-test').startSpan('test-span-protobuf');
40+
protoSpan.setAttribute('request_id', requestId);
41+
protoSpan.setAttribute('encoding', 'protobuf');
42+
protoSpan.end();
11243

113-
const finishedSpans = exporter.getFinishedSpans();
114-
const otlpRequest = createExportTraceServiceRequest(finishedSpans, {
115-
useHex: true,
116-
useLongBits: false,
117-
});
44+
await protoProvider.forceFlush();
45+
await protoProvider.shutdown();
11846

119-
const message = ExportTraceServiceRequest.create(otlpRequest);
120-
const buffer = ExportTraceServiceRequest.encode(message).finish();
121-
return Buffer.from(buffer);
122-
}
47+
return { statusCode: 200, body: JSON.stringify({ message: 'Success' }) };
48+
};

0 commit comments

Comments
 (0)