Skip to content

Commit abc1a00

Browse files
authored
Fix missing signed empty end frame on event stream close (#691)
* Send signed empty end frame on event stream close * Update signer to use sign_empty method * Add sign_empty method to EventSigner protocol * Update unit test * Address PR feedback * Update changelog entries to feature type
1 parent 9b28c3a commit abc1a00

6 files changed

Lines changed: 75 additions & 6 deletions

File tree

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
2-
"type": "enhancement",
2+
"type": "feature",
33
"description": "Added `AsyncEventSigner.sign_empty` for SigV4-signed Amazon Event Stream terminator frames. This supports services that require a final signed empty message before the HTTP body stream closes."
44
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
{
2+
"type": "feature",
3+
"description": "Added a signed empty end frame to `AWSEventPublisher.close` to signal event stream completion. This supports services that require a final signed empty message before the HTTP body stream closes."
4+
}

packages/smithy-aws-event-stream/src/smithy_aws_event_stream/aio/__init__.py

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
from .._private.deserializers import EventDeserializer as _EventDeserializer
1919
from .._private.serializers import EventSerializer as _EventSerializer
20-
from ..events import Event
20+
from ..events import Event, EventMessage
2121
from ..exceptions import EventError
2222

2323
logger = logging.getLogger(__name__)
@@ -85,9 +85,26 @@ async def close(self) -> None:
8585
return
8686
self._closed = True
8787

88-
if (close := getattr(self._writer, "close", None)) is not None:
89-
if asyncio.iscoroutine(result := close()):
90-
await result
88+
try:
89+
# Send a signed empty frame to signal stream completion.
90+
if self._signing_config is not None:
91+
end_frame = EventMessage()
92+
identity = await self._signing_config.identity_resolver.get_identity(
93+
properties=self._signing_config.identity_properties
94+
)
95+
end_frame = await self._signing_config.signer.sign_empty(
96+
event=end_frame,
97+
identity=identity,
98+
properties=self._signing_config.signing_properties,
99+
)
100+
logger.debug(
101+
"Sending signed empty message to terminate the event stream."
102+
)
103+
await self._writer.write(end_frame.encode())
104+
finally:
105+
if (close := getattr(self._writer, "close", None)) is not None:
106+
if asyncio.iscoroutine(result := close()):
107+
await result
91108

92109
@property
93110
def closed(self) -> bool:

packages/smithy-aws-event-stream/tests/unit/_private/test_serializers.py

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
22
# SPDX-License-Identifier: Apache-2.0
3+
import asyncio
4+
from io import BytesIO
35
from typing import Any
6+
from unittest.mock import AsyncMock
47

58
import pytest
69
from smithy_aws_event_stream._private.serializers import EventSerializer
710
from smithy_aws_event_stream.aio import AWSEventPublisher
8-
from smithy_aws_event_stream.events import EventMessage
11+
from smithy_aws_event_stream.events import Event, EventMessage
912
from smithy_core.aio.types import AsyncBytesProvider
1013
from smithy_core.serializers import SerializeableShape
1114
from smithy_json import JSONCodec
@@ -77,3 +80,33 @@ async def test_send_to_closed_writer():
7780
await publisher.send(EVENT_STREAM_SERDE_CASES[0][0])
7881

7982
assert publisher.closed
83+
84+
85+
async def test_close_sends_empty_end_frame_when_signing():
86+
writer = AsyncBytesProvider()
87+
end_frame = EventMessage()
88+
signing_config = AsyncMock()
89+
signing_config.signer.sign_empty.return_value = end_frame
90+
91+
publisher: AWSEventPublisher[Any] = AWSEventPublisher(
92+
payload_codec=JSONCodec(), async_writer=writer, signing_config=signing_config
93+
)
94+
95+
# Read from the writer concurrently with close(), since close() flushes
96+
# and blocks until all chunks are consumed.
97+
reader = asyncio.create_task(_read(writer))
98+
await publisher.close()
99+
written = await reader
100+
101+
signing_config.signer.sign_empty.assert_awaited_once()
102+
assert publisher.closed
103+
assert writer.closed
104+
105+
decoded = Event.decode(BytesIO(written))
106+
assert decoded is not None
107+
assert decoded.message.payload == b""
108+
assert decoded.message.headers == {}
109+
110+
111+
async def _read(source: AsyncBytesProvider) -> bytes:
112+
return b"".join([chunk async for chunk in source])
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
{
2+
"type": "feature",
3+
"description": "Added `EventSigner.sign_empty` to the protocol for signing empty events."
4+
}

packages/smithy-core/src/smithy_core/aio/interfaces/auth.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,17 @@ async def sign(self, *, event: Any, identity: I, properties: SP) -> Any:
3131
"""Get a signed version of the event.
3232
3333
:param event: The event to be signed.
34+
:param identity: The identity to use to sign the event.
35+
:param properties: Additional properties used to sign the event.
36+
"""
37+
...
38+
39+
async def sign_empty(self, *, event: Any, identity: I, properties: SP) -> Any:
40+
"""Get a signed version of an empty event.
41+
42+
:param event: The empty event to be signed.
43+
:param identity: The identity to use to sign the event.
44+
:param properties: Additional properties used to sign the event.
3445
"""
3546
...
3647

0 commit comments

Comments
 (0)