diff --git a/docs/benchmark-results/iceberg.md b/docs/benchmark-results/iceberg.md
new file mode 100644
index 0000000000..e78e7b7860
--- /dev/null
+++ b/docs/benchmark-results/iceberg.md
@@ -0,0 +1,116 @@
+# Iceberg Benchmark Results
+
+Benchmarks for the `iceberg` output using a local REST catalog backed by MinIO (S3-compatible).
+
+See [`internal/impl/iceberg/bench/`](../../internal/impl/iceberg/bench/) for the benchmark configs and run instructions.
+
+## Write Throughput — CPU & Batch Size Scaling
+
+Synthetic events generated at maximum speed (`generate` input with `count: 0, interval: ""`), written to a single Iceberg table. Varying `GOMAXPROCS` and `batching.count`.
+
+**Environment:** Intel Core i7-10850H @ 2.70GHz, 32 GB RAM, WSL2 (Linux 6.6.87.2), x86_64, MinIO + REST catalog running in Docker (localhost)
+
+**Dataset:** Synthetic events, ~142 B per message (id, user_id, event_type, value, info, ts) — measured at the pipeline processor. Actual bytes written to MinIO will differ due to Parquet columnar compression.
+
+**Count:** 1,000,000 messages per run
+
+### msg/sec
+
+| GOMAXPROCS | batch=1000 | batch=5000 | batch=10000 |
+|-------------|------------|------------|-------------|
+| 1 | 757 | 3,105 | 5,442 |
+| 2 | 1,186 | 4,408 | 6,763 |
+| 4 | 1,147 | 4,758 | 8,483 |
+| 8 | 1,056 | 4,107 | 8,231 |
+| (unbounded) | | | |
+
+### kB/sec
+
+| GOMAXPROCS | batch=1000 | batch=5000 | batch=10000 |
+|-------------|------------|------------|-------------|
+| 1 | 106 | 435 | 774 |
+| 2 | 166 | 618 | 961 |
+| 4 | 161 | 667 | 1206 |
+| 8 | 148 | 576 | 1170 |
+| (unbounded) | | | |
+
+**Observations:**
+
+- **Batch size is the dominant factor:** throughput at 1 core scales from 757 (batch=1000) → 3,105 (batch=5000) → 5,442 (batch=10000) msg/sec. Each batch costs one catalog commit round-trip, so fewer, larger commits mean dramatically higher throughput.
+- **batch=5000 and batch=10000 benefit from more cores up to 4**, then regress at 8 — the catalog commit overhead is reduced enough that CPU parallelism helps, but 8 cores reintroduces contention. + +--- + +## Write Throughput — Batch Size & max_in_flight Scaling + +Fixed at `GOMAXPROCS=4`, varying `batching.count` and `max_in_flight` to measure the impact of concurrent catalog commits. + +**Environment:** Intel Core i7-10850H @ 2.70GHz, 32 GB RAM, WSL2 (Linux 6.6.87.2), x86_64, MinIO + REST catalog running in Docker (localhost) + +**Dataset:** Synthetic events, ~142 B per message + +**Count:** 1,000,000 messages per run + +### msg/sec + +| max_in_flight | batch=5000 | batch=10000 | +|---------------|------------|-------------| +| 4 | 4,758 | 8,483 | +| 8 | 7,105 | 13,839 | +| 16 | 12,973 | 23,316 | +| 32 | 20,462 | 34,835 | +| 64 | 34,993 | 33,703 | +| 128 | 33,911 | 33,742 | + +### MB/sec + +| max_in_flight | batch=5000 | batch=10000 | +|---------------|------------|-------------| +| 4 | 0.67 | 1.21 | +| 8 | 1.00 | 2.00 | +| 16 | 1.80 | 3.30 | +| 32 | 2.90 | 5.00 | +| 64 | 5.00 | 4.80 | +| 128 | 4.80 | 4.80 | + +**Observations:** + +- **`max_in_flight` is the most impactful knob:** at batch=10000, throughput scales from 8,483 (MIF=4) → 13,839 (MIF=8) → 23,316 (MIF=16) → 34,835 (MIF=32) msg/sec — a 4x gain by increasing concurrent commits. +- **The ceiling is ~34K msg/sec / 5 MB/sec**, hit at MIF=32 for batch=10000 and MIF=64 for batch=5000. This is the MinIO throughput limit, not the connector. +- **batch=5000 and batch=10000 converge at high MIF values** — both plateau at ~34K msg/sec when given enough concurrent commits. batch=10000 reaches the ceiling with fewer in-flight requests (MIF=32 vs MIF=64). +- **Sweet spot: batch=10000, MIF=32** — reaches maximum throughput with the least concurrency overhead. +- The fundamental insight from both sections: the Iceberg write bottleneck is catalog commit latency. 
The connector itself is not the bottleneck — throw more concurrent commits at it (`max_in_flight`) and it scales linearly until MinIO saturates. + +--- + +## Comparison: Kafka Connect vs Redpanda Connect Iceberg + +**Environment:** Intel Core i7-10850H @ 2.70GHz, 32 GB RAM, WSL2 (Linux 6.6.87.2), x86_64 +**Dataset:** 10,000,000 synthetic events, MinIO + Iceberg REST catalog in Docker + +Both connectors use a 10s commit window and 16 Kafka partitions. The transformation computes 5 derived fields per message (`event_id`, `value_usd`, `value_tier`, `ts_ms`, `is_high_value`). + +### Results + +#### Sink only + +| Connector | Throughput | +|--------------------------|---------------| +| Kafka Connect (Tabular) | 84,745 msg/s | +| Redpanda Connect | 61,349 msg/s | + +#### Transform + sink + +| Connector | Kafka CPUs | Throughput | +|--------------------------|------------|---------------| +| Kafka Connect (Tabular) | unbounded | 37,037 msg/s | +| Redpanda Connect | unbounded | 47,272 msg/s | +| Redpanda Connect | 1 | 45,248 msg/s | +| Redpanda Connect | 2 | 48,829 msg/s | + +### Notes + +- **Kafka Connect sink-only** is fastest in isolation — 16 tasks consuming pre-transformed data directly into Iceberg. +- **Kafka Connect with transformation** requires a separate RPCN pre-processing step that writes to an intermediate Kafka topic (`bench-events-transformed`), then Kafka Connect reads from that topic and sinks to Iceberg. The two-stage I/O cuts throughput by more than half. +- **Redpanda Connect** handles transformation and Iceberg writes in a single pipeline — no intermediate topic, no extra Kafka round-trip. +- **End-to-end (the realistic scenario):** Redpanda Connect is ~1.3x faster than Kafka Connect (47k vs 37k msg/s). 
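The headline figures above are easy to sanity-check against each other. A minimal sketch in Python — pure arithmetic over the table values and the ~142 B message size, not connector code:

```python
# Back-of-envelope consistency check for the tables above.
# 142 B is the approximate per-message size measured at the pipeline
# processor; MB is decimal (1 MB = 1e6 B), as reported by the benchmark.

MSG_BYTES = 142

def mb_per_sec(msgs_per_sec, msg_bytes=MSG_BYTES):
    """Convert a msg/sec figure into MB/sec at the processor."""
    return msgs_per_sec * msg_bytes / 1e6

ceiling = mb_per_sec(34_835)   # plateau at batch=10000, MIF=32
speedup = 47_272 / 37_037      # transform+sink: Redpanda Connect vs Kafka Connect

print(f"{ceiling:.1f} MB/s, {speedup:.2f}x")  # → 4.9 MB/s, 1.28x
```

The ~5 MB/sec ceiling and the ~1.3x end-to-end gap quoted above both fall out of the raw table values.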
diff --git a/internal/impl/iceberg/bench/README.md b/internal/impl/iceberg/bench/README.md new file mode 100644 index 0000000000..69c24db2c7 --- /dev/null +++ b/internal/impl/iceberg/bench/README.md @@ -0,0 +1,64 @@ +# Iceberg Benchmark + +Measures write throughput of the Redpanda Connect Iceberg output. Two benchmarks are available: + +- **This folder** — Redpanda Connect only (generate → Iceberg), no Kafka +- **[`kafka-connector/`](kafka-connector/)** — End-to-end comparison: Kafka → transform → Iceberg, benchmarked against Kafka Connect (Tabular Iceberg Sink) + +See [`docs/benchmark-results/iceberg.md`](../../../../docs/benchmark-results/iceberg.md) for results. + +## Prerequisites + +- Docker with Compose + +## Infrastructure + +```bash +task infra:up # start MinIO + Iceberg REST catalog +task infra:down # stop and remove all containers +task infra:reset # wipe data and restart +task infra:logs # follow container logs +``` + +MinIO console: http://localhost:9001 (admin/password) +Iceberg REST catalog: http://localhost:8181 + +## Running + +### Generate → Iceberg (no Kafka) + +```bash +task bench CORES=4 BATCH=5000 COUNT=1000000 +``` + +| Parameter | Default | Description | +|-----------|---------|-------------| +| `CORES` | unbounded | `GOMAXPROCS` | +| `BATCH` | 1000 | `batching.count` | +| `COUNT` | 0 (unlimited) | number of messages | + +### Varying max_in_flight + +```bash +task bench:mif CORES=4 BATCH=10000 MIF=32 COUNT=1000000 +``` + +| Parameter | Default | Description | +|-----------|---------|-------------| +| `MIF` | 4 | `max_in_flight` | + +### Clean run + +```bash +task bench:reset CORES=4 BATCH=5000 COUNT=1000000 +``` + +Wipes all Iceberg data, restarts infrastructure, then runs the benchmark. + +## Output + +``` +INFO rolling stats: 5000 msg/sec, 3.2 MB/sec @service=redpanda-connect ... +``` + +Throughput is measured at the pipeline processor, before the Iceberg writer. 
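When scripting parameter sweeps it can be handy to scrape throughput numbers out of these log lines. A small sketch — the `parse_stats` helper is hypothetical, not part of the repo; the line format is taken from the sample above:

```python
import re

# Hypothetical helper: extract msg/sec and MB/sec from the benchmark
# processor's rolling-stats log line.
SAMPLE = "INFO rolling stats: 5000 msg/sec, 3.2 MB/sec @service=redpanda-connect"

def parse_stats(line):
    m = re.search(r"rolling stats: ([\d.]+) msg/sec, ([\d.]+) MB/sec", line)
    return (float(m.group(1)), float(m.group(2))) if m else None

print(parse_stats(SAMPLE))  # → (5000.0, 3.2)
```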
diff --git a/internal/impl/iceberg/bench/Taskfile.yaml b/internal/impl/iceberg/bench/Taskfile.yaml new file mode 100644 index 0000000000..4035bdd07c --- /dev/null +++ b/internal/impl/iceberg/bench/Taskfile.yaml @@ -0,0 +1,151 @@ +version: "3" + +vars: + MINIO_USER: '{{.MINIO_USER | default "admin"}}' + MINIO_PASSWORD: '{{.MINIO_PASSWORD | default "password"}}' + MINIO_REGION: '{{.MINIO_REGION | default "us-east-1"}}' + CATALOG_URL: '{{.CATALOG_URL | default "http://localhost:8181"}}' + MINIO_ENDPOINT: '{{.MINIO_ENDPOINT | default "http://localhost:9000"}}' + MINIO_BUCKET: '{{.MINIO_BUCKET | default "warehouse"}}' + +tasks: + infra:up: + desc: Start MinIO and Iceberg REST catalog + cmds: + - docker compose -f docker-compose.yaml up -d + - until curl -sf http://localhost:9000/minio/health/live > /dev/null; do sleep 1; done + - until curl -sf http://localhost:8181/v1/config > /dev/null; do sleep 1; done + + infra:down: + desc: Stop and remove all containers and volumes + cmds: + - docker compose -f docker-compose.yaml down -v + + infra:logs: + desc: Show container logs + cmds: + - docker compose -f docker-compose.yaml logs -f + + infra:reset: + desc: Wipe all data and restart infrastructure + cmds: + - docker compose -f docker-compose.yaml down -v + - docker compose -f docker-compose.yaml up -d + - until curl -sf http://localhost:9000/minio/health/live > /dev/null; do sleep 1; done + - until curl -sf http://localhost:8181/v1/config > /dev/null; do sleep 1; done + + # Parameterised benchmark — specify CORES, BATCH and COUNT freely + # Usage: task bench CORES=4 BATCH=5000 COUNT=1000000 + bench: + desc: "Run write benchmark (e.g. 
task bench CORES=4 BATCH=5000 COUNT=1000000)" + vars: + CORES: '{{.CORES | default ""}}' + BATCH: '{{.BATCH | default "1000"}}' + COUNT: '{{.COUNT | default "0"}}' + cmds: + - | + AWS_EC2_METADATA_DISABLED=true \ + AWS_ACCESS_KEY_ID={{.MINIO_USER}} \ + AWS_SECRET_ACCESS_KEY={{.MINIO_PASSWORD}} \ + AWS_REGION={{.MINIO_REGION}} \ + GOMAXPROCS={{.CORES}} go run ../../../../cmd/redpanda-connect/main.go run \ + --set input.generate.count={{.COUNT}} \ + --set output.iceberg.batching.count={{.BATCH}} \ + --set output.iceberg.batching.period=5s \ + --set output.iceberg.catalog.url={{.CATALOG_URL}} \ + --set output.iceberg.storage.aws_s3.endpoint={{.MINIO_ENDPOINT}} \ + --set output.iceberg.storage.aws_s3.bucket={{.MINIO_BUCKET}} \ + --set output.iceberg.storage.aws_s3.credentials.id={{.MINIO_USER}} \ + --set output.iceberg.storage.aws_s3.credentials.secret={{.MINIO_PASSWORD}} \ + ./benchmark_config.yaml + + # Usage: task bench:mif BATCH=10000 MIF=8 CORES=4 COUNT=1000000 + bench:mif: + desc: "Run benchmark with varying BATCH, max_in_flight, CORES and COUNT (e.g. 
task bench:mif BATCH=10000 MIF=8 CORES=4 COUNT=1000000)" + vars: + BATCH: '{{.BATCH | default "1000"}}' + MIF: '{{.MIF | default "4"}}' + CORES: '{{.CORES | default "4"}}' + COUNT: '{{.COUNT | default "1000000"}}' + cmds: + - | + AWS_EC2_METADATA_DISABLED=true \ + AWS_ACCESS_KEY_ID={{.MINIO_USER}} \ + AWS_SECRET_ACCESS_KEY={{.MINIO_PASSWORD}} \ + AWS_REGION={{.MINIO_REGION}} \ + GOMAXPROCS={{.CORES}} go run ../../../../cmd/redpanda-connect/main.go run \ + --set input.generate.count={{.COUNT}} \ + --set output.iceberg.batching.count={{.BATCH}} \ + --set output.iceberg.batching.period=5s \ + --set output.iceberg.max_in_flight={{.MIF}} \ + --set output.iceberg.catalog.url={{.CATALOG_URL}} \ + --set output.iceberg.storage.aws_s3.endpoint={{.MINIO_ENDPOINT}} \ + --set output.iceberg.storage.aws_s3.bucket={{.MINIO_BUCKET}} \ + --set output.iceberg.storage.aws_s3.credentials.id={{.MINIO_USER}} \ + --set output.iceberg.storage.aws_s3.credentials.secret={{.MINIO_PASSWORD}} \ + ./benchmark_config.yaml + + bench:lag: + desc: Show current consumer lag for the Redpanda Connect Iceberg sink group + cmds: + - docker exec kc-kafka kafka-consumer-groups --bootstrap-server localhost:29092 --describe --group rpcn-iceberg-bench + + # End-to-end benchmark using kafka-connector infrastructure (Kafka + MinIO + Iceberg REST). + # Prerequisites: run `task up` from the kafka-connector/ folder first. + # + # Usage: + # task bench:run COUNT=10000000 + # task bench:run COUNT=10000000 BATCH=10000 MIF=32 CORES=4 INTERVAL=10 + bench:load: + desc: "Load COUNT events to Kafka (e.g. task bench:load COUNT=10000000)" + vars: + COUNT: '{{.COUNT | default "1000000"}}' + cmds: + - echo "==> Loading {{.COUNT}} events to Kafka..." + - (cd kafka-connector && task data:load COUNT={{.COUNT}}) + + bench:run: + desc: "Drain bench-events via Redpanda Connect Iceberg sink (e.g. 
task bench:run CORES=4 BATCH=10000 MIF=32)" + vars: + BATCH: '{{.BATCH | default "0"}}' + MIF: '{{.MIF | default "32"}}' + CORES: '{{.CORES | default "4"}}' + INTERVAL: '{{.INTERVAL | default "1"}}' + CHECKPOINT: '{{.CHECKPOINT | default "1000000"}}' + PERIOD: '{{.PERIOD | default "10s"}}' + KC_CATALOG_URL: '{{.KC_CATALOG_URL | default "http://localhost:18181"}}' + KC_MINIO_ENDPOINT: '{{.KC_MINIO_ENDPOINT | default "http://localhost:19000"}}' + cmds: + - | + docker exec kc-kafka kafka-consumer-groups \ + --bootstrap-server localhost:29092 \ + --group rpcn-iceberg-bench \ + --topic bench-events \ + --reset-offsets --to-earliest --execute 2>/dev/null || true + - | + AWS_EC2_METADATA_DISABLED=true \ + AWS_ACCESS_KEY_ID={{.MINIO_USER}} \ + AWS_SECRET_ACCESS_KEY={{.MINIO_PASSWORD}} \ + AWS_REGION={{.MINIO_REGION}} \ + GOMAXPROCS={{.CORES}} go run ../../../../cmd/redpanda-connect/main.go run \ + --set input.kafka.checkpoint_limit={{.CHECKPOINT}} \ + --set output.iceberg.batching.count={{.BATCH}} \ + --set output.iceberg.batching.period={{.PERIOD}} \ + --set output.iceberg.max_in_flight={{.MIF}} \ + --set output.iceberg.catalog.url={{.KC_CATALOG_URL}} \ + --set output.iceberg.storage.aws_s3.endpoint={{.KC_MINIO_ENDPOINT}} \ + --set output.iceberg.storage.aws_s3.bucket={{.MINIO_BUCKET}} \ + --set output.iceberg.storage.aws_s3.credentials.id={{.MINIO_USER}} \ + --set output.iceberg.storage.aws_s3.credentials.secret={{.MINIO_PASSWORD}} \ + ./kafka_benchmark_config.yaml + + + bench:reset: + desc: Wipe all data, restart infrastructure, then run benchmark + vars: + CORES: '{{.CORES | default ""}}' + BATCH: '{{.BATCH | default "1000"}}' + COUNT: '{{.COUNT | default "0"}}' + cmds: + - task infra:reset + - task bench CORES={{.CORES}} BATCH={{.BATCH}} COUNT={{.COUNT}} diff --git a/internal/impl/iceberg/bench/benchmark_config.yaml b/internal/impl/iceberg/bench/benchmark_config.yaml new file mode 100644 index 0000000000..ecaaedc796 --- /dev/null +++ 
b/internal/impl/iceberg/bench/benchmark_config.yaml @@ -0,0 +1,49 @@ +http: + debug_endpoints: true + +input: + generate: + count: 0 + interval: "" + mapping: | + root.id = counter() + root.user_id = (counter() % 10000) + 1 + root.event_type = ["click", "view", "purchase", "scroll", "hover"].index(counter() % 5) + root.value = (counter() % 1000) + random_int(max: 100) + root.info = "event info for record " + counter().string() + root.ts = now() + +pipeline: + processors: + - benchmark: + interval: 1s + count_bytes: true + +output: + iceberg: + catalog: + url: "${CATALOG_URL:http://localhost:8181}" + namespace: bench + table: events + storage: + aws_s3: + bucket: "${MINIO_BUCKET:warehouse}" + region: "${MINIO_REGION:us-east-1}" + endpoint: "${MINIO_ENDPOINT:http://localhost:9000}" + force_path_style_urls: true + credentials: + id: "${MINIO_USER:admin}" + secret: "${MINIO_PASSWORD:password}" + schema_evolution: + enabled: true + batching: + count: 1000 + period: 5s + +logger: + level: INFO + +metrics: + prometheus: + add_process_metrics: true + add_go_metrics: true diff --git a/internal/impl/iceberg/bench/docker-compose.yaml b/internal/impl/iceberg/bench/docker-compose.yaml new file mode 100644 index 0000000000..38870dbc53 --- /dev/null +++ b/internal/impl/iceberg/bench/docker-compose.yaml @@ -0,0 +1,48 @@ +services: + minio: + image: minio/minio:latest + network_mode: host + environment: + MINIO_ROOT_USER: admin + MINIO_ROOT_PASSWORD: password + MINIO_REGION: us-east-1 + command: server /data --address ":9000" --console-address ":9001" + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"] + interval: 5s + timeout: 5s + retries: 5 + + minio-setup: + image: minio/mc:latest + network_mode: host + depends_on: + minio: + condition: service_healthy + entrypoint: > + /bin/sh -c " + mc alias set myminio http://localhost:9000 admin password; + mc mb --ignore-existing myminio/warehouse; + mc anonymous set public myminio/warehouse; + exit 0; + 
" + + rest: + image: apache/iceberg-rest-fixture + network_mode: host + environment: + CATALOG_WAREHOUSE: s3://warehouse/ + CATALOG_IO__IMPL: org.apache.iceberg.aws.s3.S3FileIO + CATALOG_S3_ENDPOINT: http://localhost:9000 + CATALOG_S3_PATH__STYLE__ACCESS: "true" + CATALOG_S3_ACCESS__KEY__ID: admin + CATALOG_S3_SECRET__ACCESS__KEY: password + AWS_REGION: us-east-1 + depends_on: + minio-setup: + condition: service_completed_successfully + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8181/v1/config"] + interval: 5s + timeout: 5s + retries: 5 diff --git a/internal/impl/iceberg/bench/kafka-connector/Dockerfile b/internal/impl/iceberg/bench/kafka-connector/Dockerfile new file mode 100644 index 0000000000..485ed7d58f --- /dev/null +++ b/internal/impl/iceberg/bench/kafka-connector/Dockerfile @@ -0,0 +1,8 @@ +FROM confluentinc/cp-kafka-connect-base:7.7.8 +RUN curl -fL \ + -o /tmp/iceberg-kafka-connect.zip \ + "https://github.com/databricks/iceberg-kafka-connect/releases/download/v0.6.19/iceberg-kafka-connect-runtime-0.6.19.zip" && \ + confluent-hub install --no-prompt \ + --component-dir /usr/share/confluent-hub-components/ \ + /tmp/iceberg-kafka-connect.zip && \ + rm /tmp/iceberg-kafka-connect.zip diff --git a/internal/impl/iceberg/bench/kafka-connector/README.md b/internal/impl/iceberg/bench/kafka-connector/README.md new file mode 100644 index 0000000000..80ef1fc653 --- /dev/null +++ b/internal/impl/iceberg/bench/kafka-connector/README.md @@ -0,0 +1,74 @@ +# Kafka → Iceberg Benchmark (Kafka Connect vs Redpanda Connect) + +End-to-end benchmark comparing **Kafka Connect (Tabular Iceberg Sink)** against **Redpanda Connect** writing from Kafka to Iceberg. + +See [`docs/benchmark-results/iceberg.md`](../../../../../docs/benchmark-results/iceberg.md) for results. 
+ +## Architecture + +``` +producer → bench-events → RPCN transformer → bench-events-transformed → Kafka Connect Iceberg Sink → Iceberg +``` + +The RPCN transformer step is required because the Tabular Iceberg sink connector cannot apply field transformations natively. Redpanda Connect handles the same pipeline in a single process with no intermediate topic. + +## Prerequisites + +- Docker with Compose +- Go toolchain +- `task` CLI +- `jq` + +## Infrastructure + +```bash +task up # start Kafka, MinIO, Iceberg REST, Kafka Connect +task down # stop and remove all containers and volumes +``` + +| Service | URL | +|---|---| +| Kafka | localhost:9092 | +| Kafka Connect | http://localhost:8083 | +| Iceberg REST catalog | http://localhost:18181 | +| MinIO console | http://localhost:19001 (admin/password) | + +## Quickstart + +```bash +task up +task bench:run COUNT=10000000 +``` + +## Tasks Reference + +| Task | Description | +|---|---| +| `task up` | Start all containers, wait for Connect readiness | +| `task down` | Stop and remove all containers and volumes | +| `task bench:run COUNT=N` | Full benchmark: produce → transform → sink | +| `task bench:transform` | Phase 1: transform bench-events → bench-events-transformed | +| `task bench:sink` | Phase 2: register connector and measure sink throughput | +| `task bench:measure TOTAL=N INTERVAL=S` | Poll lag and print msg/s until drained | +| `task bench:lag` | Show current consumer group lag | +| `task bench:offsets` | Show end offsets for both topics | +| `task connector:create` | Register the Iceberg sink connector | +| `task connector:status` | Show connector and task status | +| `task connector:delete` | Delete the connector (keeps infrastructure running) | +| `task data:load COUNT=N` | Produce N events to bench-events | +| `task data:reset` | Drop and recreate the Iceberg benchmark.events table | +| `task data:stats` | Show Iceberg table snapshot stats | +| `task control-topic:reset` | Reset the Iceberg control topic | 
+| `task bench-topic:recreate` | Delete and recreate bench-events topic | +| `task logs:connect` | Follow Kafka Connect worker logs | + +## Redpanda Connect Benchmark + +Run from the parent folder ([`bench/`](../)): + +```bash +cd .. +task bench:run COUNT=10000000 +``` + +Single pipeline — no intermediate topic, no separate transform step. diff --git a/internal/impl/iceberg/bench/kafka-connector/Taskfile.yaml b/internal/impl/iceberg/bench/kafka-connector/Taskfile.yaml new file mode 100644 index 0000000000..b21ca7ac92 --- /dev/null +++ b/internal/impl/iceberg/bench/kafka-connector/Taskfile.yaml @@ -0,0 +1,205 @@ +version: "3" + +silent: true + +vars: + KAFKA_BOOTSTRAP: '{{.KAFKA_BOOTSTRAP | default "localhost:9092"}}' + CONNECT_URL: '{{.CONNECT_URL | default "http://localhost:8083"}}' + ICEBERG_REST_URL: '{{.ICEBERG_REST_URL | default "http://localhost:18181"}}' + +tasks: + up: + desc: Build and start all infrastructure, wait for Kafka Connect to be ready + cmds: + - docker compose up -d --build + - until curl -sf {{.CONNECT_URL}}/ > /dev/null 2>&1; do sleep 3; done + - echo "Kafka Connect ready at {{.CONNECT_URL}}" + + down: + desc: Stop and remove all containers and volumes + cmds: + - docker compose down -v --remove-orphans + + connector:create: + desc: Register the Iceberg sink connector + cmds: + - | + curl -sf -X POST {{.CONNECT_URL}}/connectors \ + -H "Content-Type: application/json" \ + -d @connector.json | jq '.name' + + connector:status: + desc: Show connector and task status + cmds: + - curl -sf {{.CONNECT_URL}}/connectors/iceberg-sink-bench/status | jq . + + connector:delete: + desc: Delete the Iceberg sink connector (without stopping infrastructure) + cmds: + - curl -s -X DELETE {{.CONNECT_URL}}/connectors/iceberg-sink-bench || true + + data:load: + desc: "Load COUNT events to Kafka (e.g. 
task data:load COUNT=10000000)" + vars: + COUNT: '{{.COUNT | default "1000000"}}' + env: + COUNT: '{{.COUNT}}' + KAFKA_BOOTSTRAP: '{{.KAFKA_BOOTSTRAP}}' + cmds: + - go run ../../../../../cmd/redpanda-connect/main.go run ./producer.yaml + + bench:run: + desc: "Sink bench-events via Kafka Connect into Iceberg (run after bench:load)" + cmds: + - task connector:delete + - | + docker exec kc-kafka kafka-consumer-groups \ + --bootstrap-server localhost:29092 \ + --group connect-iceberg-sink-bench \ + --topic bench-events \ + --reset-offsets --to-earliest --execute 2>/dev/null || true + - task control-topic:reset + - task connector:create + - | + GROUP=connect-iceberg-sink-bench + INTERVAL=10 + + TOTAL=$(docker exec kc-kafka kafka-get-offsets \ + --bootstrap-server localhost:29092 \ + --topic bench-events 2>/dev/null \ + | awk -F: '{sum += $3} END {print sum}') + echo "bench-events has $TOTAL messages" + + get_lag() { + docker exec kc-kafka kafka-consumer-groups \ + --bootstrap-server localhost:29092 \ + --describe --group "$GROUP" 2>/dev/null \ + | awk '/bench-events/ { cur = ($4 == "-") ? 0 : $4; sum += $5 - cur } END {print (NR == 0 ? -1 : sum)}' + } + + START="" + PREV_LAG="" + PREV_TIME="" + printf "%-10s %-14s %s\n" "TIME" "LAG" "MSG/S" + + while true; do + sleep "$INTERVAL" + LAG=$(get_lag) + NOW=$(date +%s) + if [ -z "$START" ]; then + if [ "$LAG" -gt 0 ] 2>/dev/null; then + START=$NOW; PREV_LAG=$LAG; PREV_TIME=$NOW + printf "%-10s %-14s (timing started)\n" "$(date +%H:%M:%S)" "$LAG" + else + printf "%-10s %-14s (waiting for lag...)\n" "$(date +%H:%M:%S)" "$LAG" + fi + continue + fi + DELTA_T=$((NOW - PREV_TIME)) + DELTA_LAG=$((PREV_LAG - LAG)) + RATE=$(( DELTA_T > 0 ? DELTA_LAG / DELTA_T : 0 )) + printf "%-10s %-14s %s msg/s\n" "$(date +%H:%M:%S)" "$LAG" "$RATE" + if [ "$LAG" -eq 0 ]; then + ELAPSED=$((NOW - START)) + AVG=$(( TOTAL / (ELAPSED > 0 ? 
ELAPSED : 1) )) + echo "---" + echo "Total messages : $TOTAL" + echo "Elapsed : ${ELAPSED}s" + echo "Avg throughput : ${AVG} msg/s" + break + fi + PREV_LAG=$LAG + PREV_TIME=$NOW + done + + bench:lag: + desc: Show current consumer lag for the Iceberg sink connector group + cmds: + - docker exec kc-kafka kafka-consumer-groups --bootstrap-server localhost:29092 --describe --group connect-iceberg-sink-bench + + bench:offsets: + desc: Show end offsets for bench-events + cmds: + - docker exec kc-kafka kafka-get-offsets --bootstrap-server localhost:29092 --topic bench-events + + control-topic:reset: + desc: Delete and recreate the control-iceberg topic to clear stale coordinator state + cmds: + - docker exec kc-kafka kafka-topics --bootstrap-server localhost:29092 --delete --topic control-iceberg 2>/dev/null || true + - | + until ! docker exec kc-kafka kafka-topics --bootstrap-server localhost:29092 \ + --list 2>/dev/null | grep -q '^control-iceberg$'; do + sleep 1 + done + - | + docker exec kc-kafka kafka-topics --bootstrap-server localhost:29092 \ + --create --topic control-iceberg \ + --partitions 1 \ + --replication-factor 1 \ + --config cleanup.policy=compact + + bench-topic:recreate: + desc: Delete and recreate bench-events topic for a clean slate + cmds: + - docker exec kc-kafka kafka-topics --bootstrap-server localhost:29092 --delete --topic bench-events 2>/dev/null || true + - | + until ! 
docker exec kc-kafka kafka-topics --bootstrap-server localhost:29092 \ + --list 2>/dev/null | grep -q '^bench-events$'; do + sleep 1 + done + - | + docker exec kc-kafka kafka-topics --bootstrap-server localhost:29092 \ + --create --topic bench-events \ + --partitions 16 \ + --replication-factor 1 + + data:reset: + desc: Drop and recreate the benchmark.events Iceberg table (clears all data) + cmds: + - task connector:delete + - task control-topic:reset + - curl -sf -X DELETE {{.ICEBERG_REST_URL}}/v1/namespaces/benchmark/tables/events || true + - | + curl -sf -X POST {{.ICEBERG_REST_URL}}/v1/namespaces/benchmark/tables \ + -H "Content-Type: application/json" \ + -d '{ + "name": "events", + "schema": { + "type": "struct", + "schema-id": 0, + "fields": [ + {"id": 1, "name": "id", "required": false, "type": "long"}, + {"id": 2, "name": "category", "required": false, "type": "string"}, + {"id": 3, "name": "value", "required": false, "type": "double"}, + {"id": 4, "name": "ts", "required": false, "type": "long"} + ] + } + }' | jq .name + + data:stats: + desc: Show Iceberg table snapshot stats (row count, file size) + cmds: + - | + curl -s {{.ICEBERG_REST_URL}}/v1/namespaces/benchmark/tables/events \ + | jq ' + if .metadata.snapshots == null or (.metadata.snapshots | length) == 0 + then "no snapshots yet" + else { + snapshots: (.metadata.snapshots | length), + metadata_logs: (.metadata["metadata-log"] | length), + total_records: .metadata.snapshots[-1].summary["total-records"], + total_size: .metadata.snapshots[-1].summary["total-files-size"], + added_records: .metadata.snapshots[-1].summary["added-records"], + added_size: .metadata.snapshots[-1].summary["added-files-size"] + } + end' + + logs:connect: + desc: Follow Kafka Connect worker logs + cmds: + - docker compose logs -f kafka-connect + + logs:setup: + desc: Show setup container output + cmds: + - docker compose logs setup diff --git a/internal/impl/iceberg/bench/kafka-connector/connector.json 
b/internal/impl/iceberg/bench/kafka-connector/connector.json new file mode 100644 index 0000000000..9f181f5ccd --- /dev/null +++ b/internal/impl/iceberg/bench/kafka-connector/connector.json @@ -0,0 +1,34 @@ +{ + "name": "iceberg-sink-bench", + "config": { + "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector", + "tasks.max": "16", + "topics": "bench-events", + + "iceberg.catalog.type": "rest", + "iceberg.catalog.uri": "http://iceberg-rest:8181", + "iceberg.catalog.warehouse": "s3://warehouse/", + "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO", + "iceberg.catalog.s3.endpoint": "http://minio:9000", + "iceberg.catalog.s3.path-style-access": "true", + "iceberg.catalog.s3.access-key-id": "admin", + "iceberg.catalog.s3.secret-access-key": "password", + "iceberg.catalog.client.region": "us-east-1", + + "iceberg.tables": "benchmark.events", + "iceberg.tables.auto-create-enabled": "true", + + "iceberg.control.commit.interval-ms": "10000", + "iceberg.control.commit.timeout-ms": "30000", + + "consumer.override.max.partition.fetch.bytes": "1048576", + "consumer.override.session.timeout.ms": "300000", + "consumer.override.max.poll.interval.ms": "300000", + + "iceberg.kafka.session.timeout.ms": "300000", + "iceberg.kafka.max.poll.interval.ms": "300000", + + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": "false" + } +} diff --git a/internal/impl/iceberg/bench/kafka-connector/docker-compose.yaml b/internal/impl/iceberg/bench/kafka-connector/docker-compose.yaml new file mode 100644 index 0000000000..66390cf68b --- /dev/null +++ b/internal/impl/iceberg/bench/kafka-connector/docker-compose.yaml @@ -0,0 +1,169 @@ +services: + # ── Kafka (KRaft, no ZooKeeper) ────────────────────────────────────────────── + kafka: + image: confluentinc/cp-kafka:7.7.8 + container_name: kc-kafka + cpus: 2 + ports: + - "9092:9092" + environment: + KAFKA_NODE_ID: 1 + KAFKA_PROCESS_ROLES: broker,controller + 
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093 + KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT + KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false" + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk" + healthcheck: + test: ["CMD", "kafka-broker-api-versions", "--bootstrap-server", "localhost:9092"] + interval: 10s + timeout: 10s + retries: 12 + + # ── MinIO (S3-compatible object storage) ───────────────────────────────────── + minio: + image: minio/minio:latest + container_name: kc-minio + ports: + - "19000:9000" + - "19001:9001" # Console UI + environment: + MINIO_ROOT_USER: admin + MINIO_ROOT_PASSWORD: password + MINIO_REGION: us-east-1 + command: server /data --console-address ":9001" + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"] + interval: 5s + timeout: 5s + retries: 12 + + # ── Create the 'warehouse' S3 bucket ───────────────────────────────────────── + minio-setup: + image: minio/mc:latest + container_name: kc-minio-setup + depends_on: + minio: + condition: service_healthy + entrypoint: /bin/sh + command: + - -c + - | + mc alias set local http://minio:9000 admin password + mc mb local/warehouse --ignore-existing + echo "bucket ready" + + # ── Iceberg REST catalog ────────────────────────────────────────────────────── + iceberg-rest: + image: apache/iceberg-rest-fixture + container_name: kc-iceberg-rest + ports: + - "18181:8181" + environment: + CATALOG_WAREHOUSE: s3://warehouse/ + CATALOG_IO__IMPL: org.apache.iceberg.aws.s3.S3FileIO 
+      CATALOG_S3_ENDPOINT: http://minio:9000
+      CATALOG_S3_PATH__STYLE__ACCESS: "true"
+      CATALOG_S3_ACCESS__KEY__ID: admin
+      CATALOG_S3_SECRET__ACCESS__KEY: password
+      AWS_REGION: us-east-1
+    depends_on:
+      minio-setup:
+        condition: service_completed_successfully
+    healthcheck:
+      test: ["CMD-SHELL", "bash -c '</dev/tcp/localhost/8181' >/dev/null"]
+      interval: 5s
+      timeout: 5s
+      retries: 12
+
+  # ── Kafka Connect + Iceberg sink plugin (built from Dockerfile) ───────────────
+  kafka-connect:
+    build: .
+    container_name: kc-connect
+    ports:
+      - "8083:8083"
+    environment:
+      CONNECT_BOOTSTRAP_SERVERS: kafka:29092
+      CONNECT_REST_PORT: 8083
+      CONNECT_GROUP_ID: iceberg-bench-group
+      CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
+      CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
+      CONNECT_STATUS_STORAGE_TOPIC: _connect-status
+      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
+      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
+      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
+      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
+      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
+      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
+      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
+      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/usr/local/share/java
+      # S3 credentials picked up by the Iceberg Java FileIO
+      AWS_ACCESS_KEY_ID: admin
+      AWS_SECRET_ACCESS_KEY: password
+      AWS_REGION: us-east-1
+    depends_on:
+      kafka:
+        condition: service_healthy
+      iceberg-rest:
+        condition: service_healthy
+    healthcheck:
+      test: ["CMD", "curl", "-f", "http://localhost:8083/"]
+      interval: 10s
+      timeout: 10s
+      retries: 15
+
+  # ── Create bench-events topic and the Iceberg 'benchmark' namespace ───────────
+  setup:
+    image: confluentinc/cp-kafka:7.7.8
+    container_name: kc-setup
+    depends_on:
+      kafka:
+        condition: service_healthy
+      iceberg-rest:
+        condition: service_healthy
+    entrypoint: /bin/bash
+    command:
+      - -c
+      - |
+        kafka-topics --bootstrap-server kafka:29092 \
+          --create
+          --if-not-exists \
+          --topic bench-events \
+          --partitions 16 \
+          --replication-factor 1
+        kafka-topics --bootstrap-server kafka:29092 \
+          --create --if-not-exists \
+          --topic control-iceberg \
+          --partitions 1 \
+          --replication-factor 1 \
+          --config cleanup.policy=compact
+        echo "topics ready"
+        # Create the Iceberg namespace used by the connector
+        curl -sf -X POST http://iceberg-rest:8181/v1/namespaces \
+          -H "Content-Type: application/json" \
+          -d '{"namespace": ["benchmark"]}' || true
+        echo "namespace ready"
+        # Pre-create the benchmark.events table so the connector never hits a
+        # NoSuchTableException 404 (which the rest-fixture logs noisily as ERROR).
+        curl -sf -X POST http://iceberg-rest:8181/v1/namespaces/benchmark/tables \
+          -H "Content-Type: application/json" \
+          -d '{
+            "name": "events",
+            "schema": {
+              "type": "struct",
+              "schema-id": 0,
+              "fields": [
+                {"id": 1, "name": "id", "required": false, "type": "long"},
+                {"id": 2, "name": "category", "required": false, "type": "string"},
+                {"id": 3, "name": "value", "required": false, "type": "double"},
+                {"id": 4, "name": "ts", "required": false, "type": "long"}
+              ]
+            }
+          }' || true
+        echo "table ready"
diff --git a/internal/impl/iceberg/bench/kafka-connector/producer.yaml b/internal/impl/iceberg/bench/kafka-connector/producer.yaml
new file mode 100644
index 0000000000..34fd9fdcce
--- /dev/null
+++ b/internal/impl/iceberg/bench/kafka-connector/producer.yaml
@@ -0,0 +1,26 @@
+# Produces COUNT synthetic events to the bench-events Kafka topic.
+# Usage: rpk connect run producer.yaml
+# Override count/brokers: rpk connect run producer.yaml \
+#   --set input.generate.count=5000000 \
+#   --set output.kafka_franz.seed_brokers[0]=localhost:9092
+
+input:
+  generate:
+    count: ${COUNT:1000000}
+    interval: ""
+    mapping: |
+      root.id = count("events")
+      root.category = "cat_" + random_int(min: 1, max: 10).string()
+      root.value = random_int(min: 1, max: 1000000).float64() / 100.0
+      root.ts = now().ts_unix_micro()
+
+output:
+  kafka_franz:
+    seed_brokers:
+      - ${KAFKA_BOOTSTRAP:localhost:9092}
+    topic: bench-events
+    compression: snappy
+    batching:
+      count: 5000
+      byte_size: 4194304 # 4 MiB
+      period: 5s
diff --git a/internal/impl/iceberg/bench/kafka_benchmark_config.yaml b/internal/impl/iceberg/bench/kafka_benchmark_config.yaml
new file mode 100644
index 0000000000..2dbadf7656
--- /dev/null
+++ b/internal/impl/iceberg/bench/kafka_benchmark_config.yaml
@@ -0,0 +1,54 @@
+http:
+  debug_endpoints: true
+
+input:
+  kafka:
+    addresses: ["${KAFKA_BOOTSTRAP:localhost:9092}"]
+    topics: ["bench-events"]
+    consumer_group: "rpcn-iceberg-bench"
+    checkpoint_limit: 10000
+
+pipeline:
+  processors:
+    - mapping: |
+        root.event_id = this.id.string() + "-" + this.category
+        root.category = this.category
+        root.value = this.value
+        root.value_usd = this.value / 100.0
+        root.value_tier = if this.value < 1000.0 { "low" } else if this.value < 5000.0 { "mid" } else { "high" }
+        root.ts = this.ts
+        root.ts_ms = (this.ts / 1000).floor()
+        root.is_high_value = this.value > 8000.0
+    - benchmark:
+        interval: 1s
+        count_bytes: true
+
+output:
+  iceberg:
+    catalog:
+      url: "${CATALOG_URL:http://localhost:18181}"
+      namespace: benchmark
+      table: events
+    storage:
+      aws_s3:
+        bucket: "${MINIO_BUCKET:warehouse}"
+        region: "${MINIO_REGION:us-east-1}"
+        endpoint: "${MINIO_ENDPOINT:http://localhost:19000}"
+        force_path_style_urls: true
+        credentials:
+          id: "${MINIO_USER:admin}"
+          secret: "${MINIO_PASSWORD:password}"
+    schema_evolution:
+      enabled: true
+    batching:
+      count: 10000
+      period: 5s
+    max_in_flight: 32
+
+logger:
+  level: INFO
+
+metrics:
+  prometheus:
+    add_process_metrics: true
+    add_go_metrics: true
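
The configs above compose into a three-step run: bring up the Docker stack, produce events, then consume them into Iceberg. A minimal sketch of that sequence, assuming the compose file sits alongside the configs in the bench directory (file names other than `producer.yaml` and `kafka_benchmark_config.yaml` are assumptions; adjust paths to the actual layout):

```shell
# Start MinIO, the REST catalog, Kafka, and Kafka Connect, waiting for healthchecks.
docker compose up -d --wait

# Produce synthetic events to the bench-events topic; count is overridable,
# as documented in producer.yaml.
rpk connect run producer.yaml \
  --set input.generate.count=1000000

# Consume from Kafka and write to the benchmark.events Iceberg table, using the
# env-var defaults baked into the config (catalog on localhost:18181, MinIO on
# localhost:19000).
rpk connect run kafka_benchmark_config.yaml
```

The `benchmark` processor in the pipeline prints msg/sec and byte throughput every second, which is where the numbers in `docs/benchmark-results/iceberg.md` come from.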