This repository was archived by the owner on Jul 4, 2025. It is now read-only.

Commit d38eca8

support streaming
1 parent 807b201 commit d38eca8

3 files changed: 95 additions & 8 deletions


engine/extensions/python-engines/vllm_engine.cc

Lines changed: 90 additions & 8 deletions
@@ -14,13 +14,47 @@ static std::pair<Json::Value, Json::Value> CreateResponse(
   res["message"] = msg;
   return {status, res};
 }
+
+// this is mostly copied from local_engine.cc
+struct StreamContext {
+  std::shared_ptr<std::function<void(Json::Value&&, Json::Value&&)>> callback;
+  bool need_stop;
+
+  static size_t write_callback(char* ptr, size_t size, size_t nmemb,
+                               void* userdata) {
+    auto* ctx = static_cast<StreamContext*>(userdata);
+    size_t data_length = size * nmemb;
+    if (data_length <= 6)
+      return data_length;
+
+    std::string chunk{ptr, data_length};
+    CTL_INF(chunk);
+    Json::Value status;
+    status["is_stream"] = true;
+    status["has_error"] = false;
+    status["status_code"] = 200;
+    Json::Value chunk_json;
+    chunk_json["data"] = chunk;
+
+    if (chunk.find("[DONE]") != std::string::npos) {
+      status["is_done"] = true;
+      ctx->need_stop = false;
+    } else {
+      status["is_done"] = false;
+    }
+
+    (*ctx->callback)(std::move(status), std::move(chunk_json));
+    return data_length;
+  };
+};
+
 }  // namespace

 VllmEngine::VllmEngine()
     : cortex_port_{std::stoi(
           file_manager_utils::GetCortexConfig().apiServerPort)},
-      port_offsets_{true}  // cortex_port + 0 is always used (by cortex itself)
-{}
+      port_offsets_{true},  // cortex_port + 0 is always used (by cortex itself)
+      queue_{2 /* threadNum */, "vLLM engine"} {}

 VllmEngine::~VllmEngine() {
   // NOTE: what happens if we can't kill subprocess?
@@ -84,14 +118,62 @@ void VllmEngine::HandleChatCompletion(
     port = model_process_map_[model].port;
   }

+  const std::string url =
+      "http://127.0.0.1:" + std::to_string(port) + "/v1/chat/completions";
+  const std::string json_str = json_body->toStyledString();
+
   bool stream = (*json_body)["stream"].asBool();
   if (stream) {
-    auto [status, res] = CreateResponse("stream=true is not yet supported", 400);
-    callback(std::move(status), std::move(res));
+    queue_.runTaskInQueue([url = std::move(url), json_str = std::move(json_str),
+                           callback = std::move(callback)] {
+      CURL* curl = curl_easy_init();
+      if (!curl) {
+        auto [status, res] = CreateResponse("Internal server error", 500);
+        callback(std::move(status), std::move(res));
+      }
+
+      struct curl_slist* headers = nullptr;
+      headers = curl_slist_append(headers, "Content-Type: application/json");
+
+      curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
+      curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
+      curl_easy_setopt(curl, CURLOPT_POST, 1L);
+      curl_easy_setopt(curl, CURLOPT_POSTFIELDS, json_str.c_str());
+      curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, json_str.length());
+      curl_easy_setopt(curl, CURLOPT_TCP_KEEPALIVE, 1L);
+
+      StreamContext ctx;
+      ctx.callback =
+          std::make_shared<std::function<void(Json::Value&&, Json::Value&&)>>(
+              callback);
+      ctx.need_stop = true;
+      curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION,
+                       StreamContext::write_callback);
+      curl_easy_setopt(curl, CURLOPT_WRITEDATA, &ctx);
+
+      CURLcode res = curl_easy_perform(curl);
+      if (res != CURLE_OK) {
+        auto msg = curl_easy_strerror(res);
+        auto [status, res] = CreateResponse(msg, 500);
+        callback(std::move(status), std::move(res));
+      }
+
+      curl_slist_free_all(headers);
+      curl_easy_cleanup(curl);
+      if (ctx.need_stop) {
+        Json::Value status;
+        status["is_done"] = true;
+        status["has_error"] = false;
+        status["is_stream"] = true;
+        status["status_code"] = 200;
+        callback(std::move(status), Json::Value{});
+      }
+
+      return;
+    });
   } else {
-    const std::string url =
-        "http://127.0.0.1:" + std::to_string(port) + "/v1/chat/completions";
-    auto result = curl_utils::SimplePostJson(url, json_body->toStyledString());
+    // non-streaming
+    auto result = curl_utils::SimplePostJson(url, json_str);

     if (result.has_error()) {
       auto [status, res] = CreateResponse(result.error(), 400);
@@ -173,7 +255,7 @@ void VllmEngine::LoadModel(

   // https://docs.astral.sh/uv/reference/cli/#uv-run
   std::vector<std::string> cmd =
-      python_utils::BuildUvCommand("run", env_dir.string());
+      python_utils::UvBuildCommand("run", env_dir.string());
   cmd.push_back("vllm");
   cmd.push_back("serve");
   cmd.push_back(model_path.string());
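
Note: the streaming path above forwards each HTTP chunk from the local vLLM server to the engine callback through a libcurl write callback. Below is a minimal standalone sketch of that libcurl pattern, assuming a locally running OpenAI-compatible server; the URL, request body, and chunk handling are placeholders for illustration, not the engine's actual code.

#include <curl/curl.h>
#include <iostream>
#include <string>

// libcurl invokes this as response data arrives; each call is one chunk.
static size_t OnChunk(char* ptr, size_t size, size_t nmemb, void* userdata) {
  auto* received = static_cast<std::string*>(userdata);
  size_t len = size * nmemb;
  received->append(ptr, len);  // the engine forwards the chunk to its callback here instead
  std::cout << "received chunk of " << len << " bytes\n";
  return len;  // returning less than len aborts the transfer
}

int main() {
  curl_global_init(CURL_GLOBAL_DEFAULT);
  CURL* curl = curl_easy_init();
  if (!curl) return 1;

  // Placeholder endpoint and body; stream=true asks the server for SSE chunks.
  const std::string url = "http://127.0.0.1:8000/v1/chat/completions";
  const std::string body =
      R"({"model":"m","messages":[{"role":"user","content":"hi"}],"stream":true})";

  struct curl_slist* headers = nullptr;
  headers = curl_slist_append(headers, "Content-Type: application/json");

  std::string received;
  curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
  curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
  curl_easy_setopt(curl, CURLOPT_POSTFIELDS, body.c_str());
  curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, OnChunk);
  curl_easy_setopt(curl, CURLOPT_WRITEDATA, &received);

  CURLcode rc = curl_easy_perform(curl);  // blocks until the stream ends
  if (rc != CURLE_OK)
    std::cerr << curl_easy_strerror(rc) << "\n";

  curl_slist_free_all(headers);
  curl_easy_cleanup(curl);
  curl_global_cleanup();
  return 0;
}

The commit's write_callback additionally drops chunks of 6 bytes or fewer, presumably to skip framing-only fragments of the SSE stream before forwarding the payload.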

engine/extensions/python-engines/vllm_engine.h

Lines changed: 4 additions & 0 deletions
@@ -2,6 +2,7 @@
 #include "common/engine_servicei.h"
 #include "cortex-common/EngineI.h"
 #include "python_utils.h"
+#include "trantor/utils/ConcurrentTaskQueue.h"

 class VllmEngine : public EngineI {
  private:
@@ -16,6 +17,9 @@ class VllmEngine : public EngineI {
   std::unordered_map<std::string, python_utils::PythonSubprocess>
       model_process_map_;

+  // TODO: will use cortex's main TaskQueue once llama.cpp PR is merged
+  trantor::ConcurrentTaskQueue queue_;
+
  public:
   VllmEngine();
   ~VllmEngine();
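
For context on the new queue_ member: trantor::ConcurrentTaskQueue is constructed with a worker-thread count and a name, and runTaskInQueue schedules a task on one of those workers, so blocking work such as curl_easy_perform in the streaming path does not stall the caller. The sketch below shows that usage in isolation; the thread count, task bodies, and sleeps are illustrative only, not taken from the engine.

#include <trantor/utils/ConcurrentTaskQueue.h>
#include <chrono>
#include <iostream>
#include <thread>

int main() {
  // Two worker threads, mirroring the queue_{2, "vLLM engine"} member above.
  trantor::ConcurrentTaskQueue queue(2, "demo queue");

  // Each task runs on a worker thread; the loop returns immediately.
  for (int i = 0; i < 4; ++i) {
    queue.runTaskInQueue([i] {
      std::this_thread::sleep_for(std::chrono::milliseconds(100));  // stand-in for blocking I/O
      std::cout << "task " << i << " done\n";
    });
  }

  // Give the workers time to drain the queue before the destructor joins them.
  std::this_thread::sleep_for(std::chrono::seconds(1));
  return 0;
}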

engine/services/engine_service.cc

Lines changed: 1 addition & 0 deletions
@@ -433,6 +433,7 @@ cpp::result<void, std::string> EngineService::DownloadVllm(
   std::vector<std::string> cmd =
       python_utils::UvBuildCommand("venv", vllm_path.string());
   cmd.push_back("--relocatable");
+  cmd.push_back("--seed");
   auto result = cortex::process::SpawnProcess(cmd);
   if (result.has_error())
     return cpp::fail(result.error());
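
Both flags passed to uv venv here are real uv options: --relocatable makes the created virtual environment movable after creation, and the new --seed pre-installs pip (and related seed packages) into it. The sketch below shows the kind of argument vector this code plausibly produces; BuildUvVenvCommand is a hypothetical stand-in, since the actual output of python_utils::UvBuildCommand is not shown in this diff.

#include <iostream>
#include <string>
#include <vector>

// Hypothetical stand-in for python_utils::UvBuildCommand("venv", path).
std::vector<std::string> BuildUvVenvCommand(const std::string& venv_path) {
  std::vector<std::string> cmd = {"uv", "venv", venv_path};
  cmd.push_back("--relocatable");  // venv can be moved after creation
  cmd.push_back("--seed");         // pre-install pip and related seed packages
  return cmd;
}

int main() {
  // Prints something like: uv venv /path/to/vllm/venv --relocatable --seed
  for (const auto& arg : BuildUvVenvCommand("/path/to/vllm/venv"))
    std::cout << arg << ' ';
  std::cout << '\n';
  return 0;
}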
