Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,7 @@ class $3L:
""\"An asynchronous HTTP client solely for testing purposes.""\"

TIMEOUT_EXCEPTIONS = ()
SUPPORTS_DUPLEX_STREAMING: bool = True

def __init__(self, *, client_config: HTTPClientConfiguration | None = None):
self._client_config = client_config
Expand All @@ -677,6 +678,7 @@ class $4L:
""\"An asynchronous HTTP client solely for testing purposes.""\"

TIMEOUT_EXCEPTIONS = ()
SUPPORTS_DUPLEX_STREAMING: bool = True

def __init__(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ private static List<ConfigProperty> getProtocolProperties(GenerationContext cont
.namespace("smithy_core.aio.interfaces", ".")
.build())
.build())
.documentation("The transport to use to send requests (e.g. an HTTP client).");
.documentation("The transport to use to send requests (e.g. an HTTP client). "
+ "Operations with bidirectional event streams require a transport that "
+ "sets SUPPORTS_DUPLEX_STREAMING to True, such as AWSCRTHTTPClient.");

if (context.applicationProtocol().isHttpProtocol()) {
properties.addAll(HTTP_PROPERTIES);
Expand Down
8 changes: 8 additions & 0 deletions designs/http-interfaces.md
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,14 @@ which takes a request and some configuration and asynchronously return a respons
Having a minimal interface makes it much easier to implement these interfaces on top of
a variety http libraries.

Clients that support duplex (bidirectional) event streaming, which in practice
requires HTTP/2, must declare it by setting the `SUPPORTS_DUPLEX_STREAMING` class
attribute to `True`. Clients are assumed not to support it otherwise, and duplex
stream operations invoked with such a client fail fast with an
`UnsupportedTransportError`. This surfaces the configuration problem before any
request is sent instead of letting the stream fail later with an opaque connection
error.

```python
@dataclass(kw_only=True)
class HTTPRequestConfiguration:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"type": "enhancement",
"description": "Added `SUPPORTS_DUPLEX_STREAMING` to `ClientTransport` so transports can declare support for duplex (bidirectional) event streaming. `RequestPipeline.duplex_stream` now fails fast with an `UnsupportedTransportError` when the configured transport does not declare support."
}
19 changes: 18 additions & 1 deletion packages/smithy-core/src/smithy_core/aio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@
from ..auth import AuthParams
from ..deserializers import DeserializeableShape, ShapeDeserializer
from ..endpoints import EndpointResolverParams
from ..exceptions import ClientTimeoutError, RetryError, SmithyError
from ..exceptions import (
ClientTimeoutError,
RetryError,
SmithyError,
UnsupportedTransportError,
)
from ..interceptors import (
InputContext,
Interceptor,
Expand Down Expand Up @@ -197,6 +202,18 @@ async def duplex_stream[
:param output_event_type: The event type to receive in the output stream.
:param event_deserializer: The method used to deserialize events.
"""
# The transport is assumed not to support duplex streaming unless it
# explicitly declares otherwise.
if not getattr(self.transport, "SUPPORTS_DUPLEX_STREAMING", False):
raise UnsupportedTransportError(
f"The configured transport ({type(self.transport).__name__}) does "
f"not support duplex (bidirectional) event streaming, which is "
f"required by the {call.operation.schema.id} operation. Use a "
f"transport that does, such as "
f"smithy_http.aio.crt.AWSCRTHTTPClient. Custom transports that "
f"support duplex streaming must set SUPPORTS_DUPLEX_STREAMING "
f"to True."
)
request_future = Future[RequestContext[I, TRequest]]()
execute_task = asyncio.create_task(self._execute_request(call, request_future))
request_context = await request_future
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,14 @@ class ClientTransport[I: Request, O: Response](Protocol):

TIMEOUT_EXCEPTIONS: tuple[type[Exception], ...]

SUPPORTS_DUPLEX_STREAMING: bool = False
"""Whether this transport can read response data while the request body is still
being written (typically requires HTTP/2).

Transports that support duplex (bidirectional) event streaming must explicitly set
this to True. Transports that don't declare it are assumed not to support it.
"""

async def send(self, request: I) -> O:
"""Send a request over the transport and receive the response."""
...
Expand Down
5 changes: 5 additions & 0 deletions packages/smithy-core/src/smithy_core/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,5 +114,10 @@ class UnsupportedStreamError(SmithyError):
streams are not supported."""


class UnsupportedTransportError(SmithyError):
"""Indicates that an operation requires a transport capability that the configured
transport does not declare support for (e.g. duplex event streaming)."""


class EndpointResolutionError(SmithyError):
"""Exception type for all exceptions raised by endpoint resolution."""
226 changes: 226 additions & 0 deletions packages/smithy-core/tests/unit/aio/test_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
from typing import Any, Self, cast

import pytest
from smithy_core import URI
from smithy_core.aio.client import ClientCall, RequestPipeline
from smithy_core.aio.eventstream import DuplexEventStream, InputEventStream
from smithy_core.aio.interfaces import ClientProtocol, ClientTransport
from smithy_core.deserializers import ShapeDeserializer
from smithy_core.documents import TypeRegistry
from smithy_core.endpoints import EndpointResolverParams
from smithy_core.exceptions import UnsupportedTransportError
from smithy_core.interceptors import InterceptorChain
from smithy_core.schemas import APIOperation, Schema
from smithy_core.serializers import ShapeSerializer
from smithy_core.shapes import ShapeID, ShapeType
from smithy_core.traits import StreamingTrait
from smithy_core.types import TypedProperties

_STRING = Schema(id=ShapeID("smithy.api#String"), shape_type=ShapeType.STRING)

_EVENTS = Schema.collection(
id=ShapeID("com.example#Events"),
shape_type=ShapeType.UNION,
members={"message": {"target": _STRING}},
)

_INPUT_SCHEMA = Schema.collection(
id=ShapeID("com.example#StreamingInput"),
members={"events": {"target": _EVENTS, "traits": [StreamingTrait()]}},
)

_OUTPUT_SCHEMA = Schema.collection(
id=ShapeID("com.example#StreamingOutput"),
members={"events": {"target": _EVENTS, "traits": [StreamingTrait()]}},
)


class _Input:
def serialize(self, serializer: ShapeSerializer) -> None:
pass


class _Output:
@classmethod
def deserialize(cls, deserializer: ShapeDeserializer) -> Self:
return cls()


class _Event:
def serialize(self, serializer: ShapeSerializer) -> None:
pass

@classmethod
def deserialize(cls, deserializer: ShapeDeserializer) -> Self:
return cls()


_OPERATION = APIOperation(
input=_Input,
output=_Output,
schema=Schema(
id=ShapeID("com.example#StreamingOperation"),
shape_type=ShapeType.OPERATION,
),
input_schema=_INPUT_SCHEMA,
output_schema=_OUTPUT_SCHEMA,
error_registry=TypeRegistry({}),
effective_auth_schemes=[],
error_schemas=[],
)


class _StubRequest:
def __init__(self) -> None:
self.destination = URI(host="example.com")
self.body = b""

async def consume_body_async(self) -> bytes:
return b""

def consume_body(self) -> bytes:
return b""


class _StubResponse:
body = b""

async def consume_body_async(self) -> bytes:
return b""

def consume_body(self) -> bytes:
return b""


class _StubEventPublisher:
async def send(self, event: Any) -> None:
pass

async def close(self) -> None:
pass


class _StubEventReceiver:
async def receive(self) -> Any:
return None

async def close(self) -> None:
pass


class _StubProtocol:
@property
def id(self) -> ShapeID:
return ShapeID("com.example#testProtocol")

def serialize_request(self, **kwargs: Any) -> _StubRequest:
return _StubRequest()

def set_service_endpoint(self, *, request: Any, endpoint: Any) -> Any:
return request

async def deserialize_response(self, **kwargs: Any) -> _Output:
return _Output()

def create_event_publisher(self, **kwargs: Any) -> _StubEventPublisher:
return _StubEventPublisher()

def create_event_receiver(self, **kwargs: Any) -> _StubEventReceiver:
return _StubEventReceiver()


class _StubEndpoint:
def __init__(self) -> None:
self.uri = URI(host="example.com")
self.properties = TypedProperties()


class _StubEndpointResolver:
async def resolve_endpoint(self, params: EndpointResolverParams[Any]) -> Any:
return _StubEndpoint()


class _StubAuthResolver:
def resolve_auth_scheme(self, *, auth_parameters: Any) -> list[Any]:
return []


class _UndeclaredTransport:
"""A transport that does not declare whether it supports duplex streaming."""

TIMEOUT_EXCEPTIONS: tuple[type[Exception], ...] = ()

async def send(self, request: Any) -> _StubResponse:
return _StubResponse()


class _NonDuplexTransport(_UndeclaredTransport):
SUPPORTS_DUPLEX_STREAMING = False


class _DuplexTransport(_UndeclaredTransport):
SUPPORTS_DUPLEX_STREAMING = True


def _pipeline(transport: object) -> RequestPipeline[Any, Any]:
# The stubs are intentionally structural (they don't subclass the
# protocols), so cast them to keep the type checker focused on the
# runtime behavior under test.
return RequestPipeline(
protocol=cast("ClientProtocol[Any, Any]", _StubProtocol()),
transport=cast("ClientTransport[Any, Any]", transport),
)


def _client_call() -> ClientCall[Any, Any]:
return ClientCall(
input=_Input(),
operation=_OPERATION,
context=TypedProperties(),
interceptor=InterceptorChain([]),
auth_scheme_resolver=_StubAuthResolver(),
supported_auth_schemes={},
endpoint_resolver=_StubEndpointResolver(),
retry_strategy=None, # type: ignore[arg-type] # unused for streaming input
)


async def test_duplex_stream_raises_for_undeclared_transport() -> None:
pipeline = _pipeline(_UndeclaredTransport())

with pytest.raises(UnsupportedTransportError) as exc_info:
await pipeline.duplex_stream(_client_call(), _Event, _Event, _Event.deserialize)

assert "_UndeclaredTransport" in str(exc_info.value)
assert "com.example#StreamingOperation" in str(exc_info.value)


async def test_duplex_stream_raises_for_non_duplex_transport() -> None:
pipeline = _pipeline(_NonDuplexTransport())

with pytest.raises(UnsupportedTransportError):
await pipeline.duplex_stream(_client_call(), _Event, _Event, _Event.deserialize)


async def test_duplex_stream_proceeds_for_duplex_transport() -> None:
pipeline = _pipeline(_DuplexTransport())

stream = await pipeline.duplex_stream(
_client_call(), _Event, _Event, _Event.deserialize
)

assert isinstance(stream, DuplexEventStream)
output, output_stream = await stream.await_output()
assert isinstance(output, _Output)
assert isinstance(output_stream, _StubEventReceiver)


async def test_input_stream_does_not_require_duplex_support() -> None:
pipeline = _pipeline(_NonDuplexTransport())

stream = await pipeline.input_stream(_client_call(), _Event)

assert isinstance(stream, InputEventStream)
assert isinstance(await stream.await_output(), _Output)
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"type": "enhancement",
"description": "Declared duplex (bidirectional) streaming support on `AWSCRTHTTPClient` via `SUPPORTS_DUPLEX_STREAMING`. `AIOHTTPClient` explicitly does not support it."
}
4 changes: 4 additions & 0 deletions packages/smithy-http/src/smithy_http/aio/aiohttp.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ class AIOHTTPClient(HTTPClient):

TIMEOUT_EXCEPTIONS = (TimeoutError,)

# aiohttp has no HTTP/2 support and this client fully buffers the response
# before returning, so it can never interleave request and response data.
SUPPORTS_DUPLEX_STREAMING = False

def __init__(
self,
*,
Expand Down
4 changes: 4 additions & 0 deletions packages/smithy-http/src/smithy_http/aio/crt.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ class AWSCRTHTTPClient(http_aio_interfaces.HTTPClient):

TIMEOUT_EXCEPTIONS = (_CRTTimeoutError,)

# True duplex streaming additionally requires the connection to negotiate
# HTTP/2 via ALPN; over HTTP/1.1 the CRT falls back to a buffered body_stream.
SUPPORTS_DUPLEX_STREAMING = True

def __init__(
self,
eventloop: _AWSCRTEventLoop | None = None,
Expand Down
5 changes: 5 additions & 0 deletions packages/smithy-http/src/smithy_http/testing/mockhttp.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ class MockHTTPClient(HTTPClient):

TIMEOUT_EXCEPTIONS = (TimeoutError,)

# Declared so duplex (bidirectional) stream operations can be unit tested
# against this client. Queued responses are returned without consuming the
# request body, which is compatible with duplex streaming.
SUPPORTS_DUPLEX_STREAMING = True

def __init__(
self,
*,
Expand Down
7 changes: 7 additions & 0 deletions packages/smithy-http/tests/unit/aio/test_aiohttp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
from smithy_http.aio.aiohttp import AIOHTTPClient


def test_does_not_support_duplex_streaming() -> None:
assert AIOHTTPClient.SUPPORTS_DUPLEX_STREAMING is False
4 changes: 4 additions & 0 deletions packages/smithy-http/tests/unit/aio/test_crt.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ def test_deepcopy_client() -> None:
deepcopy(client)


def test_supports_duplex_streaming() -> None:
assert AWSCRTHTTPClient.SUPPORTS_DUPLEX_STREAMING is True


def test_client_marshal_request() -> None:
"""Test that HTTPRequest is correctly marshaled to CRT HttpRequest."""
client = AWSCRTHTTPClient()
Expand Down
Loading
Loading