Skip to content

Commit ddae7b7

Browse files
tlrmchlsmthclaude
andauthored
Consolidate --workers flag with auto mode, resilient barrier DNS, and docs (#53)
* Consolidate --num-workers and --sync into --workers with automatic load division Replace two overlapping flags (--sync JSON and --num-workers) with a single --workers N flag. When N > 1, barrier sync is enabled automatically and concurrency/rate are divided across workers so config files always express total desired load. Breaking changes: - --sync flag removed (barrier sync now implicit when --workers > 1) - --num-workers renamed to --workers - concurrency/rate in configs now mean total, not per-worker Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Signed-off-by: Tyler Michael Smith <tlrmchlsmth@gmail.com> * Retry DNS NXDOMAIN in barrier instead of failing fast In Kubernetes, headless service DNS records take a few seconds to propagate after pod creation. NXDOMAIN is expected early on and should be retried with backoff (like connection errors), not treated as a permanent failure after 3 attempts. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Signed-off-by: Tyler Michael Smith <tlrmchlsmth@gmail.com> * Document container image registry and PR image tags in README Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Signed-off-by: Tyler Michael Smith <tlrmchlsmth@gmail.com> * Document --workers flag with auto load division and barrier sync Update README and CLAUDE.md to replace old --sync '{"workers":N}' references with the new --workers flag, and document automatic load division behavior. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Signed-off-by: Tyler Michael Smith <tlrmchlsmth@gmail.com> * Add --workers auto to compute worker count from max concurrency --workers auto sets workers = ceil(max_concurrency / 1024) so each worker handles at most 1024 concurrent streams. Config is parsed before the kube branch so auto-resolution works for both local and Kubernetes deploys. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Signed-off-by: Tyler Michael Smith <tlrmchlsmth@gmail.com> * Address review: clarify auto workers sizing and fix builtin shadow - Rename `max` variable in MaxConcurrency to avoid shadowing Go builtin - Add rationale for the 1024 streams-per-worker threshold - Document that --workers auto sizes for peak stage, which may leave workers idle during low-concurrency staircase stages Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Signed-off-by: Tyler Michael Smith <tlrmchlsmth@gmail.com> --------- Signed-off-by: Tyler Michael Smith <tlrmchlsmth@gmail.com> Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 07b40d2 commit ddae7b7

7 files changed

Lines changed: 152 additions & 40 deletions

File tree

CLAUDE.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ go test ./... -v
3636
- **Client-side recording** — JSONL per worker. One line per completed request with timestamps, TTFT, per-token ITL array, token counts, status. Merging across workers = cat + sort.
3737
- **Timestamps** — JSON file per worker with start_time, rampup_end_time, end_time. Used to query Prometheus for server-side metrics.
3838
- **Mock server** — configurable TTFT, ITL, and output token count. Serves streaming SSE responses on `/v1/chat/completions`.
39-
- **Barrier sync** — multi-pod synchronization via HTTP barrier. Pod-0 (leader) runs a barrier server; all pods negotiate a common start time before measured stages. Configured via `--sync '{"workers":N}'` CLI flag and `barrier()` in Starlark DSL.
39+
- **Barrier sync** — multi-pod synchronization via HTTP barrier. Pod-0 (leader) runs a barrier server; all pods negotiate a common start time before measured stages. Enabled automatically when `--workers N` (N > 1); concurrency/rate are divided across workers so configs express total load. Use `barrier()` in Starlark DSL for explicit sync points.
4040

4141
## Deployment
4242

@@ -48,13 +48,13 @@ just deploy my-bench http://vllm:8000/v1 config.json N_WORKERS=4
4848

4949
### Multi-pod synchronization
5050

51-
Use `--sync` to synchronize benchmark start across pods:
51+
Use `--workers N` to enable barrier sync and automatic load division across pods:
5252

5353
```bash
54-
nyann-bench generate --config scenario.star --sync '{"workers":4,"timeout":"10m"}'
54+
nyann-bench generate --config scenario.star --workers 4 --worker-id 0
5555
```
5656

57-
An implicit `barrier()` is inserted before the first measured stage. In Starlark, use explicit `barrier()` for additional sync points:
57+
Concurrency and rate values in configs express **total** desired load — each worker gets its share automatically. An implicit `barrier()` is inserted before the first measured stage. In Starlark, use explicit `barrier()` for additional sync points:
5858

5959
```python
6060
scenario(

README.md

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,22 +72,33 @@ scenario(
7272

7373
Each goroutine stream can run multi-turn conversations, carrying real model responses forward into subsequent turns. This exercises server-side KV cache reuse (prefix caching) and produces realistic conversation-shaped traffic.
7474

75-
### Synchronized multi-pod start
75+
### Synchronized multi-pod start with automatic load division
7676

77-
When running across multiple pods, `--sync '{"workers":N}'` enables barrier synchronization. All pods negotiate a common start time via an HTTP barrier protocol — pod-0 (leader) runs the barrier server, workers discover it via `BARRIER_ADDR` (set automatically in the Job manifest to the leader pod's DNS name). Barriers are first-class in the Starlark DSL:
77+
When running across multiple pods, `--workers N` (where N > 1) enables barrier synchronization and automatically divides load across workers. Concurrency and rate values in config files always express the **total** desired load — each worker gets its fair share via integer division, with remainder distributed to lower-indexed workers (e.g. `concurrency=10, workers=3` → 4, 3, 3).
78+
79+
```bash
80+
# Run with 4 workers — each gets 1/4 of the configured concurrency and rate
81+
nyann-bench generate --target http://vllm:8000/v1 --config scenario.star --workers 4 --worker-id 0
82+
```
83+
84+
All pods negotiate a common start time via an HTTP barrier protocol — pod-0 (leader) runs the barrier server, workers discover it via `BARRIER_ADDR` (set automatically in the Job manifest to the leader pod's DNS name). An implicit barrier is inserted before the first measured stage. Barriers are first-class in the Starlark DSL:
7885

7986
```python
8087
scenario(
8188
stages=[
8289
stage("2m", concurrency=16, warmup=True),
83-
barrier(), # implicit one added automatically
90+
# implicit barrier fires here — all workers sync before measured stages
8491
stage("5m", concurrency=64),
85-
barrier(drain=True), # drain pool before workload switch
92+
barrier(drain=True), # explicit: drain pool before workload switch
8693
stage("5m", concurrency=64, workload=other),
8794
],
8895
)
8996
```
9097

98+
With `--workers auto`, the worker count is `ceil(max_concurrency / 1024)`, sized for the **peak** stage. In staircase configs where concurrency ramps up across stages (e.g. `[4, 64, 512, 2048]`), early low-concurrency stages will have some workers with very few or zero streams. Use an explicit `--workers N` if you need tighter control.
99+
100+
With `--workers 1` (the default), no barrier sync or load division occurs.
101+
91102
### Ramp-up and warmup
92103

93104
A configurable warmup phase brings the server to steady state before measurement begins, and ramp-up staggers stream starts to avoid synchronized request patterns that would otherwise create artificial load spikes.
@@ -154,7 +165,7 @@ Merging across workers: `cat requests_*.jsonl`.
154165
just deploy my-benchmark http://vllm-server:8000/v1 config.star 8
155166
```
156167

157-
This creates a ConfigMap with your config and launches an Indexed Job with 8 pods. Each pod auto-detects its worker ID from `JOB_COMPLETION_INDEX` and the barrier server address from `BARRIER_ADDR`. Sync is enabled automatically via `--sync '{"workers":N}'` in the manifest.
168+
This creates a ConfigMap with your config and launches an Indexed Job with 8 pods. Each pod auto-detects its worker ID from `JOB_COMPLETION_INDEX` and the barrier server address from `BARRIER_ADDR`. The manifest passes `--workers N` so barrier sync and load division are enabled automatically.
158169

159170
## Installation
160171

@@ -168,6 +179,21 @@ Or pull the container:
168179
docker pull ghcr.io/neuralmagic/nyann-bench:latest
169180
```
170181

182+
## Container images
183+
184+
CI pushes multi-platform (`linux/amd64`, `linux/arm64`) images to GitHub Container Registry on every push to `main` and every pull request:
185+
186+
| Event | Tag | Example |
187+
|-------|-----|---------|
188+
| Push to `main` | `latest`, `sha-<commit>` | `ghcr.io/neuralmagic/nyann-bench:latest` |
189+
| Pull request | `pr-<number>` | `ghcr.io/neuralmagic/nyann-bench:pr-47` |
190+
191+
To use a PR image for testing:
192+
193+
```bash
194+
docker pull ghcr.io/neuralmagic/nyann-bench:pr-47
195+
```
196+
171197
## Development
172198

173199
```bash

cmd/nyann-bench/generate.go

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ func generateCmd() *cobra.Command {
2525
cfgInput string
2626
outputDir string
2727
workerID int
28-
workers int
28+
workersFlag string
2929
metricsAddr string
3030
kubeFlags kube.Flags
3131
)
@@ -63,6 +63,20 @@ Workload types:
6363
corpus Sliding window over real text files
6464
gsm8k GSM8K math problems with streaming eval`,
6565
RunE: func(cmd *cobra.Command, args []string) error {
66+
// Parse config early — needed to resolve --workers auto
67+
sc, err := config.Parse(cfgInput)
68+
if err != nil {
69+
return fmt.Errorf("config: %w", err)
70+
}
71+
72+
workers, err := config.ResolveWorkers(workersFlag, sc.MaxConcurrency())
73+
if err != nil {
74+
return err
75+
}
76+
if workersFlag == "auto" {
77+
slog.Info("Auto-resolved workers", "workers", workers, "max_concurrency", sc.MaxConcurrency())
78+
}
79+
6680
if kubeFlags.IsEnabled(cmd) {
6781
cfg, err := kubeFlags.ToConfig()
6882
if err != nil {
@@ -91,12 +105,6 @@ Workload types:
91105
}
92106
}
93107

94-
// Parse config
95-
sc, err := config.Parse(cfgInput)
96-
if err != nil {
97-
return fmt.Errorf("config: %w", err)
98-
}
99-
100108
sc.Workers = workers
101109
sc.WorkerID = workerID
102110

@@ -163,7 +171,7 @@ Workload types:
163171
cmd.Flags().StringVar(&cfgInput, "config", "{}", "Workload config (JSON file, inline JSON, or .star file)")
164172
cmd.Flags().StringVar(&outputDir, "output-dir", "", "Directory for JSONL + timestamp files (omit for stdout-only)")
165173
cmd.Flags().IntVar(&workerID, "worker-id", 0, "Worker identifier (for multi-container runs)")
166-
cmd.Flags().IntVar(&workers, "workers", 1, "Total number of workers (enables barrier sync and divides load when > 1)")
174+
cmd.Flags().StringVar(&workersFlag, "workers", "1", `Number of workers: integer or "auto" (auto = ceil(max_concurrency/1024))`)
167175
cmd.Flags().StringVar(&metricsAddr, "metrics", "", "Prometheus metrics listen address (e.g. :9090)")
168176

169177
kube.RegisterFlags(cmd, &kubeFlags)

pkg/barrier/barrier_test.go

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -204,26 +204,21 @@ func TestContextCancel(t *testing.T) {
204204
}
205205
}
206206

207-
func TestDNSFailureFails(t *testing.T) {
208-
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
207+
func TestDNSFailureRetriesUntilTimeout(t *testing.T) {
208+
// DNS NXDOMAIN should be retried (Kubernetes headless service DNS
209+
// takes time to propagate), not treated as a permanent failure.
210+
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
209211
defer cancel()
210212

211-
// Use a hostname that definitely won't resolve
212213
addr := "this-host-does-not-exist-barrier-test.invalid:8080"
213214

214-
start := time.Now()
215-
_, err := WaitForStart(ctx, addr, 0, 0, 2, 30*time.Second)
216-
elapsed := time.Since(start)
215+
_, err := WaitForStart(ctx, addr, 0, 0, 2, 3*time.Second)
217216

218217
if err == nil {
219-
t.Fatal("expected error for unresolvable hostname, got nil")
218+
t.Fatal("expected timeout error for unresolvable hostname, got nil")
220219
}
221-
if !strings.Contains(err.Error(), "DNS lookup failed") {
222-
t.Errorf("expected DNS failure message, got: %v", err)
223-
}
224-
// Should fail within a few seconds, not wait for the full 30s timeout
225-
if elapsed > 15*time.Second {
226-
t.Errorf("DNS failure took %v — should fail fast, not wait for full timeout", elapsed)
220+
if !strings.Contains(err.Error(), "timed out") {
221+
t.Errorf("expected timeout error, got: %v", err)
227222
}
228223
}
229224

pkg/barrier/client.go

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -56,20 +56,17 @@ func WaitForStart(ctx context.Context, addr string, workerID, barrierID, nWorker
5656
return time.Time{}, fmt.Errorf("barrier: timed out waiting for barrier server at %s after %d attempts: %w", addr, attempt, ctx.Err())
5757
}
5858

59-
// DNS NXDOMAIN — hostname doesn't exist, retrying won't help
59+
// DNS NXDOMAIN — in Kubernetes, headless service DNS records
60+
// take a few seconds to propagate after pod creation, so
61+
// NXDOMAIN is expected early on. Keep retrying.
6062
var dnsErr *net.DNSError
6163
if errors.As(err, &dnsErr) && dnsErr.IsNotFound {
6264
dnsFailures++
63-
if dnsFailures >= 3 {
64-
host := addr
65-
if h, _, splitErr := net.SplitHostPort(addr); splitErr == nil {
66-
host = h
67-
}
68-
return time.Time{}, fmt.Errorf("barrier: DNS lookup failed for %q — "+
69-
"the hostname does not resolve. For Kubernetes Indexed Jobs, BARRIER_ADDR must "+
70-
"include the headless service name (e.g. <job>-0.<service>): %w", host, err)
65+
if dnsFailures <= 3 {
66+
slog.Warn("Barrier DNS lookup failed, retrying", "addr", addr, "attempt", dnsFailures, "error", err)
67+
} else {
68+
slog.Debug("Barrier DNS still not resolved", "addr", addr, "attempt", dnsFailures)
7169
}
72-
slog.Warn("Barrier DNS lookup failed, retrying", "addr", addr, "attempt", dnsFailures, "max_attempts", 3, "error", err)
7370
} else if attempt <= 3 {
7471
slog.Debug("Barrier server not ready, retrying", "error", err, "backoff", backoff)
7572
} else {

pkg/config/config_test.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,58 @@ func TestDivideRate(t *testing.T) {
221221
}
222222
}
223223

224+
func TestMaxConcurrency(t *testing.T) {
225+
sc := &config.ScenarioConfig{
226+
Stages: []config.ScenarioStage{
227+
{Concurrency: 16, Warmup: true},
228+
{Concurrency: 64},
229+
{Concurrency: 128},
230+
{Concurrency: 32},
231+
},
232+
}
233+
if got := sc.MaxConcurrency(); got != 128 {
234+
t.Errorf("MaxConcurrency() = %d, want 128", got)
235+
}
236+
237+
empty := &config.ScenarioConfig{}
238+
if got := empty.MaxConcurrency(); got != 0 {
239+
t.Errorf("MaxConcurrency() on empty = %d, want 0", got)
240+
}
241+
}
242+
243+
func TestResolveWorkers(t *testing.T) {
244+
tests := []struct {
245+
flag string
246+
maxConcurrency int
247+
want int
248+
wantErr bool
249+
}{
250+
{"1", 0, 1, false},
251+
{"4", 0, 4, false},
252+
{"auto", 0, 1, false},
253+
{"auto", 1, 1, false},
254+
{"auto", 1024, 1, false},
255+
{"auto", 1025, 2, false},
256+
{"auto", 2048, 2, false},
257+
{"auto", 2049, 3, false},
258+
{"auto", 4096, 4, false},
259+
{"auto", 10000, 10, false},
260+
{"0", 0, 0, true},
261+
{"-1", 0, 0, true},
262+
{"abc", 0, 0, true},
263+
}
264+
for _, tt := range tests {
265+
got, err := config.ResolveWorkers(tt.flag, tt.maxConcurrency)
266+
if (err != nil) != tt.wantErr {
267+
t.Errorf("ResolveWorkers(%q, %d) error = %v, wantErr %v", tt.flag, tt.maxConcurrency, err, tt.wantErr)
268+
continue
269+
}
270+
if got != tt.want {
271+
t.Errorf("ResolveWorkers(%q, %d) = %d, want %d", tt.flag, tt.maxConcurrency, got, tt.want)
272+
}
273+
}
274+
}
275+
224276
func TestInsertImplicitBarrier(t *testing.T) {
225277
sc := &config.ScenarioConfig{
226278
Stages: []config.ScenarioStage{

pkg/config/scenario.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package config
22

33
import (
4+
"fmt"
5+
"strconv"
46
"time"
57
)
68

@@ -105,6 +107,38 @@ func DivideRate(total float64, nWorkers int) float64 {
105107
return total / float64(nWorkers)
106108
}
107109

110+
// MaxConcurrency returns the highest concurrency value across all stages.
111+
func (sc *ScenarioConfig) MaxConcurrency() int {
112+
highest := 0
113+
for _, s := range sc.Stages {
114+
if s.Concurrency > highest {
115+
highest = s.Concurrency
116+
}
117+
}
118+
return highest
119+
}
120+
121+
// ResolveWorkers converts a --workers flag value to an integer.
122+
// "auto" computes ceil(maxConcurrency / 1024) so each worker handles at most
123+
// 1024 concurrent streams — beyond that, goroutine scheduling overhead and
124+
// per-connection memory become significant on a single pod.
125+
func ResolveWorkers(flag string, maxConcurrency int) (int, error) {
126+
if flag == "auto" {
127+
if maxConcurrency <= 0 {
128+
return 1, nil
129+
}
130+
return (maxConcurrency + 1023) / 1024, nil
131+
}
132+
n, err := strconv.Atoi(flag)
133+
if err != nil {
134+
return 0, fmt.Errorf("--workers must be a positive integer or \"auto\", got %q", flag)
135+
}
136+
if n < 1 {
137+
return 0, fmt.Errorf("--workers must be >= 1, got %d", n)
138+
}
139+
return n, nil
140+
}
141+
108142
// InsertImplicitBarrier adds a barrier before all stages so workers sync
109143
// before warmup begins. This is called when --workers > 1 to ensure a sync
110144
// point even without explicit barrier() calls.

0 commit comments

Comments
 (0)