Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Release History

## 7.15.0 (Unreleased)

### Features Added

- Added `ServiceBusReceivedMessage.from_bytes()` classmethod to construct a `ServiceBusReceivedMessage` from raw AMQP payload bytes without requiring the deprecated `uamqp` library. ([#43979](https://github.com/Azure/azure-sdk-for-python/issues/43979))

## 7.14.3 (2025-11-11)

### Bugs Fixed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,18 +124,18 @@ def __init__(
self._raw_amqp_message = AmqpAnnotatedMessage(message=kwargs["message"])
else:
self._build_annotated_message(body)
self.application_properties = application_properties
self.session_id = session_id
self.message_id = message_id
self.content_type = content_type
self.correlation_id = correlation_id
self.to = to
self.reply_to = reply_to
self.reply_to_session_id = reply_to_session_id
self.subject = subject
self.scheduled_enqueue_time_utc = scheduled_enqueue_time_utc
self.time_to_live = time_to_live
self.partition_key = partition_key
self.application_properties = application_properties # type: ignore[assignment]
self.session_id = session_id # type: ignore[assignment]
self.message_id = message_id # type: ignore[assignment]
self.content_type = content_type # type: ignore[assignment]
self.correlation_id = correlation_id # type: ignore[assignment]
self.to = to # type: ignore[assignment]
self.reply_to = reply_to # type: ignore[assignment]
self.reply_to_session_id = reply_to_session_id # type: ignore[assignment]
self.subject = subject # type: ignore[assignment]
self.scheduled_enqueue_time_utc = scheduled_enqueue_time_utc # type: ignore[assignment]
self.time_to_live = time_to_live # type: ignore[assignment]
self.partition_key = partition_key # type: ignore[assignment]
Comment thread
EldertGrootenboer marked this conversation as resolved.

def __str__(self) -> str:
return str(self.raw_amqp_message)
Expand Down Expand Up @@ -810,6 +810,32 @@ def __getstate__(self) -> Dict[str, Any]:
def __setstate__(self, state: Dict[str, Any]) -> None:
self.__dict__.update(state)

@classmethod
def from_bytes(cls, message: bytes) -> "ServiceBusReceivedMessage":
"""Constructs a ServiceBusReceivedMessage from the raw bytes of an AMQP message payload.

The message payload should adhere to the Message Format specification
outlined in the AMQP v1.0 standard:
http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format

The returned message is created in a settled state with no associated receiver,
meaning settlement operations (e.g., complete, abandon, defer, dead_letter) and
lock renewal are not available. The lock_token property will return None.

:param bytes message: The raw bytes representing the AMQP message payload.
:return: A ServiceBusReceivedMessage created from the provided message payload.
:rtype: ~azure.servicebus.ServiceBusReceivedMessage
"""
from .._pyamqp._decode import decode_payload

amqp_message = decode_payload(memoryview(message))
received_msg = cls(
Comment thread
EldertGrootenboer marked this conversation as resolved.
message=amqp_message,
receiver=None,
receive_mode=ServiceBusReceiveMode.RECEIVE_AND_DELETE,
)
return received_msg
Comment thread
EldertGrootenboer marked this conversation as resolved.

@property
def _lock_expired(self) -> bool:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
# Licensed under the MIT License.
# ------------------------------------

VERSION = "7.14.3"
VERSION = "7.15.0"
6 changes: 6 additions & 0 deletions sdk/servicebus/azure-servicebus/mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,9 @@ ignore_missing_imports = True

[mypy-azure.servicebus.management._generated.*]
ignore_errors = True

[mypy-azure.servicebus._transport._uamqp_transport]
ignore_errors = True

[mypy-azure.servicebus._transport._pyamqp_transport]
ignore_errors = True
Comment thread
EldertGrootenboer marked this conversation as resolved.
103 changes: 103 additions & 0 deletions sdk/servicebus/azure-servicebus/tests/test_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,109 @@ def test_servicebus_message_time_to_live():
assert message.time_to_live == timedelta(days=1)


def test_servicebus_received_message_from_bytes():
"""Test that from_bytes can decode a basic AMQP message payload."""
from azure.servicebus._pyamqp._encode import encode_payload as _encode_payload
from azure.servicebus._pyamqp.message import Message as PyamqpMessage, Header, Properties

# Construct a pyamqp Message with various sections
original = PyamqpMessage(
header=Header(durable=True, priority=4, ttl=30000, delivery_count=1),
properties=Properties(
message_id="test-message-id-123",
content_type=b"application/json",
correlation_id="corr-456",
subject=b"test-subject",
reply_to=b"reply-queue",
group_id=b"session-1",
reply_to_group_id=b"reply-session",
),
message_annotations={
_X_OPT_PARTITION_KEY: b"pk-1",
},
application_properties={b"custom_prop": b"custom_value"},
data=[b"hello world"],
)

# Encode to bytes then decode via from_bytes
output = bytearray()
payload = _encode_payload(output, original)
Comment thread
EldertGrootenboer marked this conversation as resolved.
Outdated
Comment thread
EldertGrootenboer marked this conversation as resolved.
Outdated
received = ServiceBusReceivedMessage.from_bytes(payload)

# Validate message body
body = b"".join(received.body)
assert body == b"hello world"
assert received.body_type == AmqpMessageBodyType.DATA

# Validate properties
assert received.message_id == "test-message-id-123"
assert received.content_type == "application/json"
assert received.correlation_id == "corr-456"
assert received.subject == "test-subject"
assert received.reply_to == "reply-queue"
assert received.session_id == "session-1"
assert received.reply_to_session_id == "reply-session"

# Validate header fields
assert received.time_to_live == timedelta(milliseconds=30000)
assert received.delivery_count == 1

# Validate annotations
assert received.partition_key == "pk-1"

# Validate application properties
assert received.raw_amqp_message.application_properties == {b"custom_prop": b"custom_value"}

# Validate settled state (from_bytes creates settled messages)
assert received.lock_token is None


def test_servicebus_received_message_from_bytes_minimal():
"""Test from_bytes with a minimal AMQP message (data body only, no properties)."""
from azure.servicebus._pyamqp._encode import encode_payload as _encode_payload
from azure.servicebus._pyamqp.message import Message as PyamqpMessage

original = PyamqpMessage(data=[b"minimal payload"])
output = bytearray()
payload = _encode_payload(output, original)
received = ServiceBusReceivedMessage.from_bytes(payload)
Comment thread
EldertGrootenboer marked this conversation as resolved.

Comment thread
EldertGrootenboer marked this conversation as resolved.
body = b"".join(received.body)
assert body == b"minimal payload"
assert received.message_id is None
assert received.content_type is None
assert received.session_id is None
assert received.lock_token is None


def test_servicebus_received_message_from_bytes_value_body():
"""Test from_bytes with an AMQP value body message."""
from azure.servicebus._pyamqp._encode import encode_payload as _encode_payload
from azure.servicebus._pyamqp.message import Message as PyamqpMessage

original = PyamqpMessage(value={"key": "value"})
output = bytearray()
payload = _encode_payload(output, original)
received = ServiceBusReceivedMessage.from_bytes(payload)
Comment thread
EldertGrootenboer marked this conversation as resolved.

Comment thread
EldertGrootenboer marked this conversation as resolved.
assert received.body_type == AmqpMessageBodyType.VALUE
# AMQP encoding round-trips strings as bytes
assert received.body == {b"key": b"value"}


def test_servicebus_received_message_from_bytes_sequence_body():
"""Test from_bytes with an AMQP sequence body message."""
from azure.servicebus._pyamqp._encode import encode_payload as _encode_payload
from azure.servicebus._pyamqp.message import Message as PyamqpMessage

original = PyamqpMessage(sequence=[1, 2, 3])
output = bytearray()
payload = _encode_payload(output, original)
received = ServiceBusReceivedMessage.from_bytes(payload)

Comment thread
EldertGrootenboer marked this conversation as resolved.
assert received.body_type == AmqpMessageBodyType.SEQUENCE
Comment thread
EldertGrootenboer marked this conversation as resolved.
Comment thread
EldertGrootenboer marked this conversation as resolved.


class TestServiceBusMessageBackcompat(AzureMgmtRecordedTestCase):

@pytest.mark.liveTest
Expand Down