Skip to content
This repository was archived by the owner on Jul 4, 2025. It is now read-only.

Commit 368a4f3

Browse files
committed
add non-stream chat completions
1 parent cd55d64 commit 368a4f3

3 files changed

Lines changed: 76 additions & 29 deletions

File tree

engine/extensions/python-engines/vllm_engine.cc

Lines changed: 70 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ VllmEngine::VllmEngine()
2424

2525
VllmEngine::~VllmEngine() {
2626
// NOTE: what happens if we can't kill subprocess?
27-
std::unique_lock write_lock(mutex);
28-
for (auto& [model_name, py_proc] : model_process_map) {
27+
std::unique_lock write_lock(mutex_);
28+
for (auto& [model_name, py_proc] : model_process_map_) {
2929
if (py_proc.IsAlive())
3030
py_proc.Kill();
3131
}
@@ -60,15 +60,58 @@ void VllmEngine::Unload(EngineUnloadOption opts) {};
6060
void VllmEngine::HandleChatCompletion(
6161
std::shared_ptr<Json::Value> json_body,
6262
std::function<void(Json::Value&&, Json::Value&&)>&& callback) {
63-
CTL_WRN("Not implemented");
64-
throw std::runtime_error("Not implemented");
63+
64+
// request validation should be in controller
65+
if (!json_body->isMember("model")) {
66+
auto [status, error] =
67+
CreateResponse("Missing required fields: model", 400);
68+
callback(std::move(status), std::move(error));
69+
return;
70+
}
71+
72+
const std::string model = (*json_body)["model"].asString();
73+
int port;
74+
// check if model has started
75+
// TODO: use health check instead
76+
{
77+
std::shared_lock read_lock(mutex_);
78+
if (model_process_map_.find(model) == model_process_map_.end()) {
79+
const std::string msg = "Model " + model + " has not been loaded yet.";
80+
auto [status, error] = CreateResponse(msg, 400);
81+
callback(std::move(status), std::move(error));
82+
return;
83+
}
84+
port = model_process_map_[model].port;
85+
}
86+
87+
bool stream = (*json_body)["stream"].asBool();
88+
if (stream) {
89+
auto [status, res] = CreateResponse("stream=true is not yet supported", 400);
90+
callback(std::move(status), std::move(res));
91+
} else {
92+
const std::string url =
93+
"http://127.0.0.1:" + std::to_string(port) + "/v1/chat/completions";
94+
auto result = curl_utils::SimplePostJson(url, json_body->toStyledString());
95+
96+
if (result.has_error()) {
97+
auto [status, res] = CreateResponse(result.error(), 400);
98+
callback(std::move(status), std::move(res));
99+
}
100+
101+
Json::Value status;
102+
status["is_done"] = true;
103+
status["has_error"] = false;
104+
status["is_stream"] = false;
105+
status["status_code"] = 200;
106+
callback(std::move(status), std::move(result.value()));
107+
}
65108
};
66109

67110
void VllmEngine::HandleEmbedding(
68111
std::shared_ptr<Json::Value> json_body,
69112
std::function<void(Json::Value&&, Json::Value&&)>&& callback) {
70-
CTL_WRN("Not implemented");
71-
throw std::runtime_error("Not implemented");
113+
auto [status, res] = CreateResponse("embedding is not yet supported", 400);
114+
callback(std::move(status), std::move(res));
72115
};
73116

74117
void VllmEngine::LoadModel(
@@ -85,9 +128,9 @@ void VllmEngine::LoadModel(
85128
const std::string model = (*json_body)["model"].asString();
86129

87130
{
88-
std::unique_lock write_lock(mutex);
89-
if (model_process_map.find(model) != model_process_map.end()) {
90-
auto proc = model_process_map[model];
131+
std::unique_lock write_lock(mutex_);
132+
if (model_process_map_.find(model) != model_process_map_.end()) {
133+
auto proc = model_process_map_[model];
91134

92135
if (proc.IsAlive()) {
93136
auto [status, error] = CreateResponse("Model already loaded!", 409);
@@ -96,7 +139,7 @@ void VllmEngine::LoadModel(
96139
} else {
97140
// if model has exited, try to load model again?
98141
CTL_WRN("Model " << model << " has exited unexpectedly");
99-
model_process_map.erase(model);
142+
model_process_map_.erase(model);
100143
port_offsets_[proc.port - cortex_port_] = false; // free the port
101144
}
102145
}
@@ -164,8 +207,8 @@ void VllmEngine::LoadModel(
164207

165208
pid = py_proc.proc_info.pid;
166209

167-
std::unique_lock write_lock(mutex);
168-
model_process_map[model] = py_proc;
210+
std::unique_lock write_lock(mutex_);
211+
model_process_map_[model] = py_proc;
169212

170213
} catch (const std::exception& e) {
171214
auto e_msg = e.what();
@@ -192,8 +235,8 @@ void VllmEngine::UnloadModel(
192235

193236
// check if model has started
194237
{
195-
std::shared_lock read_lock(mutex);
196-
if (model_process_map.find(model) == model_process_map.end()) {
238+
std::shared_lock read_lock(mutex_);
239+
if (model_process_map_.find(model) == model_process_map_.end()) {
197240
const std::string msg = "Model " + model + " has not been loaded yet.";
198241
auto [status, error] = CreateResponse(msg, 400);
199242
callback(std::move(status), std::move(error));
@@ -203,15 +246,15 @@ void VllmEngine::UnloadModel(
203246

204247
// we know that model has started
205248
{
206-
std::unique_lock write_lock(mutex);
207-
auto proc = model_process_map[model];
249+
std::unique_lock write_lock(mutex_);
250+
auto proc = model_process_map_[model];
208251

209252
// TODO: we can use vLLM health check endpoint
210253
// check if subprocess is still alive
211254
// NOTE: is this step necessary? the subprocess could have terminated
212255
// after .IsAlive() and before .Kill() later.
213256
if (!proc.IsAlive()) {
214-
model_process_map.erase(model);
257+
model_process_map_.erase(model);
215258
port_offsets_[proc.port - cortex_port_] = false; // free the port
216259

217260
const std::string msg = "Model " + model + " stopped running.";
@@ -221,14 +264,14 @@ void VllmEngine::UnloadModel(
221264
}
222265

223266
// subprocess is alive. we kill it here.
224-
if (!model_process_map[model].Kill()) {
267+
if (!model_process_map_[model].Kill()) {
225268
const std::string msg = "Unable to kill process of model " + model;
226269
auto [status, error] = CreateResponse(msg, 500);
227270
callback(std::move(status), std::move(error));
228271
return;
229272
}
230273

231-
model_process_map.erase(model);
274+
model_process_map_.erase(model);
232275
port_offsets_[proc.port - cortex_port_] = false; // free the port
233276
}
234277

@@ -249,8 +292,8 @@ void VllmEngine::GetModelStatus(
249292
const std::string model = (*json_body)["model"].asString();
250293
// check if model has started
251294
{
252-
std::shared_lock read_lock(mutex);
253-
if (model_process_map.find(model) == model_process_map.end()) {
295+
std::shared_lock read_lock(mutex_);
296+
if (model_process_map_.find(model) == model_process_map_.end()) {
254297
const std::string msg = "Model " + model + " has not been loaded yet.";
255298
auto [status, error] = CreateResponse(msg, 400);
256299
callback(std::move(status), std::move(error));
@@ -261,12 +304,12 @@ void VllmEngine::GetModelStatus(
261304
// we know that model has started
262305
// TODO: just use health check endpoint
263306
{
264-
std::unique_lock write_lock(mutex);
307+
std::unique_lock write_lock(mutex_);
265308

266309
// check if subprocess is still alive
267-
if (!model_process_map[model].IsAlive()) {
310+
if (!model_process_map_[model].IsAlive()) {
268311
CTL_WRN("Model " << model << " has exited unexpectedly.");
269-
model_process_map.erase(model);
312+
model_process_map_.erase(model);
270313
const std::string msg = "Model " + model + " stopped running.";
271314
auto [status, error] = CreateResponse(msg, 400);
272315
callback(std::move(status), std::move(error));
@@ -291,12 +334,12 @@ void VllmEngine::GetModels(
291334
std::function<void(Json::Value&&, Json::Value&&)>&& callback) {
292335
Json::Value res, model_list(Json::arrayValue), status;
293336
{
294-
std::unique_lock write_lock(mutex);
295-
for (auto& [model_name, py_proc] : model_process_map) {
337+
std::unique_lock write_lock(mutex_);
338+
for (auto& [model_name, py_proc] : model_process_map_) {
296339
// TODO: check using health endpoint
297340
if (!py_proc.IsAlive()) {
298341
CTL_WRN("Model " << model_name << " has exited unexpectedly.");
299-
model_process_map.erase(model_name);
342+
model_process_map_.erase(model_name);
300343
continue;
301344
}
302345

engine/extensions/python-engines/vllm_engine.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@ class VllmEngine : public EngineI {
1212
// otherwise, cortex_port + i is not used
1313
std::vector<bool> port_offsets_;
1414

15-
mutable std::shared_mutex mutex;
15+
mutable std::shared_mutex mutex_;
1616
std::unordered_map<std::string, python_utils::PythonSubprocess>
17-
model_process_map;
17+
model_process_map_;
1818

1919
public:
2020
VllmEngine();

engine/services/model_service.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1252,6 +1252,10 @@ std::string ModelService::GetEngineByModelId(
12521252
CTL_WRN("Error: " + model_entry.error());
12531253
return "";
12541254
}
1255+
1256+
if (model_entry.value().engine == kVllmEngine)
1257+
return kVllmEngine;
1258+
12551259
config::YamlHandler yaml_handler;
12561260
yaml_handler.ModelConfigFromFile(
12571261
fmu::ToAbsoluteCortexDataPath(

0 commit comments

Comments
 (0)