Skip to content

Generic Buffered/Streaming Response Type Constraints #995

Description

@naftulikay

I'm writing a fairly generic all-in-one Lambda function which will be handling many different types of events. For example, SQS, SNS, CloudWatch scheduled events, S3 events, SES events, and of course API Gateway requests/responses. I have an existing Rust Lambda doing this but it's incredibly old (from the crowbar days) so I'm essentially rewriting it from scratch.

I'm implementing tower::Service for my service directly and I'm trying to define its outputs so that I can return either a synchronous buffered response or a streaming response so I can have the flexibility of both. I'm aware that the only case that streaming is applicable is in API Gateway responses.

use bytes::Bytes;
use lambda_runtime::streaming::Body;
use lambda_runtime::{Error as LambdaError, StreamResponse};
use lambda_runtime::{FunctionResponse, LambdaEvent};
use std::pin::Pin;
use std::sync::Arc;
use tower::Service;

#[derive(Clone)]
pub struct LambdaService(Arc<LambdaServiceInner>);

impl LambdaService {
    pub fn new(id: impl Into<String>) -> Self {
        Self(Arc::new(LambdaServiceInner { id: id.into() }))
    }
}

struct LambdaServiceInner {
    id: String,
}

/// Represents a response to the Lambda invocation, either a buffered response or a streaming 
/// response.
// FIXME here is where the issue occurs
pub type LambdaResponse = FunctionResponse<Bytes, StreamResponse<Body>>;

impl Service<LambdaEvent<serde_json::Value>> for LambdaService {
    type Response = LambdaResponse;
    type Error = LambdaError;
    type Future = Pin<Box<dyn Future<Output=Result<Self::Response, Self::Error>>>>;

    fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
        std::task::Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: LambdaEvent<serde_json::Value>) -> Self::Future {
        let cloned = self.clone();

        Box::pin(async move {
            cloned.on_event(req).await
        })
    }
}

impl LambdaService {
    pub async fn on_event(&self, event: LambdaEvent<serde_json::Value>) -> Result<LambdaResponse, LambdaError> {
        eprintln!("service id: {}; request_id: {}", self.0.id, event.context.request_id);

        let output = serde_json::json!({
            "context": event.context,
            "event": event.payload,
        });

        eprintln!("event: {}", serde_json::to_string_pretty(&output).unwrap());

        Ok(FunctionResponse::BufferedResponse(Vec::with_capacity(0).into()))
    }
}

My main function simply instantiates the service and calls lambda_runtime::run with it:

use api_lambda::LambdaService;
use lambda_runtime::Error as LambdaError;

#[tokio::main]
async fn main() -> Result<(), LambdaError> {
    lambda_runtime::run(LambdaService::new("my-id")).await
}

When I try to compile this:

error[E0277]: the trait bound `Response<Body>: futures_core::stream::Stream` is not satisfied
   --> crates/api-lambda/src/main.rs:6:70
    |
6   |     lambda_runtime::run(LambdaService::new("my-id")).await
    |                                                                      ^^^^^ the trait `futures_core::stream::Stream` is not implemented for `Response<Body>`
    |
    = help: the following other types implement trait `futures_core::stream::Stream`:
              &mut S
              AssertUnwindSafe<S>
              Body
              Box<S>
              CallAll<Svc, S>
              CallAllUnordered<Svc, S>
              Pin<P>
              async_stream::async_stream::AsyncStream<T, U>
            and 111 others
note: required by a bound in `run`
   --> /path/to/dir/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/lambda_runtime-0.13.0/src/lib.rs:119:8
    |
111 | pub async fn run<A, F, R, B, S, D, E>(handler: F) -> Result<(), Error>
    |              --- required by a bound in this function
...
119 |     S: Stream<Item = Result<D, E>> + Unpin + Send + 'static,
    |        ^^^^^^^^^^^^^^^^^^^^^^^^^^^ required by this bound in `run`

For more information about this error, try `rustc --explain E0277`.
error: could not compile `api-lambda` (bin "api-lambda") due to 2 previous errors

Note my type alias definition for the response:

lambda_runtime::FunctionResponse<bytes::Bytes, lambda_runtime::StreamResponse<lambda_runtime::streaming::Body>>;

I've tried a number of different values for StreamResponse<S>:

  • lambda_runtime::streaming::Body
  • axum::body::Body
  • axum::body::BodyDataStream

All of these fail indicating:

the trait `futures_core::stream::Stream` is not implemented for `Response<S>`.

The issue is that it appears that for all of the above types, Stream is implemented, so I'm a bit confused as to why this is happening.

I'm happy to contribute an example showing a generic return over both outcomes if I can get some help wrangling the types.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions