From 006c6c94d8a5be2f9ef57d581f1fe81b349427f5 Mon Sep 17 00:00:00 2001 From: Stephan Dollberg Date: Fri, 27 Mar 2026 18:11:34 +0000 Subject: [PATCH 1/4] rpk: add explicit mode subcommand to benchmark Make the current (and only) produce mode be explicit by requiring a "produce" subcommand. This is in preparation for adding "consume". --- src/go/rpk/pkg/cli/benchmark/benchmark.go | 17 ++- src/go/rpk/pkg/cli/benchmark/produce.go | 156 ++++++++++++++++++++++ 2 files changed, 171 insertions(+), 2 deletions(-) create mode 100644 src/go/rpk/pkg/cli/benchmark/produce.go diff --git a/src/go/rpk/pkg/cli/benchmark/benchmark.go b/src/go/rpk/pkg/cli/benchmark/benchmark.go index 0f358a284416b..7b0597e5ffd44 100644 --- a/src/go/rpk/pkg/cli/benchmark/benchmark.go +++ b/src/go/rpk/pkg/cli/benchmark/benchmark.go @@ -274,6 +274,19 @@ func runProducerLoop( } func NewCommand(fs afero.Fs, p *config.Params) *cobra.Command { + cmd := &cobra.Command{ + Use: "benchmark", + Short: "Run a Kafka benchmark", + Long: "Load testing tool which stresses the broker by sending small batches with high request rate", + Hidden: true, + } + + cmd.AddCommand(newProduceCommand(fs, p)) + + return cmd +} + +func newProduceCommand(fs afero.Fs, p *config.Params) *cobra.Command { var ( topic string partitions int32 @@ -287,8 +300,8 @@ func NewCommand(fs afero.Fs, p *config.Params) *cobra.Command { ) cmd := &cobra.Command{ - Use: "benchmark", - Short: "Run a Kafka benchmark", + Use: "produce", + Short: "Run a Kafka produce benchmark", Long: "Load testing tool which stresses the broker by sending small batches with high request rate", Args: cobra.NoArgs, Hidden: true, diff --git a/src/go/rpk/pkg/cli/benchmark/produce.go b/src/go/rpk/pkg/cli/benchmark/produce.go new file mode 100644 index 0000000000000..f4f14b50fd709 --- /dev/null +++ b/src/go/rpk/pkg/cli/benchmark/produce.go @@ -0,0 +1,156 @@ +// Copyright 2026 Redpanda Data, Inc. 
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package benchmark + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config" + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/kafka" + "github.com/spf13/afero" + "github.com/spf13/cobra" + "github.com/twmb/franz-go/pkg/kgo" +) + +type produceConfig struct { + benchmarkConfig + recordSize int +} + +func newProduceCommand(fs afero.Fs, p *config.Params) *cobra.Command { + var cfg produceConfig + + cmd := &cobra.Command{ + Use: "produce", + Short: "Run a Kafka produce benchmark", + Long: "Load testing tool which stresses the broker by sending small batches with high request rate", + Args: cobra.NoArgs, + RunE: func(cmd *cobra.Command, _ []string) error { + return runProduceBenchmark(fs, p, cmd, cfg) + }, + } + + cfg.addFlags(cmd) + cmd.Flags().IntVar(&cfg.recordSize, "record-size", 100, "Record payload size in bytes") + + return cmd +} + +func runProduceBenchmark(fs afero.Fs, p *config.Params, cmd *cobra.Command, cfg produceConfig) error { + if err := cfg.validate(); err != nil { + return err + } + if cfg.recordSize <= 0 { + return fmt.Errorf("invalid --record-size %d, must be > 0", cfg.recordSize) + } + + run, err := newBenchmarkRun(fs, p, cmd, cfg.benchmarkConfig) + if err != nil { + return err + } + defer run.Close() + + payload := createPayload(cfg.recordSize) + stats := &stats{} + + producerOpts := []kgo.Opt{ + kgo.DefaultProduceTopic(cfg.topic), + kgo.RequiredAcks(kgo.AllISRAcks()), + kgo.RecordPartitioner(kgo.RoundRobinPartitioner()), + kgo.ProducerLinger(0), + kgo.ProducerBatchCompression(kgo.NoCompression()), + } + + producerClients := make([]*kgo.Client, 0, cfg.clients) + for i := 0; i < cfg.clients; i++ { + 
cl, err := kafka.NewFranzClient(fs, run.profile, producerOpts...) + if err != nil { + for _, started := range producerClients { + started.Close() + } + return fmt.Errorf("unable to initialize producer client %d: %w", i, err) + } + producerClients = append(producerClients, cl) + } + defer func() { + for _, cl := range producerClients { + cl.Close() + } + }() + + if cfg.useExistingTopic { + fmt.Printf( + "mode=produce topic=%s clients=%d record_size=%d use_existing_topic=true\n", + cfg.topic, + cfg.clients, + cfg.recordSize, + ) + } else { + fmt.Printf( + "mode=produce topic=%s clients=%d partitions=%d record_size=%d replication_factor=%d\n", + cfg.topic, + cfg.clients, + cfg.partitions, + cfg.recordSize, + cfg.replicas, + ) + } + if run.timing.warmup > 0 { + fmt.Printf("warming up for %ds...\n", cfg.warmupS) + } + + var wg sync.WaitGroup + for _, cl := range producerClients { + wg.Add(1) + go func(cl *kgo.Client) { + defer wg.Done() + runProducerLoop(run.timing.runCtx, cl, cfg.topic, payload, run.timing.measureStart, stats) + }(cl) + } + + return runBenchmarkReporter(run.ctx, run.timing, stats, cfg.metricsJSON, wg.Wait) +} + +func runProducerLoop( + ctx context.Context, + cl *kgo.Client, + topic string, + payload []byte, + measureStart time.Time, + stats *stats, +) { + for { + if ctx.Err() != nil { + return + } + + rec := &kgo.Record{Topic: topic, Value: payload} + + // We use sync produce. Like this we can guarantee single record per batch per request. 
+ // To increase inflight it's easy to just bump clients/connections (this is cheap in franz-go) + err := cl.ProduceSync(ctx, rec).FirstErr() + if time.Now().Before(measureStart) { + continue + } + if err != nil { + if ctx.Err() == nil { + stats.requests.Add(1) + stats.errors.Add(1) + } + continue + } + + stats.requests.Add(1) + stats.bytes.Add(uint64(len(payload))) + } +} From ba4e8c8545bcb2595b758f326102fab30c91185a Mon Sep 17 00:00:00 2001 From: Stephan Dollberg Date: Fri, 27 Mar 2026 18:30:08 +0000 Subject: [PATCH 2/4] rpk: refactor produce benchmark mode wiring Pure non-functional refactor of moving everything purely produce related to its own file and into subfunctions. Again in preparation for adding other subcommands like consume. --- src/go/rpk/pkg/cli/benchmark/BUILD | 5 +- src/go/rpk/pkg/cli/benchmark/benchmark.go | 439 +++++++++--------- src/go/rpk/pkg/cli/benchmark/produce.go | 33 +- tests/rptest/perf/rpk_benchmark_test.py | 5 +- .../rptest/services/rpk_benchmark_service.py | 6 +- .../tests/rpk_benchmark_service_test.py | 8 +- 6 files changed, 245 insertions(+), 251 deletions(-) diff --git a/src/go/rpk/pkg/cli/benchmark/BUILD b/src/go/rpk/pkg/cli/benchmark/BUILD index 5c2343c51f401..c232fc8e6dbba 100644 --- a/src/go/rpk/pkg/cli/benchmark/BUILD +++ b/src/go/rpk/pkg/cli/benchmark/BUILD @@ -2,7 +2,10 @@ load("@rules_go//go:def.bzl", "go_library") go_library( name = "benchmark", - srcs = ["benchmark.go"], + srcs = [ + "benchmark.go", + "produce.go", + ], importpath = "github.com/redpanda-data/redpanda/src/go/rpk/pkg/cli/benchmark", visibility = ["//visibility:public"], deps = [ diff --git a/src/go/rpk/pkg/cli/benchmark/benchmark.go b/src/go/rpk/pkg/cli/benchmark/benchmark.go index 7b0597e5ffd44..5dfc39f409b8d 100644 --- a/src/go/rpk/pkg/cli/benchmark/benchmark.go +++ b/src/go/rpk/pkg/cli/benchmark/benchmark.go @@ -16,7 +16,6 @@ import ( "fmt" "os" "os/signal" - "sync" "sync/atomic" "syscall" "time" @@ -28,7 +27,6 @@ import ( "github.com/spf13/cobra" 
"github.com/twmb/franz-go/pkg/kadm" "github.com/twmb/franz-go/pkg/kerr" - "github.com/twmb/franz-go/pkg/kgo" ) type stats struct { @@ -43,12 +41,71 @@ type finalMetrics struct { Errors uint64 `json:"errors"` } +type benchmarkConfig struct { + topic string + partitions int32 + replicas int16 + clients int + warmupS int + durationS int + metricsJSON string + waitLeadershipBalanced bool +} + +type benchmarkTiming struct { + warmup time.Duration + duration time.Duration + measureStart time.Time + measureEnd time.Time + runCtx context.Context + cancel context.CancelFunc +} + +type benchmarkRun struct { + profile *config.RpkProfile + ctx context.Context + cancel context.CancelFunc + timing benchmarkTiming + adm *kadm.Client + topic string +} + const ( statsReqWidth = 12 statsMBWidth = 8 statsErrWidth = 8 ) +func (cfg *benchmarkConfig) addFlags(cmd *cobra.Command) { + cmd.Flags().StringVar(&cfg.topic, "topic", "rpk-benchmark-topic", "Benchmark topic name") + cmd.Flags().Int32VarP(&cfg.partitions, "partitions", "p", 18, "Number of partitions for benchmark topic creation") + cmd.Flags().Int16VarP(&cfg.replicas, "replicas", "r", 3, "Replication factor for benchmark topic creation") + cmd.Flags().IntVar(&cfg.clients, "clients", 16, "Number of benchmark client connections") + cmd.Flags().IntVar(&cfg.warmupS, "warmup", 10, "Warmup duration in seconds") + cmd.Flags().IntVar(&cfg.durationS, "duration", 60, "Measurement duration in seconds") + cmd.Flags().StringVar(&cfg.metricsJSON, "metrics-json", "", "Optional path to write final metrics JSON") + cmd.Flags().BoolVar(&cfg.waitLeadershipBalanced, "wait-leadership-balanced", true, "Wait for topic leadership to become balanced before starting the benchmark") +} + +func (cfg benchmarkConfig) validate() error { + if cfg.partitions <= 0 { + return fmt.Errorf("invalid --partitions %d, must be > 0", cfg.partitions) + } + if cfg.replicas <= 0 { + return fmt.Errorf("invalid --replicas %d, must be > 0", cfg.replicas) + } + if cfg.clients <= 
0 { + return fmt.Errorf("invalid --clients %d, must be > 0", cfg.clients) + } + if cfg.warmupS < 0 { + return fmt.Errorf("invalid --warmup %d, must be >= 0", cfg.warmupS) + } + if cfg.durationS <= 0 { + return fmt.Errorf("invalid --duration %d, must be > 0", cfg.durationS) + } + return nil +} + func computeMetrics(s *stats, now, measureStart time.Time) finalMetrics { elapsed := now.Sub(measureStart).Seconds() if elapsed <= 0 { @@ -81,6 +138,160 @@ func printStats(tw *out.TabWriter, s *stats, now, measureStart time.Time) { _ = tw.Flush() } +func newBenchmarkRun(fs afero.Fs, p *config.Params, cmd *cobra.Command, cfg benchmarkConfig) (*benchmarkRun, error) { + var err error + run := &benchmarkRun{} + defer func() { + if err != nil { + run.Close() + } + }() + run.profile, err = p.LoadVirtualProfile(fs) + if err != nil { + return nil, fmt.Errorf("rpk unable to load config: %w", err) + } + + run.adm, err = kafka.NewAdmin(fs, run.profile) + if err != nil { + return nil, fmt.Errorf("unable to initialize admin kafka client: %w", err) + } + + run.ctx, run.cancel = setupSignalContext(cmd) + + err = createBenchmarkTopic(run.ctx, run.adm, cfg.topic, cfg.partitions, cfg.replicas) + if err != nil { + return nil, err + } + run.topic = cfg.topic + + if cfg.waitLeadershipBalanced { + fmt.Printf("waiting for balanced leadership on topic=%s\n", cfg.topic) + if err = waitForBalancedLeadership(run.ctx, run.adm, cfg.topic, cfg.partitions); err != nil { + return nil, err + } + } + + run.timing = newBenchmarkTiming(run.ctx, cfg) + + return run, nil +} + +func (r *benchmarkRun) Close() { + if r.cancel != nil { + r.cancel() + } + if r.timing.cancel != nil { + r.timing.cancel() + } + if r.topic != "" { + if err := deleteBenchmarkTopic(context.Background(), r.adm, r.topic); err != nil { + fmt.Printf("cleanup warning: unable to delete topic %q: %v\n", r.topic, err) + } + } + if r.adm != nil { + r.adm.Close() + } +} + +func setupSignalContext(cmd *cobra.Command) (context.Context, 
context.CancelFunc) { + ctx, cancel := context.WithCancel(cmd.Context()) + + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM) + + go func() { + defer signal.Stop(sigCh) + select { + case <-sigCh: + cancel() + case <-ctx.Done(): + } + }() + + return ctx, cancel +} + +func createPayload(recordSize int) []byte { + payload := make([]byte, recordSize) + for i := range payload { + payload[i] = 'x' + } + return payload +} + +func newBenchmarkTiming(ctx context.Context, cfg benchmarkConfig) benchmarkTiming { + warmup := time.Duration(cfg.warmupS) * time.Second + duration := time.Duration(cfg.durationS) * time.Second + measureStart := time.Now().Add(warmup) + measureEnd := measureStart.Add(duration) + runCtx, cancel := context.WithDeadline(ctx, measureEnd) + + return benchmarkTiming{ + warmup: warmup, + duration: duration, + measureStart: measureStart, + measureEnd: measureEnd, + runCtx: runCtx, + cancel: cancel, + } +} + +func writeMetricsJSON(path string, metrics finalMetrics) error { + b, err := json.MarshalIndent(metrics, "", " ") + if err != nil { + return fmt.Errorf("unable to marshal metrics json: %w", err) + } + if err := os.WriteFile(path, append(b, '\n'), 0o644); err != nil { + return fmt.Errorf("unable to write metrics json to %q: %w", path, err) + } + return nil +} + +func waitForWarmup(ctx context.Context, warmup time.Duration) { + if warmup <= 0 { + return + } + + select { + case <-ctx.Done(): + case <-time.After(warmup): + } +} + +func runBenchmarkReporter( + ctx context.Context, + timing benchmarkTiming, + stats *stats, + metricsJSON string, + wait func(), +) error { + statsTable := out.NewTable() + + waitForWarmup(ctx, timing.warmup) + printStatsHeader(statsTable) + + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + for { + select { + case <-timing.runCtx.Done(): + wait() + if ctx.Err() == nil { + printStats(statsTable, stats, timing.measureEnd, timing.measureStart) + } + if metricsJSON != "" { + if err 
:= writeMetricsJSON(metricsJSON, computeMetrics(stats, timing.measureEnd, timing.measureStart)); err != nil { + return err + } + } + return nil + case <-ticker.C: + printStats(statsTable, stats, time.Now(), timing.measureStart) + } + } +} + func createBenchmarkTopic(ctx context.Context, adm *kadm.Client, topic string, partitions int32, replicas int16) error { resps, err := adm.CreateTopics(ctx, partitions, replicas, nil, topic) if err != nil { @@ -99,8 +310,8 @@ func createBenchmarkTopic(ctx context.Context, adm *kadm.Client, topic string, p return nil } -func deleteBenchmarkTopic(adm *kadm.Client, topic string) error { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) +func deleteBenchmarkTopic(ctx context.Context, adm *kadm.Client, topic string) error { + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() resps, err := adm.DeleteTopics(ctx, topic) if err != nil { @@ -238,41 +449,6 @@ func waitForBalancedLeadership( } } -func runProducerLoop( - ctx context.Context, - cl *kgo.Client, - topic string, - payload []byte, - measureStart time.Time, - stats *stats, -) { - for { - if ctx.Err() != nil { - return - } - - rec := &kgo.Record{Topic: topic, Value: payload} - - // We use sync produce. Like this we can guarantee single record per batch per request. 
- // To increase inflight it's easy to just bump clients/connections (this is cheap in franz-go) - err := cl.ProduceSync(ctx, rec).FirstErr() - now := time.Now() - if now.Before(measureStart) { - continue - } - if err != nil { - if ctx.Err() == nil { - stats.requests.Add(1) - stats.errors.Add(1) - } - continue - } - - stats.requests.Add(1) - stats.bytes.Add(uint64(len(payload))) - } -} - func NewCommand(fs afero.Fs, p *config.Params) *cobra.Command { cmd := &cobra.Command{ Use: "benchmark", @@ -285,188 +461,3 @@ func NewCommand(fs afero.Fs, p *config.Params) *cobra.Command { return cmd } - -func newProduceCommand(fs afero.Fs, p *config.Params) *cobra.Command { - var ( - topic string - partitions int32 - replicas int16 - clients int - recordSize int - warmupS int - durationS int - metricsJSON string - waitLeadershipBalanced bool - ) - - cmd := &cobra.Command{ - Use: "produce", - Short: "Run a Kafka produce benchmark", - Long: "Load testing tool which stresses the broker by sending small batches with high request rate", - Args: cobra.NoArgs, - Hidden: true, - RunE: func(cmd *cobra.Command, _ []string) error { - if partitions <= 0 { - return fmt.Errorf("invalid --partitions %d, must be > 0", partitions) - } - if replicas <= 0 { - return fmt.Errorf("invalid --replicas %d, must be > 0", replicas) - } - if clients <= 0 { - return fmt.Errorf("invalid --clients %d, must be > 0", clients) - } - if recordSize <= 0 { - return fmt.Errorf("invalid --record-size %d, must be > 0", recordSize) - } - if warmupS < 0 { - return fmt.Errorf("invalid --warmup %d, must be >= 0", warmupS) - } - if durationS <= 0 { - return fmt.Errorf("invalid --duration %d, must be > 0", durationS) - } - - profile, err := p.LoadVirtualProfile(fs) - if err != nil { - return fmt.Errorf("rpk unable to load config: %w", err) - } - - adm, err := kafka.NewAdmin(fs, profile) - if err != nil { - return fmt.Errorf("unable to initialize admin kafka client: %w", err) - } - defer adm.Close() - - ctx, cancel := 
context.WithCancel(cmd.Context()) - defer cancel() - - sigCh := make(chan os.Signal, 1) - signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM) - defer signal.Stop(sigCh) - go func() { - select { - case <-sigCh: - cancel() - case <-ctx.Done(): - } - }() - - err = createBenchmarkTopic(ctx, adm, topic, partitions, replicas) - if err != nil { - return err - } - defer func() { - if err := deleteBenchmarkTopic(adm, topic); err != nil { - fmt.Printf("cleanup warning: unable to delete topic %q: %v\n", topic, err) - } - }() - - if waitLeadershipBalanced { - fmt.Printf("waiting for balanced leadership on topic=%s\n", topic) - err = waitForBalancedLeadership(ctx, adm, topic, partitions) - if err != nil { - return err - } - } - - warmup := time.Duration(warmupS) * time.Second - duration := time.Duration(durationS) * time.Second - measureStart := time.Now().Add(warmup) - measureEnd := measureStart.Add(duration) - - runCtx, runCancel := context.WithDeadline(ctx, measureEnd) - defer runCancel() - - payload := make([]byte, recordSize) - for i := range payload { - payload[i] = 'x' - } - stats := &stats{} - - producerOpts := []kgo.Opt{ - kgo.DefaultProduceTopic(topic), - kgo.RequiredAcks(kgo.AllISRAcks()), - kgo.RecordPartitioner(kgo.RoundRobinPartitioner()), - kgo.ProducerLinger(0), - kgo.ProducerBatchCompression(kgo.NoCompression()), - } - - producerClients := make([]*kgo.Client, 0, clients) - for i := 0; i < clients; i++ { - cl, err := kafka.NewFranzClient(fs, profile, producerOpts...) 
- if err != nil { - for _, started := range producerClients { - started.Close() - } - return fmt.Errorf("unable to initialize producer client %d: %w", i, err) - } - producerClients = append(producerClients, cl) - } - defer func() { - for _, cl := range producerClients { - cl.Close() - } - }() - - fmt.Printf("topic=%s clients=%d partitions=%d record_size=%d replication_factor=%d\n", topic, clients, partitions, recordSize, replicas) - if warmup > 0 { - fmt.Printf("warming up for %ds...\n", warmupS) - } - - statsTable := out.NewTable() - - var wg sync.WaitGroup - for _, cl := range producerClients { - wg.Add(1) - go func(cl *kgo.Client) { - defer wg.Done() - runProducerLoop(runCtx, cl, topic, payload, measureStart, stats) - }(cl) - } - - select { - case <-runCtx.Done(): - case <-time.After(warmup): - } - - printStatsHeader(statsTable) - - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - for { - select { - case <-runCtx.Done(): - wg.Wait() - if ctx.Err() == nil { - printStats(statsTable, stats, measureEnd, measureStart) - } - if metricsJSON != "" { - metrics := computeMetrics(stats, measureEnd, measureStart) - b, err := json.MarshalIndent(metrics, "", " ") - if err != nil { - return fmt.Errorf("unable to marshal metrics json: %w", err) - } - err = os.WriteFile(metricsJSON, append(b, '\n'), 0o644) - if err != nil { - return fmt.Errorf("unable to write metrics json to %q: %w", metricsJSON, err) - } - } - return nil - case <-ticker.C: - printStats(statsTable, stats, time.Now(), measureStart) - } - } - }, - } - - cmd.Flags().StringVar(&topic, "topic", "rpk-benchmark-topic", "Benchmark topic name") - cmd.Flags().Int32VarP(&partitions, "partitions", "p", 18, "Number of partitions for benchmark topic creation") - cmd.Flags().Int16VarP(&replicas, "replicas", "r", 3, "Replication factor for benchmark topic creation") - cmd.Flags().IntVar(&clients, "clients", 16, "Number of producer client connections") - cmd.Flags().IntVar(&recordSize, "record-size", 100, "Record 
payload size in bytes") - cmd.Flags().IntVar(&warmupS, "warmup", 10, "Warmup duration in seconds") - cmd.Flags().IntVar(&durationS, "duration", 60, "Measurement duration in seconds") - cmd.Flags().StringVar(&metricsJSON, "metrics-json", "", "Optional path to write final metrics JSON") - cmd.Flags().BoolVar(&waitLeadershipBalanced, "wait-leadership-balanced", true, "Wait for topic leadership to become balanced before producing") - - return cmd -} diff --git a/src/go/rpk/pkg/cli/benchmark/produce.go b/src/go/rpk/pkg/cli/benchmark/produce.go index f4f14b50fd709..0ea46e11a543d 100644 --- a/src/go/rpk/pkg/cli/benchmark/produce.go +++ b/src/go/rpk/pkg/cli/benchmark/produce.go @@ -31,10 +31,10 @@ func newProduceCommand(fs afero.Fs, p *config.Params) *cobra.Command { var cfg produceConfig cmd := &cobra.Command{ - Use: "produce", - Short: "Run a Kafka produce benchmark", - Long: "Load testing tool which stresses the broker by sending small batches with high request rate", - Args: cobra.NoArgs, + Use: "produce", + Short: "Run a Kafka produce benchmark", + Long: "Load testing tool which stresses the broker by sending small batches with high request rate", + Args: cobra.NoArgs, RunE: func(cmd *cobra.Command, _ []string) error { return runProduceBenchmark(fs, p, cmd, cfg) }, @@ -88,23 +88,14 @@ func runProduceBenchmark(fs afero.Fs, p *config.Params, cmd *cobra.Command, cfg } }() - if cfg.useExistingTopic { - fmt.Printf( - "mode=produce topic=%s clients=%d record_size=%d use_existing_topic=true\n", - cfg.topic, - cfg.clients, - cfg.recordSize, - ) - } else { - fmt.Printf( - "mode=produce topic=%s clients=%d partitions=%d record_size=%d replication_factor=%d\n", - cfg.topic, - cfg.clients, - cfg.partitions, - cfg.recordSize, - cfg.replicas, - ) - } + fmt.Printf( + "mode=produce topic=%s clients=%d partitions=%d record_size=%d replication_factor=%d\n", + cfg.topic, + cfg.clients, + cfg.partitions, + cfg.recordSize, + cfg.replicas, + ) if run.timing.warmup > 0 { fmt.Printf("warming 
up for %ds...\n", cfg.warmupS) } diff --git a/tests/rptest/perf/rpk_benchmark_test.py b/tests/rptest/perf/rpk_benchmark_test.py index f2fdf0ffba3e0..0616838ede2ff 100644 --- a/tests/rptest/perf/rpk_benchmark_test.py +++ b/tests/rptest/perf/rpk_benchmark_test.py @@ -28,11 +28,12 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: *args, num_brokers=3, resource_settings=resource_settings, **kwargs ) - def run_workload(self) -> None: + def run_workload(self, mode: str) -> None: svc = RpkBenchmarkService( self.test_context, self.redpanda, topic="rpk-bench-topic", + mode=mode, partitions=self.PARTITIONS, replicas=self.REPLICAS, clients=self.CLIENTS, @@ -52,4 +53,4 @@ def run_workload(self) -> None: @cluster(num_nodes=6) def test_produce(self) -> None: - self.run_workload() + self.run_workload("produce") diff --git a/tests/rptest/services/rpk_benchmark_service.py b/tests/rptest/services/rpk_benchmark_service.py index d04c4cb9712b0..cd410221b3d30 100644 --- a/tests/rptest/services/rpk_benchmark_service.py +++ b/tests/rptest/services/rpk_benchmark_service.py @@ -50,6 +50,7 @@ def __init__( redpanda: RedpandaService, topic: str, *, + mode: str = "produce", partitions: int = 18, replicas: int = 3, clients: int = 1, @@ -59,8 +60,11 @@ def __init__( wait_for_stable_leadership: bool = True, ): super().__init__(context, num_nodes=1) + if mode != "produce": + raise ValueError(f"unsupported rpk benchmark mode: {mode}") self._redpanda = redpanda self._topic = topic + self._mode = mode self._partitions = partitions self._replicas = replicas self._clients = clients @@ -72,7 +76,7 @@ def __init__( def _build_cmd(self) -> str: return ( - f"{self._redpanda.find_binary('rpk')} -X brokers={self._redpanda.brokers()} benchmark " + f"{self._redpanda.find_binary('rpk')} -X brokers={self._redpanda.brokers()} benchmark {self._mode} " f"--topic {self._topic} --partitions {self._partitions} --replicas {self._replicas} " f"--clients {self._clients} --record-size {self._record_size} " 
f"--warmup {self._warmup_s} --duration {self._duration_s} " diff --git a/tests/rptest/tests/rpk_benchmark_service_test.py b/tests/rptest/tests/rpk_benchmark_service_test.py index d391e2345db0a..eb96f26e17008 100644 --- a/tests/rptest/tests/rpk_benchmark_service_test.py +++ b/tests/rptest/tests/rpk_benchmark_service_test.py @@ -21,13 +21,13 @@ class RpkBenchmarkServiceSelfTest(RedpandaTest): def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, num_brokers=1, **kwargs) - @cluster(num_nodes=2) - def test_smoke(self) -> None: + def run_smoke(self, mode: str) -> None: topic = f"rpk-bench-smoke-{uuid4().hex[:8]}" svc = RpkBenchmarkService( self.test_context, self.redpanda, topic=topic, + mode=mode, partitions=6, replicas=1, clients=5, @@ -47,3 +47,7 @@ def test_smoke(self) -> None: assert metrics.mb_per_sec > 0, "Expected MB/s > 0" svc.write_metrics_result(metrics) + + @cluster(num_nodes=2) + def test_produce_smoke(self) -> None: + self.run_smoke("produce") From b9600240a5baee30e2440f60a317bcdd836d461f Mon Sep 17 00:00:00 2001 From: Stephan Dollberg Date: Tue, 31 Mar 2026 07:03:57 +0100 Subject: [PATCH 3/4] rpk: Add --reset-topic to benchmark When cancelling the command we delete the test topic we created. Sometimes the topic might still leak (SIGKILL or rpk is killed during topic creation before RP has responded). Provide a `--reset-topic` flag which deletes and recreates the topic similar to what OMB provides. 
--- src/go/rpk/pkg/cli/benchmark/benchmark.go | 26 ++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/src/go/rpk/pkg/cli/benchmark/benchmark.go b/src/go/rpk/pkg/cli/benchmark/benchmark.go index 5dfc39f409b8d..bcc8ef1963f50 100644 --- a/src/go/rpk/pkg/cli/benchmark/benchmark.go +++ b/src/go/rpk/pkg/cli/benchmark/benchmark.go @@ -46,6 +46,7 @@ type benchmarkConfig struct { partitions int32 replicas int16 clients int + reset bool warmupS int durationS int metricsJSON string @@ -81,6 +82,7 @@ func (cfg *benchmarkConfig) addFlags(cmd *cobra.Command) { cmd.Flags().Int32VarP(&cfg.partitions, "partitions", "p", 18, "Number of partitions for benchmark topic creation") cmd.Flags().Int16VarP(&cfg.replicas, "replicas", "r", 3, "Replication factor for benchmark topic creation") cmd.Flags().IntVar(&cfg.clients, "clients", 16, "Number of benchmark client connections") + cmd.Flags().BoolVar(&cfg.reset, "reset-topic", false, "Delete the benchmark topic first if it already exists") cmd.Flags().IntVar(&cfg.warmupS, "warmup", 10, "Warmup duration in seconds") cmd.Flags().IntVar(&cfg.durationS, "duration", 60, "Measurement duration in seconds") cmd.Flags().StringVar(&cfg.metricsJSON, "metrics-json", "", "Optional path to write final metrics JSON") @@ -158,7 +160,7 @@ func newBenchmarkRun(fs afero.Fs, p *config.Params, cmd *cobra.Command, cfg benc run.ctx, run.cancel = setupSignalContext(cmd) - err = createBenchmarkTopic(run.ctx, run.adm, cfg.topic, cfg.partitions, cfg.replicas) + err = setupBenchmarkTopic(run.ctx, run.adm, cfg.topic, cfg.partitions, cfg.replicas, cfg.reset) if err != nil { return nil, err } @@ -302,7 +304,7 @@ func createBenchmarkTopic(ctx context.Context, adm *kadm.Client, topic string, p return fmt.Errorf("missing create topic response for %q", topic) } if errors.Is(resp.Err, kerr.TopicAlreadyExists) { - return fmt.Errorf("benchmark topic %q already exists; choose a unique --topic", topic) + return fmt.Errorf("benchmark topic %q 
already exists; choose a unique --topic or pass --reset-topic to recreate it (destructive) - error: %w", topic, resp.Err) } if resp.Err != nil { return resp.Err @@ -310,6 +312,22 @@ func createBenchmarkTopic(ctx context.Context, adm *kadm.Client, topic string, p return nil } +func setupBenchmarkTopic( + ctx context.Context, + adm *kadm.Client, + topic string, + partitions int32, + replicas int16, + reset bool, +) error { + if reset { + if err := deleteBenchmarkTopic(ctx, adm, topic); err != nil { + return fmt.Errorf("unable to reset benchmark topic %q: %w", topic, err) + } + } + return createBenchmarkTopic(ctx, adm, topic, partitions, replicas) +} + func deleteBenchmarkTopic(ctx context.Context, adm *kadm.Client, topic string) error { ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() @@ -321,7 +339,9 @@ func deleteBenchmarkTopic(ctx context.Context, adm *kadm.Client, topic string) e if !ok { return fmt.Errorf("missing delete topic response for %q", topic) } - if resp.Err != nil { + if errors.Is(resp.Err, kerr.UnknownTopicOrPartition) { + return nil + } else if resp.Err != nil { return resp.Err } return nil From 986b62ead460b16e3afb6d92e5e59ac370921d8b Mon Sep 17 00:00:00 2001 From: Stephan Dollberg Date: Tue, 31 Mar 2026 07:26:14 +0100 Subject: [PATCH 4/4] rpk: Add --use-existing-topic to benchmark Add a flag to use a preexisting topic. Can sometimes be useful for various reasons (existing data etc.). 
--- src/go/rpk/pkg/cli/benchmark/benchmark.go | 78 ++++++++++++++++------- src/go/rpk/pkg/cli/benchmark/produce.go | 25 +++++--- 2 files changed, 72 insertions(+), 31 deletions(-) diff --git a/src/go/rpk/pkg/cli/benchmark/benchmark.go b/src/go/rpk/pkg/cli/benchmark/benchmark.go index bcc8ef1963f50..af84b23b9ac67 100644 --- a/src/go/rpk/pkg/cli/benchmark/benchmark.go +++ b/src/go/rpk/pkg/cli/benchmark/benchmark.go @@ -47,6 +47,7 @@ type benchmarkConfig struct { replicas int16 clients int reset bool + useExistingTopic bool warmupS int durationS int metricsJSON string @@ -63,6 +64,7 @@ type benchmarkTiming struct { } type benchmarkRun struct { + cfg benchmarkConfig profile *config.RpkProfile ctx context.Context cancel context.CancelFunc @@ -79,22 +81,29 @@ const ( func (cfg *benchmarkConfig) addFlags(cmd *cobra.Command) { cmd.Flags().StringVar(&cfg.topic, "topic", "rpk-benchmark-topic", "Benchmark topic name") - cmd.Flags().Int32VarP(&cfg.partitions, "partitions", "p", 18, "Number of partitions for benchmark topic creation") - cmd.Flags().Int16VarP(&cfg.replicas, "replicas", "r", 3, "Replication factor for benchmark topic creation") + cmd.Flags().Int32VarP(&cfg.partitions, "partitions", "p", 18, "Number of partitions for benchmark topic creation (ignored with --use-existing-topic)") + cmd.Flags().Int16VarP(&cfg.replicas, "replicas", "r", 3, "Replication factor for benchmark topic creation (ignored with --use-existing-topic)") cmd.Flags().IntVar(&cfg.clients, "clients", 16, "Number of benchmark client connections") cmd.Flags().BoolVar(&cfg.reset, "reset-topic", false, "Delete the benchmark topic first if it already exists") + cmd.Flags().BoolVar(&cfg.useExistingTopic, "use-existing-topic", false, "Use the benchmark topic as-is without creating or deleting it") cmd.Flags().IntVar(&cfg.warmupS, "warmup", 10, "Warmup duration in seconds") cmd.Flags().IntVar(&cfg.durationS, "duration", 60, "Measurement duration in seconds") cmd.Flags().StringVar(&cfg.metricsJSON, 
"metrics-json", "", "Optional path to write final metrics JSON") cmd.Flags().BoolVar(&cfg.waitLeadershipBalanced, "wait-leadership-balanced", true, "Wait for topic leadership to become balanced before starting the benchmark") + cmd.MarkFlagsMutuallyExclusive("reset-topic", "use-existing-topic") } func (cfg benchmarkConfig) validate() error { - if cfg.partitions <= 0 { - return fmt.Errorf("invalid --partitions %d, must be > 0", cfg.partitions) + if cfg.reset && cfg.useExistingTopic { + return fmt.Errorf("--use-existing-topic cannot be used with --reset-topic") } - if cfg.replicas <= 0 { - return fmt.Errorf("invalid --replicas %d, must be > 0", cfg.replicas) + if !cfg.useExistingTopic { + if cfg.partitions <= 0 { + return fmt.Errorf("invalid --partitions %d, must be > 0", cfg.partitions) + } + if cfg.replicas <= 0 { + return fmt.Errorf("invalid --replicas %d, must be > 0", cfg.replicas) + } } if cfg.clients <= 0 { return fmt.Errorf("invalid --clients %d, must be > 0", cfg.clients) @@ -142,7 +151,7 @@ func printStats(tw *out.TabWriter, s *stats, now, measureStart time.Time) { func newBenchmarkRun(fs afero.Fs, p *config.Params, cmd *cobra.Command, cfg benchmarkConfig) (*benchmarkRun, error) { var err error - run := &benchmarkRun{} + run := &benchmarkRun{cfg: cfg} defer func() { if err != nil { run.Close() @@ -160,7 +169,15 @@ func newBenchmarkRun(fs afero.Fs, p *config.Params, cmd *cobra.Command, cfg benc run.ctx, run.cancel = setupSignalContext(cmd) - err = setupBenchmarkTopic(run.ctx, run.adm, cfg.topic, cfg.partitions, cfg.replicas, cfg.reset) + err = setupBenchmarkTopic( + run.ctx, + run.adm, + cfg.topic, + cfg.partitions, + cfg.replicas, + cfg.reset, + cfg.useExistingTopic, + ) if err != nil { return nil, err } @@ -168,7 +185,7 @@ func newBenchmarkRun(fs afero.Fs, p *config.Params, cmd *cobra.Command, cfg benc if cfg.waitLeadershipBalanced { fmt.Printf("waiting for balanced leadership on topic=%s\n", cfg.topic) - if err = waitForBalancedLeadership(run.ctx, 
run.adm, cfg.topic, cfg.partitions); err != nil { + if err = waitForBalancedLeadership(run.ctx, run.adm, cfg.topic); err != nil { return nil, err } } @@ -185,7 +202,7 @@ func (r *benchmarkRun) Close() { if r.timing.cancel != nil { r.timing.cancel() } - if r.topic != "" { + if !r.cfg.useExistingTopic && r.topic != "" { if err := deleteBenchmarkTopic(context.Background(), r.adm, r.topic); err != nil { fmt.Printf("cleanup warning: unable to delete topic %q: %v\n", r.topic, err) } @@ -304,7 +321,7 @@ func createBenchmarkTopic(ctx context.Context, adm *kadm.Client, topic string, p return fmt.Errorf("missing create topic response for %q", topic) } if errors.Is(resp.Err, kerr.TopicAlreadyExists) { - return fmt.Errorf("benchmark topic %q already exists; choose a unique --topic or pass --reset-topic to recreate it (destructive) - error: %w", topic, resp.Err) + return fmt.Errorf("benchmark topic %q already exists; choose a unique --topic, pass --use-existing-topic to reuse it, or pass --reset-topic to recreate it (destructive) - error: %w", topic, resp.Err) } if resp.Err != nil { return resp.Err @@ -319,7 +336,11 @@ func setupBenchmarkTopic( partitions int32, replicas int16, reset bool, + useExistingTopic bool, ) error { + if useExistingTopic { + return ensureBenchmarkTopicExists(ctx, adm, topic) + } if reset { if err := deleteBenchmarkTopic(ctx, adm, topic); err != nil { return fmt.Errorf("unable to reset benchmark topic %q: %w", topic, err) @@ -328,6 +349,24 @@ func setupBenchmarkTopic( return createBenchmarkTopic(ctx, adm, topic, partitions, replicas) } +func ensureBenchmarkTopicExists(ctx context.Context, adm *kadm.Client, topic string) error { + md, err := adm.Metadata(ctx, topic) + if err != nil { + return err + } + td, ok := md.Topics[topic] + if !ok || errors.Is(td.Err, kerr.UnknownTopicOrPartition) { + return fmt.Errorf( + "benchmark topic %q does not exist; create it first or omit --use-existing-topic", + topic, + ) + } + if td.Err != nil { + return td.Err + } + 
return nil +} + func deleteBenchmarkTopic(ctx context.Context, adm *kadm.Client, topic string) error { ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() @@ -350,7 +389,7 @@ func deleteBenchmarkTopic(ctx context.Context, adm *kadm.Client, topic string) e // checks whether leadership is balanced, aka: // - all brokers have equal amount of leadership. // - The above but some are leader for one more partition. -func leadershipBalanced(md kadm.Metadata, topic string, expectedPartitions int32) (bool, string) { +func leadershipBalanced(md kadm.Metadata, topic string) (bool, string) { td, ok := md.Topics[topic] if !ok { return false, "topic metadata not found" @@ -358,13 +397,7 @@ func leadershipBalanced(md kadm.Metadata, topic string, expectedPartitions int32 if td.Err != nil { return false, td.Err.Error() } - if int32(len(td.Partitions)) != expectedPartitions { - return false, fmt.Sprintf( - "partition count is %d, expected %d", - len(td.Partitions), - expectedPartitions, - ) - } + partitionCount := len(td.Partitions) replicaNodes := make(map[int32]struct{}) leaderCounts := make(map[int32]int) @@ -386,8 +419,8 @@ func leadershipBalanced(md kadm.Metadata, topic string, expectedPartitions int32 } brokerCount := len(replicaNodes) - basePartitionLeadersPerBroker := int(expectedPartitions) / brokerCount - moduloLeaderCount := int(expectedPartitions) % brokerCount + basePartitionLeadersPerBroker := partitionCount / brokerCount + moduloLeaderCount := partitionCount % brokerCount brokersAtBase := 0 brokersAtBasePlusOne := 0 @@ -433,7 +466,6 @@ func waitForBalancedLeadership( ctx context.Context, adm *kadm.Client, topic string, - expectedPartitions int32, ) error { ctx, cancel := context.WithTimeout(ctx, 120*time.Second) defer cancel() @@ -445,7 +477,7 @@ func waitForBalancedLeadership( for { md, err := adm.Metadata(ctx, topic) if err == nil { - if ok, reason := leadershipBalanced(md, topic, expectedPartitions); ok { + if ok, reason := leadershipBalanced(md, 
topic); ok { return nil } else { lastReason = reason diff --git a/src/go/rpk/pkg/cli/benchmark/produce.go b/src/go/rpk/pkg/cli/benchmark/produce.go index 0ea46e11a543d..42c51f1fb9e71 100644 --- a/src/go/rpk/pkg/cli/benchmark/produce.go +++ b/src/go/rpk/pkg/cli/benchmark/produce.go @@ -88,14 +88,23 @@ func runProduceBenchmark(fs afero.Fs, p *config.Params, cmd *cobra.Command, cfg } }() - fmt.Printf( - "mode=produce topic=%s clients=%d partitions=%d record_size=%d replication_factor=%d\n", - cfg.topic, - cfg.clients, - cfg.partitions, - cfg.recordSize, - cfg.replicas, - ) + if cfg.useExistingTopic { + fmt.Printf( + "mode=produce topic=%s clients=%d record_size=%d use_existing_topic=true\n", + cfg.topic, + cfg.clients, + cfg.recordSize, + ) + } else { + fmt.Printf( + "mode=produce topic=%s clients=%d partitions=%d record_size=%d replication_factor=%d\n", + cfg.topic, + cfg.clients, + cfg.partitions, + cfg.recordSize, + cfg.replicas, + ) + } if run.timing.warmup > 0 { fmt.Printf("warming up for %ds...\n", cfg.warmupS) }