Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 37 additions & 22 deletions tools/server/server-models.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ extern char **environ;

struct server_subproc {
std::optional<subprocess_s> sproc; // empty while in DOWNLOADING state
std::atomic<bool> stop_download{false}; // flag to signal download cancellation
std::atomic<bool> stopped{false}; // set to cancel a download or signal child process exit

subprocess_s & get() {
GGML_ASSERT(sproc.has_value() && "subprocess not initialized");
Expand All @@ -64,6 +64,22 @@ struct server_subproc {
bool is_alive() {
return sproc.has_value() && subprocess_alive(&sproc.value());
}

void terminate() {
if (!sproc.has_value()) {
return;
}
#if defined(_WIN32)
if (sproc->hProcess == NULL) {
return;
}
#else
if (sproc->child <= 0) {
return;
}
#endif
subprocess_terminate(&sproc.value());
}
};


Expand Down Expand Up @@ -896,50 +912,49 @@ void server_models::load(const std::string & name) {
});

std::thread stopping_thread([&]() {
// thread to monitor stopping signal OR child crash
// thread to monitor explicit stop requests; child crash is signalled via child_proc->stopped
auto is_stopping = [this, &name]() {
return this->stopping_models.find(name) != this->stopping_models.end();
};
auto should_wake = [&]() {
return is_stopping() || !child_proc->is_alive();
};
{
std::unique_lock<std::mutex> lk(this->mutex);
this->cv_stop.wait(lk, should_wake);
this->cv_stop.wait(lk, [&]() {
return is_stopping() || child_proc->stopped.load(std::memory_order_acquire);
});
}
// child may have already exited (e.g. crashed) — skip shutdown sequence
if (!child_proc->is_alive()) {
// child crashed or finished on its own — skip graceful shutdown sequence
if (child_proc->stopped.load(std::memory_order_acquire)) {
return;
}
SRV_INF("stopping model instance name=%s\n", name.c_str());
// send interrupt to child process
fprintf(stdin_file, "%s\n", CMD_ROUTER_TO_CHILD_EXIT);
fflush(stdin_file);
// wait to stop gracefully or timeout
int64_t start_time = ggml_time_ms();
while (true) {
std::unique_lock<std::mutex> lk(this->mutex);
if (!is_stopping()) {
return; // already stopped
if (!is_stopping() || child_proc->stopped.load(std::memory_order_acquire)) {
return;
}
int64_t elapsed = ggml_time_ms() - start_time;
if (elapsed >= stop_timeout * 1000) {
// timeout, force kill
lk.unlock();
SRV_WRN("force-killing model instance name=%s after %d seconds timeout\n", name.c_str(), stop_timeout);
subprocess_terminate(&child_proc->get());
child_proc->terminate();
return;
}
this->cv_stop.wait_for(lk, std::chrono::seconds(1));
this->cv_stop.wait_for(lk, std::chrono::seconds(1), [&]() {
return !is_stopping() || child_proc->stopped.load(std::memory_order_acquire);
});
}
});

// we reach here when the child process exits
// we reach here when the child process exits (stdout EOF)
// note: we cannot join() prior to this point because it will close stdin_file
if (log_thread.joinable()) {
log_thread.join();
}

// stop the timeout monitoring thread
child_proc->stopped.store(true, std::memory_order_release);
{
std::lock_guard<std::mutex> lk(this->mutex);
stopping_models.erase(name);
Expand All @@ -965,7 +980,7 @@ void server_models::load(const std::string & name) {
// old process should have exited already, but just in case, we clean it up here
if (old_instance.subproc->is_alive()) {
SRV_WRN("old process for model name=%s is still alive, this is unexpected\n", name.c_str());
subprocess_terminate(&old_instance.subproc->get()); // force kill
old_instance.subproc->terminate(); // force kill
}
if (old_instance.th.joinable()) {
old_instance.th.join();
Expand Down Expand Up @@ -1033,7 +1048,7 @@ void server_models::download(common_params_model && model, common_download_opts
dl->opts = opts; // copy

dl->should_stop = [sp = inst.subproc]() {
return sp->stop_download.load(std::memory_order_relaxed);
return sp->stopped.load(std::memory_order_relaxed);
};

dl->on_progress = [this, name](const common_download_progress & p) {
Expand Down Expand Up @@ -1063,7 +1078,7 @@ void server_models::unload(const std::string & name) {
if (it != mapping.end()) {
if (it->second.meta.status == SERVER_MODEL_STATUS_DOWNLOADING) {
SRV_INF("cancelling download for model name=%s\n", name.c_str());
it->second.subproc->stop_download.store(true, std::memory_order_relaxed);
it->second.subproc->stopped.store(true, std::memory_order_relaxed);
// for convenience, we wait the status change here
wait(lk, name, [](const server_model_meta & new_meta) {
return new_meta.status != SERVER_MODEL_STATUS_DOWNLOADING;
Expand All @@ -1074,7 +1089,7 @@ void server_models::unload(const std::string & name) {
if (it->second.meta.status == SERVER_MODEL_STATUS_LOADING) {
// special case: if model is in loading state, unloading means force-killing it
SRV_WRN("model name=%s is still loading, force-killing\n", name.c_str());
subprocess_terminate(&it->second.subproc->get());
it->second.subproc->terminate();
}
cv_stop.notify_all();
// status change will be handled by the managing thread
Expand All @@ -1091,7 +1106,7 @@ void server_models::unload_all() {
for (auto & [name, inst] : mapping) {
if (inst.meta.status == SERVER_MODEL_STATUS_DOWNLOADING) {
SRV_INF("cancelling download for model name=%s\n", name.c_str());
inst.subproc->stop_download.store(true, std::memory_order_relaxed);
inst.subproc->stopped.store(true, std::memory_order_relaxed);
} else if (inst.meta.is_running()) {
SRV_INF("stopping model instance name=%s\n", name.c_str());
stopping_models.insert(name);
Expand Down
Loading