Skip to content

Commit 9bc2cdc

Browse files
committed
refactor(dvc-proxy): refactoring after review
1 parent 0f7f0df commit 9bc2cdc

17 files changed

Lines changed: 363 additions & 354 deletions

File tree

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/ironrdp-dvc-pipe-proxy/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ tracing = { version = "0.1", features = ["log"] }
2525

2626

2727
[target.'cfg(windows)'.dependencies]
28+
29+
widestring = "1"
30+
smallvec = "1"
2831
windows = { version = "0.61", features = [
2932
"Win32_Foundation",
3033
"Win32_Security",

crates/ironrdp-dvc-pipe-proxy/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,4 @@ mod windows;
99

1010
mod platform;
1111

12-
pub use platform::*;
12+
pub use platform::DvcNamedPipeProxy;
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
#[cfg(target_os = "windows")]
22
mod windows;
33

4+
#[cfg(not(target_os = "windows"))]
5+
mod unix;
6+
47
#[cfg(target_os = "windows")]
58
pub use windows::DvcNamedPipeProxy;
9+
10+
#[cfg(not(target_os = "windows"))]
11+
pub use unix::DvcNamedPipeProxy;
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
use ironrdp_core::impl_as_any;
2+
use ironrdp_dvc::{DvcClientProcessor, DvcMessage, DvcProcessor};
3+
use ironrdp_pdu::{pdu_other_err, PduResult};
4+
use ironrdp_svc::SvcMessage;
5+
6+
/// A proxy DVC pipe client that forwards DVC messages to/from a named pipe server.
7+
pub struct DvcNamedPipeProxy {
8+
channel_name: String,
9+
}
10+
11+
impl DvcNamedPipeProxy {
12+
/// Creates a new DVC named pipe proxy.
13+
/// `dvc_write_callback` is called when the proxy receives a DVC message from the
14+
/// named pipe server and the SVC message is ready to be sent to the DVC channel in the main
15+
/// IronRDP active session loop.
16+
pub fn new<F>(channel_name: &str, _named_pipe_name: &str, _dvc_write_callback: F) -> Self
17+
where
18+
F: Fn(u32, Vec<SvcMessage>) -> PduResult<()> + Send + 'static,
19+
{
20+
error!("DvcNamedPipeProxy is not implemented on Unix-like systems, using a stub implementation");
21+
22+
Self {
23+
channel_name: channel_name.to_owned(),
24+
}
25+
}
26+
}
27+
28+
impl_as_any!(DvcNamedPipeProxy);
29+
30+
impl DvcProcessor for DvcNamedPipeProxy {
31+
fn channel_name(&self) -> &str {
32+
&self.channel_name
33+
}
34+
35+
fn start(&mut self, _channel_id: u32) -> PduResult<Vec<DvcMessage>> {
36+
Err(pdu_other_err!(
37+
"DvcNamedPipeProxy is not implemented on Unix-like systems"
38+
))
39+
}
40+
41+
fn process(&mut self, _channel_id: u32, _payload: &[u8]) -> PduResult<Vec<DvcMessage>> {
42+
Err(pdu_other_err!(
43+
"DvcNamedPipeProxy is not implemented on Unix-like systems"
44+
))
45+
}
46+
}
47+
48+
impl DvcClientProcessor for DvcNamedPipeProxy {}

crates/ironrdp-dvc-pipe-proxy/src/platform/windows/mod.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,13 @@ mod worker;
33

44
use std::sync::mpsc;
55

6-
use error::DvcPipeProxyError;
76
use ironrdp_core::impl_as_any;
87
use ironrdp_dvc::{DvcClientProcessor, DvcMessage, DvcProcessor};
98
use ironrdp_pdu::{pdu_other_err, PduResult};
109
use ironrdp_svc::SvcMessage;
11-
use worker::{worker_thread_func, OnWriteDvcMessage, WorkerCtx};
1210

11+
use crate::platform::windows::error::DvcPipeProxyError;
12+
use crate::platform::windows::worker::{worker_thread_func, OnWriteDvcMessage, WorkerCtx};
1313
use crate::windows::{Event, MessagePipeServer, Semaphore};
1414

1515
const IO_MPSC_CHANNEL_SIZE: usize = 100;
@@ -61,8 +61,8 @@ impl Drop for DvcNamedPipeProxy {
6161

6262
impl DvcNamedPipeProxy {
6363
fn start_impl(&mut self, channel_id: u32) -> Result<(), DvcPipeProxyError> {
64-
// PIPE -> DVC channel - handled via callback passed to the constructor
65-
// DVC -> PIPE channel - handled via mpsc internally in the worker thread
64+
// PIPE -> DVC channel - handled via callback passed to the constructor.
65+
// DVC -> PIPE channel - handled via mpsc internally in the worker thread.
6666
let (to_pipe_tx, to_pipe_rx) = mpsc::sync_channel(IO_MPSC_CHANNEL_SIZE);
6767

6868
let semaphore_max_count = IO_MPSC_CHANNEL_SIZE
@@ -126,7 +126,6 @@ impl DvcProcessor for DvcNamedPipeProxy {
126126

127127
fn process(&mut self, _channel_id: u32, payload: &[u8]) -> PduResult<Vec<DvcMessage>> {
128128
// Send the payload to the worker thread via the mpsc channel.
129-
130129
let ctx = match &self.worker_control_ctx {
131130
Some(ctx) => ctx,
132131
None => {

crates/ironrdp-dvc-pipe-proxy/src/platform/windows/worker.rs

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ use ironrdp_svc::{ChannelFlags, SvcMessage};
88
use crate::platform::windows::error::DvcPipeProxyError;
99
use crate::windows::{wait_any, wait_any_with_timeout, Event, MessagePipeServer, Semaphore, WindowsError};
1010

11-
const PIPE_CONNECT_TIMEOUT: u32 = 10_000; // 10 seconds
12-
const PIPE_WRITE_TIMEOUT: u32 = 3_000; // 3 seconds
11+
const PIPE_CONNECT_TIMEOUT_SECS: u32 = 10_000; // 10 seconds
12+
const PIPE_WRITE_TIMEOUT_SECS: u32 = 3_000; // 3 seconds
1313
const MESSAGE_BUFFER_SIZE: usize = 64 * 1024; // 64 KiB
1414

1515
pub(crate) type OnWriteDvcMessage = Box<dyn Fn(u32, Vec<SvcMessage>) -> PduResult<()> + Send>;
@@ -44,9 +44,8 @@ pub(crate) fn worker_thread_func(worker_ctx: WorkerCtx) -> Result<(), DvcPipePro
4444

4545
if !connect_ctx.overlapped_connect()? {
4646
const EVENT_ID_ABORT: usize = 0;
47-
let events = &[abort_event.raw(), connect_ctx.event().raw()];
48-
49-
let wait_result = match wait_any_with_timeout(events, PIPE_CONNECT_TIMEOUT) {
47+
let events = [abort_event.borrow(), connect_ctx.borrow_event()];
48+
let wait_result = match wait_any_with_timeout(events, PIPE_CONNECT_TIMEOUT_SECS) {
5049
Ok(idx) => idx,
5150
Err(WindowsError::WaitForMultipleObjectsTimeout) => {
5251
warn!(%channel_name, %pipe_name, "DVC pipe proxy connection timed out");
@@ -73,22 +72,25 @@ pub(crate) fn worker_thread_func(worker_ctx: WorkerCtx) -> Result<(), DvcPipePro
7372
const EVENT_ID_ABORT: usize = 0;
7473
const EVENT_ID_READ: usize = 1;
7574
const EVENT_ID_WRITE_MPSC: usize = 2;
76-
let events = &[abort_event.raw(), read_ctx.event().raw(), to_pipe_semaphore.raw()];
7775

7876
read_ctx.overlapped_read()?;
7977

8078
info!(%channel_name, %pipe_name, "DVC pipe proxy IO loop started");
8179

8280
loop {
81+
let events = [
82+
abort_event.borrow(),
83+
read_ctx.borrow_event(),
84+
to_pipe_semaphore.borrow(),
85+
];
8386
let wait_result = wait_any(events)?;
8487

85-
// abort event
8688
if wait_result == EVENT_ID_ABORT {
8789
info!(%channel_name, %pipe_name, "DVC pipe proxy connection has been aborted");
8890
return Ok(());
8991
}
9092

91-
// read from pipe
93+
// Read end of pipe is ready, forward received data to DVC.
9294
if wait_result == EVENT_ID_READ {
9395
let read_result = read_ctx.get_result()?.to_vec();
9496

@@ -107,12 +109,12 @@ pub(crate) fn worker_thread_func(worker_ctx: WorkerCtx) -> Result<(), DvcPipePro
107109
}
108110
}
109111

110-
// Queue the read operation again
112+
// Queue the read operation again.
111113
read_ctx.overlapped_read()?;
112114
continue;
113115
}
114116

115-
// read from mpsc and write to pipe
117+
// DVC data received, forward it to the pipe.
116118
if wait_result == EVENT_ID_WRITE_MPSC {
117119
let payload = to_pipe_rx.recv().map_err(|_| DvcPipeProxyError::MpscIo)?;
118120

@@ -125,25 +127,22 @@ pub(crate) fn worker_thread_func(worker_ctx: WorkerCtx) -> Result<(), DvcPipePro
125127

126128
trace!(%channel_name, %pipe_name, "DVC proxy write {} bytes to pipe,", payload_len);
127129

128-
// write to pipe
129130
let mut overlapped_write = pipe.prepare_write_overlapped(payload)?;
130131

131-
let events = &[abort_event.raw(), overlapped_write.event().raw()];
132-
133132
overlapped_write.overlapped_write()?;
134-
let wait_result = wait_any_with_timeout(events, PIPE_WRITE_TIMEOUT)?;
135133

136-
// abort event
134+
let events = [abort_event.borrow(), overlapped_write.borrow_event()];
135+
let wait_result = wait_any_with_timeout(events, PIPE_WRITE_TIMEOUT_SECS)?;
136+
137137
if wait_result == EVENT_ID_ABORT {
138138
info!(%channel_name, %pipe_name, "DVC pipe proxy write aborted");
139139
return Ok(());
140140
}
141141

142-
// write to pipe
143142
let bytes_written = overlapped_write.get_result()?;
144143

145144
if bytes_written as usize != payload_len {
146-
// Message-based pipe write failed
145+
// Message-based pipe write failed.
147146
return Err(DvcPipeProxyError::DvcIncompleteWrite);
148147
}
149148

crates/ironrdp-dvc-pipe-proxy/src/windows/error.rs

Lines changed: 16 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -12,45 +12,27 @@ pub(crate) enum WindowsError {
1212
OverlappedRead(windows::core::Error),
1313
OverlappedWrite(windows::core::Error),
1414
CreateSemaphore(windows::core::Error),
15-
InvalidHandle,
15+
InvalidPipeName(String),
1616
}
1717

1818
impl core::fmt::Display for WindowsError {
1919
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
2020
match self {
21-
WindowsError::CreateNamedPipe(_) => write!(f, "CreateNamedPipe failed"),
22-
WindowsError::CreateEvent(_) => write!(f, "CreateEvent failed"),
23-
WindowsError::SetEvent(_) => write!(f, "SetEvent failed"),
24-
WindowsError::InvalidSemaphoreParams(cause) => {
25-
write!(f, "Invalid semaphore parameters: {}", cause)
26-
}
27-
WindowsError::ReleaseSemaphore(_) => {
28-
write!(f, "ReleaseSemaphore failed")
29-
}
30-
WindowsError::WaitForMultipleObjectsFailed(_) => {
31-
write!(f, "WaitForMultipleObjects failed")
32-
}
33-
WindowsError::WaitForMultipleObjectsTimeout => {
34-
write!(f, "WaitForMultipleObjects timed out")
35-
}
21+
WindowsError::CreateNamedPipe(_) => write!(f, "failed to create named pipe"),
22+
WindowsError::CreateEvent(_) => write!(f, "failed to create event object"),
23+
WindowsError::SetEvent(_) => write!(f, "failed to set event to signaled state"),
24+
WindowsError::InvalidSemaphoreParams(cause) => write!(f, "invalid semaphore parameters: {}", cause),
25+
WindowsError::ReleaseSemaphore(_) => write!(f, "failed to release semaphore"),
26+
WindowsError::WaitForMultipleObjectsFailed(_) => write!(f, "failed to wait for multiple objects"),
27+
WindowsError::WaitForMultipleObjectsTimeout => write!(f, "timed out waiting for multiple objects"),
3628
WindowsError::WaitForMultipleObjectsAbandoned(idx) => {
37-
write!(f, "WaitForMultipleObjects handle #{idx} was abandoned")
38-
}
39-
WindowsError::OverlappedConnect(_) => {
40-
write!(f, "Overlapped connect failed")
41-
}
42-
WindowsError::OverlappedRead(_) => {
43-
write!(f, "Overlapped read failed")
44-
}
45-
WindowsError::OverlappedWrite(_) => {
46-
write!(f, "Overlapped write failed")
47-
}
48-
WindowsError::CreateSemaphore(_) => {
49-
write!(f, "CreateSemaphore failed")
50-
}
51-
WindowsError::InvalidHandle => {
52-
write!(f, "Invalid handle")
29+
write!(f, "wait for multiple objects failed, handle #{idx} was abandoned")
5330
}
31+
WindowsError::OverlappedConnect(_) => write!(f, "overlapped connect failed"),
32+
WindowsError::OverlappedRead(_) => write!(f, "overlapped read failed"),
33+
WindowsError::OverlappedWrite(_) => write!(f, "overlapped write failed"),
34+
WindowsError::CreateSemaphore(_) => write!(f, "failed to create semaphore object"),
35+
WindowsError::InvalidPipeName(cause) => write!(f, "invalid pipe name: `{}`", cause),
5436
}
5537
}
5638
}
@@ -69,8 +51,8 @@ impl core::error::Error for WindowsError {
6951
WindowsError::CreateEvent(err) => Some(err),
7052
WindowsError::InvalidSemaphoreParams(_)
7153
| WindowsError::WaitForMultipleObjectsTimeout
72-
| WindowsError::InvalidHandle
73-
| WindowsError::WaitForMultipleObjectsAbandoned(_) => None,
54+
| WindowsError::InvalidPipeName(_) => None,
55+
WindowsError::WaitForMultipleObjectsAbandoned(_) => None,
7456
}
7557
}
7658
}
Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,53 @@
11
use std::sync::Arc;
22

3+
use windows::core::Owned;
34
use windows::Win32::Foundation::HANDLE;
45
use windows::Win32::System::Threading::{CreateEventW, SetEvent};
56

6-
use crate::windows::{Handle, WindowsError};
7+
use crate::windows::{BorrowedHandle, WindowsError};
78

89
/// RAII wrapper for WinAPI event handle.
910
#[derive(Debug, Clone)]
1011
pub(crate) struct Event {
11-
handle: Arc<Handle>,
12+
handle: Arc<Owned<HANDLE>>,
1213
}
1314

14-
/// SAFETY: It is safe to send event HANDLE between threads.
15+
// SAFETY: We ensure that inner handle is indeed could be sent and shared between threads via
16+
// Event wrapper API itself by restricting handle usage:
17+
// - set() method which calls SetEvent inside (which is thread-safe).
18+
// - borrow() method which returns a BorrowedHandle for waiting on the event.
19+
// - Handle lifetime is ensured by Arc, so it is always valid when used.
1520
unsafe impl Send for Event {}
1621

1722
impl Event {
1823
pub(crate) fn new_unnamed() -> Result<Self, WindowsError> {
1924
// SAFETY: FFI call with no outstanding preconditions.
2025
let handle = unsafe { CreateEventW(None, false, false, None).map_err(WindowsError::CreateEvent)? };
26+
2127
// SAFETY: Handle is valid and we are the owner of the handle.
22-
let handle = unsafe { Handle::new_owned(handle)? };
28+
let handle = unsafe { Owned::new(handle) };
2329

2430
// CreateEventW returns a valid handle on success.
2531
Ok(Self {
32+
// See `unsafe impl Send` comment.
33+
#[allow(clippy::arc_with_non_send_sync)]
2634
handle: Arc::new(handle),
2735
})
2836
}
2937

3038
pub(crate) fn set(&self) -> Result<(), WindowsError> {
3139
// SAFETY: The handle is valid and we are the owner of the handle.
3240
unsafe {
33-
SetEvent(self.handle.raw()).map_err(WindowsError::SetEvent)?;
41+
SetEvent(self.raw()).map_err(WindowsError::SetEvent)?;
3442
}
3543
Ok(())
3644
}
3745

38-
pub(crate) fn raw(&self) -> HANDLE {
39-
self.handle.raw()
46+
pub(super) fn raw(&self) -> HANDLE {
47+
**self.handle
48+
}
49+
50+
pub(crate) fn borrow(&self) -> BorrowedHandle<'_> {
51+
BorrowedHandle(&self.handle)
4052
}
4153
}

0 commit comments

Comments
 (0)