diff --git a/Cargo.lock b/Cargo.lock index 57a8a24dc..5b67c086b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2422,6 +2422,7 @@ dependencies = [ "ironrdp", "ironrdp-cliprdr-native", "ironrdp-core", + "ironrdp-dvc-pipe-proxy", "ironrdp-rdcleanpath", "ironrdp-rdpsnd-native", "ironrdp-tls", @@ -2524,6 +2525,19 @@ dependencies = [ "tracing", ] +[[package]] +name = "ironrdp-dvc-pipe-proxy" +version = "0.1.0" +dependencies = [ + "ironrdp-core", + "ironrdp-dvc", + "ironrdp-pdu", + "ironrdp-svc", + "tracing", + "widestring", + "windows 0.61.3", +] + [[package]] name = "ironrdp-error" version = "0.1.2" diff --git a/crates/ironrdp-client/Cargo.toml b/crates/ironrdp-client/Cargo.toml index 1c1b790e7..84eb8f136 100644 --- a/crates/ironrdp-client/Cargo.toml +++ b/crates/ironrdp-client/Cargo.toml @@ -48,6 +48,7 @@ ironrdp-rdpsnd-native = { path = "../ironrdp-rdpsnd-native", version = "0.3" } ironrdp-tls = { path = "../ironrdp-tls", version = "0.1" } ironrdp-tokio = { path = "../ironrdp-tokio", version = "0.5", features = ["reqwest"] } ironrdp-rdcleanpath.path = "../ironrdp-rdcleanpath" +ironrdp-dvc-pipe-proxy.path = "../ironrdp-dvc-pipe-proxy" # Windowing and rendering winit = { version = "0.30", features = ["rwh_06"] } diff --git a/crates/ironrdp-client/src/config.rs b/crates/ironrdp-client/src/config.rs index 315ee1bc6..eed26fc3e 100644 --- a/crates/ironrdp-client/src/config.rs +++ b/crates/ironrdp-client/src/config.rs @@ -21,6 +21,13 @@ pub struct Config { pub connector: connector::Config, pub clipboard_type: ClipboardType, pub rdcleanpath: Option, + + /// DVC channel <-> named pipe proxy configuration. + /// + /// Each configured proxy enables IronRDP to connect to DVC channel and create a named pipe + /// server, which will be used for proxying DVC messages to/from user-defined DVC logic + /// implemented as named pipe clients (either in the same process or in a different process). + pub dvc_pipe_proxies: Vec, } #[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum)] @@ -137,6 +144,33 @@ pub struct RDCleanPathConfig { pub auth_token: String, } +#[derive(Clone, Debug)] +pub struct DvcProxyInfo { + pub channel_name: String, + pub pipe_name: String, +} + +impl FromStr for DvcProxyInfo { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + let mut parts = s.split('='); + let channel_name = parts + .next() + .ok_or_else(|| anyhow::anyhow!("missing DVC channel name"))? + .to_owned(); + let pipe_name = parts + .next() + .ok_or_else(|| anyhow::anyhow!("missing DVC proxy pipe name"))? + .to_owned(); + + Ok(Self { + channel_name, + pipe_name, + }) + } +} + /// Devolutions IronRDP client #[derive(Parser, Debug)] #[clap(author = "Devolutions", about = "Devolutions-IronRDP client")] @@ -238,6 +272,14 @@ struct Args { /// The bitmap codecs to use (remotefx:on, ...) #[clap(long, value_parser, num_args = 1.., value_delimiter = ',')] codecs: Vec, + + /// Add DVC channel named pipe proxy. + /// the format is = + /// e.g. `ChannelName=PipeName` where `ChannelName` is the name of the channel, + /// and `PipeName` is the name of the named pipe to connect to (without OS-specific prefix), + /// e.g. PipeName will automatically be prefixed with `\\.\pipe\` on Windows. + #[clap(long, value_parser)] + dvc_proxy: Vec, } impl Config { @@ -357,6 +399,7 @@ impl Config { connector, clipboard_type, rdcleanpath, + dvc_pipe_proxies: args.dvc_proxy, }) } } diff --git a/crates/ironrdp-client/src/main.rs b/crates/ironrdp-client/src/main.rs index 6f7a8792e..49320bbfd 100644 --- a/crates/ironrdp-client/src/main.rs +++ b/crates/ironrdp-client/src/main.rs @@ -6,7 +6,7 @@ extern crate tracing; use anyhow::Context as _; use ironrdp_client::app::App; use ironrdp_client::config::{ClipboardType, Config}; -use ironrdp_client::rdp::{RdpClient, RdpInputEvent, RdpOutputEvent}; +use ironrdp_client::rdp::{DvcPipeProxyFactory, RdpClient, RdpInputEvent, RdpOutputEvent}; use tokio::runtime; use winit::event_loop::EventLoop; @@ -50,7 +50,7 @@ fn main() -> anyhow::Result<()> { use ironrdp_client::clipboard::ClientClipboardMessageProxy; use ironrdp_cliprdr_native::WinClipboard; - let cliprdr = WinClipboard::new(ClientClipboardMessageProxy::new(input_event_sender))?; + let cliprdr = WinClipboard::new(ClientClipboardMessageProxy::new(input_event_sender.clone()))?; let factory = cliprdr.backend_factory(); _win_clipboard = cliprdr; @@ -59,11 +59,14 @@ fn main() -> anyhow::Result<()> { _ => None, }; + let dvc_pipe_proxy_factory = DvcPipeProxyFactory::new(input_event_sender); + let client = RdpClient { config, event_loop_proxy, input_event_receiver, cliprdr_factory, + dvc_pipe_proxy_factory, }; debug!("Start RDP thread"); diff --git a/crates/ironrdp-client/src/rdp.rs b/crates/ironrdp-client/src/rdp.rs index 06362e66c..f191ab90f 100644 --- a/crates/ironrdp-client/src/rdp.rs +++ b/crates/ironrdp-client/src/rdp.rs @@ -8,8 +8,10 @@ use ironrdp::displaycontrol::pdu::MonitorLayoutEntry; use ironrdp::graphics::image_processing::PixelFormat; use ironrdp::graphics::pointer::DecodedPointer; use ironrdp::pdu::input::fast_path::FastPathInputEvent; +use ironrdp::pdu::{pdu_other_err, PduResult}; use ironrdp::session::image::DecodedImage; use ironrdp::session::{fast_path, ActiveStage, ActiveStageOutput, GracefulDisconnectReason, SessionResult}; +use ironrdp::svc::SvcMessage; use ironrdp::{cliprdr, connector, rdpdr, rdpsnd, session}; use ironrdp_core::WriteBuf; use ironrdp_rdpsnd_native::cpal; @@ -23,6 +25,7 @@ use tokio::sync::mpsc; use winit::event_loop::EventLoopProxy; use crate::config::{Config, RDCleanPathConfig}; +use ironrdp_dvc_pipe_proxy::DvcNamedPipeProxy; #[derive(Debug)] pub enum RdpOutputEvent { @@ -47,6 +50,10 @@ pub enum RdpInputEvent { FastPath(SmallVec<[FastPathInputEvent; 2]>), Close, Clipboard(ClipboardMessage), + SendDvcMessages { + channel_id: u32, + messages: Vec, + }, } impl RdpInputEvent { @@ -55,18 +62,50 @@ impl RdpInputEvent { } } +pub struct DvcPipeProxyFactory { + rdp_input_sender: mpsc::UnboundedSender, +} + +impl DvcPipeProxyFactory { + pub fn new(rdp_input_sender: mpsc::UnboundedSender) -> Self { + Self { rdp_input_sender } + } + + pub fn create(&self, channel_name: String, pipe_name: String) -> DvcNamedPipeProxy { + let rdp_input_sender = self.rdp_input_sender.clone(); + + DvcNamedPipeProxy::new(&channel_name, &pipe_name, move |channel_id, messages| { + rdp_input_sender + .send(RdpInputEvent::SendDvcMessages { channel_id, messages }) + .map_err(|_error| pdu_other_err!("send DVC messages to the event loop",))?; + + Ok(()) + }) + } +} + +pub type WriteDvcMessageFn = Box PduResult<()> + Send + 'static>; + pub struct RdpClient { pub config: Config, pub event_loop_proxy: EventLoopProxy, pub input_event_receiver: mpsc::UnboundedReceiver, pub cliprdr_factory: Option>, + pub dvc_pipe_proxy_factory: DvcPipeProxyFactory, } impl RdpClient { pub async fn run(mut self) { loop { let (connection_result, framed) = if let Some(rdcleanpath) = self.config.rdcleanpath.as_ref() { - match connect_ws(&self.config, rdcleanpath, self.cliprdr_factory.as_deref()).await { + match connect_ws( + &self.config, + rdcleanpath, + self.cliprdr_factory.as_deref(), + &self.dvc_pipe_proxy_factory, + ) + .await + { Ok(result) => result, Err(e) => { let _ = self.event_loop_proxy.send_event(RdpOutputEvent::ConnectionFailure(e)); @@ -74,7 +113,13 @@ impl RdpClient { } } } else { - match connect(&self.config, self.cliprdr_factory.as_deref()).await { + match connect( + &self.config, + self.cliprdr_factory.as_deref(), + &self.dvc_pipe_proxy_factory, + ) + .await + { Ok(result) => result, Err(e) => { let _ = self.event_loop_proxy.send_event(RdpOutputEvent::ConnectionFailure(e)); @@ -122,6 +167,7 @@ type UpgradedFramed = ironrdp_tokio::TokioFramed, + dvc_pipe_proxy_factory: &DvcPipeProxyFactory, ) -> ConnectorResult<(ConnectionResult, UpgradedFramed)> { let dest = format!("{}:{}", config.destination.name(), config.destination.port()); @@ -135,10 +181,21 @@ async fn connect( let mut framed = ironrdp_tokio::TokioFramed::new(socket); + let mut drdynvc = + ironrdp::dvc::DrdynvcClient::new().with_dynamic_channel(DisplayControlClient::new(|_| Ok(Vec::new()))); + + // Instantiate all DVC proxies + for proxy in config.dvc_pipe_proxies.iter() { + let channel_name = proxy.channel_name.clone(); + let pipe_name = proxy.pipe_name.clone(); + + trace!(%channel_name, %pipe_name, "Creating DVC proxy"); + + drdynvc = drdynvc.with_dynamic_channel(dvc_pipe_proxy_factory.create(channel_name, pipe_name)); + } + let mut connector = connector::ClientConnector::new(config.connector.clone(), client_addr) - .with_static_channel( - ironrdp::dvc::DrdynvcClient::new().with_dynamic_channel(DisplayControlClient::new(|_| Ok(Vec::new()))), - ) + .with_static_channel(drdynvc) .with_static_channel(rdpsnd::client::Rdpsnd::new(Box::new(cpal::RdpsndBackend::new()))) .with_static_channel(rdpdr::Rdpdr::new(Box::new(NoopRdpdrBackend {}), "IronRDP".to_owned()).with_smartcard(0)); @@ -186,6 +243,7 @@ async fn connect_ws( config: &Config, rdcleanpath: &RDCleanPathConfig, cliprdr_factory: Option<&(dyn CliprdrBackendFactory + Send)>, + dvc_pipe_proxy_factory: &DvcPipeProxyFactory, ) -> ConnectorResult<(ConnectionResult, UpgradedFramed)> { let hostname = rdcleanpath .url @@ -214,10 +272,21 @@ async fn connect_ws( let mut framed = ironrdp_tokio::TokioFramed::new(ws); + let mut drdynvc = + ironrdp::dvc::DrdynvcClient::new().with_dynamic_channel(DisplayControlClient::new(|_| Ok(Vec::new()))); + + // Instantiate all DVC proxies + for proxy in config.dvc_pipe_proxies.iter() { + let channel_name = proxy.channel_name.clone(); + let pipe_name = proxy.pipe_name.clone(); + + trace!(%channel_name, %pipe_name, "Creating DVC proxy"); + + drdynvc = drdynvc.with_dynamic_channel(dvc_pipe_proxy_factory.create(channel_name, pipe_name)); + } + let mut connector = connector::ClientConnector::new(config.connector.clone(), client_addr) - .with_static_channel( - ironrdp::dvc::DrdynvcClient::new().with_dynamic_channel(DisplayControlClient::new(|_| Ok(Vec::new()))), - ) + .with_static_channel(drdynvc) .with_static_channel(rdpsnd::client::Rdpsnd::new(Box::new(cpal::RdpsndBackend::new()))) .with_static_channel(rdpdr::Rdpdr::new(Box::new(NoopRdpdrBackend {}), "IronRDP".to_owned()).with_smartcard(0)); @@ -468,6 +537,12 @@ async fn active_session( Vec::new() } } + RdpInputEvent::SendDvcMessages { channel_id, messages } => { + trace!(channel_id, ?messages, "Send DVC messages"); + + let frame = active_stage.encode_dvc_messages(messages)?; + vec![ActiveStageOutput::ResponseFrame(frame)] + } } } }; diff --git a/crates/ironrdp-dvc-pipe-proxy/CHANGELOG.md b/crates/ironrdp-dvc-pipe-proxy/CHANGELOG.md new file mode 100644 index 000000000..1d013ff92 --- /dev/null +++ b/crates/ironrdp-dvc-pipe-proxy/CHANGELOG.md @@ -0,0 +1,6 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), +and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). diff --git a/crates/ironrdp-dvc-pipe-proxy/Cargo.toml b/crates/ironrdp-dvc-pipe-proxy/Cargo.toml new file mode 100644 index 000000000..486c8e748 --- /dev/null +++ b/crates/ironrdp-dvc-pipe-proxy/Cargo.toml @@ -0,0 +1,40 @@ +[package] +name = "ironrdp-dvc-pipe-proxy" +version = "0.1.0" +readme = "README.md" +description = "DVC named pipe proxy for IronRDP" +edition.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true +authors.workspace = true +keywords.workspace = true +categories.workspace = true + +[lib] +doctest = false +test = false + +[dependencies] +ironrdp-core.path = "../ironrdp-core" +ironrdp-dvc.path = "../ironrdp-dvc" +ironrdp-pdu.path = "../ironrdp-pdu" +ironrdp-svc.path = "../ironrdp-svc" + +tracing = { version = "0.1", features = ["log"] } + + +[target.'cfg(windows)'.dependencies] +widestring = "1" +windows = { version = "0.61", features = [ + "Win32_Foundation", + "Win32_Security", + "Win32_System_Threading", + "Win32_Storage_FileSystem", + "Win32_System_Pipes", + "Win32_System_IO", +] } + + +[lints] +workspace = true diff --git a/crates/ironrdp-dvc-pipe-proxy/LICENSE-APACHE b/crates/ironrdp-dvc-pipe-proxy/LICENSE-APACHE new file mode 100644 index 000000000..1cd601d0a --- /dev/null +++ b/crates/ironrdp-dvc-pipe-proxy/LICENSE-APACHE @@ -0,0 +1 @@ +../../LICENSE-APACHE \ No newline at end of file diff --git a/crates/ironrdp-dvc-pipe-proxy/LICENSE-MIT b/crates/ironrdp-dvc-pipe-proxy/LICENSE-MIT new file mode 100644 index 000000000..b2cfbdc7b --- /dev/null +++ b/crates/ironrdp-dvc-pipe-proxy/LICENSE-MIT @@ -0,0 +1 @@ +../../LICENSE-MIT \ No newline at end of file diff --git a/crates/ironrdp-dvc-pipe-proxy/README.md b/crates/ironrdp-dvc-pipe-proxy/README.md new file mode 100644 index 000000000..2f20c8a5d --- /dev/null +++ b/crates/ironrdp-dvc-pipe-proxy/README.md @@ -0,0 +1,15 @@ +# IronRDP DVC pipe proxy + +This crate provides a Device Virtual Channel (DVC) handler for IronRDP, enabling proxying of RDP DVC +traffic over a named pipe. + +It was originally designed to simplify custom DVC integration within Devolutions Remote Desktop +Manager (RDM). By implementing a thin pipe proxy for target RDP clients (such as IronRDP, FreeRDP, +mstsc, etc.), the main client logic can be centralized and reused across all supported clients via a +named pipe. + +This approach allows you to implement your DVC logic in one place, making it easier to support +multiple RDP clients without duplicating code. + +Additionally, this crate can be used for other scenarios, such as testing your own custom DVC +channel client, without needing to patch or rebuild IronRDP itself. \ No newline at end of file diff --git a/crates/ironrdp-dvc-pipe-proxy/src/lib.rs b/crates/ironrdp-dvc-pipe-proxy/src/lib.rs new file mode 100644 index 000000000..f8031562f --- /dev/null +++ b/crates/ironrdp-dvc-pipe-proxy/src/lib.rs @@ -0,0 +1,12 @@ +#![doc = include_str!("../README.md")] +#![doc(html_logo_url = "https://cdnweb.devolutions.net/images/projects/devolutions/logos/devolutions-icon-shadow.svg")] + +#[macro_use] +extern crate tracing; + +#[cfg(target_os = "windows")] +mod windows; + +mod platform; + +pub use platform::DvcNamedPipeProxy; diff --git a/crates/ironrdp-dvc-pipe-proxy/src/platform/mod.rs b/crates/ironrdp-dvc-pipe-proxy/src/platform/mod.rs new file mode 100644 index 000000000..19e79b37e --- /dev/null +++ b/crates/ironrdp-dvc-pipe-proxy/src/platform/mod.rs @@ -0,0 +1,11 @@ +#[cfg(target_os = "windows")] +mod windows; + +#[cfg(not(target_os = "windows"))] +mod unix; + +#[cfg(target_os = "windows")] +pub use windows::DvcNamedPipeProxy; + +#[cfg(not(target_os = "windows"))] +pub use unix::DvcNamedPipeProxy; diff --git a/crates/ironrdp-dvc-pipe-proxy/src/platform/unix.rs b/crates/ironrdp-dvc-pipe-proxy/src/platform/unix.rs new file mode 100644 index 000000000..e0a221e85 --- /dev/null +++ b/crates/ironrdp-dvc-pipe-proxy/src/platform/unix.rs @@ -0,0 +1,48 @@ +use ironrdp_core::impl_as_any; +use ironrdp_dvc::{DvcClientProcessor, DvcMessage, DvcProcessor}; +use ironrdp_pdu::{pdu_other_err, PduResult}; +use ironrdp_svc::SvcMessage; + +/// A proxy DVC pipe client that forwards DVC messages to/from a named pipe server. +pub struct DvcNamedPipeProxy { + channel_name: String, +} + +impl DvcNamedPipeProxy { + /// Creates a new DVC named pipe proxy. + /// `dvc_write_callback` is called when the proxy receives a DVC message from the + /// named pipe server and the SVC message is ready to be sent to the DVC channel in the main + /// IronRDP active session loop. + pub fn new(channel_name: &str, _named_pipe_name: &str, _dvc_write_callback: F) -> Self + where + F: Fn(u32, Vec) -> PduResult<()> + Send + 'static, + { + error!("DvcNamedPipeProxy is not implemented on Unix-like systems, using a stub implementation"); + + Self { + channel_name: channel_name.to_owned(), + } + } +} + +impl_as_any!(DvcNamedPipeProxy); + +impl DvcProcessor for DvcNamedPipeProxy { + fn channel_name(&self) -> &str { + &self.channel_name + } + + fn start(&mut self, _channel_id: u32) -> PduResult> { + Err(pdu_other_err!( + "DvcNamedPipeProxy is not implemented on Unix-like systems" + )) + } + + fn process(&mut self, _channel_id: u32, _payload: &[u8]) -> PduResult> { + Err(pdu_other_err!( + "DvcNamedPipeProxy is not implemented on Unix-like systems" + )) + } +} + +impl DvcClientProcessor for DvcNamedPipeProxy {} diff --git a/crates/ironrdp-dvc-pipe-proxy/src/platform/windows/error.rs b/crates/ironrdp-dvc-pipe-proxy/src/platform/windows/error.rs new file mode 100644 index 000000000..b0e7ccd13 --- /dev/null +++ b/crates/ironrdp-dvc-pipe-proxy/src/platform/windows/error.rs @@ -0,0 +1,37 @@ +use crate::windows::WindowsError; + +#[derive(Debug)] +pub(crate) enum DvcPipeProxyError { + Windows(WindowsError), + MpscIo, + DvcIncompleteWrite, + EncodeDvcMessage, +} + +impl From for DvcPipeProxyError { + fn from(err: WindowsError) -> Self { + DvcPipeProxyError::Windows(err) + } +} + +impl core::fmt::Display for DvcPipeProxyError { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + match self { + DvcPipeProxyError::Windows(err) => err.fmt(f), + DvcPipeProxyError::MpscIo => write!(f, "MPSC IO error"), + DvcPipeProxyError::DvcIncompleteWrite => write!(f, "DVC incomplete write"), + DvcPipeProxyError::EncodeDvcMessage => write!(f, "DVC message encoding error"), + } + } +} + +impl core::error::Error for DvcPipeProxyError { + fn source(&self) -> Option<&(dyn core::error::Error + 'static)> { + match self { + DvcPipeProxyError::Windows(err) => Some(err), + DvcPipeProxyError::MpscIo => None, + DvcPipeProxyError::DvcIncompleteWrite => None, + DvcPipeProxyError::EncodeDvcMessage => None, + } + } +} diff --git a/crates/ironrdp-dvc-pipe-proxy/src/platform/windows/mod.rs b/crates/ironrdp-dvc-pipe-proxy/src/platform/windows/mod.rs new file mode 100644 index 000000000..cebceede0 --- /dev/null +++ b/crates/ironrdp-dvc-pipe-proxy/src/platform/windows/mod.rs @@ -0,0 +1,149 @@ +mod error; +mod worker; + +use std::sync::mpsc; + +use ironrdp_core::impl_as_any; +use ironrdp_dvc::{DvcClientProcessor, DvcMessage, DvcProcessor}; +use ironrdp_pdu::{pdu_other_err, PduResult}; +use ironrdp_svc::SvcMessage; + +use crate::platform::windows::error::DvcPipeProxyError; +use crate::platform::windows::worker::{worker_thread_func, OnWriteDvcMessage, WorkerCtx}; +use crate::windows::{Event, MessagePipeServer, Semaphore}; + +const IO_MPSC_CHANNEL_SIZE: usize = 100; + +struct WorkerControlCtx { + to_pipe_tx: mpsc::SyncSender>, + to_pipe_semaphore: Semaphore, + abort_event: Event, +} + +/// A proxy DVC pipe client that forwards DVC messages to/from a named pipe server. +pub struct DvcNamedPipeProxy { + channel_name: String, + named_pipe_name: String, + dvc_write_callback: Option, + worker_control_ctx: Option, +} + +impl DvcNamedPipeProxy { + /// Creates a new DVC named pipe proxy. + /// `dvc_write_callback` is called when the proxy receives a DVC message from the + /// named pipe server and the SVC message is ready to be sent to the DVC channel in the main + /// IronRDP active session loop. + pub fn new(channel_name: &str, named_pipe_name: &str, dvc_write_callback: F) -> Self + where + F: Fn(u32, Vec) -> PduResult<()> + Send + 'static, + { + let named_pipe_name = format!("\\\\.\\pipe\\{named_pipe_name}"); + + Self { + channel_name: channel_name.to_owned(), + named_pipe_name, + dvc_write_callback: Some(Box::new(dvc_write_callback)), + worker_control_ctx: None, + } + } +} + +impl_as_any!(DvcNamedPipeProxy); + +impl Drop for DvcNamedPipeProxy { + fn drop(&mut self) { + if let Some(ctx) = &self.worker_control_ctx { + // Signal the worker thread to abort. + ctx.abort_event.set().ok(); + } + } +} + +impl DvcNamedPipeProxy { + fn start_impl(&mut self, channel_id: u32) -> Result<(), DvcPipeProxyError> { + // PIPE -> DVC channel - handled via callback passed to the constructor. + // DVC -> PIPE channel - handled via mpsc internally in the worker thread. + let (to_pipe_tx, to_pipe_rx) = mpsc::sync_channel(IO_MPSC_CHANNEL_SIZE); + + let semaphore_max_count = IO_MPSC_CHANNEL_SIZE + .try_into() + .expect("Channel size is too large for underlying WinAPI semaphore"); + + let to_pipe_semaphore = Semaphore::new_unnamed(0, semaphore_max_count)?; + + let abort_event = Event::new_unnamed()?; + + let worker_control_ctx = WorkerControlCtx { + to_pipe_tx, + to_pipe_semaphore: to_pipe_semaphore.clone(), + abort_event: abort_event.clone(), + }; + + let pipe = MessagePipeServer::new(&self.named_pipe_name)?; + + let dvc_write_callback = self + .dvc_write_callback + .take() + .expect("DVC write callback already taken"); + + let worker_ctx = WorkerCtx { + pipe, + to_pipe_rx, + to_pipe_semaphore, + abort_event, + dvc_write_callback, + pipe_name: self.named_pipe_name.clone(), + channel_name: self.channel_name.clone(), + channel_id, + }; + + let pipe_name = self.named_pipe_name.clone(); + let channel_name = self.channel_name.clone(); + + self.worker_control_ctx = Some(worker_control_ctx); + + std::thread::spawn(move || { + if let Err(error) = worker_thread_func(worker_ctx) { + error!(%error, %pipe_name, %channel_name, "DVC pipe proxy worker thread failed"); + } + }); + + Ok(()) + } +} + +impl DvcProcessor for DvcNamedPipeProxy { + fn channel_name(&self) -> &str { + &self.channel_name + } + + fn start(&mut self, channel_id: u32) -> PduResult> { + self.start_impl(channel_id) + .map_err(|e| pdu_other_err!("dvc named pipe proxy failed", source: e))?; + + Ok(Vec::new()) + } + + fn process(&mut self, _channel_id: u32, payload: &[u8]) -> PduResult> { + // Send the payload to the worker thread via the mpsc channel. + let ctx = match &self.worker_control_ctx { + Some(ctx) => ctx, + None => { + return Err(pdu_other_err!("DVC pipe proxy not started")); + } + }; + + ctx.to_pipe_tx + .send(payload.to_vec()) + .map_err(|_| pdu_other_err!("DVC pipe proxy send failed"))?; + + // Signal WinAPI-based worker IO loop. + ctx.to_pipe_semaphore + .release(1) + .map_err(|_| pdu_other_err!("DVC pipe proxy semaphore release failed"))?; + + Ok(Vec::new()) + } +} + +impl DvcClientProcessor for DvcNamedPipeProxy {} diff --git a/crates/ironrdp-dvc-pipe-proxy/src/platform/windows/worker.rs b/crates/ironrdp-dvc-pipe-proxy/src/platform/windows/worker.rs new file mode 100644 index 000000000..5edab7cf6 --- /dev/null +++ b/crates/ironrdp-dvc-pipe-proxy/src/platform/windows/worker.rs @@ -0,0 +1,172 @@ +use std::sync::mpsc; + +use ironrdp_core::{ensure_size, Encode, EncodeResult}; +use ironrdp_dvc::{encode_dvc_messages, DvcEncode}; +use ironrdp_pdu::PduResult; +use ironrdp_svc::{ChannelFlags, SvcMessage}; + +use crate::platform::windows::error::DvcPipeProxyError; +use crate::windows::{wait_any, wait_any_with_timeout, Event, MessagePipeServer, Semaphore, WindowsError}; + +const PIPE_CONNECT_TIMEOUT_SECS: u32 = 10_000; // 10 seconds +const PIPE_WRITE_TIMEOUT_SECS: u32 = 3_000; // 3 seconds +const MESSAGE_BUFFER_SIZE: usize = 64 * 1024; // 64 KiB + +pub(crate) type OnWriteDvcMessage = Box) -> PduResult<()> + Send>; + +pub(crate) struct WorkerCtx { + pub pipe: MessagePipeServer, + pub to_pipe_rx: mpsc::Receiver>, + pub to_pipe_semaphore: Semaphore, + pub abort_event: Event, + pub dvc_write_callback: OnWriteDvcMessage, + pub pipe_name: String, + pub channel_name: String, + pub channel_id: u32, +} + +pub(crate) fn worker_thread_func(worker_ctx: WorkerCtx) -> Result<(), DvcPipeProxyError> { + let WorkerCtx { + mut pipe, + to_pipe_rx, + to_pipe_semaphore, + abort_event, + dvc_write_callback, + pipe_name, + channel_name, + channel_id, + } = worker_ctx; + + info!(%channel_name, %pipe_name, "Connecting DVC pipe proxy"); + + { + let mut connect_ctx = pipe.prepare_connect_overlapped()?; + + if !connect_ctx.overlapped_connect()? { + const EVENT_ID_ABORT: usize = 0; + let events = [abort_event.borrow(), connect_ctx.borrow_event()]; + let wait_result = match wait_any_with_timeout(&events, PIPE_CONNECT_TIMEOUT_SECS) { + Ok(idx) => idx, + Err(WindowsError::WaitForMultipleObjectsTimeout) => { + warn!(%channel_name, %pipe_name, "DVC pipe proxy connection timed out"); + return Ok(()); + } + Err(err) => { + return Err(DvcPipeProxyError::Windows(err)); + } + }; + + if wait_result == EVENT_ID_ABORT { + info!(%channel_name, %pipe_name, "DVC pipe proxy connection has been aborted"); + return Ok(()); + } + + connect_ctx.get_result()?; + } + } + + info!(%channel_name, %pipe_name, "DVC pipe proxy connected"); + + let mut read_ctx = pipe.prepare_read_overlapped(MESSAGE_BUFFER_SIZE)?; + + const EVENT_ID_ABORT: usize = 0; + const EVENT_ID_READ: usize = 1; + const EVENT_ID_WRITE_MPSC: usize = 2; + + read_ctx.overlapped_read()?; + + info!(%channel_name, %pipe_name, "DVC pipe proxy IO loop started"); + + loop { + let events = [ + abort_event.borrow(), + read_ctx.borrow_event(), + to_pipe_semaphore.borrow(), + ]; + let wait_result = wait_any(&events)?; + + if wait_result == EVENT_ID_ABORT { + info!(%channel_name, %pipe_name, "DVC pipe proxy connection has been aborted"); + return Ok(()); + } + + // Read end of pipe is ready, forward received data to DVC. + if wait_result == EVENT_ID_READ { + let read_result = read_ctx.get_result()?.to_vec(); + + trace!(%channel_name, %pipe_name, "DVC proxy read {} bytes from pipe", read_result.len()); + + if !read_result.is_empty() { + let messages = encode_dvc_messages( + channel_id, + vec![Box::new(RawDataDvcMessage(read_result))], + ChannelFlags::empty(), + ) + .map_err(|_| DvcPipeProxyError::EncodeDvcMessage)?; + + if let Err(err) = dvc_write_callback(0, messages) { + error!(%err, %channel_name, %pipe_name, "DVC pipe proxy write callback failed"); + } + } + + // Queue the read operation again. + read_ctx.overlapped_read()?; + continue; + } + + // DVC data received, forward it to the pipe. + if wait_result == EVENT_ID_WRITE_MPSC { + let payload = to_pipe_rx.recv().map_err(|_| DvcPipeProxyError::MpscIo)?; + + let payload_len = payload.len(); + + if payload_len == 0 { + warn!(%channel_name, %pipe_name, "Rejected empty DVC data (not sent to pipe)"); + continue; + } + + trace!(%channel_name, %pipe_name, "DVC proxy write {} bytes to pipe,", payload_len); + + let mut overlapped_write = pipe.prepare_write_overlapped(payload)?; + + overlapped_write.overlapped_write()?; + + let events = [abort_event.borrow(), overlapped_write.borrow_event()]; + let wait_result = wait_any_with_timeout(&events, PIPE_WRITE_TIMEOUT_SECS)?; + + if wait_result == EVENT_ID_ABORT { + info!(%channel_name, %pipe_name, "DVC pipe proxy write aborted"); + return Ok(()); + } + + let bytes_written = overlapped_write.get_result()?; + + if bytes_written as usize != payload_len { + // Message-based pipe write failed. + return Err(DvcPipeProxyError::DvcIncompleteWrite); + } + + continue; + } + } +} + +struct RawDataDvcMessage(Vec); + +impl Encode for RawDataDvcMessage { + fn encode(&self, dst: &mut ironrdp_core::WriteCursor<'_>) -> EncodeResult<()> { + ensure_size!(in: dst, size: self.size()); + dst.write_slice(&self.0); + Ok(()) + } + + fn name(&self) -> &'static str { + "RawDataDvcMessage" + } + + fn size(&self) -> usize { + self.0.len() + } +} + +impl DvcEncode for RawDataDvcMessage {} diff --git a/crates/ironrdp-dvc-pipe-proxy/src/windows/error.rs b/crates/ironrdp-dvc-pipe-proxy/src/windows/error.rs new file mode 100644 index 000000000..9c2f471d7 --- /dev/null +++ b/crates/ironrdp-dvc-pipe-proxy/src/windows/error.rs @@ -0,0 +1,58 @@ +#[derive(Debug)] +pub(crate) enum WindowsError { + CreateNamedPipe(windows::core::Error), + CreateEvent(windows::core::Error), + SetEvent(windows::core::Error), + ReleaseSemaphore(windows::core::Error), + InvalidSemaphoreParams(&'static str), + WaitForMultipleObjectsFailed(windows::core::Error), + WaitForMultipleObjectsTimeout, + WaitForMultipleObjectsAbandoned(u32), + OverlappedConnect(windows::core::Error), + OverlappedRead(windows::core::Error), + OverlappedWrite(windows::core::Error), + CreateSemaphore(windows::core::Error), + InvalidPipeName(String), +} + +impl core::fmt::Display for WindowsError { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + match self { + WindowsError::CreateNamedPipe(_) => write!(f, "failed to create named pipe"), + WindowsError::CreateEvent(_) => write!(f, "failed to create event object"), + WindowsError::SetEvent(_) => write!(f, "failed to set event to signaled state"), + WindowsError::InvalidSemaphoreParams(cause) => write!(f, "invalid semaphore parameters: {}", cause), + WindowsError::ReleaseSemaphore(_) => write!(f, "failed to release semaphore"), + WindowsError::WaitForMultipleObjectsFailed(_) => write!(f, "failed to wait for multiple objects"), + WindowsError::WaitForMultipleObjectsTimeout => write!(f, "timed out waiting for multiple objects"), + WindowsError::WaitForMultipleObjectsAbandoned(idx) => { + write!(f, "wait for multiple objects failed, handle #{idx} was abandoned") + } + WindowsError::OverlappedConnect(_) => write!(f, "overlapped connect failed"), + WindowsError::OverlappedRead(_) => write!(f, "overlapped read failed"), + WindowsError::OverlappedWrite(_) => write!(f, "overlapped write failed"), + WindowsError::CreateSemaphore(_) => write!(f, "failed to create semaphore object"), + WindowsError::InvalidPipeName(cause) => write!(f, "invalid pipe name: `{}`", cause), + } + } +} + +impl core::error::Error for WindowsError { + fn source(&self) -> Option<&(dyn core::error::Error + 'static)> { + match self { + WindowsError::CreateNamedPipe(err) + | WindowsError::SetEvent(err) + | WindowsError::ReleaseSemaphore(err) + | WindowsError::WaitForMultipleObjectsFailed(err) + | WindowsError::OverlappedConnect(err) + | WindowsError::OverlappedRead(err) + | WindowsError::OverlappedWrite(err) + | WindowsError::CreateSemaphore(err) => Some(err), + WindowsError::CreateEvent(err) => Some(err), + WindowsError::InvalidSemaphoreParams(_) + | WindowsError::WaitForMultipleObjectsTimeout + | WindowsError::InvalidPipeName(_) => None, + WindowsError::WaitForMultipleObjectsAbandoned(_) => None, + } + } +} diff --git a/crates/ironrdp-dvc-pipe-proxy/src/windows/event.rs b/crates/ironrdp-dvc-pipe-proxy/src/windows/event.rs new file mode 100644 index 000000000..6629db253 --- /dev/null +++ b/crates/ironrdp-dvc-pipe-proxy/src/windows/event.rs @@ -0,0 +1,53 @@ +use std::sync::Arc; + +use windows::core::Owned; +use windows::Win32::Foundation::HANDLE; +use windows::Win32::System::Threading::{CreateEventW, SetEvent}; + +use crate::windows::{BorrowedHandle, WindowsError}; + +/// RAII wrapper for WinAPI event handle. +#[derive(Debug, Clone)] +pub(crate) struct Event { + handle: Arc>, +} + +// SAFETY: We ensure that inner handle is indeed could be sent and shared between threads via +// Event wrapper API itself by restricting handle usage: +// - set() method which calls SetEvent inside (which is thread-safe). +// - borrow() method which returns a BorrowedHandle for waiting on the event. +// - Handle lifetime is ensured by Arc, so it is always valid when used. +unsafe impl Send for Event {} + +impl Event { + pub(crate) fn new_unnamed() -> Result { + // SAFETY: FFI call with no outstanding preconditions. + let handle = unsafe { CreateEventW(None, false, false, None).map_err(WindowsError::CreateEvent)? }; + + // SAFETY: Handle is valid and we are the owner of the handle. + let handle = unsafe { Owned::new(handle) }; + + // CreateEventW returns a valid handle on success. + Ok(Self { + // See `unsafe impl Send` comment. + #[allow(clippy::arc_with_non_send_sync)] + handle: Arc::new(handle), + }) + } + + pub(crate) fn set(&self) -> Result<(), WindowsError> { + // SAFETY: The handle is valid and we are the owner of the handle. + unsafe { + SetEvent(self.raw()).map_err(WindowsError::SetEvent)?; + } + Ok(()) + } + + pub(super) fn raw(&self) -> HANDLE { + **self.handle + } + + pub(crate) fn borrow(&self) -> BorrowedHandle<'_> { + BorrowedHandle(&self.handle) + } +} diff --git a/crates/ironrdp-dvc-pipe-proxy/src/windows/mod.rs b/crates/ironrdp-dvc-pipe-proxy/src/windows/mod.rs new file mode 100644 index 000000000..849069869 --- /dev/null +++ b/crates/ironrdp-dvc-pipe-proxy/src/windows/mod.rs @@ -0,0 +1,99 @@ +//! WinAPI wrappers for the DVC pipe proxy IO loop logic. +//! +//! Some of the wrappers are based on `win-api-wrappers` code (simplified/reduced functionality). + +mod error; +mod event; +mod pipe; +mod semaphore; + +pub(crate) use error::WindowsError; +pub(crate) use event::Event; +pub(crate) use pipe::MessagePipeServer; +pub(crate) use semaphore::Semaphore; + +use windows::Win32::Foundation::{ + ERROR_IO_PENDING, HANDLE, WAIT_ABANDONED_0, WAIT_EVENT, WAIT_FAILED, WAIT_OBJECT_0, WAIT_TIMEOUT, +}; +use windows::Win32::System::Threading::{WaitForMultipleObjects, INFINITE}; + +/// Thin wrapper around borrowed `windows` crate `HANDLE` reference. +/// This is used to ensure handle lifetime when passing it to FFI functions +/// (see `wait_any_with_timeout` for example). +#[repr(transparent)] +pub(crate) struct BorrowedHandle<'a>(&'a HANDLE); + +/// Safe wrapper around `WaitForMultipleObjects`. +pub(crate) fn wait_any_with_timeout(handles: &[BorrowedHandle<'_>], timeout: u32) -> Result { + let handles = cast_handles(handles); + + // SAFETY: + // - BorrowedHandle alongside with rust type system ensures that the HANDLEs are valid for + // the duration of the call. + // - All handles in this module have SYNCHRONIZE access rights. + // - cast_handles ensures no handle duplicates. + let result = unsafe { WaitForMultipleObjects(handles, false, timeout) }; + + match result { + WAIT_FAILED => Err(WindowsError::WaitForMultipleObjectsFailed( + windows::core::Error::from_win32(), + )), + WAIT_TIMEOUT => Err(WindowsError::WaitForMultipleObjectsTimeout), + WAIT_EVENT(idx) if idx >= WAIT_ABANDONED_0.0 => { + let idx = idx - WAIT_ABANDONED_0.0; + Err(WindowsError::WaitForMultipleObjectsAbandoned(idx)) + } + WAIT_EVENT(id) => Ok((id - WAIT_OBJECT_0.0) as usize), + } +} + +/// Safe `WaitForMultipleObjects` wrapper with infinite timeout. +pub(crate) fn wait_any(handles: &[BorrowedHandle<'_>]) -> Result { + // Standard generic syntax is used instead if `impl` because of the following lint: + // > warning: lifetime parameter `'a` only used once + // + // Fixing this lint (use of '_ lifetime) produces compiler error. + wait_any_with_timeout(handles, INFINITE) +} + +fn cast_handles<'a>(handles: &'a [BorrowedHandle<'a>]) -> &'a [HANDLE] { + // Very basic sanity checks to ensure that the handles are valid + // and there are no duplicates. + // This is only done in debug builds to avoid performance overhead in release builds, while + // still catching undefined behavior early in development. + #[cfg(debug_assertions)] + { + // Ensure that there are no duplicate handles without hash. + for (i, handle) in handles.iter().enumerate() { + for other_handle in &handles[i + 1..] { + if handle.0 == other_handle.0 { + panic!("Duplicate handle found in wait_any_with_timeout"); + } + } + } + } + + for handle in handles { + // Ensure that the handle is valid. + if handle.0.is_invalid() { + panic!("Invalid handle in wait_any_with_timeout"); + } + } + + // SAFETY: + // - BorrowedHandle is #[repr(transparent)] over *const c_void, and so is HANDLE, + // so the layout is the same. + // - We ensure the lifetime is preserved. + unsafe { core::slice::from_raw_parts(handles.as_ptr() as *const HANDLE, handles.len()) } +} + +/// Maps ERROR_IO_PENDING to Ok(()) and returns other errors as is. +fn ensure_overlapped_io_result(result: windows::core::Result<()>) -> Result, WindowsError> { + if let Err(error) = &result { + if error.code() == ERROR_IO_PENDING.to_hresult() { + return Ok(Ok(())); + } + } + + Ok(result) +} diff --git a/crates/ironrdp-dvc-pipe-proxy/src/windows/pipe.rs b/crates/ironrdp-dvc-pipe-proxy/src/windows/pipe.rs new file mode 100644 index 000000000..88ab42b41 --- /dev/null +++ b/crates/ironrdp-dvc-pipe-proxy/src/windows/pipe.rs @@ -0,0 +1,291 @@ +use core::ops::DerefMut; +use core::pin::Pin; + +use windows::core::{Owned, PCWSTR}; +use windows::Win32::Foundation::{ERROR_IO_PENDING, ERROR_PIPE_CONNECTED, HANDLE}; +use windows::Win32::Storage::FileSystem::{ + ReadFile, WriteFile, FILE_FLAG_FIRST_PIPE_INSTANCE, FILE_FLAG_OVERLAPPED, PIPE_ACCESS_DUPLEX, +}; +use windows::Win32::System::Pipes::{ + ConnectNamedPipe, CreateNamedPipeW, PIPE_READMODE_MESSAGE, PIPE_TYPE_MESSAGE, PIPE_WAIT, +}; +use windows::Win32::System::IO::{GetOverlappedResult, OVERLAPPED}; + +use crate::windows::{ensure_overlapped_io_result, BorrowedHandle, Event, WindowsError}; + +const PIPE_INSTANCES: u32 = 2; +const PIPE_BUFFER_SIZE: u32 = 64 * 1024; // 64KB +const DEFAULT_PIPE_TIMEOUT: u32 = 10_000; // 10 seconds + +/// RAII wrapper for WinAPI named pipe server. +#[derive(Debug)] +pub(crate) struct MessagePipeServer { + handle: Owned, + connected: bool, +} + +/// SAFETY: It is safe to send pipe HANDLE between threads. +unsafe impl Send for MessagePipeServer {} + +impl MessagePipeServer { + /// Creates a new named pipe server. + pub(crate) fn new(name: &str) -> Result { + // Create a named pipe with the specified name. + let lpname = + widestring::U16CString::from_str(name).map_err(|_| WindowsError::InvalidPipeName(name.to_owned()))?; + + // SAFETY: lpname is a valid pointer to a null-terminated wide string. + let handle = unsafe { + CreateNamedPipeW( + PCWSTR(lpname.as_ptr()), + PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED | FILE_FLAG_FIRST_PIPE_INSTANCE, + PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE | PIPE_WAIT, + PIPE_INSTANCES, + PIPE_BUFFER_SIZE, + PIPE_BUFFER_SIZE, + DEFAULT_PIPE_TIMEOUT, + None, + ) + }; + + // `windows` crate API inconsistency: CreateNamedPipeW returns invalid handle on error + // instead of Result::Err. + if handle.is_invalid() { + return Err(WindowsError::CreateNamedPipe(windows::core::Error::from_win32())); + } + + // SAFETY: Handle is valid and we are the owner of the handle. + let handle = unsafe { Owned::new(handle) }; + + Ok(Self { + handle, + connected: false, + }) + } + + fn raw(&self) -> HANDLE { + *self.handle + } + + /// Initializes context for overlapped connect operation. + pub(crate) fn prepare_connect_overlapped(&mut self) -> Result, WindowsError> { + OverlappedPipeConnectCtx::new(self) + } + + /// Initializes context for overlapped read operation. + pub(crate) fn prepare_read_overlapped( + &self, + buffer_size: usize, + ) -> Result, WindowsError> { + OverlappedPipeReadCtx::new(self, buffer_size) + } + + /// Initializes context for overlapped write operation. + pub(crate) fn prepare_write_overlapped(&self, data: Vec) -> Result, WindowsError> { + OverlappedWriteCtx::new(self, data) + } +} + +pub(crate) struct OverlappedPipeConnectCtx<'a> { + pipe: &'a mut MessagePipeServer, + overlapped: Pin>, + event: Event, +} + +impl<'a> OverlappedPipeConnectCtx<'a> { + fn new(pipe: &'a mut MessagePipeServer) -> Result { + let event = Event::new_unnamed()?; + + let overlapped = Box::pin(OVERLAPPED { + hEvent: event.raw(), + ..Default::default() + }); + + Ok(Self { + pipe, + overlapped, + event, + }) + } + + /// Connects to the named pipe server. + /// Returns true if pipe is already connected prior to this call and no additional + /// overlapped io is needed. If false is returned, the caller should call `get_result()` to + /// after returned event handle is signaled to complete the connection. + pub(crate) fn overlapped_connect(&mut self) -> Result { + // SAFETY: The handle is valid and we are the owner of the handle. + let result = unsafe { ConnectNamedPipe(self.pipe.raw(), Some(self.overlapped.deref_mut() as *mut _)) }; + + match result { + Ok(()) => { + self.pipe.connected = true; + Ok(true) + } + Err(error) => { + if error.code() == ERROR_PIPE_CONNECTED.to_hresult() { + // The pipe is already connected. + self.pipe.connected = true; + Ok(true) + } else if error.code() == ERROR_IO_PENDING.to_hresult() { + // Overlapped I/O is pending. + Ok(false) + } else { + // Connection failed. + Err(WindowsError::OverlappedConnect(error)) + } + } + } + } + + pub(crate) fn borrow_event(&'a self) -> BorrowedHandle<'a> { + self.event.borrow() + } + + pub(crate) fn get_result(&mut self) -> Result<(), WindowsError> { + let mut bytes_read = 0u32; + + // SAFETY: The handle is valid and we are the owner of the handle. + unsafe { + GetOverlappedResult( + self.pipe.raw(), + self.overlapped.deref_mut() as *mut _, + &mut bytes_read as *mut u32, + false, + ) + .map_err(WindowsError::OverlappedConnect)? + }; + + self.pipe.connected = true; + + Ok(()) + } +} + +pub(crate) struct OverlappedPipeReadCtx<'a> { + pipe: &'a MessagePipeServer, + buffer: Vec, + overlapped: Pin>, + event: Event, +} + +impl<'a> OverlappedPipeReadCtx<'a> { + fn new(pipe: &'a MessagePipeServer, buffer_size: usize) -> Result { + let event = Event::new_unnamed()?; + + let overlapped = Box::pin(OVERLAPPED { + hEvent: event.raw(), + ..Default::default() + }); + + Ok(Self { + pipe, + buffer: vec![0; buffer_size], + overlapped, + event, + }) + } + + pub(crate) fn overlapped_read(&mut self) -> Result<(), WindowsError> { + // SAFETY: self.pipe.raw() returns a valid handle. The read buffer pointer returned + // by self.buffer.as_mut_slice() is valid and remains alive for the entire duration + // of the overlapped I/O operation. The OVERLAPPED structure is pinned and not moved + // in memory, ensuring its address remains stable until the operation completes. + let result = unsafe { + ReadFile( + self.pipe.raw(), + Some(self.buffer.as_mut_slice()), + None, + Some(self.overlapped.deref_mut() as *mut _), + ) + }; + + ensure_overlapped_io_result(result)?.map_err(WindowsError::OverlappedRead) + } + + pub(crate) fn borrow_event(&'a self) -> BorrowedHandle<'a> { + self.event.borrow() + } + + pub(crate) fn get_result(&mut self) -> Result<&[u8], WindowsError> { + let mut bytes_read = 0u32; + + // SAFETY: The handle is valid and we are the owner of the handle. + unsafe { + GetOverlappedResult( + self.pipe.raw(), + self.overlapped.deref_mut() as *mut _, + &mut bytes_read as *mut u32, + false, + ) + .map_err(WindowsError::OverlappedRead)? + }; + + Ok(&self.buffer[..bytes_read as usize]) + } +} + +pub(crate) struct OverlappedWriteCtx<'a> { + pipe: &'a MessagePipeServer, + data: Vec, + overlapped: Pin>, + event: Event, +} + +impl<'a> OverlappedWriteCtx<'a> { + fn new(pipe: &'a MessagePipeServer, data: Vec) -> Result { + let event = Event::new_unnamed()?; + + let mut overlapped = Box::pin(OVERLAPPED { + hEvent: event.raw(), + ..Default::default() + }); + + // Set write mode to append + overlapped.Anonymous.Anonymous.Offset = 0xFFFFFFFF; + overlapped.Anonymous.Anonymous.OffsetHigh = 0xFFFFFFFF; + + Ok(Self { + pipe, + data, + overlapped, + event, + }) + } + + pub(crate) fn overlapped_write(&mut self) -> Result<(), WindowsError> { + // SAFETY: self.pipe.raw() returns a valid handle. The write buffer pointer (&self.data) is valid + // and remains alive for the entire duration of the overlapped I/O operation. The OVERLAPPED + // structure is pinned and not moved in memory, ensuring its address remains stable until the + // operation completes. + let result = unsafe { + WriteFile( + self.pipe.raw(), + Some(&self.data), + None, + Some(self.overlapped.deref_mut() as *mut _), + ) + }; + + ensure_overlapped_io_result(result)?.map_err(WindowsError::OverlappedWrite) + } + + pub(crate) fn borrow_event(&'a self) -> BorrowedHandle<'a> { + self.event.borrow() + } + + pub(crate) fn get_result(&mut self) -> Result { + let mut bytes_written = 0u32; + // SAFETY: The pipe handle is valid and we are the owner of the handle. + unsafe { + GetOverlappedResult( + self.pipe.raw(), + self.overlapped.deref_mut() as *const _, + &mut bytes_written as *mut u32, + true, + ) + .map_err(WindowsError::OverlappedWrite)?; + }; + + Ok(bytes_written) + } +} diff --git a/crates/ironrdp-dvc-pipe-proxy/src/windows/semaphore.rs b/crates/ironrdp-dvc-pipe-proxy/src/windows/semaphore.rs new file mode 100644 index 000000000..3e3ad0536 --- /dev/null +++ b/crates/ironrdp-dvc-pipe-proxy/src/windows/semaphore.rs @@ -0,0 +1,92 @@ +use std::sync::Arc; + +use windows::core::Owned; +use windows::Win32::Foundation::HANDLE; +use windows::Win32::System::Threading::{CreateSemaphoreW, ReleaseSemaphore}; + +use crate::windows::{BorrowedHandle, WindowsError}; + +/// RAII wrapper for WinAPI semaphore handle. +#[derive(Debug, Clone)] +pub(crate) struct Semaphore { + handle: Arc>, +} + +// SAFETY: We ensure that inner handle is indeed could be sent and shared between threads via +// Semaphore wrapper API itself by restricting handle usage: +// - release() method which calls ReleaseSemaphore inside (which is thread-safe). +// - borrow() method which returns a BorrowedHandle for waiting on the semaphore. +// - Handle lifetime is ensured by Arc, so it is always valid when used. +unsafe impl Send for Semaphore {} + +impl Semaphore { + /// Creates a new unnamed semaphore with the specified initial and maximum counts. + pub(crate) fn new_unnamed(initial_count: u32, maximum_count: u32) -> Result { + if maximum_count == 0 { + return Err(WindowsError::InvalidSemaphoreParams( + "maximum_count must be greater than 0", + )); + } + + if initial_count > maximum_count { + return Err(WindowsError::InvalidSemaphoreParams( + "initial_count must be less than or equal to maximum_count", + )); + } + + let initial_count = i32::try_from(initial_count) + .map_err(|_| WindowsError::InvalidSemaphoreParams("initial_count should be positive"))?; + + let maximum_count = i32::try_from(maximum_count) + .map_err(|_| WindowsError::InvalidSemaphoreParams("maximum_count should be positive"))?; + + // SAFETY: All parameters are checked for validity above: + // - initial_count is always <= maximum_count. + // - maximum_count is always > 0. + // - all values are positive. + let handle = unsafe { + CreateSemaphoreW(None, initial_count, maximum_count, None).map_err(WindowsError::CreateSemaphore)? + }; + + // SAFETY: Handle is valid and we are the owner of the handle. + let handle = unsafe { Owned::new(handle) }; + + // CreateSemaphoreW returns a valid handle on success. + Ok(Self { + // See `unsafe impl Send` comment. + // TODO(@CBenoit): Verify this comment. + #[allow(clippy::arc_with_non_send_sync)] + handle: Arc::new(handle), + }) + } + + fn raw(&self) -> HANDLE { + **self.handle + } + + pub(crate) fn borrow(&self) -> BorrowedHandle<'_> { + BorrowedHandle(&self.handle) + } + + pub(crate) fn release(&self, release_count: u16) -> Result { + let release_count = i32::from(release_count); + + if release_count == 0 { + // semaphore release count must be greater than 0 + return Err(WindowsError::InvalidSemaphoreParams( + "release_count must be greater than 0", + )); + } + + let mut previous_count = 0; + // SAFETY: All parameters are checked for validity above: + // - release_count > 0. + // - lpPreviousCount points to valid stack memory. + // - handle is valid and owned by this struct. + unsafe { + ReleaseSemaphore(self.raw(), release_count, Some(&mut previous_count)) + .map_err(WindowsError::ReleaseSemaphore)?; + } + Ok(previous_count.try_into().expect("semaphore count is negative")) + } +} diff --git a/crates/ironrdp-dvc/src/client.rs b/crates/ironrdp-dvc/src/client.rs index e88e977db..50af13ddf 100644 --- a/crates/ironrdp-dvc/src/client.rs +++ b/crates/ironrdp-dvc/src/client.rs @@ -68,6 +68,10 @@ impl DrdynvcClient { self.dynamic_channels.get_by_type_id(TypeId::of::()) } + pub fn get_dvc_by_channel_id(&self, channel_id: u32) -> Option<&DynamicVirtualChannel> { + self.dynamic_channels.get_by_channel_id(channel_id) + } + fn create_capabilities_response(&mut self) -> SvcMessage { let caps_response = DrdynvcClientPdu::Capabilities(CapabilitiesResponsePdu::new(CapsVersion::V1)); debug!("Send DVC Capabilities Response PDU: {caps_response:?}"); @@ -141,7 +145,7 @@ impl SvcProcessor for DrdynvcClient { } DrdynvcServerPdu::Close(close_request) => { debug!("Got DVC Close Request PDU: {close_request:?}"); - self.dynamic_channels.remove_by_channel_id(&close_request.channel_id); + self.dynamic_channels.remove_by_channel_id(close_request.channel_id); let close_response = DrdynvcClientPdu::Close(ClosePdu::new(close_request.channel_id)); @@ -153,7 +157,7 @@ impl SvcProcessor for DrdynvcClient { let messages = self .dynamic_channels - .get_by_channel_id_mut(&channel_id) + .get_by_channel_id_mut(channel_id) .ok_or_else(|| pdu_other_err!("access to non existing DVC channel"))? .process(data)?; diff --git a/crates/ironrdp-dvc/src/lib.rs b/crates/ironrdp-dvc/src/lib.rs index 3fcb28879..3061f7ea4 100644 --- a/crates/ironrdp-dvc/src/lib.rs +++ b/crates/ironrdp-dvc/src/lib.rs @@ -202,14 +202,20 @@ impl DynamicChannelSet { self.channels.get_mut(name) } - fn get_by_channel_id_mut(&mut self, id: &DynamicChannelId) -> Option<&mut DynamicVirtualChannel> { + fn get_by_channel_id(&self, id: DynamicChannelId) -> Option<&DynamicVirtualChannel> { self.channel_id_to_name - .get(id) + .get(&id) + .and_then(|name| self.channels.get(name)) + } + + fn get_by_channel_id_mut(&mut self, id: DynamicChannelId) -> Option<&mut DynamicVirtualChannel> { + self.channel_id_to_name + .get(&id) .and_then(|name| self.channels.get_mut(name)) } - fn remove_by_channel_id(&mut self, id: &DynamicChannelId) -> Option { - if let Some(name) = self.channel_id_to_name.remove(id) { + fn remove_by_channel_id(&mut self, id: DynamicChannelId) -> Option { + if let Some(name) = self.channel_id_to_name.remove(&id) { return self.name_to_channel_id.remove(&name); // Channels are retained in the `self.channels` and `self.type_id_to_name` map to allow potential // dynamic re-addition by the server. diff --git a/crates/ironrdp-session/src/active_stage.rs b/crates/ironrdp-session/src/active_stage.rs index 50a7fd04a..4fd7c5675 100644 --- a/crates/ironrdp-session/src/active_stage.rs +++ b/crates/ironrdp-session/src/active_stage.rs @@ -10,7 +10,7 @@ use ironrdp_pdu::geometry::InclusiveRectangle; use ironrdp_pdu::input::fast_path::{FastPathInput, FastPathInputEvent}; use ironrdp_pdu::rdp::headers::ShareDataPdu; use ironrdp_pdu::{mcs, Action}; -use ironrdp_svc::{SvcProcessor, SvcProcessorMessages}; +use ironrdp_svc::{SvcMessage, SvcProcessor, SvcProcessorMessages}; use crate::fast_path::UpdateKind; use crate::image::DecodedImage; @@ -187,6 +187,10 @@ impl ActiveStage { self.x224_processor.get_dvc::() } + pub fn get_dvc_by_channel_id(&mut self, channel_id: u32) -> Option<&DynamicVirtualChannel> { + self.x224_processor.get_dvc_by_channel_id(channel_id) + } + /// Completes user's SVC request with data, required to sent it over the network and returns /// a buffer with encoded data. pub fn process_svc_processor_messages( @@ -245,6 +249,10 @@ impl ActiveStage { None } + + pub fn encode_dvc_messages(&mut self, messages: Vec) -> SessionResult> { + self.process_svc_processor_messages(SvcProcessorMessages::::new(messages)) + } } #[derive(Debug)] diff --git a/crates/ironrdp-session/src/x224/mod.rs b/crates/ironrdp-session/src/x224/mod.rs index 3313aabe5..9b411dc86 100644 --- a/crates/ironrdp-session/src/x224/mod.rs +++ b/crates/ironrdp-session/src/x224/mod.rs @@ -88,6 +88,11 @@ impl Processor { self.get_svc_processor::()?.get_dvc_by_type_id::() } + pub fn get_dvc_by_channel_id(&self, channel_id: u32) -> Option<&DynamicVirtualChannel> { + self.get_svc_processor::()? + .get_dvc_by_channel_id(channel_id) + } + /// Processes a received PDU. Returns a vector of [`ProcessorOutput`] that must be processed /// in the returned order. pub fn process(&mut self, frame: &[u8]) -> SessionResult> { diff --git a/crates/ironrdp-svc/src/lib.rs b/crates/ironrdp-svc/src/lib.rs index 58746380c..7e712bcbb 100644 --- a/crates/ironrdp-svc/src/lib.rs +++ b/crates/ironrdp-svc/src/lib.rs @@ -83,6 +83,15 @@ pub struct SvcMessage { flags: ChannelFlags, } +impl fmt::Debug for SvcMessage { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("SvcMessage") + .field("pdu", &self.pdu.name()) + .field("flags", &self.flags) + .finish() + } +} + impl SvcMessage { /// Adds additional SVC header flags to the message. #[must_use]