Skip to content

Commit 0781bf5

Browse files
committed
extract headers and body
1 parent 0855c5b commit 0781bf5

File tree

4 files changed

+75
-37
lines changed

4 files changed

+75
-37
lines changed

bottlecap/src/bin/bottlecap/main.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -502,8 +502,8 @@ async fn extension_loop_active(
502502
async fn flush_all(
503503
logs_flusher: &LogsFlusher,
504504
metrics_flusher: &mut MetricsFlusher,
505-
trace_flusher: &dyn TraceFlusher,
506-
stats_flusher: &dyn StatsFlusher,
505+
trace_flusher: &impl TraceFlusher,
506+
stats_flusher: &impl StatsFlusher,
507507
race_flush_interval: &mut tokio::time::Interval,
508508
) {
509509
tokio::join!(

bottlecap/src/lifecycle/invocation/processor.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -442,6 +442,9 @@ impl Processor {
442442

443443
let payload_value = serde_json::from_slice::<Value>(&payload).unwrap_or_else(|_| json!({}));
444444

445+
debug!("AG: at start invocation headers: {:?}", headers);
446+
debug!("AG: at start invocation payload: {:?}", payload_value);
447+
445448
// Tag the invocation span with the request payload
446449
if self.config.capture_lambda_payload {
447450
tag_span_from_value(
@@ -523,6 +526,9 @@ impl Processor {
523526
);
524527
}
525528

529+
debug!("AG: at end invocation headers: {:?}", headers);
530+
debug!("AG: at end invocation payload: {:?}", payload_value);
531+
526532
if let Some(status_code) = payload_value.get("statusCode").and_then(Value::as_i64) {
527533
let status_code_as_string = status_code.to_string();
528534
self.span.meta.insert(

bottlecap/src/lifecycle/invocation/span_inferrer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ impl SpanInferrer {
7272
self.generated_span_context = None;
7373
self.trigger_tags = None;
7474

75-
let mut trigger: Option<Box<dyn Trigger>> = None;
75+
let mut trigger: Option<impl Trigger> = None;
7676
let mut inferred_span = Span {
7777
span_id: generate_span_id(),
7878
..Default::default()

bottlecap/src/lwa/proxy.rs

Lines changed: 66 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,13 @@ use crate::{
22
lifecycle::invocation::processor::Processor,
33
lifecycle::listener::Listener,
44
};
5-
use hyper::http::request::Parts;
6-
use hyper::{
7-
body::{Bytes, HttpBody},
8-
client::HttpConnector,
9-
service::{make_service_fn, service_fn},
10-
Body, Client, Error, Request, Response, Server, Uri,
11-
};
5+
use hyper::{http::request::Parts, body::HttpBody, client::HttpConnector, service::{make_service_fn, service_fn}, Body, Client, Error, Request, Response, Server, Uri, HeaderMap};
126
use hyper_proxy::{Intercept, Proxy, ProxyConnector};
13-
use serde_json::Value;
7+
use serde_json::{json, Value};
148
use std::{net::SocketAddr, sync::Arc};
9+
use std::collections::HashMap;
10+
use hyper::header::HeaderValue;
11+
use hyper::http::HeaderName;
1512
use tokio::{sync::Mutex, task::JoinHandle};
1613
use tracing::{debug, error};
1714

@@ -110,7 +107,7 @@ fn parse_env_addresses() -> Option<(SocketAddr, Uri)> {
110107
None
111108
}
112109
},
113-
Err(e) => {
110+
Err(_) => {
114111
None
115112
}
116113
};
@@ -162,33 +159,56 @@ async fn intercept_payload(
162159
// request received from lambda handler directed to AWS runtime API
163160
// it can be either invocation/next, or a lambda handler response to it
164161
let (intercepted_parts, intercepted_body) = intercepted.into_parts();
165-
debug!("LWA: Intercepted request: {:?}", intercepted_parts);
166162

167163
let waited_intercepted_body = intercepted_body.collect().await?.to_bytes();
164+
165+
debug!("LWA: Intercepted request: {:?}", intercepted_parts);
166+
debug!("LWA: Intercepted request body: {:?}", waited_intercepted_body);
167+
168168
let forward_intercepted =
169169
forward_request(aws_runtime_addr, &intercepted_parts, waited_intercepted_body.clone().into()).await?;
170170

171171
// response after forwarding to AWS runtime API
172172
let response_to_intercepted_req = client.request(forward_intercepted).await?;
173173

174+
175+
let (resp_part, resp_body) = response_to_intercepted_req.into_parts();
176+
let resp_payload = resp_body.collect().await?.to_bytes();
177+
178+
debug!("LWA: Intercepted resp: {:?}", &resp_part);
179+
debug!("LWA: Intercepted resp body: {:?}", resp_payload);
180+
181+
let mut response_to_intercepted_req = Response::builder()
182+
.status(resp_part.status)
183+
.version(resp_part.version)
184+
.body(Body::from(resp_payload.clone()))
185+
.unwrap();
186+
*response_to_intercepted_req.headers_mut() = resp_part.headers.clone();
187+
188+
174189
match (intercepted_parts.method, intercepted_parts.uri.path()) {
175190
(hyper::Method::GET, "/2018-06-01/runtime/invocation/next") => {
176191
// intercepted invocation/next. The *response body* contains the payload of
177192
// the request that the lambda handler will see
178-
let (resp_part, resp_body) = response_to_intercepted_req.into_parts();
179-
let resp_payload = resp_body.collect().await?.to_bytes();
193+
// let (resp_part, resp_body) = response_to_intercepted_req.into_parts();
194+
// let resp_payload = resp_body.collect().await?.to_bytes();
195+
196+
let inner_payload = serde_json::from_slice::<Value>(&resp_payload).unwrap_or_else(|_| json!({}));
197+
debug!("LWA: payload wrapped in body {}", inner_payload);
198+
199+
let body_bytes = inner_payload.get("body")
200+
.map(|body| serde_json::to_vec(body)
201+
.unwrap_or_else(|_| vec![]))
202+
.unwrap_or_else(|| vec![]);
203+
204+
let header_map = inner_header(inner_payload);
205+
180206
let _ = Listener::start_invocation_handler(
181-
resp_part.headers.clone(),
182-
resp_payload.clone().into(),
207+
header_map,
208+
body_bytes.into(),
183209
Arc::clone(&processor),
184210
)
185211
.await;
186-
// invoke_universal_instrumentation_start(
187-
// Arc::clone(&span_generator),
188-
// Arc::clone(&processor),
189-
// resp_payload.clone(),
190-
// )
191-
// .await;
192212

193213
// Response is not cloneable, so it must be built again
194214
let mut rebuild_response = Response::builder()
@@ -205,21 +225,18 @@ async fn intercept_payload(
205225
if path.starts_with("/2018-06-01/runtime/invocation/")
206226
&& path.ends_with("/response") =>
207227
{
208-
// intercepted response to runtime/invocation. The *request* contains the returned
209-
// values and headers from lambda handler
210-
// let _ = Listener::start_invocation_handler(
211-
// req,
212-
// processor,
213-
// ).await;
214-
// lifecycle_listener.lock().trace_invocation_end(
215-
// lifecycle_listener.clone(),
216-
// req_parts.headers.clone(),
217-
// parsed_body,
218-
// )
219-
// .await;
228+
let inner_payload = serde_json::from_slice::<Value>(&waited_intercepted_body).unwrap_or_else(|_| json!({}));
229+
230+
let body_bytes = inner_payload.get("body")
231+
.map(|body| serde_json::to_vec(body)
232+
.unwrap_or_else(|_| vec![]))
233+
.unwrap_or_else(|| vec![]);
234+
235+
let header_map = inner_header(inner_payload);
236+
220237
let _ = Listener::end_invocation_handler(
221-
intercepted_parts.headers,
222-
waited_intercepted_body.into(),
238+
header_map,
239+
body_bytes.into(),
223240
Arc::clone(&processor),
224241
)
225242
.await;
@@ -231,6 +248,21 @@ async fn intercept_payload(
231248
}
232249
}
233250

251+
fn inner_header(inner_payload: Value) -> HeaderMap {
252+
let headers = if let Some(body) = inner_payload.get("headers") {
253+
serde_json::from_value::<HashMap<String, String>>(body.clone()).unwrap_or_else(|_| HashMap::new())
254+
} else {
255+
HashMap::new()
256+
};
257+
258+
let mut header_map = HeaderMap::new();
259+
for (k, v) in headers.into_iter() {
260+
let header_name = HeaderName::from_bytes(k.as_bytes()).unwrap();
261+
header_map.insert(header_name, HeaderValue::from_str(&v).unwrap());
262+
}
263+
header_map
264+
}
265+
234266
async fn forward_request(
235267
aws_runtime_addr: Uri,
236268
req_parts: &Parts,

0 commit comments

Comments
 (0)