diff --git a/.github/dependabot.yml b/.github/dependabot.yml index 82d00c1f..2bb591c0 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -105,6 +105,16 @@ updates: interval: daily time: "06:00" +- package-ecosystem: "docker" + directory: "/kafka-mesh" + groups: + examples-kafka-mesh: + patterns: + - "*" + schedule: + interval: daily + time: "06:00" + - package-ecosystem: "docker" directory: "/local_ratelimit" groups: diff --git a/BUILD b/BUILD index 436e8dd8..e6e99eb2 100644 --- a/BUILD +++ b/BUILD @@ -22,6 +22,7 @@ EXAMPLE_TESTS = [ "gzip", "jaeger-tracing", "kafka", + "kafka-mesh", "load-reporting-service", "locality-load-balancing", "local_ratelimit", @@ -79,6 +80,7 @@ filegroup( "mysql/*.yaml", "postgres/*.yaml", "kafka/*.yaml", + "kafka-mesh/*.yaml", ], exclude = [ "**/*docker-compose*.yaml", diff --git a/kafka-mesh/Dockerfile-kafka b/kafka-mesh/Dockerfile-kafka new file mode 100644 index 00000000..999bdc0e --- /dev/null +++ b/kafka-mesh/Dockerfile-kafka @@ -0,0 +1 @@ +FROM confluentinc/cp-kafka:latest@sha256:f1c091384a9fbbbe7db084583e3434800b9916744eeb0410b2500371ac95c157 diff --git a/kafka-mesh/Dockerfile-zookeeper b/kafka-mesh/Dockerfile-zookeeper new file mode 100644 index 00000000..53509e3d --- /dev/null +++ b/kafka-mesh/Dockerfile-zookeeper @@ -0,0 +1 @@ +FROM confluentinc/cp-zookeeper:latest@sha256:3a5f014d6fb9ec3b0fd843a98b60c21648d2c20b094ef428b270f7dfda261cc4 diff --git a/kafka-mesh/README.md b/kafka-mesh/README.md new file mode 100644 index 00000000..f3698975 --- /dev/null +++ b/kafka-mesh/README.md @@ -0,0 +1,2 @@ +To learn about this sandbox and for instructions on how to run it please head over +to the [envoy docs](https://www.envoyproxy.io/docs/envoy/latest/start/sandboxes/kafka_mesh) diff --git a/kafka-mesh/docker-compose.yaml b/kafka-mesh/docker-compose.yaml new file mode 100644 index 00000000..3ca910de --- /dev/null +++ b/kafka-mesh/docker-compose.yaml @@ -0,0 +1,61 @@ +services: + + kafka-client: + build: + context: . + dockerfile: Dockerfile-kafka + restart: "no" + deploy: + replicas: 0 + + proxy: + build: + context: . + dockerfile: ../shared/envoy/Dockerfile + args: + ENVOY_VARIANT: contrib-dev + ports: + - "${PORT_PROXY:-10000}:10000" + - "${PORT_ADMIN:-8001}:8001" + + kafka-cluster1: + build: + context: . + dockerfile: Dockerfile-kafka + depends_on: + zookeeper: + condition: service_healthy + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181/cluster1 + KAFKA_LISTENERS: PLAINTEXT://kafka-cluster1:9092 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-cluster1:9092 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + + kafka-cluster2: + build: + context: . + dockerfile: Dockerfile-kafka + depends_on: + zookeeper: + condition: service_healthy + environment: + KAFKA_BROKER_ID: 2 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181/cluster2 + KAFKA_LISTENERS: PLAINTEXT://kafka-cluster2:9092 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-cluster2:9092 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + + zookeeper: + build: + context: . + dockerfile: Dockerfile-zookeeper + healthcheck: + test: ["CMD", "sh", "-c", "echo ruok | nc 127.0.0.1 2181 || exit -1"] + interval: 5s + timeout: 60s + retries: 120 + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + KAFKA_OPTS: "-Dzookeeper.4lw.commands.whitelist=ruok" diff --git a/kafka-mesh/envoy.yaml b/kafka-mesh/envoy.yaml new file mode 100644 index 00000000..b5733afa --- /dev/null +++ b/kafka-mesh/envoy.yaml @@ -0,0 +1,36 @@ +static_resources: + listeners: + - address: + socket_address: + address: 0.0.0.0 + port_value: 10000 + filter_chains: + - filters: + - name: envoy.filters.network.kafka_broker + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.network.kafka_broker.v3.KafkaBroker + stat_prefix: kafka_mesh + force_response_rewrite: true + - name: envoy.filters.network.kafka_mesh + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.network.kafka_mesh.v3alpha.KafkaMesh + advertised_host: "proxy" + advertised_port: 10000 + upstream_clusters: + - cluster_name: kafka_cluster1 + bootstrap_servers: kafka-cluster1:9092 + partition_count: 1 + - cluster_name: kafka_cluster2 + bootstrap_servers: kafka-cluster2:9092 + partition_count: 1 + forwarding_rules: + - target_cluster: kafka_cluster1 + topic_prefix: a + - target_cluster: kafka_cluster2 + topic_prefix: b + +admin: + address: + socket_address: + address: 0.0.0.0 + port_value: 8001 diff --git a/kafka-mesh/example.rst b/kafka-mesh/example.rst new file mode 100644 index 00000000..2dca23f7 --- /dev/null +++ b/kafka-mesh/example.rst @@ -0,0 +1,155 @@ +.. _install_sandboxes_kafka_mesh: + +Kafka mesh filter +================= + +.. sidebar:: Requirements + + .. include:: _include/docker-env-setup-link.rst + + :ref:`curl ` + Used to make HTTP requests. + +This example demonstrates the Kafka mesh filter, which routes Kafka traffic +to different upstream clusters based on topic name. + +The :ref:`Kafka broker filter ` passes all traffic +to a single upstream Kafka cluster while collecting metrics. The mesh filter +goes further: it reads the Kafka protocol, extracts the topic name from each +request, and forwards it to the appropriate upstream cluster. + +In this example we configure two upstream Kafka clusters with routing rules: + +- Topics starting with ``a`` (e.g., ``apples``) → **cluster1** +- Topics starting with ``b`` (e.g., ``bananas``) → **cluster2** + +Clients connect to Envoy as if it were a single Kafka broker. Envoy handles +the routing transparently. + + +Step 1: Start all containers +**************************** + +Change to the ``kafka-mesh`` directory and start the containers. This brings +up Envoy, two independent Kafka clusters, and a shared Zookeeper instance. + +.. code-block:: console + + $ pwd + envoy/examples/kafka-mesh + $ docker compose pull + $ docker compose up --build -d + $ docker compose ps + + Name State Ports + ------------------------------------------------------------------ + kafka-mesh_proxy_1 Up 0.0.0.0:10000->10000/tcp + kafka-mesh_kafka-cluster1_1 Up 9092/tcp + kafka-mesh_kafka-cluster2_1 Up 9092/tcp + kafka-mesh_zookeeper_1 Up (healthy) 2181/tcp + + +Step 2: Produce a message to the ``apples`` topic +************************************************* + +Send a message to a topic starting with ``a``. The mesh filter will route +this to **cluster1**. + +.. code-block:: console + + $ docker compose run --rm kafka-client \ + /bin/bash -c "echo 'hello from apples' | kafka-console-producer --request-required-acks 1 --producer-property enable.idempotence=false --broker-list proxy:10000 --topic apples" + + +Step 3: Produce a message to the ``bananas`` topic +************************************************** + +Send a message to a topic starting with ``b``. The mesh filter will route +this to **cluster2**. + +.. code-block:: console + + $ docker compose run --rm kafka-client \ + /bin/bash -c "echo 'hello from bananas' | kafka-console-producer --request-required-acks 1 --producer-property enable.idempotence=false --broker-list proxy:10000 --topic bananas" + + +Step 4: Verify the message landed in cluster1 +********************************************* + +To confirm the routing worked, consume directly from **cluster1** (bypassing +Envoy). You should see the ``apples`` message: + +.. code-block:: console + + $ docker compose run --rm kafka-client \ + kafka-console-consumer --bootstrap-server kafka-cluster1:9092 --topic apples --from-beginning --max-messages 1 + hello from apples + +The ``bananas`` topic should not exist on cluster1. + + +Step 5: Verify the message landed in cluster2 +********************************************* + +Similarly, consume directly from **cluster2**. You should see the ``bananas`` +message: + +.. code-block:: console + + $ docker compose run --rm kafka-client \ + kafka-console-consumer --bootstrap-server kafka-cluster2:9092 --topic bananas --from-beginning --max-messages 1 + hello from bananas + +The ``apples`` topic should not exist on cluster2. + + +Step 6: Consume through the mesh filter +*************************************** + +The mesh filter also routes fetch (consume) requests to the correct upstream. +Clients can consume through Envoy without knowing which cluster holds the data. + +.. note:: + + The mesh filter supports only Produce and Fetch Kafka APIs. Consumer group + coordination (FIND_COORDINATOR, etc.) is not supported, so we consume by + directly specifying the partition with ``--partition 0``. + +Consume the ``apples`` topic through Envoy: + +.. code-block:: console + + $ docker compose run --rm kafka-client \ + kafka-console-consumer --bootstrap-server proxy:10000 --topic apples --partition 0 --from-beginning --max-messages 1 + hello from apples + +Consume the ``bananas`` topic through Envoy: + +.. code-block:: console + + $ docker compose run --rm kafka-client \ + kafka-console-consumer --bootstrap-server proxy:10000 --topic bananas --partition 0 --from-beginning --max-messages 1 + hello from bananas + + +Step 7: Check Envoy stats +************************* + +Envoy records metrics for Kafka traffic. Query the admin interface to see +produce and fetch request counts: + +.. code-block:: console + + $ curl -s "http://localhost:8001/stats?filter=kafka" | grep -v ": 0" | grep "_request:" + kafka.kafka_mesh.request.produce_request: 2 + kafka.kafka_mesh.request.fetch_request: 4 + kafka.kafka_mesh.request.metadata_request: 8 + + +.. seealso:: + + :ref:`Envoy Kafka mesh filter ` + Learn more about the Kafka mesh filter configuration. + + :ref:`Kafka broker filter example ` + A simpler example using the broker filter for single-cluster proxying. diff --git a/kafka-mesh/verify.sh b/kafka-mesh/verify.sh new file mode 100755 index 00000000..249045e2 --- /dev/null +++ b/kafka-mesh/verify.sh @@ -0,0 +1,38 @@ +#!/bin/bash -e + +export NAME=kafka-mesh +export PORT_PROXY="${KAFKA_MESH_PORT_PROXY:-11200}" +export PORT_ADMIN="${KAFKA_MESH_PORT_ADMIN:-11201}" + +UPARGS="proxy kafka-cluster1 kafka-cluster2 zookeeper" + +# shellcheck source=verify-common.sh +. "$(dirname "${BASH_SOURCE[0]}")/../verify-common.sh" + +kafka_client () { + "${DOCKER_COMPOSE[@]}" run --rm kafka-client "$@" +} + +run_log "Produce message to topic 'apples' (routes to cluster1)" +kafka_client /bin/bash -c "echo 'hello from apples' | kafka-console-producer --request-required-acks 1 --producer-property enable.idempotence=false --broker-list proxy:10000 --topic apples" + +run_log "Produce message to topic 'bananas' (routes to cluster2)" +kafka_client /bin/bash -c "echo 'hello from bananas' | kafka-console-producer --request-required-acks 1 --producer-property enable.idempotence=false --broker-list proxy:10000 --topic bananas" + +run_log "Verify message landed in cluster1 (consume directly from cluster1)" +kafka_client kafka-console-consumer --bootstrap-server kafka-cluster1:9092 --topic apples --from-beginning --max-messages 1 | grep "hello from apples" + +run_log "Verify message landed in cluster2 (consume directly from cluster2)" +kafka_client kafka-console-consumer --bootstrap-server kafka-cluster2:9092 --topic bananas --from-beginning --max-messages 1 | grep "hello from bananas" + +run_log "Consume 'apples' through mesh filter" +kafka_client kafka-console-consumer --bootstrap-server proxy:10000 --topic apples --partition 0 --from-beginning --max-messages 1 | grep "hello from apples" + +run_log "Consume 'bananas' through mesh filter" +kafka_client kafka-console-consumer --bootstrap-server proxy:10000 --topic bananas --partition 0 --from-beginning --max-messages 1 | grep "hello from bananas" + +run_log "Check Envoy stats for produce and fetch requests" +stats_output=$(_curl "http://localhost:${PORT_ADMIN}/stats?filter=kafka") +echo "$stats_output" | grep "produce_request" | grep -v ": 0" +echo "$stats_output" | grep "fetch_request" | grep -v ": 0" +echo "$stats_output" | grep "metadata_request" | grep -v ": 0"