Skip to content

Commit 3ec5fba

Browse files
committed
feat(dvc): make dvc named pipe proxy cross-platform
1 parent ae052ed commit 3ec5fba

19 files changed

Lines changed: 504 additions & 1013 deletions

File tree

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/ironrdp-dvc-pipe-proxy/Cargo.toml

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,8 @@ ironrdp-dvc = { path = "../ironrdp-dvc", version = "0.3" }
2222
ironrdp-svc = { path = "../ironrdp-svc", version = "0.4" } # public (SvcMessage type)
2323

2424
tracing = { version = "0.1", features = ["log"] }
25-
26-
[target.'cfg(windows)'.dependencies]
27-
widestring = "1"
28-
windows = { version = "0.61", features = [
29-
"Win32_Foundation",
30-
"Win32_Security",
31-
"Win32_System_Threading",
32-
"Win32_Storage_FileSystem",
33-
"Win32_System_Pipes",
34-
"Win32_System_IO",
35-
] }
25+
tokio = { version = "1", features = ["net", "rt", "sync", "macros", "io-util"]}
26+
async-trait = "0.1"
3627

3728
[lints]
3829
workspace = true
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
#[derive(Debug)]
2+
pub(crate) enum DvcPipeProxyError {
3+
Io(std::io::Error),
4+
EncodeDvcMessage(ironrdp_core::EncodeError),
5+
}
6+
7+
impl core::fmt::Display for DvcPipeProxyError {
8+
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
9+
match self {
10+
DvcPipeProxyError::Io(_) => write!(f, "IO error"),
11+
DvcPipeProxyError::EncodeDvcMessage(_) => write!(f, "DVC message encoding error"),
12+
}
13+
}
14+
}
15+
16+
impl core::error::Error for DvcPipeProxyError {
17+
fn source(&self) -> Option<&(dyn core::error::Error + 'static)> {
18+
match self {
19+
DvcPipeProxyError::Io(err) => Some(err),
20+
DvcPipeProxyError::EncodeDvcMessage(src) => Some(src),
21+
}
22+
}
23+
}

crates/ironrdp-dvc-pipe-proxy/src/lib.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,11 @@
44
#[macro_use]
55
extern crate tracing;
66

7-
#[cfg(target_os = "windows")]
8-
mod windows;
9-
7+
mod error;
8+
mod message;
9+
mod os_pipe;
1010
mod platform;
11-
pub use self::platform::DvcNamedPipeProxy;
11+
mod proxy;
12+
mod worker;
13+
14+
pub use self::proxy::DvcNamedPipeProxy;
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
use ironrdp_core::{ensure_size, Encode, EncodeResult};
2+
use ironrdp_dvc::DvcEncode;
3+
4+
pub(crate) struct RawDataDvcMessage(pub Vec<u8>);
5+
6+
impl Encode for RawDataDvcMessage {
7+
fn encode(&self, dst: &mut ironrdp_core::WriteCursor<'_>) -> EncodeResult<()> {
8+
ensure_size!(in: dst, size: self.size());
9+
dst.write_slice(&self.0);
10+
Ok(())
11+
}
12+
13+
fn name(&self) -> &'static str {
14+
"RawDataDvcMessage"
15+
}
16+
17+
fn size(&self) -> usize {
18+
self.0.len()
19+
}
20+
}
21+
22+
impl DvcEncode for RawDataDvcMessage {}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
use async_trait::async_trait;
2+
3+
use crate::error::DvcPipeProxyError;
4+
5+
#[async_trait]
6+
pub(crate) trait OsPipe: Send + Sync {
7+
/// Creates a new OS pipe and waits for the connection.
8+
async fn connect(pipe_name: &str) -> Result<Self, DvcPipeProxyError>
9+
where
10+
Self: Sized;
11+
12+
/// Reads data from the pipe and returns the number of bytes read.
13+
///
14+
/// Returned future should be stateless and can be polled multiple times.
15+
async fn read(&mut self, buffer: &mut [u8]) -> Result<usize, DvcPipeProxyError>;
16+
17+
/// Writes data to the pipe and returns the number of bytes written.
18+
async fn write_all(&mut self, buffer: &[u8]) -> Result<(), DvcPipeProxyError>;
19+
}
Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,5 @@
11
#[cfg(target_os = "windows")]
2-
mod windows;
3-
#[cfg(target_os = "windows")]
4-
pub use self::windows::DvcNamedPipeProxy;
2+
pub(crate) mod windows;
53

64
#[cfg(not(target_os = "windows"))]
7-
mod unix;
8-
#[cfg(not(target_os = "windows"))]
9-
pub use self::unix::DvcNamedPipeProxy;
5+
pub(crate) mod unix;

crates/ironrdp-dvc-pipe-proxy/src/platform/unix.rs

Lines changed: 55 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,48 +1,66 @@
1-
use ironrdp_core::impl_as_any;
2-
use ironrdp_dvc::{DvcClientProcessor, DvcMessage, DvcProcessor};
3-
use ironrdp_pdu::{pdu_other_err, PduResult};
4-
use ironrdp_svc::SvcMessage;
5-
6-
/// A proxy DVC pipe client that forwards DVC messages to/from a named pipe server.
7-
pub struct DvcNamedPipeProxy {
8-
channel_name: String,
1+
use async_trait::async_trait;
2+
use tokio::fs;
3+
use tokio::io::{AsyncReadExt, AsyncWriteExt};
4+
5+
use crate::error::DvcPipeProxyError;
6+
use crate::os_pipe::OsPipe;
7+
8+
/// Unix-specific implementation of the OS pipe trait.
9+
pub(crate) struct UnixPipe {
10+
socket: tokio::net::UnixStream,
911
}
1012

11-
impl DvcNamedPipeProxy {
12-
/// Creates a new DVC named pipe proxy.
13-
/// `dvc_write_callback` is called when the proxy receives a DVC message from the
14-
/// named pipe server and the SVC message is ready to be sent to the DVC channel in the main
15-
/// IronRDP active session loop.
16-
pub fn new<F>(channel_name: &str, _named_pipe_name: &str, _dvc_write_callback: F) -> Self
17-
where
18-
F: Fn(u32, Vec<SvcMessage>) -> PduResult<()> + Send + 'static,
19-
{
20-
error!("DvcNamedPipeProxy is not implemented on Unix-like systems, using a stub implementation");
21-
22-
Self {
23-
channel_name: channel_name.to_owned(),
13+
#[async_trait]
14+
impl OsPipe for UnixPipe {
15+
async fn connect(pipe_name: &str) -> Result<Self, DvcPipeProxyError> {
16+
// Domain socket file could already exist from a previous run.
17+
match fs::metadata(&pipe_name).await {
18+
Ok(metadata) => {
19+
use std::os::unix::fs::FileTypeExt;
20+
21+
info!(
22+
%pipe_name,
23+
"DVC pipe already exists, removing stale file."
24+
);
25+
26+
// Just to be sure, check if it's indeed a socket -
27+
// throw an error if calling code accidentally passed a regular file.
28+
if !metadata.file_type().is_socket() {
29+
return Err(DvcPipeProxyError::Io(std::io::Error::new(
30+
std::io::ErrorKind::InvalidInput,
31+
format!("Path {} is not a socket", pipe_name),
32+
)));
33+
}
34+
35+
fs::remove_file(pipe_name).await.map_err(DvcPipeProxyError::Io)?;
36+
}
37+
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
38+
trace!(
39+
%pipe_name,
40+
"DVC pipe does not exist, creating it."
41+
);
42+
}
43+
Err(e) => {
44+
return Err(DvcPipeProxyError::Io(e));
45+
}
2446
}
25-
}
26-
}
2747

28-
impl_as_any!(DvcNamedPipeProxy);
48+
let listener = tokio::net::UnixListener::bind(pipe_name).map_err(DvcPipeProxyError::Io)?;
2949

30-
impl DvcProcessor for DvcNamedPipeProxy {
31-
fn channel_name(&self) -> &str {
32-
&self.channel_name
50+
let (socket, _) = listener.accept().await.map_err(DvcPipeProxyError::Io)?;
51+
52+
Ok(Self { socket })
3353
}
3454

35-
fn start(&mut self, _channel_id: u32) -> PduResult<Vec<DvcMessage>> {
36-
Err(pdu_other_err!(
37-
"DvcNamedPipeProxy is not implemented on Unix-like systems"
38-
))
55+
async fn read(&mut self, buffer: &mut [u8]) -> Result<usize, DvcPipeProxyError> {
56+
self.socket.read(buffer).await.map_err(DvcPipeProxyError::Io)
3957
}
4058

41-
fn process(&mut self, _channel_id: u32, _payload: &[u8]) -> PduResult<Vec<DvcMessage>> {
42-
Err(pdu_other_err!(
43-
"DvcNamedPipeProxy is not implemented on Unix-like systems"
44-
))
59+
async fn write_all(&mut self, buffer: &[u8]) -> Result<(), DvcPipeProxyError> {
60+
self.socket
61+
.write_all(buffer)
62+
.await
63+
.map_err(DvcPipeProxyError::Io)
64+
.map(|_| ())
4565
}
4666
}
47-
48-
impl DvcClientProcessor for DvcNamedPipeProxy {}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
use async_trait::async_trait;
2+
use tokio::io::{AsyncReadExt, AsyncWriteExt};
3+
use tokio::net::windows::named_pipe;
4+
5+
use crate::error::DvcPipeProxyError;
6+
use crate::os_pipe::OsPipe;
7+
8+
const PIPE_BUFFER_SIZE: u32 = 64 * 1024; // 64KB
9+
10+
/// Unix-specific implementation of the OS pipe trait.
11+
pub(crate) struct WindowsPipe {
12+
pipe_server: named_pipe::NamedPipeServer,
13+
}
14+
15+
#[async_trait]
16+
impl OsPipe for WindowsPipe {
17+
async fn connect(pipe_name: &str) -> Result<Self, DvcPipeProxyError> {
18+
let pipe_name = format!("\\\\.\\pipe\\{pipe_name}");
19+
20+
let pipe_server = named_pipe::ServerOptions::new()
21+
.first_pipe_instance(true)
22+
.access_inbound(true)
23+
.access_outbound(true)
24+
.max_instances(2)
25+
.in_buffer_size(PIPE_BUFFER_SIZE)
26+
.out_buffer_size(PIPE_BUFFER_SIZE)
27+
.pipe_mode(named_pipe::PipeMode::Message)
28+
.create(pipe_name)
29+
.map_err(DvcPipeProxyError::Io)?;
30+
31+
pipe_server.connect().await.map_err(DvcPipeProxyError::Io)?;
32+
33+
Ok(Self { pipe_server })
34+
}
35+
36+
async fn read(&mut self, buffer: &mut [u8]) -> Result<usize, DvcPipeProxyError> {
37+
self.pipe_server.read(buffer).await.map_err(DvcPipeProxyError::Io)
38+
}
39+
40+
async fn write_all(&mut self, buffer: &[u8]) -> Result<(), DvcPipeProxyError> {
41+
self.pipe_server
42+
.write_all(buffer)
43+
.await
44+
.map_err(DvcPipeProxyError::Io)
45+
.map(|_| ())
46+
}
47+
}

crates/ironrdp-dvc-pipe-proxy/src/platform/windows/error.rs

Lines changed: 0 additions & 37 deletions
This file was deleted.

0 commit comments

Comments
 (0)