-
Notifications
You must be signed in to change notification settings - Fork 951
Expand file tree
/
Copy pathtest_delete_records.py
More file actions
137 lines (121 loc) · 5.67 KB
/
test_delete_records.py
File metadata and controls
137 lines (121 loc) · 5.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# -*- coding: utf-8 -*-
# Copyright 2024 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from confluent_kafka.admin import OffsetSpec, DeletedRecords
from confluent_kafka import TopicPartition
def test_delete_records(kafka_cluster):
"""
Test delete_records, delete the records upto the specified offset
in that particular partition of the specified topic.
"""
admin_client = kafka_cluster.admin()
# Create a topic with a single partition
topic = kafka_cluster.create_topic_and_wait_propagation("test-del-records",
{
"num_partitions": 1,
"replication_factor": 1,
})
# Create Producer instance
p = kafka_cluster.producer()
p.produce(topic, "Message-1")
p.produce(topic, "Message-2")
p.produce(topic, "Message-3")
p.flush()
topic_partition = TopicPartition(topic, 0)
requests = {topic_partition: OffsetSpec.earliest()}
# Check if the earliest avilable offset for this topic partition is 0
fs = admin_client.list_offsets(requests)
result = list(fs.values())[0].result()
assert (result.offset == 0)
topic_partition_offset = TopicPartition(topic, 0, 2)
# Delete the records
fs1 = admin_client.delete_records([topic_partition_offset])
# Find the earliest available offset for that specific topic partition after deletion has been done
fs2 = admin_client.list_offsets(requests)
# Check if the earliest available offset is equal to the offset passed to the delete records function
res = list(fs1.values())[0].result()
assert isinstance(res, DeletedRecords)
assert (res.low_watermark == list(fs2.values())[0].result().offset)
# Delete created topic
fs = admin_client.delete_topics([topic])
for topic, f in fs.items():
f.result()
def test_delete_records_multiple_topics_and_partitions(kafka_cluster):
"""
Test delete_records, delete the records upto the specified offset
in that particular partition of the specified topic.
"""
admin_client = kafka_cluster.admin()
num_partitions = 3
# Create two topics with a single partition
topic = kafka_cluster.create_topic_and_wait_propagation("test-del-records",
{
"num_partitions": num_partitions,
"replication_factor": 1,
})
topic2 = kafka_cluster.create_topic_and_wait_propagation("test-del-records2",
{
"num_partitions": num_partitions,
"replication_factor": 1,
})
topics = [topic, topic2]
partitions = list(range(num_partitions))
# Create Producer instance
p = kafka_cluster.producer()
for t in topics:
for partition in partitions:
p.produce(t, "Message-1", partition=partition)
p.produce(t, "Message-2", partition=partition)
p.produce(t, "Message-3", partition=partition)
p.flush()
requests = dict(
[
(TopicPartition(t, partition), OffsetSpec.earliest())
for t in topics
for partition in partitions
]
)
# Check if the earliest available offset for this topic partition is 0
fs = admin_client.list_offsets(requests)
assert all([p.result().offset == 0 for p in fs.values()])
delete_index = 0
# Delete the records
for delete_partitions in [
# Single partition no deletion
[TopicPartition(topic, 0, 0)],
# Single topic, two partitions, single record deleted
[TopicPartition(topic, 0, 1), TopicPartition(topic, 1, 1)],
# Two topics, four partitions, two records deleted
[TopicPartition(topic, 2, 2), TopicPartition(topic2, 0, 2),
TopicPartition(topic2, 1, 2), TopicPartition(topic2, 2, 2)],
]:
list_offsets_requests = dict([
(part, OffsetSpec.earliest()) for part in delete_partitions
])
futmap_delete = admin_client.delete_records(delete_partitions)
delete_results = [(part, fut.result())
for part, fut in futmap_delete.items()]
futmap_list = admin_client.list_offsets(list_offsets_requests)
list_results = dict([(part, fut.result())
for part, fut in futmap_list.items()])
for part, delete_result in delete_results:
list_result = list_results[part]
assert isinstance(delete_result, DeletedRecords)
assert delete_result.low_watermark == list_result.offset
assert delete_result.low_watermark == delete_index
delete_index += 1
# Delete created topics
fs = admin_client.delete_topics(topics)
for topic, f in fs.items():
f.result()