Skip to content

Commit eb7f1ef

Browse files
committed
composefs-splitfdstream: new crate
Add a client library for communicating with the containers-storage splitfdstream server. This enables importing OCI container layers via UNIX socket with file descriptor passing. Signed-off-by: Giuseppe Scrivano <gscrivan@redhat.com>
1 parent a5e8205 commit eb7f1ef

12 files changed

Lines changed: 874 additions & 2 deletions

File tree

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ composefs = { version = "0.3.0", path = "crates/composefs", default-features = f
2020
composefs-oci = { version = "0.3.0", path = "crates/composefs-oci", default-features = false }
2121
composefs-boot = { version = "0.3.0", path = "crates/composefs-boot", default-features = false }
2222
composefs-http = { version = "0.3.0", path = "crates/composefs-http", default-features = false }
23+
composefs-splitfdstream = { version = "0.3.0", path = "crates/composefs-splitfdstream", default-features = false }
2324

2425
[profile.dev.package.sha2]
2526
# this is *really* slow otherwise

crates/cfsctl/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ composefs = { workspace = true }
2424
composefs-boot = { workspace = true }
2525
composefs-oci = { workspace = true, optional = true }
2626
composefs-http = { workspace = true, optional = true }
27+
composefs-splitfdstream = { workspace = true }
2728
env_logger = { version = "0.11.0", default-features = false }
2829
hex = { version = "0.4.0", default-features = false }
2930
rustix = { version = "1.0.0", default-features = false, features = ["fs", "process"] }

crates/cfsctl/src/main.rs

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,19 @@ enum HashType {
5959
enum OciCommand {
6060
/// Stores a tar file as a splitstream in the repository.
6161
ImportLayer {
62+
/// The layer digest (e.g., sha256:abc123...)
6263
digest: String,
64+
/// Optional reference name for the layer
6365
name: Option<String>,
66+
/// Import from splitfdstream server socket instead of stdin
67+
#[clap(long)]
68+
socket: Option<PathBuf>,
69+
/// Layer ID to request from splitfdstream server (defaults to digest)
70+
#[clap(long)]
71+
layer_id: Option<String>,
72+
/// Parent layer ID for delta computation
73+
#[clap(long)]
74+
parent_id: Option<String>,
6475
},
6576
/// Lists the contents of a tar stream
6677
LsLayer {
@@ -243,7 +254,30 @@ where
243254
}
244255
#[cfg(feature = "oci")]
245256
Command::Oci { cmd: oci_cmd } => match oci_cmd {
246-
OciCommand::ImportLayer { name, digest } => {
257+
OciCommand::ImportLayer {
258+
name,
259+
digest,
260+
socket,
261+
layer_id,
262+
parent_id,
263+
} => {
264+
if let Some(socket_path) = socket {
265+
// Import from splitfdstream server
266+
let effective_layer_id = layer_id.as_deref().unwrap_or(&digest);
267+
let content_identifier = composefs_oci::layer_identifier(&digest);
268+
let object_id = composefs_oci::import_from_splitfdstream(
269+
&Arc::new(repo),
270+
&socket_path,
271+
effective_layer_id,
272+
parent_id.as_deref(),
273+
&content_identifier,
274+
name.as_deref(),
275+
)?;
276+
println!("{}", object_id.to_id());
277+
return Ok(());
278+
}
279+
280+
// Import from stdin (default)
247281
let object_id = composefs_oci::import_layer(
248282
&Arc::new(repo),
249283
&digest,

crates/composefs-oci/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ anyhow = { version = "1.0.87", default-features = false }
1515
async-compression = { version = "0.4.0", default-features = false, features = ["tokio", "zstd", "gzip"] }
1616
bytes = { version = "1", default-features = false }
1717
composefs = { workspace = true }
18+
composefs-splitfdstream = { workspace = true }
1819
containers-image-proxy = { version = "0.9.2", default-features = false }
1920
hex = { version = "0.4.0", default-features = false }
2021
indicatif = { version = "0.17.0", default-features = false, features = ["tokio"] }

crates/composefs-oci/src/lib.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,11 @@
1212
1313
pub mod image;
1414
pub mod skopeo;
15+
pub mod splitfdstream;
1516
pub mod tar;
1617

18+
pub use splitfdstream::import_from_splitfdstream;
19+
1720
use std::{collections::HashMap, io::Read, sync::Arc};
1821

1922
use anyhow::{bail, ensure, Context, Result};
@@ -28,7 +31,8 @@ use crate::tar::get_entry;
2831

2932
type ContentAndVerity<ObjectID> = (String, ObjectID);
3033

31-
fn layer_identifier(diff_id: &str) -> String {
34+
/// Build the content identifier under which a layer is stored.
///
/// The identifier is the fixed `oci-layer-` prefix followed by the layer's
/// `diff_id`, which is appended verbatim.
pub fn layer_identifier(diff_id: &str) -> String {
    ["oci-layer-", diff_id].concat()
}
3438

Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
//! Import OCI layers from containers-storage via splitfdstream.
2+
//!
3+
//! This module provides functionality to import container layers directly from
4+
//! containers-storage using the splitfdstream protocol. This enables efficient
5+
//! layer transfer with file descriptor passing for external content objects.
6+
//!
7+
//! The splitfdstream format uses:
8+
//! - Inline data for tar headers and small files
9+
//! - File descriptor references for large files (passed via SCM_RIGHTS)
10+
//!
11+
//! Files are processed sequentially: tar header (inline) followed by content
12+
//! (external FD). Reflink is attempted first for efficient copying.
13+
14+
use std::path::Path;
15+
use std::sync::Arc;
16+
17+
use anyhow::{Context, Result};
18+
use rustix::fs::{copy_file_range, fstat, ftruncate};
19+
use rustix::io::pread;
20+
21+
use composefs::fsverity::FsVerityHashValue;
22+
use composefs::repository::Repository;
23+
24+
use composefs_splitfdstream::{OwnedFd, SplitFDStreamChunk, SplitFDStreamClient, SplitFDStreamReader};
25+
26+
use crate::skopeo::TAR_LAYER_CONTENT_TYPE;
27+
28+
/// Outcome of copying one received file descriptor into the repository.
struct StoredObject<ObjectID> {
    /// Identifier the repository returned for the stored content.
    object_id: ObjectID,
    /// Length in bytes recorded for the content when it was stored.
    size: u64,
}
33+
34+
/// Import a layer from a splitfdstream server into the repository.
35+
///
36+
/// This function connects to a containers-storage splitfdstream server,
37+
/// retrieves the layer data and file descriptors, then converts the
38+
/// splitfdstream format to the composefs splitstream format.
39+
///
40+
/// FDs are processed immediately as each batch is received, keeping max ~200
41+
/// FDs open at a time to avoid exhausting file descriptor limits.
42+
pub fn import_from_splitfdstream<ObjectID: FsVerityHashValue>(
43+
repo: &Arc<Repository<ObjectID>>,
44+
socket_path: impl AsRef<Path>,
45+
layer_id: &str,
46+
parent_id: Option<&str>,
47+
content_identifier: &str,
48+
reference: Option<&str>,
49+
) -> Result<ObjectID> {
50+
// Connect to the splitfdstream server
51+
let mut client = SplitFDStreamClient::connect(socket_path.as_ref())
52+
.with_context(|| format!("Connecting to splitfdstream socket {:?}", socket_path.as_ref()))?;
53+
54+
// Get batch processor
55+
let mut batch_proc = client
56+
.get_splitfdstream(layer_id, parent_id)
57+
.with_context(|| format!("Getting splitfdstream for layer {}", layer_id))?;
58+
59+
// Track if reflink works (detected on first FD)
60+
let mut reflink_works: Option<bool> = None;
61+
62+
// Map of FD index -> stored object (object_id, size)
63+
let mut stored_objects: Vec<StoredObject<ObjectID>> = Vec::new();
64+
65+
// Receive all batches, process FDs immediately
66+
while let Some((fds, _stream_data)) = batch_proc.next_batch()? {
67+
// Process each FD immediately: copy to repo, then close
68+
for fd in fds {
69+
let stat = fstat(&fd).context("Getting file size from FD")?;
70+
let size = stat.st_size as u64;
71+
72+
let object_id = store_fd_to_repo(repo, &fd, size, &mut reflink_works)?;
73+
stored_objects.push(StoredObject { object_id, size });
74+
// fd is dropped here, closing the file descriptor
75+
}
76+
}
77+
78+
// Get complete stream data
79+
let stream_data = batch_proc.stream_data()?;
80+
81+
// Create a splitstream writer
82+
let mut writer = repo.create_stream(TAR_LAYER_CONTENT_TYPE);
83+
84+
// Parse stream and build splitstream
85+
let mut reader = SplitFDStreamReader::new(stream_data.to_vec());
86+
87+
while let Some(chunk) = reader.next_chunk()? {
88+
match chunk {
89+
SplitFDStreamChunk::Inline(data) => {
90+
writer.write_inline(&data);
91+
}
92+
SplitFDStreamChunk::External(fd_idx) => {
93+
let stored = stored_objects.get(fd_idx).ok_or_else(|| {
94+
anyhow::anyhow!(
95+
"FD index {} out of range (have {} objects)",
96+
fd_idx,
97+
stored_objects.len()
98+
)
99+
})?;
100+
101+
writer.add_external_size(stored.size);
102+
writer.write_reference(stored.object_id.clone())?;
103+
}
104+
}
105+
}
106+
107+
let object_id = repo.write_stream(writer, content_identifier, reference)?;
108+
109+
Ok(object_id)
110+
}
111+
112+
/// Store FD contents to repository, trying reflink first.
113+
fn store_fd_to_repo<ObjectID: FsVerityHashValue>(
114+
repo: &Arc<Repository<ObjectID>>,
115+
fd: &OwnedFd,
116+
size: u64,
117+
reflink_works: &mut Option<bool>,
118+
) -> Result<ObjectID> {
119+
if reflink_works.is_none() {
120+
match try_reflink_to_repo(repo, fd, size) {
121+
Ok(id) => {
122+
*reflink_works = Some(true);
123+
return Ok(id);
124+
}
125+
Err(_) => {
126+
*reflink_works = Some(false);
127+
}
128+
}
129+
} else if *reflink_works == Some(true) {
130+
if let Ok(id) = try_reflink_to_repo(repo, fd, size) {
131+
return Ok(id);
132+
}
133+
}
134+
135+
let data = read_fd_contents(fd, size)?;
136+
repo.ensure_object(&data).context("Storing object")
137+
}
138+
139+
/// Try to reflink the file descriptor contents to the repository.
140+
fn try_reflink_to_repo<ObjectID: FsVerityHashValue>(
141+
repo: &Arc<Repository<ObjectID>>,
142+
src_fd: &OwnedFd,
143+
size: u64,
144+
) -> Result<ObjectID> {
145+
let tmpfile = repo
146+
.create_object_tmpfile()
147+
.context("Creating object tmpfile")?;
148+
149+
try_copy_file_range(src_fd, &tmpfile, size)?;
150+
151+
repo.finalize_object_tmpfile(tmpfile.into(), size)
152+
.context("Finalizing reflinked object")
153+
}
154+
155+
/// Try to copy file contents using copy_file_range (which may use reflink).
156+
fn try_copy_file_range(src_fd: &OwnedFd, dst_fd: &OwnedFd, size: u64) -> Result<()> {
157+
if size == 0 {
158+
return Ok(());
159+
}
160+
161+
ftruncate(dst_fd, size).context("Truncating destination file")?;
162+
163+
let mut src_offset = 0u64;
164+
let mut dst_offset = 0u64;
165+
let mut remaining = size;
166+
167+
while remaining > 0 {
168+
let copied = copy_file_range(
169+
src_fd,
170+
Some(&mut src_offset),
171+
dst_fd,
172+
Some(&mut dst_offset),
173+
remaining as usize,
174+
)
175+
.context("copy_file_range failed")?;
176+
177+
if copied == 0 {
178+
anyhow::bail!("copy_file_range returned 0 before completing");
179+
}
180+
181+
remaining -= copied as u64;
182+
}
183+
184+
Ok(())
185+
}
186+
187+
/// Read the entire contents of a file descriptor using pread.
188+
fn read_fd_contents(fd: &OwnedFd, expected_size: u64) -> Result<Vec<u8>> {
189+
let size = expected_size as usize;
190+
let mut data = vec![0u8; size];
191+
let mut offset = 0u64;
192+
let mut total_read = 0usize;
193+
194+
while total_read < size {
195+
let n = pread(fd, &mut data[total_read..], offset)
196+
.with_context(|| format!("Reading from fd at offset {}", offset))?;
197+
if n == 0 {
198+
data.truncate(total_read);
199+
break;
200+
}
201+
total_read += n;
202+
offset += n as u64;
203+
}
204+
205+
Ok(data)
206+
}
207+
208+
#[cfg(test)]
mod tests {
    // Intentionally empty: exercising the import path needs a running
    // splitfdstream server, so it can only be covered by integration tests.
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
[package]
2+
name = "composefs-splitfdstream"
3+
description = "Splitfdstream client for composefs"
4+
keywords = ["composefs", "oci", "splitstream"]
5+
6+
edition.workspace = true
7+
license.workspace = true
8+
readme.workspace = true
9+
repository.workspace = true
10+
rust-version.workspace = true
11+
version.workspace = true
12+
13+
[dependencies]
14+
anyhow = { version = "1.0", default-features = false }
15+
rustix = { version = "1.0", default-features = false, features = ["net", "fs"] }
16+
serde = { version = "1.0", default-features = false, features = ["derive"] }
17+
serde_json = { version = "1.0", default-features = false, features = ["std"] }
18+
thiserror = { version = "1.0", default-features = false }
19+
20+
[lints]
21+
workspace = true

0 commit comments

Comments
 (0)