Skip to content

Commit a100aaf

Browse files
committed
Add Windows PTY support for Python worker
Run built-in Python through ConPTY on Windows so it uses the same PTY-backed stdin and sideband accounting path as Unix. Keep the worker protocol unchanged while adding Windows console setup, input draining, and zod/Python parity coverage. Verification: cargo check; cargo build; python tests/run_integration_tests.py --binary target/debug/mcp-repl.exe; cargo clippy --all-targets --all-features -- -D warnings; cargo test --quiet; cargo +nightly fmt. The required python3 integration command could not run because python3 is not installed on this Windows host.
1 parent 5fc510b commit a100aaf

9 files changed

Lines changed: 1107 additions & 162 deletions

File tree

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
landlock = "0.4.4"
4242
seccompiler = "0.5.0"
4343

44-
[target.'cfg(unix)'.dependencies]
44+
[target.'cfg(any(unix, windows))'.dependencies]
4545
portable-pty = "0.9.0"
4646

4747
[target.'cfg(target_os = "macos")'.dependencies]
@@ -56,6 +56,7 @@
5656
"Win32_Security_Authorization",
5757
"Win32_Storage_FileSystem",
5858
"Win32_System_Console",
59+
"Win32_System_Environment",
5960
"Win32_System_IO",
6061
"Win32_System_JobObjects",
6162
"Win32_System_Pipes",

docs/plans/active/worker-server-protocol-zod.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -647,7 +647,7 @@ ordered on the worker-to-server sideband stream. The server must not
647647
assume that writing the `interrupt` message means the worker has already
648648
processed it; later `readline_input`, `readline_discard`,
649649
`readline_start`, and `session_end` events determine recovery.
650-
Built-in Unix Python currently has a private `python_interrupt` /
650+
Built-in PTY-backed Python currently has a private `python_interrupt` /
651651
`python_interrupt_ack` cleanup handshake so it can drain PTY input before
652652
SIGINT; that acknowledgement is transitional and not part of the generic
653653
worker protocol.

src/backend.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,9 @@ impl WorkerLaunch {
3939

4040
pub fn stdin_transport(&self) -> WorkerStdinTransport {
4141
match self {
42-
Self::Builtin(Backend::Python) if cfg!(target_family = "unix") => {
42+
Self::Builtin(Backend::Python)
43+
if cfg!(any(target_family = "unix", target_os = "windows")) =>
44+
{
4345
WorkerStdinTransport::Pty
4446
}
4547
Self::Builtin(_) => WorkerStdinTransport::Pipe,
@@ -198,12 +200,12 @@ mod tests {
198200
WorkerLaunch::Builtin(Backend::R).stdin_transport(),
199201
WorkerStdinTransport::Pipe
200202
);
201-
#[cfg(not(target_family = "unix"))]
203+
#[cfg(not(any(target_family = "unix", target_os = "windows")))]
202204
assert_eq!(
203205
WorkerLaunch::Builtin(Backend::Python).stdin_transport(),
204206
WorkerStdinTransport::Pipe
205207
);
206-
#[cfg(target_family = "unix")]
208+
#[cfg(any(target_family = "unix", target_os = "windows"))]
207209
assert_eq!(
208210
WorkerLaunch::Builtin(Backend::Python).stdin_transport(),
209211
WorkerStdinTransport::Pty

src/ipc.rs

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,9 @@ impl OutputCriticalIpcWriter {
279279
.writer
280280
.lock()
281281
.map_err(|_| io::Error::other("ipc writer mutex poisoned"))?;
282-
write_ipc_message(&mut **writer, &message)
282+
let result = write_ipc_message(&mut **writer, &message);
283+
drop(writer);
284+
result
283285
}
284286
}
285287

@@ -631,7 +633,10 @@ impl ServerIpcConnection {
631633
Ok(())
632634
}
633635

634-
#[cfg_attr(target_family = "unix", allow(dead_code))]
636+
#[cfg_attr(
637+
any(target_family = "unix", target_family = "windows"),
638+
allow(dead_code)
639+
)]
635640
pub fn begin_request(&self) {
636641
let mut guard = self.inbox.lock().unwrap();
637642
reset_after_completed_request(&mut guard);
@@ -1009,8 +1014,9 @@ impl WorkerIpcConnection {
10091014
fn write_ipc_message<T: Serialize>(writer: &mut dyn Write, message: &T) -> io::Result<()> {
10101015
let payload = serde_json::to_string(message).map_err(io::Error::other)?;
10111016
writer.write_all(payload.as_bytes())?;
1012-
writer.write_all(b"\n")?;
1013-
writer.flush()
1017+
// IPC transports are unbuffered OS pipes. On Windows named pipes, flushing
1018+
// can wait for peer drainage, so a complete JSONL write is the sync point.
1019+
writer.write_all(b"\n")
10141020
}
10151021

10161022
#[derive(Debug)]
@@ -1113,7 +1119,7 @@ impl IpcServer {
11131119
self,
11141120
handle: IpcHandle,
11151121
handlers: IpcHandlers,
1116-
child: &mut std::process::Child,
1122+
child_exited: impl FnMut() -> io::Result<bool>,
11171123
max_wait: Duration,
11181124
) -> io::Result<()> {
11191125
let Some(server_pipe_to_worker) = self.server_pipe_to_worker else {
@@ -1127,9 +1133,10 @@ impl IpcServer {
11271133
));
11281134
};
11291135
let start = Instant::now();
1130-
connect_named_pipe_with_process_retry(&server_pipe_to_worker, child, max_wait)?;
1136+
let child_exited = std::cell::RefCell::new(child_exited);
1137+
connect_named_pipe_with_process_retry(&server_pipe_to_worker, &child_exited, max_wait)?;
11311138
let remaining = max_wait.saturating_sub(start.elapsed());
1132-
connect_named_pipe_with_process_retry(&server_pipe_from_worker, child, remaining)?;
1139+
connect_named_pipe_with_process_retry(&server_pipe_from_worker, &child_exited, remaining)?;
11331140
let conn = ServerIpcConnection::new(
11341141
IpcTransport {
11351142
reader: Box::new(server_pipe_from_worker),
@@ -1459,12 +1466,12 @@ fn join_connector_with_grace(connector: thread::JoinHandle<()>, max_wait: Durati
14591466
#[cfg(target_family = "windows")]
14601467
fn connect_named_pipe_with_process_retry(
14611468
server_pipe: &File,
1462-
child: &mut std::process::Child,
1469+
child_exited: &std::cell::RefCell<impl FnMut() -> io::Result<bool>>,
14631470
max_wait: Duration,
14641471
) -> io::Result<()> {
14651472
connect_named_pipe_with_process_retry_impl(
14661473
|timeout| connect_named_pipe(server_pipe, timeout),
1467-
|| child.try_wait().map(|status| status.is_some()),
1474+
|| child_exited.borrow_mut()(),
14681475
max_wait,
14691476
)
14701477
}

0 commit comments

Comments
 (0)