Skip to content

Commit d33d164

Browse files
committed
add end invocation call
1 parent 98e44dd commit d33d164

File tree

3 files changed

+32
-25
lines changed

3 files changed

+32
-25
lines changed

bottlecap/src/lifecycle/invocation/processor.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -458,6 +458,7 @@ impl Processor {
458458

459459
// Set the extracted trace context to the spans
460460
if let Some(sc) = &self.extracted_span_context {
461+
debug!("LIP: found span context in headers {:?}", sc);
461462
self.span.trace_id = sc.trace_id;
462463
self.span.parent_id = sc.span_id;
463464

@@ -476,6 +477,9 @@ impl Processor {
476477
if let Some(inferred_span) = &self.inferrer.inferred_span {
477478
self.span.parent_id = inferred_span.span_id;
478479
}
480+
481+
debug!("LIP: trace context found trace_id:{trace_id} span_id:{span_id} parent_id:{parent_id}",
482+
trace_id = self.span.trace_id, span_id = self.span.span_id, parent_id = self.span.parent_id);
479483
(self.span.span_id, self.span.trace_id, self.span.parent_id)
480484
}
481485

bottlecap/src/lifecycle/listener.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,13 @@ use std::net::SocketAddr;
77
use std::sync::Arc;
88

99
use hyper::body::HttpBody;
10-
use hyper::http::request::Parts;
1110
use hyper::service::{make_service_fn, service_fn};
1211
use hyper::{http, Body, HeaderMap, Method, Request, Response, StatusCode};
1312
use serde_json::json;
1413
use tokio::sync::Mutex;
1514
use tracing::{debug, error, warn};
1615

17-
use crate::lifecycle::invocation::processor::{Processor as InvocationProcessor, Processor};
16+
use crate::lifecycle::invocation::processor::{Processor as InvocationProcessor};
1817
use crate::traces::propagation::text_map_propagator::{
1918
DATADOG_HIGHER_ORDER_TRACE_ID_BITS_KEY, DATADOG_SAMPLING_PRIORITY_KEY, DATADOG_TAGS_KEY,
2019
DATADOG_TRACE_ID_KEY,
@@ -66,7 +65,8 @@ impl Listener {
6665
Self::start_invocation_handler(parts.headers, body, invocation_processor).await
6766
}
6867
(&Method::POST, END_INVOCATION_PATH) => {
69-
match Self::end_invocation_handler(req, invocation_processor).await {
68+
let (parts, body) = req.into_parts();
69+
match Self::end_invocation_handler(parts.headers, body, invocation_processor).await {
7070
Ok(response) => Ok(response),
7171
Err(e) => {
7272
error!("Failed to end invocation {e}");
@@ -139,17 +139,17 @@ impl Listener {
139139
}
140140

141141
pub async fn end_invocation_handler(
142-
req: Request<Body>,
142+
headers: HeaderMap,
143+
body: Body,
143144
invocation_processor: Arc<Mutex<InvocationProcessor>>,
144145
) -> http::Result<Response<Body>> {
145146
debug!("Received end invocation request");
146-
let (parts, body) = req.into_parts();
147147
match body.collect().await {
148148
Ok(b) => {
149149
let body = b.to_bytes().to_vec();
150150
let mut processor = invocation_processor.lock().await;
151151

152-
let headers = Self::headers_to_map(parts.headers);
152+
let headers = Self::headers_to_map(headers);
153153
processor.on_invocation_end(headers, body);
154154
drop(processor);
155155

bottlecap/src/lwa/proxy.rs

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use crate::{
22
lifecycle::invocation::processor::Processor,
33
lifecycle::listener::Listener,
4-
traces::trace_processor::{ServerlessTraceProcessor, TraceProcessor},
54
};
65
use hyper::http::request::Parts;
76
use hyper::{
@@ -11,9 +10,8 @@ use hyper::{
1110
Body, Client, Error, Request, Response, Server, Uri,
1211
};
1312
use hyper_proxy::{Intercept, Proxy, ProxyConnector};
14-
use rand::random;
1513
use serde_json::Value;
16-
use std::{collections::HashMap, net::SocketAddr, sync::Arc};
14+
use std::{net::SocketAddr, sync::Arc};
1715
use tokio::{sync::Mutex, task::JoinHandle};
1816
use tracing::{debug, error};
1917

@@ -87,7 +85,7 @@ fn parse_env_addresses() -> Option<(SocketAddr, Uri)> {
8785
if let (Some(host), Some(port)) = (host, port) {
8886
if host == "localhost" {
8987
error!("LWA: Cannot use localhost as host in AWS_LWA_PROXY_LAMBDA_RUNTIME_API, use 127.0.0.1 instead");
90-
None
88+
return None;
9189
}
9290
format!("{host}:{port}")
9391
.parse::<SocketAddr>()
@@ -113,7 +111,6 @@ fn parse_env_addresses() -> Option<(SocketAddr, Uri)> {
113111
}
114112
},
115113
Err(e) => {
116-
error!("LWA: Error parsing AWS_LWA_PROXY_LAMBDA_RUNTIME_API: {}", e);
117114
None
118115
}
119116
};
@@ -148,8 +145,9 @@ async fn intercept_payload(
148145
let (intercepted_parts, intercepted_body) = intercepted.into_parts();
149146
debug!("LWA: Intercepted request: {:?}", intercepted_parts);
150147

148+
let waited_intercepted_body = intercepted_body.collect().await?.to_bytes();
151149
let forward_intercepted =
152-
forward_request(aws_runtime_addr, &intercepted_parts, intercepted_body).await?;
150+
forward_request(aws_runtime_addr, &intercepted_parts, waited_intercepted_body.clone().into()).await?;
153151

154152
// response after forwarding to AWS runtime API
155153
let response_to_intercepted_req = client.request(forward_intercepted).await?;
@@ -190,7 +188,6 @@ async fn intercept_payload(
190188
{
191189
// intercepted response to runtime/invocation. The *request* contains the returned
192190
// values and headers from lambda handler
193-
// let parsed_body = serde_json::from_slice::<Value>(&request_body_waited);
194191
// let _ = Listener::start_invocation_handler(
195192
// req,
196193
// processor,
@@ -201,6 +198,12 @@ async fn intercept_payload(
201198
// parsed_body,
202199
// )
203200
// .await;
201+
let _ = Listener::end_invocation_handler(
202+
intercepted_parts.headers,
203+
waited_intercepted_body.into(),
204+
Arc::clone(&processor),
205+
)
206+
.await;
204207
// only parsing of the original request (handler -> runtime API) is needed so
205208
// the original response can be used
206209
Ok(response_to_intercepted_req)
@@ -269,18 +272,18 @@ async fn forward_request(
269272
// }
270273
// }
271274

272-
fn deserialize_json(response: Result<Bytes, Error>) -> Option<Value> {
273-
match response {
274-
Ok(bytes) => serde_json::from_slice(bytes.as_ref()).unwrap_or_else(|e| {
275-
error!("Error deserializing response body: {}", e);
276-
None
277-
}),
278-
Err(e) => {
279-
error!("Error reading response body: {}", e);
280-
None
281-
}
282-
}
283-
}
275+
// fn deserialize_json(response: Result<Bytes, Error>) -> Option<Value> {
276+
// match response {
277+
// Ok(bytes) => serde_json::from_slice(bytes.as_ref()).unwrap_or_else(|e| {
278+
// error!("Error deserializing response body: {}", e);
279+
// None
280+
// }),
281+
// Err(e) => {
282+
// error!("Error reading response body: {}", e);
283+
// None
284+
// }
285+
// }
286+
// }
284287

285288
#[cfg(test)]
286289
mod tests {

0 commit comments

Comments
 (0)