Skip to content

Commit 0241cde

Browse files
committed
feat(asyncio): add Reader API
1 parent 0fc51a0 commit 0241cde

4 files changed

Lines changed: 326 additions & 0 deletions

File tree

pulsar/asyncio.py

Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -449,6 +449,117 @@ def consumer_name(self) -> str:
449449
"""
450450
return self._consumer.consumer_name()
451451

452+
class Reader:
    """
    The Pulsar topic reader, used to read messages from a topic.

    Each asynchronous method creates an ``asyncio.Future`` on the running
    loop, binds it to ``_set_future`` as the completion callback passed to
    the underlying ``_pulsar.Reader``, and awaits that future.
    """

    def __init__(self, reader: _pulsar.Reader, schema: pulsar.schema.Schema) -> None:
        """
        Create the reader.

        Users should not call this constructor directly. Instead, create the
        reader via ``Client.create_reader``.

        Parameters
        ----------
        reader: _pulsar.Reader
            The underlying Reader object from the C extension.
        schema: pulsar.schema.Schema
            The schema of the data that will be received by this reader.
        """
        self._reader = reader
        self._schema = schema

    async def read_next(self, timeout_millis: int | None = None) -> pulsar.Message:
        """
        Read a single message asynchronously.

        Parameters
        ----------
        timeout_millis: int | None, optional
            If specified, the maximum time in milliseconds to wait for a
            message. When omitted, the call waits until a message arrives.

        Returns
        -------
        pulsar.Message
            The message received, with this reader's schema attached.

        Raises
        ------
        PulsarException
        asyncio.TimeoutError
            If ``timeout_millis`` is given and no message arrived in time.
        """
        # Validate before issuing the native read so a bad argument does not
        # leave an orphaned read request behind.
        if timeout_millis is not None:
            _check_type(int, timeout_millis, 'timeout_millis')

        future = asyncio.get_running_loop().create_future()
        self._reader.read_next_async(functools.partial(_set_future, future))

        if timeout_millis is None:
            msg = await future
        else:
            # The native async read has no timeout parameter, so the wait is
            # bounded on the asyncio side. Previously the timeout was checked
            # but never applied.
            # NOTE(review): after a timeout the native read is still
            # outstanding; a message delivered to the cancelled future is
            # presumably dropped -- confirm against _set_future.
            msg = await asyncio.wait_for(future, timeout_millis / 1000)

        m = pulsar.Message()
        m._message = msg
        m._schema = self._schema
        return m

    async def has_message_available(self) -> bool:
        """
        Check if there is any message available to read from the current
        position.

        Returns
        -------
        bool
            The value delivered by the underlying reader's
            ``has_message_available_async`` callback.
        """
        future = asyncio.get_running_loop().create_future()
        self._reader.has_message_available_async(functools.partial(_set_future, future))
        return await future

    async def seek(self, messageid: Union[pulsar.MessageId, int]) -> None:
        """
        Reset this reader to a specific message id or publish timestamp
        asynchronously.

        Parameters
        ----------
        messageid : MessageId or int
            The message id for seek, OR an integer event time (timestamp) to
            seek to.

        Raises
        ------
        ValueError
            If ``messageid`` is neither a ``pulsar.MessageId`` nor an ``int``.
        PulsarException
        """
        future = asyncio.get_running_loop().create_future()
        if isinstance(messageid, pulsar.MessageId):
            # Unwrap to the native id expected by the C extension.
            msg_id = messageid._msg_id
        elif isinstance(messageid, int):
            msg_id = messageid
        else:
            raise ValueError(f"invalid messageid type {type(messageid)}")
        # seek reports only a result code, so the future's value is fixed to None.
        self._reader.seek_async(msg_id, functools.partial(_set_future, future, value=None))
        await future

    async def close(self) -> None:
        """
        Close the reader asynchronously.

        Raises
        ------
        PulsarException
        """
        future = asyncio.get_running_loop().create_future()
        # close reports only a result code, so the future's value is fixed to None.
        self._reader.close_async(functools.partial(_set_future, future, value=None))
        await future

    def topic(self) -> str:
        """
        Return the topic this reader is reading from.
        """
        return self._reader.topic()

    def is_connected(self) -> bool:
        """
        Check if the reader is connected or not.
        """
        return self._reader.is_connected()
561+
562+
452563
class Client:
453564
"""
454565
The asynchronous version of `pulsar.Client`.
@@ -777,6 +888,93 @@ async def subscribe(self, topic: Union[str, List[str]],
777888
schema.attach_client(self._client)
778889
return Consumer(await future, schema)
779890

891+
# pylint: disable=too-many-arguments,too-many-locals,too-many-positional-arguments
892+
async def create_reader(self, topic: str,
                        start_message_id: Union[pulsar.MessageId, _pulsar.MessageId],
                        schema: pulsar.schema.Schema | None = None,
                        receiver_queue_size: int = 1000,
                        reader_name: str | None = None,
                        subscription_role_prefix: str | None = None,
                        is_read_compacted: bool = False,
                        crypto_key_reader: pulsar.CryptoKeyReader | None = None,
                        start_message_id_inclusive: bool = False,
                        crypto_failure_action: ConsumerCryptoFailureAction =
                        ConsumerCryptoFailureAction.FAIL,
                        ) -> Reader:
    """
    Create a reader on a particular topic.

    Parameters
    ----------
    topic: str
        The name of the topic.
    start_message_id: MessageId or _pulsar.MessageId
        The position at which the reader starts. The options are:

        * ``MessageId.earliest``: Start reading from the earliest message
          available in the topic.
        * ``MessageId.latest``: Start reading from the end of the topic,
          only getting messages published after the reader was created.
        * ``MessageId``: When passing a particular message id, the reader
          will position itself on that specific position.
    schema: pulsar.schema.Schema | None, default=None
        The schema of the data that will be received by this reader.
        ``pulsar.schema.BytesSchema()`` is used when omitted.
    receiver_queue_size: int, default=1000
        Sets the size of the reader receive queue.
    reader_name: str | None, default=None
        Sets the reader name.
    subscription_role_prefix: str | None, default=None
        Sets the subscription role prefix.
    is_read_compacted: bool, default=False
        Selects whether to read the compacted version of the topic.
    crypto_key_reader: pulsar.CryptoKeyReader | None, default=None
        Symmetric encryption class implementation.
    start_message_id_inclusive: bool, default=False
        Set the reader to include the startMessageId or given position of
        any reset operation like Reader.seek.
    crypto_failure_action: ConsumerCryptoFailureAction, default=ConsumerCryptoFailureAction.FAIL
        Set the behavior when the decryption fails.

    Returns
    -------
    Reader
        The reader created

    Raises
    ------
    PulsarException
    """
    if schema is None:
        schema = pulsar.schema.BytesSchema()

    # Accept the Python wrapper as well as the native id; the C extension
    # only takes the native one.
    native_start_id = (
        start_message_id._msg_id
        if isinstance(start_message_id, pulsar.MessageId)
        else start_message_id
    )
    _check_type(_pulsar.MessageId, native_start_id, 'start_message_id')

    reader_conf = _pulsar.ReaderConfiguration()
    reader_conf.receiver_queue_size(receiver_queue_size)
    # Optional settings are applied only when the caller supplied them.
    if reader_name is not None:
        reader_conf.reader_name(reader_name)
    if subscription_role_prefix is not None:
        reader_conf.subscription_role_prefix(subscription_role_prefix)
    reader_conf.schema(schema.schema_info())
    reader_conf.read_compacted(is_read_compacted)
    if crypto_key_reader is not None:
        reader_conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
    reader_conf.start_message_id_inclusive(start_message_id_inclusive)
    reader_conf.crypto_failure_action(crypto_failure_action)

    created = asyncio.get_running_loop().create_future()
    on_created = functools.partial(_set_future, created)
    self._client.create_reader_async(topic, native_start_id, reader_conf, on_created)
    native_reader = await created

    schema.attach_client(self._client)
    return Reader(native_reader, schema)
977+
780978
def shutdown(self) -> None:
781979
"""
782980
Shutdown the client and all the associated producers and consumers

src/client.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,12 @@ Reader Client_createReader(Client& client, const std::string& topic, const Messa
118118
[&](ReaderCallback callback) { client.createReaderAsync(topic, startMessageId, conf, callback); });
119119
}
120120

121+
// Python binding for Client::createReaderAsync: forwards all arguments to the
// client with the GIL released for the duration of the call.
void Client_createReaderAsync(Client& client, const std::string& topic, const MessageId& startMessageId,
                              ReaderConfiguration conf, ReaderCallback callback) {
    // Released here and re-acquired automatically when the guard goes out of scope.
    py::gil_scoped_release gilReleased;
    client.createReaderAsync(topic, startMessageId, conf, callback);
}
126+
121127
std::vector<std::string> Client_getTopicPartitions(Client& client, const std::string& topic) {
122128
return waitForAsyncValue<std::vector<std::string>>(
123129
[&](GetPartitionsCallback callback) { client.getPartitionsForTopicAsync(topic, callback); });
@@ -204,6 +210,7 @@ void export_client(py::module_& m) {
204210
.def("subscribe_topics", &Client_subscribe_topics)
205211
.def("subscribe_pattern", &Client_subscribe_pattern)
206212
.def("create_reader", &Client_createReader)
213+
.def("create_reader_async", &Client_createReaderAsync)
207214
.def("create_table_view",
208215
[](Client& client, const std::string& topic, const TableViewConfiguration& config) {
209216
return waitForAsyncValue<TableView>([&](TableViewCallback callback) {

src/reader.cc

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
* under the License.
1818
*/
1919
#include "utils.h"
20+
#include <pybind11/functional.h>
2021
#include <pybind11/pybind11.h>
2122

2223
namespace py = pybind11;
@@ -54,16 +55,46 @@ void Reader_seek_timestamp(Reader& reader, uint64_t timestamp) {
5455

5556
bool Reader_is_connected(Reader& reader) { return reader.isConnected(); }
5657

58+
// Python binding for Reader::readNextAsync: hands the callback to the reader
// with the GIL released for the duration of the call.
void Reader_readNextAsync(Reader& reader, ReadNextCallback callback) {
    py::gil_scoped_release gilReleased;
    reader.readNextAsync(callback);
}
62+
63+
// Python binding for Reader::closeAsync: hands the callback to the reader
// with the GIL released for the duration of the call.
void Reader_closeAsync(Reader& reader, ResultCallback callback) {
    py::gil_scoped_release gilReleased;
    reader.closeAsync(callback);
}
67+
68+
// Python binding for the MessageId overload of Reader::seekAsync; the GIL is
// released for the duration of the call.
void Reader_seekAsync(Reader& reader, const MessageId& msgId, ResultCallback callback) {
    py::gil_scoped_release gilReleased;
    reader.seekAsync(msgId, callback);
}
72+
73+
// Python binding for the uint64_t timestamp overload of Reader::seekAsync; the
// GIL is released for the duration of the call.
void Reader_seekAsync_timestamp(Reader& reader, uint64_t timestamp, ResultCallback callback) {
    py::gil_scoped_release gilReleased;
    reader.seekAsync(timestamp, callback);
}
77+
78+
// Python binding for Reader::hasMessageAvailableAsync: hands the callback to
// the reader with the GIL released for the duration of the call.
void Reader_hasMessageAvailableAsync(Reader& reader, HasMessageAvailableCallback callback) {
    py::gil_scoped_release gilReleased;
    reader.hasMessageAvailableAsync(callback);
}
82+
5783
// Registers the Python "Reader" class on module `m`, exposing both the
// blocking methods and their *_async counterparts.
void export_reader(py::module_& m) {
    py::class_<Reader> readerClass(m, "Reader");

    // Registration order is kept as-is: methods sharing a Python name
    // (read_next, seek, seek_async) are overloads.
    readerClass.def("topic", &Reader::getTopic, py::return_value_policy::copy);
    readerClass.def("read_next", &Reader_readNext);
    readerClass.def("read_next", &Reader_readNextTimeout);
    readerClass.def("read_next_async", &Reader_readNextAsync);
    readerClass.def("has_message_available", &Reader_hasMessageAvailable);
    readerClass.def("has_message_available_async", &Reader_hasMessageAvailableAsync);
    readerClass.def("close", &Reader_close);
    readerClass.def("close_async", &Reader_closeAsync);
    readerClass.def("seek", &Reader_seek);
    readerClass.def("seek", &Reader_seek_timestamp);
    readerClass.def("seek_async", &Reader_seekAsync);
    readerClass.def("seek_async", &Reader_seekAsync_timestamp);
    readerClass.def("is_connected", &Reader_is_connected);
}

tests/asyncio_test.py

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
Consumer,
4040
Producer,
4141
PulsarException,
42+
Reader,
4243
_set_future,
4344
)
4445
from pulsar.schema import ( # pylint: disable=import-error
@@ -465,6 +466,95 @@ async def test_seek_timestamp(self):
465466
msg = await consumer.receive()
466467
self.assertEqual(msg.data(), b'msg-3')
467468

469+
async def test_reader_simple(self):
    """Read one published message, then check a second read times out."""
    topic = f'asyncio-test-reader-simple-{time.time()}'
    reader = await self._client.create_reader(topic, pulsar.MessageId.earliest)
    producer = await self._client.create_producer(topic)

    await producer.send(b'hello')
    received = await reader.read_next()
    self.assertEqual(received.data(), b'hello')

    # Nothing else was published, so a further read must not complete
    # within one second.
    with self.assertRaises(asyncio.TimeoutError):
        await asyncio.wait_for(reader.read_next(), 1)

    await reader.close()
479+
480+
async def test_reader_on_last_message(self):
    """A reader created at ``latest`` sees only messages sent afterwards."""
    topic = f'asyncio-test-reader-on-last-message-{time.time()}'
    producer = await self._client.create_producer(topic)

    # Messages 0-9 are published before the reader exists.
    for i in range(10):
        await producer.send(f'hello-{i}'.encode())

    reader = await self._client.create_reader(topic, pulsar.MessageId.latest)

    # Messages 10-19 are published after the reader was created.
    later_payloads = [f'hello-{i}'.encode() for i in range(10, 20)]
    for payload in later_payloads:
        await producer.send(payload)

    for expected in later_payloads:
        received = await reader.read_next()
        self.assertEqual(received.data(), expected)

    await reader.close()
492+
493+
async def test_reader_on_specific_message(self):
    """A reader started at a given message id resumes after that message."""
    topic = f'asyncio-test-reader-on-specific-msg-{time.time()}'
    producer = await self._client.create_producer(topic)

    # Publish sequentially, keeping the id returned for each message.
    msg_ids = [await producer.send(f'hello-{i}'.encode()) for i in range(10)]

    reader1 = await self._client.create_reader(topic, pulsar.MessageId.earliest)
    for i in range(5):
        received = await reader1.read_next()
        self.assertEqual(received.data(), f'hello-{i}'.encode())

    # Start a second reader at the id of the fifth message (index 4); it is
    # expected to deliver messages 5-9.
    resume_from = msg_ids[4]
    reader2 = await self._client.create_reader(topic, resume_from)
    for i in range(5, 10):
        received = await reader2.read_next()
        self.assertEqual(received.data(), f'hello-{i}'.encode())

    await reader1.close()
    await reader2.close()
511+
512+
async def test_reader_has_message_available(self):
    """``has_message_available`` tracks whether unread messages remain."""
    topic = f'asyncio-test-reader-has-message-available-{time.time()}'
    producer = await self._client.create_producer(topic)
    reader = await self._client.create_reader(topic, pulsar.MessageId.latest)

    # Nothing has been published since the reader was created.
    available = await reader.has_message_available()
    self.assertFalse(available)

    for i in range(10):
        await producer.send(f'hello-{i}'.encode())
    available = await reader.has_message_available()
    self.assertTrue(available)

    # Drain all ten messages; afterwards nothing should be left.
    for _ in range(10):
        await reader.read_next()
    available = await reader.has_message_available()
    self.assertFalse(available)

    await reader.close()
524+
525+
async def test_reader_seek(self):
    """Seek to a message id with exclusive and inclusive start positions."""
    topic = f'asyncio-test-reader-seek-{time.time()}'
    producer = await self._client.create_producer(topic)

    # Publish sequentially, keeping the id returned for each message.
    msg_ids = [await producer.send(f'msg-{i}'.encode()) for i in range(10)]

    # Exclusive seek to message 2 should yield message 3; inclusive seek
    # should yield message 2 itself.
    cases = ((False, b'msg-3'), (True, b'msg-2'))
    for inclusive, expected in cases:
        reader = await self._client.create_reader(topic, pulsar.MessageId.latest,
                                                  start_message_id_inclusive=inclusive)
        await reader.seek(msg_ids[2])
        received = await reader.read_next()
        self.assertEqual(received.data(), expected)
        await reader.close()
544+
545+
async def test_reader_is_connected(self):
    """``is_connected`` is true after creation and false after close."""
    topic = f'asyncio-test-reader-is-connected-{time.time()}'
    reader = await self._client.create_reader(topic, pulsar.MessageId.earliest)

    connected_before_close = reader.is_connected()
    self.assertTrue(connected_before_close)

    await reader.close()

    connected_after_close = reader.is_connected()
    self.assertFalse(connected_after_close)
551+
552+
async def test_reader_topic(self):
    """``topic`` returns the fully qualified persistent topic name."""
    topic = f'asyncio-test-reader-topic-{time.time()}'
    reader = await self._client.create_reader(topic, pulsar.MessageId.earliest)

    # The short name is expected under the default tenant and namespace.
    expected_name = f'persistent://public/default/{topic}'
    self.assertEqual(reader.topic(), expected_name)

    await reader.close()
557+
468558
async def test_schema(self):
469559
class ExampleRecord(Record): # pylint: disable=too-few-public-methods
470560
"""Example record schema for testing."""

0 commit comments

Comments
 (0)