diff --git a/tools/server/server-models.cpp b/tools/server/server-models.cpp index ff9a0df12f4b..5225b65f7699 100644 --- a/tools/server/server-models.cpp +++ b/tools/server/server-models.cpp @@ -54,7 +54,7 @@ extern char **environ; struct server_subproc { std::optional sproc; // empty while in DOWNLOADING state - std::atomic stop_download{false}; // flag to signal download cancellation + std::atomic stopped{false}; // set to cancel a download or signal child process exit subprocess_s & get() { GGML_ASSERT(sproc.has_value() && "subprocess not initialized"); @@ -64,6 +64,22 @@ struct server_subproc { bool is_alive() { return sproc.has_value() && subprocess_alive(&sproc.value()); } + + void terminate() { + if (!sproc.has_value()) { + return; + } +#if defined(_WIN32) + if (sproc->hProcess == NULL) { + return; + } +#else + if (sproc->child <= 0) { + return; + } +#endif + subprocess_terminate(&sproc.value()); + } }; @@ -896,50 +912,49 @@ void server_models::load(const std::string & name) { }); std::thread stopping_thread([&]() { - // thread to monitor stopping signal OR child crash + // thread to monitor explicit stop requests; child crash is signalled via child_proc->stopped auto is_stopping = [this, &name]() { return this->stopping_models.find(name) != this->stopping_models.end(); }; - auto should_wake = [&]() { - return is_stopping() || !child_proc->is_alive(); - }; { std::unique_lock lk(this->mutex); - this->cv_stop.wait(lk, should_wake); + this->cv_stop.wait(lk, [&]() { + return is_stopping() || child_proc->stopped.load(std::memory_order_acquire); + }); } - // child may have already exited (e.g. crashed) — skip shutdown sequence - if (!child_proc->is_alive()) { + // child crashed or finished on its own — skip graceful shutdown sequence + if (child_proc->stopped.load(std::memory_order_acquire)) { return; } SRV_INF("stopping model instance name=%s\n", name.c_str()); - // send interrupt to child process fprintf(stdin_file, "%s\n", CMD_ROUTER_TO_CHILD_EXIT); fflush(stdin_file); - // wait to stop gracefully or timeout int64_t start_time = ggml_time_ms(); while (true) { std::unique_lock lk(this->mutex); - if (!is_stopping()) { - return; // already stopped + if (!is_stopping() || child_proc->stopped.load(std::memory_order_acquire)) { + return; } int64_t elapsed = ggml_time_ms() - start_time; if (elapsed >= stop_timeout * 1000) { - // timeout, force kill + lk.unlock(); SRV_WRN("force-killing model instance name=%s after %d seconds timeout\n", name.c_str(), stop_timeout); - subprocess_terminate(&child_proc->get()); + child_proc->terminate(); return; } - this->cv_stop.wait_for(lk, std::chrono::seconds(1)); + this->cv_stop.wait_for(lk, std::chrono::seconds(1), [&]() { + return !is_stopping() || child_proc->stopped.load(std::memory_order_acquire); + }); } }); - // we reach here when the child process exits + // we reach here when the child process exits (stdout EOF) // note: we cannot join() prior to this point because it will close stdin_file if (log_thread.joinable()) { log_thread.join(); } - // stop the timeout monitoring thread + child_proc->stopped.store(true, std::memory_order_release); { std::lock_guard lk(this->mutex); stopping_models.erase(name); @@ -965,7 +980,7 @@ void server_models::load(const std::string & name) { // old process should have exited already, but just in case, we clean it up here if (old_instance.subproc->is_alive()) { SRV_WRN("old process for model name=%s is still alive, this is unexpected\n", name.c_str()); - subprocess_terminate(&old_instance.subproc->get()); // force kill + old_instance.subproc->terminate(); // force kill } if (old_instance.th.joinable()) { old_instance.th.join(); @@ -1033,7 +1048,7 @@ void server_models::download(common_params_model && model, common_download_opts dl->opts = opts; // copy dl->should_stop = [sp = inst.subproc]() { - return sp->stop_download.load(std::memory_order_relaxed); + return sp->stopped.load(std::memory_order_relaxed); }; dl->on_progress = [this, name](const common_download_progress & p) { @@ -1063,7 +1078,7 @@ void server_models::unload(const std::string & name) { if (it != mapping.end()) { if (it->second.meta.status == SERVER_MODEL_STATUS_DOWNLOADING) { SRV_INF("cancelling download for model name=%s\n", name.c_str()); - it->second.subproc->stop_download.store(true, std::memory_order_relaxed); + it->second.subproc->stopped.store(true, std::memory_order_relaxed); // for convenience, we wait the status change here wait(lk, name, [](const server_model_meta & new_meta) { return new_meta.status != SERVER_MODEL_STATUS_DOWNLOADING; @@ -1074,7 +1089,7 @@ void server_models::unload(const std::string & name) { if (it->second.meta.status == SERVER_MODEL_STATUS_LOADING) { // special case: if model is in loading state, unloading means force-killing it SRV_WRN("model name=%s is still loading, force-killing\n", name.c_str()); - subprocess_terminate(&it->second.subproc->get()); + it->second.subproc->terminate(); } cv_stop.notify_all(); // status change will be handled by the managing thread @@ -1091,7 +1106,7 @@ void server_models::unload_all() { for (auto & [name, inst] : mapping) { if (inst.meta.status == SERVER_MODEL_STATUS_DOWNLOADING) { SRV_INF("cancelling download for model name=%s\n", name.c_str()); - inst.subproc->stop_download.store(true, std::memory_order_relaxed); + inst.subproc->stopped.store(true, std::memory_order_relaxed); } else if (inst.meta.is_running()) { SRV_INF("stopping model instance name=%s\n", name.c_str()); stopping_models.insert(name);