diff --git a/kafka/example.rst b/kafka/example.rst index 22c4ac0e..4684e21f 100644 --- a/kafka/example.rst +++ b/kafka/example.rst @@ -125,6 +125,84 @@ Envoy also records cluster stats for the Kafka service: cluster.kafka_service.membership_healthy: 1 cluster.kafka_service.membership_total: 1 + +Step 8: Test consumer groups +**************************** + +Consumer groups allow multiple consumers to coordinate consumption of a topic. +When consumers join a group, they use the Kafka group coordination protocol +which Envoy proxies transparently. + +Start a consumer in a group. It will wait for messages and then exit after +a timeout: + +.. code-block:: console + + $ docker compose run --rm kafka-client \ + kafka-console-consumer --bootstrap-server proxy:10000 \ + --topic $TOPIC --group test-group --timeout-ms 5000 + +The consumer group protocol generates additional request types. Check that +the group coordination metrics have incremented: + +.. code-block:: console + + $ curl -s "http://localhost:8001/stats?filter=kafka.kafka_broker" | grep -E "(find_coordinator|join_group|sync_group|leave_group)" | grep -v ": 0" + kafka.kafka_broker.request.find_coordinator_request: 1 + kafka.kafka_broker.request.join_group_request: 1 + kafka.kafka_broker.request.leave_group_request: 1 + kafka.kafka_broker.response.find_coordinator_response: 1 + kafka.kafka_broker.response.join_group_response: 1 + kafka.kafka_broker.response.leave_group_response: 1 + + +Step 9: Test additional admin operations +**************************************** + +The Kafka admin client supports various topic management operations. Test +altering topic configuration: + +.. code-block:: console + + $ docker compose run --rm kafka-client \ + kafka-configs --bootstrap-server proxy:10000 \ + --alter --entity-type topics --entity-name $TOPIC \ + --add-config retention.ms=86400000 + +Add partitions to the topic: + +.. code-block:: console + + $ docker compose run --rm kafka-client \ + kafka-topics --bootstrap-server proxy:10000 \ + --alter --topic $TOPIC --partitions 3 + +Check the admin operation metrics: + +.. code-block:: console + + $ curl -s "http://localhost:8001/stats?filter=kafka.kafka_broker" | grep -E "(incremental_alter_configs|create_partitions)" | grep -v ": 0" + kafka.kafka_broker.request.incremental_alter_configs_request: 1 + kafka.kafka_broker.request.create_partitions_request: 1 + + +Step 10: Delete the topic +************************* + +Clean up by deleting the test topic: + +.. code-block:: console + + $ docker compose run --rm kafka-client \ + kafka-topics --bootstrap-server proxy:10000 --delete --topic $TOPIC + +Verify the delete operation was tracked: + +.. code-block:: console + + $ curl -s "http://localhost:8001/stats?filter=kafka.kafka_broker.request.delete_topics" | grep -v ": 0" + kafka.kafka_broker.request.delete_topics_request: 1 + .. seealso:: :ref:`Envoy Kafka broker filter ` diff --git a/kafka/verify.sh b/kafka/verify.sh index 24f7b769..4d8a2ea7 100755 --- a/kafka/verify.sh +++ b/kafka/verify.sh @@ -79,3 +79,40 @@ for stat in "${EXPECTED_BROKER_STATS[@]}"; do "$stat" \ "http://localhost:${PORT_ADMIN}/stats?filter=${filter}" done + +run_log "Test consumer group coordination" +# Run a consumer in a group - it will timeout after 5s +# The timeout is expected since there are no new messages, so we ignore the exit code +kafka_client kafka-console-consumer --bootstrap-server proxy:10000 \ + --topic $TOPIC --group test-group --timeout-ms 5000 || true + +run_log "Check consumer group metrics" +EXPECTED_GROUP_STATS=( + "kafka.kafka_broker.request.find_coordinator_request" + "kafka.kafka_broker.request.join_group_request" + "kafka.kafka_broker.response.find_coordinator_response" + "kafka.kafka_broker.response.join_group_response") +for stat in "${EXPECTED_GROUP_STATS[@]}"; do + has_metric_with_at_least_1 "${stat}" +done + +run_log "Test alter topic config" +kafka_client kafka-configs --bootstrap-server proxy:10000 \ + --alter --entity-type topics --entity-name $TOPIC \ + --add-config retention.ms=86400000 + +run_log "Check incremental_alter_configs metric" +has_metric_with_at_least_1 "kafka.kafka_broker.request.incremental_alter_configs_request" + +run_log "Test add partitions" +kafka_client kafka-topics --bootstrap-server proxy:10000 \ + --alter --topic $TOPIC --partitions 3 + +run_log "Check create_partitions metric" +has_metric_with_at_least_1 "kafka.kafka_broker.request.create_partitions_request" + +run_log "Test delete topic" +kafka_client kafka-topics --bootstrap-server proxy:10000 --delete --topic $TOPIC + +run_log "Check delete_topics metric" +has_metric_with_at_least_1 "kafka.kafka_broker.request.delete_topics_request"