diff --git a/Cargo.lock b/Cargo.lock index f81b0dfc..5a07ec46 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -340,6 +340,23 @@ dependencies = [ "tokio", ] +[[package]] +name = "krata-xenvchan" +version = "0.0.24" +dependencies = [ + "krata-xenvchan-sys", + "libc", + "thiserror", +] + +[[package]] +name = "krata-xenvchan-sys" +version = "0.0.24" +dependencies = [ + "libc", + "pkg-config", +] + [[package]] name = "libc" version = "0.2.169" diff --git a/Cargo.toml b/Cargo.toml index 90f24592..fc187940 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,8 @@ members = [ "crates/xen/xenclient", "crates/xen/xenevtchn", "crates/xen/xengnt", + "crates/xen/xenvchan", + "crates/xen/xenvchan-sys", "crates/xen/xenplatform", "crates/xen/xenstore", ] diff --git a/crates/xen/xencall/src/lib.rs b/crates/xen/xencall/src/lib.rs index 22d9285a..c4e0f5b1 100644 --- a/crates/xen/xencall/src/lib.rs +++ b/crates/xen/xencall/src/lib.rs @@ -27,13 +27,14 @@ use std::sync::Arc; use std::time::Duration; use sys::{ CpuId, E820Entry, ForeignMemoryMap, PhysdevMapPirq, SetDomainHandle, Sysctl, SysctlCputopo, - SysctlCputopoinfo, SysctlPhysinfo, SysctlPmOp, SysctlPmOpValue, SysctlReadconsole, - SysctlSetCpuFreqGov, SysctlValue, VcpuGuestContextAny, HYPERVISOR_PHYSDEV_OP, - HYPERVISOR_SYSCTL, PHYSDEVOP_MAP_PIRQ, XEN_DOMCTL_MAX_INTERFACE_VERSION, + SysctlCputopoinfo, SysctlGetdomaininfolist, SysctlPhysinfo, SysctlPmOp, SysctlPmOpValue, + SysctlReadconsole, SysctlSetCpuFreqGov, SysctlValue, VcpuGuestContextAny, + HYPERVISOR_PHYSDEV_OP, HYPERVISOR_SYSCTL, PHYSDEVOP_MAP_PIRQ, XEN_DOMCTL_MAX_INTERFACE_VERSION, XEN_DOMCTL_MIN_INTERFACE_VERSION, XEN_DOMCTL_SETDOMAINHANDLE, XEN_MEM_SET_MEMORY_MAP, - XEN_SYSCTL_CPUTOPOINFO, XEN_SYSCTL_MAX_INTERFACE_VERSION, XEN_SYSCTL_MIN_INTERFACE_VERSION, - XEN_SYSCTL_PHYSINFO, XEN_SYSCTL_PM_OP, XEN_SYSCTL_PM_OP_DISABLE_TURBO, - XEN_SYSCTL_PM_OP_ENABLE_TURBO, XEN_SYSCTL_PM_OP_SET_CPUFREQ_GOV, XEN_SYSCTL_READCONSOLE, + XEN_SYSCTL_CPUTOPOINFO, XEN_SYSCTL_GETDOMAININFOLIST, XEN_SYSCTL_MAX_INTERFACE_VERSION, + XEN_SYSCTL_MIN_INTERFACE_VERSION, XEN_SYSCTL_PHYSINFO, XEN_SYSCTL_PM_OP, + XEN_SYSCTL_PM_OP_DISABLE_TURBO, XEN_SYSCTL_PM_OP_ENABLE_TURBO, + XEN_SYSCTL_PM_OP_SET_CPUFREQ_GOV, XEN_SYSCTL_READCONSOLE, }; use tokio::time::sleep; @@ -417,6 +418,37 @@ impl XenCall { Ok(unsafe { domctl.value.get_domain_info }) } + /// Enumerate Xen domains starting from `first_domid`. + /// + /// Uses XEN_SYSCTL_getdomaininfolist which correctly enumerates domains + /// on all Xen versions (unlike XEN_DOMCTL_GETDOMAININFO which does + /// exact domid lookup on Xen 4.17+). + pub async fn get_domain_info_list( + &self, + first_domid: u16, + max_domains: u32, + ) -> Result> { + let mut buffer = vec![GetDomainInfo::default(); max_domains as usize]; + let mut sysctl = Sysctl { + cmd: XEN_SYSCTL_GETDOMAININFOLIST, + interface_version: self.sysctl_interface_version, + value: SysctlValue { + getdomaininfolist: SysctlGetdomaininfolist { + first_domain: first_domid, + pad: 0, + max_domains, + buffer: buffer.as_mut_ptr() as u64, + num_domains: 0, + }, + }, + }; + self.hypercall1(HYPERVISOR_SYSCTL, addr_of_mut!(sysctl) as c_ulong) + .await?; + let count = unsafe { sysctl.value.getdomaininfolist.num_domains } as usize; + buffer.truncate(count); + Ok(buffer) + } + pub async fn create_domain(&self, create_domain: CreateDomain) -> Result { trace!( "domctl fd={} create_domain create_domain={:?}", diff --git a/crates/xen/xencall/src/sys.rs b/crates/xen/xencall/src/sys.rs index 10e7ae57..bd3ba024 100644 --- a/crates/xen/xencall/src/sys.rs +++ b/crates/xen/xencall/src/sys.rs @@ -807,10 +807,24 @@ pub struct SysctlCputopoinfo { pub handle: c_ulong, } +/// Buffer-based domain enumeration via XEN_SYSCTL_getdomaininfolist. +/// Returns info for domains with domid >= first_domain, up to max_domains. +/// Layout matches xen/include/public/sysctl.h xen_sysctl_getdomaininfolist. +#[repr(C)] +#[derive(Clone, Copy, Debug, Default)] +pub struct SysctlGetdomaininfolist { + pub first_domain: u16, + pub pad: u16, + pub max_domains: u32, + pub buffer: u64, // XEN_GUEST_HANDLE_64(xen_domctl_getdomaininfo_t) + pub num_domains: u32, +} + #[repr(C)] pub union SysctlValue { pub console: SysctlReadconsole, pub cputopoinfo: SysctlCputopoinfo, + pub getdomaininfolist: SysctlGetdomaininfolist, pub pm_op: SysctlPmOp, pub phys_info: SysctlPhysinfo, pub pad: [u8; 128], @@ -825,6 +839,7 @@ pub struct Sysctl { pub const XEN_SYSCTL_READCONSOLE: u32 = 1; pub const XEN_SYSCTL_PHYSINFO: u32 = 3; +pub const XEN_SYSCTL_GETDOMAININFOLIST: u32 = 6; pub const XEN_SYSCTL_PM_OP: u32 = 12; pub const XEN_SYSCTL_CPUTOPOINFO: u32 = 16; diff --git a/crates/xen/xenstore/src/bus.rs b/crates/xen/xenstore/src/bus.rs index ba14f7cf..ca47d405 100644 --- a/crates/xen/xenstore/src/bus.rs +++ b/crates/xen/xenstore/src/bus.rs @@ -32,18 +32,6 @@ const XEN_BUS_PATHS: &[&str] = &["/var/run/xenstored/socket", "/dev/xen/xenbus"] const XEN_BUS_MAX_PAYLOAD_SIZE: usize = 4096; const XEN_BUS_MAX_PACKET_SIZE: usize = XsdMessageHeader::SIZE + XEN_BUS_MAX_PAYLOAD_SIZE; -async fn find_bus_path() -> Option<(&'static str, bool)> { - for path in XEN_BUS_PATHS { - match metadata(path).await { - Ok(metadata) => { - return Some((path, metadata.file_type().is_socket())); - } - Err(_) => continue, - } - } - None -} - struct WatchState { sender: Sender, } @@ -69,21 +57,48 @@ pub struct XsdSocket { impl XsdSocket { pub async fn open() -> Result { - let (path, socket) = match find_bus_path().await { - Some(path) => path, - None => return Err(Error::BusNotFound), - }; + let mut saw_path = false; + let mut last_error = None; + + for path in XEN_BUS_PATHS { + let metadata = match metadata(path).await { + Ok(metadata) => metadata, + Err(_) => continue, + }; + saw_path = true; + + let file = if metadata.file_type().is_socket() { + match UnixStream::connect(path).await { + Ok(stream) => { + let stream = stream.into_std()?; + stream.set_nonblocking(false)?; + unsafe { File::from_raw_fd(stream.into_raw_fd()) } + } + Err(error) => { + warn!("failed to connect to xenstore socket at {path}: {error}"); + last_error = Some(Error::from(error)); + continue; + } + } + } else { + match File::options().read(true).write(true).open(path).await { + Ok(file) => file, + Err(error) => { + warn!("failed to open xenstore bus at {path}: {error}"); + last_error = Some(Error::from(error)); + continue; + } + } + }; - let file = if socket { - let stream = UnixStream::connect(path).await?; - let stream = stream.into_std()?; - stream.set_nonblocking(false)?; - unsafe { File::from_raw_fd(stream.into_raw_fd()) } - } else { - File::options().read(true).write(true).open(path).await? - }; + return XsdSocket::from_handle(file).await; + } - XsdSocket::from_handle(file).await + if saw_path { + Err(last_error.unwrap_or(Error::BusNotFound)) + } else { + Err(Error::BusNotFound) + } } pub async fn from_handle(handle: File) -> Result { diff --git a/crates/xen/xenvchan-sys/Cargo.toml b/crates/xen/xenvchan-sys/Cargo.toml new file mode 100644 index 00000000..1f2fa74c --- /dev/null +++ b/crates/xen/xenvchan-sys/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "krata-xenvchan-sys" +description = "Raw libxenvchan FFI bindings for krata" +license.workspace = true +version.workspace = true +homepage.workspace = true +repository.workspace = true +edition = "2021" +resolver = "2" + +[dependencies] +libc = { workspace = true } + +[build-dependencies] +pkg-config = "0.3" + +[lib] +name = "xenvchan_sys" diff --git a/crates/xen/xenvchan-sys/build.rs b/crates/xen/xenvchan-sys/build.rs new file mode 100644 index 00000000..6d706c4e --- /dev/null +++ b/crates/xen/xenvchan-sys/build.rs @@ -0,0 +1,23 @@ +fn main() { + if let Ok(lib_dir) = std::env::var("XENVCHAN_LIB_DIR") { + println!("cargo:rustc-link-search=native={lib_dir}"); + println!("cargo:rustc-link-lib=dylib=xenvchan"); + return; + } + + let mut config = pkg_config::Config::new(); + if std::env::var("CARGO_CFG_TARGET_ENV").as_deref() == Ok("musl") { + config.statik(true); + } + + config + .probe("xenvchan") + .or_else(|_| { + let mut fallback = pkg_config::Config::new(); + if std::env::var("CARGO_CFG_TARGET_ENV").as_deref() == Ok("musl") { + fallback.statik(true); + } + fallback.probe("libxenvchan") + }) + .expect("failed to locate libxenvchan with pkg-config"); +} diff --git a/crates/xen/xenvchan-sys/src/lib.rs b/crates/xen/xenvchan-sys/src/lib.rs new file mode 100644 index 00000000..7a7ddaec --- /dev/null +++ b/crates/xen/xenvchan-sys/src/lib.rs @@ -0,0 +1,46 @@ +#![allow(non_camel_case_types)] + +use libc::{c_char, c_int, c_void, size_t}; + +#[repr(C)] +pub struct libxenvchan { + _private: [u8; 0], +} + +#[repr(C)] +pub struct xentoollog_logger { + _private: [u8; 0], +} + +pub const XENVCHAN_OPEN_CLOSED: c_int = 0; +pub const XENVCHAN_OPEN_CONNECTED: c_int = 1; +pub const XENVCHAN_OPEN_WAITING_FOR_CLIENT: c_int = 2; + +unsafe extern "C" { + pub fn libxenvchan_server_init( + logger: *mut xentoollog_logger, + domain: c_int, + xs_path: *const c_char, + read_min: size_t, + write_min: size_t, + ) -> *mut libxenvchan; + + pub fn libxenvchan_client_init( + logger: *mut xentoollog_logger, + domain: c_int, + xs_path: *const c_char, + ) -> *mut libxenvchan; + + pub fn libxenvchan_close(ctrl: *mut libxenvchan); + + pub fn libxenvchan_recv(ctrl: *mut libxenvchan, data: *mut c_void, size: size_t) -> c_int; + pub fn libxenvchan_read(ctrl: *mut libxenvchan, data: *mut c_void, size: size_t) -> c_int; + pub fn libxenvchan_send(ctrl: *mut libxenvchan, data: *const c_void, size: size_t) -> c_int; + pub fn libxenvchan_write(ctrl: *mut libxenvchan, data: *const c_void, size: size_t) -> c_int; + + pub fn libxenvchan_wait(ctrl: *mut libxenvchan) -> c_int; + pub fn libxenvchan_fd_for_select(ctrl: *mut libxenvchan) -> c_int; + pub fn libxenvchan_is_open(ctrl: *mut libxenvchan) -> c_int; + pub fn libxenvchan_data_ready(ctrl: *mut libxenvchan) -> c_int; + pub fn libxenvchan_buffer_space(ctrl: *mut libxenvchan) -> c_int; +} diff --git a/crates/xen/xenvchan/Cargo.toml b/crates/xen/xenvchan/Cargo.toml new file mode 100644 index 00000000..af7bb89d --- /dev/null +++ b/crates/xen/xenvchan/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "krata-xenvchan" +description = "A safe libxenvchan wrapper for krata" +license.workspace = true +version.workspace = true +homepage.workspace = true +repository.workspace = true +edition = "2021" +resolver = "2" + +[dependencies] +thiserror = { workspace = true } +libc = { workspace = true } +krata-xenvchan-sys = { path = "../xenvchan-sys", version = "^0.0.24" } + +[lib] +name = "xenvchan" diff --git a/crates/xen/xenvchan/src/error.rs b/crates/xen/xenvchan/src/error.rs new file mode 100644 index 00000000..0bbe7cd4 --- /dev/null +++ b/crates/xen/xenvchan/src/error.rs @@ -0,0 +1,18 @@ +use std::ffi::NulError; +use std::io; + +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error("{op} failed: {source}")] + Api { + op: &'static str, + #[source] + source: io::Error, + }, + #[error("xenstore path contains an interior NUL byte")] + InvalidPath(#[from] NulError), + #[error("vchan peer is closed")] + Closed, +} + +pub type Result = std::result::Result; diff --git a/crates/xen/xenvchan/src/lib.rs b/crates/xen/xenvchan/src/lib.rs new file mode 100644 index 00000000..f6cb959a --- /dev/null +++ b/crates/xen/xenvchan/src/lib.rs @@ -0,0 +1,334 @@ +pub mod error; + +use std::ffi::CString; +use std::io; +use std::os::fd::RawFd; +use std::ptr::NonNull; + +use error::{Error, Result}; +use xenvchan_sys::{ + libxenvchan, libxenvchan_buffer_space, libxenvchan_client_init, libxenvchan_close, + libxenvchan_data_ready, libxenvchan_fd_for_select, libxenvchan_is_open, libxenvchan_read, + libxenvchan_recv, libxenvchan_send, libxenvchan_server_init, libxenvchan_wait, + libxenvchan_write, XENVCHAN_OPEN_CLOSED, XENVCHAN_OPEN_CONNECTED, + XENVCHAN_OPEN_WAITING_FOR_CLIENT, +}; + +#[derive(Debug, Clone)] +pub struct VchanConfig { + pub domid: u32, + pub xs_path: String, + pub read_min: usize, + pub write_min: usize, +} + +impl VchanConfig { + pub fn symmetric(domid: u32, xs_path: impl Into, ring_min: usize) -> Self { + Self { + domid, + xs_path: xs_path.into(), + read_min: ring_min, + write_min: ring_min, + } + } +} + +struct RawVchan { + ptr: NonNull, +} + +// SAFETY: The underlying libxenvchan handle is an owned FFI resource. We only +// move it between tasks/threads and serialize all operations through &mut self +// or an outer Mutex in isol8-vchan. +unsafe impl Send for RawVchan {} + +impl RawVchan { + fn server(cfg: &VchanConfig) -> Result { + let xs_path = CString::new(cfg.xs_path.as_str())?; + let ptr = unsafe { + libxenvchan_server_init( + std::ptr::null_mut(), + cfg.domid as i32, + xs_path.as_ptr(), + cfg.read_min, + cfg.write_min, + ) + }; + Self::from_init_ptr("libxenvchan_server_init", ptr) + } + + fn client(cfg: &VchanConfig) -> Result { + let xs_path = CString::new(cfg.xs_path.as_str())?; + let ptr = unsafe { + libxenvchan_client_init(std::ptr::null_mut(), cfg.domid as i32, xs_path.as_ptr()) + }; + Self::from_init_ptr("libxenvchan_client_init", ptr) + } + + fn from_init_ptr(op: &'static str, ptr: *mut libxenvchan) -> Result { + let ptr = NonNull::new(ptr).ok_or_else(|| last_error(op))?; + Ok(Self { ptr }) + } + + fn as_ptr(&self) -> *mut libxenvchan { + self.ptr.as_ptr() + } + + fn wait_connected(&self) -> Result<()> { + loop { + match self.open_state()? { + XENVCHAN_OPEN_CONNECTED => return Ok(()), + XENVCHAN_OPEN_WAITING_FOR_CLIENT => self.wait()?, + XENVCHAN_OPEN_CLOSED => return Err(Error::Closed), + _ => self.wait()?, + } + } + } + + fn wait(&self) -> Result<()> { + let rc = unsafe { libxenvchan_wait(self.as_ptr()) }; + if rc < 0 { + return Err(last_error("libxenvchan_wait")); + } + Ok(()) + } + + fn open_state(&self) -> Result { + let rc = unsafe { libxenvchan_is_open(self.as_ptr()) }; + if rc < 0 { + return Err(last_error("libxenvchan_is_open")); + } + Ok(rc) + } + + fn fd_for_select(&self) -> Result { + let rc = unsafe { libxenvchan_fd_for_select(self.as_ptr()) }; + if rc < 0 { + return Err(last_error("libxenvchan_fd_for_select")); + } + Ok(rc) + } + + fn data_ready(&self) -> Result { + let rc = unsafe { libxenvchan_data_ready(self.as_ptr()) }; + if rc < 0 { + return Err(last_error("libxenvchan_data_ready")); + } + Ok(rc as usize) + } + + fn buffer_space(&self) -> Result { + let rc = unsafe { libxenvchan_buffer_space(self.as_ptr()) }; + if rc < 0 { + return Err(last_error("libxenvchan_buffer_space")); + } + Ok(rc as usize) + } + + fn read_some(&mut self, buf: &mut [u8]) -> Result { + if buf.is_empty() { + return Ok(0); + } + + loop { + let rc = unsafe { libxenvchan_read(self.as_ptr(), buf.as_mut_ptr().cast(), buf.len()) }; + if rc > 0 { + return Ok(rc as usize); + } + if rc < 0 { + return Err(last_error("libxenvchan_read")); + } + if self.open_state()? == XENVCHAN_OPEN_CLOSED { + return Ok(0); + } + self.wait()?; + } + } + + fn write_some(&mut self, buf: &[u8]) -> Result { + if buf.is_empty() { + return Ok(0); + } + + loop { + let rc = unsafe { libxenvchan_write(self.as_ptr(), buf.as_ptr().cast(), buf.len()) }; + if rc > 0 { + return Ok(rc as usize); + } + if rc < 0 { + return Err(last_error("libxenvchan_write")); + } + if self.open_state()? == XENVCHAN_OPEN_CLOSED { + return Err(Error::Closed); + } + self.wait()?; + } + } + + fn recv_exact(&mut self, buf: &mut [u8]) -> Result<()> { + if buf.is_empty() { + return Ok(()); + } + + loop { + let rc = unsafe { libxenvchan_recv(self.as_ptr(), buf.as_mut_ptr().cast(), buf.len()) }; + if rc == buf.len() as i32 { + return Ok(()); + } + if rc < 0 { + return Err(last_error("libxenvchan_recv")); + } + if self.open_state()? == XENVCHAN_OPEN_CLOSED { + return Err(Error::Closed); + } + self.wait()?; + } + } + + fn send_exact(&mut self, buf: &[u8]) -> Result<()> { + if buf.is_empty() { + return Ok(()); + } + + loop { + let rc = unsafe { libxenvchan_send(self.as_ptr(), buf.as_ptr().cast(), buf.len()) }; + if rc == buf.len() as i32 { + return Ok(()); + } + if rc < 0 { + return Err(last_error("libxenvchan_send")); + } + if self.open_state()? == XENVCHAN_OPEN_CLOSED { + return Err(Error::Closed); + } + self.wait()?; + } + } +} + +impl Drop for RawVchan { + fn drop(&mut self) { + unsafe { libxenvchan_close(self.as_ptr()) }; + } +} + +pub struct VchanListener { + raw: RawVchan, +} + +impl VchanListener { + pub fn bind(cfg: &VchanConfig) -> Result { + Ok(Self { + raw: RawVchan::server(cfg)?, + }) + } + + pub fn accept(self) -> Result { + self.raw.wait_connected()?; + Ok(VchanStream { raw: self.raw }) + } +} + +pub struct VchanStream { + raw: RawVchan, +} + +impl VchanStream { + pub fn connect(cfg: &VchanConfig) -> Result { + let raw = RawVchan::client(cfg)?; + Ok(Self { raw }) + } + + pub fn wait(&self) -> Result<()> { + self.raw.wait() + } + + pub fn wait_connected(&self) -> Result<()> { + self.raw.wait_connected() + } + + pub fn fd_for_select(&self) -> Result { + self.raw.fd_for_select() + } + + pub fn is_open(&self) -> Result { + self.raw.open_state() + } + + pub fn data_ready(&self) -> Result { + self.raw.data_ready() + } + + pub fn buffer_space(&self) -> Result { + self.raw.buffer_space() + } + + pub fn read_once(&mut self, buf: &mut [u8]) -> Result { + if buf.is_empty() { + return Ok(0); + } + let rc = unsafe { libxenvchan_read(self.raw.as_ptr(), buf.as_mut_ptr().cast(), buf.len()) }; + if rc < 0 { + return Err(last_error("libxenvchan_read")); + } + Ok(rc as usize) + } + + pub fn read(&mut self, buf: &mut [u8]) -> Result { + self.raw.read_some(buf) + } + + pub fn write_once(&mut self, buf: &[u8]) -> Result { + if buf.is_empty() { + return Ok(0); + } + let rc = unsafe { libxenvchan_write(self.raw.as_ptr(), buf.as_ptr().cast(), buf.len()) }; + if rc < 0 { + return Err(last_error("libxenvchan_write")); + } + Ok(rc as usize) + } + + pub fn write(&mut self, buf: &[u8]) -> Result { + self.raw.write_some(buf) + } + + pub fn recv_once(&mut self, buf: &mut [u8]) -> Result { + if buf.is_empty() { + return Ok(0); + } + let rc = unsafe { libxenvchan_recv(self.raw.as_ptr(), buf.as_mut_ptr().cast(), buf.len()) }; + if rc < 0 { + return Err(last_error("libxenvchan_recv")); + } + Ok(rc as usize) + } + + pub fn recv_exact(&mut self, buf: &mut [u8]) -> Result<()> { + self.raw.recv_exact(buf) + } + + pub fn send_once(&mut self, buf: &[u8]) -> Result { + if buf.is_empty() { + return Ok(0); + } + let rc = unsafe { libxenvchan_send(self.raw.as_ptr(), buf.as_ptr().cast(), buf.len()) }; + if rc < 0 { + return Err(last_error("libxenvchan_send")); + } + Ok(rc as usize) + } + + pub fn send_exact(&mut self, buf: &[u8]) -> Result<()> { + self.raw.send_exact(buf) + } +} + +fn last_error(op: &'static str) -> Error { + let source = io::Error::last_os_error(); + let source = match source.raw_os_error() { + Some(code) if code != 0 => source, + _ => io::Error::other(op), + }; + Error::Api { op, source } +}