-
Notifications
You must be signed in to change notification settings - Fork 3.3k
Expand file tree
/
Copy pathwin32.py
More file actions
221 lines (188 loc) · 7.27 KB
/
win32.py
File metadata and controls
221 lines (188 loc) · 7.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
"""
Windows-specific functionality for stdio client operations.
"""
import os
import shutil
import signal
import subprocess
import sys
from pathlib import Path
from typing import BinaryIO, TextIO, cast
import anyio
from anyio import to_thread
from anyio.abc import Process
from anyio.streams.file import FileReadStream, FileWriteStream
def get_windows_executable_command(command: str) -> str:
"""
Get the correct executable command normalized for Windows.
On Windows, commands might exist with specific extensions (.exe, .cmd, etc.)
that need to be located for proper execution.
Args:
command: Base command (e.g., 'uvx', 'npx')
Returns:
str: Windows-appropriate command path
"""
try:
# First check if command exists in PATH as-is
if command_path := shutil.which(command):
return command_path
# Check for Windows-specific extensions
for ext in [".cmd", ".bat", ".exe", ".ps1"]:
ext_version = f"{command}{ext}"
if ext_path := shutil.which(ext_version):
return ext_path
# For regular commands or if we couldn't find special versions
return command
except OSError:
# Handle file system errors during path resolution
# (permissions, broken symlinks, etc.)
return command
class FallbackProcess:
"""
A fallback process wrapper for Windows to handle async I/O
when using subprocess.Popen, which provides sync-only FileIO objects.
This wraps stdin and stdout into async-compatible
streams (FileReadStream, FileWriteStream),
so that MCP clients expecting async streams can work properly.
"""
def __init__(self, popen_obj: subprocess.Popen[bytes]):
self.popen: subprocess.Popen[bytes] = popen_obj
self.stdin_raw = popen_obj.stdin # type: ignore[assignment]
self.stdout_raw = popen_obj.stdout # type: ignore[assignment]
self.stderr = popen_obj.stderr # type: ignore[assignment]
self.stdin = FileWriteStream(cast(BinaryIO, self.stdin_raw)) if self.stdin_raw else None
self.stdout = FileReadStream(cast(BinaryIO, self.stdout_raw)) if self.stdout_raw else None
async def __aenter__(self):
"""Support async context manager entry."""
return self
async def __aexit__(
self,
exc_type: BaseException | None,
exc_val: BaseException | None,
exc_tb: object | None,
) -> None:
"""Terminate and wait on process exit inside a thread."""
# Try graceful shutdown with CTRL_C_EVENT on Windows
if sys.platform == "win32" and hasattr(signal, "CTRL_C_EVENT"):
try:
# Send CTRL_C_EVENT for graceful shutdown
os.kill(self.popen.pid, getattr(signal, "CTRL_C_EVENT"))
# Wait for process to exit gracefully (2 second timeout)
await to_thread.run_sync(lambda: self.popen.wait(timeout=2))
except (subprocess.TimeoutExpired, ProcessLookupError, OSError):
# If graceful shutdown fails, fall back to terminate
try:
self.popen.terminate()
await to_thread.run_sync(self.popen.wait)
except ProcessLookupError:
# Process already exited
pass
else:
# Non-Windows or fallback behavior
try:
self.popen.terminate()
await to_thread.run_sync(self.popen.wait)
except ProcessLookupError:
# Process already exited
pass
# Close the file handles to prevent ResourceWarning
if self.stdin:
await self.stdin.aclose()
if self.stdout:
await self.stdout.aclose()
if self.stdin_raw:
self.stdin_raw.close()
if self.stdout_raw:
self.stdout_raw.close()
if self.stderr:
self.stderr.close()
async def wait(self):
"""Async wait for process completion."""
return await to_thread.run_sync(self.popen.wait)
def terminate(self):
"""Terminate the subprocess immediately."""
return self.popen.terminate()
def kill(self) -> None:
"""Kill the subprocess immediately (alias for terminate)."""
self.terminate()
# ------------------------
# Updated function
# ------------------------
async def create_windows_process(
command: str,
args: list[str],
env: dict[str, str] | None = None,
errlog: TextIO | None = sys.stderr,
cwd: Path | str | None = None,
) -> FallbackProcess:
"""
Creates a subprocess in a Windows-compatible way.
On Windows, asyncio.create_subprocess_exec has incomplete support
(NotImplementedError when trying to open subprocesses).
Therefore, we fallback to subprocess.Popen and wrap it for async usage.
Args:
command (str): The executable to run
args (list[str]): List of command line arguments
env (dict[str, str] | None): Environment variables
errlog (TextIO | None): Where to send stderr output (defaults to sys.stderr)
cwd (Path | str | None): Working directory for the subprocess
Returns:
FallbackProcess: Async-compatible subprocess with stdin and stdout streams
"""
try:
# Try launching with creationflags to avoid opening a new console window
popen_obj = subprocess.Popen(
[command, *args],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=errlog,
env=env,
cwd=cwd,
bufsize=0, # Unbuffered output
creationflags=getattr(subprocess, "CREATE_NO_WINDOW", 0),
)
return FallbackProcess(popen_obj)
except Exception:
# If creationflags failed, fallback without them
popen_obj = subprocess.Popen(
[command, *args],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=errlog,
env=env,
cwd=cwd,
bufsize=0,
)
return FallbackProcess(popen_obj)
async def terminate_windows_process(process: Process | FallbackProcess):
"""
Terminate a Windows process gracefully.
First attempts graceful shutdown using CTRL_C_EVENT signal,
which allows the process to run cleanup code.
Falls back to terminate() and then kill() if graceful shutdown fails.
Args:
process: The process to terminate
"""
# Try graceful shutdown with CTRL_C_EVENT first
if isinstance(process, FallbackProcess) and hasattr(signal, "CTRL_C_EVENT"):
try:
# Send CTRL_C_EVENT for graceful shutdown
os.kill(process.popen.pid, getattr(signal, "CTRL_C_EVENT"))
with anyio.fail_after(2.0):
await process.wait()
return
except (TimeoutError, ProcessLookupError, OSError):
# If CTRL_C_EVENT failed or timed out, continue to forceful termination
pass
# Fall back to terminate
try:
process.terminate()
with anyio.fail_after(2.0):
await process.wait()
except TimeoutError:
# Force kill if it doesn't terminate
try:
process.kill()
except ProcessLookupError:
# Process already exited
pass