From 05486f0864559edadc52c5730f368d45895a0d60 Mon Sep 17 00:00:00 2001 From: Milan Hauth Date: Sun, 28 Sep 2025 11:49:52 +0200 Subject: [PATCH 1/7] require rust version 1.88 --- crates/librqbit_core/Cargo.toml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/librqbit_core/Cargo.toml b/crates/librqbit_core/Cargo.toml index 1e931dc29..0f6a7ecd7 100644 --- a/crates/librqbit_core/Cargo.toml +++ b/crates/librqbit_core/Cargo.toml @@ -2,6 +2,8 @@ name = "librqbit-core" version = "5.0.0" edition = "2024" +# https://github.com/rust-lang/rust/issues/53667 +rust-version = "1.88" description = "Important utilities used throughout librqbit useful for working with torrents." license = "Apache-2.0" documentation = "https://docs.rs/librqbit-core" From 988d2277d4a2a7101209c8fa8ada9de01139f6c8 Mon Sep 17 00:00:00 2001 From: Milan Hauth Date: Sun, 28 Sep 2025 12:06:24 +0200 Subject: [PATCH 2/7] support seeding completed read-only files (fixes #136) --- crates/librqbit/src/storage/filesystem/fs.rs | 28 ++++++++++++--- .../src/storage/filesystem/opened_file.rs | 34 ++++++++++++++++++- 2 files changed, 56 insertions(+), 6 deletions(-) diff --git a/crates/librqbit/src/storage/filesystem/fs.rs b/crates/librqbit/src/storage/filesystem/fs.rs index 5fbd4bd69..db841e56a 100644 --- a/crates/librqbit/src/storage/filesystem/fs.rs +++ b/crates/librqbit/src/storage/filesystem/fs.rs @@ -67,6 +67,7 @@ impl TorrentStorage for FilesystemStorage { fn pwrite_all(&self, file_id: usize, offset: u64, buf: &[u8]) -> anyhow::Result<()> { let of = self.opened_files.get(file_id).context("no such file")?; + of.ensure_writeable()?; #[cfg(windows)] return of.try_mark_sparse()?.pwrite_all(offset, buf); #[cfg(not(windows))] @@ -137,17 +138,33 @@ impl TorrentStorage for FilesystemStorage { continue; }; std::fs::create_dir_all(full_path.parent().context("bug: no parent")?)?; - let f = if shared.options.allow_overwrite { + if shared.options.allow_overwrite { + let (file, writeable) = match OpenOptions::new() .create(true) .truncate(false) .read(true) .write(true) .open(&full_path) - .with_context(|| format!("error opening {full_path:?} in read/write mode"))? + { + Ok(file) => (file, true), + Err(e) => { + warn!(?full_path, "error opening file in create+write mode: {e:?}"); + // open the file in read-only mode, will reopen in write mode later. + ( + OpenOptions::new() + .create(false) + .read(true) + .open(&full_path) + .with_context(|| format!("error opening {full_path:?}"))?, + false, + ) + } + }; + files.push(OpenedFile::new(full_path.clone(), file, writeable)); } else { // create_new does not seem to work with read(true), so calling this twice. - OpenOptions::new() + let file = OpenOptions::new() .create_new(true) .write(true) .open(&full_path) @@ -157,9 +174,10 @@ impl TorrentStorage for FilesystemStorage { &full_path ) })?; - OpenOptions::new().read(true).write(true).open(&full_path)? + OpenOptions::new().read(true).write(true).open(&full_path)?; + let writeable = true; + files.push(OpenedFile::new(full_path.clone(), file, writeable)); }; - files.push(OpenedFile::new(full_path.clone(), f)); } self.opened_files = files; diff --git a/crates/librqbit/src/storage/filesystem/opened_file.rs b/crates/librqbit/src/storage/filesystem/opened_file.rs index 404b3ab67..a31df6f5e 100644 --- a/crates/librqbit/src/storage/filesystem/opened_file.rs +++ b/crates/librqbit/src/storage/filesystem/opened_file.rs @@ -3,6 +3,7 @@ use std::{ io::IoSlice, ops::{Deref, DerefMut}, path::PathBuf, + sync::atomic::{AtomicBool, Ordering}, }; use anyhow::Context; @@ -124,34 +125,65 @@ impl DerefMut for OpenedFileLocked { #[derive(Debug)] pub(crate) struct OpenedFile { + path: PathBuf, file: RwLock, + is_writeable: AtomicBool, } impl OpenedFile { - pub fn new(path: PathBuf, f: File) -> Self { + pub fn new(path: PathBuf, f: File, is_writeable: bool) -> Self { Self { + path: path.clone(), file: RwLock::new(OpenedFileLocked { path, fd: Some(f), #[cfg(windows)] tried_marking_sparse: false, }), + is_writeable: AtomicBool::new(is_writeable), } } pub fn new_dummy() -> Self { Self { + path: PathBuf::from(""), file: RwLock::new(Default::default()), + is_writeable: AtomicBool::new(false), } } pub fn take_clone(&self) -> anyhow::Result { let f = std::mem::take(&mut *self.file.write()); Ok(Self { + path: self.path.clone(), file: RwLock::new(f), + is_writeable: AtomicBool::new(self.is_writeable.load(Ordering::SeqCst)), }) } + pub fn ensure_writeable(&self) -> anyhow::Result<()> { + match self + .is_writeable + .compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed) + { + Ok(_) => { + // Updated, need to reopen writeable + let mut file = self.file.write(); + let new_fd = std::fs::OpenOptions::new() + .write(true) + .create(false) + .open(&self.path) + .with_context(|| format!("error opening {:?} in write mode", self.path))?; + file.fd = Some(new_fd); + } + Err(_) => { + // Didn't update, no need to reopen + } + } + + Ok(()) + } + pub fn lock_read(&self) -> crate::Result> { RwLockReadGuard::try_map(self.file.read(), |f| f.as_ref()) .ok() From e9dcd1697cb8cbeaa4a4dd6c7c349d61a79955cf Mon Sep 17 00:00:00 2001 From: Milan Hauth Date: Sun, 5 Oct 2025 11:02:45 +0000 Subject: [PATCH 3/7] WONTFIX use remotefs-ftp --- Cargo.lock | 87 +++++++ crates/librqbit/Cargo.toml | 10 + crates/librqbit/src/session.rs | 21 ++ crates/librqbit/src/storage/middleware/mod.rs | 1 + .../src/storage/middleware/remotefs.rs | 222 ++++++++++++++++++ crates/rqbit/Cargo.toml | 2 +- 6 files changed, 342 insertions(+), 1 deletion(-) create mode 100644 crates/librqbit/src/storage/middleware/remotefs.rs diff --git a/Cargo.lock b/Cargo.lock index 05e3b7a45..7c7018231 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1730,6 +1730,19 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" +[[package]] +name = "futures-lite" +version = "2.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f78e10609fe0e0b3f4157ffab1876319b5b0db102a2c60dc4626306dc46b44ad" +dependencies = [ + "fastrand", + "futures-core", + "futures-io", + "parking", + "pin-project-lite", +] + [[package]] name = "futures-macro" version = "0.3.31" @@ -2856,6 +2869,29 @@ dependencies = [ "selectors", ] +[[package]] +name = "lazy-regex" +version = "3.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60c7310b93682b36b98fa7ea4de998d3463ccbebd94d935d6b48ba5b6ffa7126" +dependencies = [ + "lazy-regex-proc_macros", + "once_cell", + "regex", +] + +[[package]] +name = "lazy-regex-proc_macros" +version = "3.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ba01db5ef81e17eb10a5e0f2109d1b3a3e29bac3070fdbd7d156bf7dbd206a1" +dependencies = [ + "proc-macro2", + "quote", + "regex", + "syn 2.0.106", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -2983,6 +3019,8 @@ dependencies = [ "parking_lot", "rand 0.9.2", "regex", + "remotefs", + "remotefs-ftp", "reqwest", "rlimit", "serde", @@ -4160,6 +4198,12 @@ dependencies = [ "regex", ] +[[package]] +name = "path-slash" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e91099d4268b0e11973f036e885d652fb0b21fedcf69738c627f94db6a44f42" + [[package]] name = "pathdiff" version = "0.2.3" @@ -4877,6 +4921,29 @@ version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "caf4aa5b0f434c91fe5c7f1ecb6a5ece2130b02ad2a590589dda5146df959001" +[[package]] +name = "remotefs" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b229ef0bf00ce8ae4cbe349bd8d71f3473e26de30efa637cfc53e08eaf6867f0" +dependencies = [ + "log", + "thiserror 1.0.69", + "wildmatch", +] + +[[package]] +name = "remotefs-ftp" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1c84ed1367170d6f589f09998b648e7d9bc09d93091130293be8ff09b7a8329" +dependencies = [ + "log", + "path-slash", + "remotefs", + "suppaftp", +] + [[package]] name = "reqwest" version = "0.12.23" @@ -5795,6 +5862,20 @@ version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" +[[package]] +name = "suppaftp" +version = "7.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "386a802d1622d43d774de76222e65cf367a72942ccf634b585c4dbba2354bec4" +dependencies = [ + "chrono", + "futures-lite", + "lazy-regex", + "log", + "native-tls", + "thiserror 2.0.16", +] + [[package]] name = "swift-rs" version = "1.0.7" @@ -7199,6 +7280,12 @@ dependencies = [ "wasite", ] +[[package]] +name = "wildmatch" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39b7d07a236abaef6607536ccfaf19b396dbe3f5110ddb73d39f4562902ed382" + [[package]] name = "winapi" version = "0.3.9" diff --git a/crates/librqbit/Cargo.toml b/crates/librqbit/Cargo.toml index 8e0487659..628897a3d 100644 --- a/crates/librqbit/Cargo.toml +++ b/crates/librqbit/Cargo.toml @@ -129,6 +129,16 @@ librqbit-dualstack-sockets = { version = "0.6.11", features = ["axum"] } socket2 = "0.6" nix = { version = "0.30.1", features = ["uio"] } thiserror = "2.0.12" +# https://lib.rs/crates/remotefs +remotefs = "0.3.1" +remotefs-ftp = { version = "0.3.0", features = [ "native-tls" ] } +# remotefs-ssh = "0.4.0" +# NOTE the "open" method is not supported by these backends +# remotefs-smb = "0.3.1" +# remotefs-webdav = "0.2.0" +# NOTE these backends dont exist +# remotefs-nfs = "0.3.1" +# remotefs-http = "0.3.1" [target.'cfg(windows)'.dependencies] windows = { version = "0.62.1", features = [ diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 3c85964cd..2e5bf6608 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -74,6 +74,9 @@ use tracker_comms::{TrackerComms, UdpTrackerClient}; pub const SUPPORTED_SCHEMES: [&str; 3] = ["http:", "https:", "magnet:"]; +#[cfg(feature = "storage_middleware")] +use crate::storage::middleware::remotefs::{RemoteFsStorageFactory, REMOTEFS_PROTOCOLS}; + pub type TorrentId = usize; struct ParsedTorrentFile { @@ -1210,11 +1213,29 @@ impl Session { })); } + #[cfg(not(feature = "storage_middleware"))] let storage_factory = opts .storage_factory .take() .or_else(|| self.default_storage_factory.as_ref().map(|f| f.clone_box())) .unwrap_or_else(|| FilesystemStorageFactory::default().boxed()); + #[cfg(feature = "storage_middleware")] + let storage_factory = if let Some(folder_str) = output_folder.to_str() { + if REMOTEFS_PROTOCOLS.iter().any(|p| folder_str.starts_with(p)) { + Box::new(RemoteFsStorageFactory::new(folder_str.to_string())) + as BoxStorageFactory + } else { + opts.storage_factory + .take() + .or_else(|| self.default_storage_factory.as_ref().map(|f| f.clone_box())) + .unwrap_or_else(|| FilesystemStorageFactory::default().boxed()) + } + } else { + opts.storage_factory + .take() + .or_else(|| self.default_storage_factory.as_ref().map(|f| f.clone_box())) + .unwrap_or_else(|| FilesystemStorageFactory::default().boxed()) + }; let id = if let Some(id) = opts.preferred_id { id diff --git a/crates/librqbit/src/storage/middleware/mod.rs b/crates/librqbit/src/storage/middleware/mod.rs index fb4bb3e2b..898e493d8 100644 --- a/crates/librqbit/src/storage/middleware/mod.rs +++ b/crates/librqbit/src/storage/middleware/mod.rs @@ -1,3 +1,4 @@ pub mod slow; pub mod timing; pub mod write_through_cache; +pub mod remotefs; diff --git a/crates/librqbit/src/storage/middleware/remotefs.rs b/crates/librqbit/src/storage/middleware/remotefs.rs new file mode 100644 index 000000000..da0b72a66 --- /dev/null +++ b/crates/librqbit/src/storage/middleware/remotefs.rs @@ -0,0 +1,222 @@ +use remotefs::RemoteFs; +use remotefs_ftp::FtpFs; +use url::Url; + +use std::io::{Read, Seek, SeekFrom}; +use std::sync::Arc; + +use crate::storage::{StorageFactory, TorrentStorage}; +use crate::torrent_state::TorrentMetadata; +use crate::ManagedTorrentShared; + +pub const REMOTEFS_PROTOCOLS: &[&str] = &[ + "http://", + "https://", + "sftp://", + "ftp://", + "ftps://", + "smb://", + "dav://", + "davs://", + "nfs://", + "webdav://", +]; + +pub struct RemoteFsStorageFactory { + url: String, +} + +impl RemoteFsStorageFactory { + pub fn new(url: String) -> Self { + Self { url } + } +} + +// impl<'a> StorageFactory for RemoteFsStorageFactory { +impl StorageFactory for RemoteFsStorageFactory { + // type Storage = RemoteFsStorage<'a>; + type Storage = Box; + + fn create( + &self, + // shared: &'a ManagedTorrentShared, + // metadata: &'a TorrentMetadata + shared: &ManagedTorrentShared, + metadata: &TorrentMetadata + // ) -> anyhow::Result> { + ) -> anyhow::Result { + // log::info!("Creating RemoteFsStorage for URL: {}", self.url); + + // let fs = RemoteFs::from_url(&self.url) + // .map_err(|e| anyhow::anyhow!("remotefs error: {e}"))?; + + // https://github.com/veeso/termscp/blob/05830db206605f60ce54e23ca5df7de02115f491/src/filetransfer/remotefs_builder.rs#L109-L13 + let secure = true; + let parsed = Url::parse(&self.url)?; + let protocol = parsed.scheme().to_string(); + if (protocol != "ftp") && (protocol != "ftps") { + anyhow::bail!("RemoteFsStorageFactory only supports ftp:// and ftps:// URLs, got: {protocol}"); + } + let hostname = parsed.host_str().ok_or_else(|| anyhow::anyhow!("Missing host"))?.to_string(); + let port = parsed.port().unwrap_or(21); + let username = if parsed.username().is_empty() { + None + } else { + Some(parsed.username().to_string()) + }; + let password = parsed.password().map(|s| s.to_string()); + let path = parsed.path().to_string(); + let mut fs = FtpFs::new(hostname, port).passive_mode(); + if let Some(_username) = username { + fs = fs.username(_username); + } + if let Some(_password) = password { + fs = fs.password(_password); + } + if secure { + fs = fs.secure(true, true); + } + + Ok(Box::new(RemoteFsStorage { + fs: Arc::new(fs), + shared, + metadata, + })) + } + + fn clone_box(&self) -> Box + 'static> { + Box::new(Self { + url: self.url.clone(), + }) + } +} + +pub struct RemoteFsStorage<'a> { + fs: Arc, + shared: &'a ManagedTorrentShared, + metadata: &'a TorrentMetadata, +} + +// WONTFIX remotefs is not thread-safe, so we cannot implement Sync or Send for RemoteFsStorage +// the trait `Sync` is not implemented for `(dyn RemoteFs + 'static)` +// https://github.com/remotefs-rs/remotefs-rs/pull/21 +impl<'a> TorrentStorage for RemoteFsStorage<'a> { + fn pread_exact(&self, file_id: usize, offset: u64, buf: &mut [u8]) -> anyhow::Result<()> { + let file_details = self.metadata.file_infos[file_id].clone(); + + let relative_path = &file_details.relative_filename; + + // let mut full_path = self.output_folder.clone(); + // full_path.push(relative_path); + + if file_details.attrs.padding { + log::info!("Skipping read for padding file: {:?}", relative_path); + return Ok(()); + }; + log::info!("RemoteFsStorage pread_exact: file_id={}, offset={}, len={}, path={:?}", file_id, offset, buf.len(), relative_path); + + // TODO prepend "/" before relative_path? + // NOTE only some remotefs backends support "open" + let mut file = self.fs.open(relative_path) + .map_err(|e| anyhow::anyhow!("remotefs open error: {e}"))?; + file.seek(SeekFrom::Start(offset)) + .map_err(|e| anyhow::anyhow!("remotefs seek error: {e}"))?; + let mut read_total = 0; + while read_total < buf.len() { + let n = file.read(&mut buf[read_total..]) + .map_err(|e| anyhow::anyhow!("remotefs read error: {e}"))?; + if n == 0 { + return Err(anyhow::anyhow!( + "remotefs: unexpected EOF (read {} of {} bytes)", + read_total, + buf.len() + )); + } + read_total += n; + } + Ok(()) + } + + fn pwrite_all(&self, _file_id: usize, _offset: u64, _buf: &[u8]) -> anyhow::Result<()> { + anyhow::bail!("RemoteFsStorage is read-only: pwrite_all is not implemented"); + } + + fn remove_file(&self, _file_id: usize, _filename: &std::path::Path) -> anyhow::Result<()> { + anyhow::bail!("RemoteFsStorage is read-only: remove_file is not implemented"); + } + + fn remove_directory_if_empty(&self, _path: &std::path::Path) -> anyhow::Result<()> { + anyhow::bail!("RemoteFsStorage is read-only: remove_directory_if_empty is not implemented"); + } + + fn ensure_file_length(&self, _file_id: usize, _length: u64) -> anyhow::Result<()> { + anyhow::bail!("RemoteFsStorage is read-only: ensure_file_length is not implemented"); + } + + fn take(&self) -> anyhow::Result> { + anyhow::bail!("RemoteFsStorage take is not implemented") + } + + fn init( + &mut self, + _shared: &ManagedTorrentShared, + _metadata: &TorrentMetadata, + ) -> anyhow::Result<()> { + // No-op for RemoteFsStorage, as it is read-only and uses references. + // let mut files = Vec::::new(); + // for file_details in metadata.file_infos.iter() { + // let mut full_path = self.output_folder.clone(); + // let relative_path = &file_details.relative_filename; + // full_path.push(relative_path); + + // if file_details.attrs.padding { + // files.push(OpenedFile::new_dummy()); + // continue; + // }; + // std::fs::create_dir_all(full_path.parent().context("bug: no parent")?)?; + // if shared.options.allow_overwrite { + // let (file, writeable) = match + // OpenOptions::new() + // .create(true) + // .truncate(false) + // .read(true) + // .write(true) + // .open(&full_path) + // { + // Ok(file) => (file, true), + // Err(e) => { + // warn!(?full_path, "error opening file in create+write mode: {e:?}"); + // // open the file in read-only mode, will reopen in write mode later. + // ( + // OpenOptions::new() + // .create(false) + // .read(true) + // .open(&full_path) + // .with_context(|| format!("error opening {full_path:?}"))?, + // false, + // ) + // } + // }; + // files.push(OpenedFile::new(full_path.clone(), file, writeable)); + // } else { + // // create_new does not seem to work with read(true), so calling this twice. + // let file = OpenOptions::new() + // .create_new(true) + // .write(true) + // .open(&full_path) + // .with_context(|| { + // format!( + // "error creating a new file (because allow_overwrite = false) {:?}", + // &full_path + // ) + // })?; + // OpenOptions::new().read(true).write(true).open(&full_path)?; + // let writeable = true; + // files.push(OpenedFile::new(full_path.clone(), file, writeable)); + // }; + // } + + // self.opened_files = files; + Ok(()) + } +} diff --git a/crates/rqbit/Cargo.toml b/crates/rqbit/Cargo.toml index 1a4f39a44..ffb494bd0 100644 --- a/crates/rqbit/Cargo.toml +++ b/crates/rqbit/Cargo.toml @@ -12,7 +12,7 @@ readme = "README.md" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] -default = ["default-tls", "postgres", "webui", "prometheus"] +default = ["default-tls", "postgres", "webui", "prometheus", "librqbit/storage_middleware"] openssl-vendored = ["openssl/vendored"] tokio-console = ["librqbit/tokio-console"] webui = ["librqbit/webui"] From 5247aa1382df44ea5063a9c8fa6a6509b5a0aae5 Mon Sep 17 00:00:00 2001 From: Milan Hauth Date: Sun, 5 Oct 2025 11:58:38 +0000 Subject: [PATCH 4/7] rm remotefs.rs --- .../src/storage/middleware/remotefs.rs | 222 ------------------ 1 file changed, 222 deletions(-) delete mode 100644 crates/librqbit/src/storage/middleware/remotefs.rs diff --git a/crates/librqbit/src/storage/middleware/remotefs.rs b/crates/librqbit/src/storage/middleware/remotefs.rs deleted file mode 100644 index da0b72a66..000000000 --- a/crates/librqbit/src/storage/middleware/remotefs.rs +++ /dev/null @@ -1,222 +0,0 @@ -use remotefs::RemoteFs; -use remotefs_ftp::FtpFs; -use url::Url; - -use std::io::{Read, Seek, SeekFrom}; -use std::sync::Arc; - -use crate::storage::{StorageFactory, TorrentStorage}; -use crate::torrent_state::TorrentMetadata; -use crate::ManagedTorrentShared; - -pub const REMOTEFS_PROTOCOLS: &[&str] = &[ - "http://", - "https://", - "sftp://", - "ftp://", - "ftps://", - "smb://", - "dav://", - "davs://", - "nfs://", - "webdav://", -]; - -pub struct RemoteFsStorageFactory { - url: String, -} - -impl RemoteFsStorageFactory { - pub fn new(url: String) -> Self { - Self { url } - } -} - -// impl<'a> StorageFactory for RemoteFsStorageFactory { -impl StorageFactory for RemoteFsStorageFactory { - // type Storage = RemoteFsStorage<'a>; - type Storage = Box; - - fn create( - &self, - // shared: &'a ManagedTorrentShared, - // metadata: &'a TorrentMetadata - shared: &ManagedTorrentShared, - metadata: &TorrentMetadata - // ) -> anyhow::Result> { - ) -> anyhow::Result { - // log::info!("Creating RemoteFsStorage for URL: {}", self.url); - - // let fs = RemoteFs::from_url(&self.url) - // .map_err(|e| anyhow::anyhow!("remotefs error: {e}"))?; - - // https://github.com/veeso/termscp/blob/05830db206605f60ce54e23ca5df7de02115f491/src/filetransfer/remotefs_builder.rs#L109-L13 - let secure = true; - let parsed = Url::parse(&self.url)?; - let protocol = parsed.scheme().to_string(); - if (protocol != "ftp") && (protocol != "ftps") { - anyhow::bail!("RemoteFsStorageFactory only supports ftp:// and ftps:// URLs, got: {protocol}"); - } - let hostname = parsed.host_str().ok_or_else(|| anyhow::anyhow!("Missing host"))?.to_string(); - let port = parsed.port().unwrap_or(21); - let username = if parsed.username().is_empty() { - None - } else { - Some(parsed.username().to_string()) - }; - let password = parsed.password().map(|s| s.to_string()); - let path = parsed.path().to_string(); - let mut fs = FtpFs::new(hostname, port).passive_mode(); - if let Some(_username) = username { - fs = fs.username(_username); - } - if let Some(_password) = password { - fs = fs.password(_password); - } - if secure { - fs = fs.secure(true, true); - } - - Ok(Box::new(RemoteFsStorage { - fs: Arc::new(fs), - shared, - metadata, - })) - } - - fn clone_box(&self) -> Box + 'static> { - Box::new(Self { - url: self.url.clone(), - }) - } -} - -pub struct RemoteFsStorage<'a> { - fs: Arc, - shared: &'a ManagedTorrentShared, - metadata: &'a TorrentMetadata, -} - -// WONTFIX remotefs is not thread-safe, so we cannot implement Sync or Send for RemoteFsStorage -// the trait `Sync` is not implemented for `(dyn RemoteFs + 'static)` -// https://github.com/remotefs-rs/remotefs-rs/pull/21 -impl<'a> TorrentStorage for RemoteFsStorage<'a> { - fn pread_exact(&self, file_id: usize, offset: u64, buf: &mut [u8]) -> anyhow::Result<()> { - let file_details = self.metadata.file_infos[file_id].clone(); - - let relative_path = &file_details.relative_filename; - - // let mut full_path = self.output_folder.clone(); - // full_path.push(relative_path); - - if file_details.attrs.padding { - log::info!("Skipping read for padding file: {:?}", relative_path); - return Ok(()); - }; - log::info!("RemoteFsStorage pread_exact: file_id={}, offset={}, len={}, path={:?}", file_id, offset, buf.len(), relative_path); - - // TODO prepend "/" before relative_path? - // NOTE only some remotefs backends support "open" - let mut file = self.fs.open(relative_path) - .map_err(|e| anyhow::anyhow!("remotefs open error: {e}"))?; - file.seek(SeekFrom::Start(offset)) - .map_err(|e| anyhow::anyhow!("remotefs seek error: {e}"))?; - let mut read_total = 0; - while read_total < buf.len() { - let n = file.read(&mut buf[read_total..]) - .map_err(|e| anyhow::anyhow!("remotefs read error: {e}"))?; - if n == 0 { - return Err(anyhow::anyhow!( - "remotefs: unexpected EOF (read {} of {} bytes)", - read_total, - buf.len() - )); - } - read_total += n; - } - Ok(()) - } - - fn pwrite_all(&self, _file_id: usize, _offset: u64, _buf: &[u8]) -> anyhow::Result<()> { - anyhow::bail!("RemoteFsStorage is read-only: pwrite_all is not implemented"); - } - - fn remove_file(&self, _file_id: usize, _filename: &std::path::Path) -> anyhow::Result<()> { - anyhow::bail!("RemoteFsStorage is read-only: remove_file is not implemented"); - } - - fn remove_directory_if_empty(&self, _path: &std::path::Path) -> anyhow::Result<()> { - anyhow::bail!("RemoteFsStorage is read-only: remove_directory_if_empty is not implemented"); - } - - fn ensure_file_length(&self, _file_id: usize, _length: u64) -> anyhow::Result<()> { - anyhow::bail!("RemoteFsStorage is read-only: ensure_file_length is not implemented"); - } - - fn take(&self) -> anyhow::Result> { - anyhow::bail!("RemoteFsStorage take is not implemented") - } - - fn init( - &mut self, - _shared: &ManagedTorrentShared, - _metadata: &TorrentMetadata, - ) -> anyhow::Result<()> { - // No-op for RemoteFsStorage, as it is read-only and uses references. - // let mut files = Vec::::new(); - // for file_details in metadata.file_infos.iter() { - // let mut full_path = self.output_folder.clone(); - // let relative_path = &file_details.relative_filename; - // full_path.push(relative_path); - - // if file_details.attrs.padding { - // files.push(OpenedFile::new_dummy()); - // continue; - // }; - // std::fs::create_dir_all(full_path.parent().context("bug: no parent")?)?; - // if shared.options.allow_overwrite { - // let (file, writeable) = match - // OpenOptions::new() - // .create(true) - // .truncate(false) - // .read(true) - // .write(true) - // .open(&full_path) - // { - // Ok(file) => (file, true), - // Err(e) => { - // warn!(?full_path, "error opening file in create+write mode: {e:?}"); - // // open the file in read-only mode, will reopen in write mode later. - // ( - // OpenOptions::new() - // .create(false) - // .read(true) - // .open(&full_path) - // .with_context(|| format!("error opening {full_path:?}"))?, - // false, - // ) - // } - // }; - // files.push(OpenedFile::new(full_path.clone(), file, writeable)); - // } else { - // // create_new does not seem to work with read(true), so calling this twice. - // let file = OpenOptions::new() - // .create_new(true) - // .write(true) - // .open(&full_path) - // .with_context(|| { - // format!( - // "error creating a new file (because allow_overwrite = false) {:?}", - // &full_path - // ) - // })?; - // OpenOptions::new().read(true).write(true).open(&full_path)?; - // let writeable = true; - // files.push(OpenedFile::new(full_path.clone(), file, writeable)); - // }; - // } - - // self.opened_files = files; - Ok(()) - } -} From 2488cd30a0e895cc282edae5af70e6a774503f84 Mon Sep 17 00:00:00 2001 From: Milan Hauth Date: Sun, 5 Oct 2025 11:58:45 +0000 Subject: [PATCH 5/7] async TorrentStorage --- crates/librqbit/src/storage/mod.rs | 76 +++++++++++++++++------------- 1 file changed, 43 insertions(+), 33 deletions(-) diff --git a/crates/librqbit/src/storage/mod.rs b/crates/librqbit/src/storage/mod.rs index 9bb9f82af..a0c1623ed 100644 --- a/crates/librqbit/src/storage/mod.rs +++ b/crates/librqbit/src/storage/mod.rs @@ -12,6 +12,7 @@ use std::{ path::Path, }; +use async_trait::async_trait; use librqbit_core::lengths::ValidPieceIndex; use crate::torrent_state::{ManagedTorrentShared, TorrentMetadata}; @@ -24,22 +25,29 @@ pub trait StorageFactory: Send + Sync + Any { shared: &ManagedTorrentShared, metadata: &TorrentMetadata, ) -> anyhow::Result; - fn create_and_init( + + fn is_type_id(&self, type_id: TypeId) -> bool { + Self::type_id(self) == type_id + } + fn clone_box(&self) -> BoxStorageFactory; +} + +/// Extension trait for async methods on StorageFactory. +#[async_trait] +pub trait StorageFactoryAsyncExt: StorageFactory { + async fn create_and_init( &self, shared: &ManagedTorrentShared, metadata: &TorrentMetadata, ) -> anyhow::Result { let mut storage = self.create(shared, metadata)?; - storage.init(shared, metadata)?; + storage.init(shared, metadata).await?; Ok(storage) } - - fn is_type_id(&self, type_id: TypeId) -> bool { - Self::type_id(self) == type_id - } - fn clone_box(&self) -> BoxStorageFactory; } +impl StorageFactoryAsyncExt for T {} + pub type BoxStorageFactory = Box>>; pub trait StorageFactoryExt { @@ -93,9 +101,10 @@ impl StorageFactory for Box { } } +#[async_trait] pub trait TorrentStorage: Send + Sync { // Create/open files etc. - fn init( + async fn init( &mut self, shared: &ManagedTorrentShared, metadata: &TorrentMetadata, @@ -103,13 +112,13 @@ pub trait TorrentStorage: Send + Sync { /// Given a file_id (which you can get more info from in init_storage() through torrent info) /// read buf.len() bytes into buf at offset. - fn pread_exact(&self, file_id: usize, offset: u64, buf: &mut [u8]) -> anyhow::Result<()>; + async fn pread_exact(&self, file_id: usize, offset: u64, buf: &mut [u8]) -> anyhow::Result<()>; /// Given a file_id (which you can get more info from in init_storage() through torrent info) /// write buf.len() bytes into the file at offset. - fn pwrite_all(&self, file_id: usize, offset: u64, buf: &[u8]) -> anyhow::Result<()>; + async fn pwrite_all(&self, file_id: usize, offset: u64, buf: &[u8]) -> anyhow::Result<()>; - fn pwrite_all_vectored( + async fn pwrite_all_vectored( &self, file_id: usize, offset: u64, @@ -119,7 +128,7 @@ pub trait TorrentStorage: Send + Sync { let mut size = 0; for ioslice in bufs { - self.pwrite_all(file_id, offset, &ioslice)?; + self.pwrite_all(file_id, offset, &ioslice).await?; offset += ioslice.len() as u64; size += ioslice.len(); } @@ -128,58 +137,59 @@ pub trait TorrentStorage: Send + Sync { } /// Remove a file from the storage. If not supported, or it doesn't matter, just return Ok(()) - fn remove_file(&self, file_id: usize, filename: &Path) -> anyhow::Result<()>; + async fn remove_file(&self, file_id: usize, filename: &Path) -> anyhow::Result<()>; - fn remove_directory_if_empty(&self, path: &Path) -> anyhow::Result<()>; + async fn remove_directory_if_empty(&self, path: &Path) -> anyhow::Result<()>; /// E.g. for filesystem backend ensure that the file has a certain length, and grow/shrink as needed. - fn ensure_file_length(&self, file_id: usize, length: u64) -> anyhow::Result<()>; + async fn ensure_file_length(&self, file_id: usize, length: u64) -> anyhow::Result<()>; /// Replace the current storage with a dummy, and return a new one that should be used instead. /// This is used to make the underlying object useless when e.g. pausing the torrent. - fn take(&self) -> anyhow::Result>; + async fn take(&self) -> anyhow::Result>; /// Callback called every time a piece has completed and has been validated. /// Default implementation does nothing, but can be override in trait implementations. - fn on_piece_completed(&self, _piece_index: ValidPieceIndex) -> anyhow::Result<()> { + async fn on_piece_completed(&self, _piece_index: ValidPieceIndex) -> anyhow::Result<()> { Ok(()) } } +#[async_trait] impl TorrentStorage for Box { - fn pread_exact(&self, file_id: usize, offset: u64, buf: &mut [u8]) -> anyhow::Result<()> { - (**self).pread_exact(file_id, offset, buf) + async fn pread_exact(&self, file_id: usize, offset: u64, buf: &mut [u8]) -> anyhow::Result<()> { + (**self).pread_exact(file_id, offset, buf).await } - fn pwrite_all(&self, file_id: usize, offset: u64, buf: &[u8]) -> anyhow::Result<()> { - (**self).pwrite_all(file_id, offset, buf) + async fn pwrite_all(&self, file_id: usize, offset: u64, buf: &[u8]) -> anyhow::Result<()> { + (**self).pwrite_all(file_id, offset, buf).await } - fn remove_file(&self, file_id: usize, filename: &Path) -> anyhow::Result<()> { - (**self).remove_file(file_id, filename) + async fn remove_file(&self, file_id: usize, filename: &Path) -> anyhow::Result<()> { + (**self).remove_file(file_id, filename).await } - fn ensure_file_length(&self, file_id: usize, length: u64) -> anyhow::Result<()> { - (**self).ensure_file_length(file_id, length) + async fn ensure_file_length(&self, file_id: usize, length: u64) -> anyhow::Result<()> { + (**self).ensure_file_length(file_id, length).await } - fn take(&self) -> anyhow::Result> { - (**self).take() + async fn take(&self) -> anyhow::Result> { + (**self).take().await } - fn remove_directory_if_empty(&self, path: &Path) -> anyhow::Result<()> { - (**self).remove_directory_if_empty(path) + async fn remove_directory_if_empty(&self, path: &Path) -> anyhow::Result<()> { + (**self).remove_directory_if_empty(path).await } - fn init( + async fn init( &mut self, shared: &ManagedTorrentShared, metadata: &TorrentMetadata, ) -> anyhow::Result<()> { - (**self).init(shared, metadata) + (**self).init(shared, metadata).await } - fn on_piece_completed(&self, piece_id: ValidPieceIndex) -> anyhow::Result<()> { - (**self).on_piece_completed(piece_id) + async fn on_piece_completed(&self, piece_id: ValidPieceIndex) -> anyhow::Result<()> { + (**self).on_piece_completed(piece_id).await } } From 3766cf423f26b2df935e41294259019a35f4412b Mon Sep 17 00:00:00 2001 From: Milan Hauth Date: Sun, 5 Oct 2025 12:57:06 +0000 Subject: [PATCH 6/7] DRAFT use async_ftp --- Cargo.lock | 186 +++++++------- crates/librqbit/Cargo.toml | 12 +- crates/librqbit/src/storage/middleware/ftp.rs | 243 ++++++++++++++++++ crates/librqbit/src/storage/middleware/mod.rs | 2 +- 4 files changed, 335 insertions(+), 108 deletions(-) create mode 100644 crates/librqbit/src/storage/middleware/ftp.rs diff --git a/Cargo.lock b/Cargo.lock index 7c7018231..8038d215c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -224,6 +224,20 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "async_ftp" +version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d101b45e744a5893bba920ed28f0b0f3d7a9c929e97f9cf2117bba5accadb76b" +dependencies = [ + "chrono", + "lazy_static", + "pin-project", + "regex", + "tokio", + "tokio-rustls 0.23.4", +] + [[package]] name = "atk" version = "0.18.2" @@ -1730,19 +1744,6 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" -[[package]] -name = "futures-lite" -version = "2.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f78e10609fe0e0b3f4157ffab1876319b5b0db102a2c60dc4626306dc46b44ad" -dependencies = [ - "fastrand", - "futures-core", - "futures-io", - "parking", - "pin-project-lite", -] - [[package]] name = "futures-macro" version = "0.3.31" @@ -2394,10 +2395,10 @@ dependencies = [ "http", "hyper", "hyper-util", - "rustls", + "rustls 0.23.32", "rustls-pki-types", "tokio", - "tokio-rustls", + "tokio-rustls 0.26.4", "tower-service", "webpki-roots", ] @@ -2869,29 +2870,6 @@ dependencies = [ "selectors", ] -[[package]] -name = "lazy-regex" -version = "3.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "60c7310b93682b36b98fa7ea4de998d3463ccbebd94d935d6b48ba5b6ffa7126" -dependencies = [ - "lazy-regex-proc_macros", - "once_cell", - "regex", -] - -[[package]] -name = "lazy-regex-proc_macros" -version = "3.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ba01db5ef81e17eb10a5e0f2109d1b3a3e29bac3070fdbd7d156bf7dbd206a1" -dependencies = [ - "proc-macro2", - "quote", - "regex", - "syn 2.0.106", -] - [[package]] name = "lazy_static" version = "1.5.0" @@ -2980,6 +2958,7 @@ dependencies = [ "async-compression", "async-stream", "async-trait", + "async_ftp", "axum 0.8.4", "axum-extra", "backon", @@ -3019,8 +2998,6 @@ dependencies = [ "parking_lot", "rand 0.9.2", "regex", - "remotefs", - "remotefs-ftp", "reqwest", "rlimit", "serde", @@ -4198,12 +4175,6 @@ dependencies = [ "regex", ] -[[package]] -name = "path-slash" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e91099d4268b0e11973f036e885d652fb0b21fedcf69738c627f94db6a44f42" - [[package]] name = "pathdiff" version = "0.2.3" @@ -4634,7 +4605,7 @@ dependencies = [ "quinn-proto", "quinn-udp", "rustc-hash 2.1.1", - "rustls", + "rustls 0.23.32", "socket2 0.6.0", "thiserror 2.0.16", "tokio", @@ -4652,9 +4623,9 @@ dependencies = [ "getrandom 0.3.3", "lru-slab", "rand 0.9.2", - "ring", + "ring 0.17.14", "rustc-hash 2.1.1", - "rustls", + "rustls 0.23.32", "rustls-pki-types", "slab", "thiserror 2.0.16", @@ -4921,29 +4892,6 @@ version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "caf4aa5b0f434c91fe5c7f1ecb6a5ece2130b02ad2a590589dda5146df959001" -[[package]] -name = "remotefs" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b229ef0bf00ce8ae4cbe349bd8d71f3473e26de30efa637cfc53e08eaf6867f0" -dependencies = [ - "log", - "thiserror 1.0.69", - "wildmatch", -] - -[[package]] -name = "remotefs-ftp" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1c84ed1367170d6f589f09998b648e7d9bc09d93091130293be8ff09b7a8329" -dependencies = [ - "log", - "path-slash", - "remotefs", - "suppaftp", -] - [[package]] name = "reqwest" version = "0.12.23" @@ -4967,7 +4915,7 @@ dependencies = [ "percent-encoding", "pin-project-lite", "quinn", - "rustls", + "rustls 0.23.32", "rustls-pki-types", "serde", "serde_json", @@ -4975,7 +4923,7 @@ dependencies = [ "sync_wrapper", "tokio", "tokio-native-tls", - "tokio-rustls", + "tokio-rustls 0.26.4", "tokio-util", "tower 0.5.2", "tower-http", @@ -4988,6 +4936,21 @@ dependencies = [ "webpki-roots", ] +[[package]] +name = "ring" +version = "0.16.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc" +dependencies = [ + "cc", + "libc", + "once_cell", + "spin", + "untrusted 0.7.1", + "web-sys", + "winapi", +] + [[package]] name = "ring" version = "0.17.14" @@ -5117,6 +5080,18 @@ dependencies = [ "windows-sys 0.61.1", ] +[[package]] +name = "rustls" +version = "0.20.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b80e3dec595989ea8510028f30c408a4630db12c9cbb8de34203b89d6577e99" +dependencies = [ + "log", + "ring 0.16.20", + "sct", + "webpki", +] + [[package]] name = "rustls" version = "0.23.32" @@ -5124,7 +5099,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cd3c25631629d034ce7cd9940adc9d45762d46de2b0f57193c4443b92c6d4d40" dependencies = [ "once_cell", - "ring", + "ring 0.17.14", "rustls-pki-types", "rustls-webpki", "subtle", @@ -5147,7 +5122,7 @@ version = "0.103.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8572f3c2cb9934231157b45499fc41e1f58c589fdfb81a844ba873265e80f8eb" dependencies = [ - "ring", + "ring 0.17.14", "rustls-pki-types", "untrusted 0.9.0", ] @@ -5245,6 +5220,16 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "sct" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" +dependencies = [ + "ring 0.17.14", + "untrusted 0.9.0", +] + [[package]] name = "security-framework" version = "2.11.1" @@ -5675,6 +5660,12 @@ dependencies = [ "system-deps", ] +[[package]] +name = "spin" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" + [[package]] name = "spinning_top" version = "0.3.0" @@ -5862,20 +5853,6 @@ version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" -[[package]] -name = "suppaftp" -version = "7.0.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "386a802d1622d43d774de76222e65cf367a72942ccf634b585c4dbba2354bec4" -dependencies = [ - "chrono", - "futures-lite", - "lazy-regex", - "log", - "native-tls", - "thiserror 2.0.16", -] - [[package]] name = "swift-rs" version = "1.0.7" @@ -6439,13 +6416,24 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-rustls" +version = "0.23.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59" +dependencies = [ + "rustls 0.20.9", + "tokio", + "webpki", +] + [[package]] name = "tokio-rustls" version = "0.26.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1729aa945f29d91ba541258c8df89027d5792d85a8841fb65e8bf0f4ede4ef61" dependencies = [ - "rustls", + "rustls 0.23.32", "tokio", ] @@ -7225,6 +7213,16 @@ dependencies = [ "system-deps", ] +[[package]] +name = "webpki" +version = "0.22.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed63aea5ce73d0ff405984102c42de94fc55a6b75765d621c65262469b3c9b53" +dependencies = [ + "ring 0.17.14", + "untrusted 0.9.0", +] + [[package]] name = "webpki-roots" version = "1.0.2" @@ -7280,12 +7278,6 @@ dependencies = [ "wasite", ] -[[package]] -name = "wildmatch" -version = "2.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39b7d07a236abaef6607536ccfaf19b396dbe3f5110ddb73d39f4562902ed382" - [[package]] name = "winapi" version = "0.3.9" diff --git a/crates/librqbit/Cargo.toml b/crates/librqbit/Cargo.toml index 628897a3d..bc2550a2f 100644 --- a/crates/librqbit/Cargo.toml +++ b/crates/librqbit/Cargo.toml @@ -129,16 +129,8 @@ librqbit-dualstack-sockets = { version = "0.6.11", features = ["axum"] } socket2 = "0.6" nix = { version = "0.30.1", features = ["uio"] } thiserror = "2.0.12" -# https://lib.rs/crates/remotefs -remotefs = "0.3.1" -remotefs-ftp = { version = "0.3.0", features = [ "native-tls" ] } -# remotefs-ssh = "0.4.0" -# NOTE the "open" method is not supported by these backends -# remotefs-smb = "0.3.1" -# remotefs-webdav = "0.2.0" -# NOTE these backends dont exist -# remotefs-nfs = "0.3.1" -# remotefs-http = "0.3.1" +# https://lib.rs/crates/async_ftp +async_ftp = { version = "6.0.0", features = ["secure"] } [target.'cfg(windows)'.dependencies] windows = { version = "0.62.1", features = [ diff --git a/crates/librqbit/src/storage/middleware/ftp.rs b/crates/librqbit/src/storage/middleware/ftp.rs new file mode 100644 index 000000000..65c472797 --- /dev/null +++ b/crates/librqbit/src/storage/middleware/ftp.rs @@ -0,0 +1,243 @@ +use std::io::Cursor; +use async_ftp::FtpStream; +use url::Url; +use async_ftp::FtpStream; +use tokio::io::AsyncReadExt; +use url::Url; + +use std::io::{Read, Seek, SeekFrom}; +use std::sync::Arc; + +use crate::storage::{StorageFactory, TorrentStorage}; +use crate::torrent_state::TorrentMetadata; +use crate::ManagedTorrentShared; + +pub const FTP_PROTOCOLS: &[&str] = &[ + // "sftp://", // ? + "ftp://", + "ftps://", +]; + +pub struct FtpStorageFactory { + url: String, +} + +impl FtpStorageFactory { + pub fn new(url: String) -> Self { + Self { url } + } +} + +// impl<'a> StorageFactory for FtpStorageFactory { +impl StorageFactory for FtpStorageFactory { + // type Storage = FtpStorage<'a>; + type Storage = Box; + + fn create( + &self, + // shared: &'a ManagedTorrentShared, + // metadata: &'a TorrentMetadata + shared: &ManagedTorrentShared, + metadata: &TorrentMetadata + // ) -> anyhow::Result> { + ) -> anyhow::Result { + // log::info!("Creating FtpStorage for URL: {}", self.url); + + // let fs = RemoteFs::from_url(&self.url) + // .map_err(|e| anyhow::anyhow!("remotefs error: {e}"))?; + + // https://github.com/veeso/termscp/blob/05830db206605f60ce54e23ca5df7de02115f491/src/filetransfer/remotefs_builder.rs#L109-L13 + let secure = true; + let parsed = Url::parse(&self.url)?; + let protocol = parsed.scheme().to_string(); + if (protocol != "ftp") && (protocol != "ftps") { + anyhow::bail!("FtpStorageFactory only supports ftp:// and ftps:// URLs, got: {protocol}"); + } + let hostname = parsed.host_str().ok_or_else(|| anyhow::anyhow!("Missing host"))?.to_string(); + let port = parsed.port().unwrap_or(21); + let username = if parsed.username().is_empty() { + None + } else { + Some(parsed.username().to_string()) + }; + let password = parsed.password().map(|s| s.to_string()); + let path = parsed.path().to_string(); + let mut fs = FtpFs::new(hostname, port).passive_mode(); + if let Some(_username) = username { + fs = fs.username(_username); + } + if let Some(_password) = password { + fs = fs.password(_password); + } + if secure { + fs = fs.secure(true, true); + } + + +impl FtpStorage { + pub fn new(host: String, port: u16, username: String, password: String) -> Self { + Self { host, port, username, password } + } + +} + + + Ok(Box::new(FtpStorage { + fs: Arc::new(fs), + shared, + metadata, + })) + } + + fn clone_box(&self) -> Box + 'static> { + Box::new(Self { + url: self.url.clone(), + }) + } +} + +pub struct FtpStorage<'a> { + // fs: Arc, + shared: &'a ManagedTorrentShared, + metadata: &'a TorrentMetadata, + host: String, + port: u16, + username: String, + password: String, +} + +// WONTFIX remotefs is not thread-safe, so we cannot implement Sync or Send for FtpStorage +// the trait `Sync` is not implemented for `(dyn RemoteFs + 'static)` +// https://github.com/remotefs-rs/remotefs-rs/pull/21 +impl<'a> TorrentStorage for FtpStorage<'a> { + async fn pread_exact(&self, file_id: usize, offset: u64, buf: &mut [u8]) -> anyhow::Result<()> { + let file_details = self.metadata.file_infos[file_id].clone(); + + let relative_path = &file_details.relative_filename; + + // let mut full_path = self.output_folder.clone(); + // full_path.push(relative_path); + + if file_details.attrs.padding { + log::info!("Skipping read for padding file: {:?}", relative_path); + return Ok(()); + }; + log::info!("FtpStorage pread_exact: file_id={}, offset={}, len={}, path={:?}", file_id, offset, buf.len(), relative_path); + + /* + // TODO prepend "/" before relative_path? + // NOTE only some remotefs backends support "open" + let mut file = self.fs.open(relative_path) + .map_err(|e| anyhow::anyhow!("remotefs open error: {e}"))?; + file.seek(SeekFrom::Start(offset)) + .map_err(|e| anyhow::anyhow!("remotefs seek error: {e}"))?; + let mut read_total = 0; + while read_total < buf.len() { + let n = file.read(&mut buf[read_total..]) + .map_err(|e| anyhow::anyhow!("remotefs read error: {e}"))?; + if n == 0 { + return Err(anyhow::anyhow!( + "remotefs: unexpected EOF (read {} of {} bytes)", + read_total, + buf.len() + )); + } + read_total += n; + } + */ + let addr: String = format!("{}:{}", self.host, self.port); + let mut ftp = FtpStream::connect(addr).await?; + ftp.login(&self.username, &self.password).await?; + let mut reader = ftp.retr(relative_path).await?; + reader.read_exact(&mut vec![0u8; offset as usize]).await?; // skip to offset + reader.read_exact(buf).await?; + ftp.quit().await?; + Ok(()) + } + + fn pwrite_all(&self, _file_id: usize, _offset: u64, _buf: &[u8]) -> anyhow::Result<()> { + anyhow::bail!("FtpStorage is read-only: pwrite_all is not implemented"); + } + + fn remove_file(&self, _file_id: usize, _filename: &std::path::Path) -> anyhow::Result<()> { + anyhow::bail!("FtpStorage is read-only: remove_file is not implemented"); + } + + fn remove_directory_if_empty(&self, _path: &std::path::Path) -> anyhow::Result<()> { + anyhow::bail!("FtpStorage is read-only: remove_directory_if_empty is not implemented"); + } + + fn ensure_file_length(&self, _file_id: usize, _length: u64) -> anyhow::Result<()> { + anyhow::bail!("FtpStorage is read-only: ensure_file_length is not implemented"); + } + + fn take(&self) -> anyhow::Result> { + anyhow::bail!("FtpStorage take is not implemented") + } + + fn init( + &mut self, + _shared: &ManagedTorrentShared, + _metadata: &TorrentMetadata, + ) -> anyhow::Result<()> { + // No-op for FtpStorage, as it is read-only and uses references. + // let mut files = Vec::::new(); + // for file_details in metadata.file_infos.iter() { + // let mut full_path = self.output_folder.clone(); + // let relative_path = &file_details.relative_filename; + // full_path.push(relative_path); + + // if file_details.attrs.padding { + // files.push(OpenedFile::new_dummy()); + // continue; + // }; + // std::fs::create_dir_all(full_path.parent().context("bug: no parent")?)?; + // if shared.options.allow_overwrite { + // let (file, writeable) = match + // OpenOptions::new() + // .create(true) + // .truncate(false) + // .read(true) + // .write(true) + // .open(&full_path) + // { + // Ok(file) => (file, true), + // Err(e) => { + // warn!(?full_path, "error opening file in create+write mode: {e:?}"); + // // open the file in read-only mode, will reopen in write mode later. + // ( + // OpenOptions::new() + // .create(false) + // .read(true) + // .open(&full_path) + // .with_context(|| format!("error opening {full_path:?}"))?, + // false, + // ) + // } + // }; + // files.push(OpenedFile::new(full_path.clone(), file, writeable)); + // } else { + // // create_new does not seem to work with read(true), so calling this twice. + // let file = OpenOptions::new() + // .create_new(true) + // .write(true) + // .open(&full_path) + // .with_context(|| { + // format!( + // "error creating a new file (because allow_overwrite = false) {:?}", + // &full_path + // ) + // })?; + // OpenOptions::new().read(true).write(true).open(&full_path)?; + // let writeable = true; + // files.push(OpenedFile::new(full_path.clone(), file, writeable)); + // }; + // } + + // self.opened_files = files; + Ok(()) + } +} + + + diff --git a/crates/librqbit/src/storage/middleware/mod.rs b/crates/librqbit/src/storage/middleware/mod.rs index 898e493d8..c980243f3 100644 --- a/crates/librqbit/src/storage/middleware/mod.rs +++ b/crates/librqbit/src/storage/middleware/mod.rs @@ -1,4 +1,4 @@ pub mod slow; pub mod timing; pub mod write_through_cache; -pub mod remotefs; +pub mod ftp; From 43d429c4e990de65374d61c2e76fd4d41ebbe0e8 Mon Sep 17 00:00:00 2001 From: Milan Hauth Date: Sun, 5 Oct 2025 12:57:58 +0000 Subject: [PATCH 7/7] DRAFT convert to async --- crates/librqbit/src/file_ops.rs | 18 +++-- crates/librqbit/src/session.rs | 79 ++++++++++++-------- crates/librqbit/src/storage/filesystem/fs.rs | 16 ++-- crates/librqbit/src/storage/mod.rs | 5 +- 4 files changed, 65 insertions(+), 53 deletions(-) diff --git a/crates/librqbit/src/file_ops.rs b/crates/librqbit/src/file_ops.rs index 93ed758d7..7572269f6 100644 --- a/crates/librqbit/src/file_ops.rs +++ b/crates/librqbit/src/file_ops.rs @@ -19,7 +19,7 @@ use crate::{ type_aliases::{BF, FileInfos, PeerHandle}, }; -pub fn update_hash_from_file( +pub async fn update_hash_from_file( file_id: usize, file_info: &FileInfo, mut pos: u64, @@ -35,7 +35,7 @@ pub fn update_hash_from_file( buf[..chunk].fill(0); } else { files - .pread_exact(file_id, pos, &mut buf[..chunk]) + .pread_exact(file_id, pos, &mut buf[..chunk]).await .with_context(|| { format!("failed reading chunk of size {chunk}, read so far {read}") })?; @@ -70,7 +70,7 @@ impl<'a> FileOps<'a> { } // Returns the bitvector with pieces we have. - pub fn initial_check(&self, progress: &AtomicU64) -> anyhow::Result { + pub async fn initial_check(&self, progress: &AtomicU64) -> anyhow::Result { let mut have_pieces = BF::from_boxed_slice(vec![0u8; self.torrent.lengths().piece_bitfield_bytes()].into()); let mut piece_files = Vec::::new(); @@ -144,7 +144,7 @@ impl<'a> FileOps<'a> { &mut computed_hash, &mut read_buffer, to_read_in_file, - ) { + ).await { debug!( "error reading from file {} ({:?}) at {}: {:#}", current_file.index, current_file.fi.relative_filename, pos, &err @@ -175,7 +175,7 @@ impl<'a> FileOps<'a> { Ok(have_pieces) } - pub fn check_piece(&self, piece_index: ValidPieceIndex) -> anyhow::Result { + pub async fn check_piece(&self, piece_index: ValidPieceIndex) -> anyhow::Result { if cfg!(feature = "_disable_disk_write_net_benchmark") { return Ok(true); } @@ -209,7 +209,7 @@ impl<'a> FileOps<'a> { &mut h, &mut buf, to_read_in_file, - ) + ).await .with_context(|| { format!( "error reading {to_read_in_file} bytes, file_id: {file_idx} (\"{:?}\")", @@ -252,7 +252,7 @@ impl<'a> FileOps<'a> { } } - pub fn read_chunk( + pub async fn read_chunk( &self, who_sent: PeerHandle, chunk_info: &ChunkInfo, @@ -282,6 +282,7 @@ impl<'a> FileOps<'a> { } else { self.files .pread_exact(file_idx, absolute_offset, &mut buf[..to_read_in_file]) + .await .with_context(|| { format!("error reading {file_idx} bytes, file_id: {to_read_in_file}") })?; @@ -299,7 +300,7 @@ impl<'a> FileOps<'a> { Ok(()) } - pub fn write_chunk( + pub async fn write_chunk( &self, who_sent: PeerHandle, data: &Piece>, @@ -334,6 +335,7 @@ impl<'a> FileOps<'a> { let written = self .files .pwrite_all_vectored(file_idx, absolute_offset, slices) + .await .with_context(|| { format!( "error writing to file {file_idx} (\"{:?}\")", diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 2e5bf6608..da893f3b9 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -28,7 +28,7 @@ use crate::{ session_stats::SessionStats, spawn_utils::BlockingSpawner, storage::{ - BoxStorageFactory, StorageFactoryExt, TorrentStorage, filesystem::FilesystemStorageFactory, + BoxStorageFactory, TorrentStorage, filesystem::FilesystemStorageFactory, }, stream_connect::{ ConnectionKind, ConnectionOptions, SocksProxyConfig, StreamConnector, StreamConnectorArgs, @@ -39,6 +39,8 @@ use crate::{ }, type_aliases::{BoxAsyncReadVectored, BoxAsyncWrite, DiskWorkQueueSender, PeerStream}, }; + +use crate::storage::StorageFactoryExt; use anyhow::{Context, bail}; use arc_swap::ArcSwapOption; use bencode::bencode_serialize_to_writer; @@ -75,7 +77,7 @@ use tracker_comms::{TrackerComms, UdpTrackerClient}; pub const SUPPORTED_SCHEMES: [&str; 3] = ["http:", "https:", "magnet:"]; #[cfg(feature = "storage_middleware")] -use crate::storage::middleware::remotefs::{RemoteFsStorageFactory, REMOTEFS_PROTOCOLS}; +use crate::storage::middleware::ftp::{FtpStorageFactory, FTP_PROTOCOLS}; pub type TorrentId = usize; @@ -1220,22 +1222,22 @@ impl Session { .or_else(|| self.default_storage_factory.as_ref().map(|f| f.clone_box())) .unwrap_or_else(|| FilesystemStorageFactory::default().boxed()); #[cfg(feature = "storage_middleware")] - let storage_factory = if let Some(folder_str) = output_folder.to_str() { - if REMOTEFS_PROTOCOLS.iter().any(|p| folder_str.starts_with(p)) { - Box::new(RemoteFsStorageFactory::new(folder_str.to_string())) - as BoxStorageFactory - } else { - opts.storage_factory - .take() - .or_else(|| self.default_storage_factory.as_ref().map(|f| f.clone_box())) - .unwrap_or_else(|| FilesystemStorageFactory::default().boxed()) - } - } else { + let mut get_default_factory = || { opts.storage_factory .take() .or_else(|| self.default_storage_factory.as_ref().map(|f| f.clone_box())) .unwrap_or_else(|| FilesystemStorageFactory::default().boxed()) }; + #[cfg(feature = "storage_middleware")] + let storage_factory = if let Some(folder_str) = output_folder.to_str() { + if FTP_PROTOCOLS.iter().any(|p| folder_str.starts_with(p)) { + Box::new(FtpStorageFactory::new(folder_str.to_string())) as BoxStorageFactory + } else { + get_default_factory() + } + } else { + get_default_factory() + }; let id = if let Some(id) = opts.preferred_id { id @@ -1290,7 +1292,7 @@ impl Session { minfo.clone(), metadata.clone(), only_files.clone(), - minfo.storage_factory.create_and_init(&minfo, &metadata)?, + minfo.storage_factory.create_and_init(&minfo, &metadata).await?, false, )); let handle = Arc::new(ManagedTorrent { @@ -1371,28 +1373,36 @@ impl Session { let metadata = removed.metadata.load_full().expect("TODO"); - let storage = removed - .with_state_mut(|s| match s.take() { - ManagedTorrentState::Initializing(p) => p.files.take().ok(), - ManagedTorrentState::Paused(p) => Some(p.files), - ManagedTorrentState::Live(l) => l - .pause() - // inspect_err not available in 1.75 - .map_err(|e| { - warn!(?id, "error pausing torrent: {e:#}"); - e - }) - .ok() - .map(|p| p.files), - _ => None, - }) - .map(Ok) - .unwrap_or_else(|| { + let storage_result = removed.with_state_mut(|s| { + let id = id; + async move { + match s.take() { + ManagedTorrentState::Initializing(p) => p.files.take().await.ok(), + ManagedTorrentState::Paused(p) => Some(p.files), + ManagedTorrentState::Live(l) => l + .pause() + // inspect_err not available in 1.75 + .map_err(|e| { + warn!(?id, "error pausing torrent: {e:#}"); + e + }) + .ok() + .map(|p| p.files), + _ => None, + } + } + }).await; + + let storage: Result, _> = match storage_result { + Ok(opt) => opt, + Err(_) => Ok( removed .shared .storage_factory .create(removed.shared(), &metadata) - }); + .map(Some)? + ), + }; if let Some(p) = self.persistence.as_ref() { if let Err(e) = p.delete(id).await { @@ -1407,7 +1417,7 @@ impl Session { match (storage, delete_files) { (Err(e), true) => return Err(e).context("torrent deleted, but could not delete files"), - (Ok(storage), true) => { + (Ok(Some(storage)), true) => { debug!("will delete files"); remove_files_and_dirs(&metadata.file_infos, &storage); if removed.shared().options.output_folder != self.output_folder @@ -1420,6 +1430,9 @@ impl Session { ) } } + (Ok(None), true) => { + warn!("no storage found to delete files"); + } (_, false) => { debug!("not deleting files") } diff --git a/crates/librqbit/src/storage/filesystem/fs.rs b/crates/librqbit/src/storage/filesystem/fs.rs index db841e56a..fecf863d7 100644 --- a/crates/librqbit/src/storage/filesystem/fs.rs +++ b/crates/librqbit/src/storage/filesystem/fs.rs @@ -57,7 +57,7 @@ impl FilesystemStorage { } impl TorrentStorage for FilesystemStorage { - fn pread_exact(&self, file_id: usize, offset: u64, buf: &mut [u8]) -> anyhow::Result<()> { + async fn pread_exact(&self, file_id: usize, offset: u64, buf: &mut [u8]) -> anyhow::Result<()> { self.opened_files .get(file_id) .context("no such file")? @@ -65,7 +65,7 @@ impl TorrentStorage for FilesystemStorage { .pread_exact(offset, buf) } - fn pwrite_all(&self, file_id: usize, offset: u64, buf: &[u8]) -> anyhow::Result<()> { + async fn pwrite_all(&self, file_id: usize, offset: u64, buf: &[u8]) -> anyhow::Result<()> { let of = self.opened_files.get(file_id).context("no such file")?; of.ensure_writeable()?; #[cfg(windows)] @@ -74,7 +74,7 @@ impl TorrentStorage for FilesystemStorage { return of.lock_read()?.pwrite_all(offset, buf); } - fn pwrite_all_vectored( + async fn pwrite_all_vectored( &self, file_id: usize, offset: u64, @@ -87,18 +87,18 @@ impl TorrentStorage for FilesystemStorage { return of.lock_read()?.pwrite_all_vectored(offset, bufs); } - fn remove_file(&self, _file_id: usize, filename: &Path) -> anyhow::Result<()> { + async fn remove_file(&self, _file_id: usize, filename: &Path) -> anyhow::Result<()> { Ok(std::fs::remove_file(self.output_folder.join(filename))?) } - fn ensure_file_length(&self, file_id: usize, len: u64) -> anyhow::Result<()> { + async fn ensure_file_length(&self, file_id: usize, len: u64) -> anyhow::Result<()> { let f = &self.opened_files.get(file_id).context("no such file")?; #[cfg(windows)] f.try_mark_sparse()?; Ok(f.lock_read()?.set_len(len)?) } - fn take(&self) -> anyhow::Result> { + async fn take(&self) -> anyhow::Result> { Ok(Box::new(Self { opened_files: self .opened_files @@ -109,7 +109,7 @@ impl TorrentStorage for FilesystemStorage { })) } - fn remove_directory_if_empty(&self, path: &Path) -> anyhow::Result<()> { + async fn remove_directory_if_empty(&self, path: &Path) -> anyhow::Result<()> { let path = self.output_folder.join(path); if !path.is_dir() { anyhow::bail!("cannot remove dir: {path:?} is not a directory") @@ -122,7 +122,7 @@ impl TorrentStorage for FilesystemStorage { } } - fn init( + async fn init( &mut self, shared: &ManagedTorrentShared, metadata: &TorrentMetadata, diff --git a/crates/librqbit/src/storage/mod.rs b/crates/librqbit/src/storage/mod.rs index a0c1623ed..304ac4b10 100644 --- a/crates/librqbit/src/storage/mod.rs +++ b/crates/librqbit/src/storage/mod.rs @@ -17,6 +17,7 @@ use librqbit_core::lengths::ValidPieceIndex; use crate::torrent_state::{ManagedTorrentShared, TorrentMetadata}; +#[async_trait] pub trait StorageFactory: Send + Sync + Any { type Storage: TorrentStorage; @@ -30,11 +31,7 @@ pub trait StorageFactory: Send + Sync + Any { Self::type_id(self) == type_id } fn clone_box(&self) -> BoxStorageFactory; -} -/// Extension trait for async methods on StorageFactory. -#[async_trait] -pub trait StorageFactoryAsyncExt: StorageFactory { async fn create_and_init( &self, shared: &ManagedTorrentShared,