Skip to content

Commit fe7c8b2

Browse files
authored
server: (router) fix stopping_thread potentially hang (ggml-org#24728)
* server: (router) fix stopping_thread potentially hang * fix windows build
1 parent e1efd09 commit fe7c8b2

1 file changed

Lines changed: 37 additions & 22 deletions

File tree

tools/server/server-models.cpp

Lines changed: 37 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ extern char **environ;
5454

5555
struct server_subproc {
5656
std::optional<subprocess_s> sproc; // empty while in DOWNLOADING state
57-
std::atomic<bool> stop_download{false}; // flag to signal download cancellation
57+
std::atomic<bool> stopped{false}; // set to cancel a download or signal child process exit
5858

5959
subprocess_s & get() {
6060
GGML_ASSERT(sproc.has_value() && "subprocess not initialized");
@@ -64,6 +64,22 @@ struct server_subproc {
6464
bool is_alive() {
6565
return sproc.has_value() && subprocess_alive(&sproc.value());
6666
}
67+
68+
void terminate() {
69+
if (!sproc.has_value()) {
70+
return;
71+
}
72+
#if defined(_WIN32)
73+
if (sproc->hProcess == NULL) {
74+
return;
75+
}
76+
#else
77+
if (sproc->child <= 0) {
78+
return;
79+
}
80+
#endif
81+
subprocess_terminate(&sproc.value());
82+
}
6783
};
6884

6985

@@ -902,50 +918,49 @@ void server_models::load(const std::string & name) {
902918
});
903919

904920
std::thread stopping_thread([&]() {
905-
// thread to monitor stopping signal OR child crash
921+
// thread to monitor explicit stop requests; child crash is signalled via child_proc->stopped
906922
auto is_stopping = [this, &name]() {
907923
return this->stopping_models.find(name) != this->stopping_models.end();
908924
};
909-
auto should_wake = [&]() {
910-
return is_stopping() || !child_proc->is_alive();
911-
};
912925
{
913926
std::unique_lock<std::mutex> lk(this->mutex);
914-
this->cv_stop.wait(lk, should_wake);
927+
this->cv_stop.wait(lk, [&]() {
928+
return is_stopping() || child_proc->stopped.load(std::memory_order_acquire);
929+
});
915930
}
916-
// child may have already exited (e.g. crashed) — skip shutdown sequence
917-
if (!child_proc->is_alive()) {
931+
// child crashed or finished on its own — skip graceful shutdown sequence
932+
if (child_proc->stopped.load(std::memory_order_acquire)) {
918933
return;
919934
}
920935
SRV_INF("stopping model instance name=%s\n", name.c_str());
921-
// send interrupt to child process
922936
fprintf(stdin_file, "%s\n", CMD_ROUTER_TO_CHILD_EXIT);
923937
fflush(stdin_file);
924-
// wait to stop gracefully or timeout
925938
int64_t start_time = ggml_time_ms();
926939
while (true) {
927940
std::unique_lock<std::mutex> lk(this->mutex);
928-
if (!is_stopping()) {
929-
return; // already stopped
941+
if (!is_stopping() || child_proc->stopped.load(std::memory_order_acquire)) {
942+
return;
930943
}
931944
int64_t elapsed = ggml_time_ms() - start_time;
932945
if (elapsed >= stop_timeout * 1000) {
933-
// timeout, force kill
946+
lk.unlock();
934947
SRV_WRN("force-killing model instance name=%s after %d seconds timeout\n", name.c_str(), stop_timeout);
935-
subprocess_terminate(&child_proc->get());
948+
child_proc->terminate();
936949
return;
937950
}
938-
this->cv_stop.wait_for(lk, std::chrono::seconds(1));
951+
this->cv_stop.wait_for(lk, std::chrono::seconds(1), [&]() {
952+
return !is_stopping() || child_proc->stopped.load(std::memory_order_acquire);
953+
});
939954
}
940955
});
941956

942-
// we reach here when the child process exits
957+
// we reach here when the child process exits (stdout EOF)
943958
// note: we cannot join() prior to this point because it will close stdin_file
944959
if (log_thread.joinable()) {
945960
log_thread.join();
946961
}
947962

948-
// stop the timeout monitoring thread
963+
child_proc->stopped.store(true, std::memory_order_release);
949964
{
950965
std::lock_guard<std::mutex> lk(this->mutex);
951966
stopping_models.erase(name);
@@ -971,7 +986,7 @@ void server_models::load(const std::string & name) {
971986
// old process should have exited already, but just in case, we clean it up here
972987
if (old_instance.subproc->is_alive()) {
973988
SRV_WRN("old process for model name=%s is still alive, this is unexpected\n", name.c_str());
974-
subprocess_terminate(&old_instance.subproc->get()); // force kill
989+
old_instance.subproc->terminate(); // force kill
975990
}
976991
if (old_instance.th.joinable()) {
977992
old_instance.th.join();
@@ -1039,7 +1054,7 @@ void server_models::download(common_params_model && model, common_download_opts
10391054
dl->opts = opts; // copy
10401055

10411056
dl->should_stop = [sp = inst.subproc]() {
1042-
return sp->stop_download.load(std::memory_order_relaxed);
1057+
return sp->stopped.load(std::memory_order_relaxed);
10431058
};
10441059

10451060
dl->on_progress = [this, name](const common_download_progress & p) {
@@ -1069,7 +1084,7 @@ void server_models::unload(const std::string & name) {
10691084
if (it != mapping.end()) {
10701085
if (it->second.meta.status == SERVER_MODEL_STATUS_DOWNLOADING) {
10711086
SRV_INF("cancelling download for model name=%s\n", name.c_str());
1072-
it->second.subproc->stop_download.store(true, std::memory_order_relaxed);
1087+
it->second.subproc->stopped.store(true, std::memory_order_relaxed);
10731088
// for convenience, we wait the status change here
10741089
wait(lk, name, [](const server_model_meta & new_meta) {
10751090
return new_meta.status != SERVER_MODEL_STATUS_DOWNLOADING;
@@ -1080,7 +1095,7 @@ void server_models::unload(const std::string & name) {
10801095
if (it->second.meta.status == SERVER_MODEL_STATUS_LOADING) {
10811096
// special case: if model is in loading state, unloading means force-killing it
10821097
SRV_WRN("model name=%s is still loading, force-killing\n", name.c_str());
1083-
subprocess_terminate(&it->second.subproc->get());
1098+
it->second.subproc->terminate();
10841099
}
10851100
cv_stop.notify_all();
10861101
// status change will be handled by the managing thread
@@ -1097,7 +1112,7 @@ void server_models::unload_all() {
10971112
for (auto & [name, inst] : mapping) {
10981113
if (inst.meta.status == SERVER_MODEL_STATUS_DOWNLOADING) {
10991114
SRV_INF("cancelling download for model name=%s\n", name.c_str());
1100-
inst.subproc->stop_download.store(true, std::memory_order_relaxed);
1115+
inst.subproc->stopped.store(true, std::memory_order_relaxed);
11011116
} else if (inst.meta.is_running()) {
11021117
SRV_INF("stopping model instance name=%s\n", name.c_str());
11031118
stopping_models.insert(name);

0 commit comments

Comments
 (0)