Skip to content

Enhance Kafka example with consumer groups and admin operations coverage#1017

Closed
Copilot wants to merge 5 commits into
mainfrom
copilot/enhance-kafka-integration-tests
Closed

Enhance Kafka example with consumer groups and admin operations coverage#1017
Copilot wants to merge 5 commits into
mainfrom
copilot/enhance-kafka-integration-tests

Conversation

Copy link
Copy Markdown
Contributor

Copilot AI commented Jan 27, 2026

Expands Kafka broker filter integration testing to cover consumer group coordination, topic configuration management, partition operations, and topic deletion. Migrates test coverage from envoyproxy/envoy repository.

Changes

Documentation (kafka/example.rst)

  • Step 8: Consumer group coordination testing (find_coordinator, join_group, sync_group, leave_group request/response metrics)
  • Step 9: Admin operations - topic config alteration and partition creation
  • Step 10: Topic deletion

Integration Tests (kafka/verify.sh)

  • Automated validation for all new operations with corresponding metric checks
  • Consumer group test with timeout handling (expected behavior when no new messages)

Technical Notes

Uses incremental_alter_configs_request metric rather than alter_configs_request - newer Kafka versions (including the one in this example) use the incremental API for configuration changes.

Example metric validation:

has_metric_with_at_least_1 "kafka.kafka_broker.request.incremental_alter_configs_request"
has_metric_with_at_least_1 "kafka.kafka_broker.request.create_partitions_request"
has_metric_with_at_least_1 "kafka.kafka_broker.request.delete_topics_request"

Warning

Firewall rules blocked me from connecting to one or more addresses (expand for details)

I tried to connect to the following addresses, but was blocked by firewall rules:

  • 2.0.18.172.in-addr.arpa
    • Triggering command: /usr/bin/java java -cp /usr/share/java/cp-base-new/* -Dlog4j.configuration=file:/etc/kafka/log4j.properties io.confluent.admin.utils.cli.ZookeeperReadyCommand zookeeper:2181 40000 /opt/containerd/bin/bash /run/containerd//usr/sbin/iptables --log-format json 635/log.json ocke�� y /var/run/docker/-s ntime.v2.task/mo127.0.0.11 /run/containerd//usr/sbin/iptables --log-format json a33852097bc08fd2nat (dns block)
  • 3.0.18.172.in-addr.arpa
    • Triggering command: /usr/bin/java java -cp /usr/share/java/cp-base-new/* -Dlog4j.configuration=file:/etc/kafka/log4j.properties io.confluent.admin.utils.cli.ZookeeperReadyCommand zookeeper:2181 40000 (dns block)
    • Triggering command: /usr/bin/java java -cp /usr/share/java/cp-base-new/* -Dlog4j.configuration=file:/etc/kafka/log4j.properties io.confluent.admin.utils.cli.ZookeeperReadyCommand zookeeper:2181 40000 /usr/bin/gpgv ntime.v2.task/mo/usr/sbin/iptables (dns block)
  • kafka-server
    • Triggering command: /usr/local/bin/envoy /usr/local/bin/envoy -c /etc/envoy.yaml (dns block)

If you need me to access, download, or install something from one of these locations, you can either:

Original prompt

Summary

Enhance the existing kafka example to provide more comprehensive integration test coverage for the Kafka broker filter.

This is part of migrating Kafka integration test coverage from envoyproxy/envoy to the examples repo. See: envoyproxy/envoy#43182

Current coverage

The existing example tests:

  • Create topic
  • List topics
  • Produce a message
  • Consume a message
  • Basic metrics (api_versions, metadata, create_topics, produce, fetch)

Gaps to fill

The Python integration tests in envoy (kafka_broker_integration_test.py) also test:

  1. Consumer groups - multiple consumers forming a group, which exercises:

    • find_coordinator_request/response
    • join_group_request/response
    • sync_group_request/response
    • leave_group_request/response
  2. Additional admin operations:

    • alter_configs (change topic config)
    • create_partitions (add partitions to topic)
    • delete_topics
  3. Higher volume - produce/consume multiple messages

Changes to kafka/example.rst

Add new steps after the existing ones:

Step 8: Test consumer groups
****************************

Consumer groups allow multiple consumers to coordinate consumption of a topic.
When consumers join a group, they use the Kafka group coordination protocol
which Envoy proxies transparently.

Start a consumer in a group. It will wait for messages and then exit after 
a timeout:

.. code-block:: console

   $ docker compose run --rm kafka-client \
       kafka-console-consumer --bootstrap-server proxy:10000 \
           --topic $TOPIC --group test-group --timeout-ms 5000

The consumer group protocol generates additional request types. Check that
the group coordination metrics have incremented:

.. code-block:: console

   $ curl -s "http://localhost:8001/stats?filter=kafka.kafka_broker" | grep -E "(find_coordinator|join_group|sync_group|leave_group)" | grep -v ": 0"
   kafka.kafka_broker.request.find_coordinator_request: 1
   kafka.kafka_broker.request.join_group_request: 1
   kafka.kafka_broker.request.leave_group_request: 1
   kafka.kafka_broker.response.find_coordinator_response: 1
   kafka.kafka_broker.response.join_group_response: 1
   kafka.kafka_broker.response.leave_group_response: 1


Step 9: Test additional admin operations
****************************************

The Kafka admin client supports various topic management operations. Test
altering topic configuration:

.. code-block:: console

   $ docker compose run --rm kafka-client \
       kafka-configs --bootstrap-server proxy:10000 \
           --alter --entity-type topics --entity-name $TOPIC \
           --add-config retention.ms=86400000

Add partitions to the topic:

.. code-block:: console

   $ docker compose run --rm kafka-client \
       kafka-topics --bootstrap-server proxy:10000 \
           --alter --topic $TOPIC --partitions 3

Check the admin operation metrics:

.. code-block:: console

   $ curl -s "http://localhost:8001/stats?filter=kafka.kafka_broker" | grep -E "(alter_configs|create_partitions)" | grep -v ": 0"
   kafka.kafka_broker.request.alter_configs_request: 1
   kafka.kafka_broker.request.create_partitions_request: 1


Step 10: Delete the topic
*************************

Clean up by deleting the test topic:

.. code-block:: console

   $ docker compose run --rm kafka-client \
       kafka-topics --bootstrap-server proxy:10000 --delete --topic $TOPIC

Verify the delete operation was tracked:

.. code-block:: console

   $ curl -s "http://localhost:8001/stats?filter=kafka.kafka_broker.request.delete_topics" | grep -v ": 0"
   kafka.kafka_broker.request.delete_topics_request: 1

Changes to kafka/verify.sh

Add tests for the new functionality after the existing tests. The verify.sh should test what the docs describe:

# Add after existing tests...

run_log "Test consumer group coordination"
# Run a consumer in a group - it will timeout after 5s
kafka_client kafka-console-consumer --bootstrap-server proxy:10000 \
    --topic $TOPIC --group test-group --timeout-ms 5000 || true

run_log "Check consumer group metrics"
EXPECTED_GROUP_STATS=(
    "kafka.kafka_broker.request.find_coordinator_request"
    "kafka.kafka_broker.request.join_group_request"
    "kafka.kafka_broker.response.find_coordinator_response"
    "kafka.kafka_broker.response.join_group_response")
for stat in "${EXPECTED_GROUP_STATS[@]}"; do
    has_metric_with_at_least_1 "${stat}"
done

run_log "Test alter topic config"
kafka_client kafka-configs --bootstrap-server proxy:10000 \
    --alter --entity-type topics --entity-name $TOPIC \
    --add-config retention.ms=86400000

run_log "Check alter_configs metric"
has_metric_with_at_least_1 "kafka.kafka_broker.request.alter_configs_request"

run_log "Test add partitions"
kafka_client kafka-topics --bootstrap-server proxy:10000 \
    --alter --topic $TOPIC --partitions 3

run_log "Check create_partitions metric"
has_metric_with_at_least_1 "kafka....

</details>



<!-- START COPILOT CODING AGENT SUFFIX -->

*This pull request was created from Copilot chat.*
>

<!-- START COPILOT CODING AGENT TIPS -->
---

✨ Let Copilot coding agent [set things up for you](https://github.com/envoyproxy/examples/issues/new?title=✨+Set+up+Copilot+instructions&body=Configure%20instructions%20for%20this%20repository%20as%20documented%20in%20%5BBest%20practices%20for%20Copilot%20coding%20agent%20in%20your%20repository%5D%28https://gh.io/copilot-coding-agent-tips%29%2E%0A%0A%3COnboard%20this%20repo%3E&assignees=copilot) — coding agent works faster and does higher quality work when set up for your repo.

Copilot AI and others added 4 commits January 27, 2026 10:48
…tions, and topic deletion

Co-authored-by: phlax <454682+phlax@users.noreply.github.com>
Co-authored-by: phlax <454682+phlax@users.noreply.github.com>
Co-authored-by: phlax <454682+phlax@users.noreply.github.com>
Co-authored-by: phlax <454682+phlax@users.noreply.github.com>
Copilot AI changed the title [WIP] Enhance Kafka example with comprehensive integration test coverage Enhance Kafka example with consumer groups and admin operations coverage Jan 27, 2026
Copilot AI requested a review from phlax January 27, 2026 10:55
@phlax phlax closed this Jan 27, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants