Skip to content

Commit 0ca1041

Browse files
committed
ADD message deletion when they cannot be decoded
Deleting messages with decoding failures will avoid the consumers being stuck on the same message forever. It also converts the decoding exception to a custom one with an envelope with an empty message for logging purposes.
1 parent 6da8b09 commit 0ca1041

7 files changed

Lines changed: 170 additions & 8 deletions

File tree

CHANGELOG.md

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
66

77
## [Unreleased]
88

9+
## [1.2.0] - 2022-02-28
10+
### Added
11+
- Converts the `MessageDecodingFailedException` to a new `SerializerDecodingException` containing an envelope with an empty message for logging purposes.
12+
13+
### Fixed
14+
- Deletes messages for consumers when the serializer throws a `MessageDecodingFailedException` to avoid retying them forever.
15+
916
## [1.1.1] - 2022-02-15
1017
### Fixed
1118
- **AzureBrokerPropertiesStamp** **DateTime** properties timezones are now set to the current default timezone.
@@ -21,7 +28,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2128
### Added
2229
- **Symfony Messenger** transport for **Azure Service Bus** *queues* and *topics*
2330

24-
[Unreleased]: https://github.com/AymDev/MessengerAzureBundle/compare/v1.1.1...HEAD
31+
[Unreleased]: https://github.com/AymDev/MessengerAzureBundle/compare/v1.2.0...HEAD
32+
[1.2.0]: https://github.com/AymDev/MessengerAzureBundle/releases/tag/v1.2.0
2533
[1.1.1]: https://github.com/AymDev/MessengerAzureBundle/releases/tag/v1.1.1
2634
[1.1.0]: https://github.com/AymDev/MessengerAzureBundle/releases/tag/v1.1.0
2735
[1.0.0]: https://github.com/AymDev/MessengerAzureBundle/releases/tag/v1.0.0

README.md

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,16 @@ The `AymDev\MessengerAzureBundle\Messenger\Stamp\AzureReceivedStamp` stamp holds
8383

8484
## Serialization
8585

86+
### Creating your serializers
8687
There is no serializer provided, but here is the expected array structure of an encoded envelope:
8788

8889
- `body`: your plain text message
89-
- `headers`: optional HTTP headers (either received from *Azure Service Bus* response or to send to the REST API)
90+
- `headers`: optional HTTP headers (either received from *Azure Service Bus* response or to send to the REST API)
91+
92+
### Logging decoding errors
93+
When a serializer throws a `Symfony\Component\Messenger\Exception\MessageDecodingFailedException` while decoding a message,
94+
it will be converted to a `AymDev\MessengerAzureBundle\Messenger\Exception\SerializerDecodingException` which contains an
95+
envelope with an empty message but with the same stamps as a successfully decoded message.
96+
97+
You can then [listen](https://symfony.com/doc/current/event_dispatcher.html) to the `console.error` Symfony event and get
98+
the topic/queue name where then decoding failure happened, the original message, etc.
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace AymDev\MessengerAzureBundle\Messenger\Exception;
6+
7+
use Symfony\Component\Messenger\Envelope;
8+
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
9+
use Throwable;
10+
11+
/**
12+
* This exception class contains an envelope for a message that could not be decoded. Its only purpose is to provide the
13+
* same metadata as a decoded messages with its stamps.
14+
* It is thrown when a consumer serializer throws a MessageDecodingFailedException and can be used for logging using the
15+
* console.error Symfony event.
16+
*/
17+
final class SerializerDecodingException extends MessageDecodingFailedException
18+
{
19+
/** @var Envelope */
20+
private $envelope;
21+
22+
/**
23+
* @param Envelope $envelope an envelope with an empty message
24+
*/
25+
public function __construct(Envelope $envelope, string $message = "", int $code = 0, Throwable $previous = null)
26+
{
27+
parent::__construct($message, $code, $previous);
28+
$this->envelope = $envelope;
29+
}
30+
31+
public function getEnvelope(): Envelope
32+
{
33+
return $this->envelope;
34+
}
35+
}

src/Messenger/Transport/AzureTransport.php

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,20 @@
44

55
namespace AymDev\MessengerAzureBundle\Messenger\Transport;
66

7+
use AymDev\MessengerAzureBundle\Messenger\Exception\SerializerDecodingException;
78
use AymDev\MessengerAzureBundle\Messenger\Stamp\AzureBrokerPropertiesStamp;
89
use AymDev\MessengerAzureBundle\Messenger\Stamp\AzureMessageStamp;
910
use AymDev\MessengerAzureBundle\Messenger\Stamp\AzureReceivedStamp;
1011
use Symfony\Component\Messenger\Envelope;
12+
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
1113
use Symfony\Component\Messenger\Exception\TransportException;
1214
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
1315
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
1416
use Symfony\Component\Messenger\Transport\TransportInterface;
1517
use Symfony\Contracts\HttpClient\Exception\HttpExceptionInterface;
1618
use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface;
1719
use Symfony\Contracts\HttpClient\HttpClientInterface;
20+
use Symfony\Contracts\HttpClient\ResponseInterface;
1821

1922
/**
2023
* Messenger transport for Azure Service Bus
@@ -92,12 +95,34 @@ public function get(): iterable
9295
}
9396

9497
// Decode message
95-
$envelope = $this->serializer->decode([
96-
'body' => $body,
97-
'headers' => $headers,
98-
]);
98+
try {
99+
$envelope = $this->serializer->decode([
100+
'body' => $body,
101+
'headers' => $headers,
102+
]);
103+
} catch (MessageDecodingFailedException $e) {
104+
// Delete the message using an empty message
105+
$envelope = $this->addStamps(new Envelope(new EmptyMessage()), $response);
106+
$this->delete($envelope);
107+
108+
throw new SerializerDecodingException(
109+
$envelope,
110+
$e->getMessage(),
111+
1646061041,
112+
$e
113+
);
114+
}
99115

100-
// Add stamps
116+
return [$this->addStamps($envelope, $response)];
117+
}
118+
119+
/**
120+
* Add stamps to an envelope
121+
* @throws HttpExceptionInterface
122+
* @throws TransportExceptionInterface
123+
*/
124+
private function addStamps(Envelope $envelope, ResponseInterface $response): Envelope
125+
{
101126
$brokerPropertiesStamp = AzureBrokerPropertiesStamp::createFromResponse($response);
102127
$envelope = $envelope
103128
->with(AzureReceivedStamp::createFromResponse($response))
@@ -110,7 +135,7 @@ public function get(): iterable
110135
$envelope = $envelope->with(new TransportMessageIdStamp($brokerPropertiesStamp->getMessageId()));
111136
}
112137

113-
return [$envelope];
138+
return $envelope;
114139
}
115140

116141
/**
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace AymDev\MessengerAzureBundle\Messenger\Transport;
6+
7+
/**
8+
* This message class is used when creating an envelope for decoding failures
9+
* @internal
10+
*/
11+
final class EmptyMessage
12+
{
13+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Messenger\Exception;
6+
7+
use AymDev\MessengerAzureBundle\Messenger\Exception\SerializerDecodingException;
8+
use AymDev\MessengerAzureBundle\Messenger\Transport\EmptyMessage;
9+
use PHPUnit\Framework\TestCase;
10+
use Symfony\Component\Messenger\Envelope;
11+
12+
final class SerializerDecodingExceptionTest extends TestCase
13+
{
14+
public function testGetMessageProperties(): void
15+
{
16+
$envelope = new Envelope(new EmptyMessage());
17+
18+
$exception = new SerializerDecodingException($envelope);
19+
20+
self::assertSame($envelope, $exception->getEnvelope());
21+
}
22+
}

tests/Messenger/Transport/AzureTransportTest.php

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
namespace Tests\AymDev\MessengerAzureBundle\Messenger\Transport;
66

7+
use AymDev\MessengerAzureBundle\Messenger\Exception\SerializerDecodingException;
78
use AymDev\MessengerAzureBundle\Messenger\Stamp\AzureMessageStamp;
89
use AymDev\MessengerAzureBundle\Messenger\Transport\AzureTransport;
910
use AymDev\MessengerAzureBundle\Messenger\Stamp\AzureBrokerPropertiesStamp;
@@ -12,6 +13,7 @@
1213
use Symfony\Component\HttpClient\MockHttpClient;
1314
use Symfony\Component\HttpClient\Response\MockResponse;
1415
use Symfony\Component\Messenger\Envelope;
16+
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
1517
use Symfony\Component\Messenger\Exception\TransportException;
1618
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
1719
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@@ -86,6 +88,54 @@ public function testGetWithUnexpectedStatusCode(): void
8688
$transport->get();
8789
}
8890

91+
/**
92+
* Messages that can't be decoded must throw a converted exception and delete the message
93+
*/
94+
public function testGetSerializationExceptionIsConvertedAndDeletesMessage(): void
95+
{
96+
self::expectException(SerializerDecodingException::class);
97+
self::expectExceptionCode(1646061041);
98+
99+
$serializer = self::createMock(SerializerInterface::class);
100+
$serializer->expects(self::once())
101+
->method('decode')
102+
->willThrowException(new MessageDecodingFailedException());
103+
104+
$location = 'https://test-location/deletion';
105+
106+
$httpReceiver = new MockHttpClient(
107+
[
108+
new MockResponse('test-body', [
109+
'http_code' => 201,
110+
'response_headers' => [
111+
'Location' => $location
112+
],
113+
]),
114+
function (string $method, string $url) use ($location): ResponseInterface {
115+
self::assertSame('DELETE', $method);
116+
self::assertSame($location, $url);
117+
118+
return new MockResponse();
119+
},
120+
]
121+
);
122+
123+
$transport = new AzureTransport(
124+
$serializer,
125+
new MockHttpClient(),
126+
$httpReceiver,
127+
AzureTransport::RECEIVE_MODE_PEEK_LOCK,
128+
'test-entity',
129+
'test-subscription'
130+
);
131+
132+
try {
133+
$transport->get();
134+
} finally {
135+
self::assertSame(2, $httpReceiver->getRequestsCount());
136+
}
137+
}
138+
89139
/**
90140
* Read messages must be returned in an envelope with specific stamps
91141
* @dataProvider provideValidGetCases

0 commit comments

Comments
 (0)