Skip to content

Commit 05d60ab

Browse files
committed
Timeout MCP requests, reap stdio child, detect unexpected EOF
1 parent b8ac3e7 commit 05d60ab

2 files changed

Lines changed: 206 additions & 74 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ All notable changes to Sofos are documented in this file.
1818

1919
### Fixed
2020

21+
- **MCP requests are now bounded at 120 seconds.** A frozen stdio MCP server used to wedge every subsequent MCP call because `BufRead::read_line` blocks indefinitely and the stdout mutex serialises requests. The blocking I/O now runs on a dedicated worker thread wrapped in `tokio::time::timeout`; on expiry the child is killed so queued callers fail fast with EPIPE rather than piling up on a mutex that will never release. HTTP MCP clients gain an explicit `reqwest` timeout for the same reason — previously they inherited the library default, which is unlimited.
22+
- **MCP child processes are now reaped on drop.** `std::process::Child::drop` does not wait, so every MCP server previously lingered as a zombie until sofos itself exited. `StdioClient` now has an explicit `Drop` that kills and waits the subprocess.
2123
- **"Goodbye!" no longer prints on the same line as the status row on exit.** The TUI left the cursor parked at the end of the status line; when the session summary short-circuited on zero usage, `print_goodbye` then printed `Goodbye!` flush against `… thinking: 5120 tok`. The teardown now emits an escape-newline on the no-summary path so the message always starts fresh.
2224
- **Safe-mode tool list now matches reality.** `SAFE_MODE_MESSAGE` told the model it had `list_directory, read_file and web_search`, but `get_read_only_tools()` actually exposes `list_directory, read_file, glob_files, web_fetch, web_search` (+ `search_code` when ripgrep is present). The model was being given a false toolset description on safe-mode entry.
2325
- **Corrupted session file on save no longer wipes the in-memory conversation.** `save_session` re-read the prior file to preserve `created_at`; if the file was unparseable (hand-edited, partial prior write, schema drift) the JSON error bubbled up and the current turn's messages were dropped on the floor. The stamp now falls back to `now` on any read/parse failure — losing a date is cheaper than losing the session. Regression test added (`save_session_survives_corrupted_prior_file`).

src/mcp/client.rs

Lines changed: 204 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,15 @@ use std::io::{BufRead, BufReader, Write};
77
use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio};
88
use std::sync::atomic::{AtomicU64, Ordering};
99
use std::sync::{Arc, Mutex};
10+
use std::time::Duration;
11+
12+
/// Ceiling on a single MCP request (stdio read + HTTP round-trip). A
13+
/// misbehaving MCP server used to freeze every subsequent MCP call
14+
/// because `BufRead::read_line` blocks indefinitely and the stdout
15+
/// mutex serialises all requests. Two minutes is generous enough for
16+
/// slow remote search backends (PubMed / ClinicalTrials.gov) while
17+
/// still bounding a frozen server's blast radius to a single turn.
18+
const MCP_REQUEST_TIMEOUT: Duration = Duration::from_secs(120);
1019

1120
fn create_init_request() -> InitializeRequest {
1221
InitializeRequest {
@@ -95,14 +104,122 @@ impl McpClient {
95104
}
96105
}
97106

107+
/// Take the stdin and (buffered) stdout out of a freshly-spawned child.
108+
/// Returning `Err` here is essentially unreachable because
109+
/// `Command::spawn` with `Stdio::piped()` on both ends always populates
110+
/// `Child::stdin` / `Child::stdout`, but we still handle it — leaking
111+
/// a live `Child` out to `Drop` would fail to reap it (the std type
112+
/// doesn't wait).
113+
fn take_child_pipes(process: &mut Child, server_name: &str) -> Result<(ChildStdin, ChildStdout)> {
114+
let stdin = process.stdin.take().ok_or_else(|| {
115+
SofosError::McpError(format!(
116+
"Failed to get stdin for MCP server '{}'",
117+
server_name
118+
))
119+
})?;
120+
let stdout = process.stdout.take().ok_or_else(|| {
121+
SofosError::McpError(format!(
122+
"Failed to get stdout for MCP server '{}'",
123+
server_name
124+
))
125+
})?;
126+
Ok((stdin, stdout))
127+
}
128+
129+
/// Write a single stdio message to the server (JSON-RPC request *or*
130+
/// notification). Shared by `send_request` and `send_notification` so
131+
/// the lock/write/flush sequence lives in one place.
132+
fn stdio_write_blocking(
133+
server_name: &str,
134+
stdin: &Arc<Mutex<ChildStdin>>,
135+
payload: &str,
136+
) -> Result<()> {
137+
let mut stdin_guard = stdin
138+
.lock()
139+
.map_err(|e| SofosError::McpError(format!("Failed to lock stdin: {}", e)))?;
140+
writeln!(stdin_guard, "{}", payload).map_err(|e| {
141+
SofosError::McpError(format!(
142+
"Failed to write to MCP server '{}': {}",
143+
server_name, e
144+
))
145+
})?;
146+
stdin_guard.flush().map_err(|e| {
147+
SofosError::McpError(format!(
148+
"Failed to flush stdin for MCP server '{}': {}",
149+
server_name, e
150+
))
151+
})?;
152+
Ok(())
153+
}
154+
155+
/// Run one stdio MCP request on a worker thread. Owns no `self` so it
156+
/// can be freely moved into `tokio::task::spawn_blocking`. Returns the
157+
/// parsed `JsonRpcResponse` envelope (not the `result` payload) so the
158+
/// caller can distinguish a server-reported error from a transport
159+
/// failure.
160+
fn stdio_request_blocking(
161+
server_name: &str,
162+
stdin: &Arc<Mutex<ChildStdin>>,
163+
stdout: &Arc<Mutex<BufReader<ChildStdout>>>,
164+
request_json: &str,
165+
) -> Result<JsonRpcResponse> {
166+
stdio_write_blocking(server_name, stdin, request_json)?;
167+
168+
let mut stdout_guard = stdout
169+
.lock()
170+
.map_err(|e| SofosError::McpError(format!("Failed to lock stdout: {}", e)))?;
171+
172+
let mut response_line = String::new();
173+
let bytes_read = stdout_guard.read_line(&mut response_line).map_err(|e| {
174+
SofosError::McpError(format!(
175+
"Failed to read from MCP server '{}': {}",
176+
server_name, e
177+
))
178+
})?;
179+
// Zero bytes from `read_line` means the server closed stdout
180+
// cleanly — typically a crash or exit between requests.
181+
// Surface that plainly so the user isn't chasing a bogus
182+
// "parse error" message for what's really a dead server.
183+
if bytes_read == 0 {
184+
return Err(SofosError::McpError(format!(
185+
"MCP server '{}' closed stdout unexpectedly (server crashed or exited?)",
186+
server_name
187+
)));
188+
}
189+
190+
serde_json::from_str(&response_line).map_err(|e| {
191+
SofosError::McpError(format!(
192+
"Failed to parse response from MCP server '{}': {}",
193+
server_name, e
194+
))
195+
})
196+
}
197+
98198
pub struct StdioClient {
99199
server_name: String,
100-
_process: Arc<Mutex<Child>>,
200+
process: Arc<Mutex<Child>>,
101201
stdin: Arc<Mutex<ChildStdin>>,
102202
stdout: Arc<Mutex<BufReader<ChildStdout>>>,
103203
next_id: Arc<AtomicU64>,
104204
}
105205

206+
impl Drop for StdioClient {
207+
fn drop(&mut self) {
208+
// `Child::drop` does NOT wait on the subprocess, so without
209+
// this the MCP server lingers as a zombie until sofos itself
210+
// exits. A detached reap task spawned by `kill_child_detached`
211+
// may also be running when Drop fires — the mutex serialises
212+
// them, and the kill/wait pair is idempotent: a second `kill`
213+
// after the child already exited returns `InvalidInput`, and
214+
// a second `wait` after the child was already reaped returns
215+
// a harmless error. Both are discarded.
216+
if let Ok(mut child) = self.process.lock() {
217+
let _ = child.kill();
218+
let _ = child.wait();
219+
}
220+
}
221+
}
222+
106223
impl StdioClient {
107224
pub async fn new(server_name: String, config: McpServerConfig) -> Result<Self> {
108225
let command = config
@@ -129,23 +246,23 @@ impl StdioClient {
129246
))
130247
})?;
131248

132-
let stdin = process.stdin.take().ok_or_else(|| {
133-
SofosError::McpError(format!(
134-
"Failed to get stdin for MCP server '{}'",
135-
server_name
136-
))
137-
})?;
138-
139-
let stdout = process.stdout.take().ok_or_else(|| {
140-
SofosError::McpError(format!(
141-
"Failed to get stdout for MCP server '{}'",
142-
server_name
143-
))
144-
})?;
249+
// Take the pipes through a helper that reaps the child on
250+
// error, so a (practically-impossible but not-type-proven)
251+
// `None` from `take()` doesn't leak a zombie into the OS
252+
// process table. Once the child is wrapped in `Self`, the
253+
// `Drop` impl takes over.
254+
let (stdin, stdout) = match take_child_pipes(&mut process, &server_name) {
255+
Ok(pair) => pair,
256+
Err(e) => {
257+
let _ = process.kill();
258+
let _ = process.wait();
259+
return Err(e);
260+
}
261+
};
145262

146263
let client = Self {
147264
server_name: server_name.clone(),
148-
_process: Arc::new(Mutex::new(process)),
265+
process: Arc::new(Mutex::new(process)),
149266
stdin: Arc::new(Mutex::new(stdin)),
150267
stdout: Arc::new(Mutex::new(BufReader::new(stdout))),
151268
next_id: Arc::new(AtomicU64::new(1)),
@@ -156,6 +273,53 @@ impl StdioClient {
156273
Ok(client)
157274
}
158275

276+
/// Run a blocking closure with the shared MCP timeout ceiling. On
277+
/// timeout the child is killed off-thread so the async caller
278+
/// doesn't pause the executor waiting for the OS to reap it. Used
279+
/// by both `send_request` and `send_notification` so they share
280+
/// the same lock/panic/timeout error vocabulary.
281+
async fn run_with_timeout<T, F>(&self, label: &str, blocking: F) -> Result<T>
282+
where
283+
F: FnOnce() -> Result<T> + Send + 'static,
284+
T: Send + 'static,
285+
{
286+
let task = tokio::task::spawn_blocking(blocking);
287+
match tokio::time::timeout(MCP_REQUEST_TIMEOUT, task).await {
288+
Ok(Ok(Ok(value))) => Ok(value),
289+
Ok(Ok(Err(e))) => Err(e),
290+
Ok(Err(join_err)) => Err(SofosError::McpError(format!(
291+
"MCP worker panicked for server '{}' during {}: {}",
292+
self.server_name, label, join_err
293+
))),
294+
Err(_) => {
295+
self.kill_child_detached();
296+
Err(SofosError::McpError(format!(
297+
"MCP server '{}' {} timed out after {}s",
298+
self.server_name,
299+
label,
300+
MCP_REQUEST_TIMEOUT.as_secs()
301+
)))
302+
}
303+
}
304+
}
305+
306+
/// Kill + reap the child on a blocking thread without waiting for
307+
/// it to finish here. Used from async timeout handlers so a slow
308+
/// `Child::wait` (milliseconds in practice, but the call is
309+
/// synchronous) never pauses the tokio executor. Firing and
310+
/// forgetting is safe because tokio drains blocking tasks on
311+
/// runtime shutdown, and the `Drop` impl is idempotent against
312+
/// an already-reaped child.
313+
fn kill_child_detached(&self) {
314+
let process = Arc::clone(&self.process);
315+
tokio::task::spawn_blocking(move || {
316+
if let Ok(mut child) = process.lock() {
317+
let _ = child.kill();
318+
let _ = child.wait();
319+
}
320+
});
321+
}
322+
159323
async fn initialize(&self) -> Result<()> {
160324
let response = self
161325
.send_request(
@@ -175,47 +339,16 @@ impl StdioClient {
175339
async fn send_request(&self, method: &str, params: Option<Value>) -> Result<Value> {
176340
let id = self.next_id.fetch_add(1, Ordering::SeqCst);
177341
let request = JsonRpcRequest::new(id, method.to_string(), params);
178-
179342
let request_json = serde_json::to_string(&request)?;
180343

181-
{
182-
let mut stdin = self
183-
.stdin
184-
.lock()
185-
.map_err(|e| SofosError::McpError(format!("Failed to lock stdin: {}", e)))?;
186-
writeln!(stdin, "{}", request_json).map_err(|e| {
187-
SofosError::McpError(format!(
188-
"Failed to write to MCP server '{}': {}",
189-
self.server_name, e
190-
))
191-
})?;
192-
stdin.flush().map_err(|e| {
193-
SofosError::McpError(format!(
194-
"Failed to flush stdin for MCP server '{}': {}",
195-
self.server_name, e
196-
))
197-
})?;
198-
}
199-
200-
let mut stdout = self
201-
.stdout
202-
.lock()
203-
.map_err(|e| SofosError::McpError(format!("Failed to lock stdout: {}", e)))?;
204-
205-
let mut response_line = String::new();
206-
stdout.read_line(&mut response_line).map_err(|e| {
207-
SofosError::McpError(format!(
208-
"Failed to read from MCP server '{}': {}",
209-
self.server_name, e
210-
))
211-
})?;
212-
213-
let response: JsonRpcResponse = serde_json::from_str(&response_line).map_err(|e| {
214-
SofosError::McpError(format!(
215-
"Failed to parse response from MCP server '{}': {}",
216-
self.server_name, e
217-
))
218-
})?;
344+
let server_name = self.server_name.clone();
345+
let stdin = Arc::clone(&self.stdin);
346+
let stdout = Arc::clone(&self.stdout);
347+
let response = self
348+
.run_with_timeout("request", move || {
349+
stdio_request_blocking(&server_name, &stdin, &stdout, &request_json)
350+
})
351+
.await?;
219352

220353
if let Some(error) = response.error {
221354
return Err(SofosError::McpError(format!(
@@ -237,27 +370,17 @@ impl StdioClient {
237370
"jsonrpc": "2.0",
238371
"method": method,
239372
});
240-
241373
let notification_json = serde_json::to_string(&notification)?;
242374

243-
let mut stdin = self
244-
.stdin
245-
.lock()
246-
.map_err(|e| SofosError::McpError(format!("Failed to lock stdin: {}", e)))?;
247-
writeln!(stdin, "{}", notification_json).map_err(|e| {
248-
SofosError::McpError(format!(
249-
"Failed to write notification to MCP server '{}': {}",
250-
self.server_name, e
251-
))
252-
})?;
253-
stdin.flush().map_err(|e| {
254-
SofosError::McpError(format!(
255-
"Failed to flush stdin for MCP server '{}': {}",
256-
self.server_name, e
257-
))
258-
})?;
259-
260-
Ok(())
375+
// Notifications are one-way (no response to read), but the
376+
// write itself can still block forever if the child has
377+
// wedged its read side. Same timeout path as `send_request`.
378+
let server_name = self.server_name.clone();
379+
let stdin = Arc::clone(&self.stdin);
380+
self.run_with_timeout("notification", move || {
381+
stdio_write_blocking(&server_name, &stdin, &notification_json)
382+
})
383+
.await
261384
}
262385

263386
pub async fn list_tools(&self) -> Result<Vec<McpTool>> {
@@ -285,7 +408,14 @@ impl HttpClient {
285408

286409
let headers = config.headers.unwrap_or_default();
287410

288-
let client = reqwest::Client::new();
411+
// A bare `reqwest::Client::new()` uses no request timeout at
412+
// all, so a slow remote MCP server could stall a turn forever.
413+
// Set the shared MCP ceiling at client-construction time so
414+
// every call-site inherits it without extra threading.
415+
let client = reqwest::Client::builder()
416+
.timeout(MCP_REQUEST_TIMEOUT)
417+
.build()
418+
.map_err(|e| SofosError::McpError(format!("Failed to build MCP HTTP client: {}", e)))?;
289419

290420
let http_client = Self {
291421
server_name: server_name.clone(),

0 commit comments

Comments
 (0)