Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/kafka/load-tests/.gitattributes
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
scripts/*.sh text eol=lf
114 changes: 114 additions & 0 deletions packages/kafka/load-tests/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# Kafka Load Tests

Load tests for `@message-queue-toolkit/kafka` measuring throughput, latency, and backlog under configurable load.

Two producer modes and two consumer modes can be combined:

- **CDC**: CockroachDB inserts → CDC changefeed → Kafka (realistic end-to-end)
- **Direct**: `AbstractKafkaPublisher` → Kafka (isolates Kafka consumer performance)
- **Single**: One message at a time (`batchProcessingEnabled: false`)
- **Batch**: Batched consumption via `KafkaMessageBatchStream` (`batchProcessingEnabled: true`)

## Prerequisites

- Docker & Docker Compose
- Node.js >= 22.14.0

## Quick Start

```bash
# Install dependencies
npm install

# Start infrastructure (Kafka + CockroachDB + CDC changefeed)
npm run docker:start

# Stop and clean up
npm run docker:stop
```

## Test Scripts

### CDC (CockroachDB → CDC → Kafka)

```bash
# Single-message consumer
npm run load:cdc:light # 100 rows/sec, 30s
npm run load:cdc:medium # 1000 rows/sec, 60s
npm run load:cdc:heavy # 5000 rows/sec, 120s
npm run load:cdc -- --rate 500 --duration 45 --batch 50

# Batch consumer
npm run load:cdc:batch:light # 100 rows/sec, 30s
npm run load:cdc:batch:medium # 1000 rows/sec, 60s
npm run load:cdc:batch:heavy # 5000 rows/sec, 120s
npm run load:cdc:batch -- --rate 500 --consumer-batch 100 --consumer-timeout 500
```

### Direct (Kafka publisher → Kafka)

```bash
# Single-message consumer
npm run load:direct:light # 100 msgs/sec, 30s
npm run load:direct:medium # 1000 msgs/sec, 60s
npm run load:direct:heavy # 5000 msgs/sec, 120s
npm run load:direct -- --rate 500 --duration 45 --batch 50

# Batch consumer
npm run load:direct:batch:light # 100 msgs/sec, 30s
npm run load:direct:batch:medium # 1000 msgs/sec, 60s
npm run load:direct:batch:heavy # 5000 msgs/sec, 120s
npm run load:direct:batch -- --rate 500 --consumer-batch 100 --consumer-timeout 500
```

## CLI Options

### Common

| Flag | Short | Default | Description |
|------|-------|---------|-------------|
| `--rate` | `-r` | 1000 | Target produce rate (rows or msgs per sec) |
| `--duration` | `-d` | 60 | Test duration (seconds) |
| `--batch` | `-b` | 100 | Producer batch size |

### Batch consumer only

| Flag | Default | Description |
|------|---------|-------------|
| `--consumer-batch` | 50 | Messages per consumer batch |
| `--consumer-timeout` | 200 | Batch flush timeout (ms) |

## Architecture

### CDC mode

```
Load Generator → CockroachDB (inserts) → CDC Changefeed → Kafka → Consumer → Metrics
```

1. **CockroachDB** tables (`events`, `orders`) with CDC changefeed targeting Kafka
2. **Load generator** inserts rows into CRDB at configurable rate with fire-and-forget concurrency
3. **CDC changefeed** publishes row changes to Kafka topics
4. **Consumer** (single or batch) processes messages and records metrics

### Direct mode

```
Load Generator → AbstractKafkaPublisher → Kafka → Consumer → Metrics
```

1. **Publisher** sends messages directly to Kafka topics (`direct-events`, `direct-orders`)
2. **Consumer** (single or batch) processes messages and records metrics

## Services

| Service | Port | Description |
|---------|------|-------------|
| Kafka | 9092 | Message broker (KRaft, 6 partitions) |
| CockroachDB | 26257 | SQL database |
| CockroachDB UI | 8181 | DB admin console |
| Kafka UI | 8080 | Topic browser |

## Latency Measurement

Each `events` row embeds `{"loadtest_ts": <epoch_ms>}` in its payload. The consumer extracts this timestamp and computes end-to-end latency (insert/publish → consume). Reported as avg, p50, p95, p99.
67 changes: 67 additions & 0 deletions packages/kafka/load-tests/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
services:

kafka:
image: apache/kafka:3.7.1
ports:
- 9092:9092
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: LOCAL://0.0.0.0:9092,DOCKER://kafka:9093,CONTROLLER://localhost:9094
KAFKA_ADVERTISED_LISTENERS: LOCAL://localhost:9092,DOCKER://kafka:9093
KAFKA_INTER_BROKER_LISTENER_NAME: LOCAL
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,LOCAL:PLAINTEXT,DOCKER:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9094
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_NUM_PARTITIONS: 6
healthcheck:
test: /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
interval: 5s
timeout: 10s
retries: 10
start_period: 15s
restart: on-failure

cockroachdb:
image: cockroachdb/cockroach:latest-v25.1
command: start-single-node --insecure
ports:
- 26257:26257
- 8181:8080
depends_on:
kafka:
condition: service_healthy
healthcheck:
test: curl -f http://localhost:8080/health?ready=1 || exit 1
interval: 5s
timeout: 5s
retries: 10
start_period: 10s
restart: on-failure

crdb-init:
image: cockroachdb/cockroach:latest-v25.1
volumes:
- ./scripts/init-crdb.sh:/init-crdb.sh
entrypoint: ["/bin/bash", "/init-crdb.sh"]
depends_on:
cockroachdb:
condition: service_healthy
restart: "no"

kafka-ui:
image: provectuslabs/kafka-ui:latest
ports:
- 8080:8080
depends_on:
kafka:
condition: service_healthy
environment:
DYNAMIC_CONFIG_ENABLED: 'true'
KAFKA_CLUSTERS_0_NAME: LoadTest
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9093
restart: on-failure
Loading
Loading