Skip to content

Commit b5feb85

Browse files
committed
oci: Add varlink APIs using "splitdirfdstream"
First, I discovered that actually fd-passing with varlink generally works well, and I was misguided in thinking we needed jsonrpc-fdpass. Almost: one issue is that varlink doesn't have good support for passing *a lot* of file descriptors (which jsonrpc-fdpass was designed to handle). But upon some reflection, I realized we don't need to pass a file descriptor per file; all use cases here are fine with a directory fd plus filename. So here a new data stream format "splitdirfdstream" is implemented. We now first use that *internally* when we're doing a direct pull from containers-storage for reflinking/hardlinking. But better: let's expose that data concept over varlink, where a varlink client can both pull and push container image layers that way. This paves the way to a very clear mechanism for us to integrate with containers-storage or other storage stacks (like containerd) in an agnostic way. We also now support `cfsctl oci copy` to copy across composefs repositories which is also implemented this way. Generated-by: OpenCode (Claude Opus 4.8) Signed-off-by: Colin Walters <walters@verbum.org>
1 parent cbc65ee commit b5feb85

30 files changed

Lines changed: 9446 additions & 2885 deletions

File tree

Cargo.toml

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ default-members = [
1414
"crates/composefs-setup-root",
1515
"crates/composefs-storage",
1616
"crates/composefs-erofs-debug",
17-
"crates/composefs-splitfdstream",
17+
"crates/composefs-splitdirfdstream",
1818
]
1919
resolver = "2"
2020

@@ -41,8 +41,6 @@ composefs-http = { version = "0.4.0", path = "crates/composefs-http", default-fe
4141
cap-std-ext = "5.1.2"
4242
ocidir = "0.7.2"
4343

44-
# JSON-RPC with FD passing for userns helper
45-
jsonrpc-fdpass = { version = "0.1.0", default-features = false }
4644
zlink = { version = "0.5", default-features = false, features = ["tokio", "introspection", "proxy", "server", "service", "tracing"] }
4745
zlink-core = { version = "0.5", default-features = false, features = ["introspection", "std", "tracing"] }
4846

crates/composefs-ctl/Cargo.toml

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ path = "src/main.rs"
1919
[features]
2020
default = ['pre-6.15', 'oci', 'containers-storage']
2121
http = ['composefs-http']
22-
oci = ['composefs-oci', 'composefs-oci/varlink']
22+
oci = ['composefs-oci', 'composefs-oci/varlink', 'composefs-splitdirfdstream']
2323
containers-storage = ['composefs-oci/containers-storage', 'cstorage']
2424
rhel9 = ['composefs/rhel9']
2525
'pre-6.15' = ['composefs/pre-6.15']
@@ -32,18 +32,25 @@ comfy-table = { version = "7.1", default-features = false }
3232
composefs = { workspace = true, features = ["varlink"] }
3333
composefs-boot = { workspace = true }
3434
composefs-oci = { workspace = true, optional = true, features = ["boot"] }
35+
composefs-splitdirfdstream = { path = "../composefs-splitdirfdstream", version = "0.4.0", optional = true }
3536
composefs-http = { workspace = true, optional = true }
36-
cstorage = { package = "composefs-storage", path = "../composefs-storage", version = "0.4.0", features = ["userns-helper"], optional = true }
37+
cstorage = { package = "composefs-storage", path = "../composefs-storage", version = "0.4.0", features = ["layer-transfer"], optional = true }
3738
env_logger = { version = "0.11.0", default-features = false }
3839
hex = { version = "0.4.0", default-features = false }
3940
indicatif = { version = "0.17.0", default-features = false }
4041
libsystemd = { version = "0.7" }
4142
log = { version = "0.4", default-features = false }
42-
rustix = { version = "1.0.0", default-features = false, features = ["fs", "process"] }
43+
rustix = { version = "1.0.0", default-features = false, features = ["fs", "net", "pipe", "process"] }
4344
serde = { version = "1.0", default-features = false, features = ["derive"] }
4445
serde_json = { version = "1.0", default-features = false, features = ["std"] }
45-
tokio = { version = "1.24.2", default-features = false, features = ["io-std", "io-util", "net", "rt", "sync"] }
46+
tokio = { version = "1.24.2", default-features = false, features = ["io-std", "io-util", "net", "rt", "rt-multi-thread", "sync"] }
4647
zlink = { workspace = true }
4748

49+
[dev-dependencies]
50+
similar-asserts = "1.7.0"
51+
tar = { version = "0.4.38", default-features = false }
52+
tempfile = "3.8.0"
53+
tokio = { version = "1.24.2", default-features = false, features = ["macros", "rt-multi-thread"] }
54+
4855
[lints]
4956
workspace = true

crates/composefs-ctl/src/lib.rs

Lines changed: 258 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ impl From<ErofsVersion> for composefs::erofs::format::FormatVersion {
261261
/// start with `@`.
262262
#[cfg(feature = "oci")]
263263
#[derive(Debug, Clone)]
264-
pub(crate) enum OciReference {
264+
pub enum OciReference {
265265
/// A content-addressable digest such as `sha256:abcdef…`.
266266
Digest(composefs_oci::OciDigest),
267267
/// A named ref resolved through the repository's ref tree, typically
@@ -374,6 +374,31 @@ enum OciCommand {
374374
#[arg(long, value_enum, default_value_t = LocalFetchCli::Disabled)]
375375
local_fetch: LocalFetchCli,
376376
},
377+
/// Copy an OCI image (and its layers) from this repository into another
378+
/// composefs repository, reflinking object data when possible.
379+
///
380+
/// The source repository is selected by the global `--repo`/`--user`/
381+
/// `--system` flags. The destination is `--to`. The repositories may use
382+
/// different hash algorithms unless `--zerocopy` is given.
383+
///
384+
/// Pass `--zerocopy` to attempt reflink (then hardlink) instead of copying
385+
/// object data. This requires both repositories to be on the same
386+
/// filesystem and the caller to have `CAP_DAC_READ_SEARCH` (i.e. root).
387+
/// Without `--zerocopy`, objects are always copied, which is safe on any
388+
/// filesystem.
389+
Copy {
390+
/// Image to copy (tag name or `@digest`).
391+
image: OciReference,
392+
/// Path to the destination composefs repository.
393+
#[clap(long)]
394+
to: PathBuf,
395+
/// Tag to assign to the image in the destination repository.
396+
#[clap(long)]
397+
name: Option<String>,
398+
/// Use reflink/hardlink zero-copy transfer (requires same filesystem and root).
399+
#[clap(long)]
400+
zerocopy: bool,
401+
},
377402
/// List all tagged OCI images in the repository
378403
#[clap(name = "images")]
379404
ListImages {
@@ -840,12 +865,18 @@ pub async fn run_if_socket_activated() -> Result<bool> {
840865
if std::env::args_os().len() != 1 {
841866
return Ok(false);
842867
}
843-
let Some(listener) = crate::varlink::try_activated_listener()? else {
844-
return Ok(false);
845-
};
846868
let service = crate::varlink::CfsctlService::activated();
847-
crate::varlink::serve_activated(service, listener).await?;
848-
Ok(true)
869+
match crate::varlink::try_activated_listener()? {
870+
Some(crate::varlink::ActivatedSocket::Connected(l)) => {
871+
crate::varlink::serve_activated(service, l).await?;
872+
Ok(true)
873+
}
874+
Some(crate::varlink::ActivatedSocket::Listening(listener)) => {
875+
crate::varlink::serve_on_listener(service, listener).await?;
876+
Ok(true)
877+
}
878+
None => Ok(false),
879+
}
849880
}
850881

851882
/// Top-level dispatch: handle init specially, otherwise open repo and run.
@@ -1045,6 +1076,150 @@ where
10451076
Ok(repo)
10461077
}
10471078

1079+
/// Copy an OCI image (and all its layers) from one repository to another.
1080+
///
1081+
/// The source and destination may use different fs-verity hash algorithms
1082+
/// (`SrcID` vs `DestID`): the splitdirfdstream carries only raw bytes and
1083+
/// algorithm-independent OCI content digests, and the destination re-computes
1084+
/// each object's fs-verity digest under its own algorithm on import.
1085+
///
1086+
/// When `zerocopy` is `true` the destination attempts reflink/hardlink
1087+
/// (no data copy). This requires both repos to reside on the **same
1088+
/// filesystem** (same `st_dev`), and hardlink additionally requires matching
1089+
/// hash algorithms (fs-verity is enabled in-place on the shared inode).
1090+
/// If `zerocopy` is `true` but the algorithms differ, this function returns
1091+
/// an error up front rather than silently falling back.
1092+
///
1093+
/// Returns accumulated [`composefs_oci::ImportStats`] across all transferred
1094+
/// layers.
1095+
#[cfg(feature = "oci")]
1096+
pub async fn copy_image<SrcID: FsVerityHashValue, DestID: FsVerityHashValue>(
1097+
src: &Arc<Repository<SrcID>>,
1098+
dest: &Arc<Repository<DestID>>,
1099+
image: &OciReference,
1100+
name: Option<&str>,
1101+
zerocopy: bool,
1102+
) -> Result<composefs_oci::ImportStats> {
1103+
use std::fs::File;
1104+
use std::os::fd::AsFd as _;
1105+
1106+
use composefs_oci::ImportStats;
1107+
use composefs_oci::layer_content_id;
1108+
use composefs_oci::layer_sync::{
1109+
drain_splitdirfdstream_verified, produce_layer_splitdirfdstream,
1110+
};
1111+
1112+
// Zerocopy (hardlink) requires the same fs-verity algorithm on both sides
1113+
// because it enables verity in-place on the shared source inode.
1114+
if zerocopy && std::any::TypeId::of::<SrcID>() != std::any::TypeId::of::<DestID>() {
1115+
anyhow::bail!(
1116+
"--zerocopy requires matching hash algorithms; \
1117+
source uses {:?} but destination uses {:?}",
1118+
SrcID::ALGORITHM,
1119+
DestID::ALGORITHM,
1120+
);
1121+
}
1122+
1123+
let img = resolve_oci_image(src, image)?;
1124+
let manifest_json = img.read_manifest_json(src)?;
1125+
let config_json = img.read_config_json(src)?;
1126+
1127+
// Parse the ordered diff_ids from the config by opening the config object.
1128+
let open_cfg = composefs_oci::open_config(src, img.config_digest(), Some(img.config_verity()))
1129+
.context("opening config to read diff_ids")?;
1130+
let diff_id_strs: Vec<String> = open_cfg.config.rootfs().diff_ids().to_vec();
1131+
1132+
let mut layer_refs: Vec<(composefs_oci::OciDigest, DestID)> = Vec::new();
1133+
let mut total_stats = ImportStats {
1134+
layers: diff_id_strs.len() as u64,
1135+
..Default::default()
1136+
};
1137+
1138+
for diff_id_str in &diff_id_strs {
1139+
let diff_id: composefs_oci::OciDigest = diff_id_str
1140+
.parse()
1141+
.with_context(|| format!("parsing diff_id {diff_id_str}"))?;
1142+
let content_id = layer_content_id(&diff_id);
1143+
1144+
// Skip layers that are already present in the destination.
1145+
if let Some(dest_verity) = dest.has_stream(&content_id)? {
1146+
layer_refs.push((diff_id, dest_verity));
1147+
total_stats.layers_already_present += 1;
1148+
continue;
1149+
}
1150+
1151+
// Layer must exist in the source.
1152+
let src_verity = src
1153+
.has_stream(&content_id)?
1154+
.with_context(|| format!("source repository is missing layer {diff_id}"))?;
1155+
1156+
// Create a CLOEXEC pipe for the splitdirfdstream.
1157+
let (pipe_read, pipe_write) =
1158+
rustix::pipe::pipe_with(rustix::pipe::PipeFlags::CLOEXEC).context("pipe")?;
1159+
1160+
// Dup the source objects directory fd for use by the consumer.
1161+
let src_objects_fd = src.objects_dir().context("src objects_dir")?;
1162+
let objects_dup = rustix::io::dup(src_objects_fd.as_fd()).context("dup objects_dir")?;
1163+
1164+
// Producer thread: write the splitdirfdstream to the pipe.
1165+
// Uses SrcID — reads from the source repo's object store.
1166+
// The single objects dir is passed directly at index 0 (no sparse layout).
1167+
let src_clone = Arc::clone(src);
1168+
let produce_handle = std::thread::spawn(move || {
1169+
let wf = File::from(pipe_write);
1170+
produce_layer_splitdirfdstream(&src_clone, &src_verity, 0, wf)
1171+
});
1172+
1173+
// Drain thread: read the splitdirfdstream and import into dest.
1174+
// Uses DestID — writes to the destination repo's object store,
1175+
// re-computing fs-verity digests under DestID's algorithm.
1176+
let dest_clone = Arc::clone(dest);
1177+
let diff_id_clone = diff_id.clone();
1178+
let drain_handle = tokio::task::spawn_blocking(move || {
1179+
drain_splitdirfdstream_verified(
1180+
dest_clone,
1181+
pipe_read,
1182+
vec![objects_dup],
1183+
&diff_id_clone,
1184+
zerocopy,
1185+
composefs::repository::ImportContext::default(),
1186+
)
1187+
});
1188+
1189+
// Await both sides. Surface the drain result FIRST: a verification
1190+
// failure (DiffIdMismatch) is the more informative diagnostic, and a
1191+
// producer that died mid-stream typically manifests as a drain error
1192+
// anyway. Only if the drain succeeded do we check the producer join.
1193+
let drain_result = drain_handle.await.context("drain task panicked")?;
1194+
let (dest_verity, layer_stats, _ctx) =
1195+
drain_result.map_err(|e| anyhow::anyhow!("layer copy failed for {diff_id}: {e}"))?;
1196+
1197+
// The drain succeeded, so the producer must have written a complete,
1198+
// valid stream; still join it to catch a late error/panic.
1199+
if let Err(e) = produce_handle
1200+
.join()
1201+
.map_err(|_| anyhow::anyhow!("producer panicked"))?
1202+
{
1203+
return Err(e.context("splitdirfdstream producer failed"));
1204+
}
1205+
1206+
total_stats.merge(&layer_stats);
1207+
layer_refs.push((diff_id, dest_verity));
1208+
}
1209+
1210+
// Finalize: write manifest + config splitstreams, generate EROFS, tag.
1211+
composefs_oci::layer_sync::finalize_oci_image(
1212+
dest,
1213+
&manifest_json,
1214+
&config_json,
1215+
&layer_refs,
1216+
name,
1217+
)
1218+
.context("finalize_oci_image")?;
1219+
1220+
Ok(total_stats)
1221+
}
1222+
10481223
/// Resolve an [`OciReference`] to an [`OciImage`].
10491224
#[cfg(feature = "oci")]
10501225
pub(crate) fn resolve_oci_image<ObjectID: FsVerityHashValue>(
@@ -1326,6 +1501,83 @@ where
13261501
println!("Boot image: {}", image_verity.to_hex());
13271502
}
13281503
}
1504+
OciCommand::Copy {
1505+
ref image,
1506+
ref to,
1507+
ref name,
1508+
zerocopy,
1509+
} => {
1510+
// Detect the destination's hash algorithm independently.
1511+
// Cross-algorithm copy is supported (the splitdirfdstream
1512+
// carries only algorithm-independent data and the destination
1513+
// re-digests every object under its own algorithm). The one
1514+
// exception is --zerocopy (hardlink), which enables fs-verity
1515+
// in-place on the shared source inode and therefore requires
1516+
// matching algorithms.
1517+
let dest_hash = resolve_hash_type(to, args.hash, !args.no_upgrade)
1518+
.with_context(|| format!("opening destination repository {}", to.display()))?;
1519+
1520+
// Helper: open dest, run copy_image, print results.
1521+
#[allow(clippy::too_many_arguments)]
1522+
async fn do_copy<SrcID, DestID>(
1523+
src: &Arc<Repository<SrcID>>,
1524+
to: &Path,
1525+
image: &OciReference,
1526+
name: Option<&str>,
1527+
zerocopy: bool,
1528+
insecure: bool,
1529+
require_verity: bool,
1530+
no_upgrade: bool,
1531+
) -> Result<()>
1532+
where
1533+
SrcID: FsVerityHashValue,
1534+
DestID: FsVerityHashValue,
1535+
{
1536+
let dest = open_repo_at::<DestID>(to, insecure, require_verity, no_upgrade)
1537+
.with_context(|| {
1538+
format!("opening destination repository {}", to.display())
1539+
})?;
1540+
let dest = Arc::new(dest);
1541+
let stats = copy_image(src, &dest, image, name, zerocopy).await?;
1542+
let tag_info = if let Some(n) = name {
1543+
format!(", tagged as {n}")
1544+
} else {
1545+
String::new()
1546+
};
1547+
println!("Copied image {image} to {}{tag_info}", to.display());
1548+
println!("Transfer: {stats}");
1549+
Ok(())
1550+
}
1551+
1552+
match dest_hash {
1553+
HashType::Sha256 => {
1554+
do_copy::<ObjectID, Sha256HashValue>(
1555+
&repo,
1556+
to,
1557+
image,
1558+
name.as_deref(),
1559+
zerocopy,
1560+
args.insecure,
1561+
args.require_verity,
1562+
args.no_upgrade,
1563+
)
1564+
.await?;
1565+
}
1566+
HashType::Sha512 => {
1567+
do_copy::<ObjectID, Sha512HashValue>(
1568+
&repo,
1569+
to,
1570+
image,
1571+
name.as_deref(),
1572+
zerocopy,
1573+
args.insecure,
1574+
args.require_verity,
1575+
args.no_upgrade,
1576+
)
1577+
.await?;
1578+
}
1579+
}
1580+
}
13291581
OciCommand::ListImages { json } => {
13301582
let images = composefs_oci::oci_image::list_images(&repo)?;
13311583

0 commit comments

Comments
 (0)