diff --git a/sdk/servicebus/azure-servicebus/CHANGELOG.md b/sdk/servicebus/azure-servicebus/CHANGELOG.md index a00f558792d2..0d94f1daa1b6 100644 --- a/sdk/servicebus/azure-servicebus/CHANGELOG.md +++ b/sdk/servicebus/azure-servicebus/CHANGELOG.md @@ -1,5 +1,11 @@ # Release History +## 7.15.0 (Unreleased) + +### Features Added + +- Added `ServiceBusReceivedMessage.from_bytes()` classmethod to construct a `ServiceBusReceivedMessage` from raw AMQP payload bytes without requiring the deprecated `uamqp` library. ([#43979](https://github.com/Azure/azure-sdk-for-python/issues/43979)) + ## 7.14.3 (2025-11-11) ### Bugs Fixed diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py index 00b2346d460a..288049bf0afd 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py @@ -124,18 +124,18 @@ def __init__( self._raw_amqp_message = AmqpAnnotatedMessage(message=kwargs["message"]) else: self._build_annotated_message(body) - self.application_properties = application_properties - self.session_id = session_id - self.message_id = message_id - self.content_type = content_type - self.correlation_id = correlation_id - self.to = to - self.reply_to = reply_to - self.reply_to_session_id = reply_to_session_id - self.subject = subject - self.scheduled_enqueue_time_utc = scheduled_enqueue_time_utc - self.time_to_live = time_to_live - self.partition_key = partition_key + self.application_properties = application_properties # type: ignore[assignment] + self.session_id = session_id # type: ignore[assignment] + self.message_id = message_id # type: ignore[assignment] + self.content_type = content_type # type: ignore[assignment] + self.correlation_id = correlation_id # type: ignore[assignment] + self.to = to # type: ignore[assignment] + self.reply_to = reply_to # type: ignore[assignment] + self.reply_to_session_id = reply_to_session_id # type: ignore[assignment] + self.subject = subject # type: ignore[assignment] + self.scheduled_enqueue_time_utc = scheduled_enqueue_time_utc # type: ignore[assignment] + self.time_to_live = time_to_live # type: ignore[assignment] + self.partition_key = partition_key # type: ignore[assignment] def __str__(self) -> str: return str(self.raw_amqp_message) @@ -810,6 +810,32 @@ def __getstate__(self) -> Dict[str, Any]: def __setstate__(self, state: Dict[str, Any]) -> None: self.__dict__.update(state) + @classmethod + def from_bytes(cls, message: bytes) -> "ServiceBusReceivedMessage": + """Constructs a ServiceBusReceivedMessage from the raw bytes of an AMQP message payload. + + The message payload should adhere to the Message Format specification + outlined in the AMQP v1.0 standard: + http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format + + The returned message is created in a settled state with no associated receiver, + meaning settlement operations (e.g., complete, abandon, defer, dead_letter) and + lock renewal are not available. The lock_token property will return None. + + :param bytes message: The raw bytes representing the AMQP message payload. + :return: A ServiceBusReceivedMessage created from the provided message payload. + :rtype: ~azure.servicebus.ServiceBusReceivedMessage + """ + from .._pyamqp._decode import decode_payload + + amqp_message = decode_payload(memoryview(message)) + received_msg = cls( + message=amqp_message, + receiver=None, + receive_mode=ServiceBusReceiveMode.RECEIVE_AND_DELETE, + ) + return received_msg + @property def _lock_expired(self) -> bool: """ diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_version.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_version.py index 6235fea047d0..bbae47a9af60 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_version.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_version.py @@ -3,4 +3,4 @@ # Licensed under the MIT License. # ------------------------------------ -VERSION = "7.14.3" +VERSION = "7.15.0" diff --git a/sdk/servicebus/azure-servicebus/mypy.ini b/sdk/servicebus/azure-servicebus/mypy.ini index c8dd195b44ed..074fab72a287 100644 --- a/sdk/servicebus/azure-servicebus/mypy.ini +++ b/sdk/servicebus/azure-servicebus/mypy.ini @@ -7,3 +7,9 @@ ignore_missing_imports = True [mypy-azure.servicebus.management._generated.*] ignore_errors = True + +[mypy-azure.servicebus._transport._uamqp_transport] +ignore_errors = True + +[mypy-azure.servicebus._transport._pyamqp_transport] +ignore_errors = True diff --git a/sdk/servicebus/azure-servicebus/tests/test_message.py b/sdk/servicebus/azure-servicebus/tests/test_message.py index b6fc1aa21edc..8ad3e55c305e 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_message.py +++ b/sdk/servicebus/azure-servicebus/tests/test_message.py @@ -360,6 +360,113 @@ def test_servicebus_message_time_to_live(): assert message.time_to_live == timedelta(days=1) +def test_servicebus_received_message_from_bytes(): + """Test that from_bytes can decode a basic AMQP message payload.""" + from azure.servicebus._pyamqp._encode import encode_payload as _encode_payload + from azure.servicebus._pyamqp.message import Message as PyamqpMessage, Header, Properties + + # Construct a pyamqp Message with various sections + original = PyamqpMessage( + header=Header(durable=True, priority=4, ttl=30000, delivery_count=1), + properties=Properties( + message_id="test-message-id-123", + content_type=b"application/json", + correlation_id="corr-456", + subject=b"test-subject", + reply_to=b"reply-queue", + group_id=b"session-1", + reply_to_group_id=b"reply-session", + ), + message_annotations={ + _X_OPT_PARTITION_KEY: b"pk-1", + }, + application_properties={b"custom_prop": b"custom_value"}, + data=[b"hello world"], + ) + + # Encode to bytes then decode via from_bytes. + # _encode_payload returns the bytearray buffer it was given; wrap in bytes() + # so the test exercises the public from_bytes(bytes) contract. + output = bytearray() + payload = bytes(_encode_payload(output, original)) + received = ServiceBusReceivedMessage.from_bytes(payload) + + # Validate message body + body = b"".join(received.body) + assert body == b"hello world" + assert received.body_type == AmqpMessageBodyType.DATA + + # Validate properties + assert received.message_id == "test-message-id-123" + assert received.content_type == "application/json" + assert received.correlation_id == "corr-456" + assert received.subject == "test-subject" + assert received.reply_to == "reply-queue" + assert received.session_id == "session-1" + assert received.reply_to_session_id == "reply-session" + + # Validate header fields + assert received.time_to_live == timedelta(milliseconds=30000) + assert received.delivery_count == 1 + + # Validate annotations + assert received.partition_key == "pk-1" + + # Validate application properties + assert received.raw_amqp_message.application_properties == {b"custom_prop": b"custom_value"} + + # Validate settled state (from_bytes creates settled messages) + assert received.lock_token is None + + +def test_servicebus_received_message_from_bytes_minimal(): + """Test from_bytes with a minimal AMQP message (data body only, no properties).""" + from azure.servicebus._pyamqp._encode import encode_payload as _encode_payload + from azure.servicebus._pyamqp.message import Message as PyamqpMessage + + original = PyamqpMessage(data=[b"minimal payload"]) + output = bytearray() + payload = bytes(_encode_payload(output, original)) + received = ServiceBusReceivedMessage.from_bytes(payload) + + body = b"".join(received.body) + assert body == b"minimal payload" + assert received.message_id is None + assert received.content_type is None + assert received.session_id is None + assert received.lock_token is None + + +def test_servicebus_received_message_from_bytes_value_body(): + """Test from_bytes with an AMQP value body message.""" + from azure.servicebus._pyamqp._encode import encode_payload as _encode_payload + from azure.servicebus._pyamqp.message import Message as PyamqpMessage + + original = PyamqpMessage(value={"key": "value"}) + output = bytearray() + payload = bytes(_encode_payload(output, original)) + received = ServiceBusReceivedMessage.from_bytes(payload) + + assert received.body_type == AmqpMessageBodyType.VALUE + # AMQP encoding round-trips strings as bytes + assert received.body == {b"key": b"value"} + + +def test_servicebus_received_message_from_bytes_sequence_body(): + """Test from_bytes with an AMQP sequence body message.""" + from azure.servicebus._pyamqp._encode import encode_payload as _encode_payload + from azure.servicebus._pyamqp.message import Message as PyamqpMessage + + original = PyamqpMessage(sequence=[1, 2, 3]) + output = bytearray() + payload = bytes(_encode_payload(output, original)) + received = ServiceBusReceivedMessage.from_bytes(payload) + + assert received.body_type == AmqpMessageBodyType.SEQUENCE + # Confirm the decoded sequence contents round-trip, not just the body type. + assert list(received.body) == [1, 2, 3] + + class TestServiceBusMessageBackcompat(AzureMgmtRecordedTestCase): @pytest.mark.liveTest