From f9179b06f44bb201401e736a0cd0e59b3974e856 Mon Sep 17 00:00:00 2001 From: jonathan343 Date: Thu, 11 Jun 2026 23:19:10 -0400 Subject: [PATCH] feat: validate transport support for bidirectional streaming Most Python HTTP clients cannot read response data while the request body is still being written, so bidirectional event stream operations fail mid-stream with an opaque connection error. Transports must now opt in by setting SUPPORTS_DUPLEX_STREAMING to True, and RequestPipeline.duplex_stream fails fast with UnsupportedTransportError when the configured transport does not declare support. AWSCRTHTTPClient and MockHTTPClient declare support; AIOHTTPClient explicitly does not. --- .../codegen/HttpProtocolTestGenerator.java | 2 + .../codegen/generators/ConfigGenerator.java | 4 +- designs/http-interfaces.md | 8 + ...ment-318f09ab1c7e425e88c6e8ba970cb2fe.json | 4 + .../smithy-core/src/smithy_core/aio/client.py | 19 +- .../smithy_core/aio/interfaces/__init__.py | 8 + .../smithy-core/src/smithy_core/exceptions.py | 5 + .../smithy-core/tests/unit/aio/test_client.py | 226 ++++++++++++++++++ ...ment-91c7d1ab9ee64c0fbfc3dcb60f52c284.json | 4 + .../src/smithy_http/aio/aiohttp.py | 4 + .../smithy-http/src/smithy_http/aio/crt.py | 4 + .../src/smithy_http/testing/mockhttp.py | 5 + .../tests/unit/aio/test_aiohttp.py | 7 + .../smithy-http/tests/unit/aio/test_crt.py | 4 + .../tests/unit/testing/test_mockhttp.py | 6 + uv.lock | 2 +- 16 files changed, 309 insertions(+), 3 deletions(-) create mode 100644 packages/smithy-core/.changes/next-release/smithy-core-enhancement-318f09ab1c7e425e88c6e8ba970cb2fe.json create mode 100644 packages/smithy-core/tests/unit/aio/test_client.py create mode 100644 packages/smithy-http/.changes/next-release/smithy-http-enhancement-91c7d1ab9ee64c0fbfc3dcb60f52c284.json create mode 100644 packages/smithy-http/tests/unit/aio/test_aiohttp.py diff --git a/codegen/core/src/main/java/software/amazon/smithy/python/codegen/HttpProtocolTestGenerator.java 
b/codegen/core/src/main/java/software/amazon/smithy/python/codegen/HttpProtocolTestGenerator.java index c432847bb..7e933fa9f 100644 --- a/codegen/core/src/main/java/software/amazon/smithy/python/codegen/HttpProtocolTestGenerator.java +++ b/codegen/core/src/main/java/software/amazon/smithy/python/codegen/HttpProtocolTestGenerator.java @@ -662,6 +662,7 @@ class $3L: ""\"An asynchronous HTTP client solely for testing purposes.""\" TIMEOUT_EXCEPTIONS = () + SUPPORTS_DUPLEX_STREAMING: bool = True def __init__(self, *, client_config: HTTPClientConfiguration | None = None): self._client_config = client_config @@ -677,6 +678,7 @@ class $4L: ""\"An asynchronous HTTP client solely for testing purposes.""\" TIMEOUT_EXCEPTIONS = () + SUPPORTS_DUPLEX_STREAMING: bool = True def __init__( self, diff --git a/codegen/core/src/main/java/software/amazon/smithy/python/codegen/generators/ConfigGenerator.java b/codegen/core/src/main/java/software/amazon/smithy/python/codegen/generators/ConfigGenerator.java index de03d42d0..8738e07d6 100644 --- a/codegen/core/src/main/java/software/amazon/smithy/python/codegen/generators/ConfigGenerator.java +++ b/codegen/core/src/main/java/software/amazon/smithy/python/codegen/generators/ConfigGenerator.java @@ -147,7 +147,9 @@ private static List getProtocolProperties(GenerationContext cont .namespace("smithy_core.aio.interfaces", ".") .build()) .build()) - .documentation("The transport to use to send requests (e.g. an HTTP client)."); + .documentation("The transport to use to send requests (e.g. an HTTP client). 
" + + "Operations with bidirectional event streams require a transport that " + + "sets SUPPORTS_DUPLEX_STREAMING to True, such as AWSCRTHTTPClient."); if (context.applicationProtocol().isHttpProtocol()) { properties.addAll(HTTP_PROPERTIES); diff --git a/designs/http-interfaces.md b/designs/http-interfaces.md index 9a41f1268..108e81112 100644 --- a/designs/http-interfaces.md +++ b/designs/http-interfaces.md @@ -258,6 +258,14 @@ which takes a request and some configuration and asynchronously return a respons Having a minimal interface makes it much easier to implement these interfaces on top of a variety http libraries. +Clients that support duplex (bidirectional) event streaming, which in practice +requires HTTP/2, must declare it by setting the `SUPPORTS_DUPLEX_STREAMING` class +attribute to `True`. Clients are assumed not to support it otherwise, and duplex +stream operations invoked with such a client fail fast with an +`UnsupportedTransportError`. This surfaces the configuration problem before any +request is sent instead of letting the stream fail later with an opaque connection +error. + ```python @dataclass(kw_only=True) class HTTPRequestConfiguration: diff --git a/packages/smithy-core/.changes/next-release/smithy-core-enhancement-318f09ab1c7e425e88c6e8ba970cb2fe.json b/packages/smithy-core/.changes/next-release/smithy-core-enhancement-318f09ab1c7e425e88c6e8ba970cb2fe.json new file mode 100644 index 000000000..9bb9ab83f --- /dev/null +++ b/packages/smithy-core/.changes/next-release/smithy-core-enhancement-318f09ab1c7e425e88c6e8ba970cb2fe.json @@ -0,0 +1,4 @@ +{ + "type": "enhancement", + "description": "Added `SUPPORTS_DUPLEX_STREAMING` to `ClientTransport` so transports can declare support for duplex (bidirectional) event streaming. `RequestPipeline.duplex_stream` now fails fast with an `UnsupportedTransportError` when the configured transport does not declare support." 
+} diff --git a/packages/smithy-core/src/smithy_core/aio/client.py b/packages/smithy-core/src/smithy_core/aio/client.py index 6060727b4..1cc97459d 100644 --- a/packages/smithy-core/src/smithy_core/aio/client.py +++ b/packages/smithy-core/src/smithy_core/aio/client.py @@ -12,7 +12,12 @@ from ..auth import AuthParams from ..deserializers import DeserializeableShape, ShapeDeserializer from ..endpoints import EndpointResolverParams -from ..exceptions import ClientTimeoutError, RetryError, SmithyError +from ..exceptions import ( + ClientTimeoutError, + RetryError, + SmithyError, + UnsupportedTransportError, +) from ..interceptors import ( InputContext, Interceptor, @@ -197,6 +202,18 @@ async def duplex_stream[ :param output_event_type: The event type to receive in the output stream. :param event_deserializer: The method used to deserialize events. """ + # The transport is assumed not to support duplex streaming unless it + # explicitly declares otherwise. + if not getattr(self.transport, "SUPPORTS_DUPLEX_STREAMING", False): + raise UnsupportedTransportError( + f"The configured transport ({type(self.transport).__name__}) does " + f"not support duplex (bidirectional) event streaming, which is " + f"required by the {call.operation.schema.id} operation. Use a " + f"transport that does, such as " + f"smithy_http.aio.crt.AWSCRTHTTPClient. Custom transports that " + f"support duplex streaming must set SUPPORTS_DUPLEX_STREAMING " + f"to True." 
+ ) request_future = Future[RequestContext[I, TRequest]]() execute_task = asyncio.create_task(self._execute_request(call, request_future)) request_context = await request_future diff --git a/packages/smithy-core/src/smithy_core/aio/interfaces/__init__.py b/packages/smithy-core/src/smithy_core/aio/interfaces/__init__.py index 0c900a8cf..90d63d9ea 100644 --- a/packages/smithy-core/src/smithy_core/aio/interfaces/__init__.py +++ b/packages/smithy-core/src/smithy_core/aio/interfaces/__init__.py @@ -90,6 +90,14 @@ class ClientTransport[I: Request, O: Response](Protocol): TIMEOUT_EXCEPTIONS: tuple[type[Exception], ...] + SUPPORTS_DUPLEX_STREAMING: bool = False + """Whether this transport can read response data while the request body is still + being written (typically requires HTTP/2). + + Transports that support duplex (bidirectional) event streaming must explicitly set + this to True. Transports that don't declare it are assumed not to support it. + """ + async def send(self, request: I) -> O: """Send a request over the transport and receive the response.""" ... diff --git a/packages/smithy-core/src/smithy_core/exceptions.py b/packages/smithy-core/src/smithy_core/exceptions.py index 0a99976f9..80874152d 100644 --- a/packages/smithy-core/src/smithy_core/exceptions.py +++ b/packages/smithy-core/src/smithy_core/exceptions.py @@ -114,5 +114,10 @@ class UnsupportedStreamError(SmithyError): streams are not supported.""" +class UnsupportedTransportError(SmithyError): + """Indicates that an operation requires a transport capability that the configured + transport does not declare support for (e.g. 
duplex event streaming).""" + + class EndpointResolutionError(SmithyError): """Exception type for all exceptions raised by endpoint resolution.""" diff --git a/packages/smithy-core/tests/unit/aio/test_client.py b/packages/smithy-core/tests/unit/aio/test_client.py new file mode 100644 index 000000000..6cb08e2dc --- /dev/null +++ b/packages/smithy-core/tests/unit/aio/test_client.py @@ -0,0 +1,226 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +from typing import Any, Self, cast + +import pytest +from smithy_core import URI +from smithy_core.aio.client import ClientCall, RequestPipeline +from smithy_core.aio.eventstream import DuplexEventStream, InputEventStream +from smithy_core.aio.interfaces import ClientProtocol, ClientTransport +from smithy_core.deserializers import ShapeDeserializer +from smithy_core.documents import TypeRegistry +from smithy_core.endpoints import EndpointResolverParams +from smithy_core.exceptions import UnsupportedTransportError +from smithy_core.interceptors import InterceptorChain +from smithy_core.schemas import APIOperation, Schema +from smithy_core.serializers import ShapeSerializer +from smithy_core.shapes import ShapeID, ShapeType +from smithy_core.traits import StreamingTrait +from smithy_core.types import TypedProperties + +_STRING = Schema(id=ShapeID("smithy.api#String"), shape_type=ShapeType.STRING) + +_EVENTS = Schema.collection( + id=ShapeID("com.example#Events"), + shape_type=ShapeType.UNION, + members={"message": {"target": _STRING}}, +) + +_INPUT_SCHEMA = Schema.collection( + id=ShapeID("com.example#StreamingInput"), + members={"events": {"target": _EVENTS, "traits": [StreamingTrait()]}}, +) + +_OUTPUT_SCHEMA = Schema.collection( + id=ShapeID("com.example#StreamingOutput"), + members={"events": {"target": _EVENTS, "traits": [StreamingTrait()]}}, +) + + +class _Input: + def serialize(self, serializer: ShapeSerializer) -> None: + pass + + +class _Output: + 
@classmethod + def deserialize(cls, deserializer: ShapeDeserializer) -> Self: + return cls() + + +class _Event: + def serialize(self, serializer: ShapeSerializer) -> None: + pass + + @classmethod + def deserialize(cls, deserializer: ShapeDeserializer) -> Self: + return cls() + + +_OPERATION = APIOperation( + input=_Input, + output=_Output, + schema=Schema( + id=ShapeID("com.example#StreamingOperation"), + shape_type=ShapeType.OPERATION, + ), + input_schema=_INPUT_SCHEMA, + output_schema=_OUTPUT_SCHEMA, + error_registry=TypeRegistry({}), + effective_auth_schemes=[], + error_schemas=[], +) + + +class _StubRequest: + def __init__(self) -> None: + self.destination = URI(host="example.com") + self.body = b"" + + async def consume_body_async(self) -> bytes: + return b"" + + def consume_body(self) -> bytes: + return b"" + + +class _StubResponse: + body = b"" + + async def consume_body_async(self) -> bytes: + return b"" + + def consume_body(self) -> bytes: + return b"" + + +class _StubEventPublisher: + async def send(self, event: Any) -> None: + pass + + async def close(self) -> None: + pass + + +class _StubEventReceiver: + async def receive(self) -> Any: + return None + + async def close(self) -> None: + pass + + +class _StubProtocol: + @property + def id(self) -> ShapeID: + return ShapeID("com.example#testProtocol") + + def serialize_request(self, **kwargs: Any) -> _StubRequest: + return _StubRequest() + + def set_service_endpoint(self, *, request: Any, endpoint: Any) -> Any: + return request + + async def deserialize_response(self, **kwargs: Any) -> _Output: + return _Output() + + def create_event_publisher(self, **kwargs: Any) -> _StubEventPublisher: + return _StubEventPublisher() + + def create_event_receiver(self, **kwargs: Any) -> _StubEventReceiver: + return _StubEventReceiver() + + +class _StubEndpoint: + def __init__(self) -> None: + self.uri = URI(host="example.com") + self.properties = TypedProperties() + + +class _StubEndpointResolver: + async def 
resolve_endpoint(self, params: EndpointResolverParams[Any]) -> Any: + return _StubEndpoint() + + +class _StubAuthResolver: + def resolve_auth_scheme(self, *, auth_parameters: Any) -> list[Any]: + return [] + + +class _UndeclaredTransport: + """A transport that does not declare whether it supports duplex streaming.""" + + TIMEOUT_EXCEPTIONS: tuple[type[Exception], ...] = () + + async def send(self, request: Any) -> _StubResponse: + return _StubResponse() + + +class _NonDuplexTransport(_UndeclaredTransport): + SUPPORTS_DUPLEX_STREAMING = False + + +class _DuplexTransport(_UndeclaredTransport): + SUPPORTS_DUPLEX_STREAMING = True + + +def _pipeline(transport: object) -> RequestPipeline[Any, Any]: + # The stubs are intentionally structural (they don't subclass the + # protocols), so cast them to keep the type checker focused on the + # runtime behavior under test. + return RequestPipeline( + protocol=cast("ClientProtocol[Any, Any]", _StubProtocol()), + transport=cast("ClientTransport[Any, Any]", transport), + ) + + +def _client_call() -> ClientCall[Any, Any]: + return ClientCall( + input=_Input(), + operation=_OPERATION, + context=TypedProperties(), + interceptor=InterceptorChain([]), + auth_scheme_resolver=_StubAuthResolver(), + supported_auth_schemes={}, + endpoint_resolver=_StubEndpointResolver(), + retry_strategy=None, # type: ignore[arg-type] # unused for streaming input + ) + + +async def test_duplex_stream_raises_for_undeclared_transport() -> None: + pipeline = _pipeline(_UndeclaredTransport()) + + with pytest.raises(UnsupportedTransportError) as exc_info: + await pipeline.duplex_stream(_client_call(), _Event, _Event, _Event.deserialize) + + assert "_UndeclaredTransport" in str(exc_info.value) + assert "com.example#StreamingOperation" in str(exc_info.value) + + +async def test_duplex_stream_raises_for_non_duplex_transport() -> None: + pipeline = _pipeline(_NonDuplexTransport()) + + with pytest.raises(UnsupportedTransportError): + await 
+ pipeline.duplex_stream(_client_call(), _Event, _Event, _Event.deserialize) + + +async def test_duplex_stream_proceeds_for_duplex_transport() -> None: + pipeline = _pipeline(_DuplexTransport()) + + stream = await pipeline.duplex_stream( + _client_call(), _Event, _Event, _Event.deserialize + ) + + assert isinstance(stream, DuplexEventStream) + output, output_stream = await stream.await_output() + assert isinstance(output, _Output) + assert isinstance(output_stream, _StubEventReceiver) + + +async def test_input_stream_does_not_require_duplex_support() -> None: + pipeline = _pipeline(_NonDuplexTransport()) + + stream = await pipeline.input_stream(_client_call(), _Event) + + assert isinstance(stream, InputEventStream) + assert isinstance(await stream.await_output(), _Output) diff --git a/packages/smithy-http/.changes/next-release/smithy-http-enhancement-91c7d1ab9ee64c0fbfc3dcb60f52c284.json b/packages/smithy-http/.changes/next-release/smithy-http-enhancement-91c7d1ab9ee64c0fbfc3dcb60f52c284.json new file mode 100644 index 000000000..89b8706ff --- /dev/null +++ b/packages/smithy-http/.changes/next-release/smithy-http-enhancement-91c7d1ab9ee64c0fbfc3dcb60f52c284.json @@ -0,0 +1,4 @@ +{ + "type": "enhancement", + "description": "Declared duplex (bidirectional) streaming support on `AWSCRTHTTPClient` and `MockHTTPClient` via `SUPPORTS_DUPLEX_STREAMING`. `AIOHTTPClient` explicitly does not support it." +} diff --git a/packages/smithy-http/src/smithy_http/aio/aiohttp.py b/packages/smithy-http/src/smithy_http/aio/aiohttp.py index 5a330931d..5c11110f7 100644 --- a/packages/smithy-http/src/smithy_http/aio/aiohttp.py +++ b/packages/smithy-http/src/smithy_http/aio/aiohttp.py @@ -53,6 +53,10 @@ class AIOHTTPClient(HTTPClient): TIMEOUT_EXCEPTIONS = (TimeoutError,) + # aiohttp has no HTTP/2 support and this client fully buffers the response + # before returning, so it can never interleave request and response data. 
+ SUPPORTS_DUPLEX_STREAMING = False + def __init__( self, *, diff --git a/packages/smithy-http/src/smithy_http/aio/crt.py b/packages/smithy-http/src/smithy_http/aio/crt.py index 1cb5c34b5..12b4ce3e2 100644 --- a/packages/smithy-http/src/smithy_http/aio/crt.py +++ b/packages/smithy-http/src/smithy_http/aio/crt.py @@ -143,6 +143,10 @@ class AWSCRTHTTPClient(http_aio_interfaces.HTTPClient): TIMEOUT_EXCEPTIONS = (_CRTTimeoutError,) + # True duplex streaming additionally requires the connection to negotiate + # HTTP/2 via ALPN; over HTTP/1.1 the CRT falls back to a buffered body_stream. + SUPPORTS_DUPLEX_STREAMING = True + def __init__( self, eventloop: _AWSCRTEventLoop | None = None, diff --git a/packages/smithy-http/src/smithy_http/testing/mockhttp.py b/packages/smithy-http/src/smithy_http/testing/mockhttp.py index 95d1f758d..1bcc010e4 100644 --- a/packages/smithy-http/src/smithy_http/testing/mockhttp.py +++ b/packages/smithy-http/src/smithy_http/testing/mockhttp.py @@ -22,6 +22,11 @@ class MockHTTPClient(HTTPClient): TIMEOUT_EXCEPTIONS = (TimeoutError,) + # Declared so duplex (bidirectional) stream operations can be unit tested + # against this client. Queued responses are returned without consuming the + # request body, which is compatible with duplex streaming. + SUPPORTS_DUPLEX_STREAMING = True + def __init__( self, *, diff --git a/packages/smithy-http/tests/unit/aio/test_aiohttp.py b/packages/smithy-http/tests/unit/aio/test_aiohttp.py new file mode 100644 index 000000000..8e43414e6 --- /dev/null +++ b/packages/smithy-http/tests/unit/aio/test_aiohttp.py @@ -0,0 +1,7 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+# SPDX-License-Identifier: Apache-2.0 +from smithy_http.aio.aiohttp import AIOHTTPClient + + +def test_does_not_support_duplex_streaming() -> None: + assert AIOHTTPClient.SUPPORTS_DUPLEX_STREAMING is False diff --git a/packages/smithy-http/tests/unit/aio/test_crt.py b/packages/smithy-http/tests/unit/aio/test_crt.py index 9718b3a2a..089529b3a 100644 --- a/packages/smithy-http/tests/unit/aio/test_crt.py +++ b/packages/smithy-http/tests/unit/aio/test_crt.py @@ -27,6 +27,10 @@ def test_deepcopy_client() -> None: deepcopy(client) +def test_supports_duplex_streaming() -> None: + assert AWSCRTHTTPClient.SUPPORTS_DUPLEX_STREAMING is True + + def test_client_marshal_request() -> None: """Test that HTTPRequest is correctly marshaled to CRT HttpRequest.""" client = AWSCRTHTTPClient() diff --git a/packages/smithy-http/tests/unit/testing/test_mockhttp.py b/packages/smithy-http/tests/unit/testing/test_mockhttp.py index 9efe68589..7784051db 100644 --- a/packages/smithy-http/tests/unit/testing/test_mockhttp.py +++ b/packages/smithy-http/tests/unit/testing/test_mockhttp.py @@ -5,6 +5,12 @@ from smithy_http.testing import MockHTTPClient, MockHTTPClientError, create_test_request +def test_supports_duplex_streaming(): + # Declared True so generated duplex stream operations can be unit tested + # against this client. 
+ assert MockHTTPClient.SUPPORTS_DUPLEX_STREAMING is True + + async def test_default_response(): # Test error when no responses are queued mock_client = MockHTTPClient() diff --git a/uv.lock b/uv.lock index 90e8cea62..e90f42d74 100644 --- a/uv.lock +++ b/uv.lock @@ -753,7 +753,7 @@ awscrt = [ [package.metadata] requires-dist = [ - { name = "aiohttp", marker = "extra == 'aiohttp'", specifier = ">=3.14.0,<4.0" }, + { name = "aiohttp", marker = "extra == 'aiohttp'", specifier = ">=3.11.12,<4.0" }, { name = "awscrt", marker = "extra == 'awscrt'", specifier = "~=0.32.0" }, { name = "smithy-core", editable = "packages/smithy-core" }, { name = "yarl", marker = "extra == 'aiohttp'" },