Skip to content
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Release History

## 7.15.0 (Unreleased)

### Features Added

- Added `ServiceBusReceivedMessage.from_bytes()` classmethod to construct a `ServiceBusReceivedMessage` from raw AMQP payload bytes without requiring the deprecated `uamqp` library. ([#43979](https://github.com/Azure/azure-sdk-for-python/issues/43979))

## 7.14.3 (2025-11-11)

### Bugs Fixed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,18 +124,18 @@ def __init__(
self._raw_amqp_message = AmqpAnnotatedMessage(message=kwargs["message"])
else:
self._build_annotated_message(body)
self.application_properties = application_properties
self.session_id = session_id
self.message_id = message_id
self.content_type = content_type
self.correlation_id = correlation_id
self.to = to
self.reply_to = reply_to
self.reply_to_session_id = reply_to_session_id
self.subject = subject
self.scheduled_enqueue_time_utc = scheduled_enqueue_time_utc
self.time_to_live = time_to_live
self.partition_key = partition_key
self.application_properties = application_properties # type: ignore[assignment]
self.session_id = session_id # type: ignore[assignment]
self.message_id = message_id # type: ignore[assignment]
self.content_type = content_type # type: ignore[assignment]
self.correlation_id = correlation_id # type: ignore[assignment]
self.to = to # type: ignore[assignment]
self.reply_to = reply_to # type: ignore[assignment]
self.reply_to_session_id = reply_to_session_id # type: ignore[assignment]
self.subject = subject # type: ignore[assignment]
self.scheduled_enqueue_time_utc = scheduled_enqueue_time_utc # type: ignore[assignment]
self.time_to_live = time_to_live # type: ignore[assignment]
self.partition_key = partition_key # type: ignore[assignment]
Comment thread
EldertGrootenboer marked this conversation as resolved.

def __str__(self) -> str:
return str(self.raw_amqp_message)
Expand Down Expand Up @@ -810,6 +810,32 @@ def __getstate__(self) -> Dict[str, Any]:
def __setstate__(self, state: Dict[str, Any]) -> None:
self.__dict__.update(state)

@classmethod
def from_bytes(cls, message: bytes) -> "ServiceBusReceivedMessage":
"""Constructs a ServiceBusReceivedMessage from the raw bytes of an AMQP message payload.

The message payload should adhere to the Message Format specification
outlined in the AMQP v1.0 standard:
http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format

The returned message is created in a settled state with no associated receiver,
meaning settlement operations (e.g., complete, abandon, defer, dead_letter) and
lock renewal are not available. The lock_token property will return None.

:param bytes message: The raw bytes representing the AMQP message payload.
:return: A ServiceBusReceivedMessage created from the provided message payload.
:rtype: ~azure.servicebus.ServiceBusReceivedMessage
"""
from .._pyamqp._decode import decode_payload

amqp_message = decode_payload(memoryview(message))
received_msg = cls(
Comment thread
EldertGrootenboer marked this conversation as resolved.
message=amqp_message,
receiver=None,
receive_mode=ServiceBusReceiveMode.RECEIVE_AND_DELETE,
)
return received_msg
Comment thread
EldertGrootenboer marked this conversation as resolved.

@property
def _lock_expired(self) -> bool:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
# Licensed under the MIT License.
# ------------------------------------

VERSION = "7.14.3"
VERSION = "7.15.0"
6 changes: 6 additions & 0 deletions sdk/servicebus/azure-servicebus/mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,9 @@ ignore_missing_imports = True

[mypy-azure.servicebus.management._generated.*]
ignore_errors = True

[mypy-azure.servicebus._transport._uamqp_transport]
ignore_errors = True

[mypy-azure.servicebus._transport._pyamqp_transport]
ignore_errors = True
Comment thread
EldertGrootenboer marked this conversation as resolved.
103 changes: 103 additions & 0 deletions sdk/servicebus/azure-servicebus/tests/test_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,109 @@ def test_servicebus_message_time_to_live():
assert message.time_to_live == timedelta(days=1)


def test_servicebus_received_message_from_bytes():
"""Test that from_bytes can decode a basic AMQP message payload."""
from azure.servicebus._pyamqp._encode import encode_payload as _encode_payload
from azure.servicebus._pyamqp.message import Message as PyamqpMessage, Header, Properties

# Construct a pyamqp Message with various sections
original = PyamqpMessage(
header=Header(durable=True, priority=4, ttl=30000, delivery_count=1),
properties=Properties(
message_id="test-message-id-123",
content_type=b"application/json",
correlation_id="corr-456",
subject=b"test-subject",
reply_to=b"reply-queue",
group_id=b"session-1",
reply_to_group_id=b"reply-session",
),
message_annotations={
_X_OPT_PARTITION_KEY: b"pk-1",
},
application_properties={b"custom_prop": b"custom_value"},
data=[b"hello world"],
)

# Encode to bytes then decode via from_bytes
output = bytearray()
payload = _encode_payload(output, original)
Comment thread
EldertGrootenboer marked this conversation as resolved.
Outdated
Comment thread
EldertGrootenboer marked this conversation as resolved.
Outdated
received = ServiceBusReceivedMessage.from_bytes(payload)

# Validate message body
body = b"".join(received.body)
assert body == b"hello world"
assert received.body_type == AmqpMessageBodyType.DATA

# Validate properties
assert received.message_id == "test-message-id-123"
assert received.content_type == "application/json"
assert received.correlation_id == "corr-456"
assert received.subject == "test-subject"
assert received.reply_to == "reply-queue"
assert received.session_id == "session-1"
assert received.reply_to_session_id == "reply-session"

# Validate header fields
assert received.time_to_live == timedelta(milliseconds=30000)
assert received.delivery_count == 1

# Validate annotations
assert received.partition_key == "pk-1"

# Validate application properties
assert received.raw_amqp_message.application_properties == {b"custom_prop": b"custom_value"}

# Validate settled state (from_bytes creates settled messages)
assert received.lock_token is None


def test_servicebus_received_message_from_bytes_minimal():
"""Test from_bytes with a minimal AMQP message (data body only, no properties)."""
from azure.servicebus._pyamqp._encode import encode_payload as _encode_payload
from azure.servicebus._pyamqp.message import Message as PyamqpMessage

original = PyamqpMessage(data=[b"minimal payload"])
output = bytearray()
payload = _encode_payload(output, original)
received = ServiceBusReceivedMessage.from_bytes(payload)
Comment thread
EldertGrootenboer marked this conversation as resolved.

Comment thread
EldertGrootenboer marked this conversation as resolved.
body = b"".join(received.body)
assert body == b"minimal payload"
assert received.message_id is None
assert received.content_type is None
assert received.session_id is None
assert received.lock_token is None


def test_servicebus_received_message_from_bytes_value_body():
"""Test from_bytes with an AMQP value body message."""
from azure.servicebus._pyamqp._encode import encode_payload as _encode_payload
from azure.servicebus._pyamqp.message import Message as PyamqpMessage

original = PyamqpMessage(value={"key": "value"})
output = bytearray()
payload = _encode_payload(output, original)
received = ServiceBusReceivedMessage.from_bytes(payload)
Comment thread
EldertGrootenboer marked this conversation as resolved.

Comment thread
EldertGrootenboer marked this conversation as resolved.
assert received.body_type == AmqpMessageBodyType.VALUE
# AMQP encoding round-trips strings as bytes
assert received.body == {b"key": b"value"}


def test_servicebus_received_message_from_bytes_sequence_body():
"""Test from_bytes with an AMQP sequence body message."""
from azure.servicebus._pyamqp._encode import encode_payload as _encode_payload
from azure.servicebus._pyamqp.message import Message as PyamqpMessage

original = PyamqpMessage(sequence=[1, 2, 3])
output = bytearray()
payload = _encode_payload(output, original)
received = ServiceBusReceivedMessage.from_bytes(payload)

Comment thread
EldertGrootenboer marked this conversation as resolved.
assert received.body_type == AmqpMessageBodyType.SEQUENCE
Comment thread
EldertGrootenboer marked this conversation as resolved.
Comment thread
EldertGrootenboer marked this conversation as resolved.


class TestServiceBusMessageBackcompat(AzureMgmtRecordedTestCase):

@pytest.mark.liveTest
Expand Down