Skip to content

Commit fb73a4a

Browse files
authored
examples: Improve kafka example with more functionality (#1019)
Signed-off-by: Ryan Northey <ryan@synca.io>
1 parent fb0e3a8 commit fb73a4a

2 files changed

Lines changed: 115 additions & 0 deletions

File tree

kafka/example.rst

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,84 @@ Envoy also records cluster stats for the Kafka service:
125125
cluster.kafka_service.membership_healthy: 1
126126
cluster.kafka_service.membership_total: 1
127127
128+
129+
Step 8: Test consumer groups
130+
****************************
131+
132+
Consumer groups allow multiple consumers to coordinate consumption of a topic.
133+
When consumers join a group, they use the Kafka group coordination protocol
134+
which Envoy proxies transparently.
135+
136+
Start a consumer in a group. It will wait for messages and then exit after
137+
a timeout:
138+
139+
.. code-block:: console
140+
141+
$ docker compose run --rm kafka-client \
142+
kafka-console-consumer --bootstrap-server proxy:10000 \
143+
--topic $TOPIC --group test-group --timeout-ms 5000
144+
145+
The consumer group protocol generates additional request types. Check that
146+
the group coordination metrics have incremented:
147+
148+
.. code-block:: console
149+
150+
$ curl -s "http://localhost:8001/stats?filter=kafka.kafka_broker" | grep -E "(find_coordinator|join_group|sync_group|leave_group)" | grep -v ": 0"
151+
kafka.kafka_broker.request.find_coordinator_request: 1
152+
kafka.kafka_broker.request.join_group_request: 1
153+
kafka.kafka_broker.request.leave_group_request: 1
154+
kafka.kafka_broker.response.find_coordinator_response: 1
155+
kafka.kafka_broker.response.join_group_response: 1
156+
kafka.kafka_broker.response.leave_group_response: 1
157+
158+
159+
Step 9: Test additional admin operations
160+
****************************************
161+
162+
The Kafka admin client supports various topic management operations. Test
163+
altering topic configuration:
164+
165+
.. code-block:: console
166+
167+
$ docker compose run --rm kafka-client \
168+
kafka-configs --bootstrap-server proxy:10000 \
169+
--alter --entity-type topics --entity-name $TOPIC \
170+
--add-config retention.ms=86400000
171+
172+
Add partitions to the topic:
173+
174+
.. code-block:: console
175+
176+
$ docker compose run --rm kafka-client \
177+
kafka-topics --bootstrap-server proxy:10000 \
178+
--alter --topic $TOPIC --partitions 3
179+
180+
Check the admin operation metrics:
181+
182+
.. code-block:: console
183+
184+
$ curl -s "http://localhost:8001/stats?filter=kafka.kafka_broker" | grep -E "(incremental_alter_configs|create_partitions)" | grep -v ": 0"
185+
kafka.kafka_broker.request.incremental_alter_configs_request: 1
186+
kafka.kafka_broker.request.create_partitions_request: 1
187+
188+
189+
Step 10: Delete the topic
190+
*************************
191+
192+
Clean up by deleting the test topic:
193+
194+
.. code-block:: console
195+
196+
$ docker compose run --rm kafka-client \
197+
kafka-topics --bootstrap-server proxy:10000 --delete --topic $TOPIC
198+
199+
Verify the delete operation was tracked:
200+
201+
.. code-block:: console
202+
203+
$ curl -s "http://localhost:8001/stats?filter=kafka.kafka_broker.request.delete_topics" | grep -v ": 0"
204+
kafka.kafka_broker.request.delete_topics_request: 1
205+
128206
.. seealso::
129207

130208
:ref:`Envoy Kafka broker filter <config_network_filters_kafka_broker>`

kafka/verify.sh

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,3 +79,40 @@ for stat in "${EXPECTED_BROKER_STATS[@]}"; do
7979
"$stat" \
8080
"http://localhost:${PORT_ADMIN}/stats?filter=${filter}"
8181
done
82+
83+
run_log "Test consumer group coordination"
84+
# Run a consumer in a group - it will timeout after 5s
85+
# The timeout is expected since there are no new messages, so we ignore the exit code
86+
kafka_client kafka-console-consumer --bootstrap-server proxy:10000 \
87+
--topic $TOPIC --group test-group --timeout-ms 5000 || true
88+
89+
run_log "Check consumer group metrics"
90+
EXPECTED_GROUP_STATS=(
91+
"kafka.kafka_broker.request.find_coordinator_request"
92+
"kafka.kafka_broker.request.join_group_request"
93+
"kafka.kafka_broker.response.find_coordinator_response"
94+
"kafka.kafka_broker.response.join_group_response")
95+
for stat in "${EXPECTED_GROUP_STATS[@]}"; do
96+
has_metric_with_at_least_1 "${stat}"
97+
done
98+
99+
run_log "Test alter topic config"
100+
kafka_client kafka-configs --bootstrap-server proxy:10000 \
101+
--alter --entity-type topics --entity-name $TOPIC \
102+
--add-config retention.ms=86400000
103+
104+
run_log "Check incremental_alter_configs metric"
105+
has_metric_with_at_least_1 "kafka.kafka_broker.request.incremental_alter_configs_request"
106+
107+
run_log "Test add partitions"
108+
kafka_client kafka-topics --bootstrap-server proxy:10000 \
109+
--alter --topic $TOPIC --partitions 3
110+
111+
run_log "Check create_partitions metric"
112+
has_metric_with_at_least_1 "kafka.kafka_broker.request.create_partitions_request"
113+
114+
run_log "Test delete topic"
115+
kafka_client kafka-topics --bootstrap-server proxy:10000 --delete --topic $TOPIC
116+
117+
run_log "Check delete_topics metric"
118+
has_metric_with_at_least_1 "kafka.kafka_broker.request.delete_topics_request"

0 commit comments

Comments
 (0)