Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 30 additions & 15 deletions tools/server/server-models.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -626,56 +626,71 @@ void server_models::load(const std::string & name) {
}
});

std::atomic<bool> child_finished(false);

std::thread stopping_thread([&]() {
// thread to monitor stopping signal OR child crash
// thread to monitor explicit stop requests; child exit is signalled via child_finished
auto is_stopping = [this, &name]() {
return this->stopping_models.find(name) != this->stopping_models.end();
};
auto should_wake = [&]() {
return is_stopping() || !subprocess_alive(child_proc.get());
};

{
std::unique_lock<std::mutex> lk(this->mutex);
this->cv_stop.wait(lk, should_wake);
this->cv_stop.wait(lk, [&]() {
return is_stopping() || child_finished.load(std::memory_order_acquire);
});
if (!is_stopping() || child_finished.load(std::memory_order_acquire)) {
return;
}
}
// child may have already exited (e.g. crashed) — skip shutdown sequence
if (!subprocess_alive(child_proc.get())) {

// child may have already exited between wake-up and shutdown handling
if (subprocess_alive(child_proc.get()) <= 0) {
return;
}

SRV_INF("stopping model instance name=%s\n", name.c_str());
// send interrupt to child process
fprintf(stdin_file, "%s\n", CMD_ROUTER_TO_CHILD_EXIT);
fflush(stdin_file);
// wait to stop gracefully or timeout

int64_t start_time = ggml_time_ms();
while (true) {
if (subprocess_alive(child_proc.get()) <= 0) {
return;
}

std::unique_lock<std::mutex> lk(this->mutex);
if (!is_stopping()) {
return; // already stopped
if (!is_stopping() || child_finished.load(std::memory_order_acquire)) {
return;
}

int64_t elapsed = ggml_time_ms() - start_time;
if (elapsed >= stop_timeout * 1000) {
// timeout, force kill
lk.unlock();
SRV_WRN("force-killing model instance name=%s after %d seconds timeout\n", name.c_str(), stop_timeout);
subprocess_terminate(child_proc.get());
return;
}
this->cv_stop.wait_for(lk, std::chrono::seconds(1));

this->cv_stop.wait_for(lk, std::chrono::seconds(1), [&]() {
return !is_stopping() || child_finished.load(std::memory_order_acquire);
});
}
});

// we reach here when the child process exits

// note: we cannot join() prior to this point because it will close stdin_file
if (log_thread.joinable()) {
log_thread.join();
}

// stop the timeout monitoring thread
child_finished.store(true, std::memory_order_release);
{
std::lock_guard<std::mutex> lk(this->mutex);
stopping_models.erase(name);
cv_stop.notify_all();
}

if (stopping_thread.joinable()) {
stopping_thread.join();
}
Expand Down
5 changes: 5 additions & 0 deletions vendor/sheredom/subprocess.h
Original file line number Diff line number Diff line change
Expand Up @@ -1051,6 +1051,11 @@ int subprocess_terminate(struct subprocess_s *const process) {
return success_terminate;
#else
int result;

if (process->child <= 0) {
return -1;
}

result = kill(process->child, 9);
return result;
#endif
Expand Down