**`docs/benchmark-results/iceberg.md`** (new file, 116 lines)
# Iceberg Benchmark Results

Benchmarks for the `iceberg` output using a local REST catalog backed by MinIO (S3-compatible).

See [`internal/impl/iceberg/bench/`](../../internal/impl/iceberg/bench/) for the benchmark configs and run instructions.

## Write Throughput — CPU & Batch Size Scaling

Synthetic events generated at maximum speed (`generate` input with `count: 0, interval: ""`), written to a single Iceberg table. Varying `GOMAXPROCS` and `batching.count`.

**Environment:** Intel Core i7-10850H @ 2.70GHz, 32 GB RAM, WSL2 (Linux 6.6.87.2), x86_64, MinIO + REST catalog running in Docker (localhost)

**Dataset:** Synthetic events, ~142 B per message (id, user_id, event_type, value, info, ts) — measured at the pipeline processor. Actual bytes written to MinIO will differ due to Parquet columnar compression.

**Count:** 1,000,000 messages per run

### msg/sec

| GOMAXPROCS | batch=1000 | batch=5000 | batch=10000 |
|-------------|------------|------------|-------------|
| 1 | 757 | 3,105 | 5,442 |
| 2 | 1,186 | 4,408 | 6,763 |
| 4 | 1,147 | 4,758 | 8,483 |
| 8 | 1,056 | 4,107 | 8,231 |
| (unbounded) | | | |

### kB/sec

| GOMAXPROCS | batch=1000 | batch=5000 | batch=10000 |
|-------------|------------|------------|-------------|
| 1 | 106 | 435 | 774 |
| 2 | 166 | 618 | 961 |
| 4 | 161 | 667 | 1206 |
| 8 | 148 | 576 | 1170 |
| (unbounded) | | | |

**Observations:**

- **Batch size is the dominant factor:** throughput at 1 core scales from 757 (batch=1000) → 3,105 (batch=5000) → 5,442 (batch=10000) msg/sec. Each batch = one catalog commit round-trip, so fewer commits = dramatically higher throughput.
- **batch=5000 and batch=10000 benefit from more cores up to 4**, then regress at 8 — the catalog commit overhead is reduced enough that CPU parallelism helps, but 8 cores reintroduces contention.
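
As a sanity check, the byte-rate table follows directly from msg/sec times the ~142 B message size; for example, checking the 1-core, batch=10000 cell:

```shell
# kB/sec ~= msg/sec x ~142 B / 1000 (values from the tables above)
awk 'BEGIN { printf "%.0f kB/s\n", 5442 * 142 / 1000 }'
```

This prints 773 kB/s, within rounding of the 774 in the table (the ~142 B figure is itself an average).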

---

## Write Throughput — Batch Size & max_in_flight Scaling

Fixed at `GOMAXPROCS=4`, varying `batching.count` and `max_in_flight` to measure the impact of concurrent catalog commits.

**Environment:** Intel Core i7-10850H @ 2.70GHz, 32 GB RAM, WSL2 (Linux 6.6.87.2), x86_64, MinIO + REST catalog running in Docker (localhost)

**Dataset:** Synthetic events, ~142 B per message

**Count:** 1,000,000 messages per run

### msg/sec

| max_in_flight | batch=5000 | batch=10000 |
|---------------|------------|-------------|
| 4 | 4,758 | 8,483 |
| 8 | 7,105 | 13,839 |
| 16 | 12,973 | 23,316 |
| 32 | 20,462 | 34,835 |
| 64 | 34,993 | 33,703 |
| 128 | 33,911 | 33,742 |

### MB/sec

| max_in_flight | batch=5000 | batch=10000 |
|---------------|------------|-------------|
| 4 | 0.67 | 1.21 |
| 8 | 1.00 | 2.00 |
| 16 | 1.80 | 3.30 |
| 32 | 2.90 | 5.00 |
| 64 | 5.00 | 4.80 |
| 128 | 4.80 | 4.80 |

**Observations:**

- **`max_in_flight` is the most impactful knob:** at batch=10000, throughput scales from 8,483 (MIF=4) → 13,839 (MIF=8) → 23,316 (MIF=16) → 34,835 (MIF=32) msg/sec — a 4x gain by increasing concurrent commits.
- **The ceiling is ~34K msg/sec (~5 MB/sec)**, reached at MIF=32 for batch=10000 and at MIF=64 for batch=5000. The limit is MinIO's throughput, not the connector's.
- **batch=5000 and batch=10000 converge at high MIF values** — both plateau at ~34K msg/sec when given enough concurrent commits. batch=10000 reaches the ceiling with fewer in-flight requests (MIF=32 vs MIF=64).
- **Sweet spot: batch=10000, MIF=32** — reaches maximum throughput with the least concurrency overhead.
- The takeaway from both sections: the Iceberg write bottleneck is catalog commit latency, not the connector itself. Raising the number of concurrent commits (`max_in_flight`) scales throughput near-linearly until MinIO saturates.
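
The two headline numbers above can be reproduced with quick arithmetic (msg/sec values from the table, ~142 B per message):

```shell
# Concurrent-commit gain at batch=10000, and implied byte rate at the plateau
awk 'BEGIN {
  printf "MIF 4 -> 32 speedup: %.1fx\n", 34835 / 8483
  printf "ceiling: %.1f MB/s\n", 34835 * 142 / 1000000
}'
```

This yields a 4.1x speedup and 4.9 MB/s, consistent (within rounding of the average message size) with the ~5 MB/sec ceiling.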

---

## Comparison: Kafka Connect vs Redpanda Connect Iceberg

**Environment:** Intel Core i7-10850H @ 2.70GHz, 32 GB RAM, WSL2 (Linux 6.6.87.2), x86_64
**Dataset:** 10,000,000 synthetic events, MinIO + Iceberg REST catalog in Docker

Both connectors use a 10s commit window and 16 Kafka partitions. The transformation computes 5 derived fields per message (`event_id`, `value_usd`, `value_tier`, `ts_ms`, `is_high_value`).
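
A Bloblang mapping for that transform might look roughly like the following (the derived-field names come from the list above; the formulas and the high-value threshold are illustrative assumptions, not the benchmark's actual logic):

```
root = this
root.event_id = uuid_v4()                      # assumed: synthetic unique id
root.value_usd = this.value / 100              # assumed conversion factor
root.value_tier = if this.value >= 500 { "high" } else { "low" }
root.ts_ms = this.ts.ts_unix_milli()           # epoch milliseconds from the ts field
root.is_high_value = this.value >= 500         # assumed threshold
```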

### Results

#### Sink only

| Connector | Throughput |
|--------------------------|---------------|
| Kafka Connect (Tabular) | 84,745 msg/s |
| Redpanda Connect | 61,349 msg/s |

#### Transform + sink

| Connector | Kafka CPUs | Throughput |
|--------------------------|------------|---------------|
| Kafka Connect (Tabular) | unbounded | 37,037 msg/s |
| Redpanda Connect | unbounded | 47,272 msg/s |
| Redpanda Connect | 1 | 45,248 msg/s |
| Redpanda Connect | 2 | 48,829 msg/s |

### Notes

- **Kafka Connect sink-only** is fastest in isolation — 16 tasks consuming pre-transformed data directly into Iceberg.
- **Kafka Connect with transformation** requires a separate RPCN pre-processing step that writes to an intermediate Kafka topic (`bench-events-transformed`), then Kafka Connect reads from that topic and sinks to Iceberg. The two-stage I/O cuts throughput by more than half.
- **Redpanda Connect** handles transformation and Iceberg writes in a single pipeline — no intermediate topic, no extra Kafka round-trip.
- **End-to-end (the realistic scenario):** Redpanda Connect is ~1.3x faster than Kafka Connect (47k vs 37k msg/s).
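
The single-pipeline shape described in these notes can be sketched as a config fragment (broker address, topic names, and the mapping body are illustrative assumptions):

```yaml
input:
  kafka:
    addresses: [ "localhost:9092" ]   # assumed broker address
    topics: [ "bench-events" ]
    consumer_group: rpcn-iceberg-bench

pipeline:
  processors:
    - mapping: |   # transform runs in-pipeline, no intermediate topic
        root = this
        root.event_id = uuid_v4()

output:
  iceberg:
    catalog:
      url: http://localhost:8181
      namespace: bench
      table: events
```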
---

**`internal/impl/iceberg/bench/README.md`** (new file, 64 lines)
# Iceberg Benchmark

Measures write throughput of the Redpanda Connect Iceberg output. Two benchmarks are available:

- **This folder** — Redpanda Connect only (generate → Iceberg), no Kafka
- **[`kafka-connector/`](kafka-connector/)** — End-to-end comparison: Kafka → transform → Iceberg, benchmarked against Kafka Connect (Tabular Iceberg Sink)

See [`docs/benchmark-results/iceberg.md`](../../../../docs/benchmark-results/iceberg.md) for results.

## Prerequisites

- Docker with Compose

## Infrastructure

```bash
task infra:up # start MinIO + Iceberg REST catalog
task infra:down # stop and remove all containers
task infra:reset # wipe data and restart
task infra:logs # follow container logs
```

- MinIO console: http://localhost:9001 (admin/password)
- Iceberg REST catalog: http://localhost:8181

## Running

### Generate → Iceberg (no Kafka)

```bash
task bench CORES=4 BATCH=5000 COUNT=1000000
```

| Parameter | Default | Description |
|-----------|---------|-------------|
| `CORES` | unbounded | `GOMAXPROCS` |
| `BATCH` | 1000 | `batching.count` |
| `COUNT` | 0 (unlimited) | number of messages |

### Varying max_in_flight

```bash
task bench:mif CORES=4 BATCH=10000 MIF=32 COUNT=1000000
```

| Parameter | Default | Description |
|-----------|---------|-------------|
| `MIF` | 4 | `max_in_flight` |
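
To sweep `max_in_flight` across several values, a small wrapper loop works; this sketch only prints the commands (pipe it to `sh` to actually run them):

```shell
# Emit one bench:mif invocation per max_in_flight value (dry run).
for mif in 4 8 16 32 64 128; do
  echo "task bench:mif CORES=4 BATCH=10000 MIF=$mif COUNT=1000000"
done
```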

### Clean run

```bash
task bench:reset CORES=4 BATCH=5000 COUNT=1000000
```

Wipes all Iceberg data, restarts infrastructure, then runs the benchmark.

## Output

```
INFO rolling stats: 5000 msg/sec, 3.2 MB/sec @service=redpanda-connect ...
```

Throughput is measured at the pipeline processor, before the Iceberg writer.
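
To pull a single msg/sec figure out of the log stream for tabulation, something along these lines works (assuming the rolling-stats line format shown above stays stable):

```shell
# Extract the msg/sec number from a rolling-stats log line.
line='INFO rolling stats: 5000 msg/sec, 3.2 MB/sec @service=redpanda-connect'
echo "$line" | sed -n 's/.*rolling stats: \([0-9]*\) msg\/sec.*/\1/p'
```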
---

**`internal/impl/iceberg/bench/Taskfile.yaml`** (new file, 151 lines)
version: "3"

vars:
  MINIO_USER: '{{.MINIO_USER | default "admin"}}'
  MINIO_PASSWORD: '{{.MINIO_PASSWORD | default "password"}}'
  MINIO_REGION: '{{.MINIO_REGION | default "us-east-1"}}'
  CATALOG_URL: '{{.CATALOG_URL | default "http://localhost:8181"}}'
  MINIO_ENDPOINT: '{{.MINIO_ENDPOINT | default "http://localhost:9000"}}'
  MINIO_BUCKET: '{{.MINIO_BUCKET | default "warehouse"}}'

tasks:
  infra:up:
    desc: Start MinIO and Iceberg REST catalog
    cmds:
      - docker compose -f docker-compose.yaml up -d
      - until curl -sf http://localhost:9000/minio/health/live > /dev/null; do sleep 1; done
      - until curl -sf http://localhost:8181/v1/config > /dev/null; do sleep 1; done

  infra:down:
    desc: Stop and remove all containers and volumes
    cmds:
      - docker compose -f docker-compose.yaml down -v

  infra:logs:
    desc: Show container logs
    cmds:
      - docker compose -f docker-compose.yaml logs -f

  infra:reset:
    desc: Wipe all data and restart infrastructure
    cmds:
      - docker compose -f docker-compose.yaml down -v
      - docker compose -f docker-compose.yaml up -d
      - until curl -sf http://localhost:9000/minio/health/live > /dev/null; do sleep 1; done
      - until curl -sf http://localhost:8181/v1/config > /dev/null; do sleep 1; done

  # Parameterised benchmark — specify CORES, BATCH and COUNT freely
  # Usage: task bench CORES=4 BATCH=5000 COUNT=1000000
  bench:
    desc: "Run write benchmark (e.g. task bench CORES=4 BATCH=5000 COUNT=1000000)"
    vars:
      CORES: '{{.CORES | default ""}}'
      BATCH: '{{.BATCH | default "1000"}}'
      COUNT: '{{.COUNT | default "0"}}'
    cmds:
      - |
        AWS_EC2_METADATA_DISABLED=true \
        AWS_ACCESS_KEY_ID={{.MINIO_USER}} \
        AWS_SECRET_ACCESS_KEY={{.MINIO_PASSWORD}} \
        AWS_REGION={{.MINIO_REGION}} \
        GOMAXPROCS={{.CORES}} go run ../../../../cmd/redpanda-connect/main.go run \
          --set input.generate.count={{.COUNT}} \
          --set output.iceberg.batching.count={{.BATCH}} \
          --set output.iceberg.batching.period=5s \
          --set output.iceberg.catalog.url={{.CATALOG_URL}} \
          --set output.iceberg.storage.aws_s3.endpoint={{.MINIO_ENDPOINT}} \
          --set output.iceberg.storage.aws_s3.bucket={{.MINIO_BUCKET}} \
          --set output.iceberg.storage.aws_s3.credentials.id={{.MINIO_USER}} \
          --set output.iceberg.storage.aws_s3.credentials.secret={{.MINIO_PASSWORD}} \
          ./benchmark_config.yaml

  # Usage: task bench:mif BATCH=10000 MIF=8 CORES=4 COUNT=1000000
  bench:mif:
    desc: "Run benchmark with varying BATCH, max_in_flight, CORES and COUNT (e.g. task bench:mif BATCH=10000 MIF=8 CORES=4 COUNT=1000000)"
    vars:
      BATCH: '{{.BATCH | default "1000"}}'
      MIF: '{{.MIF | default "4"}}'
      CORES: '{{.CORES | default "4"}}'
      COUNT: '{{.COUNT | default "1000000"}}'
    cmds:
      - |
        AWS_EC2_METADATA_DISABLED=true \
        AWS_ACCESS_KEY_ID={{.MINIO_USER}} \
        AWS_SECRET_ACCESS_KEY={{.MINIO_PASSWORD}} \
        AWS_REGION={{.MINIO_REGION}} \
        GOMAXPROCS={{.CORES}} go run ../../../../cmd/redpanda-connect/main.go run \
          --set input.generate.count={{.COUNT}} \
          --set output.iceberg.batching.count={{.BATCH}} \
          --set output.iceberg.batching.period=5s \
          --set output.iceberg.max_in_flight={{.MIF}} \
          --set output.iceberg.catalog.url={{.CATALOG_URL}} \
          --set output.iceberg.storage.aws_s3.endpoint={{.MINIO_ENDPOINT}} \
          --set output.iceberg.storage.aws_s3.bucket={{.MINIO_BUCKET}} \
          --set output.iceberg.storage.aws_s3.credentials.id={{.MINIO_USER}} \
          --set output.iceberg.storage.aws_s3.credentials.secret={{.MINIO_PASSWORD}} \
          ./benchmark_config.yaml

  bench:lag:
    desc: Show current consumer lag for the Redpanda Connect Iceberg sink group
    cmds:
      - docker exec kc-kafka kafka-consumer-groups --bootstrap-server localhost:29092 --describe --group rpcn-iceberg-bench

  # End-to-end benchmark using kafka-connector infrastructure (Kafka + MinIO + Iceberg REST).
  # Prerequisites: run `task up` from the kafka-connector/ folder first.
  #
  # Usage:
  #   task bench:run COUNT=10000000
  #   task bench:run COUNT=10000000 BATCH=10000 MIF=32 CORES=4 INTERVAL=10
  bench:load:
    desc: "Load COUNT events to Kafka (e.g. task bench:load COUNT=10000000)"
    vars:
      COUNT: '{{.COUNT | default "1000000"}}'
    cmds:
      - echo "==> Loading {{.COUNT}} events to Kafka..."
      - (cd kafka-connector && task data:load COUNT={{.COUNT}})

  bench:run:
    desc: "Drain bench-events via Redpanda Connect Iceberg sink (e.g. task bench:run CORES=4 BATCH=10000 MIF=32)"
    vars:
      BATCH: '{{.BATCH | default "0"}}'
      MIF: '{{.MIF | default "32"}}'
      CORES: '{{.CORES | default "4"}}'
      INTERVAL: '{{.INTERVAL | default "1"}}'
      CHECKPOINT: '{{.CHECKPOINT | default "1000000"}}'
      PERIOD: '{{.PERIOD | default "10s"}}'
      KC_CATALOG_URL: '{{.KC_CATALOG_URL | default "http://localhost:18181"}}'
      KC_MINIO_ENDPOINT: '{{.KC_MINIO_ENDPOINT | default "http://localhost:19000"}}'
    cmds:
      - |
        docker exec kc-kafka kafka-consumer-groups \
          --bootstrap-server localhost:29092 \
          --group rpcn-iceberg-bench \
          --topic bench-events \
          --reset-offsets --to-earliest --execute 2>/dev/null || true
      - |
        AWS_EC2_METADATA_DISABLED=true \
        AWS_ACCESS_KEY_ID={{.MINIO_USER}} \
        AWS_SECRET_ACCESS_KEY={{.MINIO_PASSWORD}} \
        AWS_REGION={{.MINIO_REGION}} \
        GOMAXPROCS={{.CORES}} go run ../../../../cmd/redpanda-connect/main.go run \
          --set input.kafka.checkpoint_limit={{.CHECKPOINT}} \
          --set output.iceberg.batching.count={{.BATCH}} \
          --set output.iceberg.batching.period={{.PERIOD}} \
          --set output.iceberg.max_in_flight={{.MIF}} \
          --set output.iceberg.catalog.url={{.KC_CATALOG_URL}} \
          --set output.iceberg.storage.aws_s3.endpoint={{.KC_MINIO_ENDPOINT}} \
          --set output.iceberg.storage.aws_s3.bucket={{.MINIO_BUCKET}} \
          --set output.iceberg.storage.aws_s3.credentials.id={{.MINIO_USER}} \
          --set output.iceberg.storage.aws_s3.credentials.secret={{.MINIO_PASSWORD}} \
          ./kafka_benchmark_config.yaml

  bench:reset:
    desc: Wipe all data, restart infrastructure, then run benchmark
    vars:
      CORES: '{{.CORES | default ""}}'
      BATCH: '{{.BATCH | default "1000"}}'
      COUNT: '{{.COUNT | default "0"}}'
    cmds:
      - task infra:reset
      - task bench CORES={{.CORES}} BATCH={{.BATCH}} COUNT={{.COUNT}}
---

**`internal/impl/iceberg/bench/benchmark_config.yaml`** (new file, 49 lines)
http:
  debug_endpoints: true

input:
  generate:
    count: 0
    interval: ""
    mapping: |
      root.id = counter()
      root.user_id = (counter() % 10000) + 1
      root.event_type = ["click", "view", "purchase", "scroll", "hover"].index(counter() % 5)
      root.value = (counter() % 1000) + random_int(max: 100)
      root.info = "event info for record " + counter().string()
      root.ts = now()

pipeline:
  processors:
    - benchmark:
        interval: 1s
        count_bytes: true

output:
  iceberg:
    catalog:
      url: "${CATALOG_URL:http://localhost:8181}"
      namespace: bench
      table: events
    storage:
      aws_s3:
        bucket: "${MINIO_BUCKET:warehouse}"
        region: "${MINIO_REGION:us-east-1}"
        endpoint: "${MINIO_ENDPOINT:http://localhost:9000}"
        force_path_style_urls: true
        credentials:
          id: "${MINIO_USER:admin}"
          secret: "${MINIO_PASSWORD:password}"
    schema_evolution:
      enabled: true
    batching:
      count: 1000
      period: 5s

logger:
  level: INFO

metrics:
  prometheus:
    add_process_metrics: true
    add_go_metrics: true