Skip to content

Commit 87f7e17

Browse files
committed
refactor
1 parent 7d6b284 commit 87f7e17

2 files changed

Lines changed: 24 additions & 29 deletions

File tree

bottlecap/src/http.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use core::time::Duration;
99
use datadog_fips::reqwest_adapter::create_reqwest_client_builder;
1010
use std::sync::Arc;
1111
use std::{collections::HashMap, error::Error, fs::File, io::BufReader};
12-
use tracing::{debug, error};
12+
use tracing::{debug, error, warn};
1313

1414
#[must_use]
1515
pub fn get_client(config: &Arc<config::Config>) -> reqwest::Client {
@@ -117,6 +117,25 @@ pub async fn extract_request_body(
117117
Ok((parts, bytes))
118118
}
119119

120+
/// Like [`extract_request_body`], but never fails: if buffering the body
121+
/// errors (e.g. an oversized payload exceeding `DefaultBodyLimit`), the body
122+
/// is replaced with empty bytes so that processing can continue with headers
123+
/// only.
124+
pub async fn extract_request_body_or_empty(
125+
request: Request,
126+
) -> (http::request::Parts, Bytes) {
127+
let (parts, body) = request.into_parts();
128+
let bytes =
129+
match Bytes::from_request(Request::from_parts(parts.clone(), body), &()).await {
130+
Ok(b) => b,
131+
Err(e) => {
132+
warn!("Failed to buffer request body: {e}. Processing with empty payload.");
133+
Bytes::new()
134+
}
135+
};
136+
(parts, bytes)
137+
}
138+
120139
#[must_use]
121140
pub fn headers_to_map(headers: &HeaderMap) -> HashMap<String, String> {
122141
headers

bottlecap/src/lifecycle/listener.rs

Lines changed: 4 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
use axum::{
55
Router,
6-
extract::{DefaultBodyLimit, FromRequest, Request, State},
6+
extract::{DefaultBodyLimit, Request, State},
77
http::{HeaderMap, StatusCode},
88
response::{IntoResponse, Response},
99
routing::{get, post},
@@ -19,7 +19,7 @@ use tokio_util::sync::CancellationToken;
1919
use tracing::{debug, error, warn};
2020

2121
use crate::{
22-
http::{extract_request_body, headers_to_map},
22+
http::{extract_request_body_or_empty, headers_to_map},
2323
lifecycle::invocation::processor_service::InvocationProcessorHandle,
2424
traces::{
2525
context::SpanContext,
@@ -125,17 +125,7 @@ impl Listener {
125125
State((invocation_processor_handle, propagator, tasks)): State<ListenerState>,
126126
request: Request,
127127
) -> Response {
128-
let (parts, body) = match extract_request_body(request).await {
129-
Ok(r) => r,
130-
Err(e) => {
131-
error!("Failed to extract request body: {e}");
132-
return (
133-
StatusCode::BAD_REQUEST,
134-
"Could not read start invocation request body",
135-
)
136-
.into_response();
137-
}
138-
};
128+
let (parts, body) = extract_request_body_or_empty(request).await;
139129

140130
let headers = headers_to_map(&parts.headers);
141131
let payload_value = serde_json::from_slice::<Value>(&body).unwrap_or_else(|_| json!({}));
@@ -173,21 +163,7 @@ impl Listener {
173163
// processed before the body is read, causing orphaned traces. (SLES-2666)
174164
// On oversized payloads (>6MB) we gracefully degrade to an empty body
175165
// so that processing still runs. (SLES-2722)
176-
let (parts, body) = request.into_parts();
177-
let body = match Bytes::from_request(
178-
axum::extract::Request::from_parts(parts.clone(), body),
179-
&(),
180-
)
181-
.await
182-
{
183-
Ok(b) => b,
184-
Err(e) => {
185-
warn!(
186-
"Failed to buffer end-invocation request body: {e}. Processing with empty payload."
187-
);
188-
Bytes::new()
189-
}
190-
};
166+
let (parts, body) = extract_request_body_or_empty(request).await;
191167

192168
let mut join_set = tasks.lock().await;
193169
join_set.spawn(async move {

0 commit comments

Comments
 (0)