Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
841785e
clean code and remove debug stuff
alexgallotta Oct 15, 2024
25081f0
add logs and preserve path and query in uri proxying
alexgallotta Oct 16, 2024
f098b5e
inject headers into body
alexgallotta Oct 17, 2024
3b5f881
feat: enable proxy lost in rebase
alexgallotta Oct 21, 2024
202db99
print the response too
alexgallotta Oct 23, 2024
ecc1dd4
feat: add parent and trace id from proxy to all spans
alexgallotta Oct 25, 2024
acfb86c
fix: ensure the proper path is used for reading test files
alexgallotta Oct 29, 2024
0bf7127
add comments and split logic
alexgallotta Oct 29, 2024
1760270
use ids from processor
alexgallotta Oct 30, 2024
7f5febe
reuse the start invocation hook to parse ids
alexgallotta Oct 30, 2024
0b0f827
format
alexgallotta Oct 30, 2024
87c1fcf
reuse end invocation handler
alexgallotta Oct 31, 2024
8de3a69
remove useless wrapping struct
alexgallotta Oct 31, 2024
b7bb6b3
update code after merge and use start/end lifecycle endpoints with whole
alexgallotta Mar 13, 2025
6c673c3
add end invocation call
alexgallotta Mar 13, 2025
48d07af
add comment of flow
alexgallotta Mar 14, 2025
36a98a1
extract headers and body
alexgallotta Mar 18, 2025
7baf6f9
hook the span extaction
alexgallotta Mar 18, 2025
b8e51de
inject dd trace
alexgallotta Apr 1, 2025
2f38582
put back some renaming
alexgallotta Mar 27, 2025
f30ae25
reparent instead of injecting traces
alexgallotta Apr 1, 2025
8f8d59e
missed tracer changes
alexgallotta Apr 1, 2025
6f651be
more strict take of lock
alexgallotta Apr 2, 2025
1071248
reparent if an inferred spans or existing trace is found
alexgallotta Apr 2, 2025
6caf920
Remove prints
alexgallotta Apr 3, 2025
7174a14
clean up
alexgallotta Apr 3, 2025
600e00d
make handling of interception async
alexgallotta Apr 3, 2025
f8307c6
shutdown proxy at the end
alexgallotta Apr 3, 2025
237b485
rename for clarity and make sure reparenting is set
alexgallotta Apr 3, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions bottlecap/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions bottlecap/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@ dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "4
figment = { version = "0.10", default-features = false, features = ["yaml", "env"] }
hyper = { version = "0.14", default-features = false, features = ["server"] }
lazy_static = { version = "1.5", default-features = false }
hyper-proxy = { version = "0.9.1", default-features = false }
log = { version = "0.4", default-features = false }
nix = { version = "0.26", default-features = false, features = ["feature", "fs"] }
protobuf = { version = "3.5", default-features = false }
regex = { version = "1.10", default-features = false }
reqwest = { version = "0.12.11", features = ["json", "http2", "rustls-tls"], default-features = false }
serde = { version = "1.0", default-features = false, features = ["derive"] }
serde_json = { version = "1.0", default-features = false, features = ["alloc"] }
thiserror = { version = "1.0", default-features = false}
thiserror = { version = "1.0", default-features = false }
tokio = { version = "1.37", default-features = false, features = ["macros", "rt-multi-thread"] }
tokio-util = { version = "0.7", default-features = false }
tracing = { version = "0.1", default-features = false }
Expand All @@ -41,7 +42,7 @@ rustls = { version = "0.23.18", default-features = false, features = ["aws-lc-rs
rand = { version = "0.8", default-features = false }
prost = { version = "0.11.6", default-features = false }
zstd = { version = "0.13.3", default-features = false }
futures = { version = "0.3.31", default-features = false}
futures = { version = "0.3.31", default-features = false }

[dev-dependencies]
figment = { version = "0.10", default-features = false, features = ["yaml", "env", "test"] }
Expand Down
11 changes: 9 additions & 2 deletions bottlecap/src/bin/bottlecap/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#![deny(missing_copy_implementations)]
#![deny(missing_debug_implementations)]

use bottlecap::lwa::proxy::start_lwa_proxy;
use bottlecap::{
base_url,
config::{self, get_aws_partition_by_region, AwsConfig, Config},
Expand Down Expand Up @@ -343,6 +344,8 @@ async fn extension_loop_active(
Arc::clone(&invocation_processor),
);

let lwa_proxy_handle = start_lwa_proxy(Arc::clone(&invocation_processor));

let lifecycle_listener = LifecycleListener {
invocation_processor: Arc::clone(&invocation_processor),
};
Expand Down Expand Up @@ -477,6 +480,10 @@ async fn extension_loop_active(
}
}
}
if let Some(lwa_proxy_task) = lwa_proxy_handle {
// use with graceful shutdown after rebase with hyper 1
lwa_proxy_task.abort();
}
dogstatsd_cancel_token.cancel();
telemetry_listener_cancel_token.cancel();
flush_all(
Expand All @@ -495,8 +502,8 @@ async fn extension_loop_active(
async fn flush_all(
logs_flusher: &LogsFlusher,
metrics_flusher: &mut MetricsFlusher,
trace_flusher: &dyn TraceFlusher,
stats_flusher: &dyn StatsFlusher,
trace_flusher: &impl TraceFlusher,
stats_flusher: &impl StatsFlusher,
race_flush_interval: &mut tokio::time::Interval,
) {
tokio::join!(
Expand Down
1 change: 1 addition & 0 deletions bottlecap/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub mod http_client;
pub mod lifecycle;
pub mod logger;
pub mod logs;
pub mod lwa;
pub mod metrics;
pub mod proc;
pub mod secrets;
Expand Down
3 changes: 2 additions & 1 deletion bottlecap/src/lifecycle/invocation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ fn create_empty_span(name: String, resource: String, service: String) -> Span {
}
}

fn generate_span_id() -> u64 {
#[must_use]
pub fn generate_span_id() -> u64 {
if std::env::var(INIT_TYPE).map_or(false, |it| it == SNAP_START_VALUE) {
return OsRng.next_u64();
}
Expand Down
35 changes: 28 additions & 7 deletions bottlecap/src/lifecycle/invocation/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ pub struct Processor {
cold_start_span: Option<Span>,
// Extracted span context from inferred span, headers, or payload
pub extracted_span_context: Option<SpanContext>,
pub reparenting_id: Option<u64>,
pub aws_lambda_span_needs_trace_id: bool,
// Used to extract the trace context from inferred span, headers, or payload
propagator: DatadogCompositePropagator,
// Helper to send enhanced metrics
Expand Down Expand Up @@ -95,6 +97,8 @@ impl Processor {
extracted_span_context: None,
propagator,
enhanced_metrics: EnhancedMetrics::new(metrics_aggregator, Arc::clone(&config)),
reparenting_id: None,
aws_lambda_span_needs_trace_id: false,
aws_config: aws_config.clone(),
tracer_detected: false,
runtime: None,
Expand Down Expand Up @@ -160,6 +164,9 @@ impl Processor {
self.extracted_span_context = None;
// Cold Start Span
self.cold_start_span = None;
// LWA reset
self.aws_lambda_span_needs_trace_id = false;
self.reparenting_id = None;
}

/// On the first invocation, determine if it's a cold start or proactive init.
Expand Down Expand Up @@ -341,7 +348,7 @@ impl Processor {
}

if self.tracer_detected {
let mut body_size = std::mem::size_of_val(&self.span);
let mut body_size = size_of_val(&self.span);
let mut traces = vec![self.span.clone()];

if let Some(inferred_span) = &self.inferrer.inferred_span {
Expand All @@ -350,12 +357,12 @@ impl Processor {
}

if let Some(ws) = &self.inferrer.wrapped_inferred_span {
body_size += std::mem::size_of_val(ws);
body_size += size_of_val(ws);
traces.push(ws.clone());
}

if let Some(cold_start_span) = &self.cold_start_span {
body_size += std::mem::size_of_val(cold_start_span);
body_size += size_of_val(cold_start_span);
traces.push(cold_start_span.clone());
}

Expand Down Expand Up @@ -426,7 +433,11 @@ impl Processor {
/// If this method is called, it means that we are operating in a Universally Instrumented
/// runtime. Therefore, we need to set the `tracer_detected` flag to `true`.
///
pub fn on_invocation_start(&mut self, headers: HashMap<String, String>, payload: Vec<u8>) {
pub fn universal_instrumentation_start(
&mut self,
headers: HashMap<String, String>,
payload: Vec<u8>,
) -> u64 {
self.tracer_detected = true;

let payload_value = serde_json::from_slice::<Value>(&payload).unwrap_or_else(|_| json!({}));
Expand Down Expand Up @@ -459,12 +470,18 @@ impl Processor {
self.span.meta.extend(sc.tags.clone());
}
}

// If we have an inferred span, set the invocation span parent id
// to be the inferred span id, even if we don't have an extracted trace context
if let Some(inferred_span) = &self.inferrer.inferred_span {
self.span.parent_id = inferred_span.span_id;
}
self.span.parent_id
}

pub fn set_reparenting(&mut self, span_id: u64, parent_id: u64) {
self.span.span_id = span_id;
self.reparenting_id = Some(parent_id);
self.aws_lambda_span_needs_trace_id = true;
}

fn extract_span_context(
Expand Down Expand Up @@ -493,7 +510,11 @@ impl Processor {

/// Given trace context information, set it to the current span.
///
pub fn on_invocation_end(&mut self, headers: HashMap<String, String>, payload: Vec<u8>) {
pub fn universal_instrumentation_end(
&mut self,
headers: HashMap<String, String>,
payload: Vec<u8>,
) {
let payload_value = serde_json::from_slice::<Value>(&payload).unwrap_or_else(|_| json!({}));

// Tag the invocation span with the request payload
Expand Down Expand Up @@ -830,7 +851,7 @@ mod tests {
}
"#;

p.on_invocation_end(HashMap::new(), response.as_bytes().to_vec());
p.universal_instrumentation_end(HashMap::new(), response.as_bytes().to_vec());

assert_eq!(
p.span
Expand Down
74 changes: 47 additions & 27 deletions bottlecap/src/lifecycle/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::sync::Arc;

use hyper::body::HttpBody;
use hyper::service::{make_service_fn, service_fn};
use hyper::{http, Body, Method, Request, Response, StatusCode};
use hyper::{http, Body, HeaderMap, Method, Request, Response, StatusCode};
use serde_json::json;
use tokio::sync::Mutex;
use tracing::{debug, error, warn};
Expand Down Expand Up @@ -61,10 +61,20 @@ impl Listener {
) -> http::Result<Response<Body>> {
match (req.method(), req.uri().path()) {
(&Method::POST, START_INVOCATION_PATH) => {
Self::start_invocation_handler(req, invocation_processor).await
let (parts, body) = req.into_parts();
Self::universal_instrumentation_start(&parts.headers, body, invocation_processor)
.await
.1
}
(&Method::POST, END_INVOCATION_PATH) => {
match Self::end_invocation_handler(req, invocation_processor).await {
let (parts, body) = req.into_parts();
match Self::universal_instrumentation_end(
&parts.headers,
body,
invocation_processor,
)
.await
{
Ok(response) => Ok(response),
Err(e) => {
error!("Failed to end invocation {e}");
Expand All @@ -84,27 +94,30 @@ impl Listener {
}
}

async fn start_invocation_handler(
req: Request<Body>,
pub async fn universal_instrumentation_start(
headers: &HeaderMap,
body: Body,
invocation_processor: Arc<Mutex<InvocationProcessor>>,
) -> http::Result<Response<Body>> {
) -> (u64, http::Result<Response<Body>>) {
debug!("Received start invocation request");
let (parts, body) = req.into_parts();
match body.collect().await {
Ok(b) => {
let body = b.to_bytes().to_vec();
let mut processor = invocation_processor.lock().await;

let headers = Self::headers_to_map(parts.headers);

processor.on_invocation_start(headers, body);
let headers = Self::headers_to_map(headers);

let extracted_span_context = {
let mut processor = invocation_processor.lock().await;
processor.universal_instrumentation_start(headers, body);
processor.extracted_span_context.clone()
};
let mut response = Response::builder().status(200);

let found_parent_span_id;
// If a `SpanContext` exists, then tell the tracer to use it.
// todo: update this whole code with DatadogHeaderPropagator::inject
// since this logic looks messy
if let Some(sp) = &processor.extracted_span_context {
if let Some(sp) = extracted_span_context {
response = response.header(DATADOG_TRACE_ID_KEY, sp.trace_id.to_string());
if let Some(priority) = sp.sampling.and_then(|s| s.priority) {
response =
Expand All @@ -116,39 +129,46 @@ impl Listener {
sp.tags.get(DATADOG_HIGHER_ORDER_TRACE_ID_BITS_KEY)
{
response = response.header(
DATADOG_TAGS_KEY,
format!("{DATADOG_HIGHER_ORDER_TRACE_ID_BITS_KEY}={trace_id_higher_order_bits}"),
);
DATADOG_TAGS_KEY,
format!("{DATADOG_HIGHER_ORDER_TRACE_ID_BITS_KEY}={trace_id_higher_order_bits}"),
);
}
found_parent_span_id = sp.span_id;
} else {
found_parent_span_id = 0;
}

drop(processor);

response.body(Body::from(json!({}).to_string()))
(
found_parent_span_id,
response.body(Body::from(json!({}).to_string())),
)
}
Err(e) => {
error!("Could not read start invocation request body {e}");

Response::builder()
.status(400)
.body(Body::from("Could not read start invocation request body"))
(
0,
Response::builder()
.status(400)
.body(Body::from("Could not read start invocation request body")),
)
}
}
}

async fn end_invocation_handler(
req: Request<Body>,
pub async fn universal_instrumentation_end(
headers: &HeaderMap,
body: Body,
invocation_processor: Arc<Mutex<InvocationProcessor>>,
) -> http::Result<Response<Body>> {
debug!("Received end invocation request");
let (parts, body) = req.into_parts();
match body.collect().await {
Ok(b) => {
let body = b.to_bytes().to_vec();
let mut processor = invocation_processor.lock().await;

let headers = Self::headers_to_map(parts.headers);
processor.on_invocation_end(headers, body);
let headers = Self::headers_to_map(headers);
processor.universal_instrumentation_end(headers, body);
drop(processor);

Response::builder()
Expand All @@ -172,7 +192,7 @@ impl Listener {
.body(Body::from(json!({}).to_string()))
}

fn headers_to_map(headers: http::HeaderMap) -> HashMap<String, String> {
fn headers_to_map(headers: &HeaderMap) -> HashMap<String, String> {
headers
.iter()
.map(|(k, v)| {
Expand Down
1 change: 1 addition & 0 deletions bottlecap/src/lwa/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod proxy;
Loading