Commit d8dfd64

tianmu-li and claude committed

fix: clean up multi-turn dead code and address PR mlcommons#285 reviewer comments

- Remove dead constant BLOCK_ON_PREVIOUS_TURN = -1 from scheduler.py
- Remove redundant outer with state.condition: in mark_turn_complete
- Remove ConversationMode import and explicit mode= args from integration tests
- Fix format: jsonl → format: ".jsonl" in example YAMLs and docs
- Add target_concurrency: 1 clarification to quickstart (preserves turn ordering)
- Remove broken HYBRID_SCHEDULER_GUIDE.md reference from quickstart

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

1 parent 26d07b3 commit d8dfd64

10 files changed: 56 additions, 198 deletions


docs/MULTI_TURN_QUICKSTART.md

Lines changed: 6 additions & 21 deletions
@@ -40,7 +40,7 @@ datasets:
   - name: my_conversations
     type: performance
     path: path/to/your/conversations.jsonl
-    format: jsonl
+    format: ".jsonl"
     multi_turn:              # ← Presence of this block enables multi-turn mode
       mode: independent      # ← Per-conv pipelines; no cross-conv turn barrier
       turn_timeout_s: 300    # ← Max wait for prev turn
@@ -125,6 +125,8 @@ mode: independent
 
 **Use for**: Realistic production load where short conversations finish while long ones are still running.
 For single-conversation debugging, use `mode: independent` with `target_concurrency: 1`.
+Note: unlike the plain `ConcurrencyScheduler`, multi-turn + `target_concurrency: 1` still enforces
+per-conversation turn ordering — turn N+1 waits for turn N even at concurrency 1.
 
 **Example timeline**:
 
@@ -136,21 +138,6 @@ t=0.8: conv1-turn3 (after conv1-turn2 completes)
 ...
 ```
 
-### Poisson Mode (Planned)
-
-```yaml
-mode: poisson
-conversations_per_second: 10.0
-```
-
-**Behavior**:
-
-- Start conversations with Poisson arrival
-- Sequence turns within each
-- Realistic user arrival patterns
-
-**Status**: Not yet implemented (falls back to `independent`)
-
 ---
 
 ## 🎛️ Concurrency Control (NEW!)
@@ -172,8 +159,6 @@ settings:
 - Medium (50-500 convs): `target_concurrency: 32`
 - Large (500+ convs): `target_concurrency: 64`
 
-**See**: `HYBRID_SCHEDULER_GUIDE.md` for detailed guide
-
 ---
 
 ## 🔧 Common Configurations
@@ -245,12 +230,12 @@ multi_turn:
 
 **Problem**: MultiTurnDataset not recognized.
 
-**Fix**: Ensure `format: jsonl` is specified in config:
+**Fix**: Ensure `format: ".jsonl"` is specified in config:
 
 ```yaml
 datasets:
   - path: your_file.jsonl
-    format: jsonl      # ← Required for JSONL
+    format: ".jsonl"   # ← Required for JSONL
 ```
 
 ---
@@ -353,7 +338,7 @@ Before running your first multi-turn benchmark:
 - [ ] Config has `multi_turn:` block in the dataset section
 - [ ] Config has `load_pattern.type: multi_turn`
 - [ ] Endpoint is running and reachable
-- [ ] `format: jsonl` specified for JSONL datasets
+- [ ] `format: ".jsonl"` specified for JSONL datasets
 - [ ] Conversation IDs are unique per conversation
 - [ ] Turn numbers are sequential (1, 2, 3, ...)
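The quickstart note above (turn N+1 waits for turn N even at concurrency 1) can be sketched with a small condition-variable gate. This is an illustrative stand-in, not the project's API; `TurnGate` and its methods are made up for the sketch.

```python
import threading

# Illustrative sketch of the per-conversation ordering guarantee: even with a
# single in-flight slot, turn N+1 is only issued after turn N has completed.
class TurnGate:
    def __init__(self):
        self.completed = 0  # turns finished so far
        self.cond = threading.Condition()

    def wait_for_turn(self, turn, timeout=None):
        """Block until turn - 1 prior turns have completed; True if ready."""
        with self.cond:
            return self.cond.wait_for(lambda: self.completed >= turn - 1, timeout)

    def mark_complete(self):
        with self.cond:
            self.completed += 1
            self.cond.notify_all()

gate = TurnGate()
order = []
for turn in (1, 2, 3):        # concurrency 1: turns are issued one at a time
    assert gate.wait_for_turn(turn)
    order.append(turn)
    gate.mark_complete()      # the response arrives before the next turn
print(order)                  # [1, 2, 3]
```

At higher concurrency the same gate still serializes turns within one conversation; only turns of different conversations overlap.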

examples/09_MultiTurn/README.md

Lines changed: 1 addition & 13 deletions
@@ -139,7 +139,7 @@ datasets:
   - name: customer_support
     type: performance
     path: examples/multi_turn/customer_support_conversations.jsonl
-    format: jsonl
+    format: ".jsonl"
     multi_turn:
       mode: independent
       turn_timeout_s: 300.0
@@ -204,18 +204,6 @@ still running. Turn 1 of one conversation and turn 100 of another can be in-flight.
 
 For single-conversation debugging, use `mode: independent` with `target_concurrency: 1`.
 
-#### Poisson Mode
-
-Starts conversations with Poisson arrival, sequences turns within each conversation.
-
-```yaml
-multi_turn:
-  mode: poisson
-  conversations_per_second: 10.0
-```
-
-**Use case**: Realistic arrival patterns (not yet implemented; falls back to `independent`)
-
 ### Turn Timeout
 
 Configure maximum wait time for previous turn completion:
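The turn-timeout handling visible in this commit's `wait_for_turn_ready` follows a standard pattern: compute an absolute deadline once, then re-derive the remaining wait on every wakeup. A hedged sketch (helper name is made up; the real code lives in `ConversationManager`):

```python
import threading
import time

# Sketch of the deadline pattern: spurious wakeups cannot stretch the total
# wait past the configured timeout, and the predicate is re-checked one last
# time when the deadline expires (mirroring the diff's final is_ready check).
def wait_with_deadline(cond, predicate, timeout):
    deadline = None if timeout is None else time.monotonic() + timeout
    with cond:
        while not predicate():
            if deadline is not None:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    return predicate()  # final re-check on timeout
                cond.wait(timeout=remaining)
            else:
                cond.wait()
        return True

cond = threading.Condition()
done = []
assert wait_with_deadline(cond, lambda: bool(done), timeout=0.05) is False
done.append(1)
assert wait_with_deadline(cond, lambda: bool(done), timeout=0.05) is True
```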

examples/09_MultiTurn/multi_turn_benchmark.yaml

Lines changed: 1 addition & 1 deletion
@@ -11,7 +11,7 @@ datasets:
   - name: customer_support_conversations
     type: performance
     path: examples/09_MultiTurn/customer_support_conversations.jsonl
-    format: jsonl
+    format: ".jsonl"
     samples: 10
     multi_turn:
       mode: independent

examples/09_MultiTurn/multi_turn_with_concurrency.yaml

Lines changed: 1 addition & 1 deletion
@@ -11,7 +11,7 @@ datasets:
   - name: customer_support_conversations
     type: performance
     path: examples/09_MultiTurn/customer_support_conversations.jsonl
-    format: jsonl
+    format: ".jsonl"
     samples: 10
     multi_turn:
       mode: independent    # All conv turn-1 start together
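The quickstart checklist requires unique conversation IDs and sequential turn numbers (1, 2, 3, ...). A small validator can check a conversations JSONL before running; this is a sketch assuming each record carries the `conversation_id` and `turn` fields that the scheduler reads in this commit (`sample_meta["conversation_id"]`, `sample_meta["turn"]`).

```python
import json
from collections import defaultdict

def check_conversations(jsonl_text):
    """Group records by conversation_id and verify turns run 1, 2, 3, ..."""
    turns = defaultdict(list)
    for line in jsonl_text.splitlines():
        if line.strip():
            rec = json.loads(line)
            turns[rec["conversation_id"]].append(rec["turn"])
    # True per conversation iff its turn numbers are exactly 1..N in order
    return {cid: ts == list(range(1, len(ts) + 1)) for cid, ts in turns.items()}

sample = "\n".join([
    '{"conversation_id": "c1", "turn": 1}',
    '{"conversation_id": "c1", "turn": 2}',
    '{"conversation_id": "c2", "turn": 1}',
])
print(check_conversations(sample))   # {'c1': True, 'c2': True}
```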

src/inference_endpoint/config/schema.py

Lines changed: 1 addition & 4 deletions
@@ -68,7 +68,6 @@ class LoadPatternType(str, Enum):
 class ConversationMode(str, Enum):
     """Multi-turn conversation scheduling modes."""
 
-    POISSON = "poisson"  # Poisson conv arrival, sequence turns within
     INDEPENDENT = "independent"  # Per-conv pipelines; no cross-conv turn barrier
 
 
@@ -246,16 +245,14 @@ class MultiTurnConfig(BaseModel):
     Presence of this block in the dataset config enables multi-turn mode.
 
     Attributes:
-        mode: Conversation scheduling strategy (independent or poisson).
+        mode: Conversation scheduling strategy (currently only independent).
         turn_timeout_s: Maximum seconds to wait for previous turn completion.
-        conversations_per_second: Target CPS for POISSON mode (None = use dataset order).
     """
 
     model_config = {"extra": "forbid"}
 
     mode: ConversationMode = ConversationMode.INDEPENDENT
     turn_timeout_s: float = 300.0
-    conversations_per_second: float | None = None  # For POISSON mode
     use_dataset_history: bool = True
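Because the real `MultiTurnConfig` is a pydantic model with `extra="forbid"`, configs that still set the removed `conversations_per_second` field will now fail validation. A simplified stand-in (plain dataclass instead of pydantic, made up for illustration) shows the same effect, since dataclasses also reject unknown keyword arguments:

```python
from dataclasses import dataclass
from enum import Enum

class ConversationMode(str, Enum):
    INDEPENDENT = "independent"   # only remaining mode after this commit

@dataclass
class MultiTurnConfig:
    # Simplified stand-in for the pydantic model; extra="forbid" there behaves
    # like the keyword rejection sketched here.
    mode: ConversationMode = ConversationMode.INDEPENDENT
    turn_timeout_s: float = 300.0
    use_dataset_history: bool = True

cfg = MultiTurnConfig()
assert cfg.mode == ConversationMode.INDEPENDENT

# The removed field is now an unknown key and is rejected:
try:
    MultiTurnConfig(conversations_per_second=10.0)
except TypeError as e:
    print("rejected:", e)
```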

src/inference_endpoint/load_generator/conversation_manager.py

Lines changed: 17 additions & 42 deletions
@@ -39,7 +39,6 @@ class ConversationState:
         conversation_id: Unique identifier for this conversation.
         current_turn: Last completed turn number (0 = not started).
         pending_client_turn: Turn number of in-flight client turn (None if idle).
-        turn_complete_event: Threading event to signal turn completion.
         expected_client_turns: Expected number of client turns (for completion tracking).
         issued_client_turns: Count of client turns issued.
         completed_client_turns: Count of client turns with responses.
@@ -54,7 +53,6 @@ class ConversationState:
     conversation_id: str
     current_turn: int = 0
     pending_client_turn: int | None = None
-    turn_complete_event: threading.Event = field(default_factory=threading.Event)
     expected_client_turns: int | None = None
     issued_client_turns: int = 0
     completed_client_turns: int = 0
@@ -106,8 +104,6 @@ def add_assistant_turn(self, content: str | None = None):
         self.completed_client_turns += 1
         self.condition.notify_all()
 
-        self.turn_complete_event.set()
-
         if self.is_complete():
             if self.failed_client_turns > 0:
                 logger.info(
@@ -145,8 +141,6 @@ def mark_turn_failed(self):
         )
         self.condition.notify_all()
 
-        self.turn_complete_event.set()
-
         if self.is_complete():
             logger.info(
                 f"Conversation {self.conversation_id} completed with failures: "
@@ -165,14 +159,11 @@ def is_complete(self) -> bool:
             return False  # Unknown completion, can't determine
         return self.completed_client_turns >= self.expected_client_turns
 
-    def is_ready_for_turn(self, turn: int) -> bool:
-        """Check if ready to issue this turn (previous turn must be complete).
-
-        Args:
-            turn: Turn number to check (unused; sequencing is based on completion counts).
+    def is_ready_for_turn(self) -> bool:
+        """Check if the previous turn has completed and the next may be issued.
 
         Returns:
-            True if ready to issue this turn, False otherwise.
+            True if ready to issue the next turn, False otherwise.
         """
         return (
             self.pending_client_turn is None
@@ -195,17 +186,14 @@ class ConversationManager:
     Each ConversationState carries its own Condition so that state changes
     (turn issued / turn complete) only wake the single pipeline thread waiting
     on that conversation, not all pipeline threads across all conversations.
-    A separate _created_condition handles the narrow case where a pipeline
-    thread calls wait_for_turn_issued before get_or_create has run.
+    All conversation states are pre-created by the scheduler before pipeline
+    threads start, so wait_for_turn_issued never races against get_or_create.
     """
 
     def __init__(self):
         """Initialize conversation manager with empty state."""
         self._conversations: dict[str, ConversationState] = {}
         self._lock = threading.Lock()
-        # Fired whenever a new conversation state is registered.
-        # Only needed by wait_for_turn_issued when state is None on entry.
-        self._created_condition = threading.Condition(self._lock)
 
     def get_or_create(
         self,
@@ -233,15 +221,13 @@ def get_or_create(
                 conversation_id=conversation_id,
                 current_turn=0,
                 pending_client_turn=None,
-                turn_complete_event=threading.Event(),
                 expected_client_turns=expected_client_turns,
                 issued_client_turns=0,
                 completed_client_turns=0,
                 failed_client_turns=0,
                 message_history=initial_history,
             )
             self._conversations[conversation_id] = state
-            self._created_condition.notify_all()
         return self._conversations[conversation_id]
 
     def wait_for_turn_ready(
@@ -272,11 +258,11 @@ def wait_for_turn_ready(
 
         deadline = None if timeout is None else time.monotonic() + timeout
         with state.condition:
-            while not state.is_ready_for_turn(turn):
+            while not state.is_ready_for_turn():
                 if deadline is not None:
                     remaining_timeout = deadline - time.monotonic()
                     if remaining_timeout <= 0:
-                        return state.is_ready_for_turn(turn)
+                        return state.is_ready_for_turn()
                     remaining_timeout = max(MIN_TIMEOUT_SECONDS, remaining_timeout)
                 else:
                     remaining_timeout = None
@@ -298,34 +284,24 @@ def wait_for_turn_issued(
         so the pipeline would enqueue subsequent turns before the load generator has
         registered the current one as in-flight.
 
+        All conversation states are pre-created by the scheduler before pipeline
+        threads start, so this method can look up the state directly without waiting
+        for it to be registered.
+
         Args:
             conversation_id: Conversation to wait for.
            min_issued: Minimum number of issued turns to wait for.
            timeout: Maximum seconds to wait (None = infinite).
 
         Returns:
            True if condition met, False if timeout.
+
+        Raises:
+            KeyError: If conversation_id not found (programming error — state must be
+                pre-created by the scheduler before pipeline threads are spawned).
         """
+        state = self._conversations[conversation_id]
         deadline = None if timeout is None else time.monotonic() + timeout
-
-        # Phase 1: wait until the conversation state exists (guarded by
-        # _created_condition so only the threads waiting on *this* conversation
-        # being created are woken — not all pipeline threads).
-        with self._created_condition:
-            state = self._conversations.get(conversation_id)
-            while state is None:
-                if deadline is not None:
-                    remaining = deadline - time.monotonic()
-                    if remaining <= 0:
-                        return False
-                    self._created_condition.wait(
-                        timeout=max(MIN_TIMEOUT_SECONDS, remaining)
-                    )
-                else:
-                    self._created_condition.wait()
-                state = self._conversations.get(conversation_id)
-
-        # Phase 2: wait for the issued counter using the per-conversation Condition.
         with state.condition:
             while state.issued_client_turns < min_issued:
                 if deadline is not None:
@@ -384,8 +360,7 @@ def mark_turn_complete(
         state = self._conversations.get(conversation_id)
         if state is None:
             raise KeyError(f"Conversation {conversation_id} not initialized")
-        with state.condition:
-            state.add_assistant_turn(response if store_in_history else None)
+        state.add_assistant_turn(response if store_in_history else None)
 
     def mark_turn_failed(self, conversation_id: str):
         """Mark that assistant response failed (error/timeout.

src/inference_endpoint/load_generator/scheduler.py

Lines changed: 12 additions & 32 deletions
@@ -23,7 +23,7 @@
 from typing import Any
 
 from ..config.runtime_settings import RuntimeSettings
-from ..config.schema import ConversationMode, LoadPatternType, MultiTurnConfig
+from ..config.schema import LoadPatternType, MultiTurnConfig
 from .conversation_manager import ConversationManager
 from .sample import SampleEvent, SampleEventHandler
 
@@ -438,9 +438,6 @@ def __iter__(self):
         yield s_idx, 0
 
 
-# Sentinel value to signal "block until previous turn completes"
-BLOCK_ON_PREVIOUS_TURN = -1
-
 # Sentinel object pushed by each independent-mode pipeline thread when it exhausts its turns
 _PIPELINE_DONE = object()
 
@@ -449,9 +446,8 @@ class MultiTurnScheduler(Scheduler, load_pattern=LoadPatternType.MULTI_TURN):
     """Scheduler for multi-turn conversations with turn sequencing and optional concurrency control.
 
     Enforces turn ordering within conversations: turn N+1 cannot be issued
-    until turn N completes. Supports multiple conversation scheduling modes:
-    - INDEPENDENT: Per-conv pipelines; no cross-conversation turn barrier (default)
-    - POISSON: Start conversations with Poisson arrival, sequence turns within
+    until turn N completes. Each conversation runs an independent pipeline thread
+    that gates on wait_for_turn_ready before enqueuing the next turn.
 
     Optionally limits total in-flight requests across all conversations via
     target_concurrency parameter, combining turn sequencing with concurrency control.
@@ -526,19 +522,9 @@ def __iter__(self):
         """Iterate with turn sequencing enforcement and optional concurrency control.
 
         Yields (sample_index, delay_ns) pairs. Turn blocking is handled inside
-        pipeline threads (INDEPENDENT mode). The concurrency gate is applied here
-        if target_concurrency is set.
+        pipeline threads. The concurrency gate is applied here if target_concurrency is set.
         """
-        mode = ConversationMode.INDEPENDENT
-        if self.multi_turn_config is not None:
-            mode = self.multi_turn_config.mode
-
-        if mode == ConversationMode.POISSON:
-            schedule = self._poisson_schedule()
-        else:
-            schedule = self._independent_schedule()
-
-        for s_idx, delay_ns in schedule:
+        for s_idx, delay_ns in self._independent_schedule():
             # Block on concurrency limit if enabled
             if self._condition is not None:
                 with self._condition:
@@ -548,19 +534,6 @@ def __iter__(self):
 
             yield s_idx, delay_ns
 
-    def _poisson_schedule(self):
-        """Start conversations with Poisson arrival, sequence turns within.
-
-        TODO: Implement Poisson conversation arrival.
-        For now, fall back to parallel mode.
-        """
-        # TODO: Implement Poisson conversation arrival
-        # For now, fall back to independent
-        logger.warning(
-            "Poisson conversation mode not yet implemented, using INDEPENDENT mode"
-        )
-        return self._independent_schedule()
-
     def _independent_schedule(self):
         """Issue turns per-conversation independently — no cross-conversation turn barriers.
 
@@ -590,6 +563,13 @@ def _independent_schedule(self):
             conv_id = sample_meta["conversation_id"]
             conv_samples[conv_id].append((sample_index, sample_meta["turn"]))
 
+        # Pre-create all conversation states before spawning pipeline threads so that
+        # wait_for_turn_issued can look them up directly without a creation race.
+        for conv_id, turns in conv_samples.items():
+            self.conversation_manager.get_or_create(
+                conv_id, expected_client_turns=len(turns)
+            )
+
         num_pipelines = len(conv_samples)
         if num_pipelines > _PIPELINE_THREAD_WARNING_THRESHOLD:
             logger.warning(
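The scheduler-side half of the fix can be sketched end to end: group samples by conversation, register every conversation (with its expected turn count) before any pipeline thread is spawned, then let threads look states up directly. Names here are invented for the sketch; only the grouping keys (`conversation_id`, `turn`) come from the diff above.

```python
import threading
from collections import defaultdict

# (conversation_id, turn) pairs as the scheduler would read from sample_meta
samples = [("c1", 1), ("c1", 2), ("c2", 1)]

conv_samples = defaultdict(list)
for conv_id, turn in samples:
    conv_samples[conv_id].append(turn)

# Pre-create every conversation's state BEFORE spawning pipeline threads,
# so no thread can observe a missing entry (the race the old code handled
# with a separate creation condition).
registry = {}
for conv_id, turns in conv_samples.items():
    registry[conv_id] = {"expected_client_turns": len(turns), "issued": 0}

def pipeline(conv_id):
    state = registry[conv_id]          # guaranteed to exist, no waiting needed
    state["issued"] = state["expected_client_turns"]

threads = [threading.Thread(target=pipeline, args=(c,)) for c in conv_samples]
for t in threads:
    t.start()
for t in threads:
    t.join()
print({c: registry[c]["issued"] for c in sorted(registry)})  # {'c1': 2, 'c2': 1}
```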

0 commit comments