Skip to content

Commit cc8e645

Browse files
rpk: Add --reset-topic to benchmark
When cancelling the command we delete the test topic we created. Sometimes the topic might still leak (SIGKILL or rpk is killed during topic creation before RP has responded). Provide a `--reset-topic` flag which deletes and recreates the topic similar to what OMB provides.
1 parent d4cb270 commit cc8e645

1 file changed

Lines changed: 23 additions & 3 deletions

File tree

src/go/rpk/pkg/cli/benchmark/benchmark.go

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ type benchmarkConfig struct {
4646
partitions int32
4747
replicas int16
4848
clients int
49+
reset bool
4950
warmupS int
5051
durationS int
5152
metricsJSON string
@@ -81,6 +82,7 @@ func (cfg *benchmarkConfig) addFlags(cmd *cobra.Command) {
8182
cmd.Flags().Int32VarP(&cfg.partitions, "partitions", "p", 18, "Number of partitions for benchmark topic creation")
8283
cmd.Flags().Int16VarP(&cfg.replicas, "replicas", "r", 3, "Replication factor for benchmark topic creation")
8384
cmd.Flags().IntVar(&cfg.clients, "clients", 16, "Number of benchmark client connections")
85+
cmd.Flags().BoolVar(&cfg.reset, "reset-topic", false, "Delete the benchmark topic first if it already exists")
8486
cmd.Flags().IntVar(&cfg.warmupS, "warmup", 10, "Warmup duration in seconds")
8587
cmd.Flags().IntVar(&cfg.durationS, "duration", 60, "Measurement duration in seconds")
8688
cmd.Flags().StringVar(&cfg.metricsJSON, "metrics-json", "", "Optional path to write final metrics JSON")
@@ -158,7 +160,7 @@ func newBenchmarkRun(fs afero.Fs, p *config.Params, cmd *cobra.Command, cfg benc
158160

159161
run.ctx, run.cancel = setupSignalContext(cmd)
160162

161-
err = createBenchmarkTopic(run.ctx, run.adm, cfg.topic, cfg.partitions, cfg.replicas)
163+
err = setupBenchmarkTopic(run.ctx, run.adm, cfg.topic, cfg.partitions, cfg.replicas, cfg.reset)
162164
if err != nil {
163165
return nil, err
164166
}
@@ -302,14 +304,30 @@ func createBenchmarkTopic(ctx context.Context, adm *kadm.Client, topic string, p
302304
return fmt.Errorf("missing create topic response for %q", topic)
303305
}
304306
if errors.Is(resp.Err, kerr.TopicAlreadyExists) {
305-
return fmt.Errorf("benchmark topic %q already exists; choose a unique --topic", topic)
307+
return fmt.Errorf("benchmark topic %q already exists; choose a unique --topic or pass --reset-topic to recreate it (destructive) - error: %w", topic, resp.Err)
306308
}
307309
if resp.Err != nil {
308310
return resp.Err
309311
}
310312
return nil
311313
}
312314

315+
func setupBenchmarkTopic(
316+
ctx context.Context,
317+
adm *kadm.Client,
318+
topic string,
319+
partitions int32,
320+
replicas int16,
321+
reset bool,
322+
) error {
323+
if reset {
324+
if err := deleteBenchmarkTopic(ctx, adm, topic); err != nil {
325+
return fmt.Errorf("unable to reset benchmark topic %q: %w", topic, err)
326+
}
327+
}
328+
return createBenchmarkTopic(ctx, adm, topic, partitions, replicas)
329+
}
330+
313331
func deleteBenchmarkTopic(ctx context.Context, adm *kadm.Client, topic string) error {
314332
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
315333
defer cancel()
@@ -321,7 +339,9 @@ func deleteBenchmarkTopic(ctx context.Context, adm *kadm.Client, topic string) e
321339
if !ok {
322340
return fmt.Errorf("missing delete topic response for %q", topic)
323341
}
324-
if resp.Err != nil {
342+
if errors.Is(resp.Err, kerr.UnknownTopicOrPartition) {
343+
return nil
344+
} else if resp.Err != nil {
325345
return resp.Err
326346
}
327347
return nil

0 commit comments

Comments
 (0)