@@ -966,7 +966,8 @@ def create_reader(self, topic, start_message_id,
966966 reader_name = None ,
967967 subscription_role_prefix = None ,
968968 is_read_compacted = False ,
969- crypto_key_reader = None
969+ crypto_key_reader = None ,
970+ start_message_id_inclusive = False
970971 ):
971972 """
972973 Create a reader on a particular topic
@@ -1025,6 +1026,8 @@ def my_listener(reader, message):
10251026 crypto_key_reader: CryptoKeyReader, optional
10261027 Symmetric encryption class implementation, configuring public key encryption messages for the producer
10271028 and private key decryption messages for the consumer
1029+ start_message_id_inclusive: bool, default=False
1030+ Set the reader to include the startMessageId or given position of any reset operation like Reader.seek
10281031 """
10291032
10301033 # If a pulsar.MessageId object is passed, access the _pulsar.MessageId object
@@ -1039,6 +1042,7 @@ def my_listener(reader, message):
10391042 _check_type_or_none (str , subscription_role_prefix , 'subscription_role_prefix' )
10401043 _check_type (bool , is_read_compacted , 'is_read_compacted' )
10411044 _check_type_or_none (CryptoKeyReader , crypto_key_reader , 'crypto_key_reader' )
1045+ _check_type (bool , start_message_id_inclusive , 'start_message_id_inclusive' )
10421046
10431047 conf = _pulsar .ReaderConfiguration ()
10441048 if reader_listener :
@@ -1052,6 +1056,7 @@ def my_listener(reader, message):
10521056 conf .read_compacted (is_read_compacted )
10531057 if crypto_key_reader :
10541058 conf .crypto_key_reader (crypto_key_reader .cryptoKeyReader )
1059+ conf .start_message_id_inclusive (start_message_id_inclusive )
10551060
10561061 c = Reader ()
10571062 c ._reader = self ._client .create_reader (topic , start_message_id , conf )
0 commit comments