|
8 | 8 | "os" |
9 | 9 | "os/signal" |
10 | 10 | "strconv" |
| 11 | + "sync" |
11 | 12 | "syscall" |
12 | 13 | "time" |
13 | 14 |
|
@@ -74,49 +75,57 @@ func runScheduler(parent context.Context, cfg startConfig) error { |
74 | 75 | return err |
75 | 76 | } |
76 | 77 |
|
| 78 | + var wg sync.WaitGroup |
77 | 79 | runWorkload := func(label, matrixPath string, txCount int) { |
| 80 | + defer wg.Done() |
78 | 81 | log.Printf("==> %s workload starting (%d tx)", label, txCount) |
79 | 82 | if err := internal.ExecuteMatrixWithOverridesFromFile(ctx, matrixPath, api, txCount); err != nil { |
80 | 83 | log.Printf("%s workload error: %v", label, err) |
81 | 84 | } |
82 | 85 | } |
83 | 86 |
|
84 | 87 | // fire regular immediately |
| 88 | + wg.Add(1) |
85 | 89 | go runWorkload("regular", cfg.regularMatrix, regularTxPerRun) |
86 | 90 |
|
87 | 91 | ticker := time.NewTicker(cfg.interval) |
88 | 92 | defer ticker.Stop() |
89 | 93 |
|
90 | 94 | // burst: single timer, reschedule after each fire, reset count every 24h. |
91 | 95 | burstsRemaining := cfg.burstPerDay |
92 | | - burstTimer := nextBurstTimer(burstsRemaining, burstWindow) |
| 96 | + nextReset := time.Now().Add(burstWindow) |
| 97 | + burstTimer := nextBurstTimer(burstsRemaining, time.Until(nextReset)) |
93 | 98 | resetTimer := time.NewTimer(burstWindow) |
94 | 99 | defer resetTimer.Stop() |
95 | 100 |
|
96 | 101 | for { |
97 | 102 | select { |
98 | 103 | case <-ctx.Done(): |
99 | | - log.Printf("shutting down...") |
| 104 | + log.Printf("shutting down, waiting for in-flight workloads...") |
100 | 105 | burstTimer.Stop() |
| 106 | + wg.Wait() |
101 | 107 | log.Printf("cleaning up spammers") |
102 | 108 | if err := internal.DeleteAllSpammers(api); err != nil { |
103 | 109 | log.Printf("warning: shutdown cleanup failed: %v", err) |
104 | 110 | } |
105 | 111 | return nil |
106 | 112 |
|
107 | 113 | case <-ticker.C: |
| 114 | + wg.Add(1) |
108 | 115 | go runWorkload("regular", cfg.regularMatrix, regularTxPerRun) |
109 | 116 |
|
110 | 117 | case <-burstTimer.C: |
| 118 | + wg.Add(1) |
111 | 119 | go runWorkload("burst", cfg.burstMatrix, cfg.burstTxCount) |
112 | 120 | burstsRemaining-- |
113 | | - burstTimer = nextBurstTimer(burstsRemaining, burstWindow) |
| 121 | + burstTimer = nextBurstTimer(burstsRemaining, time.Until(nextReset)) |
114 | 122 |
|
115 | 123 | case <-resetTimer.C: |
116 | 124 | log.Printf("24h elapsed - resetting burst count") |
117 | 125 | burstTimer.Stop() |
118 | 126 | burstsRemaining = cfg.burstPerDay |
119 | | - burstTimer = nextBurstTimer(burstsRemaining, burstWindow) |
| 127 | + nextReset = time.Now().Add(burstWindow) |
| 128 | + burstTimer = nextBurstTimer(burstsRemaining, time.Until(nextReset)) |
120 | 129 | resetTimer.Reset(burstWindow) |
121 | 130 | } |
122 | 131 | } |
|
0 commit comments