Skip to content

Commit 836d856

Browse files
feat(asyncio): include detailed error message in exception when creating a producer or consumer (#307)
1 parent fe89ca0 commit 836d856

4 files changed

Lines changed: 74 additions & 16 deletions

File tree

pulsar/asyncio.py

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -41,21 +41,24 @@
4141
import pulsar
4242
from pulsar import _check_type
4343

44-
class PulsarException(BaseException):
44+
class PulsarException(Exception):
4545
"""
4646
The exception that wraps the Pulsar error code
4747
"""
4848

49-
def __init__(self, result: pulsar.Result) -> None:
49+
def __init__(self, result: pulsar.Result, msg: str | None = None) -> None:
5050
"""
5151
Create the Pulsar exception.
5252
5353
Parameters
5454
----------
5555
result: pulsar.Result
5656
The error code of the underlying Pulsar APIs.
57+
msg: str | None
58+
An optional error message providing more details.
5759
"""
5860
self._result = result
61+
self._msg = msg
5962

6063
def error(self) -> pulsar.Result:
6164
"""
@@ -67,6 +70,8 @@ def __str__(self):
6770
"""
6871
Convert the exception to string.
6972
"""
73+
if self._msg:
74+
return f'{self._result.value} {self._result.name}: {self._msg}'
7075
return f'{self._result.value} {self._result.name}'
7176

7277
class Producer:
@@ -591,8 +596,8 @@ def underlying_router(msg: _pulsar.Message, num_partitions: int) -> int:
591596
return message_router(pulsar.Message._wrap(msg), num_partitions)
592597
conf.message_router(underlying_router)
593598

594-
self._client.create_producer_async(
595-
topic, conf, functools.partial(_set_future, future)
599+
self._client.create_producer_async_v2(
600+
topic, conf, functools.partial(_set_future_v2, future)
596601
)
597602
return Producer(await future, schema)
598603

@@ -751,28 +756,23 @@ async def subscribe(self, topic: Union[str, List[str]],
751756

752757
if isinstance(topic, str):
753758
if is_pattern_topic:
754-
self._client.subscribe_async_pattern(
755-
topic, subscription_name, conf,
756-
functools.partial(_set_future, future)
757-
)
759+
topics = _pulsar.TopicRegex(topic)
758760
else:
759-
self._client.subscribe_async(
760-
topic, subscription_name, conf,
761-
functools.partial(_set_future, future)
762-
)
761+
topics = topic
763762
elif isinstance(topic, list):
764763
if is_pattern_topic:
765764
raise ValueError(
766765
"Argument 'topic' must be a string when "
767766
"'is_pattern_topic' is True; lists of topics do not "
768767
"support pattern subscriptions"
769768
)
770-
self._client.subscribe_async_topics(
771-
topic, subscription_name, conf,
772-
functools.partial(_set_future, future)
773-
)
769+
topics = topic
774770
else:
775771
raise ValueError( "Argument 'topic' is expected to be of type 'str' or 'list'")
772+
self._client.subscribe_async_v2(
773+
topics, subscription_name, conf,
774+
functools.partial(_set_future_v2, future)
775+
)
776776

777777
schema.attach_client(self._client)
778778
return Consumer(await future, schema)
@@ -835,3 +835,14 @@ def complete():
835835
else:
836836
future.set_exception(PulsarException(result))
837837
future.get_loop().call_soon_threadsafe(complete)
838+
839+
def _set_future_v2(future: asyncio.Future, value: Any):
840+
def callback():
841+
if future.done():
842+
return
843+
if isinstance(value, _pulsar.Error):
844+
exc = PulsarException(value.error, value.message)
845+
future.get_loop().call_soon_threadsafe(future.set_exception, exc)
846+
else:
847+
future.get_loop().call_soon_threadsafe(future.set_result, value)
848+
future.get_loop().call_soon_threadsafe(callback)

src/client.cc

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,13 @@ void export_client(py::module_& m) {
193193
py::arg("client_configuration"))
194194
.def("create_producer", &Client_createProducer)
195195
.def("create_producer_async", &Client_createProducerAsync)
196+
.def("create_producer_async_v2",
197+
[](Client& client, const std::string& topic, ProducerConfiguration conf,
198+
CreateProducerV2Callback callback) {
199+
py::gil_scoped_release release;
200+
client.createProducerAsyncV2(
201+
topic, conf, [callback = std::move(callback)](auto&& variant) { callback(variant); });
202+
})
196203
.def("subscribe", &Client_subscribe)
197204
.def("subscribe_topics", &Client_subscribe_topics)
198205
.def("subscribe_pattern", &Client_subscribe_pattern)
@@ -212,5 +219,11 @@ void export_client(py::module_& m) {
212219
.def("subscribe_async", &Client_subscribeAsync)
213220
.def("subscribe_async_topics", &Client_subscribeAsync_topics)
214221
.def("subscribe_async_pattern", &Client_subscribeAsync_pattern)
222+
.def("subscribe_async_v2",
223+
[](Client& client, const SubscribeTopics& topics, const std::string& subscriptionName,
224+
ConsumerConfiguration conf, SubscribeV2Callback callback) {
225+
py::gil_scoped_release release;
226+
client.subscribeAsyncV2(topics, subscriptionName, conf, std::move(callback));
227+
})
215228
.def("shutdown", &Client::shutdown);
216229
}

src/enums.cc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <pulsar/ConsumerCryptoFailureAction.h>
2323
#include <pulsar/ProducerConfiguration.h>
2424
#include <pulsar/KeySharedPolicy.h>
25+
#include <pulsar/Result.h>
2526
#include <pybind11/pybind11.h>
2627

2728
using namespace pulsar;
@@ -147,4 +148,12 @@ void export_enums(py::module_& m) {
147148
.value("FAIL", ConsumerCryptoFailureAction::FAIL)
148149
.value("DISCARD", ConsumerCryptoFailureAction::DISCARD)
149150
.value("CONSUME", ConsumerCryptoFailureAction::CONSUME);
151+
152+
class_<Error>(m, "Error")
153+
.def_readonly("error", &Error::result)
154+
.def_readonly("message", &Error::message);
155+
156+
class_<TopicRegex>(m, "TopicRegex")
157+
.def(py::init<const std::string&>(), py::arg("pattern"))
158+
.def_readonly("pattern", &TopicRegex::pattern);
150159
}

tests/asyncio_test.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -484,6 +484,31 @@ class ExampleRecord(Record): # pylint: disable=too-few-public-methods
484484
self.assertEqual(msg.value().str_field, 'test')
485485
self.assertEqual(msg.value().int_field, 42)
486486

487+
async def test_token_auth_supplier_exception(self):
488+
def raise_exception():
489+
raise Exception("token supplier failed")
490+
491+
client = Client(SERVICE_URL,
492+
authentication=pulsar.AuthenticationToken(raise_exception))
493+
topic = "private/auth/asyncio-test-token-auth"
494+
495+
with self.assertRaises(PulsarException) as e:
496+
await client.create_producer(topic)
497+
self.assertEqual(e.exception.error(), pulsar.Result.AuthenticationError)
498+
self.assertIn("token supplier failed", str(e.exception))
499+
500+
with self.assertRaises(PulsarException) as e:
501+
await client.subscribe(topic, 'sub')
502+
self.assertEqual(e.exception.error(), pulsar.Result.AuthenticationError)
503+
self.assertIn("token supplier failed", str(e.exception))
504+
505+
with self.assertRaises(PulsarException) as e:
506+
await client.subscribe("private/auth/.*", 'sub', is_pattern_topic=True)
507+
self.assertEqual(e.exception.error(), pulsar.Result.AuthenticationError)
508+
# TODO: we should fix the error message not included in pattern subscription case
509+
510+
await client.close()
511+
487512

488513
class AsyncioSetFutureTest(IsolatedAsyncioTestCase):
489514
"""Tests for asyncio bridge helpers (no live Pulsar broker)."""

0 commit comments

Comments
 (0)