Skip to content

Commit 7c52b6a

Browse files
committed
fix(loadgen): filter spamoor counters by spammer name prefix
Counter-based progress tracking summed all spammer metrics globally, causing overshoot when concurrent workloads (e.g. baseline + burst) ran on the same spamoor instance. Filter by spammer_name label prefix so each workload only tracks its own spammers.
1 parent 5d7f440 commit 7c52b6a

2 files changed

Lines changed: 107 additions & 32 deletions

File tree

apps/loadgen/internal/runner.go

Lines changed: 37 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"log"
77
"math/rand/v2"
8+
"strings"
89
"time"
910

1011
"github.com/celestiaorg/tastora/framework/docker/evstack/spamoor"
@@ -142,7 +143,8 @@ func runCheck(ctx context.Context, api SpamoorClient, timeout time.Duration) err
142143
return fmt.Errorf("waiting for spamoor sync: %w", err)
143144
}
144145

145-
baselineSent, baselineFailed, err := getSpamoorCounters(api)
146+
checkPrefix := "check-"
147+
baselineSent, baselineFailed, err := getSpamoorCountersForPrefix(api, checkPrefix)
146148
if err != nil {
147149
return fmt.Errorf("get baseline metrics: %w", err)
148150
}
@@ -173,7 +175,7 @@ func runCheck(ctx context.Context, api SpamoorClient, timeout time.Duration) err
173175
checkCtx, cancel := context.WithTimeout(ctx, timeout)
174176
defer cancel()
175177

176-
sent, failed, err := waitForSpamoorDone(checkCtx, api, 1, baselineSent, baselineFailed)
178+
sent, failed, err := waitForSpamoorDone(checkCtx, api, 1, baselineSent, baselineFailed, checkPrefix)
177179
if err != nil {
178180
return fmt.Errorf("tx not confirmed: %w", err)
179181
}
@@ -186,7 +188,7 @@ func runCheck(ctx context.Context, api SpamoorClient, timeout time.Duration) err
186188
return nil
187189
}
188190

189-
type waitForDoneFunc func(context.Context, SpamoorClient, int, float64, float64) (float64, float64, error)
191+
type waitForDoneFunc func(ctx context.Context, api SpamoorClient, targetCount int, baselineSent, baselineFailed float64, namePrefix string) (float64, float64, error)
190192

191193
func runEntry(ctx context.Context, api SpamoorClient, entry Entry) error {
192194
return runEntryWithWait(ctx, api, entry, waitForSpamoorDone)
@@ -198,7 +200,9 @@ func runEntryWithWait(ctx context.Context, api SpamoorClient, entry Entry, wait
198200
log.Printf("[%s] scenario=%s spammers=%d count_per=%d total=%d",
199201
entry.TestName, entry.Scenario, entry.NumSpammers, entry.CountPerSpammer, totalCount)
200202

201-
baselineSent, baselineFailed, err := getSpamoorCounters(api)
203+
namePrefix := spammerPrefix(entry.TestName)
204+
205+
baselineSent, baselineFailed, err := getSpamoorCountersForPrefix(api, namePrefix)
202206
if err != nil {
203207
return fmt.Errorf("get baseline metrics: %w", err)
204208
}
@@ -207,7 +211,7 @@ func runEntryWithWait(ctx context.Context, api SpamoorClient, entry Entry, wait
207211

208212
var spammerIDs []int
209213
for i := range entry.NumSpammers {
210-
name := fmt.Sprintf("bench-%s-%d", entry.TestName, i)
214+
name := fmt.Sprintf("%s%d", namePrefix, i)
211215
id, err := api.CreateSpammer(name, entry.Scenario, scenarioCfg, true)
212216
if err != nil {
213217
return fmt.Errorf("create spammer %s: %w", name, err)
@@ -234,7 +238,7 @@ func runEntryWithWait(ctx context.Context, api SpamoorClient, entry Entry, wait
234238
}
235239

236240
start := time.Now()
237-
sent, failed, err := wait(ctx, api, totalCount, baselineSent, baselineFailed)
241+
sent, failed, err := wait(ctx, api, totalCount, baselineSent, baselineFailed, namePrefix)
238242
elapsed := time.Since(start)
239243

240244
if err != nil {
@@ -263,11 +267,11 @@ func DeleteAllSpammers(api SpamoorClient) error {
263267
return nil
264268
}
265269

266-
func waitForSpamoorDone(ctx context.Context, api SpamoorClient, targetCount int, baselineSent, baselineFailed float64) (sent, failed float64, err error) {
267-
return waitForSpamoorDoneWithInterval(ctx, api, targetCount, baselineSent, baselineFailed, 2*time.Second)
270+
func waitForSpamoorDone(ctx context.Context, api SpamoorClient, targetCount int, baselineSent, baselineFailed float64, namePrefix string) (sent, failed float64, err error) {
271+
return waitForSpamoorDoneWithInterval(ctx, api, targetCount, baselineSent, baselineFailed, namePrefix, 2*time.Second)
268272
}
269273

270-
func waitForSpamoorDoneWithInterval(ctx context.Context, api SpamoorClient, targetCount int, baselineSent, baselineFailed float64, pollInterval time.Duration) (sent, failed float64, err error) {
274+
func waitForSpamoorDoneWithInterval(ctx context.Context, api SpamoorClient, targetCount int, baselineSent, baselineFailed float64, namePrefix string, pollInterval time.Duration) (sent, failed float64, err error) {
271275
ticker := time.NewTicker(pollInterval)
272276
defer ticker.Stop()
273277

@@ -279,7 +283,7 @@ func waitForSpamoorDoneWithInterval(ctx context.Context, api SpamoorClient, targ
279283
case <-ctx.Done():
280284
return sent, failed, fmt.Errorf("timed out waiting for %d txs (sent %.0f): %w", targetCount, sent, ctx.Err())
281285
case <-ticker.C:
282-
currentSent, currentFailed, mErr := getSpamoorCounters(api)
286+
currentSent, currentFailed, mErr := getSpamoorCountersForPrefix(api, namePrefix)
283287
if mErr != nil {
284288
log.Printf("warning: failed to get metrics: %v", mErr)
285289
continue
@@ -308,13 +312,13 @@ func waitForSpamoorDoneWithInterval(ctx context.Context, api SpamoorClient, targ
308312
}
309313
}
310314

311-
func getSpamoorCounters(api SpamoorClient) (sent, failed float64, err error) {
315+
func getSpamoorCountersForPrefix(api SpamoorClient, namePrefix string) (sent, failed float64, err error) {
312316
metrics, err := api.GetMetrics()
313317
if err != nil {
314318
return 0, 0, err
315319
}
316-
return sumCounter(metrics["spamoor_transactions_sent_total"]),
317-
sumCounter(metrics["spamoor_transactions_failed_total"]),
320+
return sumCounterWithPrefix(metrics["spamoor_transactions_sent_total"], namePrefix),
321+
sumCounterWithPrefix(metrics["spamoor_transactions_failed_total"], namePrefix),
318322
nil
319323
}
320324

@@ -371,15 +375,32 @@ func getSpamoorHeight(api SpamoorClient) (uint64, error) {
371375
return clients[0].Height, nil
372376
}
373377

374-
func sumCounter(f *dto.MetricFamily) float64 {
378+
func sumCounterWithPrefix(f *dto.MetricFamily, namePrefix string) float64 {
375379
if f == nil || f.GetType() != dto.MetricType_COUNTER {
376380
return 0
377381
}
378382
var sum float64
379383
for _, m := range f.GetMetric() {
380-
if m.GetCounter() != nil && m.GetCounter().Value != nil {
381-
sum += m.GetCounter().GetValue()
384+
if m.GetCounter() == nil || m.GetCounter().Value == nil {
385+
continue
386+
}
387+
if namePrefix != "" && !hasLabelPrefix(m, "spammer_name", namePrefix) {
388+
continue
382389
}
390+
sum += m.GetCounter().GetValue()
383391
}
384392
return sum
385393
}
394+
395+
func spammerPrefix(testName string) string {
396+
return fmt.Sprintf("bench-%s-", testName)
397+
}
398+
399+
func hasLabelPrefix(m *dto.Metric, name, prefix string) bool {
400+
for _, lp := range m.GetLabel() {
401+
if lp.GetName() == name {
402+
return strings.HasPrefix(lp.GetValue(), prefix)
403+
}
404+
}
405+
return false
406+
}

apps/loadgen/internal/runner_test.go

Lines changed: 70 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ func TestRunEntryUsesBaselineCounters(t *testing.T) {
2323
metricsSeq: []metricSnapshot{
2424
{sent: 100, failed: 7},
2525
},
26+
spammerNames: []string{"bench-baseline-0", "bench-baseline-1"},
2627
}
2728

2829
var gotTarget int
@@ -35,7 +36,7 @@ func TestRunEntryUsesBaselineCounters(t *testing.T) {
3536
Env: map[string]string{"BENCH_COUNT_PER_SPAMMER": "5"},
3637
NumSpammers: 2,
3738
CountPerSpammer: 5,
38-
}, func(ctx context.Context, api SpamoorClient, targetCount int, baselineSent, baselineFailed float64) (float64, float64, error) {
39+
}, func(ctx context.Context, api SpamoorClient, targetCount int, baselineSent, baselineFailed float64, namePrefix string) (float64, float64, error) {
3940
gotTarget = targetCount
4041
gotBaselineSent = baselineSent
4142
gotBaselineFailed = baselineFailed
@@ -58,6 +59,7 @@ func TestRunEntryFailsWhenSpammerDoesNotStart(t *testing.T) {
5859
metricsSeq: []metricSnapshot{
5960
{sent: 0, failed: 0},
6061
},
62+
spammerNames: []string{"bench-fail-0"},
6163
}
6264

6365
err := runEntryWithWait(context.Background(), client, Entry{
@@ -66,7 +68,7 @@ func TestRunEntryFailsWhenSpammerDoesNotStart(t *testing.T) {
6668
Env: map[string]string{"BENCH_COUNT_PER_SPAMMER": "1"},
6769
NumSpammers: 1,
6870
CountPerSpammer: 1,
69-
}, func(ctx context.Context, api SpamoorClient, targetCount int, baselineSent, baselineFailed float64) (float64, float64, error) {
71+
}, func(ctx context.Context, api SpamoorClient, targetCount int, baselineSent, baselineFailed float64, namePrefix string) (float64, float64, error) {
7072
t.Fatal("wait function should not be called when spammer startup fails")
7173
return 0, 0, nil
7274
})
@@ -80,9 +82,10 @@ func TestWaitForSpamoorDoneUsesDeltas(t *testing.T) {
8082
{sent: 105, failed: 3},
8183
{sent: 108, failed: 4},
8284
},
85+
spammerNames: []string{"bench-test-0"},
8386
}
8487

85-
sent, failed, err := waitForSpamoorDoneWithInterval(context.Background(), client, 8, 100, 2, time.Millisecond)
88+
sent, failed, err := waitForSpamoorDoneWithInterval(context.Background(), client, 8, 100, 2, "", time.Millisecond)
8689
require.NoError(t, err)
8790
require.Equal(t, 8.0, sent)
8891
require.Equal(t, 2.0, failed)
@@ -93,12 +96,13 @@ func TestWaitForSpamoorDoneHonorsContext(t *testing.T) {
9396
metricsSeq: []metricSnapshot{
9497
{sent: 100, failed: 0},
9598
},
99+
spammerNames: []string{"bench-test-0"},
96100
}
97101

98102
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond)
99103
defer cancel()
100104

101-
_, _, err := waitForSpamoorDoneWithInterval(ctx, client, 2, 100, 0, 10*time.Millisecond)
105+
_, _, err := waitForSpamoorDoneWithInterval(ctx, client, 2, 100, 0, "", 10*time.Millisecond)
102106
require.ErrorContains(t, err, "timed out waiting for 2 txs")
103107
}
104108

@@ -126,6 +130,41 @@ func TestWaitForSyncHonorsContext(t *testing.T) {
126130
require.ErrorContains(t, err, "cancelled waiting for sync")
127131
}
128132

133+
func TestSumCounterWithPrefixFiltersCorrectly(t *testing.T) {
134+
family := labeledCounterFamily("spamoor_transactions_sent_total", map[string]float64{
135+
"bench-EOATransferBurst-0": 1000,
136+
"bench-EOATransferBurst-1": 1000,
137+
"bench-EOATransfer-0": 500,
138+
"bench-EOATransfer-1": 500,
139+
})
140+
141+
t.Run("no prefix sums all", func(t *testing.T) {
142+
total := sumCounterWithPrefix(family, "")
143+
require.Equal(t, 3000.0, total)
144+
})
145+
146+
t.Run("burst prefix sums only burst spammers", func(t *testing.T) {
147+
total := sumCounterWithPrefix(family, "bench-EOATransferBurst-")
148+
require.Equal(t, 2000.0, total)
149+
})
150+
151+
t.Run("baseline prefix does not match burst spammers", func(t *testing.T) {
152+
// "bench-EOATransfer-" does not match "bench-EOATransferBurst-"
153+
// because the dash after "Transfer" differs from "B" in "Burst".
154+
total := sumCounterWithPrefix(family, "bench-EOATransfer-")
155+
require.Equal(t, 1000.0, total)
156+
})
157+
158+
t.Run("nil family returns zero", func(t *testing.T) {
159+
require.Equal(t, 0.0, sumCounterWithPrefix(nil, "anything"))
160+
})
161+
162+
t.Run("unmatched prefix returns zero", func(t *testing.T) {
163+
total := sumCounterWithPrefix(family, "bench-Nonexistent-")
164+
require.Equal(t, 0.0, total)
165+
})
166+
}
167+
129168
type metricSnapshot struct {
130169
sent float64
131170
failed float64
@@ -141,6 +180,7 @@ type fakeSpamoorClient struct {
141180
clientsSeq [][]spamoor.Client
142181
clientsIndex int
143182
getClientsErr error
183+
spammerNames []string
144184
}
145185

146186
type createCall struct {
@@ -189,9 +229,17 @@ func (f *fakeSpamoorClient) GetMetrics() (map[string]*dto.MetricFamily, error) {
189229
}
190230
}
191231

232+
perSpammer := snapshot.sent / float64(len(f.spammerNames))
233+
perFailed := snapshot.failed / float64(len(f.spammerNames))
234+
sentMap := make(map[string]float64, len(f.spammerNames))
235+
failedMap := make(map[string]float64, len(f.spammerNames))
236+
for _, name := range f.spammerNames {
237+
sentMap[name] = perSpammer
238+
failedMap[name] = perFailed
239+
}
192240
return map[string]*dto.MetricFamily{
193-
"spamoor_transactions_sent_total": counterFamily("spamoor_transactions_sent_total", snapshot.sent),
194-
"spamoor_transactions_failed_total": counterFamily("spamoor_transactions_failed_total", snapshot.failed),
241+
"spamoor_transactions_sent_total": labeledCounterFamily("spamoor_transactions_sent_total", sentMap),
242+
"spamoor_transactions_failed_total": labeledCounterFamily("spamoor_transactions_failed_total", failedMap),
195243
}, nil
196244
}
197245

@@ -210,17 +258,23 @@ func (f *fakeSpamoorClient) GetClients() ([]spamoor.Client, error) {
210258
return clients, nil
211259
}
212260

213-
func counterFamily(name string, value float64) *dto.MetricFamily {
261+
func labeledCounterFamily(name string, spammerValues map[string]float64) *dto.MetricFamily {
214262
counterType := dto.MetricType_COUNTER
215-
return &dto.MetricFamily{
216-
Name: &name,
217-
Type: &counterType,
218-
Metric: []*dto.Metric{
219-
{
220-
Counter: &dto.Counter{
221-
Value: &value,
222-
},
263+
labelName := "spammer_name"
264+
var metrics []*dto.Metric
265+
for spammerName, value := range spammerValues {
266+
n := spammerName
267+
v := value
268+
metrics = append(metrics, &dto.Metric{
269+
Label: []*dto.LabelPair{
270+
{Name: &labelName, Value: &n},
223271
},
224-
},
272+
Counter: &dto.Counter{Value: &v},
273+
})
274+
}
275+
return &dto.MetricFamily{
276+
Name: &name,
277+
Type: &counterType,
278+
Metric: metrics,
225279
}
226280
}

0 commit comments

Comments
 (0)