Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ FetchContent_MakeAvailable(yaml-cpp)
FetchContent_Declare(
xgrammar
GIT_REPOSITORY https://github.com/mlc-ai/xgrammar.git
GIT_TAG v0.1.27
GIT_TAG v0.2.1
GIT_SUBMODULES "3rdparty/dlpack"
GIT_PROGRESS TRUE
USES_TERMINAL_DOWNLOAD TRUE
Expand Down
19 changes: 12 additions & 7 deletions lmdeploy/turbomind/turbomind.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ def __init__(self,
self.source_model = model_loader.model
self.is_dummy = self.model_comm.is_dummy_node()
self.tokenizer = Tokenizer(model_path, trust_remote_code=trust_remote_code)
self._grammar_compiler = None
if not _engine_config.empty_init:
with torch.cuda.device(self.devices[0]):
model_loader.export()
Expand All @@ -174,6 +175,15 @@ def __init__(self,

self.session_len = _engine_config.session_len

@property
def grammar_compiler(self):
    """Return the cached GrammarCompiler, building it on first access.

    The compiler is constructed from the HuggingFace tokenizer held by
    ``self.tokenizer`` together with ``self._vocab_size`` and is stored in
    ``self._grammar_compiler`` so that later accesses reuse the same
    instance instead of rebuilding it.
    """
    cached = self._grammar_compiler
    if cached is not None:
        return cached

    # First access: derive tokenizer metadata, then build and memoize the compiler.
    info = TokenizerInfo.from_huggingface(self.tokenizer.model.model, vocab_size=self._vocab_size)
    cached = _xgr.GrammarCompiler(info)
    self._grammar_compiler = cached
    return cached

def _process_weights(self):
"""Process weight."""
with ThreadPoolExecutor(max_workers=self.gpu_count) as e:
Expand Down Expand Up @@ -712,11 +722,8 @@ async def async_stream_infer(self,
gen_config=gen_config)

if gen_config.response_format is not None:
tokenizer = self.tm_model.tokenizer
vocab_size = self.tm_model._vocab_size

try:
tokenizer_info = TokenizerInfo.from_huggingface(tokenizer.model.model, vocab_size=vocab_size)
compiler = self.tm_model.grammar_compiler
decode_grammar_type = gen_config.response_format['type']
if decode_grammar_type == 'json_schema':
decode_grammar = gen_config.response_format[decode_grammar_type]['schema']
Expand All @@ -725,8 +732,6 @@ async def async_stream_infer(self,
elif decode_grammar_type == 'json_object':
decode_grammar = '{"type" : "object", "additionalProperties": true}'

compiler = _xgr.GrammarCompiler(tokenizer_info)

if decode_grammar_type == 'json_schema':
decode_grammar = json.dumps(decode_grammar)
grammar = compiler.compile_json_schema(decode_grammar)
Expand All @@ -742,7 +747,7 @@ async def async_stream_infer(self,

self.model_inst.set_grammar(grammar)
except ValueError as e:
logger.warning(f'Failed to initialize guided decoding for tokenizer {tokenizer}, '
logger.warning(f'Failed to initialize guided decoding, '
f'disable guided decoding: {e}')
gen_config.response_format = None

Expand Down
7 changes: 3 additions & 4 deletions src/turbomind/generation/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@
cmake_minimum_required(VERSION 3.25)

add_library(guided_decoding STATIC guided_decoding.cc)
target_link_libraries(guided_decoding PRIVATE
apply_token_bitmask_inplace_cuda
xgrammar
core)
target_link_libraries(guided_decoding
PUBLIC xgrammar core
PRIVATE apply_token_bitmask_inplace_cuda)
set_property(TARGET guided_decoding PROPERTY POSITION_INDEPENDENT_CODE ON)

add_library(generation STATIC
Expand Down
4 changes: 3 additions & 1 deletion src/turbomind/generation/generation.cc
Original file line number Diff line number Diff line change
Expand Up @@ -292,11 +292,13 @@ struct Generation::Impl {

sampling_->Forward(phase, env);

guided_decoding_->Update(phase, env);
guided_decoding_->ScheduleUpdate(phase, env);

AppendTokenIds(d.token_ids_ptrs.data(), output_ids_.data(), output_pos.data(), gs, stream);

stop_criteria_->Forward(phase, env);

guided_decoding_->FinishUpdate(phase, env);
}
}
};
Expand Down
77 changes: 65 additions & 12 deletions src/turbomind/generation/guided_decoding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ GuidedDecoding::GuidedDecoding(const BaseGenerationParam& base, const comm::Host
bitmask_buf_ = {{max_batch_size_, bitmask_size}, kCPUpinned};
output_ids_buf_ = {max_batch_size_, kCPUpinned};

d2h_stream_ = core::Stream::create();
sampling_done_ = core::Event::create();
d2h_done_ = core::Event::create();

for (int i = 0; i < phases; ++i) {
auto& d = data_.emplace_back(std::make_shared<Data>());
d->bitmask = empty_like(bitmask_buf_);
Expand All @@ -48,6 +52,11 @@ void GuidedDecoding::Setup(int phase, TensorMap& env)
void GuidedDecoding::FillMask(int phase, TensorMap& env)
{
if (auto& d = *data_.at(phase); d.active) {
// Only the first `generation_size` (= logits.shape(0)) slots are actively
// generating; matchers beyond this index belong to idle/prefill requests
// whose output_ids are stale and whose bitmasks are never applied.
const int gs = env.at("logits").shape(0);

static_assert(sizeof(ssize_t) == sizeof(int64_t));
DLTensor dlbitmask{bitmask_buf_.data(),
DLDevice{kDLCPU, 0},
Expand All @@ -56,17 +65,28 @@ void GuidedDecoding::FillMask(int phase, TensorMap& env)
(int64_t*)bitmask_buf_.shape().data(),
nullptr,
0};

if (tp_group_->rank() == 0) {
for (size_t i = 0; i < d.matchers.size(); ++i) {
if (const auto& matcher = d.matchers[i]; matcher && !matcher->IsTerminated()) {
matcher->FillNextTokenBitmask(&dlbitmask, i);
std::vector<xgrammar::GrammarMatcher> active_matchers;
std::vector<int32_t> active_indices;
active_matchers.reserve(gs);
active_indices.reserve(gs);

for (int i = 0; i < gs; ++i) {
if (const auto& m = d.matchers[i]; m && !m->IsTerminated()) {
active_matchers.emplace_back(*m);
active_indices.emplace_back(static_cast<int32_t>(i));
}
else {
std::fill_n(bitmask_buf_.data() + i * bitmask_buf_.stride(0),
bitmask_buf_.stride(0),
static_cast<int32_t>(-1));
}
}

if (!active_matchers.empty()) {
batch_matcher_.BatchFillNextTokenBitmask(&active_matchers, &dlbitmask, active_indices);
}
}
}
}
Expand All @@ -88,18 +108,51 @@ void GuidedDecoding::ApplyMask(int phase, TensorMap& env)
}
}

void GuidedDecoding::Update(int phase, TensorMap& env)
void GuidedDecoding::ScheduleUpdate(int phase, TensorMap& env)
{
if (auto& d = *data_.at(phase); d.active) {
Copy(env.at("output_ids").buffer(), d.matchers.size(), output_ids_buf_);
core::Context::stream().Sync();
if (tp_group_->rank() == 0) {
for (size_t i = 0; i < d.matchers.size(); ++i) {
if (const auto& matcher = d.matchers[i]; matcher && !matcher->IsTerminated()) {
matcher->AcceptToken(output_ids_buf_[i]);
}
if (auto& d = *data_.at(phase); d.active && tp_group_->rank() == 0) {
// Record event on main stream after sampling GPU work is submitted.
// The secondary stream will wait for this before issuing the D2H copy,
// ensuring it reads the output_ids written by sampling.
sampling_done_.Record(core::Context::stream());

// D2H copy on secondary stream — overlaps with subsequent GPU kernels
// on the main stream (AppendTokenIds, stop_criteria).
// Only copy the first `generation_size` entries: sampling writes exactly
// that many output_ids, and entries beyond it contain stale values.
const int gs = env.at("logits").shape(0);
d2h_stream_.Wait(sampling_done_);
Copy(env.at("output_ids").buffer(), gs, output_ids_buf_, d2h_stream_);
d2h_done_.Record(d2h_stream_);
Comment thread
windreamer marked this conversation as resolved.
Comment thread
windreamer marked this conversation as resolved.
}
}

void GuidedDecoding::FinishUpdate(int phase, TensorMap& env)
{
if (auto& d = *data_.at(phase); d.active && tp_group_->rank() == 0) {
// Wait only for the D2H copy to complete — the main stream's
// AppendTokenIds + stop_criteria may still be executing on GPU.
d2h_done_.Sync();

// Collect active matchers and their token IDs for batch AcceptToken.
// Only iterate over the first `generation_size` (= logits.shape(0)) slots —
// beyond that index the output_ids buffer contains stale data from prior steps.
const int gs = env.at("logits").shape(0);
std::vector<xgrammar::GrammarMatcher> active_matchers;
std::vector<int32_t> active_token_ids;
active_matchers.reserve(gs);
active_token_ids.reserve(gs);

for (int i = 0; i < gs; ++i) {
if (const auto& m = d.matchers[i]; m && !m->IsTerminated()) {
active_matchers.emplace_back(*m);
active_token_ids.emplace_back(output_ids_buf_[i]);
}
}

if (!active_matchers.empty()) {
xgrammar::BatchGrammarMatcher::BatchAcceptToken(&active_matchers, active_token_ids);
}
}
}

Expand Down
11 changes: 10 additions & 1 deletion src/turbomind/generation/guided_decoding.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

#include "src/turbomind/comm/host_comm.h"
#include "src/turbomind/core/core.h"
#include "src/turbomind/core/stream.h"
#include "xgrammar/matcher.h"

namespace turbomind {

Expand All @@ -19,16 +21,23 @@ class GuidedDecoding: public BaseGenerationParam {

void ApplyMask(int phase, TensorMap& env);

void Update(int phase, TensorMap& env);
void ScheduleUpdate(int phase, TensorMap& env);
void FinishUpdate(int phase, TensorMap& env);

private:
comm::HostComm tp_group_;

struct Data;
std::vector<std::shared_ptr<Data>> data_;

xgrammar::BatchGrammarMatcher batch_matcher_;

Tensor_<int32_t> bitmask_buf_;
Buffer_<int> output_ids_buf_;

core::Stream d2h_stream_; // secondary stream for D2H copy of output_ids
core::Event sampling_done_; // recorded on main stream after sampling
core::Event d2h_done_; // recorded on d2h_stream_ after copy completes
};

} // namespace turbomind
Loading