Skip to content
Merged
9 changes: 9 additions & 0 deletions examples/extension-telemetry-generic/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[package]
name = "extension-telemetry-basic"
version = "0.1.0"
edition = "2021"

[dependencies]
lambda-extension = { path = "../../lambda-extension" }
serde_json = "1"
tokio = { version = "1", features = ["macros", "rt"] }
13 changes: 13 additions & 0 deletions examples/extension-telemetry-generic/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# AWS Lambda Telemetry extension example

## Build & Deploy

1. Install [cargo-lambda](https://github.com/cargo-lambda/cargo-lambda#installation)
2. Build the extension with `cargo lambda build --release --extension`
3. Deploy the extension as a layer with `cargo lambda deploy --extension`

The last command will give you an ARN for the extension layer that you can use in your functions.

## Build for ARM 64

Build the extension with `cargo lambda build --release --extension --arm64`
29 changes: 29 additions & 0 deletions examples/extension-telemetry-generic/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use lambda_extension::{service_fn, tracing, Error, Extension, LambdaTelemetry, LambdaTelemetryRecord, SharedService};

async fn handler(events: Vec<LambdaTelemetry<serde_json::Value>>) -> Result<(), Error> {
for event in events {
match event.record {
LambdaTelemetryRecord::Function(record) => tracing::info!("[logs] [function] {}", record),
LambdaTelemetryRecord::Extension(record) => tracing::info!("[extension] [function] {}", record),
_ => (),
}
}

Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Error> {
// required to enable CloudWatch error logging by the runtime
tracing::init_default_subscriber();

let telemetry_processor = SharedService::new(service_fn(handler));

Extension::new()
.with_telemetry_record_type::<serde_json::Value>()
.with_telemetry_processor(telemetry_processor)
.run()
.await?;

Ok(())
}
67 changes: 58 additions & 9 deletions lambda-extension/src/extension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ use hyper::{body::Incoming, server::conn::http1, service::service_fn};

use hyper_util::rt::tokio::TokioIo;
use lambda_runtime_api_client::Client;
use serde::Deserialize;
use serde::{de::DeserializeOwned, Deserialize};
use std::{
convert::Infallible,
fmt,
future::{ready, Future},
marker::PhantomData,
net::SocketAddr,
path::PathBuf,
pin::Pin,
Expand All @@ -29,7 +30,7 @@ const DEFAULT_LOG_PORT_NUMBER: u16 = 9002;
const DEFAULT_TELEMETRY_PORT_NUMBER: u16 = 9003;

/// An Extension that runs event, log and telemetry processors
pub struct Extension<'a, E, L, T> {
pub struct Extension<'a, E, L, T, TL = String> {
extension_name: Option<&'a str>,
events: Option<&'a [&'a str]>,
events_processor: E,
Expand All @@ -41,6 +42,7 @@ pub struct Extension<'a, E, L, T> {
telemetry_processor: Option<T>,
telemetry_buffering: Option<LogBuffering>,
telemetry_port_number: u16,
_telemetry_record_type: PhantomData<fn(TL)>,
}

impl Extension<'_, Identity<LambdaEvent>, MakeIdentity<Vec<LambdaLog>>, MakeIdentity<Vec<LambdaTelemetry>>> {
Expand All @@ -58,6 +60,7 @@ impl Extension<'_, Identity<LambdaEvent>, MakeIdentity<Vec<LambdaLog>>, MakeIden
telemetry_buffering: None,
telemetry_processor: None,
telemetry_port_number: DEFAULT_TELEMETRY_PORT_NUMBER,
_telemetry_record_type: PhantomData,
}
}
}
Expand All @@ -70,7 +73,7 @@ impl Default
}
}

impl<'a, E, L, T> Extension<'a, E, L, T>
impl<'a, E, L, T, TL> Extension<'a, E, L, T, TL>
where
E: Service<LambdaEvent>,
E::Future: Future<Output = Result<(), E::Error>>,
Expand All @@ -85,12 +88,13 @@ where
L::Future: Send,

// Fixme: 'static bound might be too restrictive
T: MakeService<(), Vec<LambdaTelemetry>, Response = ()> + Send + Sync + 'static,
T::Service: Service<Vec<LambdaTelemetry>, Response = ()> + Send + Sync,
<T::Service as Service<Vec<LambdaTelemetry>>>::Future: Send + 'a,
T: MakeService<(), Vec<LambdaTelemetry<TL>>, Response = ()> + Send + Sync + 'static,
T::Service: Service<Vec<LambdaTelemetry<TL>>, Response = ()> + Send + Sync,
<T::Service as Service<Vec<LambdaTelemetry<TL>>>>::Future: Send + 'a,
T::Error: Into<Error> + fmt::Debug,
T::MakeError: Into<Error> + fmt::Debug,
T::Future: Send,
TL: DeserializeOwned + Send + 'static,
{
/// Create a new [`Extension`] with a given extension name
pub fn with_extension_name(self, extension_name: &'a str) -> Self {
Expand All @@ -110,7 +114,7 @@ where
}

/// Create a new [`Extension`] with a service that receives Lambda events.
pub fn with_events_processor<N>(self, ep: N) -> Extension<'a, N, L, T>
pub fn with_events_processor<N>(self, ep: N) -> Extension<'a, N, L, T, TL>
where
N: Service<LambdaEvent>,
N::Future: Future<Output = Result<(), N::Error>>,
Expand All @@ -128,11 +132,12 @@ where
telemetry_buffering: self.telemetry_buffering,
telemetry_processor: self.telemetry_processor,
telemetry_port_number: self.telemetry_port_number,
_telemetry_record_type: self._telemetry_record_type,
}
}

/// Create a new [`Extension`] with a service that receives Lambda logs.
pub fn with_logs_processor<N, NS>(self, lp: N) -> Extension<'a, E, N, T>
pub fn with_logs_processor<N, NS>(self, lp: N) -> Extension<'a, E, N, T, TL>
where
N: Service<()>,
N::Future: Future<Output = Result<NS, N::Error>>,
Expand All @@ -150,6 +155,7 @@ where
telemetry_buffering: self.telemetry_buffering,
telemetry_processor: self.telemetry_processor,
telemetry_port_number: self.telemetry_port_number,
_telemetry_record_type: self._telemetry_record_type,
}
}

Expand Down Expand Up @@ -179,7 +185,7 @@ where
}

/// Create a new [`Extension`] with a service that receives Lambda telemetry data.
pub fn with_telemetry_processor<N, NS>(self, lp: N) -> Extension<'a, E, L, N>
pub fn with_telemetry_processor<N, NS>(self, lp: N) -> Extension<'a, E, L, N, TL>
where
N: Service<()>,
N::Future: Future<Output = Result<NS, N::Error>>,
Expand All @@ -197,6 +203,7 @@ where
telemetry_types: self.telemetry_types,
telemetry_buffering: self.telemetry_buffering,
telemetry_port_number: self.telemetry_port_number,
_telemetry_record_type: self._telemetry_record_type,
}
}

Expand Down Expand Up @@ -345,6 +352,48 @@ where
}
}

impl<'a, E, L> Extension<'a, E, L, MakeIdentity<Vec<LambdaTelemetry>>> {
/// Set the deserialization type for telemetry log records.
///
/// By default, telemetry log records are deserialized as `String`, but
/// it's possible to configure Lambda functions to emit logs in JSON format.
/// Use this method to deserialize into a different type, such as
/// `serde_json::Value`.
///
/// Must be called before [`Self::with_telemetry_processor`].
///
/// ```
/// use lambda_extension::{Extension, LambdaTelemetry, SharedService, service_fn};
///
/// async fn handler(events: Vec<LambdaTelemetry<serde_json::Value>>) -> Result<(), lambda_extension::Error> {
/// for event in &events {
/// println!("{event:?}");
/// }
/// Ok(())
/// }
///
/// let _ext = Extension::new()
/// .with_telemetry_record_type::<serde_json::Value>()
/// .with_telemetry_processor(SharedService::new(service_fn(handler)));
/// ```
pub fn with_telemetry_record_type<N>(self) -> Extension<'a, E, L, MakeIdentity<Vec<LambdaTelemetry<N>>>, N> {
Extension {
_telemetry_record_type: PhantomData,
telemetry_processor: None,
events_processor: self.events_processor,
extension_name: self.extension_name,
events: self.events,
log_types: self.log_types,
log_buffering: self.log_buffering,
logs_processor: self.logs_processor,
log_port_number: self.log_port_number,
telemetry_types: self.telemetry_types,
telemetry_buffering: self.telemetry_buffering,
telemetry_port_number: self.telemetry_port_number,
}
}
}

/// An extension registered by calling [`Extension::register`].
pub struct RegisteredExtension<E> {
/// The ID of the registered extension. This ID is unique per extension and remains constant
Expand Down
59 changes: 44 additions & 15 deletions lambda-extension/src/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,31 @@ use http::{Request, Response};
use http_body_util::BodyExt;
use hyper::body::Incoming;
use lambda_runtime_api_client::body::Body;
use serde::{Deserialize, Serialize};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::{boxed::Box, fmt, sync::Arc};
use tokio::sync::Mutex;
use tower::Service;
use tracing::{error, trace};

/// Payload received from the Telemetry API
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
pub struct LambdaTelemetry {
pub struct LambdaTelemetry<L = String> {
/// Time when the telemetry was generated
pub time: DateTime<Utc>,
/// Telemetry record entry
#[serde(flatten)]
pub record: LambdaTelemetryRecord,
pub record: LambdaTelemetryRecord<L>,
}

/// Record in a LambdaTelemetry entry
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
#[serde(tag = "type", content = "record", rename_all = "lowercase")]
pub enum LambdaTelemetryRecord {
pub enum LambdaTelemetryRecord<L = String> {
/// Function log records
Function(String),
Function(L),

/// Extension log records
Extension(String),
Extension(L),

/// Platform init start record
#[serde(rename = "platform.initStart", rename_all = "camelCase")]
Expand Down Expand Up @@ -269,14 +269,15 @@ pub struct RuntimeDoneMetrics {
///
/// This takes an `hyper::Request` and transforms it into `Vec<LambdaTelemetry>` for the
/// underlying `Service` to process.
pub(crate) async fn telemetry_wrapper<S>(
pub(crate) async fn telemetry_wrapper<S, L>(
service: Arc<Mutex<S>>,
req: Request<Incoming>,
) -> Result<Response<Body>, Box<dyn std::error::Error + Send + Sync>>
where
S: Service<Vec<LambdaTelemetry>, Response = ()>,
S: Service<Vec<LambdaTelemetry<L>>, Response = ()>,
S::Error: Into<Box<dyn std::error::Error + Send + Sync>> + fmt::Debug,
S::Future: Send,
L: DeserializeOwned,
{
trace!("Received telemetry request");
// Parse the request body as a Vec<LambdaTelemetry>
Expand All @@ -291,7 +292,7 @@ where
}
};

let telemetry: Vec<LambdaTelemetry> = match serde_json::from_slice(&body.to_bytes()) {
let telemetry: Vec<LambdaTelemetry<L>> = match serde_json::from_slice(&body.to_bytes()) {
Ok(telemetry) => telemetry,
Err(e) => {
error!("Error parsing telemetry: {}", e);
Expand Down Expand Up @@ -319,17 +320,17 @@ mod deserialization_tests {
use chrono::{TimeDelta, TimeZone};

macro_rules! deserialize_tests {
($($name:ident: $value:expr,)*) => {
($($name:ident$(<$log:ty>)?: $value:expr,)*) => {
$(
#[test]
fn $name() {
let (input, expected) = $value;
let actual = serde_json::from_str::<LambdaTelemetry>(&input).expect("unable to deserialize");
let actual = serde_json::from_str::<LambdaTelemetry$(<$log>)?>(&input).expect("unable to deserialize");

assert!(actual.record == expected);
}
)*
}
};
}

deserialize_tests! {
Expand All @@ -339,12 +340,24 @@ mod deserialization_tests {
LambdaTelemetryRecord::Function("hello world".to_string()),
),

// function (json)
function_generic<bool>: (
r#"{"time": "2020-08-20T12:31:32.123Z","type": "function", "record": true}"#,
LambdaTelemetryRecord::Function(true),
),

// extension
extension: (
r#"{"time": "2020-08-20T12:31:32.123Z","type": "extension", "record": "hello world"}"#,
LambdaTelemetryRecord::Extension("hello world".to_string()),
),

// extension (json)
extension_generic<bool>: (
r#"{"time": "2020-08-20T12:31:32.123Z","type": "extension", "record": true}"#,
LambdaTelemetryRecord::Extension(true),
),

// platform.start
platform_start: (
r#"{"time":"2022-10-21T14:05:03.165Z","type":"platform.start","record":{"requestId":"459921b5-681c-4a96-beb0-81e0aa586026","version":"$LATEST","tracing":{"spanId":"24cd7d670fa455f0","type":"X-Amzn-Trace-Id","value":"Root=1-6352a70e-1e2c502e358361800241fd45;Parent=35465b3a9e2f7c6a;Sampled=1"}}}"#,
Expand Down Expand Up @@ -477,11 +490,11 @@ mod serialization_tests {

use super::*;
macro_rules! serialize_tests {
($($name:ident: $value:expr,)*) => {
($($name:ident$(<$log:ty>)?: $value:expr,)*) => {
$(
#[test]
fn $name() {
let (input, expected) = $value;
let (input, expected): (LambdaTelemetry$(<$log>)?, &str) = $value;
let actual = serde_json::to_string(&input).expect("unable to serialize");
println!("Input: {:?}\n", input);
println!("Expected:\n {:?}\n", expected);
Expand All @@ -490,7 +503,7 @@ mod serialization_tests {
assert!(actual == expected);
}
)*
}
};
}

serialize_tests! {
Expand All @@ -502,6 +515,14 @@ mod serialization_tests {
},
r#"{"time":"2023-11-28T12:00:09Z","type":"function","record":"hello world"}"#,
),
// function (json)
function_generic<bool>: (
LambdaTelemetry {
time: Utc.with_ymd_and_hms(2023, 11, 28, 12, 0, 9).unwrap(),
record: LambdaTelemetryRecord::Function(true),
},
r#"{"time":"2023-11-28T12:00:09Z","type":"function","record":true}"#,
),
// extension
extension: (
LambdaTelemetry {
Expand All @@ -510,6 +531,14 @@ mod serialization_tests {
},
r#"{"time":"2023-11-28T12:00:09Z","type":"extension","record":"hello world"}"#,
),
// extension (json)
extension_generic<bool>: (
LambdaTelemetry {
time: Utc.with_ymd_and_hms(2023, 11, 28, 12, 0, 9).unwrap(),
record: LambdaTelemetryRecord::Extension(true),
},
r#"{"time":"2023-11-28T12:00:09Z","type":"extension","record":true}"#,
),
//platform.Start
platform_start: (
LambdaTelemetry{
Expand Down