Skip to content

Commit 822fef8

Browse files
rpk: Add --use-prexisting-topic to benchmark
Add a flag to use a prexisting topic. Can sometimes be useful for various reasons (existing data etc.).
1 parent cc8e645 commit 822fef8

2 files changed

Lines changed: 72 additions & 31 deletions

File tree

src/go/rpk/pkg/cli/benchmark/benchmark.go

Lines changed: 55 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ type benchmarkConfig struct {
4747
replicas int16
4848
clients int
4949
reset bool
50+
useExistingTopic bool
5051
warmupS int
5152
durationS int
5253
metricsJSON string
@@ -63,6 +64,7 @@ type benchmarkTiming struct {
6364
}
6465

6566
type benchmarkRun struct {
67+
cfg benchmarkConfig
6668
profile *config.RpkProfile
6769
ctx context.Context
6870
cancel context.CancelFunc
@@ -79,22 +81,29 @@ const (
7981

8082
func (cfg *benchmarkConfig) addFlags(cmd *cobra.Command) {
8183
cmd.Flags().StringVar(&cfg.topic, "topic", "rpk-benchmark-topic", "Benchmark topic name")
82-
cmd.Flags().Int32VarP(&cfg.partitions, "partitions", "p", 18, "Number of partitions for benchmark topic creation")
83-
cmd.Flags().Int16VarP(&cfg.replicas, "replicas", "r", 3, "Replication factor for benchmark topic creation")
84+
cmd.Flags().Int32VarP(&cfg.partitions, "partitions", "p", 18, "Number of partitions for benchmark topic creation (ignored with --use-existing-topic)")
85+
cmd.Flags().Int16VarP(&cfg.replicas, "replicas", "r", 3, "Replication factor for benchmark topic creation (ignored with --use-existing-topic)")
8486
cmd.Flags().IntVar(&cfg.clients, "clients", 16, "Number of benchmark client connections")
8587
cmd.Flags().BoolVar(&cfg.reset, "reset-topic", false, "Delete the benchmark topic first if it already exists")
88+
cmd.Flags().BoolVar(&cfg.useExistingTopic, "use-existing-topic", false, "Use the benchmark topic as-is without creating or deleting it")
8689
cmd.Flags().IntVar(&cfg.warmupS, "warmup", 10, "Warmup duration in seconds")
8790
cmd.Flags().IntVar(&cfg.durationS, "duration", 60, "Measurement duration in seconds")
8891
cmd.Flags().StringVar(&cfg.metricsJSON, "metrics-json", "", "Optional path to write final metrics JSON")
8992
cmd.Flags().BoolVar(&cfg.waitLeadershipBalanced, "wait-leadership-balanced", true, "Wait for topic leadership to become balanced before starting the benchmark")
93+
cmd.MarkFlagsMutuallyExclusive("reset-topic", "use-existing-topic")
9094
}
9195

9296
func (cfg benchmarkConfig) validate() error {
93-
if cfg.partitions <= 0 {
94-
return fmt.Errorf("invalid --partitions %d, must be > 0", cfg.partitions)
97+
if cfg.reset && cfg.useExistingTopic {
98+
return fmt.Errorf("--use-existing-topic cannot be used with --reset-topic")
9599
}
96-
if cfg.replicas <= 0 {
97-
return fmt.Errorf("invalid --replicas %d, must be > 0", cfg.replicas)
100+
if !cfg.useExistingTopic {
101+
if cfg.partitions <= 0 {
102+
return fmt.Errorf("invalid --partitions %d, must be > 0", cfg.partitions)
103+
}
104+
if cfg.replicas <= 0 {
105+
return fmt.Errorf("invalid --replicas %d, must be > 0", cfg.replicas)
106+
}
98107
}
99108
if cfg.clients <= 0 {
100109
return fmt.Errorf("invalid --clients %d, must be > 0", cfg.clients)
@@ -142,7 +151,7 @@ func printStats(tw *out.TabWriter, s *stats, now, measureStart time.Time) {
142151

143152
func newBenchmarkRun(fs afero.Fs, p *config.Params, cmd *cobra.Command, cfg benchmarkConfig) (*benchmarkRun, error) {
144153
var err error
145-
run := &benchmarkRun{}
154+
run := &benchmarkRun{cfg: cfg}
146155
defer func() {
147156
if err != nil {
148157
run.Close()
@@ -160,15 +169,23 @@ func newBenchmarkRun(fs afero.Fs, p *config.Params, cmd *cobra.Command, cfg benc
160169

161170
run.ctx, run.cancel = setupSignalContext(cmd)
162171

163-
err = setupBenchmarkTopic(run.ctx, run.adm, cfg.topic, cfg.partitions, cfg.replicas, cfg.reset)
172+
err = setupBenchmarkTopic(
173+
run.ctx,
174+
run.adm,
175+
cfg.topic,
176+
cfg.partitions,
177+
cfg.replicas,
178+
cfg.reset,
179+
cfg.useExistingTopic,
180+
)
164181
if err != nil {
165182
return nil, err
166183
}
167184
run.topic = cfg.topic
168185

169186
if cfg.waitLeadershipBalanced {
170187
fmt.Printf("waiting for balanced leadership on topic=%s\n", cfg.topic)
171-
if err = waitForBalancedLeadership(run.ctx, run.adm, cfg.topic, cfg.partitions); err != nil {
188+
if err = waitForBalancedLeadership(run.ctx, run.adm, cfg.topic); err != nil {
172189
return nil, err
173190
}
174191
}
@@ -185,7 +202,7 @@ func (r *benchmarkRun) Close() {
185202
if r.timing.cancel != nil {
186203
r.timing.cancel()
187204
}
188-
if r.topic != "" {
205+
if !r.cfg.useExistingTopic && r.topic != "" {
189206
if err := deleteBenchmarkTopic(context.Background(), r.adm, r.topic); err != nil {
190207
fmt.Printf("cleanup warning: unable to delete topic %q: %v\n", r.topic, err)
191208
}
@@ -304,7 +321,7 @@ func createBenchmarkTopic(ctx context.Context, adm *kadm.Client, topic string, p
304321
return fmt.Errorf("missing create topic response for %q", topic)
305322
}
306323
if errors.Is(resp.Err, kerr.TopicAlreadyExists) {
307-
return fmt.Errorf("benchmark topic %q already exists; choose a unique --topic or pass --reset-topic to recreate it (destructive) - error: %w", topic, resp.Err)
324+
return fmt.Errorf("benchmark topic %q already exists; choose a unique --topic, pass --use-existing-topic to reuse it, or pass --reset-topic to recreate it (destructive) - error: %w", topic, resp.Err)
308325
}
309326
if resp.Err != nil {
310327
return resp.Err
@@ -319,7 +336,11 @@ func setupBenchmarkTopic(
319336
partitions int32,
320337
replicas int16,
321338
reset bool,
339+
useExistingTopic bool,
322340
) error {
341+
if useExistingTopic {
342+
return ensureBenchmarkTopicExists(ctx, adm, topic)
343+
}
323344
if reset {
324345
if err := deleteBenchmarkTopic(ctx, adm, topic); err != nil {
325346
return fmt.Errorf("unable to reset benchmark topic %q: %w", topic, err)
@@ -328,6 +349,24 @@ func setupBenchmarkTopic(
328349
return createBenchmarkTopic(ctx, adm, topic, partitions, replicas)
329350
}
330351

352+
func ensureBenchmarkTopicExists(ctx context.Context, adm *kadm.Client, topic string) error {
353+
md, err := adm.Metadata(ctx, topic)
354+
if err != nil {
355+
return err
356+
}
357+
td, ok := md.Topics[topic]
358+
if !ok || errors.Is(td.Err, kerr.UnknownTopicOrPartition) {
359+
return fmt.Errorf(
360+
"benchmark topic %q does not exist; create it first or omit --use-existing-topic",
361+
topic,
362+
)
363+
}
364+
if td.Err != nil {
365+
return td.Err
366+
}
367+
return nil
368+
}
369+
331370
func deleteBenchmarkTopic(ctx context.Context, adm *kadm.Client, topic string) error {
332371
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
333372
defer cancel()
@@ -350,21 +389,15 @@ func deleteBenchmarkTopic(ctx context.Context, adm *kadm.Client, topic string) e
350389
// checks whether leadership is balanced, aka:
351390
// - all brokers have equal amount of leadership.
352391
// - The above but some are leader for one more partition.
353-
func leadershipBalanced(md kadm.Metadata, topic string, expectedPartitions int32) (bool, string) {
392+
func leadershipBalanced(md kadm.Metadata, topic string) (bool, string) {
354393
td, ok := md.Topics[topic]
355394
if !ok {
356395
return false, "topic metadata not found"
357396
}
358397
if td.Err != nil {
359398
return false, td.Err.Error()
360399
}
361-
if int32(len(td.Partitions)) != expectedPartitions {
362-
return false, fmt.Sprintf(
363-
"partition count is %d, expected %d",
364-
len(td.Partitions),
365-
expectedPartitions,
366-
)
367-
}
400+
partitionCount := len(td.Partitions)
368401

369402
replicaNodes := make(map[int32]struct{})
370403
leaderCounts := make(map[int32]int)
@@ -386,8 +419,8 @@ func leadershipBalanced(md kadm.Metadata, topic string, expectedPartitions int32
386419
}
387420

388421
brokerCount := len(replicaNodes)
389-
basePartitionLeadersPerBroker := int(expectedPartitions) / brokerCount
390-
moduloLeaderCount := int(expectedPartitions) % brokerCount
422+
basePartitionLeadersPerBroker := partitionCount / brokerCount
423+
moduloLeaderCount := partitionCount % brokerCount
391424

392425
brokersAtBase := 0
393426
brokersAtBasePlusOne := 0
@@ -433,7 +466,6 @@ func waitForBalancedLeadership(
433466
ctx context.Context,
434467
adm *kadm.Client,
435468
topic string,
436-
expectedPartitions int32,
437469
) error {
438470
ctx, cancel := context.WithTimeout(ctx, 120*time.Second)
439471
defer cancel()
@@ -445,7 +477,7 @@ func waitForBalancedLeadership(
445477
for {
446478
md, err := adm.Metadata(ctx, topic)
447479
if err == nil {
448-
if ok, reason := leadershipBalanced(md, topic, expectedPartitions); ok {
480+
if ok, reason := leadershipBalanced(md, topic); ok {
449481
return nil
450482
} else {
451483
lastReason = reason

src/go/rpk/pkg/cli/benchmark/produce.go

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -88,14 +88,23 @@ func runProduceBenchmark(fs afero.Fs, p *config.Params, cmd *cobra.Command, cfg
8888
}
8989
}()
9090

91-
fmt.Printf(
92-
"mode=produce topic=%s clients=%d partitions=%d record_size=%d replication_factor=%d\n",
93-
cfg.topic,
94-
cfg.clients,
95-
cfg.partitions,
96-
cfg.recordSize,
97-
cfg.replicas,
98-
)
91+
if cfg.useExistingTopic {
92+
fmt.Printf(
93+
"mode=produce topic=%s clients=%d record_size=%d use_existing_topic=true\n",
94+
cfg.topic,
95+
cfg.clients,
96+
cfg.recordSize,
97+
)
98+
} else {
99+
fmt.Printf(
100+
"mode=produce topic=%s clients=%d partitions=%d record_size=%d replication_factor=%d\n",
101+
cfg.topic,
102+
cfg.clients,
103+
cfg.partitions,
104+
cfg.recordSize,
105+
cfg.replicas,
106+
)
107+
}
99108
if run.timing.warmup > 0 {
100109
fmt.Printf("warming up for %ds...\n", cfg.warmupS)
101110
}

0 commit comments

Comments
 (0)