Skip to content

Commit 37b0ced

Browse files
committed
hook the span extaction
1 parent 0781bf5 commit 37b0ced

File tree

2 files changed

+50
-16
lines changed

2 files changed

+50
-16
lines changed

bottlecap/src/bin/bottlecap/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,7 @@ async fn extension_loop_active(
348348
);
349349

350350
// let _ = start_lwa_proxy(Arc::clone(&invocation_processor), Arc::clone(&trace_processor));
351-
let _ = start_lwa_proxy(Arc::clone(&invocation_processor));
351+
let _ = start_lwa_proxy(Arc::clone(&config), Arc::clone(&invocation_processor));
352352

353353
let lifecycle_listener = LifecycleListener {
354354
invocation_processor: Arc::clone(&invocation_processor),

bottlecap/src/lwa/proxy.rs

Lines changed: 49 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,17 @@ use std::{net::SocketAddr, sync::Arc};
99
use std::collections::HashMap;
1010
use hyper::header::HeaderValue;
1111
use hyper::http::HeaderName;
12+
use nix::libc::sock_fprog;
1213
use tokio::{sync::Mutex, task::JoinHandle};
1314
use tracing::{debug, error};
15+
use crate::config::Config;
16+
use crate::lifecycle::invocation::span_inferrer::SpanInferrer;
17+
use crate::traces::context::SpanContext;
18+
use crate::traces::propagation::{DatadogCompositePropagator, Propagator};
1419

1520
#[must_use]
1621
pub fn start_lwa_proxy(
22+
config: Arc<Config>,
1723
invocation_processor: Arc<Mutex<Processor>>,
1824
// trace_processor: Arc<Mutex<ServerlessTraceProcessor>>,
1925
) -> Option<JoinHandle<()>> {
@@ -39,7 +45,7 @@ pub fn start_lwa_proxy(
3945
intercept_payload(
4046
req,
4147
Arc::clone(&client),
42-
// Arc::clone(&trace_processor),
48+
Arc::clone(&config),
4349
Arc::clone(&processor),
4450
uri.clone(),
4551
)
@@ -150,6 +156,7 @@ fn parse_env_addresses() -> Option<(SocketAddr, Uri)> {
150156
// LWA: Intercepted request body: b""
151157

152158
async fn intercept_payload(
159+
config: Arc<Config>,
153160
intercepted: Request<Body>,
154161
client: Arc<Client<ProxyConnector<HttpConnector>>>,
155162
// span_generator: Arc<Mutex<ServerlessTraceProcessor>>,
@@ -201,14 +208,13 @@ async fn intercept_payload(
201208
.unwrap_or_else(|_| vec![]))
202209
.unwrap_or_else(|| vec![]);
203210

204-
let header_map = inner_header(inner_payload);
211+
let header_map = inner_header(&inner_payload);
205212

206-
let _ = Listener::start_invocation_handler(
207-
header_map,
208-
body_bytes.into(),
209-
Arc::clone(&processor),
210-
)
211-
.await;
213+
if let Some(span_context) = extract_span_context(Arc::clone(&config), &header_map, &inner_payload.clone()){
214+
// re parent with aws lambda span
215+
} else {
216+
// inject aws lambda span
217+
}
212218

213219
// Response is not cloneable, so it must be built again
214220
let mut rebuild_response = Response::builder()
@@ -248,19 +254,47 @@ async fn intercept_payload(
248254
}
249255
}
250256

251-
fn inner_header(inner_payload: Value) -> HeaderMap {
257+
fn extract_span_context(
258+
config: Arc<Config>,
259+
headers: &HashMap<String, String>,
260+
payload_value: &Value,
261+
) -> Option<SpanContext> {
262+
let propagator = DatadogCompositePropagator::new(Arc::clone(&config));
263+
let inferrer = SpanInferrer::new(config.service_mapping.clone());
264+
265+
if let Some(sc) = inferrer.get_span_context(&propagator) {
266+
return Some(sc);
267+
}
268+
269+
270+
if let Some(payload_headers) = payload_value.get("headers") {
271+
if let Some(sc) = propagator.extract(payload_headers) {
272+
debug!("Extracted trace context from event headers");
273+
return Some(sc);
274+
}
275+
}
276+
277+
if let Some(sc) = propagator.extract(headers) {
278+
debug!("Extracted trace context from headers");
279+
return Some(sc);
280+
}
281+
282+
None
283+
}
284+
285+
fn inner_header(inner_payload: &Value) -> HashMap<String, String> {
252286
let headers = if let Some(body) = inner_payload.get("headers") {
253287
serde_json::from_value::<HashMap<String, String>>(body.clone()).unwrap_or_else(|_| HashMap::new())
254288
} else {
255289
HashMap::new()
256290
};
257291

258-
let mut header_map = HeaderMap::new();
259-
for (k, v) in headers.into_iter() {
260-
let header_name = HeaderName::from_bytes(k.as_bytes()).unwrap();
261-
header_map.insert(header_name, HeaderValue::from_str(&v).unwrap());
262-
}
263-
header_map
292+
// let mut header_map = HeaderMap::new();
293+
// for (k, v) in headers.into_iter() {
294+
// let header_name = HeaderName::from_bytes(k.as_bytes()).unwrap();
295+
// header_map.insert(header_name, HeaderValue::from_str(&v).unwrap());
296+
// }
297+
headers
264298
}
265299

266300
async fn forward_request(

0 commit comments

Comments
 (0)