diff --git a/src/go/rpk/pkg/cli/benchmark/BUILD b/src/go/rpk/pkg/cli/benchmark/BUILD
index 5c2343c51f401..c232fc8e6dbba 100644
--- a/src/go/rpk/pkg/cli/benchmark/BUILD
+++ b/src/go/rpk/pkg/cli/benchmark/BUILD
@@ -2,7 +2,10 @@ load("@rules_go//go:def.bzl", "go_library")
 
 go_library(
     name = "benchmark",
-    srcs = ["benchmark.go"],
+    srcs = [
+        "benchmark.go",
+        "produce.go",
+    ],
     importpath = "github.com/redpanda-data/redpanda/src/go/rpk/pkg/cli/benchmark",
     visibility = ["//visibility:public"],
     deps = [
diff --git a/src/go/rpk/pkg/cli/benchmark/benchmark.go b/src/go/rpk/pkg/cli/benchmark/benchmark.go
index 0f358a284416b..af84b23b9ac67 100644
--- a/src/go/rpk/pkg/cli/benchmark/benchmark.go
+++ b/src/go/rpk/pkg/cli/benchmark/benchmark.go
@@ -16,7 +16,6 @@ import (
 	"fmt"
 	"os"
 	"os/signal"
-	"sync"
 	"sync/atomic"
 	"syscall"
 	"time"
@@ -28,7 +27,6 @@ import (
 	"github.com/spf13/cobra"
 	"github.com/twmb/franz-go/pkg/kadm"
 	"github.com/twmb/franz-go/pkg/kerr"
-	"github.com/twmb/franz-go/pkg/kgo"
 )
 
 type stats struct {
@@ -43,12 +41,82 @@ type finalMetrics struct {
 	Errors uint64 `json:"errors"`
 }
 
+type benchmarkConfig struct {
+	topic                  string
+	partitions             int32
+	replicas               int16
+	clients                int
+	reset                  bool
+	useExistingTopic       bool
+	warmupS                int
+	durationS              int
+	metricsJSON            string
+	waitLeadershipBalanced bool
+}
+
+type benchmarkTiming struct {
+	warmup       time.Duration
+	duration     time.Duration
+	measureStart time.Time
+	measureEnd   time.Time
+	runCtx       context.Context
+	cancel       context.CancelFunc
+}
+
+type benchmarkRun struct {
+	cfg     benchmarkConfig
+	profile *config.RpkProfile
+	ctx     context.Context
+	cancel  context.CancelFunc
+	timing  benchmarkTiming
+	adm     *kadm.Client
+	topic   string
+}
+
 const (
 	statsReqWidth = 12
 	statsMBWidth  = 8
 	statsErrWidth = 8
 )
 
+func (cfg *benchmarkConfig) addFlags(cmd *cobra.Command) {
+	cmd.Flags().StringVar(&cfg.topic, "topic", "rpk-benchmark-topic", "Benchmark topic name")
+	cmd.Flags().Int32VarP(&cfg.partitions, "partitions", "p", 18, "Number of partitions for benchmark topic creation (ignored with --use-existing-topic)")
+	cmd.Flags().Int16VarP(&cfg.replicas, "replicas", "r", 3, "Replication factor for benchmark topic creation (ignored with --use-existing-topic)")
+	cmd.Flags().IntVar(&cfg.clients, "clients", 16, "Number of benchmark client connections")
+	cmd.Flags().BoolVar(&cfg.reset, "reset-topic", false, "Delete the benchmark topic first if it already exists")
+	cmd.Flags().BoolVar(&cfg.useExistingTopic, "use-existing-topic", false, "Use the benchmark topic as-is without creating or deleting it")
+	cmd.Flags().IntVar(&cfg.warmupS, "warmup", 10, "Warmup duration in seconds")
+	cmd.Flags().IntVar(&cfg.durationS, "duration", 60, "Measurement duration in seconds")
+	cmd.Flags().StringVar(&cfg.metricsJSON, "metrics-json", "", "Optional path to write final metrics JSON")
+	cmd.Flags().BoolVar(&cfg.waitLeadershipBalanced, "wait-leadership-balanced", true, "Wait for topic leadership to become balanced before starting the benchmark")
+	cmd.MarkFlagsMutuallyExclusive("reset-topic", "use-existing-topic")
+}
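+
+// validate sanity-checks the flag values shared by all benchmark subcommands;
+// the partition and replica checks are skipped when reusing an existing topic.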
+func (cfg benchmarkConfig) validate() error {
+	if cfg.reset && cfg.useExistingTopic {
+		return fmt.Errorf("--use-existing-topic cannot be used with --reset-topic")
+	}
+	if !cfg.useExistingTopic {
+		if cfg.partitions <= 0 {
+			return fmt.Errorf("invalid --partitions %d, must be > 0", cfg.partitions)
+		}
+		if cfg.replicas <= 0 {
+			return fmt.Errorf("invalid --replicas %d, must be > 0", cfg.replicas)
+		}
+	}
+	if cfg.clients <= 0 {
+		return fmt.Errorf("invalid --clients %d, must be > 0", cfg.clients)
+	}
+	if cfg.warmupS < 0 {
+		return fmt.Errorf("invalid --warmup %d, must be >= 0", cfg.warmupS)
+	}
+	if cfg.durationS <= 0 {
+		return fmt.Errorf("invalid --duration %d, must be > 0", cfg.durationS)
+	}
+	return nil
+}
+
 func computeMetrics(s *stats, now, measureStart time.Time) finalMetrics {
 	elapsed := now.Sub(measureStart).Seconds()
 	if elapsed <= 0 {
@@ -81,6 +149,168 @@ func printStats(tw *out.TabWriter, s *stats, now, measureStart time.Time) {
 	_ = tw.Flush()
 }
 
+func newBenchmarkRun(fs afero.Fs, p *config.Params, cmd *cobra.Command, cfg benchmarkConfig) (*benchmarkRun, error) {
+	var err error
+	run := &benchmarkRun{cfg: cfg}
+	defer func() {
+		if err != nil {
+			run.Close()
+		}
+	}()
+	run.profile, err = p.LoadVirtualProfile(fs)
+	if err != nil {
+		return nil, fmt.Errorf("rpk unable to load config: %w", err)
+	}
+
+	run.adm, err = kafka.NewAdmin(fs, run.profile)
+	if err != nil {
+		return nil, fmt.Errorf("unable to initialize admin kafka client: %w", err)
+	}
+
+	run.ctx, run.cancel = setupSignalContext(cmd)
+
+	err = setupBenchmarkTopic(
+		run.ctx,
+		run.adm,
+		cfg.topic,
+		cfg.partitions,
+		cfg.replicas,
+		cfg.reset,
+		cfg.useExistingTopic,
+	)
+	if err != nil {
+		return nil, err
+	}
+	run.topic = cfg.topic
+
+	if cfg.waitLeadershipBalanced {
+		fmt.Printf("waiting for balanced leadership on topic=%s\n", cfg.topic)
+		if err = waitForBalancedLeadership(run.ctx, run.adm, cfg.topic); err != nil {
+			return nil, err
+		}
+	}
+
+	run.timing = newBenchmarkTiming(run.ctx, cfg)
+
+	return run, nil
+}
+
+func (r *benchmarkRun) Close() {
+	if r.cancel != nil {
+		r.cancel()
+	}
+	if r.timing.cancel != nil {
+		r.timing.cancel()
+	}
+	if !r.cfg.useExistingTopic && r.topic != "" {
+		if err := deleteBenchmarkTopic(context.Background(), r.adm, r.topic); err != nil {
+			fmt.Printf("cleanup warning: unable to delete topic %q: %v\n", r.topic, err)
+		}
+	}
+	if r.adm != nil {
+		r.adm.Close()
+	}
+}
+
+func setupSignalContext(cmd *cobra.Command) (context.Context, context.CancelFunc) {
+	ctx, cancel := context.WithCancel(cmd.Context())
+
+	sigCh := make(chan os.Signal, 1)
+	signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
+
+	go func() {
+		defer signal.Stop(sigCh)
+		select {
+		case <-sigCh:
+			cancel()
+		case <-ctx.Done():
+		}
+	}()
+
+	return ctx, cancel
+}
+
+func createPayload(recordSize int) []byte {
+	payload := make([]byte, recordSize)
+	for i := range payload {
+		payload[i] = 'x'
+	}
+	return payload
+}
+
+func newBenchmarkTiming(ctx context.Context, cfg benchmarkConfig) benchmarkTiming {
+	warmup := time.Duration(cfg.warmupS) * time.Second
+	duration := time.Duration(cfg.durationS) * time.Second
+	measureStart := time.Now().Add(warmup)
+	measureEnd := measureStart.Add(duration)
+	runCtx, cancel := context.WithDeadline(ctx, measureEnd)
+
+	return benchmarkTiming{
+		warmup:       warmup,
+		duration:     duration,
+		measureStart: measureStart,
+		measureEnd:   measureEnd,
+		runCtx:       runCtx,
+		cancel:       cancel,
+	}
+}
+
+func writeMetricsJSON(path string, metrics finalMetrics) error {
+	b, err := json.MarshalIndent(metrics, "", " ")
+	if err != nil {
+		return fmt.Errorf("unable to marshal metrics json: %w", err)
+	}
+	if err := os.WriteFile(path, append(b, '\n'), 0o644); err != nil {
+		return fmt.Errorf("unable to write metrics json to %q: %w", path, err)
+	}
+	return nil
+}
+
+func waitForWarmup(ctx context.Context, warmup time.Duration) {
+	if warmup <= 0 {
+		return
+	}
+
+	select {
+	case <-ctx.Done():
+	case <-time.After(warmup):
+	}
+}
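+
+// runBenchmarkReporter prints a stats line every second once warmup ends; when
+// the measurement deadline expires or the run is interrupted, it waits for the
+// workers via wait, prints the final line, and optionally writes the metrics JSON.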
+func runBenchmarkReporter(
+	ctx context.Context,
+	timing benchmarkTiming,
+	stats *stats,
+	metricsJSON string,
+	wait func(),
+) error {
+	statsTable := out.NewTable()
+
+	waitForWarmup(ctx, timing.warmup)
+	printStatsHeader(statsTable)
+
+	ticker := time.NewTicker(time.Second)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-timing.runCtx.Done():
+			wait()
+			if ctx.Err() == nil {
+				printStats(statsTable, stats, timing.measureEnd, timing.measureStart)
+			}
+			if metricsJSON != "" {
+				if err := writeMetricsJSON(metricsJSON, computeMetrics(stats, timing.measureEnd, timing.measureStart)); err != nil {
+					return err
+				}
+			}
+			return nil
+		case <-ticker.C:
+			printStats(statsTable, stats, time.Now(), timing.measureStart)
+		}
+	}
+}
+
 func createBenchmarkTopic(ctx context.Context, adm *kadm.Client, topic string, partitions int32, replicas int16) error {
 	resps, err := adm.CreateTopics(ctx, partitions, replicas, nil, topic)
 	if err != nil {
@@ -91,7 +321,7 @@ func createBenchmarkTopic(ctx context.Context, adm *kadm.Client, topic string, p
 		return fmt.Errorf("missing create topic response for %q", topic)
 	}
 	if errors.Is(resp.Err, kerr.TopicAlreadyExists) {
-		return fmt.Errorf("benchmark topic %q already exists; choose a unique --topic", topic)
+		return fmt.Errorf("benchmark topic %q already exists; choose a unique --topic, pass --use-existing-topic to reuse it, or pass --reset-topic to recreate it (destructive): %w", topic, resp.Err)
 	}
 	if resp.Err != nil {
 		return resp.Err
@@ -99,8 +329,46 @@ func createBenchmarkTopic(ctx context.Context, adm *kadm.Client, topic string, p
 	return nil
 }
 
-func deleteBenchmarkTopic(adm *kadm.Client, topic string) error {
-	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+func setupBenchmarkTopic(
+	ctx context.Context,
+	adm *kadm.Client,
+	topic string,
+	partitions int32,
+	replicas int16,
+	reset bool,
+	useExistingTopic bool,
+) error {
+	if useExistingTopic {
+		return ensureBenchmarkTopicExists(ctx, adm, topic)
+	}
+	if reset {
+		if err := deleteBenchmarkTopic(ctx, adm, topic); err != nil {
+			return fmt.Errorf("unable to reset benchmark topic %q: %w", topic, err)
+		}
+	}
+	return createBenchmarkTopic(ctx, adm, topic, partitions, replicas)
+}
+
+func ensureBenchmarkTopicExists(ctx context.Context, adm *kadm.Client, topic string) error {
+	md, err := adm.Metadata(ctx, topic)
+	if err != nil {
+		return err
+	}
+	td, ok := md.Topics[topic]
+	if !ok || errors.Is(td.Err, kerr.UnknownTopicOrPartition) {
+		return fmt.Errorf(
+			"benchmark topic %q does not exist; create it first or omit --use-existing-topic",
+			topic,
+		)
+	}
+	if td.Err != nil {
+		return td.Err
+	}
+	return nil
+}
+
+func deleteBenchmarkTopic(ctx context.Context, adm *kadm.Client, topic string) error {
+	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
 	defer cancel()
 	resps, err := adm.DeleteTopics(ctx, topic)
 	if err != nil {
@@ -110,7 +378,9 @@ func deleteBenchmarkTopic(adm *kadm.Client, topic string) error {
 	if !ok {
 		return fmt.Errorf("missing delete topic response for %q", topic)
 	}
-	if resp.Err != nil {
+	if errors.Is(resp.Err, kerr.UnknownTopicOrPartition) {
+		return nil
+	} else if resp.Err != nil {
 		return resp.Err
 	}
 	return nil
@@ -119,7 +389,7 @@ func deleteBenchmarkTopic(adm *kadm.Client, topic string) error {
 // checks whether leadership is balanced, aka:
 // - all brokers have equal amount of leadership.
 // - The above but some are leader for one more partition.
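+// For example, 18 partitions led by 3 brokers are balanced only when each
+// broker leads exactly 6; with 20 partitions and 3 brokers, a 7/7/6 split
+// counts as balanced.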
-func leadershipBalanced(md kadm.Metadata, topic string, expectedPartitions int32) (bool, string) {
+func leadershipBalanced(md kadm.Metadata, topic string) (bool, string) {
 	td, ok := md.Topics[topic]
 	if !ok {
 		return false, "topic metadata not found"
@@ -127,13 +397,7 @@ func leadershipBalanced(md kadm.Metadata, topic string, expectedPartitions int32
 	if td.Err != nil {
 		return false, td.Err.Error()
 	}
-	if int32(len(td.Partitions)) != expectedPartitions {
-		return false, fmt.Sprintf(
-			"partition count is %d, expected %d",
-			len(td.Partitions),
-			expectedPartitions,
-		)
-	}
+	partitionCount := len(td.Partitions)
 
 	replicaNodes := make(map[int32]struct{})
 	leaderCounts := make(map[int32]int)
@@ -155,8 +419,8 @@ func leadershipBalanced(md kadm.Metadata, topic string, expectedPartitions int32
 	}
 
 	brokerCount := len(replicaNodes)
-	basePartitionLeadersPerBroker := int(expectedPartitions) / brokerCount
-	moduloLeaderCount := int(expectedPartitions) % brokerCount
+	basePartitionLeadersPerBroker := partitionCount / brokerCount
+	moduloLeaderCount := partitionCount % brokerCount
 
 	brokersAtBase := 0
 	brokersAtBasePlusOne := 0
@@ -202,7 +466,6 @@ func waitForBalancedLeadership(
 	ctx context.Context,
 	adm *kadm.Client,
 	topic string,
-	expectedPartitions int32,
 ) error {
 	ctx, cancel := context.WithTimeout(ctx, 120*time.Second)
 	defer cancel()
@@ -214,7 +477,7 @@ func waitForBalancedLeadership(
 	for {
 		md, err := adm.Metadata(ctx, topic)
 		if err == nil {
-			if ok, reason := leadershipBalanced(md, topic, expectedPartitions); ok {
+			if ok, reason := leadershipBalanced(md, topic); ok {
 				return nil
 			} else {
 				lastReason = reason
@@ -238,222 +501,15 @@ func waitForBalancedLeadership(
 	}
 }
 
-func runProducerLoop(
-	ctx context.Context,
-	cl *kgo.Client,
-	topic string,
-	payload []byte,
-	measureStart time.Time,
-	stats *stats,
-) {
-	for {
-		if ctx.Err() != nil {
-			return
-		}
-
-		rec := &kgo.Record{Topic: topic, Value: payload}
-
-		// We use sync produce. Like this we can guarantee single record per batch per request.
-		// To increase inflight it's easy to just bump clients/connections (this is cheap in franz-go)
-		err := cl.ProduceSync(ctx, rec).FirstErr()
-		now := time.Now()
-		if now.Before(measureStart) {
-			continue
-		}
-		if err != nil {
-			if ctx.Err() == nil {
-				stats.requests.Add(1)
-				stats.errors.Add(1)
-			}
-			continue
-		}
-
-		stats.requests.Add(1)
-		stats.bytes.Add(uint64(len(payload)))
-	}
-}
-
 func NewCommand(fs afero.Fs, p *config.Params) *cobra.Command {
-	var (
-		topic                  string
-		partitions             int32
-		replicas               int16
-		clients                int
-		recordSize             int
-		warmupS                int
-		durationS              int
-		metricsJSON            string
-		waitLeadershipBalanced bool
-	)
-
 	cmd := &cobra.Command{
 		Use:    "benchmark",
 		Short:  "Run a Kafka benchmark",
 		Long:   "Load testing tool which stresses the broker by sending small batches with high request rate",
-		Args:   cobra.NoArgs,
 		Hidden: true,
-		RunE: func(cmd *cobra.Command, _ []string) error {
-			if partitions <= 0 {
-				return fmt.Errorf("invalid --partitions %d, must be > 0", partitions)
-			}
-			if replicas <= 0 {
-				return fmt.Errorf("invalid --replicas %d, must be > 0", replicas)
-			}
-			if clients <= 0 {
-				return fmt.Errorf("invalid --clients %d, must be > 0", clients)
-			}
-			if recordSize <= 0 {
-				return fmt.Errorf("invalid --record-size %d, must be > 0", recordSize)
-			}
-			if warmupS < 0 {
-				return fmt.Errorf("invalid --warmup %d, must be >= 0", warmupS)
-			}
-			if durationS <= 0 {
-				return fmt.Errorf("invalid --duration %d, must be > 0", durationS)
-			}
-
-			profile, err := p.LoadVirtualProfile(fs)
-			if err != nil {
-				return fmt.Errorf("rpk unable to load config: %w", err)
-			}
-
-			adm, err := kafka.NewAdmin(fs, profile)
-			if err != nil {
-				return fmt.Errorf("unable to initialize admin kafka client: %w", err)
-			}
-			defer adm.Close()
-
-			ctx, cancel := context.WithCancel(cmd.Context())
-			defer cancel()
-
-			sigCh := make(chan os.Signal, 1)
-			signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
-			defer signal.Stop(sigCh)
-			go func() {
-				select {
-				case <-sigCh:
-					cancel()
-				case <-ctx.Done():
-				}
-			}()
-
-			err = createBenchmarkTopic(ctx, adm, topic, partitions, replicas)
-			if err != nil {
-				return err
-			}
-			defer func() {
-				if err := deleteBenchmarkTopic(adm, topic); err != nil {
-					fmt.Printf("cleanup warning: unable to delete topic %q: %v\n", topic, err)
-				}
-			}()
-
-			if waitLeadershipBalanced {
-				fmt.Printf("waiting for balanced leadership on topic=%s\n", topic)
-				err = waitForBalancedLeadership(ctx, adm, topic, partitions)
-				if err != nil {
-					return err
-				}
-			}
-
-			warmup := time.Duration(warmupS) * time.Second
-			duration := time.Duration(durationS) * time.Second
-			measureStart := time.Now().Add(warmup)
-			measureEnd := measureStart.Add(duration)
-
-			runCtx, runCancel := context.WithDeadline(ctx, measureEnd)
-			defer runCancel()
-
-			payload := make([]byte, recordSize)
-			for i := range payload {
-				payload[i] = 'x'
-			}
-			stats := &stats{}
-
-			producerOpts := []kgo.Opt{
-				kgo.DefaultProduceTopic(topic),
-				kgo.RequiredAcks(kgo.AllISRAcks()),
-				kgo.RecordPartitioner(kgo.RoundRobinPartitioner()),
-				kgo.ProducerLinger(0),
-				kgo.ProducerBatchCompression(kgo.NoCompression()),
-			}
-
-			producerClients := make([]*kgo.Client, 0, clients)
-			for i := 0; i < clients; i++ {
-				cl, err := kafka.NewFranzClient(fs, profile, producerOpts...)
-				if err != nil {
-					for _, started := range producerClients {
-						started.Close()
-					}
-					return fmt.Errorf("unable to initialize producer client %d: %w", i, err)
-				}
-				producerClients = append(producerClients, cl)
-			}
-			defer func() {
-				for _, cl := range producerClients {
-					cl.Close()
-				}
-			}()
-
-			fmt.Printf("topic=%s clients=%d partitions=%d record_size=%d replication_factor=%d\n", topic, clients, partitions, recordSize, replicas)
-			if warmup > 0 {
-				fmt.Printf("warming up for %ds...\n", warmupS)
-			}
-
-			statsTable := out.NewTable()
-
-			var wg sync.WaitGroup
-			for _, cl := range producerClients {
-				wg.Add(1)
-				go func(cl *kgo.Client) {
-					defer wg.Done()
-					runProducerLoop(runCtx, cl, topic, payload, measureStart, stats)
-				}(cl)
-			}
-
-			select {
-			case <-runCtx.Done():
-			case <-time.After(warmup):
-			}
+	}
 
-			printStatsHeader(statsTable)
-
-			ticker := time.NewTicker(time.Second)
-			defer ticker.Stop()
-			for {
-				select {
-				case <-runCtx.Done():
-					wg.Wait()
-					if ctx.Err() == nil {
-						printStats(statsTable, stats, measureEnd, measureStart)
-					}
-					if metricsJSON != "" {
-						metrics := computeMetrics(stats, measureEnd, measureStart)
-						b, err := json.MarshalIndent(metrics, "", " ")
-						if err != nil {
-							return fmt.Errorf("unable to marshal metrics json: %w", err)
-						}
-						err = os.WriteFile(metricsJSON, append(b, '\n'), 0o644)
-						if err != nil {
-							return fmt.Errorf("unable to write metrics json to %q: %w", metricsJSON, err)
-						}
-					}
-					return nil
-				case <-ticker.C:
-					printStats(statsTable, stats, time.Now(), measureStart)
-				}
-			}
-		},
-	}
-
-	cmd.Flags().StringVar(&topic, "topic", "rpk-benchmark-topic", "Benchmark topic name")
-	cmd.Flags().Int32VarP(&partitions, "partitions", "p", 18, "Number of partitions for benchmark topic creation")
-	cmd.Flags().Int16VarP(&replicas, "replicas", "r", 3, "Replication factor for benchmark topic creation")
-	cmd.Flags().IntVar(&clients, "clients", 16, "Number of producer client connections")
-	cmd.Flags().IntVar(&recordSize, "record-size", 100, "Record payload size in bytes")
-	cmd.Flags().IntVar(&warmupS, "warmup", 10, "Warmup duration in seconds")
-	cmd.Flags().IntVar(&durationS, "duration", 60, "Measurement duration in seconds")
-	cmd.Flags().StringVar(&metricsJSON, "metrics-json", "", "Optional path to write final metrics JSON")
-	cmd.Flags().BoolVar(&waitLeadershipBalanced, "wait-leadership-balanced", true, "Wait for topic leadership to become balanced before producing")
+	cmd.AddCommand(newProduceCommand(fs, p))
 
 	return cmd
 }
diff --git a/src/go/rpk/pkg/cli/benchmark/produce.go b/src/go/rpk/pkg/cli/benchmark/produce.go
new file mode 100644
index 0000000000000..42c51f1fb9e71
--- /dev/null
+++ b/src/go/rpk/pkg/cli/benchmark/produce.go
@@ -0,0 +1,156 @@
+// Copyright 2026 Redpanda Data, Inc.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.md
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0
+
+package benchmark
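+
+// produce.go implements "rpk benchmark produce": a pool of franz-go producer
+// clients issues synchronous single-record produce requests against the
+// benchmark topic while a shared reporter tracks request, byte, and error counts.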
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
+	"github.com/redpanda-data/redpanda/src/go/rpk/pkg/kafka"
+	"github.com/spf13/afero"
+	"github.com/spf13/cobra"
+	"github.com/twmb/franz-go/pkg/kgo"
+)
+
+type produceConfig struct {
+	benchmarkConfig
+	recordSize int
+}
+
+func newProduceCommand(fs afero.Fs, p *config.Params) *cobra.Command {
+	var cfg produceConfig
+
+	cmd := &cobra.Command{
+		Use:   "produce",
+		Short: "Run a Kafka produce benchmark",
+		Long:  "Load testing tool that stresses the broker by sending small batches at a high request rate",
+		Args:  cobra.NoArgs,
+		RunE: func(cmd *cobra.Command, _ []string) error {
+			return runProduceBenchmark(fs, p, cmd, cfg)
+		},
+	}
+
+	cfg.addFlags(cmd)
+	cmd.Flags().IntVar(&cfg.recordSize, "record-size", 100, "Record payload size in bytes")
+
+	return cmd
+}
+
+func runProduceBenchmark(fs afero.Fs, p *config.Params, cmd *cobra.Command, cfg produceConfig) error {
+	if err := cfg.validate(); err != nil {
+		return err
+	}
+	if cfg.recordSize <= 0 {
+		return fmt.Errorf("invalid --record-size %d, must be > 0", cfg.recordSize)
+	}
+
+	run, err := newBenchmarkRun(fs, p, cmd, cfg.benchmarkConfig)
+	if err != nil {
+		return err
+	}
+	defer run.Close()
+
+	payload := createPayload(cfg.recordSize)
+	stats := &stats{}
+
+	producerOpts := []kgo.Opt{
+		kgo.DefaultProduceTopic(cfg.topic),
+		kgo.RequiredAcks(kgo.AllISRAcks()),
+		kgo.RecordPartitioner(kgo.RoundRobinPartitioner()),
+		kgo.ProducerLinger(0),
+		kgo.ProducerBatchCompression(kgo.NoCompression()),
+	}
+
+	producerClients := make([]*kgo.Client, 0, cfg.clients)
+	for i := 0; i < cfg.clients; i++ {
+		cl, err := kafka.NewFranzClient(fs, run.profile, producerOpts...)
+		if err != nil {
+			for _, started := range producerClients {
+				started.Close()
+			}
+			return fmt.Errorf("unable to initialize producer client %d: %w", i, err)
+		}
+		producerClients = append(producerClients, cl)
+	}
+	defer func() {
+		for _, cl := range producerClients {
+			cl.Close()
+		}
+	}()
+
+	if cfg.useExistingTopic {
+		fmt.Printf(
+			"mode=produce topic=%s clients=%d record_size=%d use_existing_topic=true\n",
+			cfg.topic,
+			cfg.clients,
+			cfg.recordSize,
+		)
+	} else {
+		fmt.Printf(
+			"mode=produce topic=%s clients=%d partitions=%d record_size=%d replication_factor=%d\n",
+			cfg.topic,
+			cfg.clients,
+			cfg.partitions,
+			cfg.recordSize,
+			cfg.replicas,
+		)
+	}
+	if run.timing.warmup > 0 {
+		fmt.Printf("warming up for %ds...\n", cfg.warmupS)
+	}
+
+	var wg sync.WaitGroup
+	for _, cl := range producerClients {
+		wg.Add(1)
+		go func(cl *kgo.Client) {
+			defer wg.Done()
+			runProducerLoop(run.timing.runCtx, cl, cfg.topic, payload, run.timing.measureStart, stats)
+		}(cl)
+	}
+
+	return runBenchmarkReporter(run.ctx, run.timing, stats, cfg.metricsJSON, wg.Wait)
+}
+
+func runProducerLoop(
+	ctx context.Context,
+	cl *kgo.Client,
+	topic string,
+	payload []byte,
+	measureStart time.Time,
+	stats *stats,
+) {
+	for {
+		if ctx.Err() != nil {
+			return
+		}
+
+		rec := &kgo.Record{Topic: topic, Value: payload}
+
+		// We use sync produce so we can guarantee a single record per batch per request.
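+		// Each ProduceSync call blocks until the broker acknowledges the request,
+		// so per-client throughput is bounded by the produce round-trip latency.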
+		// To increase the number of inflight requests, just bump clients/connections (connections are cheap in franz-go).
+		err := cl.ProduceSync(ctx, rec).FirstErr()
+		if time.Now().Before(measureStart) {
+			continue
+		}
+		if err != nil {
+			if ctx.Err() == nil {
+				stats.requests.Add(1)
+				stats.errors.Add(1)
+			}
+			continue
+		}
+
+		stats.requests.Add(1)
+		stats.bytes.Add(uint64(len(payload)))
+	}
+}
diff --git a/tests/rptest/perf/rpk_benchmark_test.py b/tests/rptest/perf/rpk_benchmark_test.py
index f2fdf0ffba3e0..0616838ede2ff 100644
--- a/tests/rptest/perf/rpk_benchmark_test.py
+++ b/tests/rptest/perf/rpk_benchmark_test.py
@@ -28,11 +28,12 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
             *args, num_brokers=3, resource_settings=resource_settings, **kwargs
         )
 
-    def run_workload(self) -> None:
+    def run_workload(self, mode: str) -> None:
         svc = RpkBenchmarkService(
             self.test_context,
             self.redpanda,
             topic="rpk-bench-topic",
+            mode=mode,
             partitions=self.PARTITIONS,
             replicas=self.REPLICAS,
             clients=self.CLIENTS,
@@ -52,4 +53,4 @@ def run_workload(self) -> None:
 
     @cluster(num_nodes=6)
     def test_produce(self) -> None:
-        self.run_workload()
+        self.run_workload("produce")
diff --git a/tests/rptest/services/rpk_benchmark_service.py b/tests/rptest/services/rpk_benchmark_service.py
index d04c4cb9712b0..cd410221b3d30 100644
--- a/tests/rptest/services/rpk_benchmark_service.py
+++ b/tests/rptest/services/rpk_benchmark_service.py
@@ -50,6 +50,7 @@ def __init__(
         redpanda: RedpandaService,
         topic: str,
         *,
+        mode: str = "produce",
         partitions: int = 18,
         replicas: int = 3,
         clients: int = 1,
@@ -59,8 +60,11 @@ def __init__(
         wait_for_stable_leadership: bool = True,
     ):
         super().__init__(context, num_nodes=1)
+        if mode != "produce":
+            raise ValueError(f"unsupported rpk benchmark mode: {mode}")
         self._redpanda = redpanda
         self._topic = topic
+        self._mode = mode
         self._partitions = partitions
         self._replicas = replicas
         self._clients = clients
@@ -72,7 +76,7 @@ def __init__(
 
     def _build_cmd(self) -> str:
         return (
-            f"{self._redpanda.find_binary('rpk')} -X brokers={self._redpanda.brokers()} benchmark "
+            f"{self._redpanda.find_binary('rpk')} -X brokers={self._redpanda.brokers()} benchmark {self._mode} "
             f"--topic {self._topic} --partitions {self._partitions} --replicas {self._replicas} "
             f"--clients {self._clients} --record-size {self._record_size} "
            f"--warmup {self._warmup_s} --duration {self._duration_s} "
diff --git a/tests/rptest/tests/rpk_benchmark_service_test.py b/tests/rptest/tests/rpk_benchmark_service_test.py
index d391e2345db0a..eb96f26e17008 100644
--- a/tests/rptest/tests/rpk_benchmark_service_test.py
+++ b/tests/rptest/tests/rpk_benchmark_service_test.py
@@ -21,13 +21,13 @@ class RpkBenchmarkServiceSelfTest(RedpandaTest):
     def __init__(self, *args: Any, **kwargs: Any) -> None:
         super().__init__(*args, num_brokers=1, **kwargs)
 
-    @cluster(num_nodes=2)
-    def test_smoke(self) -> None:
+    def run_smoke(self, mode: str) -> None:
         topic = f"rpk-bench-smoke-{uuid4().hex[:8]}"
         svc = RpkBenchmarkService(
             self.test_context,
             self.redpanda,
             topic=topic,
+            mode=mode,
             partitions=6,
             replicas=1,
             clients=5,
@@ -47,3 +47,7 @@ def test_smoke(self) -> None:
         assert metrics.mb_per_sec > 0, "Expected MB/s > 0"
 
         svc.write_metrics_result(metrics)
+
+    @cluster(num_nodes=2)
+    def test_produce_smoke(self) -> None:
+        self.run_smoke("produce")