Skip to content

Commit d64f6e8

Browse files
committed
chore: apply spammoor fix
1 parent fa30616 commit d64f6e8

2 files changed

Lines changed: 82 additions & 14 deletions

File tree

apps/loadgen/internal/runner/runner.go

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,9 @@ import (
1515
)
1616

1717
const (
18-
metricSentTotal = "spamoor_transactions_sent_total"
19-
metricFailedTotal = "spamoor_transactions_failed_total"
18+
metricSentTotal = "spamoor_transactions_sent_total"
19+
metricFailedTotal = "spamoor_transaction_failures_total"
20+
metricSpammerRunning = "spamoor_spammer_running"
2021
)
2122

2223
type matrixOpts struct {
@@ -201,41 +202,69 @@ func waitForSpamoorDoneWithInterval(ctx context.Context, api spamoor.Client, tar
201202

202203
start := time.Now()
203204
var prevSent float64
205+
var sawRunning bool
204206

205207
for {
206208
select {
207209
case <-ctx.Done():
208210
return sent, failed, fmt.Errorf("timed out waiting for %d txs (sent %.0f): %w", targetCount, sent, ctx.Err())
209211
case <-ticker.C:
210-
currentSent, currentFailed, mErr := getSpamoorCountersForPrefix(api, namePrefix)
212+
metrics, mErr := api.GetMetrics()
211213
if mErr != nil {
212214
log.Printf("warning: failed to get metrics: %v", mErr)
213215
continue
214216
}
215217

216-
sent = currentSent - baselineSent
217-
failed = currentFailed - baselineFailed
218+
sent = sumCounterWithPrefix(metrics[metricSentTotal], namePrefix) - baselineSent
219+
failed = sumCounterWithPrefix(metrics[metricFailedTotal], namePrefix) - baselineFailed
218220
if sent < 0 {
219221
sent = 0
220222
}
221223
if failed < 0 {
222224
failed = 0
223225
}
224226

227+
running := countRunningWithPrefix(metrics[metricSpammerRunning], namePrefix)
228+
if running > 0 {
229+
sawRunning = true
230+
}
231+
225232
delta := sent - prevSent
226233
rate := delta / pollInterval.Seconds()
227234
elapsed := time.Since(start)
228-
log.Printf(" progress: %.0f/%d sent (%.0f tx/s instant, %.1f tx/s avg, %.0f failed) [%s]",
229-
sent, targetCount, rate, sent/elapsed.Seconds(), failed, elapsed.Round(time.Second))
235+
log.Printf(" progress: %.0f/%d sent (%.0f tx/s instant, %.1f tx/s avg, %.0f failed, %d running) [%s]",
236+
sent, targetCount, rate, sent/elapsed.Seconds(), failed, running, elapsed.Round(time.Second))
230237
prevSent = sent
231238

232-
if sent >= float64(targetCount) {
239+
// spammers report running=0 once the scenario fully completes,
240+
// which includes flushing pending txs. the sent>=target fallback
241+
// covers scenarios that finish before the gauge is first observed.
242+
if running == 0 && (sawRunning || sent >= float64(targetCount)) {
233243
return sent, failed, nil
234244
}
235245
}
236246
}
237247
}
238248

249+
func countRunningWithPrefix(f *dto.MetricFamily, namePrefix string) int {
250+
if f == nil || f.GetType() != dto.MetricType_GAUGE {
251+
return 0
252+
}
253+
var n int
254+
for _, m := range f.GetMetric() {
255+
if m.GetGauge() == nil {
256+
continue
257+
}
258+
if namePrefix != "" && !hasLabelPrefix(m, "spammer_name", namePrefix) {
259+
continue
260+
}
261+
if m.GetGauge().GetValue() > 0 {
262+
n++
263+
}
264+
}
265+
return n
266+
}
267+
239268
func getSpamoorCountersForPrefix(api spamoor.Client, namePrefix string) (sent, failed float64, err error) {
240269
metrics, err := api.GetMetrics()
241270
if err != nil {

apps/loadgen/internal/runner/runner_test.go

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,8 @@ func TestRunEntryFailsWhenSpammerDoesNotStart(t *testing.T) {
8282
func TestWaitForSpamoorDoneUsesDeltas(t *testing.T) {
8383
client := &fakeClient{
8484
metricsSeq: []metricSnapshot{
85-
{sent: 105, failed: 3},
86-
{sent: 108, failed: 4},
85+
{sent: 105, failed: 3, running: 1},
86+
{sent: 108, failed: 4, running: 0},
8787
},
8888
spammerNames: []string{"bench-test-0"},
8989
}
@@ -94,6 +94,22 @@ func TestWaitForSpamoorDoneUsesDeltas(t *testing.T) {
9494
require.Equal(t, 2.0, failed)
9595
}
9696

97+
func TestWaitForSpamoorDoneWaitsForSpammersToStop(t *testing.T) {
98+
client := &fakeClient{
99+
metricsSeq: []metricSnapshot{
100+
{sent: 110, failed: 0, running: 1},
101+
{sent: 110, failed: 0, running: 1},
102+
{sent: 110, failed: 0, running: 0},
103+
},
104+
spammerNames: []string{"bench-test-0"},
105+
}
106+
107+
sent, _, err := waitForSpamoorDoneWithInterval(context.Background(), client, 10, 100, 0, "", time.Millisecond)
108+
require.NoError(t, err)
109+
require.Equal(t, 10.0, sent)
110+
require.Equal(t, 3, client.metricsIndex)
111+
}
112+
97113
func TestWaitForSpamoorDoneHonorsContext(t *testing.T) {
98114
client := &fakeClient{
99115
metricsSeq: []metricSnapshot{
@@ -167,8 +183,9 @@ func TestSumCounterWithPrefixFiltersCorrectly(t *testing.T) {
167183
}
168184

169185
type metricSnapshot struct {
170-
sent float64
171-
failed float64
186+
sent float64
187+
failed float64
188+
running float64
172189
}
173190

174191
type fakeClient struct {
@@ -234,16 +251,38 @@ func (f *fakeClient) GetMetrics() (map[string]*dto.MetricFamily, error) {
234251
perFailed := snapshot.failed / float64(len(f.spammerNames))
235252
sentMap := make(map[string]float64, len(f.spammerNames))
236253
failedMap := make(map[string]float64, len(f.spammerNames))
254+
runningMap := make(map[string]float64, len(f.spammerNames))
237255
for _, name := range f.spammerNames {
238256
sentMap[name] = perSpammer
239257
failedMap[name] = perFailed
258+
runningMap[name] = snapshot.running
240259
}
241260
return map[string]*dto.MetricFamily{
242-
"spamoor_transactions_sent_total": labeledCounterFamily("spamoor_transactions_sent_total", sentMap),
243-
"spamoor_transactions_failed_total": labeledCounterFamily("spamoor_transactions_failed_total", failedMap),
261+
"spamoor_transactions_sent_total": labeledCounterFamily("spamoor_transactions_sent_total", sentMap),
262+
"spamoor_transaction_failures_total": labeledCounterFamily("spamoor_transaction_failures_total", failedMap),
263+
"spamoor_spammer_running": labeledGaugeFamily("spamoor_spammer_running", runningMap),
244264
}, nil
245265
}
246266

267+
func labeledGaugeFamily(name string, spammerValues map[string]float64) *dto.MetricFamily {
268+
gaugeType := dto.MetricType_GAUGE
269+
labelName := "spammer_name"
270+
var metrics []*dto.Metric
271+
for spammerName, value := range spammerValues {
272+
metrics = append(metrics, &dto.Metric{
273+
Label: []*dto.LabelPair{
274+
{Name: &labelName, Value: &spammerName},
275+
},
276+
Gauge: &dto.Gauge{Value: &value},
277+
})
278+
}
279+
return &dto.MetricFamily{
280+
Name: &name,
281+
Type: &gaugeType,
282+
Metric: metrics,
283+
}
284+
}
285+
247286
func (f *fakeClient) GetClients() ([]spamoorapi.Client, error) {
248287
if f.getClientsErr != nil {
249288
return nil, f.getClientsErr

0 commit comments

Comments
 (0)