Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions kafka/example.rst
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,84 @@ Envoy also records cluster stats for the Kafka service:
cluster.kafka_service.membership_healthy: 1
cluster.kafka_service.membership_total: 1


Step 8: Test consumer groups
****************************

Consumer groups allow multiple consumers to coordinate consumption of a topic.
When consumers join a group, they use the Kafka group coordination protocol
which Envoy proxies transparently.

Start a consumer in a group. It will wait for messages and then exit after
a timeout:

.. code-block:: console

$ docker compose run --rm kafka-client \
kafka-console-consumer --bootstrap-server proxy:10000 \
--topic $TOPIC --group test-group --timeout-ms 5000

The consumer group protocol generates additional request types. Check that
the group coordination metrics have incremented:

.. code-block:: console

$ curl -s "http://localhost:8001/stats?filter=kafka.kafka_broker" | grep -E "(find_coordinator|join_group|sync_group|leave_group)" | grep -v ": 0"
kafka.kafka_broker.request.find_coordinator_request: 1
kafka.kafka_broker.request.join_group_request: 1
kafka.kafka_broker.request.leave_group_request: 1
kafka.kafka_broker.response.find_coordinator_response: 1
kafka.kafka_broker.response.join_group_response: 1
kafka.kafka_broker.response.leave_group_response: 1


Step 9: Test additional admin operations
****************************************

The Kafka admin client supports various topic management operations. Test
altering topic configuration:

.. code-block:: console

$ docker compose run --rm kafka-client \
kafka-configs --bootstrap-server proxy:10000 \
--alter --entity-type topics --entity-name $TOPIC \
--add-config retention.ms=86400000

Add partitions to the topic:

.. code-block:: console

$ docker compose run --rm kafka-client \
kafka-topics --bootstrap-server proxy:10000 \
--alter --topic $TOPIC --partitions 3

Check the admin operation metrics:

.. code-block:: console

$ curl -s "http://localhost:8001/stats?filter=kafka.kafka_broker" | grep -E "(incremental_alter_configs|create_partitions)" | grep -v ": 0"
kafka.kafka_broker.request.incremental_alter_configs_request: 1
kafka.kafka_broker.request.create_partitions_request: 1


Step 10: Delete the topic
*************************

Clean up by deleting the test topic:

.. code-block:: console

$ docker compose run --rm kafka-client \
kafka-topics --bootstrap-server proxy:10000 --delete --topic $TOPIC

Verify the delete operation was tracked:

.. code-block:: console

$ curl -s "http://localhost:8001/stats?filter=kafka.kafka_broker.request.delete_topics" | grep -v ": 0"
kafka.kafka_broker.request.delete_topics_request: 1

.. seealso::

:ref:`Envoy Kafka broker filter <config_network_filters_kafka_broker>`
Expand Down
37 changes: 37 additions & 0 deletions kafka/verify.sh
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,40 @@ for stat in "${EXPECTED_BROKER_STATS[@]}"; do
"$stat" \
"http://localhost:${PORT_ADMIN}/stats?filter=${filter}"
done

run_log "Test consumer group coordination"
# Run a consumer in a group - it will timeout after 5s
# The timeout is expected since there are no new messages, so we ignore the exit code
kafka_client kafka-console-consumer --bootstrap-server proxy:10000 \
--topic $TOPIC --group test-group --timeout-ms 5000 || true

run_log "Check consumer group metrics"
EXPECTED_GROUP_STATS=(
"kafka.kafka_broker.request.find_coordinator_request"
"kafka.kafka_broker.request.join_group_request"
"kafka.kafka_broker.response.find_coordinator_response"
"kafka.kafka_broker.response.join_group_response")
for stat in "${EXPECTED_GROUP_STATS[@]}"; do
has_metric_with_at_least_1 "${stat}"
done

run_log "Test alter topic config"
kafka_client kafka-configs --bootstrap-server proxy:10000 \
--alter --entity-type topics --entity-name $TOPIC \
--add-config retention.ms=86400000

run_log "Check incremental_alter_configs metric"
has_metric_with_at_least_1 "kafka.kafka_broker.request.incremental_alter_configs_request"

run_log "Test add partitions"
kafka_client kafka-topics --bootstrap-server proxy:10000 \
--alter --topic $TOPIC --partitions 3

run_log "Check create_partitions metric"
has_metric_with_at_least_1 "kafka.kafka_broker.request.create_partitions_request"

run_log "Test delete topic"
kafka_client kafka-topics --bootstrap-server proxy:10000 --delete --topic $TOPIC

run_log "Check delete_topics metric"
has_metric_with_at_least_1 "kafka.kafka_broker.request.delete_topics_request"