Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 51 additions & 38 deletions codex-rs/utils/pty/src/pipe.rs
Original file line number Diff line number Diff line change
@@ -1,38 +1,56 @@
use std::collections::HashMap;
#[cfg(not(windows))]
use std::io;
#[cfg(not(windows))]
use std::io::ErrorKind;
use std::path::Path;
#[cfg(not(windows))]
use std::process::Stdio;
#[cfg(not(windows))]
use std::sync::Arc;
#[cfg(not(windows))]
use std::sync::Mutex as StdMutex;
#[cfg(not(windows))]
use std::sync::atomic::AtomicBool;

use anyhow::Result;
#[cfg(not(windows))]
use tokio::io::AsyncRead;
#[cfg(not(windows))]
use tokio::io::AsyncReadExt;
#[cfg(not(windows))]
use tokio::io::AsyncWriteExt;
#[cfg(not(windows))]
use tokio::io::BufReader;
#[cfg(not(windows))]
use tokio::process::Command;
#[cfg(not(windows))]
use tokio::sync::mpsc;
#[cfg(not(windows))]
use tokio::sync::oneshot;
#[cfg(not(windows))]
use tokio::task::JoinHandle;

#[cfg(not(windows))]
use crate::process::ChildTerminator;
#[cfg(not(windows))]
use crate::process::ProcessHandle;
#[cfg(not(windows))]
use crate::process::ProcessSignal;
use crate::process::SpawnedProcess;
#[cfg(not(windows))]
use crate::process::exit_code_from_status;

#[cfg(target_os = "linux")]
use libc;

#[cfg(not(windows))]
struct PipeChildTerminator {
#[cfg(windows)]
pid: u32,
#[cfg(unix)]
process_group_id: u32,
}

#[cfg(not(windows))]
impl ChildTerminator for PipeChildTerminator {
fn signal(&mut self, signal: ProcessSignal) -> io::Result<()> {
match signal {
Expand All @@ -56,36 +74,14 @@ impl ChildTerminator for PipeChildTerminator {
crate::process_group::kill_process_group(self.process_group_id)
}

#[cfg(windows)]
{
kill_process(self.pid)
}

#[cfg(not(any(unix, windows)))]
{
Ok(())
}
}
}

#[cfg(windows)]
fn kill_process(pid: u32) -> io::Result<()> {
unsafe {
let handle = winapi::um::processthreadsapi::OpenProcess(
winapi::um::winnt::PROCESS_TERMINATE,
0,
pid,
);
if handle.is_null() {
return Err(io::Error::last_os_error());
}
let success = winapi::um::processthreadsapi::TerminateProcess(handle, 1);
let err = io::Error::last_os_error();
winapi::um::handleapi::CloseHandle(handle);
if success == 0 { Err(err) } else { Ok(()) }
}
}

#[cfg(not(windows))]
async fn read_output_stream<R>(mut reader: R, output_tx: mpsc::Sender<Vec<u8>>)
where
R: AsyncRead + Unpin,
Expand All @@ -103,12 +99,14 @@ where
}
}

#[cfg(not(windows))]
#[derive(Clone, Copy)]
enum PipeStdinMode {
Piped,
Null,
}

#[cfg(not(windows))]
async fn spawn_process_with_stdin_mode(
program: &str,
args: &[String],
Expand Down Expand Up @@ -240,8 +238,6 @@ async fn spawn_process_with_stdin_mode(
let handle = ProcessHandle::new(
writer_tx,
Box::new(PipeChildTerminator {
#[cfg(windows)]
pid,
#[cfg(unix)]
process_group_id,
}),
Expand Down Expand Up @@ -271,7 +267,16 @@ pub async fn spawn_process(
env: &HashMap<String, String>,
arg0: &Option<String>,
) -> Result<SpawnedProcess> {
spawn_process_with_stdin_mode(program, args, cwd, env, arg0, PipeStdinMode::Piped, &[]).await
#[cfg(windows)]
{
let _ = arg0;
crate::win::pipe::spawn_process(program, args, cwd, env).await
}
#[cfg(not(windows))]
{
spawn_process_with_stdin_mode(program, args, cwd, env, arg0, PipeStdinMode::Piped, &[])
.await
}
}

/// Spawn a process using regular pipes, but close stdin immediately.
Expand All @@ -295,14 +300,22 @@ pub async fn spawn_process_no_stdin_with_inherited_fds(
arg0: &Option<String>,
inherited_fds: &[i32],
) -> Result<SpawnedProcess> {
spawn_process_with_stdin_mode(
program,
args,
cwd,
env,
arg0,
PipeStdinMode::Null,
inherited_fds,
)
.await
#[cfg(windows)]
{
let _ = (arg0, inherited_fds);
crate::win::pipe::spawn_process_no_stdin(program, args, cwd, env).await
}
#[cfg(not(windows))]
{
spawn_process_with_stdin_mode(
program,
args,
cwd,
env,
arg0,
PipeStdinMode::Null,
inherited_fds,
)
.await
}
}
2 changes: 2 additions & 0 deletions codex-rs/utils/pty/src/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use core::fmt;
use std::io;
#[cfg(unix)]
use std::os::fd::RawFd;
#[cfg(not(windows))]
use std::process::ExitStatus;
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
Expand Down Expand Up @@ -32,6 +33,7 @@ pub(crate) fn unsupported_signal(signal: ProcessSignal) -> io::Error {
}
}

#[cfg(not(windows))]
pub(crate) fn exit_code_from_status(status: ExitStatus) -> i32 {
if let Some(code) = status.code() {
return code;
Expand Down
4 changes: 4 additions & 0 deletions codex-rs/utils/pty/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ mod windows_job_test_support;
#[path = "windows_conpty_job_tests.rs"]
mod windows_conpty_job_tests;

#[cfg(windows)]
#[path = "windows_pipe_job_tests.rs"]
mod windows_pipe_job_tests;

fn find_python() -> Option<String> {
for candidate in ["python3", "python"] {
if let Ok(output) = std::process::Command::new(candidate)
Expand Down
2 changes: 1 addition & 1 deletion codex-rs/utils/pty/src/win/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ use winapi::um::synchapi::WaitForSingleObject;
use winapi::um::winbase::INFINITE;
use winapi::um::winbase::WAIT_OBJECT_0;

#[cfg(test)]
mod command;
pub(crate) mod conpty;
mod job;
pub(crate) mod pipe;
mod procthreadattr;
mod psuedocon;

Expand Down
Loading
Loading