Skip to content

Commit dd594b7

Browse files
committed
use v2 API to create reader
1 parent 0241cde commit dd594b7

3 files changed

Lines changed: 15 additions & 2 deletions

File tree

pulsar/asyncio.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -968,8 +968,8 @@ async def create_reader(self, topic: str,
968968
conf.crypto_failure_action(crypto_failure_action)
969969

970970
future = asyncio.get_running_loop().create_future()
971-
self._client.create_reader_async(
972-
topic, start_message_id, conf, functools.partial(_set_future, future)
971+
self._client.create_reader_async_v2(
972+
topic, start_message_id, conf, functools.partial(_set_future_v2, future)
973973
)
974974
reader = await future
975975
schema.attach_client(self._client)

src/client.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,12 @@ void Client_createReaderAsync(Client& client, const std::string& topic, const Me
124124
client.createReaderAsync(topic, startMessageId, conf, callback);
125125
}
126126

127+
void Client_createReaderAsyncV2(Client& client, const std::string& topic, const MessageId& startMessageId,
128+
ReaderConfiguration conf, ReaderV2Callback callback) {
129+
py::gil_scoped_release release;
130+
client.createReaderAsyncV2(topic, startMessageId, conf, std::move(callback));
131+
}
132+
127133
std::vector<std::string> Client_getTopicPartitions(Client& client, const std::string& topic) {
128134
return waitForAsyncValue<std::vector<std::string>>(
129135
[&](GetPartitionsCallback callback) { client.getPartitionsForTopicAsync(topic, callback); });
@@ -211,6 +217,7 @@ void export_client(py::module_& m) {
211217
.def("subscribe_pattern", &Client_subscribe_pattern)
212218
.def("create_reader", &Client_createReader)
213219
.def("create_reader_async", &Client_createReaderAsync)
220+
.def("create_reader_async_v2", &Client_createReaderAsyncV2)
214221
.def("create_table_view",
215222
[](Client& client, const std::string& topic, const TableViewConfiguration& config) {
216223
return waitForAsyncValue<TableView>([&](TableViewCallback callback) {

tests/asyncio_test.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -597,6 +597,12 @@ def raise_exception():
597597
self.assertEqual(e.exception.error(), pulsar.Result.AuthenticationError)
598598
# TODO: we should fix the error message not included in pattern subscription case
599599

600+
with self.assertRaises(PulsarException) as e:
601+
await client.create_reader("private/auth/asyncio-test-token-auth-reader",
602+
pulsar.MessageId.earliest)
603+
self.assertEqual(e.exception.error(), pulsar.Result.AuthenticationError)
604+
self.assertIn("token supplier failed", str(e.exception))
605+
600606
await client.close()
601607

602608

0 commit comments

Comments
 (0)