Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 80 additions & 9 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_decode.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from typing_extensions import Literal


from . import described
from .message import Message, Header, Properties

if TYPE_CHECKING:
Expand Down Expand Up @@ -153,6 +154,7 @@ def _decode_binary_large(buffer: memoryview) -> Tuple[memoryview, bytes]:
length_index = c_unsigned_long.unpack(buffer[:4])[0] + 4
return buffer[length_index:], buffer[4:length_index].tobytes()


def _decode_decimal128(buffer: memoryview) -> Tuple[memoryview, decimal.Decimal]:
"""
Decode a Decimal128 value from the buffer.
Expand Down Expand Up @@ -254,23 +256,41 @@ def _decode_map_large(buffer: memoryview) -> Tuple[memoryview, Dict[Any, Any]]:
def _decode_array_small(buffer: memoryview) -> Tuple[memoryview, List[Any]]:
count = buffer[1] # Ignore first byte (size) and just rely on count
if count:
subconstructor = buffer[2]
buffer = buffer[3:]
values = [None] * count
for i in range(count):
buffer, values[i] = _DECODE_BY_CONSTRUCTOR[subconstructor](buffer)
subconstructor = buffer[2]

if subconstructor == 0:
composite_type = buffer[3]
buffer, descriptor = _DECODE_BY_CONSTRUCTOR[composite_type](buffer[4:])
subconstructor = buffer[0]
buffer = buffer[1:]
for i in range(count):
buffer, values[i] = _decode_described_array(buffer, subconstructor, descriptor)
else:
buffer = buffer[3:]
for i in range(count):
buffer, values[i] = _DECODE_BY_CONSTRUCTOR[subconstructor](buffer)
return buffer, values
return buffer[2:], []


def _decode_array_large(buffer: memoryview) -> Tuple[memoryview, List[Any]]:
count = c_unsigned_long.unpack(buffer[4:8])[0]
if count:
subconstructor = buffer[8]
buffer = buffer[9:]
values = [None] * count
for i in range(count):
buffer, values[i] = _DECODE_BY_CONSTRUCTOR[subconstructor](buffer)
subconstructor = buffer[8]

if subconstructor == 0:
composite_type = buffer[9]
buffer, descriptor = _DECODE_BY_CONSTRUCTOR[composite_type](buffer[10:])
subconstructor = buffer[0]
buffer = buffer[1:]
for i in range(count):
buffer, values[i] = _decode_described_array(buffer, subconstructor, descriptor)
else:
buffer = buffer[9:]
for i in range(count):
buffer, values[i] = _DECODE_BY_CONSTRUCTOR[subconstructor](buffer)
return buffer, values
return buffer[8:], []

Expand All @@ -280,7 +300,25 @@ def _decode_described(buffer: memoryview) -> Tuple[memoryview, object]:
# descriptor without decoding descriptor value
composite_type = buffer[0]
buffer, descriptor = _DECODE_BY_CONSTRUCTOR[composite_type](buffer[1:])
buffer, value = _DECODE_BY_CONSTRUCTOR[buffer[0]](buffer[1:])
tp = buffer[0]
buffer, value = _DECODE_BY_CONSTRUCTOR[tp](buffer[1:])
try:
value = _DESCR_BY_CONSTRUCTOR[tp](value, descriptor=descriptor)
except KeyError:
pass
Comment on lines +305 to +308
Copy link

Copilot AI Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This only attaches .descriptor for constructors listed in _DESCR_BY_CONSTRUCTOR; for any other described underlying type (e.g., uuid/decimal128/bool/null), the descriptor is still dropped via the KeyError path. If the intent is to preserve the descriptor for all described values (per PR description), consider adding a generic fallback wrapper that stores (descriptor, value) when a specialized subclass wrapper is not available.

Copilot uses AI. Check for mistakes.
try:
composite_type = cast(int, _COMPOSITES[descriptor])
return buffer, {composite_type: value}
except KeyError:
Comment thread
fafhrd91 marked this conversation as resolved.
return buffer, value


def _decode_described_array(buffer: memoryview, tp: int, descriptor) -> Tuple[memoryview, Any]:
buffer, value = _DECODE_BY_CONSTRUCTOR[tp](buffer)
try:
value = _DESCR_BY_CONSTRUCTOR[tp](value, descriptor=descriptor)
except KeyError:
pass
try:
composite_type = cast(int, _COMPOSITES[descriptor])
return buffer, {composite_type: value}
Expand Down Expand Up @@ -394,3 +432,36 @@ def decode_empty_frame(header: memoryview) -> Tuple[int, bytes]:
_DECODE_BY_CONSTRUCTOR[209] = _decode_map_large
_DECODE_BY_CONSTRUCTOR[224] = _decode_array_small
_DECODE_BY_CONSTRUCTOR[240] = _decode_array_large

_DESCR_BY_CONSTRUCTOR: Dict[int, Any] = {
67: described.DescribedInt,
68: described.DescribedInt,
69: described.DescribedList,
80: described.DescribedInt,
81: described.DescribedInt,
82: described.DescribedInt,
83: described.DescribedInt,
84: described.DescribedInt,
85: described.DescribedInt,
96: described.DescribedInt,
97: described.DescribedInt,
112: described.DescribedInt,
113: described.DescribedInt,
114: described.DescribedFloat,
128: described.DescribedInt,
129: described.DescribedInt,
130: described.DescribedFloat,
131: described.DescribedInt,
160: described.DescribedBytes,
161: described.DescribedBytes,
163: described.DescribedBytes,
176: described.DescribedBytes,
177: described.DescribedBytes,
179: described.DescribedBytes,
192: described.DescribedList,
193: described.DescribedDict,
208: described.DescribedList,
209: described.DescribedDict,
224: described.DescribedList,
240: described.DescribedList,
}
31 changes: 31 additions & 0 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/described.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------

class Described:
def __new__(cls, value, descriptor=None):
obj = super().__new__(cls, value)
obj.descriptor = descriptor
return obj


class DescribedInt(Described, int):
pass


class DescribedFloat(Described, float):
pass


class DescribedBytes(Described, bytes):
pass


class DescribedList(Described, list):
pass


class DescribedDict(Described, dict):
pass
Original file line number Diff line number Diff line change
Expand Up @@ -723,3 +723,39 @@ def test_send_long_wait_idle_timeout(auth_credential_receivers, keep_alive, uamq
else:
with pytest.raises(AMQPConnectionError):
sender._send_event_data()


@pytest.mark.liveTest
def test_send_and_receive_described_types(auth_credential_receivers, uamqp_transport, timeout_factor, client_args):
if uamqp_transport:
pytest.skip("Described type preservation only applies to pyamqp transport")

fully_qualified_namespace, eventhub_name, credential, receivers = auth_credential_receivers
client = EventHubProducerClient(
fully_qualified_namespace=fully_qualified_namespace,
eventhub_name=eventhub_name,
credential=credential(),
uamqp_transport=uamqp_transport,
**client_args
)

with client:
# A tuple is encoded as a described type: (descriptor, value)
described_value = (12345, "TEST")
message = AmqpAnnotatedMessage(value_body=described_value)
client.send_event(message)

timeout = 10 * timeout_factor
received = []
for r in receivers:
received.extend(r.receive_message_batch(timeout=timeout))

assert len(received) >= 1

for msg in received:
if msg.value is not None and hasattr(msg.value, "descriptor"):
assert msg.value.descriptor == 12345
assert msg.value == b"TEST"
break
else:
pytest.fail("Did not receive message with described value body")
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from azure.eventhub._pyamqp import authentication, ReceiveClient, SendClient
from azure.eventhub._pyamqp.constants import TransportType
from azure.eventhub._pyamqp.message import Message
from azure.eventhub._pyamqp._encode import encode_described


def send_message(live_eventhub):
Expand Down Expand Up @@ -54,3 +55,48 @@ def test_event_hubs_client_amqp(live_eventhub):
) as receive_client:
messages = receive_client.receive_message_batch(max_batch_size=1)
assert len(messages) > 0


def test_described(live_eventhub):
uri = "sb://{}/{}".format(live_eventhub["hostname"], live_eventhub["event_hub"])
sas_auth = authentication.SASTokenAuth(
uri=uri, audience=uri, username=live_eventhub["key_name"], password=live_eventhub["access_key"]
)

target = "amqps://{}/{}/Partitions/{}".format(
live_eventhub["hostname"], live_eventhub["event_hub"], live_eventhub["partition"]
)

# A tuple is encoded as a described type: (descriptor, value)
message = Message(value=(12345, "TEST"))

with SendClient(
live_eventhub["hostname"], target, auth=sas_auth, debug=True, transport_type=TransportType.Amqp
) as send_client:
send_client.send_message(message)

source = "amqps://{}/{}/ConsumerGroups/{}/Partitions/{}".format(
live_eventhub["hostname"],
live_eventhub["event_hub"],
live_eventhub["consumer_group"],
live_eventhub["partition"],
)

with ReceiveClient(
live_eventhub["hostname"],
source,
auth=sas_auth,
debug=False,
timeout=500,
prefetch=1,
transport_type=TransportType.Amqp,
) as receive_client:
messages = receive_client.receive_message_batch(max_batch_size=1)
assert len(messages) > 0

msg = messages[0]
# The value body should be decoded as a described type with descriptor preserved
assert msg.value is not None
assert hasattr(msg.value, "descriptor")
assert msg.value.descriptor == 12345
assert msg.value == b"TEST"
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import pytest
from azure.eventhub._pyamqp._decode import _decode_decimal128
from azure.eventhub._pyamqp._decode import _decode_decimal128, _decode_described, _decode_array_small, _decode_array_large
from decimal import Decimal


Expand All @@ -18,3 +18,30 @@ def test_decimal_decode(value, expected):
assert output[1] == expected


def test_described():
value = b"\x80\0\0\x017\0\0\x07\xd3\xd0\0\0\0\x12\0\0\0\x02\xa1\ntest/topicP\0"
buffer, output = _decode_described(memoryview(value))
assert output.descriptor == 1335734831059
assert output == [b'test/topic', 0]


def test_array_of_described():
value = b"\0\x03\0\x80\0\0\x017\0\0\x07\xd4\xd0\0\0\0\x0c\0\0\0\x02\xa1\x02n1\xa1\x02v1\0\0\0\x0c\0\0\0\x02\xa1\x02n2\xa1\x02v2\0\0\0\n\0\0\0\x02\xa1\x02n1\xa1\0"

buffer, output = _decode_array_small(memoryview(value))
assert output == [[b'n1', b'v1'], [b'n2', b'v2'], [b'n1', b'']]
assert output[0].descriptor == 1335734831060
assert output[1].descriptor == 1335734831060
assert output[2].descriptor == 1335734831060


def test_array_of_described_large():
value = b"\0\0\x0e\x0f\0\0\x01\0\0\x80\0\0\x017\0\0\x07\xd4\xd0"
for i in range(256):
value += b"\0\0\0\n\0\0\0\x02\xa1\x01n\xa1\x01v"

buffer, output = _decode_array_large(memoryview(value))
assert len(output) == 256
for i in range(256):
assert output[i] == [b'n', b'v']
assert output[i].descriptor == 1335734831060
Loading
Loading