Skip to content

Commit ac7e9f6

Browse files
committed
oci: Add varlink APIs using "splitdirfdstream"
First, I discovered that actually fd-passing with varlink generally works well, and I was misguided in thinking we needed jsonrpc-fdpass. Almost: one issue is that varlink doesn't have good support for passing *a lot* of file descriptors (which jsonrpc-fdpass was designed to handle). But upon some reflection, I realized we don't need to pass a file descriptor per file; all use cases here are fine with a directory fd plus filename. So here a new data stream format "splitdirfdstream" is implemented. We now first use that *internally* when we're doing a direct pull from containers-storage for reflinking/hardlinking. But better: let's expose that data concept over varlink, where a varlink client can both pull and push container image layers that way. This paves the way to a very clear mechanism for us to integrate with containers-storage or other storage stacks (like containerd) in an agnostic way. We also now support `cfsctl oci copy` to copy across composefs repositories, which is also implemented this way. Generated-by: OpenCode (Claude Opus 4.8) Signed-off-by: Colin Walters <walters@verbum.org>
1 parent ed0da92 commit ac7e9f6

22 files changed

Lines changed: 7012 additions & 2768 deletions

File tree

Cargo.toml

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ default-members = [
1414
"crates/composefs-setup-root",
1515
"crates/composefs-storage",
1616
"crates/composefs-erofs-debug",
17-
"crates/composefs-splitfdstream",
17+
"crates/composefs-splitdirfdstream",
1818
]
1919
resolver = "2"
2020

@@ -41,8 +41,6 @@ composefs-http = { version = "0.4.0", path = "crates/composefs-http", default-fe
4141
cap-std-ext = "5.1.2"
4242
ocidir = "0.7.2"
4343

44-
# JSON-RPC with FD passing for userns helper
45-
jsonrpc-fdpass = { version = "0.1.0", default-features = false }
4644
zlink = { version = "0.5", default-features = false, features = ["tokio", "introspection", "proxy", "server", "service", "tracing"] }
4745
zlink-core = { version = "0.5", default-features = false, features = ["introspection", "std", "tracing"] }
4846

crates/composefs-ctl/Cargo.toml

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,17 +33,24 @@ composefs = { workspace = true, features = ["varlink"] }
3333
composefs-boot = { workspace = true }
3434
composefs-oci = { workspace = true, optional = true, features = ["boot"] }
3535
composefs-http = { workspace = true, optional = true }
36-
cstorage = { package = "composefs-storage", path = "../composefs-storage", version = "0.4.0", features = ["userns-helper"], optional = true }
36+
cstorage = { package = "composefs-storage", path = "../composefs-storage", version = "0.4.0", features = ["layer-transfer"], optional = true }
3737
env_logger = { version = "0.11.0", default-features = false }
3838
hex = { version = "0.4.0", default-features = false }
3939
indicatif = { version = "0.17.0", default-features = false }
4040
libsystemd = { version = "0.7" }
4141
log = { version = "0.4", default-features = false }
42-
rustix = { version = "1.0.0", default-features = false, features = ["fs", "process"] }
42+
rustix = { version = "1.0.0", default-features = false, features = ["fs", "pipe", "process"] }
4343
serde = { version = "1.0", default-features = false, features = ["derive"] }
4444
serde_json = { version = "1.0", default-features = false, features = ["std"] }
45-
tokio = { version = "1.24.2", default-features = false, features = ["io-std", "io-util", "net", "rt", "sync"] }
45+
tokio = { version = "1.24.2", default-features = false, features = ["io-std", "io-util", "net", "rt", "rt-multi-thread", "sync"] }
4646
zlink = { workspace = true }
4747

48+
[dev-dependencies]
49+
composefs-splitdirfdstream = { path = "../composefs-splitdirfdstream", version = "0.4.0" }
50+
similar-asserts = "1.7.0"
51+
tar = { version = "0.4.38", default-features = false }
52+
tempfile = "3.8.0"
53+
tokio = { version = "1.24.2", default-features = false, features = ["macros", "rt-multi-thread"] }
54+
4855
[lints]
4956
workspace = true

crates/composefs-ctl/src/lib.rs

Lines changed: 246 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ impl From<ErofsVersion> for composefs::erofs::format::FormatVersion {
261261
/// start with `@`.
262262
#[cfg(feature = "oci")]
263263
#[derive(Debug, Clone)]
264-
pub(crate) enum OciReference {
264+
pub enum OciReference {
265265
/// A content-addressable digest such as `sha256:abcdef…`.
266266
Digest(composefs_oci::OciDigest),
267267
/// A named ref resolved through the repository's ref tree, typically
@@ -374,6 +374,31 @@ enum OciCommand {
374374
#[arg(long, value_enum, default_value_t = LocalFetchCli::Disabled)]
375375
local_fetch: LocalFetchCli,
376376
},
377+
/// Copy an OCI image (and its layers) from this repository into another
378+
/// composefs repository, reflinking object data when possible.
379+
///
380+
/// The source repository is selected by the global `--repo`/`--user`/
381+
/// `--system` flags. The destination is `--to`. The two repositories may use
382+
/// different hash algorithms unless `--zerocopy` is given.
383+
///
384+
/// Pass `--zerocopy` to attempt reflink (then hardlink) instead of copying
385+
/// object data. This requires both repositories to be on the same
386+
/// filesystem and the caller to have `CAP_DAC_READ_SEARCH` (i.e. root).
387+
/// Without `--zerocopy`, objects are always copied, which is safe on any
388+
/// filesystem.
389+
Copy {
390+
/// Image to copy (tag name or `@digest`).
391+
image: OciReference,
392+
/// Path to the destination composefs repository.
393+
#[clap(long)]
394+
to: PathBuf,
395+
/// Tag to assign to the image in the destination repository.
396+
#[clap(long)]
397+
name: Option<String>,
398+
/// Use reflink/hardlink zero-copy transfer (requires same filesystem, matching hash algorithms, and root).
399+
#[clap(long)]
400+
zerocopy: bool,
401+
},
377402
/// List all tagged OCI images in the repository
378403
#[clap(name = "images")]
379404
ListImages {
@@ -1045,6 +1070,149 @@ where
10451070
Ok(repo)
10461071
}
10471072

1073+
/// Copy an OCI image (and all its layers) from one repository to another.
1074+
///
1075+
/// The source and destination may use different fs-verity hash algorithms
1076+
/// (`SrcID` vs `DestID`): the splitdirfdstream carries only raw bytes and
1077+
/// algorithm-independent OCI content digests, and the destination re-computes
1078+
/// each object's fs-verity digest under its own algorithm on import.
1079+
///
1080+
/// When `zerocopy` is `true` the destination attempts reflink/hardlink
1081+
/// (no data copy). This requires both repos to reside on the **same
1082+
/// filesystem** (same `st_dev`), and hardlink additionally requires matching
1083+
/// hash algorithms (fs-verity is enabled in-place on the shared inode).
1084+
/// If `zerocopy` is `true` but the algorithms differ, this function returns
1085+
/// an error up front rather than silently falling back.
1086+
///
1087+
/// Returns accumulated [`composefs_oci::ImportStats`] across all transferred
1088+
/// layers.
1089+
#[cfg(feature = "oci")]
1090+
pub async fn copy_image<SrcID: FsVerityHashValue, DestID: FsVerityHashValue>(
1091+
src: &Arc<Repository<SrcID>>,
1092+
dest: &Arc<Repository<DestID>>,
1093+
image: &OciReference,
1094+
name: Option<&str>,
1095+
zerocopy: bool,
1096+
) -> Result<composefs_oci::ImportStats> {
1097+
use std::fs::File;
1098+
use std::os::fd::AsFd as _;
1099+
1100+
use composefs_oci::ImportStats;
1101+
use composefs_oci::layer_content_id;
1102+
use composefs_oci::layer_sync::{
1103+
drain_splitdirfdstream_verified, produce_layer_splitdirfdstream,
1104+
};
1105+
1106+
// Zerocopy (hardlink) requires the same fs-verity algorithm on both sides
1107+
// because it enables verity in-place on the shared source inode.
1108+
if zerocopy && std::any::TypeId::of::<SrcID>() != std::any::TypeId::of::<DestID>() {
1109+
anyhow::bail!(
1110+
"--zerocopy requires matching hash algorithms; \
1111+
source uses {:?} but destination uses {:?}",
1112+
SrcID::ALGORITHM,
1113+
DestID::ALGORITHM,
1114+
);
1115+
}
1116+
1117+
let img = resolve_oci_image(src, image)?;
1118+
let manifest_json = img.read_manifest_json(src)?;
1119+
let config_json = img.read_config_json(src)?;
1120+
1121+
// Parse the ordered diff_ids from the config by opening the config object.
1122+
let open_cfg = composefs_oci::open_config(src, img.config_digest(), Some(img.config_verity()))
1123+
.context("opening config to read diff_ids")?;
1124+
let diff_id_strs: Vec<String> = open_cfg.config.rootfs().diff_ids().to_vec();
1125+
1126+
let mut layer_refs: Vec<(composefs_oci::OciDigest, DestID)> = Vec::new();
1127+
let mut total_stats = ImportStats {
1128+
layers: diff_id_strs.len() as u64,
1129+
..Default::default()
1130+
};
1131+
1132+
for diff_id_str in &diff_id_strs {
1133+
let diff_id: composefs_oci::OciDigest = diff_id_str
1134+
.parse()
1135+
.with_context(|| format!("parsing diff_id {diff_id_str}"))?;
1136+
let content_id = layer_content_id(&diff_id);
1137+
1138+
// Skip layers that are already present in the destination.
1139+
if let Some(dest_verity) = dest.has_stream(&content_id)? {
1140+
layer_refs.push((diff_id, dest_verity));
1141+
total_stats.layers_already_present += 1;
1142+
continue;
1143+
}
1144+
1145+
// Layer must exist in the source.
1146+
let src_verity = src
1147+
.has_stream(&content_id)?
1148+
.with_context(|| format!("source repository is missing layer {diff_id}"))?;
1149+
1150+
// Create a CLOEXEC pipe for the splitdirfdstream.
1151+
let (pipe_read, pipe_write) =
1152+
rustix::pipe::pipe_with(rustix::pipe::PipeFlags::CLOEXEC).context("pipe")?;
1153+
1154+
// Dup the source objects directory fd for use by the consumer.
1155+
let src_objects_fd = src.objects_dir().context("src objects_dir")?;
1156+
let objects_dup = rustix::io::dup(src_objects_fd.as_fd()).context("dup objects_dir")?;
1157+
1158+
// Producer thread: write the splitdirfdstream to the pipe.
1159+
// Uses SrcID — reads from the source repo's object store.
1160+
let src_clone = Arc::clone(src);
1161+
let produce_handle = std::thread::spawn(move || {
1162+
let wf = File::from(pipe_write);
1163+
produce_layer_splitdirfdstream(&src_clone, &src_verity, wf)
1164+
});
1165+
1166+
// Drain thread: read the splitdirfdstream and import into dest.
1167+
// Uses DestID — writes to the destination repo's object store,
1168+
// re-computing fs-verity digests under DestID's algorithm.
1169+
let dest_clone = Arc::clone(dest);
1170+
let diff_id_clone = diff_id.clone();
1171+
let drain_handle = tokio::task::spawn_blocking(move || {
1172+
drain_splitdirfdstream_verified(
1173+
dest_clone,
1174+
pipe_read,
1175+
vec![objects_dup],
1176+
&diff_id_clone,
1177+
zerocopy,
1178+
composefs::repository::ImportContext::default(),
1179+
)
1180+
});
1181+
1182+
// Await both sides. Surface the drain result FIRST: a verification
1183+
// failure (DiffIdMismatch) is the more informative diagnostic, and a
1184+
// producer that died mid-stream typically manifests as a drain error
1185+
// anyway. Only if the drain succeeded do we check the producer join.
1186+
let drain_result = drain_handle.await.context("drain task panicked")?;
1187+
let (dest_verity, layer_stats, _ctx) =
1188+
drain_result.map_err(|e| anyhow::anyhow!("layer copy failed for {diff_id}: {e}"))?;
1189+
1190+
// The drain succeeded, so the producer must have written a complete,
1191+
// valid stream; still join it to catch a late error/panic.
1192+
if let Err(e) = produce_handle
1193+
.join()
1194+
.map_err(|_| anyhow::anyhow!("producer panicked"))?
1195+
{
1196+
return Err(e.context("splitdirfdstream producer failed"));
1197+
}
1198+
1199+
total_stats.merge(&layer_stats);
1200+
layer_refs.push((diff_id, dest_verity));
1201+
}
1202+
1203+
// Finalize: write manifest + config splitstreams, generate EROFS, tag.
1204+
composefs_oci::layer_sync::finalize_oci_image(
1205+
dest,
1206+
&manifest_json,
1207+
&config_json,
1208+
&layer_refs,
1209+
name,
1210+
)
1211+
.context("finalize_oci_image")?;
1212+
1213+
Ok(total_stats)
1214+
}
1215+
10481216
/// Resolve an [`OciReference`] to an [`OciImage`].
10491217
#[cfg(feature = "oci")]
10501218
pub(crate) fn resolve_oci_image<ObjectID: FsVerityHashValue>(
@@ -1326,6 +1494,83 @@ where
13261494
println!("Boot image: {}", image_verity.to_hex());
13271495
}
13281496
}
1497+
OciCommand::Copy {
1498+
ref image,
1499+
ref to,
1500+
ref name,
1501+
zerocopy,
1502+
} => {
1503+
// Detect the destination's hash algorithm independently.
1504+
// Cross-algorithm copy is supported (the splitdirfdstream
1505+
// carries only algorithm-independent data and the destination
1506+
// re-digests every object under its own algorithm). The one
1507+
// exception is --zerocopy (hardlink), which enables fs-verity
1508+
// in-place on the shared source inode and therefore requires
1509+
// matching algorithms.
1510+
let dest_hash = resolve_hash_type(to, args.hash, !args.no_upgrade)
1511+
.with_context(|| format!("opening destination repository {}", to.display()))?;
1512+
1513+
// Helper: open dest, run copy_image, print results.
1514+
#[allow(clippy::too_many_arguments)]
1515+
async fn do_copy<SrcID, DestID>(
1516+
src: &Arc<Repository<SrcID>>,
1517+
to: &Path,
1518+
image: &OciReference,
1519+
name: Option<&str>,
1520+
zerocopy: bool,
1521+
insecure: bool,
1522+
require_verity: bool,
1523+
no_upgrade: bool,
1524+
) -> Result<()>
1525+
where
1526+
SrcID: FsVerityHashValue,
1527+
DestID: FsVerityHashValue,
1528+
{
1529+
let dest = open_repo_at::<DestID>(to, insecure, require_verity, no_upgrade)
1530+
.with_context(|| {
1531+
format!("opening destination repository {}", to.display())
1532+
})?;
1533+
let dest = Arc::new(dest);
1534+
let stats = copy_image(src, &dest, image, name, zerocopy).await?;
1535+
let tag_info = if let Some(n) = name {
1536+
format!(", tagged as {n}")
1537+
} else {
1538+
String::new()
1539+
};
1540+
println!("Copied image {image} to {}{tag_info}", to.display());
1541+
println!("Transfer: {stats}");
1542+
Ok(())
1543+
}
1544+
1545+
match dest_hash {
1546+
HashType::Sha256 => {
1547+
do_copy::<ObjectID, Sha256HashValue>(
1548+
&repo,
1549+
to,
1550+
image,
1551+
name.as_deref(),
1552+
zerocopy,
1553+
args.insecure,
1554+
args.require_verity,
1555+
args.no_upgrade,
1556+
)
1557+
.await?;
1558+
}
1559+
HashType::Sha512 => {
1560+
do_copy::<ObjectID, Sha512HashValue>(
1561+
&repo,
1562+
to,
1563+
image,
1564+
name.as_deref(),
1565+
zerocopy,
1566+
args.insecure,
1567+
args.require_verity,
1568+
args.no_upgrade,
1569+
)
1570+
.await?;
1571+
}
1572+
}
1573+
}
13291574
OciCommand::ListImages { json } => {
13301575
let images = composefs_oci::oci_image::list_images(&repo)?;
13311576

0 commit comments

Comments
 (0)