diff --git a/tools/server/server-models.cpp b/tools/server/server-models.cpp index db6cbce8f9d8..6ead608a4ffb 100644 --- a/tools/server/server-models.cpp +++ b/tools/server/server-models.cpp @@ -626,56 +626,71 @@ void server_models::load(const std::string & name) { } }); + std::atomic child_finished(false); + std::thread stopping_thread([&]() { - // thread to monitor stopping signal OR child crash + // thread to monitor explicit stop requests; child exit is signalled via child_finished auto is_stopping = [this, &name]() { return this->stopping_models.find(name) != this->stopping_models.end(); }; - auto should_wake = [&]() { - return is_stopping() || !subprocess_alive(child_proc.get()); - }; + { std::unique_lock lk(this->mutex); - this->cv_stop.wait(lk, should_wake); + this->cv_stop.wait(lk, [&]() { + return is_stopping() || child_finished.load(std::memory_order_acquire); + }); + if (!is_stopping() || child_finished.load(std::memory_order_acquire)) { + return; + } } - // child may have already exited (e.g. crashed) — skip shutdown sequence - if (!subprocess_alive(child_proc.get())) { + + // child may have already exited between wake-up and shutdown handling + if (subprocess_alive(child_proc.get()) <= 0) { return; } + SRV_INF("stopping model instance name=%s\n", name.c_str()); - // send interrupt to child process fprintf(stdin_file, "%s\n", CMD_ROUTER_TO_CHILD_EXIT); fflush(stdin_file); - // wait to stop gracefully or timeout + int64_t start_time = ggml_time_ms(); while (true) { + if (subprocess_alive(child_proc.get()) <= 0) { + return; + } + std::unique_lock lk(this->mutex); - if (!is_stopping()) { - return; // already stopped + if (!is_stopping() || child_finished.load(std::memory_order_acquire)) { + return; } + int64_t elapsed = ggml_time_ms() - start_time; if (elapsed >= stop_timeout * 1000) { - // timeout, force kill + lk.unlock(); SRV_WRN("force-killing model instance name=%s after %d seconds timeout\n", name.c_str(), stop_timeout); subprocess_terminate(child_proc.get()); return; } - this->cv_stop.wait_for(lk, std::chrono::seconds(1)); + + this->cv_stop.wait_for(lk, std::chrono::seconds(1), [&]() { + return !is_stopping() || child_finished.load(std::memory_order_acquire); + }); } }); - // we reach here when the child process exits + // note: we cannot join() prior to this point because it will close stdin_file if (log_thread.joinable()) { log_thread.join(); } - // stop the timeout monitoring thread + child_finished.store(true, std::memory_order_release); { std::lock_guard lk(this->mutex); stopping_models.erase(name); cv_stop.notify_all(); } + if (stopping_thread.joinable()) { stopping_thread.join(); } diff --git a/vendor/sheredom/subprocess.h b/vendor/sheredom/subprocess.h index 3e40bae046a2..f6f93dfec760 100644 --- a/vendor/sheredom/subprocess.h +++ b/vendor/sheredom/subprocess.h @@ -1051,6 +1051,11 @@ int subprocess_terminate(struct subprocess_s *const process) { return success_terminate; #else int result; + + if (process->child <= 0) { + return -1; + } + result = kill(process->child, 9); return result; #endif