Skip to content

Commit dc6f1f1

Browse files
committed
feat(ie-sandbox): implement IPC channel with length-prefixed JSON #11
- IpcChannel: Unix domain socket pair with length-prefixed (u32 BE) JSON messages. Send/recv with serde generics. 64MB max message size. - IpcMessage: FetchRequest, FetchResponse (base64 body encoding), FetchError, Shutdown, Ping, Pong with request correlation IDs. - IpcError: ConnectionClosed, MessageTooLarge, Serialization/ Deserialization errors, IO errors. - from_raw_fd for child process fd inheritance. - 16 tests: pair creation, send/recv, bidirectional, multiple messages, large payload, connection close, deserialization errors, concurrent, raw fd round-trip, base64 body encoding verification.
1 parent 2896191 commit dc6f1f1

8 files changed

Lines changed: 415 additions & 19 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,3 +56,4 @@ bytes = "1"
5656
async-trait = "0.1"
5757
chrono = { version = "0.4", features = ["serde"] }
5858
tempfile = "3"
59+
base64 = "0.22"

crates/ie-sandbox/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,4 @@ tracing.workspace = true
1414
tokio.workspace = true
1515
serde.workspace = true
1616
serde_json.workspace = true
17+
base64.workspace = true

crates/ie-sandbox/src/channel.rs

Lines changed: 319 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,319 @@
1+
use serde::Serialize;
2+
use serde::de::DeserializeOwned;
3+
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
4+
5+
use crate::error::IpcError;
6+
7+
const LENGTH_PREFIX_SIZE: usize = 4; // u32 big-endian
8+
const MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; // 64 MB
9+
10+
// Unix implementation
11+
#[cfg(unix)]
12+
mod platform {
13+
use tokio::net::unix::{OwnedReadHalf, OwnedWriteHalf};
14+
pub type ReadHalf = OwnedReadHalf;
15+
pub type WriteHalf = OwnedWriteHalf;
16+
}
17+
18+
#[cfg(unix)]
19+
pub use platform::*;
20+
21+
pub struct IpcChannel {
22+
reader: BufReader<ReadHalf>,
23+
writer: WriteHalf,
24+
}
25+
26+
impl IpcChannel {
27+
/// Create a connected pair of IPC channels.
28+
#[cfg(unix)]
29+
pub fn pair() -> Result<(IpcChannel, IpcChannel), IpcError> {
30+
let (a, b) = tokio::net::UnixStream::pair()?;
31+
let (a_read, a_write) = a.into_split();
32+
let (b_read, b_write) = b.into_split();
33+
Ok((
34+
IpcChannel {
35+
reader: BufReader::new(a_read),
36+
writer: a_write,
37+
},
38+
IpcChannel {
39+
reader: BufReader::new(b_read),
40+
writer: b_write,
41+
},
42+
))
43+
}
44+
45+
/// Reconstruct a channel from a raw file descriptor (for child processes).
46+
#[cfg(unix)]
47+
pub fn from_raw_fd(fd: std::os::unix::io::RawFd) -> Result<Self, IpcError> {
48+
use std::os::unix::io::FromRawFd;
49+
let std_stream = unsafe { std::os::unix::net::UnixStream::from_raw_fd(fd) };
50+
std_stream.set_nonblocking(true)?;
51+
let stream = tokio::net::UnixStream::from_std(std_stream)?;
52+
let (read, write) = stream.into_split();
53+
Ok(IpcChannel {
54+
reader: BufReader::new(read),
55+
writer: write,
56+
})
57+
}
58+
59+
/// Send a serializable message with length prefix.
60+
pub async fn send<T: Serialize>(&mut self, msg: &T) -> Result<(), IpcError> {
61+
let payload =
62+
serde_json::to_vec(msg).map_err(|e| IpcError::SerializationError(e.to_string()))?;
63+
if payload.len() > MAX_MESSAGE_SIZE {
64+
return Err(IpcError::MessageTooLarge(payload.len(), MAX_MESSAGE_SIZE));
65+
}
66+
self.writer
67+
.write_all(&(payload.len() as u32).to_be_bytes())
68+
.await?;
69+
self.writer.write_all(&payload).await?;
70+
self.writer.flush().await?;
71+
Ok(())
72+
}
73+
74+
/// Receive a deserialized message with length prefix.
75+
pub async fn recv<T: DeserializeOwned>(&mut self) -> Result<T, IpcError> {
76+
let mut len_buf = [0u8; LENGTH_PREFIX_SIZE];
77+
match self.reader.read_exact(&mut len_buf).await {
78+
Ok(_) => {}
79+
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
80+
return Err(IpcError::ConnectionClosed);
81+
}
82+
Err(e) => return Err(IpcError::Io(e)),
83+
}
84+
let size = u32::from_be_bytes(len_buf) as usize;
85+
if size > MAX_MESSAGE_SIZE {
86+
return Err(IpcError::MessageTooLarge(size, MAX_MESSAGE_SIZE));
87+
}
88+
let mut buf = vec![0u8; size];
89+
match self.reader.read_exact(&mut buf).await {
90+
Ok(_) => {}
91+
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
92+
return Err(IpcError::ConnectionClosed);
93+
}
94+
Err(e) => return Err(IpcError::Io(e)),
95+
}
96+
serde_json::from_slice(&buf).map_err(|e| IpcError::DeserializationError(e.to_string()))
97+
}
98+
99+
/// Send raw bytes with length prefix (for testing malformed messages).
100+
#[cfg(test)]
101+
pub(crate) async fn send_raw(&mut self, data: &[u8]) -> Result<(), IpcError> {
102+
self.writer
103+
.write_all(&(data.len() as u32).to_be_bytes())
104+
.await?;
105+
self.writer.write_all(data).await?;
106+
self.writer.flush().await?;
107+
Ok(())
108+
}
109+
110+
/// Write raw bytes directly to the writer (for testing oversized length prefixes).
111+
#[cfg(test)]
112+
pub(crate) async fn write_raw(&mut self, data: &[u8]) -> Result<(), IpcError> {
113+
self.writer.write_all(data).await?;
114+
self.writer.flush().await?;
115+
Ok(())
116+
}
117+
}
118+
119+
#[cfg(test)]
120+
mod tests {
121+
use std::collections::HashMap;
122+
123+
use crate::message::IpcMessage;
124+
125+
use super::*;
126+
127+
#[tokio::test]
128+
async fn pair_creation() {
129+
let (_a, _b) = IpcChannel::pair().unwrap();
130+
}
131+
132+
#[tokio::test]
133+
async fn simple_send_recv() {
134+
let (mut a, mut b) = IpcChannel::pair().unwrap();
135+
a.send(&IpcMessage::Ping).await.unwrap();
136+
let msg: IpcMessage = b.recv().await.unwrap();
137+
assert_eq!(msg, IpcMessage::Ping);
138+
}
139+
140+
#[tokio::test]
141+
async fn round_trip_struct() {
142+
let (mut a, mut b) = IpcChannel::pair().unwrap();
143+
let req = IpcMessage::FetchRequest {
144+
id: 42,
145+
url: "https://example.com".to_string(),
146+
};
147+
a.send(&req).await.unwrap();
148+
let msg: IpcMessage = b.recv().await.unwrap();
149+
assert_eq!(msg, req);
150+
}
151+
152+
#[tokio::test]
153+
async fn bidirectional() {
154+
let (mut a, mut b) = IpcChannel::pair().unwrap();
155+
a.send(&IpcMessage::Ping).await.unwrap();
156+
let msg: IpcMessage = b.recv().await.unwrap();
157+
assert_eq!(msg, IpcMessage::Ping);
158+
b.send(&IpcMessage::Pong).await.unwrap();
159+
let msg: IpcMessage = a.recv().await.unwrap();
160+
assert_eq!(msg, IpcMessage::Pong);
161+
}
162+
163+
#[tokio::test]
164+
async fn multiple_messages() {
165+
let (mut a, mut b) = IpcChannel::pair().unwrap();
166+
for i in 0..100 {
167+
a.send(&IpcMessage::FetchRequest {
168+
id: i,
169+
url: format!("https://example.com/{i}"),
170+
})
171+
.await
172+
.unwrap();
173+
}
174+
for i in 0..100 {
175+
let msg: IpcMessage = b.recv().await.unwrap();
176+
assert_eq!(
177+
msg,
178+
IpcMessage::FetchRequest {
179+
id: i,
180+
url: format!("https://example.com/{i}"),
181+
}
182+
);
183+
}
184+
}
185+
186+
#[tokio::test]
187+
async fn large_fetch_response() {
188+
let (mut a, mut b) = IpcChannel::pair().unwrap();
189+
let body = vec![0xABu8; 100_000]; // 100KB
190+
let msg = IpcMessage::FetchResponse {
191+
id: 1,
192+
status: 200,
193+
headers: HashMap::new(),
194+
body: body.clone(),
195+
final_url: "https://example.com".to_string(),
196+
};
197+
a.send(&msg).await.unwrap();
198+
let received: IpcMessage = b.recv().await.unwrap();
199+
assert_eq!(received, msg);
200+
}
201+
202+
#[tokio::test]
203+
async fn connection_closed_sender_drops() {
204+
let (a, mut b) = IpcChannel::pair().unwrap();
205+
drop(a);
206+
let result: Result<IpcMessage, _> = b.recv().await;
207+
assert!(matches!(result, Err(IpcError::ConnectionClosed)));
208+
}
209+
210+
#[tokio::test]
211+
async fn connection_closed_receiver_drops() {
212+
let (mut a, b) = IpcChannel::pair().unwrap();
213+
drop(b);
214+
// Sending may succeed (buffered) or fail — the key is it doesn't hang
215+
let _ = a.send(&IpcMessage::Ping).await;
216+
// A second send after the pipe is broken should definitely fail
217+
let result = a.send(&IpcMessage::Ping).await;
218+
assert!(result.is_err());
219+
}
220+
221+
#[tokio::test]
222+
async fn deserialization_error() {
223+
let (mut a, mut b) = IpcChannel::pair().unwrap();
224+
a.send_raw(b"not valid json").await.unwrap();
225+
let result: Result<IpcMessage, _> = b.recv().await;
226+
assert!(matches!(result, Err(IpcError::DeserializationError(_))));
227+
}
228+
229+
#[tokio::test]
230+
async fn zero_length_message() {
231+
let (mut a, mut b) = IpcChannel::pair().unwrap();
232+
// Write a zero-length prefix
233+
a.write_raw(&0u32.to_be_bytes()).await.unwrap();
234+
let result: Result<IpcMessage, _> = b.recv().await;
235+
assert!(matches!(result, Err(IpcError::DeserializationError(_))));
236+
}
237+
238+
#[tokio::test]
239+
async fn message_too_large() {
240+
let (mut a, mut b) = IpcChannel::pair().unwrap();
241+
// Write a length prefix claiming 128MB
242+
let huge_size = (128 * 1024 * 1024u32).to_be_bytes();
243+
a.write_raw(&huge_size).await.unwrap();
244+
let result: Result<IpcMessage, _> = b.recv().await;
245+
assert!(matches!(result, Err(IpcError::MessageTooLarge(_, _))));
246+
}
247+
248+
#[tokio::test]
249+
async fn concurrent_send_recv() {
250+
let (mut a, mut b) = IpcChannel::pair().unwrap();
251+
let n = 100;
252+
253+
let sender = tokio::spawn(async move {
254+
for i in 0..n {
255+
a.send(&IpcMessage::FetchRequest {
256+
id: i,
257+
url: format!("https://example.com/{i}"),
258+
})
259+
.await
260+
.unwrap();
261+
}
262+
});
263+
264+
let receiver = tokio::spawn(async move {
265+
for i in 0..n {
266+
let msg: IpcMessage = b.recv().await.unwrap();
267+
assert_eq!(
268+
msg,
269+
IpcMessage::FetchRequest {
270+
id: i,
271+
url: format!("https://example.com/{i}"),
272+
}
273+
);
274+
}
275+
});
276+
277+
sender.await.unwrap();
278+
receiver.await.unwrap();
279+
}
280+
281+
#[tokio::test]
282+
async fn interleaved_ping_pong() {
283+
let (mut a, mut b) = IpcChannel::pair().unwrap();
284+
a.send(&IpcMessage::Ping).await.unwrap();
285+
let msg: IpcMessage = b.recv().await.unwrap();
286+
assert_eq!(msg, IpcMessage::Ping);
287+
b.send(&IpcMessage::Pong).await.unwrap();
288+
let msg: IpcMessage = a.recv().await.unwrap();
289+
assert_eq!(msg, IpcMessage::Pong);
290+
}
291+
292+
#[cfg(unix)]
293+
#[tokio::test]
294+
async fn from_raw_fd_round_trip() {
295+
use std::os::unix::io::AsRawFd;
296+
297+
// Create a Unix socketpair manually
298+
let (std_a, std_b) = std::os::unix::net::UnixStream::pair().unwrap();
299+
let fd_b = std_b.as_raw_fd();
300+
301+
// Reconstruct channel from raw fd
302+
std_a.set_nonblocking(true).unwrap();
303+
std_b.set_nonblocking(true).unwrap();
304+
let tok_a = tokio::net::UnixStream::from_std(std_a).unwrap();
305+
let (a_read, a_write) = tok_a.into_split();
306+
let mut chan_a = IpcChannel {
307+
reader: BufReader::new(a_read),
308+
writer: a_write,
309+
};
310+
311+
// Leak std_b so from_raw_fd can take ownership
312+
std::mem::forget(std_b);
313+
let mut chan_b = IpcChannel::from_raw_fd(fd_b).unwrap();
314+
315+
chan_a.send(&IpcMessage::Ping).await.unwrap();
316+
let msg: IpcMessage = chan_b.recv().await.unwrap();
317+
assert_eq!(msg, IpcMessage::Ping);
318+
}
319+
}

crates/ie-sandbox/src/error.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
#[derive(Debug, thiserror::Error)]
2+
pub enum IpcError {
3+
#[error("connection closed by peer")]
4+
ConnectionClosed,
5+
6+
#[error("message too large: {0} bytes (max {1})")]
7+
MessageTooLarge(usize, usize),
8+
9+
#[error("serialization error: {0}")]
10+
SerializationError(String),
11+
12+
#[error("deserialization error: {0}")]
13+
DeserializationError(String),
14+
15+
#[error("IO error: {0}")]
16+
Io(#[from] std::io::Error),
17+
}

crates/ie-sandbox/src/ipc.rs

Lines changed: 0 additions & 18 deletions
This file was deleted.

crates/ie-sandbox/src/lib.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,12 @@
1111
//! IPC between processes uses length-prefixed JSON messages over Unix domain sockets
1212
//! (or named pipes on Windows).
1313
14-
pub mod ipc;
14+
pub mod channel;
15+
pub mod error;
16+
pub mod message;
1517
pub mod process;
1618

19+
pub use channel::IpcChannel;
20+
pub use error::IpcError;
21+
pub use message::IpcMessage;
1722
pub use process::{ProcessKind, spawn_child};

0 commit comments

Comments
 (0)