Skip to content

Commit 1a4d33e

Browse files
chattonclaude
andcommitted
fix(loadgen): fix burst scheduling window and shutdown races
- use remaining time until next reset for burst timer instead of full 24h window - track workload goroutines with WaitGroup, wait before cleanup on shutdown Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 7b75212 commit 1a4d33e

1 file changed

Lines changed: 13 additions & 4 deletions

File tree

apps/loadgen/cmd/start.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"os"
99
"os/signal"
1010
"strconv"
11+
"sync"
1112
"syscall"
1213
"time"
1314

@@ -74,49 +75,57 @@ func runScheduler(parent context.Context, cfg startConfig) error {
7475
return err
7576
}
7677

78+
var wg sync.WaitGroup
7779
runWorkload := func(label, matrixPath string, txCount int) {
80+
defer wg.Done()
7881
log.Printf("==> %s workload starting (%d tx)", label, txCount)
7982
if err := internal.ExecuteMatrixWithOverridesFromFile(ctx, matrixPath, api, txCount); err != nil {
8083
log.Printf("%s workload error: %v", label, err)
8184
}
8285
}
8386

8487
// fire regular immediately
88+
wg.Add(1)
8589
go runWorkload("regular", cfg.regularMatrix, regularTxPerRun)
8690

8791
ticker := time.NewTicker(cfg.interval)
8892
defer ticker.Stop()
8993

9094
// burst: single timer, reschedule after each fire, reset count every 24h.
9195
burstsRemaining := cfg.burstPerDay
92-
burstTimer := nextBurstTimer(burstsRemaining, burstWindow)
96+
nextReset := time.Now().Add(burstWindow)
97+
burstTimer := nextBurstTimer(burstsRemaining, time.Until(nextReset))
9398
resetTimer := time.NewTimer(burstWindow)
9499
defer resetTimer.Stop()
95100

96101
for {
97102
select {
98103
case <-ctx.Done():
99-
log.Printf("shutting down...")
104+
log.Printf("shutting down, waiting for in-flight workloads...")
100105
burstTimer.Stop()
106+
wg.Wait()
101107
log.Printf("cleaning up spammers")
102108
if err := internal.DeleteAllSpammers(api); err != nil {
103109
log.Printf("warning: shutdown cleanup failed: %v", err)
104110
}
105111
return nil
106112

107113
case <-ticker.C:
114+
wg.Add(1)
108115
go runWorkload("regular", cfg.regularMatrix, regularTxPerRun)
109116

110117
case <-burstTimer.C:
118+
wg.Add(1)
111119
go runWorkload("burst", cfg.burstMatrix, cfg.burstTxCount)
112120
burstsRemaining--
113-
burstTimer = nextBurstTimer(burstsRemaining, burstWindow)
121+
burstTimer = nextBurstTimer(burstsRemaining, time.Until(nextReset))
114122

115123
case <-resetTimer.C:
116124
log.Printf("24h elapsed - resetting burst count")
117125
burstTimer.Stop()
118126
burstsRemaining = cfg.burstPerDay
119-
burstTimer = nextBurstTimer(burstsRemaining, burstWindow)
127+
nextReset = time.Now().Add(burstWindow)
128+
burstTimer = nextBurstTimer(burstsRemaining, time.Until(nextReset))
120129
resetTimer.Reset(burstWindow)
121130
}
122131
}

0 commit comments

Comments
 (0)