Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,16 @@ updates:
interval: daily
time: "06:00"

- package-ecosystem: "docker"
directory: "/kafka-mesh"
groups:
examples-kafka-mesh:
patterns:
- "*"
schedule:
interval: daily
time: "06:00"

- package-ecosystem: "docker"
directory: "/local_ratelimit"
groups:
Expand Down
2 changes: 2 additions & 0 deletions BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ EXAMPLE_TESTS = [
"gzip",
"jaeger-tracing",
"kafka",
"kafka-mesh",
"load-reporting-service",
"locality-load-balancing",
"local_ratelimit",
Expand Down Expand Up @@ -79,6 +80,7 @@ filegroup(
"mysql/*.yaml",
"postgres/*.yaml",
"kafka/*.yaml",
"kafka-mesh/*.yaml",
],
exclude = [
"**/*docker-compose*.yaml",
Expand Down
1 change: 1 addition & 0 deletions kafka-mesh/Dockerfile-kafka
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
FROM confluentinc/cp-kafka:latest@sha256:f1c091384a9fbbbe7db084583e3434800b9916744eeb0410b2500371ac95c157
1 change: 1 addition & 0 deletions kafka-mesh/Dockerfile-zookeeper
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
FROM confluentinc/cp-zookeeper:latest@sha256:3a5f014d6fb9ec3b0fd843a98b60c21648d2c20b094ef428b270f7dfda261cc4
2 changes: 2 additions & 0 deletions kafka-mesh/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
To learn about this sandbox and for instructions on how to run it please head over
to the [envoy docs](https://www.envoyproxy.io/docs/envoy/latest/start/sandboxes/kafka_mesh)
61 changes: 61 additions & 0 deletions kafka-mesh/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
services:

kafka-client:
build:
context: .
dockerfile: Dockerfile-kafka
restart: "no"
deploy:
replicas: 0

proxy:
build:
context: .
dockerfile: ../shared/envoy/Dockerfile
args:
ENVOY_VARIANT: contrib-dev
ports:
- "${PORT_PROXY:-10000}:10000"
- "${PORT_ADMIN:-8001}:8001"

kafka-cluster1:
build:
context: .
dockerfile: Dockerfile-kafka
depends_on:
zookeeper:
condition: service_healthy
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181/cluster1
KAFKA_LISTENERS: PLAINTEXT://kafka-cluster1:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-cluster1:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

kafka-cluster2:
build:
context: .
dockerfile: Dockerfile-kafka
depends_on:
zookeeper:
condition: service_healthy
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181/cluster2
KAFKA_LISTENERS: PLAINTEXT://kafka-cluster2:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-cluster2:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

zookeeper:
build:
context: .
dockerfile: Dockerfile-zookeeper
healthcheck:
test: ["CMD", "sh", "-c", "echo ruok | nc 127.0.0.1 2181 || exit -1"]
interval: 5s
timeout: 60s
retries: 120
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
KAFKA_OPTS: "-Dzookeeper.4lw.commands.whitelist=ruok"
36 changes: 36 additions & 0 deletions kafka-mesh/envoy.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
static_resources:
listeners:
- address:
socket_address:
address: 0.0.0.0
port_value: 10000
filter_chains:
- filters:
- name: envoy.filters.network.kafka_broker
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.network.kafka_broker.v3.KafkaBroker
stat_prefix: kafka_mesh
force_response_rewrite: true
- name: envoy.filters.network.kafka_mesh
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.network.kafka_mesh.v3alpha.KafkaMesh
advertised_host: "proxy"
advertised_port: 10000
upstream_clusters:
- cluster_name: kafka_cluster1
bootstrap_servers: kafka-cluster1:9092
partition_count: 1
- cluster_name: kafka_cluster2
bootstrap_servers: kafka-cluster2:9092
partition_count: 1
forwarding_rules:
- target_cluster: kafka_cluster1
topic_prefix: a
- target_cluster: kafka_cluster2
topic_prefix: b

admin:
address:
socket_address:
address: 0.0.0.0
port_value: 8001
155 changes: 155 additions & 0 deletions kafka-mesh/example.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
.. _install_sandboxes_kafka_mesh:

Kafka mesh filter
=================

.. sidebar:: Requirements

.. include:: _include/docker-env-setup-link.rst

:ref:`curl <start_sandboxes_setup_curl>`
Used to make HTTP requests.

This example demonstrates the Kafka mesh filter, which routes Kafka traffic
to different upstream clusters based on topic name.

The :ref:`Kafka broker filter <install_sandboxes_kafka>` passes all traffic
to a single upstream Kafka cluster while collecting metrics. The mesh filter
goes further: it reads the Kafka protocol, extracts the topic name from each
request, and forwards it to the appropriate upstream cluster.

In this example we configure two upstream Kafka clusters with routing rules:

- Topics starting with ``a`` (e.g., ``apples``) → **cluster1**
- Topics starting with ``b`` (e.g., ``bananas``) → **cluster2**

Clients connect to Envoy as if it were a single Kafka broker. Envoy handles
the routing transparently.


Step 1: Start all containers
****************************

Change to the ``kafka-mesh`` directory and start the containers. This brings
up Envoy, two independent Kafka clusters, and a shared Zookeeper instance.

.. code-block:: console

$ pwd
envoy/examples/kafka-mesh
$ docker compose pull
$ docker compose up --build -d
$ docker compose ps

Name State Ports
------------------------------------------------------------------
kafka-mesh_proxy_1 Up 0.0.0.0:10000->10000/tcp
kafka-mesh_kafka-cluster1_1 Up 9092/tcp
kafka-mesh_kafka-cluster2_1 Up 9092/tcp
kafka-mesh_zookeeper_1 Up (healthy) 2181/tcp


Step 2: Produce a message to the ``apples`` topic
*************************************************

Send a message to a topic starting with ``a``. The mesh filter will route
this to **cluster1**.

.. code-block:: console

$ docker compose run --rm kafka-client \
/bin/bash -c "echo 'hello from apples' | kafka-console-producer --request-required-acks 1 --producer-property enable.idempotence=false --broker-list proxy:10000 --topic apples"


Step 3: Produce a message to the ``bananas`` topic
**************************************************

Send a message to a topic starting with ``b``. The mesh filter will route
this to **cluster2**.

.. code-block:: console

$ docker compose run --rm kafka-client \
/bin/bash -c "echo 'hello from bananas' | kafka-console-producer --request-required-acks 1 --producer-property enable.idempotence=false --broker-list proxy:10000 --topic bananas"


Step 4: Verify the message landed in cluster1
*********************************************

To confirm the routing worked, consume directly from **cluster1** (bypassing
Envoy). You should see the ``apples`` message:

.. code-block:: console

$ docker compose run --rm kafka-client \
kafka-console-consumer --bootstrap-server kafka-cluster1:9092 --topic apples --from-beginning --max-messages 1
hello from apples

The ``bananas`` topic should not exist on cluster1.


Step 5: Verify the message landed in cluster2
*********************************************

Similarly, consume directly from **cluster2**. You should see the ``bananas``
message:

.. code-block:: console

$ docker compose run --rm kafka-client \
kafka-console-consumer --bootstrap-server kafka-cluster2:9092 --topic bananas --from-beginning --max-messages 1
hello from bananas

The ``apples`` topic should not exist on cluster2.


Step 6: Consume through the mesh filter
***************************************

The mesh filter also routes fetch (consume) requests to the correct upstream.
Clients can consume through Envoy without knowing which cluster holds the data.

.. note::

The mesh filter supports only Produce and Fetch Kafka APIs. Consumer group
coordination (FIND_COORDINATOR, etc.) is not supported, so we consume by
directly specifying the partition with ``--partition 0``.

Consume the ``apples`` topic through Envoy:

.. code-block:: console

$ docker compose run --rm kafka-client \
kafka-console-consumer --bootstrap-server proxy:10000 --topic apples --partition 0 --from-beginning --max-messages 1
hello from apples

Consume the ``bananas`` topic through Envoy:

.. code-block:: console

$ docker compose run --rm kafka-client \
kafka-console-consumer --bootstrap-server proxy:10000 --topic bananas --partition 0 --from-beginning --max-messages 1
hello from bananas


Step 7: Check Envoy stats
*************************

Envoy records metrics for Kafka traffic. Query the admin interface to see
produce and fetch request counts:

.. code-block:: console

$ curl -s "http://localhost:8001/stats?filter=kafka" | grep -v ": 0" | grep "_request:"
kafka.kafka_mesh.request.produce_request: 2
kafka.kafka_mesh.request.fetch_request: 4
kafka.kafka_mesh.request.metadata_request: 8


.. seealso::

:ref:`Envoy Kafka mesh filter <config_network_filters_kafka_mesh>`
Learn more about the Kafka mesh filter configuration.

:ref:`Kafka broker filter example <install_sandboxes_kafka>`
A simpler example using the broker filter for single-cluster proxying.
38 changes: 38 additions & 0 deletions kafka-mesh/verify.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#!/bin/bash -e

export NAME=kafka-mesh
export PORT_PROXY="${KAFKA_MESH_PORT_PROXY:-11200}"
export PORT_ADMIN="${KAFKA_MESH_PORT_ADMIN:-11201}"

UPARGS="proxy kafka-cluster1 kafka-cluster2 zookeeper"

# shellcheck source=verify-common.sh
. "$(dirname "${BASH_SOURCE[0]}")/../verify-common.sh"

kafka_client () {
"${DOCKER_COMPOSE[@]}" run --rm kafka-client "$@"
}

run_log "Produce message to topic 'apples' (routes to cluster1)"
kafka_client /bin/bash -c "echo 'hello from apples' | kafka-console-producer --request-required-acks 1 --producer-property enable.idempotence=false --broker-list proxy:10000 --topic apples"

run_log "Produce message to topic 'bananas' (routes to cluster2)"
kafka_client /bin/bash -c "echo 'hello from bananas' | kafka-console-producer --request-required-acks 1 --producer-property enable.idempotence=false --broker-list proxy:10000 --topic bananas"

run_log "Verify message landed in cluster1 (consume directly from cluster1)"
kafka_client kafka-console-consumer --bootstrap-server kafka-cluster1:9092 --topic apples --from-beginning --max-messages 1 | grep "hello from apples"

run_log "Verify message landed in cluster2 (consume directly from cluster2)"
kafka_client kafka-console-consumer --bootstrap-server kafka-cluster2:9092 --topic bananas --from-beginning --max-messages 1 | grep "hello from bananas"

run_log "Consume 'apples' through mesh filter"
kafka_client kafka-console-consumer --bootstrap-server proxy:10000 --topic apples --partition 0 --from-beginning --max-messages 1 | grep "hello from apples"

run_log "Consume 'bananas' through mesh filter"
kafka_client kafka-console-consumer --bootstrap-server proxy:10000 --topic bananas --partition 0 --from-beginning --max-messages 1 | grep "hello from bananas"

run_log "Check Envoy stats for produce and fetch requests"
stats_output=$(_curl "http://localhost:${PORT_ADMIN}/stats?filter=kafka")
echo "$stats_output" | grep "produce_request" | grep -v ": 0"
echo "$stats_output" | grep "fetch_request" | grep -v ": 0"
echo "$stats_output" | grep "metadata_request" | grep -v ": 0"
Loading