Skip to content

Commit 861e518

Browse files
authored
refactor(fspy): use shared memory IPC on Windows (#239)
### TL;DR Replaced Windows pipe-based IPC with shared memory for file system access tracking, unifying the approach across platforms. ### What changed? - Replaced Windows named pipe IPC with shared memory for tracking file system accesses - Created a new `ipc.rs` module with common code for both Windows and Unix platforms - Extracted `OwnedReceiverLockGuard` from Unix implementation to be reused on Windows - Removed dependency on `dashmap` which was used for the previous Windows implementation - Added `write_encoded` method to `ShmWriter` to directly encode values into shared memory - Updated Windows payload structure to use `ChannelConf` instead of pipe handles ### Why make this change? The previous implementation used different IPC mechanisms on Windows (named pipes) and Unix (shared memory). This change unifies the approach across platforms, making the code more maintainable and consistent. Shared memory is more efficient for passing large amounts of data between processes, which is important for tracking file system accesses in complex applications. The unified approach also simplifies the codebase by removing platform-specific code paths and dependencies.
1 parent 92c05a4 commit 861e518

12 files changed

Lines changed: 192 additions & 268 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/fspy/src/ipc.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
use std::io;
2+
3+
use bincode::borrow_decode_from_slice;
4+
use fspy_shared::ipc::{
5+
BINCODE_CONFIG, PathAccess,
6+
channel::{Receiver, ReceiverLockGuard},
7+
};
8+
use tokio::task::spawn_blocking;
9+
10+
// Shared memory size for storing path accesses.
11+
// 4 GiB is large enough to store path accesses in almost any realistic scenario.
12+
// This doesn't allocate physical memory until it's actually used.
13+
pub const SHM_CAPACITY: usize = 4 * 1024 * 1024 * 1024;
14+
15+
#[ouroboros::self_referencing]
16+
pub struct OwnedReceiverLockGuard {
17+
/// Owns the shared memory
18+
receiver: Receiver,
19+
/// Borrows the shared memory and owns the file lock
20+
#[borrows(receiver)]
21+
#[covariant]
22+
lock_guard: ReceiverLockGuard<'this>,
23+
}
24+
25+
impl OwnedReceiverLockGuard {
26+
pub fn lock(receiver: Receiver) -> io::Result<Self> {
27+
OwnedReceiverLockGuard::try_new(receiver, |receiver| receiver.lock())
28+
}
29+
30+
pub async fn lock_async(receiver: Receiver) -> io::Result<Self> {
31+
spawn_blocking(move || Self::lock(receiver)).await.expect("lock task panicked")
32+
}
33+
34+
pub fn iter_path_accesses(&self) -> impl Iterator<Item = PathAccess<'_>> {
35+
self.borrow_lock_guard().iter_frames().map(|frame| {
36+
let (path_access, decoded_size) =
37+
borrow_decode_from_slice::<PathAccess<'_>, _>(frame, BINCODE_CONFIG).unwrap();
38+
assert_eq!(decoded_size, frame.len());
39+
path_access
40+
})
41+
}
42+
}

crates/fspy/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
// Persist the injected DLL/shared library somewhere in the filesystem.
55
mod fixture;
66

7+
mod ipc;
8+
79
#[cfg(unix)]
810
#[path = "./unix/mod.rs"]
911
mod os_impl;
@@ -12,6 +14,7 @@ mod os_impl;
1214
#[path = "./windows/mod.rs"]
1315
mod os_impl;
1416

17+
#[cfg(unix)]
1518
mod arena;
1619
mod command;
1720

crates/fspy/src/unix/mod.rs

Lines changed: 8 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,3 @@
1-
// use std::{os::unix::process::CommandExt};
2-
3-
// use tokio::process::Command;
4-
51
#[cfg(target_os = "linux")]
62
mod syscall_handler;
73

@@ -10,13 +6,9 @@ mod macos_fixtures;
106

117
use std::{io, path::Path};
128

13-
use bincode::borrow_decode_from_slice;
149
#[cfg(target_os = "linux")]
1510
use fspy_seccomp_unotify::supervisor::supervise;
16-
use fspy_shared::ipc::{
17-
BINCODE_CONFIG, NativeString, PathAccess,
18-
channel::{Receiver, ReceiverLockGuard, channel},
19-
};
11+
use fspy_shared::ipc::{NativeString, PathAccess, channel::channel};
2012
#[cfg(target_os = "macos")]
2113
use fspy_shared_unix::payload::Fixtures;
2214
use fspy_shared_unix::{
@@ -27,9 +19,12 @@ use fspy_shared_unix::{
2719
use futures_util::FutureExt;
2820
#[cfg(target_os = "linux")]
2921
use syscall_handler::SyscallHandler;
30-
use tokio::task::spawn_blocking;
3122

32-
use crate::{Command, TrackedChild, arena::PathAccessArena};
23+
use crate::{
24+
Command, TrackedChild,
25+
arena::PathAccessArena,
26+
ipc::{OwnedReceiverLockGuard, SHM_CAPACITY},
27+
};
3328

3429
#[derive(Debug, Clone)]
3530
pub struct SpyInner {
@@ -71,16 +66,6 @@ impl SpyInner {
7166
}
7267
}
7368

74-
#[ouroboros::self_referencing]
75-
struct OwnedReceiverLockGuard {
76-
/// Owns the shared memory
77-
receiver: Receiver,
78-
/// Borrows the shared memory and owns the file lock
79-
#[borrows(receiver)]
80-
#[covariant]
81-
lock_guard: ReceiverLockGuard<'this>,
82-
}
83-
8469
pub struct PathAccessIterable {
8570
arenas: Vec<PathAccessArena>,
8671
ipc_receiver_lock_guard: OwnedReceiverLockGuard,
@@ -91,22 +76,11 @@ impl PathAccessIterable {
9176
let accesses_in_arena =
9277
self.arenas.iter().flat_map(|arena| arena.borrow_accesses().iter()).copied();
9378

94-
let accesses_in_shm =
95-
self.ipc_receiver_lock_guard.borrow_lock_guard().iter_frames().map(|frame| {
96-
let (path_access, decoded_size) =
97-
borrow_decode_from_slice::<PathAccess<'_>, _>(frame, BINCODE_CONFIG).unwrap();
98-
assert_eq!(decoded_size, frame.len());
99-
path_access
100-
});
79+
let accesses_in_shm = self.ipc_receiver_lock_guard.iter_path_accesses();
10180
accesses_in_shm.chain(accesses_in_arena)
10281
}
10382
}
10483

105-
// Shared memory size for storing path accesses.
106-
// 4 GiB is large enough to store path accesses in almost any realistic scenario.
107-
// This doesn't allocate physical memory until it's actually used.
108-
const SHM_CAPACITY: usize = 4 * 1024 * 1024 * 1024;
109-
11084
pub(crate) async fn spawn_impl(mut command: Command) -> io::Result<TrackedChild> {
11185
#[cfg(target_os = "linux")]
11286
let supervisor = supervise::<SyscallHandler>()?;
@@ -170,10 +144,7 @@ pub(crate) async fn spawn_impl(mut command: Command) -> io::Result<TrackedChild>
170144
let accesses_future = async move {
171145
let arenas = arenas_future.await?;
172146
// `receiver.lock()` blocks. Run it inside `spawn_blocking` to avoid blocking the tokio runtime.
173-
let ipc_receiver_lock_guard = spawn_blocking(move || {
174-
OwnedReceiverLockGuard::try_new(ipc_receiver, |receiver| receiver.lock())
175-
})
176-
.await??;
147+
let ipc_receiver_lock_guard = OwnedReceiverLockGuard::lock_async(ipc_receiver).await?;
177148
Ok(PathAccessIterable { arenas, ipc_receiver_lock_guard })
178149
}
179150
.boxed();

crates/fspy/src/windows/mod.rs

Lines changed: 20 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,31 @@
11
use std::{
2-
ffi::{CStr, c_char, c_void},
3-
fs::OpenOptions,
2+
ffi::{CStr, c_char},
43
io,
54
os::windows::{ffi::OsStrExt, io::AsRawHandle, process::ChildExt as _},
65
path::Path,
7-
ptr::null_mut,
86
sync::Arc,
97
};
108

11-
use bincode::borrow_decode_from_slice;
129
use const_format::formatcp;
1310
use fspy_detours_sys::{DetourCopyPayloadToProcess, DetourUpdateProcessWithDll};
1411
use fspy_shared::{
15-
ipc::{BINCODE_CONFIG, PathAccess},
12+
ipc::{BINCODE_CONFIG, PathAccess, channel::channel},
1613
windows::{PAYLOAD_ID, Payload},
1714
};
1815
use futures_util::FutureExt;
19-
use tokio::{
20-
io::AsyncReadExt,
21-
net::windows::named_pipe::{PipeMode, ServerOptions},
22-
};
23-
// use detours_sys2::{DetourAttach,};
2416
use winapi::{
25-
shared::minwindef::{FALSE, TRUE},
26-
um::{
27-
handleapi::DuplicateHandle,
28-
processthreadsapi::{GetCurrentProcess, ResumeThread},
29-
winbase::CREATE_SUSPENDED,
30-
winnt::DUPLICATE_SAME_ACCESS,
31-
},
17+
shared::minwindef::TRUE,
18+
um::{processthreadsapi::ResumeThread, winbase::CREATE_SUSPENDED},
3219
};
33-
// use windows_sys::Win32::System::Threading::{CREATE_SUSPENDED, ResumeThread};
3420
use winsafe::co::{CP, WC};
3521
use xxhash_rust::const_xxh3::xxh3_128;
3622

37-
use crate::{TrackedChild, arena::PathAccessArena, command::Command, fixture::Fixture};
23+
use crate::{
24+
TrackedChild,
25+
command::Command,
26+
fixture::Fixture,
27+
ipc::{OwnedReceiverLockGuard, SHM_CAPACITY},
28+
};
3829

3930
const PRELOAD_CDYLIB_BINARY: &[u8] = include_bytes!(env!("CARGO_CDYLIB_FILE_FSPY_PRELOAD_WINDOWS"));
4031
const INTERPOSE_CDYLIB: Fixture = Fixture::new(
@@ -43,38 +34,14 @@ const INTERPOSE_CDYLIB: Fixture = Fixture::new(
4334
formatcp!("{:x}", xxh3_128(PRELOAD_CDYLIB_BINARY)),
4435
);
4536

46-
fn luid() -> io::Result<u64> {
47-
let mut luid = unsafe { std::mem::zeroed::<winapi::um::winnt::LUID>() };
48-
let ret = unsafe { winapi::um::securitybaseapi::AllocateLocallyUniqueId(&mut luid) };
49-
if ret == 0 {
50-
return Err(io::Error::last_os_error());
51-
}
52-
Ok((u64::from(luid.HighPart as u32)) << 32 | u64::from(luid.LowPart))
53-
}
54-
5537
pub struct PathAccessIterable {
56-
arena: PathAccessArena,
57-
// pipe_receiver: NamedPipeServer,
38+
ipc_receiver_lock_guard: OwnedReceiverLockGuard,
5839
}
5940

60-
const MESSAGE_MAX_LEN: usize = 4096;
61-
6241
impl PathAccessIterable {
6342
pub fn iter(&self) -> impl Iterator<Item = PathAccess<'_>> {
64-
self.arena.borrow_accesses().iter().copied()
43+
self.ipc_receiver_lock_guard.iter_path_accesses()
6544
}
66-
// pub async fn next<'a>(&mut self, buf: &'a mut Vec<u8>) -> io::Result<Option<PathAccess<'a>>> {
67-
// buf.resize(MESSAGE_MAX_LEN, 0);
68-
// let n = self.pipe_receiver.read(buf.as_mut_slice()).await?;
69-
// if n == 0 {
70-
// return Ok(None);
71-
// }
72-
// let msg = &buf[..n];
73-
// let (path_access, decoded_len) =
74-
// borrow_decode_from_slice::<'_, PathAccess, _>(msg, BINCODE_CONFIG).unwrap();
75-
// assert_eq!(decoded_len, msg.len());
76-
// Ok(Some(path_access))
77-
// }
7845
}
7946

8047
// pub struct TracedProcess {
@@ -110,46 +77,13 @@ pub(crate) async fn spawn_impl(command: Command) -> io::Result<TrackedChild> {
11077

11178
command.creation_flags(CREATE_SUSPENDED);
11279

113-
let pipe_name = format!(r"\\.\pipe\fspy_ipc_{:x}", luid()?);
114-
115-
let mut pipe_receiver = ServerOptions::new()
116-
.pipe_mode(PipeMode::Message)
117-
.access_outbound(false)
118-
.access_inbound(true)
119-
.in_buffer_size(1024)
120-
// .out_buffer_size(100 * 1024 * 1024)
121-
.create(&pipe_name)?;
122-
123-
let connect_fut = pipe_receiver.connect();
124-
125-
let pipe_sender = OpenOptions::new().write(true).open(&pipe_name).unwrap();
126-
127-
connect_fut.await?;
128-
129-
// Temporary workaround before switching to shared memory IPC on Windows.
130-
// The shared memory IPC requires `accesses_future` to be polled after the child process has exited.
131-
// The test code has updated but the Windows implementation here has not yet adopted that model.
132-
// As a result, accesses_future didn't get polled making the pipe_sender to block indefinitely.
133-
// To unblock the pipe_sender, we spawn a separate task to keep reading pipe_receiver.
134-
// This workaround can be removed once the shared memory IPC is used on Windows.
135-
let accesses_future = tokio::task::spawn(async move {
136-
let mut arena = PathAccessArena::default();
137-
138-
let mut buf = [0u8; MESSAGE_MAX_LEN];
139-
loop {
140-
let n = pipe_receiver.read(&mut buf).await?;
141-
if n == 0 {
142-
break;
143-
}
144-
let msg = &buf[..n];
145-
let (path_access, decoded_len) =
146-
borrow_decode_from_slice::<'_, PathAccess, _>(msg, BINCODE_CONFIG).unwrap();
147-
assert_eq!(decoded_len, msg.len());
148-
arena.add(path_access);
149-
}
150-
io::Result::Ok(PathAccessIterable { arena })
151-
});
152-
let accesses_future = async move { accesses_future.await.unwrap() }.boxed();
80+
let (channel_conf, receiver) = channel(SHM_CAPACITY)?;
81+
82+
let accesses_future = async move {
83+
let ipc_receiver_lock_guard = OwnedReceiverLockGuard::lock_async(receiver).await?;
84+
io::Result::Ok(PathAccessIterable { ipc_receiver_lock_guard })
85+
}
86+
.boxed();
15387

15488
// let path_access_stream = PathAccessIterable { pipe_receiver };
15589

@@ -163,24 +97,8 @@ pub(crate) async fn spawn_impl(command: Command) -> io::Result<TrackedChild> {
16397
return Err(io::Error::last_os_error());
16498
}
16599

166-
let mut handle_in_child: *mut c_void = null_mut();
167-
let ret = unsafe {
168-
DuplicateHandle(
169-
GetCurrentProcess(),
170-
pipe_sender.as_raw_handle(),
171-
process_handle,
172-
&mut handle_in_child,
173-
0,
174-
FALSE,
175-
DUPLICATE_SAME_ACCESS,
176-
)
177-
};
178-
if ret == 0 {
179-
return Err(io::Error::last_os_error());
180-
}
181-
182100
let payload = Payload {
183-
pipe_handle: handle_in_child.addr(),
101+
channel_conf: channel_conf.clone(),
184102
asni_dll_path_with_nul: asni_dll_path_with_nul.to_bytes(),
185103
};
186104
let payload_bytes = bincode::encode_to_vec(payload, BINCODE_CONFIG).unwrap();
@@ -207,6 +125,5 @@ pub(crate) async fn spawn_impl(command: Command) -> io::Result<TrackedChild> {
207125
Ok(std_child)
208126
})?;
209127

210-
drop(pipe_sender);
211128
Ok(TrackedChild { tokio_child: child, accesses_future })
212129
}

crates/fspy_preload_windows/Cargo.toml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,3 @@ winsafe = { workspace = true }
2626

2727
[target.'cfg(target_os = "windows")'.dev-dependencies]
2828
tempfile = { workspace = true }
29-
30-
[dependencies]
31-
dashmap = { workspace = true }

0 commit comments

Comments
 (0)