Support continuous batching for EAGLE3 #3321
Pull request overview
This PR adds support for continuous batching with multiple prompts to the EAGLE3 speculative decoding pipeline. The implementation ensures that the draft model and main model process the same chunks by coordinating prefill completion across all requests.
Changes:
- Added synchronization logic to pause generation until all requests complete prefill in EAGLE3 mode
- Modified hidden state handling to support partial tensor copying for mismatched sequence lengths
- Extended test coverage to validate multiple prompt batching scenarios
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| tests/python_tests/test_continuous_batching.py | Updated tests to support parameterized multi-prompt batching scenarios for both speculative decoding and EAGLE3 |
| src/cpp/src/speculative_decoding/continuous_batching/pipeline_impl.cpp | Added prefill synchronization logic and modified draft model execution control flow |
| src/cpp/src/sequence_group.hpp | Added has_finished_prefill() method to check if any sequence has begun generation |
| src/cpp/src/continuous_batching/model_runner.hpp | Enhanced hidden state handling with partial tensor copying for mismatched sequence lengths |
Pull request overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated 5 comments.
Comments suppressed due to low confidence (1)
src/cpp/src/speculative_decoding/continuous_batching/pipeline_impl.cpp:1
Switching from `to_generate |= ...` to `to_generate &= ...` changes semantics from “any request can generate” to “all requests can generate”. With this change, `to_generate` must be initialized to `true` immediately before the loop; otherwise `&=` can preserve a stale value from previous iterations (or remain `false`), preventing the draft model from running when it should.
// Copyright (C) 2023-2026 Intel Corporation
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Pull request overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated no new comments.
Comments suppressed due to low confidence (4)
src/cpp/src/speculative_decoding/continuous_batching/pipeline_impl.cpp:1
`to_generate` was previously initialized for an OR-reduction; after switching to an AND-reduction (`&=`), it must be initialized to `true` before the loop. As written, `to_generate` may be uninitialized/stale (or remain `false` if previously set), which can incorrectly suppress draft-model execution or lead to inconsistent behavior. Initialize `to_generate = true` immediately before the loop (or refactor to a clearly-scoped reduction variable).
// Copyright (C) 2023-2026 Intel Corporation
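The initialization issue raised in this comment can be illustrated with a minimal sketch. The `Request` struct and the `all_can_generate` helper are hypothetical stand-ins, not code from the PR; the point is only that an AND-reduction needs an accumulator that starts at `true` and is scoped to the loop.

```cpp
#include <vector>

// Hypothetical stand-in for a request; it carries only the flag the reduction reads.
struct Request {
    bool can_generate;
};

// AND-reduction: returns true only if every request can generate.
// The accumulator is declared and set to true right before the loop,
// so no value from an earlier pipeline step can leak into the result.
bool all_can_generate(const std::vector<Request>& requests) {
    bool to_generate = true;
    for (const auto& request : requests) {
        to_generate &= request.can_generate;
    }
    return to_generate;
}
```

With this shape an empty request list yields `true`, so a caller that should not run the draft model without requests would need to check for that case separately.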
src/cpp/src/sequence_group.hpp:466
`has_finished_prefill()` returns true only after at least one token has been generated (`generated_len > 0`), which is typically after prefill has already completed. Since pipeline synchronization relies on this to detect prefill completion, this can fail to pause the earliest-finished request at the right time. Consider basing this on prompt-consumption state (e.g., the same internal condition used to allow generation after prompt processing, without depending on `generated_len`).
    bool has_finished_prefill() const {
        for (auto& sequence : get_running_sequences()) {
            if (sequence->get_generated_len() > 0) {
                return true;
            }
        }
        return false;
    }
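To make the difference between the two definitions of "prefill finished" concrete, here is a hedged sketch. `SequenceGroupView` and its fields are illustrative assumptions, not the actual `SequenceGroup` members; the sketch only contrasts a prompt-consumption check with the generated-length check quoted above.

```cpp
#include <cstddef>

// Hypothetical, simplified view of a sequence group. The field names are
// illustrative and do not correspond to the real SequenceGroup class.
struct SequenceGroupView {
    std::size_t prompt_len;        // number of tokens in the prompt
    std::size_t processed_tokens;  // prompt tokens already run through the model
    std::size_t generated_len;     // tokens generated so far
};

// Prompt-consumption check: true as soon as the whole prompt has been
// processed, even if no token has been sampled yet.
bool prefill_finished_by_prompt(const SequenceGroupView& group) {
    return group.processed_tokens >= group.prompt_len;
}

// Generation-based check, mirroring the reviewed code: true only after
// at least one token has been generated.
bool prefill_finished_by_generation(const SequenceGroupView& group) {
    return group.generated_len > 0;
}
```

For a group whose prompt is fully processed but which has not yet produced a token, the first check reports completion while the second does not, which is the window the comment is concerned about.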
tests/python_tests/test_continuous_batching.py:697
With the new `@pytest.mark.parametrize` cases, this helper will re-download/re-convert the same models multiple times per test function, which can significantly lengthen CI/runtime. Consider moving model download/convert into a cached fixture (e.g., `scope='session'`) or passing precomputed `main_model_path`/`draft_model_path` into this helper so parametrization doesn’t multiply conversion cost.
    def compare_results_for_dynamic_split_fuse_config(main_model_id, draft_model_id, prompts):
        main_model_path = download_and_convert_model(main_model_id).models_path
        draft_model_path = download_and_convert_model(draft_model_id).models_path
tests/python_tests/test_continuous_batching.py:728
This index-based loop can be simplified and made clearer by iterating over both lists directly (e.g., `for reference, generated in zip(...)`). That also avoids manual indexing and makes failures easier to read if you add `pytest` assertion messages later.
    for i in range(0, result_len_ref):
        reference = result_ref.texts[i]
        generated = result_gen.texts[i]
        assert generated == reference
    size_t stored_hidden_size = stored_shape[stored_shape.size() - 1];

    OPENVINO_ASSERT(stored_hidden_size == hidden_size, "Target state hidden size does not match the expected size for Eagle3 draft model inference.");
    OPENVINO_ASSERT(stored_seq_len == total_num_tokens, "Target state sequence length does not match the expected length for Eagle3 draft model inference.");
stored_seq_len == num_scheduled_tokens
    if (stored_seq_len == total_num_tokens) {
        hidden_state_input = stored_hidden_state;
    } else {
        size_t copy_length = std::min(stored_seq_len, num_scheduled_tokens);
Suggested change:

    - size_t copy_length = std::min(stored_seq_len, num_scheduled_tokens);
    + const size_t copy_length = std::min(stored_seq_len, num_scheduled_tokens);
        hidden_state_input = stored_hidden_state;
    } else {
        size_t copy_length = std::min(stored_seq_len, num_scheduled_tokens);
        size_t source_start_idx = stored_seq_len - copy_length;
Suggested change:

    - size_t source_start_idx = stored_seq_len - copy_length;
    + const size_t source_start_idx = stored_seq_len - copy_length;
        return m_max_content_len + m_num_validation_tokens >= get_prompt_len() && !m_is_gen_paused;
    }

    bool has_generated_tokens() const {
Looks like we don't need a dedicated API for such limited usage?
    if (stored_seq_len == total_num_tokens) {
        hidden_state_input = stored_hidden_state;
    } else {
        size_t copy_length = std::min(stored_seq_len, num_scheduled_tokens);
In the Eagle3 hidden-state import path, the fallback branch copies only `min(stored_seq_len, num_scheduled_tokens)` rows when `stored_seq_len != total_num_tokens`. If `stored_seq_len < num_scheduled_tokens`, the remaining scheduled tokens keep zeroed hidden states, which can silently corrupt draft-model inference. Add an explicit precondition (e.g., `OPENVINO_ASSERT(stored_seq_len >= num_scheduled_tokens, ...)` or `==`) and fail fast if the stored hidden-state slice is shorter than the scheduled chunk.
Suggested change:

    - size_t copy_length = std::min(stored_seq_len, num_scheduled_tokens);
    + OPENVINO_ASSERT(stored_seq_len >= num_scheduled_tokens,
    +                 "Stored hidden state length is shorter than the scheduled token chunk "
    +                 "in Eagle3 draft model inference.");
    + size_t copy_length = num_scheduled_tokens;
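The fail-fast precondition suggested above can be sketched in isolation. This is a simplified illustration under stated assumptions: the hidden state is modeled as a row-major `std::vector<float>` of shape `[stored_seq_len, hidden_size]` rather than an `ov::Tensor`, the helper `take_last_rows` is hypothetical, and a `std::invalid_argument` exception stands in for `OPENVINO_ASSERT`.

```cpp
#include <cstddef>
#include <stdexcept>
#include <vector>

// Hypothetical helper: returns the last num_scheduled_tokens rows of a
// row-major [stored_seq_len, hidden_size] buffer.
// Throws instead of silently leaving rows zero-filled when the stored
// state is shorter than the scheduled chunk.
std::vector<float> take_last_rows(const std::vector<float>& stored,
                                  std::size_t stored_seq_len,
                                  std::size_t hidden_size,
                                  std::size_t num_scheduled_tokens) {
    if (stored.size() != stored_seq_len * hidden_size) {
        throw std::invalid_argument("Buffer size does not match the given shape.");
    }
    if (stored_seq_len < num_scheduled_tokens) {
        throw std::invalid_argument(
            "Stored hidden state length is shorter than the scheduled token chunk.");
    }
    // Same offset computation as in the reviewed code, with copy_length
    // fixed to num_scheduled_tokens by the precondition above.
    const std::size_t source_start_idx = stored_seq_len - num_scheduled_tokens;
    const auto first =
        stored.begin() + static_cast<std::ptrdiff_t>(source_start_idx * hidden_size);
    return std::vector<float>(first, stored.end());
}
```

With the precondition in place, the copy always fills every scheduled row, so a mismatch surfaces as an error at the import step instead of as degraded draft-model output later.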
…atching_for_eagle3
    bool is_update_logit_processor) {
        UpdateRequestResult result{0, 0};
        // Check if all requests have completed pre-filling
        bool all_prefill_finished = true;
Suggested change:

    - bool all_prefill_finished = true;
    + bool any_other_prefill_unfinished = false;

Looks clearer?
    for (auto& request : m_requests) {
        const bool finished = request->has_generated_tokens();
        if (!finished) {
            all_prefill_finished = false;
Can we break out of the loop as soon as this is set?
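One possible shape for the early exit asked about here is sketched below. `RequestStub` is a hypothetical stand-in that only mimics the `has_generated_tokens()` call from the diff, and requests are held by value rather than by pointer for brevity.

```cpp
#include <vector>

// Hypothetical stand-in for a request; only has_generated_tokens() matters here.
struct RequestStub {
    bool generated_any;
    bool has_generated_tokens() const { return generated_any; }
};

// Returns true only if every request has generated at least one token.
// The loop returns at the first unfinished request, so later requests
// are not inspected once the answer is known.
bool all_prefill_finished(const std::vector<RequestStub>& requests) {
    for (const auto& request : requests) {
        if (!request.has_generated_tokens()) {
            return false;
        }
    }
    return true;
}
```

Moving the check into a small function like this also gives the flag a single, clearly scoped definition, which relates to the naming question raised in the previous comment.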
This PR will be closed in a week because of 2 weeks of no activity.
Closing due to inactivity. Feel free to reopen if you plan to continue working on this.
Replaced by #3812
Description
Support continuous batching with multiple prompts for the EAGLE3 pipeline.
Tickets: CVS-179148
Checklist: