diff --git a/CMakeLists.txt b/CMakeLists.txt index 2d3f0af950..0c801eabf6 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -101,7 +101,7 @@ FetchContent_MakeAvailable(yaml-cpp) FetchContent_Declare( xgrammar GIT_REPOSITORY https://github.com/mlc-ai/xgrammar.git - GIT_TAG v0.1.27 + GIT_TAG v0.2.1 GIT_SUBMODULES "3rdparty/dlpack" GIT_PROGRESS TRUE USES_TERMINAL_DOWNLOAD TRUE diff --git a/lmdeploy/turbomind/turbomind.py b/lmdeploy/turbomind/turbomind.py index 1d3c1f6cfc..0db8eae178 100644 --- a/lmdeploy/turbomind/turbomind.py +++ b/lmdeploy/turbomind/turbomind.py @@ -166,6 +166,7 @@ def __init__(self, self.source_model = model_loader.model self.is_dummy = self.model_comm.is_dummy_node() self.tokenizer = Tokenizer(model_path, trust_remote_code=trust_remote_code) + self._grammar_compiler = None if not _engine_config.empty_init: with torch.cuda.device(self.devices[0]): model_loader.export() @@ -174,6 +175,15 @@ def __init__(self, self.session_len = _engine_config.session_len + @property + def grammar_compiler(self): + """Lazy-initialized GrammarCompiler shared across all requests.""" + if self._grammar_compiler is None: + tokenizer_info = TokenizerInfo.from_huggingface( + self.tokenizer.model.model, vocab_size=self._vocab_size) + self._grammar_compiler = _xgr.GrammarCompiler(tokenizer_info) + return self._grammar_compiler + def _process_weights(self): """Process weight.""" with ThreadPoolExecutor(max_workers=self.gpu_count) as e: @@ -712,11 +722,8 @@ async def async_stream_infer(self, gen_config=gen_config) if gen_config.response_format is not None: - tokenizer = self.tm_model.tokenizer - vocab_size = self.tm_model._vocab_size - try: - tokenizer_info = TokenizerInfo.from_huggingface(tokenizer.model.model, vocab_size=vocab_size) + compiler = self.tm_model.grammar_compiler decode_grammar_type = gen_config.response_format['type'] if decode_grammar_type == 'json_schema': decode_grammar = gen_config.response_format[decode_grammar_type]['schema'] @@ -725,8 +732,6 @@ async def async_stream_infer(self, elif decode_grammar_type == 'json_object': decode_grammar = '{"type" : "object", "additionalProperties": true}' - compiler = _xgr.GrammarCompiler(tokenizer_info) - if decode_grammar_type == 'json_schema': decode_grammar = json.dumps(decode_grammar) grammar = compiler.compile_json_schema(decode_grammar) @@ -742,7 +747,7 @@ async def async_stream_infer(self, self.model_inst.set_grammar(grammar) except ValueError as e: - logger.warning(f'Failed to initialize guided decoding for tokenizer {tokenizer}, ' + logger.warning(f'Failed to initialize guided decoding, ' f'disable guided decoding: {e}') gen_config.response_format = None diff --git a/src/turbomind/generation/CMakeLists.txt b/src/turbomind/generation/CMakeLists.txt index ff7038ab72..4c1c2313f0 100644 --- a/src/turbomind/generation/CMakeLists.txt +++ b/src/turbomind/generation/CMakeLists.txt @@ -15,10 +15,9 @@ cmake_minimum_required(VERSION 3.25) add_library(guided_decoding STATIC guided_decoding.cc) -target_link_libraries(guided_decoding PRIVATE - apply_token_bitmask_inplace_cuda - xgrammar - core) +target_link_libraries(guided_decoding + PUBLIC xgrammar core + PRIVATE apply_token_bitmask_inplace_cuda) set_property(TARGET guided_decoding PROPERTY POSITION_INDEPENDENT_CODE ON) add_library(generation STATIC diff --git a/src/turbomind/generation/generation.cc b/src/turbomind/generation/generation.cc index 479048b650..8a2449f787 100644 --- a/src/turbomind/generation/generation.cc +++ b/src/turbomind/generation/generation.cc @@ -292,11 +292,13 @@ struct Generation::Impl { sampling_->Forward(phase, env); - guided_decoding_->Update(phase, env); + guided_decoding_->ScheduleUpdate(phase, env); AppendTokenIds(d.token_ids_ptrs.data(), output_ids_.data(), output_pos.data(), gs, stream); stop_criteria_->Forward(phase, env); + + guided_decoding_->FinishUpdate(phase, env); } } }; diff --git a/src/turbomind/generation/guided_decoding.cc b/src/turbomind/generation/guided_decoding.cc index c185156506..5176698164 100644 --- a/src/turbomind/generation/guided_decoding.cc +++ b/src/turbomind/generation/guided_decoding.cc @@ -25,6 +25,10 @@ GuidedDecoding::GuidedDecoding(const BaseGenerationParam& base, const comm::Host bitmask_buf_ = {{max_batch_size_, bitmask_size}, kCPUpinned}; output_ids_buf_ = {max_batch_size_, kCPUpinned}; + d2h_stream_ = core::Stream::create(); + sampling_done_ = core::Event::create(); + d2h_done_ = core::Event::create(); + for (int i = 0; i < phases; ++i) { auto& d = data_.emplace_back(std::make_shared()); d->bitmask = empty_like(bitmask_buf_); @@ -48,6 +52,11 @@ void GuidedDecoding::Setup(int phase, TensorMap& env) void GuidedDecoding::FillMask(int phase, TensorMap& env) { if (auto& d = *data_.at(phase); d.active) { + // Only the first `generation_size` (= logits.shape(0)) slots are actively + // generating; matchers beyond this index belong to idle/prefill requests + // whose output_ids are stale and whose bitmasks are never applied. + const int gs = env.at("logits").shape(0); + static_assert(sizeof(ssize_t) == sizeof(int64_t)); DLTensor dlbitmask{bitmask_buf_.data(), DLDevice{kDLCPU, 0}, @@ -56,10 +65,17 @@ void GuidedDecoding::FillMask(int phase, TensorMap& env) (int64_t*)bitmask_buf_.shape().data(), nullptr, 0}; + if (tp_group_->rank() == 0) { - for (size_t i = 0; i < d.matchers.size(); ++i) { - if (const auto& matcher = d.matchers[i]; matcher && !matcher->IsTerminated()) { - matcher->FillNextTokenBitmask(&dlbitmask, i); + std::vector active_matchers; + std::vector active_indices; + active_matchers.reserve(gs); + active_indices.reserve(gs); + + for (int i = 0; i < gs; ++i) { + if (const auto& m = d.matchers[i]; m && !m->IsTerminated()) { + active_matchers.emplace_back(*m); + active_indices.emplace_back(static_cast(i)); } else { std::fill_n(bitmask_buf_.data() + i * bitmask_buf_.stride(0), @@ -67,6 +83,10 @@ void GuidedDecoding::FillMask(int phase, TensorMap& env) static_cast(-1)); } } + + if (!active_matchers.empty()) { + batch_matcher_.BatchFillNextTokenBitmask(&active_matchers, &dlbitmask, active_indices); + } } } } @@ -88,18 +108,51 @@ void GuidedDecoding::ApplyMask(int phase, TensorMap& env) } } -void GuidedDecoding::Update(int phase, TensorMap& env) +void GuidedDecoding::ScheduleUpdate(int phase, TensorMap& env) { - if (auto& d = *data_.at(phase); d.active) { - Copy(env.at("output_ids").buffer(), d.matchers.size(), output_ids_buf_); - core::Context::stream().Sync(); - if (tp_group_->rank() == 0) { - for (size_t i = 0; i < d.matchers.size(); ++i) { - if (const auto& matcher = d.matchers[i]; matcher && !matcher->IsTerminated()) { - matcher->AcceptToken(output_ids_buf_[i]); - } + if (auto& d = *data_.at(phase); d.active && tp_group_->rank() == 0) { + // Record event on main stream after sampling GPU work is submitted. + // The secondary stream will wait for this before issuing the D2H copy, + // ensuring it reads the output_ids written by sampling. + sampling_done_.Record(core::Context::stream()); + + // D2H copy on secondary stream — overlaps with subsequent GPU kernels + // on the main stream (AppendTokenIds, stop_criteria). + // Only copy the first `generation_size` entries: sampling writes exactly + // that many output_ids, and entries beyond it contain stale values. + const int gs = env.at("logits").shape(0); + d2h_stream_.Wait(sampling_done_); + Copy(env.at("output_ids").buffer(), gs, output_ids_buf_, d2h_stream_); + d2h_done_.Record(d2h_stream_); + } +} + +void GuidedDecoding::FinishUpdate(int phase, TensorMap& env) +{ + if (auto& d = *data_.at(phase); d.active && tp_group_->rank() == 0) { + // Wait only for the D2H copy to complete — the main stream's + // AppendTokenIds + stop_criteria may still be executing on GPU. + d2h_done_.Sync(); + + // Collect active matchers and their token IDs for batch AcceptToken. + // Only iterate over the first `generation_size` (= logits.shape(0)) slots — + // beyond that index the output_ids buffer contains stale data from prior steps. + const int gs = env.at("logits").shape(0); + std::vector active_matchers; + std::vector active_token_ids; + active_matchers.reserve(gs); + active_token_ids.reserve(gs); + + for (int i = 0; i < gs; ++i) { + if (const auto& m = d.matchers[i]; m && !m->IsTerminated()) { + active_matchers.emplace_back(*m); + active_token_ids.emplace_back(output_ids_buf_[i]); } } + + if (!active_matchers.empty()) { + xgrammar::BatchGrammarMatcher::BatchAcceptToken(&active_matchers, active_token_ids); + } } } diff --git a/src/turbomind/generation/guided_decoding.h b/src/turbomind/generation/guided_decoding.h index 5417a60a43..b87fbc473f 100644 --- a/src/turbomind/generation/guided_decoding.h +++ b/src/turbomind/generation/guided_decoding.h @@ -6,6 +6,8 @@ #include "src/turbomind/comm/host_comm.h" #include "src/turbomind/core/core.h" +#include "src/turbomind/core/stream.h" +#include "xgrammar/matcher.h" namespace turbomind { @@ -19,7 +21,8 @@ class GuidedDecoding: public BaseGenerationParam { void ApplyMask(int phase, TensorMap& env); - void Update(int phase, TensorMap& env); + void ScheduleUpdate(int phase, TensorMap& env); + void FinishUpdate(int phase, TensorMap& env); private: comm::HostComm tp_group_; @@ -27,8 +30,14 @@ class GuidedDecoding: public BaseGenerationParam { struct Data; std::vector> data_; + xgrammar::BatchGrammarMatcher batch_matcher_; + Tensor_ bitmask_buf_; Buffer_ output_ids_buf_; + + core::Stream d2h_stream_; // secondary stream for D2H copy of output_ids + core::Event sampling_done_; // recorded on main stream after sampling + core::Event d2h_done_; // recorded on d2h_stream_ after copy completes }; } // namespace turbomind