Skip to content

Commit 6b36b45

Browse files
propagate log record type generic
1 parent f028ee0 commit 6b36b45

2 files changed

Lines changed: 28 additions & 15 deletions

File tree

lambda-extension/src/extension.rs

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use hyper::{body::Incoming, server::conn::http1, service::service_fn};
44

55
use hyper_util::rt::tokio::TokioIo;
66
use lambda_runtime_api_client::Client;
7-
use serde::Deserialize;
7+
use serde::{de::DeserializeOwned, Deserialize};
88
use std::{
99
convert::Infallible,
1010
fmt,
@@ -83,14 +83,6 @@ where
8383
L::Error: Into<Error> + fmt::Debug,
8484
L::MakeError: Into<Error> + fmt::Debug,
8585
L::Future: Send,
86-
87-
// Fixme: 'static bound might be too restrictive
88-
T: MakeService<(), Vec<LambdaTelemetry>, Response = ()> + Send + Sync + 'static,
89-
T::Service: Service<Vec<LambdaTelemetry>, Response = ()> + Send + Sync,
90-
<T::Service as Service<Vec<LambdaTelemetry>>>::Future: Send + 'a,
91-
T::Error: Into<Error> + fmt::Debug,
92-
T::MakeError: Into<Error> + fmt::Debug,
93-
T::Future: Send,
9486
{
9587
/// Create a new [`Extension`] with a given extension name
9688
pub fn with_extension_name(self, extension_name: &'a str) -> Self {
@@ -232,7 +224,17 @@ where
232224
/// Lambda lifecycle operations to register the extension. When implementing an internal Lambda
233225
/// extension, it is safe to call `lambda_runtime::run` once the future returned by this
234226
/// function resolves.
235-
pub async fn register(self) -> Result<RegisteredExtension<E>, Error> {
227+
pub async fn register<TL>(self) -> Result<RegisteredExtension<E>, Error>
228+
where
229+
// Fixme: 'static bound might be too restrictive
230+
T: MakeService<(), Vec<LambdaTelemetry<TL>>, Response = ()> + Send + Sync + 'static,
231+
T::Service: Service<Vec<LambdaTelemetry<TL>>, Response = ()> + Send + Sync,
232+
<T::Service as Service<Vec<LambdaTelemetry<TL>>>>::Future: Send + 'a,
233+
T::Error: Into<Error> + fmt::Debug,
234+
T::MakeError: Into<Error> + fmt::Debug,
235+
T::Future: Send,
236+
TL: DeserializeOwned + Send + 'static,
237+
{
236238
let client = &Client::builder().build()?;
237239

238240
let register_res = register(client, self.extension_name, self.events).await?;
@@ -340,7 +342,17 @@ where
340342
}
341343

342344
/// Execute the given extension.
343-
pub async fn run(self) -> Result<(), Error> {
345+
pub async fn run<TL>(self) -> Result<(), Error>
346+
where
347+
// Fixme: 'static bound might be too restrictive
348+
T: MakeService<(), Vec<LambdaTelemetry<TL>>, Response = ()> + Send + Sync + 'static,
349+
T::Service: Service<Vec<LambdaTelemetry<TL>>, Response = ()> + Send + Sync,
350+
<T::Service as Service<Vec<LambdaTelemetry<TL>>>>::Future: Send + 'a,
351+
T::Error: Into<Error> + fmt::Debug,
352+
T::MakeError: Into<Error> + fmt::Debug,
353+
T::Future: Send,
354+
TL: DeserializeOwned + Send + 'static,
355+
{
344356
self.register().await?.run().await
345357
}
346358
}

lambda-extension/src/telemetry.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use http::{Request, Response};
33
use http_body_util::BodyExt;
44
use hyper::body::Incoming;
55
use lambda_runtime_api_client::body::Body;
6-
use serde::{Deserialize, Serialize};
6+
use serde::{de::DeserializeOwned, Deserialize, Serialize};
77
use std::{boxed::Box, fmt, sync::Arc};
88
use tokio::sync::Mutex;
99
use tower::Service;
@@ -269,14 +269,15 @@ pub struct RuntimeDoneMetrics {
269269
///
270270
/// This takes an `hyper::Request` and transforms it into `Vec<LambdaTelemetry>` for the
271271
/// underlying `Service` to process.
272-
pub(crate) async fn telemetry_wrapper<S>(
272+
pub(crate) async fn telemetry_wrapper<S, L>(
273273
service: Arc<Mutex<S>>,
274274
req: Request<Incoming>,
275275
) -> Result<Response<Body>, Box<dyn std::error::Error + Send + Sync>>
276276
where
277-
S: Service<Vec<LambdaTelemetry>, Response = ()>,
277+
S: Service<Vec<LambdaTelemetry<L>>, Response = ()>,
278278
S::Error: Into<Box<dyn std::error::Error + Send + Sync>> + fmt::Debug,
279279
S::Future: Send,
280+
L: DeserializeOwned,
280281
{
281282
trace!("Received telemetry request");
282283
// Parse the request body as a Vec<LambdaTelemetry>
@@ -291,7 +292,7 @@ where
291292
}
292293
};
293294

294-
let telemetry: Vec<LambdaTelemetry> = match serde_json::from_slice(&body.to_bytes()) {
295+
let telemetry: Vec<LambdaTelemetry<L>> = match serde_json::from_slice(&body.to_bytes()) {
295296
Ok(telemetry) => telemetry,
296297
Err(e) => {
297298
error!("Error parsing telemetry: {}", e);

0 commit comments

Comments
 (0)