Skip to content

Commit dfb829f

Browse files
committed
feat(agent): implement exec detached mode
1 parent acf770f commit dfb829f

4 files changed

Lines changed: 145 additions & 46 deletions

File tree

Cargo.lock

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

devolutions-session/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ win-api-wrappers = { path = "../crates/win-api-wrappers", optional = true }
4444

4545
[dependencies.now-proto-pdu]
4646
optional = true
47-
version = "0.3.2"
47+
git = "https://github.com/Devolutions/now-proto"
48+
branch = "feat/exec-detached"
4849
features = ["std"]
4950

5051
[target.'cfg(windows)'.build-dependencies]

devolutions-session/src/dvc/process.rs

Lines changed: 78 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,6 @@ pub enum ServerChannelEvent {
9898
pub struct WinApiProcessCtx {
9999
session_id: u32,
100100

101-
io_notification_tx: Sender<ServerChannelEvent>,
102-
103101
stdout_read_pipe: Option<Pipe>,
104102
stderr_read_pipe: Option<Pipe>,
105103
stdin_write_pipe: Option<Pipe>,
@@ -123,7 +121,10 @@ impl WinApiProcessCtx {
123121
Ok(())
124122
}
125123

126-
pub fn process_cancel(&mut self) -> Result<(), ExecError> {
124+
pub fn process_cancel(
125+
&mut self,
126+
io_notification_tx: &Sender<ServerChannelEvent>,
127+
) -> Result<(), ExecError> {
127128
info!(
128129
session_id = self.session_id,
129130
"Cancelling process execution by user request"
@@ -135,15 +136,19 @@ impl WinApiProcessCtx {
135136

136137
// Acknowledge client that cancel request has been processed
137138
// successfully.
138-
self.io_notification_tx
139+
io_notification_tx
139140
.blocking_send(ServerChannelEvent::SessionCancelSuccess {
140141
session_id: self.session_id,
141142
})?;
142143

143144
Ok(())
144145
}
145146

146-
pub fn wait(mut self, mut input_event_rx: WinapiSignaledReceiver<ProcessIoInputEvent>) -> Result<u32, ExecError> {
147+
pub fn wait(
148+
mut self,
149+
mut input_event_rx: WinapiSignaledReceiver<ProcessIoInputEvent>,
150+
io_notification_tx: Sender<ServerChannelEvent>,
151+
) -> Result<u32, ExecError> {
147152
let session_id = self.session_id;
148153

149154
info!(session_id, "Waiting for process to exit");
@@ -153,7 +158,7 @@ impl WinApiProcessCtx {
153158
const WAIT_OBJECT_INPUT_MESSAGE: WAIT_EVENT = WAIT_OBJECT_0;
154159
const WAIT_OBJECT_PROCESS_EXIT: WAIT_EVENT = WAIT_EVENT(WAIT_OBJECT_0.0 + 1);
155160

156-
self.io_notification_tx
161+
io_notification_tx
157162
.blocking_send(ServerChannelEvent::SessionStarted { session_id })?;
158163

159164
loop {
@@ -179,7 +184,7 @@ impl WinApiProcessCtx {
179184
return Err(ExecError::Aborted);
180185
}
181186
ProcessIoInputEvent::CancelExecution => {
182-
self.process_cancel()?;
187+
self.process_cancel(&io_notification_tx)?;
183188

184189
// wait for process to exit
185190
continue;
@@ -209,6 +214,7 @@ impl WinApiProcessCtx {
209214
pub fn wait_with_io_redirection(
210215
mut self,
211216
mut input_event_rx: WinapiSignaledReceiver<ProcessIoInputEvent>,
217+
io_notification_tx: Sender<ServerChannelEvent>,
212218
) -> Result<u32, ExecError> {
213219
let session_id = self.session_id;
214220

@@ -277,7 +283,7 @@ impl WinApiProcessCtx {
277283

278284
// Signal client side about started execution
279285

280-
self.io_notification_tx
286+
io_notification_tx
281287
.blocking_send(ServerChannelEvent::SessionStarted { session_id })?;
282288

283289
info!(session_id, "Process IO is ready for async loop execution");
@@ -304,7 +310,7 @@ impl WinApiProcessCtx {
304310
return Err(ExecError::Aborted);
305311
}
306312
ProcessIoInputEvent::CancelExecution => {
307-
self.process_cancel()?;
313+
self.process_cancel(&io_notification_tx)?;
308314

309315
// wait for process to exit
310316
continue;
@@ -369,7 +375,7 @@ impl WinApiProcessCtx {
369375
// EOF on stdout pipe, close it and send EOF message to message_tx
370376
self.stdout_read_pipe = None;
371377

372-
self.io_notification_tx
378+
io_notification_tx
373379
.blocking_send(ServerChannelEvent::SessionDataOut {
374380
session_id,
375381
stream: NowExecDataStreamKind::Stdout,
@@ -382,7 +388,7 @@ impl WinApiProcessCtx {
382388
continue;
383389
}
384390

385-
self.io_notification_tx
391+
io_notification_tx
386392
.blocking_send(ServerChannelEvent::SessionDataOut {
387393
session_id,
388394
stream: NowExecDataStreamKind::Stdout,
@@ -432,7 +438,7 @@ impl WinApiProcessCtx {
432438
ERROR_HANDLE_EOF | ERROR_BROKEN_PIPE => {
433439
// EOF on stderr pipe, close it and send EOF message to message_tx
434440
self.stderr_read_pipe = None;
435-
self.io_notification_tx
441+
io_notification_tx
436442
.blocking_send(ServerChannelEvent::SessionDataOut {
437443
session_id,
438444
stream: NowExecDataStreamKind::Stderr,
@@ -445,7 +451,7 @@ impl WinApiProcessCtx {
445451
continue;
446452
}
447453

448-
self.io_notification_tx
454+
io_notification_tx
449455
.blocking_send(ServerChannelEvent::SessionDataOut {
450456
session_id,
451457
stream: NowExecDataStreamKind::Stderr,
@@ -527,12 +533,13 @@ impl WinApiProcessBuilder {
527533
self
528534
}
529535

530-
/// Starts process execution and spawns IO thread to redirect stdio to/from dvc.
531-
pub fn run(
536+
/// Internal implementation for process execution.
537+
fn run_impl(
532538
mut self,
533539
session_id: u32,
534-
io_notification_tx: Sender<ServerChannelEvent>,
535-
) -> Result<WinApiProcess, ExecError> {
540+
io_notification_tx: Option<Sender<ServerChannelEvent>>,
541+
detached: bool,
542+
) -> Result<Option<WinApiProcess>, ExecError> {
536543
let command_line = format!("\"{}\" {}", self.executable, self.command_line)
537544
.trim_end()
538545
.to_owned();
@@ -557,31 +564,41 @@ impl WinApiProcessBuilder {
557564
let io_redirection = self.enable_io_redirection;
558565

559566
let process_ctx = if io_redirection {
560-
prepare_process_with_io_redirection(
561-
session_id,
562-
command_line,
563-
current_directory,
564-
self.env,
565-
io_notification_tx.clone(),
566-
)?
567+
prepare_process_with_io_redirection(session_id, command_line, current_directory, self.env)?
567568
} else {
568-
prepare_process(
569-
session_id,
570-
command_line,
571-
current_directory,
572-
self.env,
573-
io_notification_tx.clone(),
574-
)?
569+
prepare_process(session_id, command_line, current_directory, self.env)?
575570
};
576571

572+
if detached {
573+
// For detached mode, spawn a thread that waits for process exit and keeps temp files alive
574+
std::thread::spawn(move || {
575+
let _temp_files = temp_files; // Keep temp files alive
576+
577+
// Wait for process to exit (indefinitely)
578+
if let Err(error) = process_ctx.process.wait(None) {
579+
error!(%error, session_id, "Failed to wait for detached process");
580+
return;
581+
}
582+
583+
info!(session_id, "Detached process exited");
584+
585+
// Temp files will be cleaned up when this thread exits
586+
});
587+
588+
info!(session_id, "Detached process started successfully");
589+
return Ok(None);
590+
}
591+
577592
// Create channel for `task` -> `Process IO thread` communication
578593
let (input_event_tx, input_event_rx) = winapi_signaled_mpsc_channel()?;
579594

595+
let io_notification_tx = io_notification_tx.expect("BUG: io_notification_tx must be Some for non-detached mode");
596+
580597
let join_handle = std::thread::spawn(move || {
581598
let run_result = if io_redirection {
582-
process_ctx.wait_with_io_redirection(input_event_rx)
599+
process_ctx.wait_with_io_redirection(input_event_rx, io_notification_tx.clone())
583600
} else {
584-
process_ctx.wait(input_event_rx)
601+
process_ctx.wait(input_event_rx, io_notification_tx.clone())
585602
};
586603

587604
let notification = match run_result {
@@ -594,11 +611,29 @@ impl WinApiProcessBuilder {
594611
}
595612
});
596613

597-
Ok(WinApiProcess {
614+
Ok(Some(WinApiProcess {
598615
input_event_tx,
599616
join_handle,
600617
_temp_files: temp_files,
601-
})
618+
}))
619+
}
620+
621+
/// Starts process execution and spawns IO thread to redirect stdio to/from dvc.
622+
pub fn run(
623+
self,
624+
session_id: u32,
625+
io_notification_tx: Sender<ServerChannelEvent>,
626+
) -> Result<WinApiProcess, ExecError> {
627+
Ok(self
628+
.run_impl(session_id, Some(io_notification_tx), false)?
629+
.expect("BUG: run_impl should return Some when detached=false"))
630+
}
631+
632+
/// Starts process in detached mode (fire-and-forget).
633+
/// No IO redirection, no waiting for process exit. Returns immediately after spawning.
634+
pub fn run_detached(self, session_id: u32) -> Result<(), ExecError> {
635+
self.run_impl(session_id, None, true)?;
636+
Ok(())
602637
}
603638
}
604639

@@ -607,19 +642,25 @@ fn prepare_process(
607642
mut command_line: WideString,
608643
current_directory: WideString,
609644
env: HashMap<String, String>,
610-
io_notification_tx: Sender<ServerChannelEvent>,
611645
) -> Result<WinApiProcessCtx, ExecError> {
612646
let mut process_information = PROCESS_INFORMATION::default();
613647

614-
let startup_info = STARTUPINFOW {
648+
let mut startup_info = STARTUPINFOW {
615649
cb: u32::try_from(size_of::<STARTUPINFOW>()).expect("BUG: STARTUPINFOW should always fit into u32"),
616650
dwFlags: Default::default(),
617651
..Default::default()
618652
};
619653

620654
let environment_block = (!env.is_empty()).then(|| make_environment_block(env)).transpose()?;
621655

656+
// Control console window visibility:
657+
// - CREATE_NEW_CONSOLE creates a new console window
658+
// - SW_HIDE hides the console window
622659
let mut creation_flags = NORMAL_PRIORITY_CLASS | CREATE_NEW_PROCESS_GROUP | CREATE_NEW_CONSOLE;
660+
661+
startup_info.dwFlags |= STARTF_USESHOWWINDOW;
662+
startup_info.wShowWindow = u16::try_from(SW_HIDE.0).expect("SHOW_WINDOW_CMD fits into u16");
663+
623664
if environment_block.is_some() {
624665
creation_flags |= CREATE_UNICODE_ENVIRONMENT;
625666
}
@@ -657,7 +698,6 @@ fn prepare_process(
657698

658699
Ok(WinApiProcessCtx {
659700
session_id,
660-
io_notification_tx,
661701
stdout_read_pipe: None,
662702
stderr_read_pipe: None,
663703
stdin_write_pipe: None,
@@ -671,7 +711,6 @@ fn prepare_process_with_io_redirection(
671711
mut command_line: WideString,
672712
current_directory: WideString,
673713
env: HashMap<String, String>,
674-
io_notification_tx: Sender<ServerChannelEvent>,
675714
) -> Result<WinApiProcessCtx, ExecError> {
676715
let mut process_information = PROCESS_INFORMATION::default();
677716

@@ -741,7 +780,6 @@ fn prepare_process_with_io_redirection(
741780

742781
let process_ctx = WinApiProcessCtx {
743782
session_id,
744-
io_notification_tx,
745783
stdout_read_pipe: Some(stdout_read_pipe),
746784
stderr_read_pipe: Some(stderr_read_pipe),
747785
stdin_write_pipe: Some(stdin_write_pipe),

0 commit comments

Comments
 (0)