Skip to content

Commit 23dbbf2

Browse files
authored
feat: add offset management integration to consumer stream (#541)
* feat: add offset management integration to consumer stream * chore: update fluvio version * feat: add delete_consumer_offset * test: more commit consumer tests
1 parent f5c303e commit 23dbbf2

5 files changed

Lines changed: 325 additions & 24 deletions

File tree

Cargo.toml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,11 @@ url = "2.5.0"
3838
webbrowser = "1.0.0"
3939

4040
fluvio-future = { version = "0.7.0", features = ["task", "io", "native_tls", "subscriber"] }
41-
fluvio = { features = ["admin", "rustls"], git = "https://github.com/infinyon/fluvio.git", tag = "v0.14.0" }
42-
fluvio-protocol = { git = "https://github.com/infinyon/fluvio.git", tag = "v0.14.0" }
43-
fluvio-types = { git = "https://github.com/infinyon/fluvio.git", tag = "v0.14.0" }
44-
fluvio-sc-schema = { git = "https://github.com/infinyon/fluvio.git", tag = "v0.14.0" }
45-
fluvio-controlplane-metadata = { git = "https://github.com/infinyon/fluvio.git", tag = "v0.14.0" }
41+
fluvio = { features = ["admin", "rustls"], git = "https://github.com/infinyon/fluvio.git", brach ="offset_flush_box" }
42+
fluvio-protocol = { git = "https://github.com/infinyon/fluvio.git", brach ="offset_flush_box" }
43+
fluvio-types = { git = "https://github.com/infinyon/fluvio.git", brach ="offset_flush_box" }
44+
fluvio-sc-schema = { git = "https://github.com/infinyon/fluvio.git", brach ="offset_flush_box" }
45+
fluvio-controlplane-metadata = { git = "https://github.com/infinyon/fluvio.git", brach ="offset_flush_box" }
4646

4747
# transitive version selection
4848
parking_lot = "0.12.3"

fluvio/__init__.py

Lines changed: 80 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,28 @@
5858
records = [bytearray(next(stream).value()).decode() for _ in range(num_items)]
5959
```
6060
61+
Also you can consume usign offset management:
62+
63+
```python
64+
import fluvio
65+
66+
fluvio = Fluvio.connect()
67+
68+
topic = "a_topic"
69+
builder = ConsumerConfigExtBuilder(topic)
70+
builder.offset_start(Offset.beginning())
71+
builder.offset_strategy(OffsetManagementStrategy.MANUAL)
72+
builder.offset_consumer("a-consumer")
73+
config = builder.build()
74+
stream = fluvio.consumer_with_config(config)
75+
76+
num_items = 2
77+
records = [bytearray(next(stream).value()).decode() for _ in range(num_items)]
78+
79+
stream.offset_commit()
80+
stream.offset_flush()
81+
```
82+
6183
For more examples see the integration tests in the fluvio-python repository.
6284
6385
[test_produce.py](https://github.com/infinyon/fluvio-client-python/blob/main/integration-tests/test_produce.py)
@@ -82,6 +104,8 @@
82104
PartitionSelectionStrategy as _PartitionSelectionStrategy,
83105
PartitionConsumerStream as _PartitionConsumerStream,
84106
AsyncPartitionConsumerStream as _AsyncPartitionConsumerStream,
107+
OffsetManagementStrategy,
108+
ConsumerOffset as _ConsumerOffset,
85109
# producer types
86110
TopicProducer as _TopicProducer,
87111
ProduceOutput as _ProduceOutput,
@@ -137,6 +161,8 @@
137161
"ConsumerConfig",
138162
"PartitionConsumer",
139163
"MultiplePartitionConsumer",
164+
"OffsetManagementStrategy",
165+
"ConsumerOffset",
140166
# specs
141167
"CommonCreateRequest",
142168
"SmartModuleSpec",
@@ -262,6 +288,26 @@ def smartmodule(
262288
)
263289

264290

291+
class ConsumerIterator:
292+
def __init__(self, stream: _PartitionConsumerStream):
293+
self.stream = stream
294+
295+
def __iter__(self):
296+
return self
297+
298+
def __next__(self):
299+
item = self.stream.next()
300+
if item is None:
301+
raise StopIteration
302+
return Record(item)
303+
304+
def offset_commit(self):
305+
self.stream.offset_commit()
306+
307+
def offset_flush(self):
308+
self.stream.offset_flush()
309+
310+
265311
class PartitionConsumer:
266312
"""
267313
An interface for consuming events from a particular partition
@@ -614,6 +660,30 @@ def with_multiple(cls, topic: typing.List[typing.Tuple[str, int]]):
614660
return cls(_PartitionSelectionStrategy.with_multiple(topic))
615661

616662

663+
class ConsumerOffset:
664+
"""Consumer offset"""
665+
666+
_inner: _ConsumerOffset
667+
668+
def __init__(self, inner: _ConsumerOffset):
669+
self._inner = inner
670+
671+
def consumer_id(self) -> str:
672+
return self._inner.consumer_id()
673+
674+
def topic(self) -> str:
675+
return self._inner.topic()
676+
677+
def partition(self) -> int:
678+
return self._inner.partition()
679+
680+
def offset(self) -> int:
681+
return self._inner.offset()
682+
683+
def modified_time(self) -> int:
684+
return self._inner.modified_time()
685+
686+
617687
class FluvioConfig:
618688
"""Configuration for Fluvio client"""
619689

@@ -687,14 +757,12 @@ def connect_with_config(cls, config: FluvioConfig):
687757
"""Creates a new Fluvio client using the given configuration"""
688758
return cls(_Fluvio.connect_with_config(config._inner))
689759

690-
def consumer_with_config(
691-
self, config: ConsumerConfigExt
692-
) -> typing.Iterator[Record]:
760+
def consumer_with_config(self, config: ConsumerConfigExt) -> ConsumerIterator:
693761
"""Creates consumer with settings defined in config
694762
695763
This is the recommended way to create a consume records.
696764
"""
697-
return self._generator(self._inner.consumer_with_config(config))
765+
return ConsumerIterator(self._inner.consumer_with_config(config))
698766

699767
def topic_producer(self, topic: str) -> TopicProducer:
700768
"""
@@ -740,6 +808,14 @@ def multi_topic_partition_consumer(
740808
self._inner.multi_partition_consumer(strategy._inner)
741809
)
742810

811+
def consumer_offsets(self) -> typing.List[ConsumerOffset]:
812+
"""Fetch the current offsets of the consumer"""
813+
return self._inner.consumer_offsets()
814+
815+
def delete_consumer_offset(self, consumer: str, topic: str, partition: int):
816+
"""Delete the consumer offset"""
817+
return self._inner.delete_consumer_offset(consumer, topic, partition)
818+
743819
def _generator(self, stream: _PartitionConsumerStream) -> typing.Iterator[Record]:
744820
item = stream.next()
745821
while item is not None:

integration-tests/test_base.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import uuid
33
import time
44

5-
from fluvio import FluvioAdmin
5+
from fluvio import FluvioAdmin, Fluvio
66

77

88
def create_smartmodule(sm_name, sm_path):
@@ -22,7 +22,9 @@ class CommonFluvioSetup(unittest.TestCase):
2222
def common_setup(self, sm_path=None):
2323
"""Optionally create a smartmodule if sm_path is provided"""
2424
self.admin = FluvioAdmin.connect()
25+
self.fluvio = Fluvio.connect()
2526
self.topic = str(uuid.uuid4())
27+
self.consumer_id = str(uuid.uuid4())
2628
self.sm_name = str(uuid.uuid4())
2729
self.sm_path = sm_path
2830

@@ -50,6 +52,9 @@ def setUp(self):
5052

5153
def tearDown(self):
5254
self.admin.delete_topic(self.topic)
55+
# TODO: we can remove this delete_consumer_offset after to fix
56+
# this bug: https://github.com/infinyon/fluvio/issues/4308
57+
self.fluvio.delete_consumer_offset(self.consumer_id, self.topic, 0)
5358
time.sleep(1)
5459
if self.sm_path is not None:
5560
self.admin.delete_smartmodule(self.sm_name)

integration-tests/test_consume.py

Lines changed: 137 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from string import ascii_lowercase
2+
import time
23

3-
from fluvio import ConsumerConfig, Fluvio, Offset
4+
from fluvio import ConsumerConfig, Fluvio, Offset, OffsetManagementStrategy
45
from fluvio import ConsumerConfigExtBuilder
56
from test_base import CommonFluvioSetup
67

@@ -59,6 +60,140 @@ def test_consume_at_offset_begin(self):
5960
self.assertEqual(records[0], "record-z")
6061
self.assertEqual(records[1], "record-y")
6162

63+
def test_consume_offset_managed_auto_manual(self):
64+
"""
65+
Manual offset management test
66+
"""
67+
fluvio = Fluvio.connect()
68+
69+
builder = ConsumerConfigExtBuilder(self.topic)
70+
builder.offset_start(Offset.beginning())
71+
builder.offset_strategy(OffsetManagementStrategy.MANUAL)
72+
builder.offset_consumer(self.consumer_id)
73+
74+
config = builder.build()
75+
stream = fluvio.consumer_with_config(config)
76+
77+
num_items = 2
78+
records = [bytearray(next(stream).value()).decode() for _ in range(num_items)]
79+
80+
time.sleep(1)
81+
82+
stream.offset_commit()
83+
stream.offset_flush()
84+
85+
self.assertEqual(len(records), 2)
86+
self.assertEqual(records[0], "record-z")
87+
self.assertEqual(records[1], "record-y")
88+
89+
consumers = fluvio.consumer_offsets()
90+
self.assertEqual(len(consumers), 1)
91+
self.assertEqual(consumers[0].consumer_id, self.consumer_id)
92+
self.assertEqual(consumers[0].topic, self.topic)
93+
self.assertEqual(consumers[0].partition, 0)
94+
self.assertEqual(consumers[0].offset, 1)
95+
96+
def test_consume_offset_managed_auto_commit_and_flush(self):
97+
"""
98+
Automatic offset management test
99+
"""
100+
fluvio = Fluvio.connect()
101+
102+
builder = ConsumerConfigExtBuilder(self.topic)
103+
builder.offset_start(Offset.beginning())
104+
builder.offset_strategy(OffsetManagementStrategy.AUTO)
105+
builder.offset_consumer(self.consumer_id)
106+
107+
config = builder.build()
108+
stream = fluvio.consumer_with_config(config)
109+
110+
num_items = 2
111+
records = [bytearray(next(stream).value()).decode() for _ in range(num_items)]
112+
113+
stream.offset_commit()
114+
stream.offset_flush()
115+
116+
self.assertEqual(len(records), 2)
117+
self.assertEqual(records[0], "record-z")
118+
self.assertEqual(records[1], "record-y")
119+
120+
consumers = fluvio.consumer_offsets()
121+
self.assertEqual(len(consumers), 1)
122+
self.assertEqual(consumers[0].consumer_id, self.consumer_id)
123+
self.assertEqual(consumers[0].topic, self.topic)
124+
self.assertEqual(consumers[0].partition, 0)
125+
self.assertEqual(consumers[0].offset, 1)
126+
127+
def test_consume_offset_managed_auto_flush(self):
128+
"""
129+
Consume with an offset_consume id defined
130+
131+
This will case the cluster to track that offsets
132+
that consumer has consumed, stoping and resuming
133+
from last offset
134+
135+
The equivalent of this test with the CLI is:
136+
`fluvio consume test-topic --offset-beginning --offset-consumer --end 1 foo`
137+
record-z
138+
139+
140+
`fluvio consume test-topic --offset-beginning --offset-consumer --end 2 foo`
141+
record-y
142+
143+
"""
144+
fluvio = Fluvio.connect()
145+
146+
# configure consumer
147+
builder = ConsumerConfigExtBuilder(self.topic)
148+
builder.disable_continuous()
149+
builder.offset_consumer(self.consumer_id)
150+
builder.offset_start(Offset.beginning())
151+
builder.offset_strategy(OffsetManagementStrategy.AUTO)
152+
153+
config = builder.build()
154+
stream = fluvio.consumer_with_config(config)
155+
156+
num_items = 2
157+
records = []
158+
for _ in range(num_items):
159+
records.append(bytearray(next(stream).value()).decode())
160+
self.assertEqual(len(records), 2)
161+
self.assertEqual(records[0], "record-z")
162+
# ensure offset is flushed
163+
# with Auto, this normally happens with advancement of the stream and/or
164+
# closure of the client
165+
# but that isn't at a guaranteed point for python in the middle
166+
# of this test, so do this explictly
167+
stream.offset_flush()
168+
# connect again with a new client, but the same id
169+
fluvio = Fluvio.connect()
170+
171+
# configure consumer
172+
# even though offset_start says to start at the beginning
173+
# it is expected start from the last offset consumed
174+
# because the offset_consumer is set
175+
builder = ConsumerConfigExtBuilder(self.topic)
176+
builder.disable_continuous()
177+
builder.offset_consumer(self.consumer_id)
178+
builder.offset_start(Offset.beginning())
179+
builder.offset_strategy(OffsetManagementStrategy.AUTO)
180+
181+
stream = fluvio.consumer_with_config(config)
182+
183+
num_items = 1
184+
records = []
185+
for _ in range(1):
186+
records.append(bytearray(next(stream).value()).decode())
187+
self.assertEqual(len(records), 1)
188+
self.assertEqual(records[0], "record-y")
189+
190+
consumers = fluvio.consumer_offsets()
191+
self.assertEqual(len(consumers), 1)
192+
self.assertEqual(consumers[0].consumer_id, self.consumer_id)
193+
self.assertEqual(consumers[0].topic, self.topic)
194+
self.assertEqual(consumers[0].partition, 0)
195+
self.assertEqual(consumers[0].offset, 1)
196+
62197
def test_consume_at_offset_from_begin(self):
63198
fluvio = Fluvio.connect()
64199

@@ -146,7 +281,7 @@ def test_consume_deprecated(self):
146281

147282
config = ConsumerConfig()
148283
stream = consumer.stream_with_config(Offset.beginning(), config)
149-
for i in range(2):
284+
for _ in range(2):
150285
records.append(bytearray(next(stream).value()).decode())
151286

152287
self.assertEqual(len(records), 2)

0 commit comments

Comments
 (0)