diff --git a/Cargo.lock b/Cargo.lock index 05e3b7a45..8038d215c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -224,6 +224,20 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "async_ftp" +version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d101b45e744a5893bba920ed28f0b0f3d7a9c929e97f9cf2117bba5accadb76b" +dependencies = [ + "chrono", + "lazy_static", + "pin-project", + "regex", + "tokio", + "tokio-rustls 0.23.4", +] + [[package]] name = "atk" version = "0.18.2" @@ -2381,10 +2395,10 @@ dependencies = [ "http", "hyper", "hyper-util", - "rustls", + "rustls 0.23.32", "rustls-pki-types", "tokio", - "tokio-rustls", + "tokio-rustls 0.26.4", "tower-service", "webpki-roots", ] @@ -2944,6 +2958,7 @@ dependencies = [ "async-compression", "async-stream", "async-trait", + "async_ftp", "axum 0.8.4", "axum-extra", "backon", @@ -4590,7 +4605,7 @@ dependencies = [ "quinn-proto", "quinn-udp", "rustc-hash 2.1.1", - "rustls", + "rustls 0.23.32", "socket2 0.6.0", "thiserror 2.0.16", "tokio", @@ -4608,9 +4623,9 @@ dependencies = [ "getrandom 0.3.3", "lru-slab", "rand 0.9.2", - "ring", + "ring 0.17.14", "rustc-hash 2.1.1", - "rustls", + "rustls 0.23.32", "rustls-pki-types", "slab", "thiserror 2.0.16", @@ -4900,7 +4915,7 @@ dependencies = [ "percent-encoding", "pin-project-lite", "quinn", - "rustls", + "rustls 0.23.32", "rustls-pki-types", "serde", "serde_json", @@ -4908,7 +4923,7 @@ dependencies = [ "sync_wrapper", "tokio", "tokio-native-tls", - "tokio-rustls", + "tokio-rustls 0.26.4", "tokio-util", "tower 0.5.2", "tower-http", @@ -4921,6 +4936,21 @@ dependencies = [ "webpki-roots", ] +[[package]] +name = "ring" +version = "0.16.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc" +dependencies = [ + "cc", + "libc", + "once_cell", + "spin", + "untrusted 0.7.1", + "web-sys", + "winapi", +] + [[package]] name = "ring" version = "0.17.14" @@ -5050,6 +5080,18 @@ dependencies = [ "windows-sys 0.61.1", ] +[[package]] +name = "rustls" +version = "0.20.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b80e3dec595989ea8510028f30c408a4630db12c9cbb8de34203b89d6577e99" +dependencies = [ + "log", + "ring 0.16.20", + "sct", + "webpki", +] + [[package]] name = "rustls" version = "0.23.32" @@ -5057,7 +5099,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cd3c25631629d034ce7cd9940adc9d45762d46de2b0f57193c4443b92c6d4d40" dependencies = [ "once_cell", - "ring", + "ring 0.17.14", "rustls-pki-types", "rustls-webpki", "subtle", @@ -5080,7 +5122,7 @@ version = "0.103.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8572f3c2cb9934231157b45499fc41e1f58c589fdfb81a844ba873265e80f8eb" dependencies = [ - "ring", + "ring 0.17.14", "rustls-pki-types", "untrusted 0.9.0", ] @@ -5178,6 +5220,16 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "sct" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" +dependencies = [ + "ring 0.17.14", + "untrusted 0.9.0", +] + [[package]] name = "security-framework" version = "2.11.1" @@ -5608,6 +5660,12 @@ dependencies = [ "system-deps", ] +[[package]] +name = "spin" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" + [[package]] name = "spinning_top" version = "0.3.0" @@ -6358,13 +6416,24 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-rustls" +version = "0.23.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59" +dependencies = [ + "rustls 0.20.9", + "tokio", + "webpki", +] + [[package]] name = "tokio-rustls" version = "0.26.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1729aa945f29d91ba541258c8df89027d5792d85a8841fb65e8bf0f4ede4ef61" dependencies = [ - "rustls", + "rustls 0.23.32", "tokio", ] @@ -7144,6 +7213,16 @@ dependencies = [ "system-deps", ] +[[package]] +name = "webpki" +version = "0.22.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed63aea5ce73d0ff405984102c42de94fc55a6b75765d621c65262469b3c9b53" +dependencies = [ + "ring 0.17.14", + "untrusted 0.9.0", +] + [[package]] name = "webpki-roots" version = "1.0.2" diff --git a/crates/librqbit/Cargo.toml b/crates/librqbit/Cargo.toml index 8e0487659..bc2550a2f 100644 --- a/crates/librqbit/Cargo.toml +++ b/crates/librqbit/Cargo.toml @@ -129,6 +129,8 @@ librqbit-dualstack-sockets = { version = "0.6.11", features = ["axum"] } socket2 = "0.6" nix = { version = "0.30.1", features = ["uio"] } thiserror = "2.0.12" +# https://lib.rs/crates/async_ftp +async_ftp = { version = "6.0.0", features = ["secure"] } [target.'cfg(windows)'.dependencies] windows = { version = "0.62.1", features = [ diff --git a/crates/librqbit/src/file_ops.rs b/crates/librqbit/src/file_ops.rs index 93ed758d7..7572269f6 100644 --- a/crates/librqbit/src/file_ops.rs +++ b/crates/librqbit/src/file_ops.rs @@ -19,7 +19,7 @@ use crate::{ type_aliases::{BF, FileInfos, PeerHandle}, }; -pub fn update_hash_from_file( +pub async fn update_hash_from_file( file_id: usize, file_info: &FileInfo, mut pos: u64, @@ -35,7 +35,7 @@ pub fn update_hash_from_file( buf[..chunk].fill(0); } else { files - .pread_exact(file_id, pos, &mut buf[..chunk]) + .pread_exact(file_id, pos, &mut buf[..chunk]).await .with_context(|| { format!("failed reading chunk of size {chunk}, read so far {read}") })?; @@ -70,7 +70,7 @@ impl<'a> FileOps<'a> { } // Returns the bitvector with pieces we have. - pub fn initial_check(&self, progress: &AtomicU64) -> anyhow::Result { + pub async fn initial_check(&self, progress: &AtomicU64) -> anyhow::Result { let mut have_pieces = BF::from_boxed_slice(vec![0u8; self.torrent.lengths().piece_bitfield_bytes()].into()); let mut piece_files = Vec::::new(); @@ -144,7 +144,7 @@ impl<'a> FileOps<'a> { &mut computed_hash, &mut read_buffer, to_read_in_file, - ) { + ).await { debug!( "error reading from file {} ({:?}) at {}: {:#}", current_file.index, current_file.fi.relative_filename, pos, &err @@ -175,7 +175,7 @@ impl<'a> FileOps<'a> { Ok(have_pieces) } - pub fn check_piece(&self, piece_index: ValidPieceIndex) -> anyhow::Result { + pub async fn check_piece(&self, piece_index: ValidPieceIndex) -> anyhow::Result { if cfg!(feature = "_disable_disk_write_net_benchmark") { return Ok(true); } @@ -209,7 +209,7 @@ impl<'a> FileOps<'a> { &mut h, &mut buf, to_read_in_file, - ) + ).await .with_context(|| { format!( "error reading {to_read_in_file} bytes, file_id: {file_idx} (\"{:?}\")", @@ -252,7 +252,7 @@ impl<'a> FileOps<'a> { } } - pub fn read_chunk( + pub async fn read_chunk( &self, who_sent: PeerHandle, chunk_info: &ChunkInfo, @@ -282,6 +282,7 @@ impl<'a> FileOps<'a> { } else { self.files .pread_exact(file_idx, absolute_offset, &mut buf[..to_read_in_file]) + .await .with_context(|| { format!("error reading {file_idx} bytes, file_id: {to_read_in_file}") })?; @@ -299,7 +300,7 @@ impl<'a> FileOps<'a> { Ok(()) } - pub fn write_chunk( + pub async fn write_chunk( &self, who_sent: PeerHandle, data: &Piece>, @@ -334,6 +335,7 @@ impl<'a> FileOps<'a> { let written = self .files .pwrite_all_vectored(file_idx, absolute_offset, slices) + .await .with_context(|| { format!( "error writing to file {file_idx} (\"{:?}\")", diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 3c85964cd..da893f3b9 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -28,7 +28,7 @@ use crate::{ session_stats::SessionStats, spawn_utils::BlockingSpawner, storage::{ - BoxStorageFactory, StorageFactoryExt, TorrentStorage, filesystem::FilesystemStorageFactory, + BoxStorageFactory, TorrentStorage, filesystem::FilesystemStorageFactory, }, stream_connect::{ ConnectionKind, ConnectionOptions, SocksProxyConfig, StreamConnector, StreamConnectorArgs, @@ -39,6 +39,8 @@ use crate::{ }, type_aliases::{BoxAsyncReadVectored, BoxAsyncWrite, DiskWorkQueueSender, PeerStream}, }; + +use crate::storage::StorageFactoryExt; use anyhow::{Context, bail}; use arc_swap::ArcSwapOption; use bencode::bencode_serialize_to_writer; @@ -74,6 +76,9 @@ use tracker_comms::{TrackerComms, UdpTrackerClient}; pub const SUPPORTED_SCHEMES: [&str; 3] = ["http:", "https:", "magnet:"]; +#[cfg(feature = "storage_middleware")] +use crate::storage::middleware::ftp::{FtpStorageFactory, FTP_PROTOCOLS}; + pub type TorrentId = usize; struct ParsedTorrentFile { @@ -1210,11 +1215,29 @@ impl Session { })); } + #[cfg(not(feature = "storage_middleware"))] let storage_factory = opts .storage_factory .take() .or_else(|| self.default_storage_factory.as_ref().map(|f| f.clone_box())) .unwrap_or_else(|| FilesystemStorageFactory::default().boxed()); + #[cfg(feature = "storage_middleware")] + let mut get_default_factory = || { + opts.storage_factory + .take() + .or_else(|| self.default_storage_factory.as_ref().map(|f| f.clone_box())) + .unwrap_or_else(|| FilesystemStorageFactory::default().boxed()) + }; + #[cfg(feature = "storage_middleware")] + let storage_factory = if let Some(folder_str) = output_folder.to_str() { + if FTP_PROTOCOLS.iter().any(|p| folder_str.starts_with(p)) { + Box::new(FtpStorageFactory::new(folder_str.to_string())) as BoxStorageFactory + } else { + get_default_factory() + } + } else { + get_default_factory() + }; let id = if let Some(id) = opts.preferred_id { id @@ -1269,7 +1292,7 @@ impl Session { minfo.clone(), metadata.clone(), only_files.clone(), - minfo.storage_factory.create_and_init(&minfo, &metadata)?, + minfo.storage_factory.create_and_init(&minfo, &metadata).await?, false, )); let handle = Arc::new(ManagedTorrent { @@ -1350,28 +1373,36 @@ impl Session { let metadata = removed.metadata.load_full().expect("TODO"); - let storage = removed - .with_state_mut(|s| match s.take() { - ManagedTorrentState::Initializing(p) => p.files.take().ok(), - ManagedTorrentState::Paused(p) => Some(p.files), - ManagedTorrentState::Live(l) => l - .pause() - // inspect_err not available in 1.75 - .map_err(|e| { - warn!(?id, "error pausing torrent: {e:#}"); - e - }) - .ok() - .map(|p| p.files), - _ => None, - }) - .map(Ok) - .unwrap_or_else(|| { + let storage_result = removed.with_state_mut(|s| { + let id = id; + async move { + match s.take() { + ManagedTorrentState::Initializing(p) => p.files.take().await.ok(), + ManagedTorrentState::Paused(p) => Some(p.files), + ManagedTorrentState::Live(l) => l + .pause() + // inspect_err not available in 1.75 + .map_err(|e| { + warn!(?id, "error pausing torrent: {e:#}"); + e + }) + .ok() + .map(|p| p.files), + _ => None, + } + } + }).await; + + let storage: Result, _> = match storage_result { + Ok(opt) => opt, + Err(_) => Ok( removed .shared .storage_factory .create(removed.shared(), &metadata) - }); + .map(Some)? + ), + }; if let Some(p) = self.persistence.as_ref() { if let Err(e) = p.delete(id).await { @@ -1386,7 +1417,7 @@ impl Session { match (storage, delete_files) { (Err(e), true) => return Err(e).context("torrent deleted, but could not delete files"), - (Ok(storage), true) => { + (Ok(Some(storage)), true) => { debug!("will delete files"); remove_files_and_dirs(&metadata.file_infos, &storage); if removed.shared().options.output_folder != self.output_folder @@ -1399,6 +1430,9 @@ impl Session { ) } } + (Ok(None), true) => { + warn!("no storage found to delete files"); + } (_, false) => { debug!("not deleting files") } diff --git a/crates/librqbit/src/storage/filesystem/fs.rs b/crates/librqbit/src/storage/filesystem/fs.rs index 5fbd4bd69..fecf863d7 100644 --- a/crates/librqbit/src/storage/filesystem/fs.rs +++ b/crates/librqbit/src/storage/filesystem/fs.rs @@ -57,7 +57,7 @@ impl FilesystemStorage { } impl TorrentStorage for FilesystemStorage { - fn pread_exact(&self, file_id: usize, offset: u64, buf: &mut [u8]) -> anyhow::Result<()> { + async fn pread_exact(&self, file_id: usize, offset: u64, buf: &mut [u8]) -> anyhow::Result<()> { self.opened_files .get(file_id) .context("no such file")? @@ -65,15 +65,16 @@ impl TorrentStorage for FilesystemStorage { .pread_exact(offset, buf) } - fn pwrite_all(&self, file_id: usize, offset: u64, buf: &[u8]) -> anyhow::Result<()> { + async fn pwrite_all(&self, file_id: usize, offset: u64, buf: &[u8]) -> anyhow::Result<()> { let of = self.opened_files.get(file_id).context("no such file")?; + of.ensure_writeable()?; #[cfg(windows)] return of.try_mark_sparse()?.pwrite_all(offset, buf); #[cfg(not(windows))] return of.lock_read()?.pwrite_all(offset, buf); } - fn pwrite_all_vectored( + async fn pwrite_all_vectored( &self, file_id: usize, offset: u64, @@ -86,18 +87,18 @@ impl TorrentStorage for FilesystemStorage { return of.lock_read()?.pwrite_all_vectored(offset, bufs); } - fn remove_file(&self, _file_id: usize, filename: &Path) -> anyhow::Result<()> { + async fn remove_file(&self, _file_id: usize, filename: &Path) -> anyhow::Result<()> { Ok(std::fs::remove_file(self.output_folder.join(filename))?) } - fn ensure_file_length(&self, file_id: usize, len: u64) -> anyhow::Result<()> { + async fn ensure_file_length(&self, file_id: usize, len: u64) -> anyhow::Result<()> { let f = &self.opened_files.get(file_id).context("no such file")?; #[cfg(windows)] f.try_mark_sparse()?; Ok(f.lock_read()?.set_len(len)?) } - fn take(&self) -> anyhow::Result> { + async fn take(&self) -> anyhow::Result> { Ok(Box::new(Self { opened_files: self .opened_files @@ -108,7 +109,7 @@ impl TorrentStorage for FilesystemStorage { })) } - fn remove_directory_if_empty(&self, path: &Path) -> anyhow::Result<()> { + async fn remove_directory_if_empty(&self, path: &Path) -> anyhow::Result<()> { let path = self.output_folder.join(path); if !path.is_dir() { anyhow::bail!("cannot remove dir: {path:?} is not a directory") @@ -121,7 +122,7 @@ impl TorrentStorage for FilesystemStorage { } } - fn init( + async fn init( &mut self, shared: &ManagedTorrentShared, metadata: &TorrentMetadata, @@ -137,17 +138,33 @@ impl TorrentStorage for FilesystemStorage { continue; }; std::fs::create_dir_all(full_path.parent().context("bug: no parent")?)?; - let f = if shared.options.allow_overwrite { + if shared.options.allow_overwrite { + let (file, writeable) = match OpenOptions::new() .create(true) .truncate(false) .read(true) .write(true) .open(&full_path) - .with_context(|| format!("error opening {full_path:?} in read/write mode"))? + { + Ok(file) => (file, true), + Err(e) => { + warn!(?full_path, "error opening file in create+write mode: {e:?}"); + // open the file in read-only mode, will reopen in write mode later. + ( + OpenOptions::new() + .create(false) + .read(true) + .open(&full_path) + .with_context(|| format!("error opening {full_path:?}"))?, + false, + ) + } + }; + files.push(OpenedFile::new(full_path.clone(), file, writeable)); } else { // create_new does not seem to work with read(true), so calling this twice. - OpenOptions::new() + let file = OpenOptions::new() .create_new(true) .write(true) .open(&full_path) @@ -157,9 +174,10 @@ impl TorrentStorage for FilesystemStorage { &full_path ) })?; - OpenOptions::new().read(true).write(true).open(&full_path)? + OpenOptions::new().read(true).write(true).open(&full_path)?; + let writeable = true; + files.push(OpenedFile::new(full_path.clone(), file, writeable)); }; - files.push(OpenedFile::new(full_path.clone(), f)); } self.opened_files = files; diff --git a/crates/librqbit/src/storage/filesystem/opened_file.rs b/crates/librqbit/src/storage/filesystem/opened_file.rs index 404b3ab67..a31df6f5e 100644 --- a/crates/librqbit/src/storage/filesystem/opened_file.rs +++ b/crates/librqbit/src/storage/filesystem/opened_file.rs @@ -3,6 +3,7 @@ use std::{ io::IoSlice, ops::{Deref, DerefMut}, path::PathBuf, + sync::atomic::{AtomicBool, Ordering}, }; use anyhow::Context; @@ -124,34 +125,65 @@ impl DerefMut for OpenedFileLocked { #[derive(Debug)] pub(crate) struct OpenedFile { + path: PathBuf, file: RwLock, + is_writeable: AtomicBool, } impl OpenedFile { - pub fn new(path: PathBuf, f: File) -> Self { + pub fn new(path: PathBuf, f: File, is_writeable: bool) -> Self { Self { + path: path.clone(), file: RwLock::new(OpenedFileLocked { path, fd: Some(f), #[cfg(windows)] tried_marking_sparse: false, }), + is_writeable: AtomicBool::new(is_writeable), } } pub fn new_dummy() -> Self { Self { + path: PathBuf::from(""), file: RwLock::new(Default::default()), + is_writeable: AtomicBool::new(false), } } pub fn take_clone(&self) -> anyhow::Result { let f = std::mem::take(&mut *self.file.write()); Ok(Self { + path: self.path.clone(), file: RwLock::new(f), + is_writeable: AtomicBool::new(self.is_writeable.load(Ordering::SeqCst)), }) } + pub fn ensure_writeable(&self) -> anyhow::Result<()> { + match self + .is_writeable + .compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed) + { + Ok(_) => { + // Updated, need to reopen writeable + let mut file = self.file.write(); + let new_fd = std::fs::OpenOptions::new() + .write(true) + .create(false) + .open(&self.path) + .with_context(|| format!("error opening {:?} in write mode", self.path))?; + file.fd = Some(new_fd); + } + Err(_) => { + // Didn't update, no need to reopen + } + } + + Ok(()) + } + pub fn lock_read(&self) -> crate::Result> { RwLockReadGuard::try_map(self.file.read(), |f| f.as_ref()) .ok() diff --git a/crates/librqbit/src/storage/middleware/ftp.rs b/crates/librqbit/src/storage/middleware/ftp.rs new file mode 100644 index 000000000..65c472797 --- /dev/null +++ b/crates/librqbit/src/storage/middleware/ftp.rs @@ -0,0 +1,243 @@ +use std::io::Cursor; +use async_ftp::FtpStream; +use url::Url; +use async_ftp::FtpStream; +use tokio::io::AsyncReadExt; +use url::Url; + +use std::io::{Read, Seek, SeekFrom}; +use std::sync::Arc; + +use crate::storage::{StorageFactory, TorrentStorage}; +use crate::torrent_state::TorrentMetadata; +use crate::ManagedTorrentShared; + +pub const FTP_PROTOCOLS: &[&str] = &[ + // "sftp://", // ? + "ftp://", + "ftps://", +]; + +pub struct FtpStorageFactory { + url: String, +} + +impl FtpStorageFactory { + pub fn new(url: String) -> Self { + Self { url } + } +} + +// impl<'a> StorageFactory for FtpStorageFactory { +impl StorageFactory for FtpStorageFactory { + // type Storage = FtpStorage<'a>; + type Storage = Box; + + fn create( + &self, + // shared: &'a ManagedTorrentShared, + // metadata: &'a TorrentMetadata + shared: &ManagedTorrentShared, + metadata: &TorrentMetadata + // ) -> anyhow::Result> { + ) -> anyhow::Result { + // log::info!("Creating FtpStorage for URL: {}", self.url); + + // let fs = RemoteFs::from_url(&self.url) + // .map_err(|e| anyhow::anyhow!("remotefs error: {e}"))?; + + // https://github.com/veeso/termscp/blob/05830db206605f60ce54e23ca5df7de02115f491/src/filetransfer/remotefs_builder.rs#L109-L13 + let secure = true; + let parsed = Url::parse(&self.url)?; + let protocol = parsed.scheme().to_string(); + if (protocol != "ftp") && (protocol != "ftps") { + anyhow::bail!("FtpStorageFactory only supports ftp:// and ftps:// URLs, got: {protocol}"); + } + let hostname = parsed.host_str().ok_or_else(|| anyhow::anyhow!("Missing host"))?.to_string(); + let port = parsed.port().unwrap_or(21); + let username = if parsed.username().is_empty() { + None + } else { + Some(parsed.username().to_string()) + }; + let password = parsed.password().map(|s| s.to_string()); + let path = parsed.path().to_string(); + let mut fs = FtpFs::new(hostname, port).passive_mode(); + if let Some(_username) = username { + fs = fs.username(_username); + } + if let Some(_password) = password { + fs = fs.password(_password); + } + if secure { + fs = fs.secure(true, true); + } + + +impl FtpStorage { + pub fn new(host: String, port: u16, username: String, password: String) -> Self { + Self { host, port, username, password } + } + +} + + + Ok(Box::new(FtpStorage { + fs: Arc::new(fs), + shared, + metadata, + })) + } + + fn clone_box(&self) -> Box + 'static> { + Box::new(Self { + url: self.url.clone(), + }) + } +} + +pub struct FtpStorage<'a> { + // fs: Arc, + shared: &'a ManagedTorrentShared, + metadata: &'a TorrentMetadata, + host: String, + port: u16, + username: String, + password: String, +} + +// WONTFIX remotefs is not thread-safe, so we cannot implement Sync or Send for FtpStorage +// the trait `Sync` is not implemented for `(dyn RemoteFs + 'static)` +// https://github.com/remotefs-rs/remotefs-rs/pull/21 +impl<'a> TorrentStorage for FtpStorage<'a> { + async fn pread_exact(&self, file_id: usize, offset: u64, buf: &mut [u8]) -> anyhow::Result<()> { + let file_details = self.metadata.file_infos[file_id].clone(); + + let relative_path = &file_details.relative_filename; + + // let mut full_path = self.output_folder.clone(); + // full_path.push(relative_path); + + if file_details.attrs.padding { + log::info!("Skipping read for padding file: {:?}", relative_path); + return Ok(()); + }; + log::info!("FtpStorage pread_exact: file_id={}, offset={}, len={}, path={:?}", file_id, offset, buf.len(), relative_path); + + /* + // TODO prepend "/" before relative_path? + // NOTE only some remotefs backends support "open" + let mut file = self.fs.open(relative_path) + .map_err(|e| anyhow::anyhow!("remotefs open error: {e}"))?; + file.seek(SeekFrom::Start(offset)) + .map_err(|e| anyhow::anyhow!("remotefs seek error: {e}"))?; + let mut read_total = 0; + while read_total < buf.len() { + let n = file.read(&mut buf[read_total..]) + .map_err(|e| anyhow::anyhow!("remotefs read error: {e}"))?; + if n == 0 { + return Err(anyhow::anyhow!( + "remotefs: unexpected EOF (read {} of {} bytes)", + read_total, + buf.len() + )); + } + read_total += n; + } + */ + let addr: String = format!("{}:{}", self.host, self.port); + let mut ftp = FtpStream::connect(addr).await?; + ftp.login(&self.username, &self.password).await?; + let mut reader = ftp.retr(relative_path).await?; + reader.read_exact(&mut vec![0u8; offset as usize]).await?; // skip to offset + reader.read_exact(buf).await?; + ftp.quit().await?; + Ok(()) + } + + fn pwrite_all(&self, _file_id: usize, _offset: u64, _buf: &[u8]) -> anyhow::Result<()> { + anyhow::bail!("FtpStorage is read-only: pwrite_all is not implemented"); + } + + fn remove_file(&self, _file_id: usize, _filename: &std::path::Path) -> anyhow::Result<()> { + anyhow::bail!("FtpStorage is read-only: remove_file is not implemented"); + } + + fn remove_directory_if_empty(&self, _path: &std::path::Path) -> anyhow::Result<()> { + anyhow::bail!("FtpStorage is read-only: remove_directory_if_empty is not implemented"); + } + + fn ensure_file_length(&self, _file_id: usize, _length: u64) -> anyhow::Result<()> { + anyhow::bail!("FtpStorage is read-only: ensure_file_length is not implemented"); + } + + fn take(&self) -> anyhow::Result> { + anyhow::bail!("FtpStorage take is not implemented") + } + + fn init( + &mut self, + _shared: &ManagedTorrentShared, + _metadata: &TorrentMetadata, + ) -> anyhow::Result<()> { + // No-op for FtpStorage, as it is read-only and uses references. + // let mut files = Vec::::new(); + // for file_details in metadata.file_infos.iter() { + // let mut full_path = self.output_folder.clone(); + // let relative_path = &file_details.relative_filename; + // full_path.push(relative_path); + + // if file_details.attrs.padding { + // files.push(OpenedFile::new_dummy()); + // continue; + // }; + // std::fs::create_dir_all(full_path.parent().context("bug: no parent")?)?; + // if shared.options.allow_overwrite { + // let (file, writeable) = match + // OpenOptions::new() + // .create(true) + // .truncate(false) + // .read(true) + // .write(true) + // .open(&full_path) + // { + // Ok(file) => (file, true), + // Err(e) => { + // warn!(?full_path, "error opening file in create+write mode: {e:?}"); + // // open the file in read-only mode, will reopen in write mode later. + // ( + // OpenOptions::new() + // .create(false) + // .read(true) + // .open(&full_path) + // .with_context(|| format!("error opening {full_path:?}"))?, + // false, + // ) + // } + // }; + // files.push(OpenedFile::new(full_path.clone(), file, writeable)); + // } else { + // // create_new does not seem to work with read(true), so calling this twice. + // let file = OpenOptions::new() + // .create_new(true) + // .write(true) + // .open(&full_path) + // .with_context(|| { + // format!( + // "error creating a new file (because allow_overwrite = false) {:?}", + // &full_path + // ) + // })?; + // OpenOptions::new().read(true).write(true).open(&full_path)?; + // let writeable = true; + // files.push(OpenedFile::new(full_path.clone(), file, writeable)); + // }; + // } + + // self.opened_files = files; + Ok(()) + } +} + + + diff --git a/crates/librqbit/src/storage/middleware/mod.rs b/crates/librqbit/src/storage/middleware/mod.rs index fb4bb3e2b..c980243f3 100644 --- a/crates/librqbit/src/storage/middleware/mod.rs +++ b/crates/librqbit/src/storage/middleware/mod.rs @@ -1,3 +1,4 @@ pub mod slow; pub mod timing; pub mod write_through_cache; +pub mod ftp; diff --git a/crates/librqbit/src/storage/mod.rs b/crates/librqbit/src/storage/mod.rs index 9bb9f82af..304ac4b10 100644 --- a/crates/librqbit/src/storage/mod.rs +++ b/crates/librqbit/src/storage/mod.rs @@ -12,10 +12,12 @@ use std::{ path::Path, }; +use async_trait::async_trait; use librqbit_core::lengths::ValidPieceIndex; use crate::torrent_state::{ManagedTorrentShared, TorrentMetadata}; +#[async_trait] pub trait StorageFactory: Send + Sync + Any { type Storage: TorrentStorage; @@ -24,22 +26,25 @@ pub trait StorageFactory: Send + Sync + Any { shared: &ManagedTorrentShared, metadata: &TorrentMetadata, ) -> anyhow::Result; - fn create_and_init( + + fn is_type_id(&self, type_id: TypeId) -> bool { + Self::type_id(self) == type_id + } + fn clone_box(&self) -> BoxStorageFactory; + + async fn create_and_init( &self, shared: &ManagedTorrentShared, metadata: &TorrentMetadata, ) -> anyhow::Result { let mut storage = self.create(shared, metadata)?; - storage.init(shared, metadata)?; + storage.init(shared, metadata).await?; Ok(storage) } - - fn is_type_id(&self, type_id: TypeId) -> bool { - Self::type_id(self) == type_id - } - fn clone_box(&self) -> BoxStorageFactory; } +impl StorageFactoryAsyncExt for T {} + pub type BoxStorageFactory = Box>>; pub trait StorageFactoryExt { @@ -93,9 +98,10 @@ impl StorageFactory for Box { } } +#[async_trait] pub trait TorrentStorage: Send + Sync { // Create/open files etc. - fn init( + async fn init( &mut self, shared: &ManagedTorrentShared, metadata: &TorrentMetadata, @@ -103,13 +109,13 @@ pub trait TorrentStorage: Send + Sync { /// Given a file_id (which you can get more info from in init_storage() through torrent info) /// read buf.len() bytes into buf at offset. - fn pread_exact(&self, file_id: usize, offset: u64, buf: &mut [u8]) -> anyhow::Result<()>; + async fn pread_exact(&self, file_id: usize, offset: u64, buf: &mut [u8]) -> anyhow::Result<()>; /// Given a file_id (which you can get more info from in init_storage() through torrent info) /// write buf.len() bytes into the file at offset. - fn pwrite_all(&self, file_id: usize, offset: u64, buf: &[u8]) -> anyhow::Result<()>; + async fn pwrite_all(&self, file_id: usize, offset: u64, buf: &[u8]) -> anyhow::Result<()>; - fn pwrite_all_vectored( + async fn pwrite_all_vectored( &self, file_id: usize, offset: u64, @@ -119,7 +125,7 @@ pub trait TorrentStorage: Send + Sync { let mut size = 0; for ioslice in bufs { - self.pwrite_all(file_id, offset, &ioslice)?; + self.pwrite_all(file_id, offset, &ioslice).await?; offset += ioslice.len() as u64; size += ioslice.len(); } @@ -128,58 +134,59 @@ pub trait TorrentStorage: Send + Sync { } /// Remove a file from the storage. If not supported, or it doesn't matter, just return Ok(()) - fn remove_file(&self, file_id: usize, filename: &Path) -> anyhow::Result<()>; + async fn remove_file(&self, file_id: usize, filename: &Path) -> anyhow::Result<()>; - fn remove_directory_if_empty(&self, path: &Path) -> anyhow::Result<()>; + async fn remove_directory_if_empty(&self, path: &Path) -> anyhow::Result<()>; /// E.g. for filesystem backend ensure that the file has a certain length, and grow/shrink as needed. - fn ensure_file_length(&self, file_id: usize, length: u64) -> anyhow::Result<()>; + async fn ensure_file_length(&self, file_id: usize, length: u64) -> anyhow::Result<()>; /// Replace the current storage with a dummy, and return a new one that should be used instead. /// This is used to make the underlying object useless when e.g. pausing the torrent. - fn take(&self) -> anyhow::Result>; + async fn take(&self) -> anyhow::Result>; /// Callback called every time a piece has completed and has been validated. /// Default implementation does nothing, but can be override in trait implementations. - fn on_piece_completed(&self, _piece_index: ValidPieceIndex) -> anyhow::Result<()> { + async fn on_piece_completed(&self, _piece_index: ValidPieceIndex) -> anyhow::Result<()> { Ok(()) } } +#[async_trait] impl TorrentStorage for Box { - fn pread_exact(&self, file_id: usize, offset: u64, buf: &mut [u8]) -> anyhow::Result<()> { - (**self).pread_exact(file_id, offset, buf) + async fn pread_exact(&self, file_id: usize, offset: u64, buf: &mut [u8]) -> anyhow::Result<()> { + (**self).pread_exact(file_id, offset, buf).await } - fn pwrite_all(&self, file_id: usize, offset: u64, buf: &[u8]) -> anyhow::Result<()> { - (**self).pwrite_all(file_id, offset, buf) + async fn pwrite_all(&self, file_id: usize, offset: u64, buf: &[u8]) -> anyhow::Result<()> { + (**self).pwrite_all(file_id, offset, buf).await } - fn remove_file(&self, file_id: usize, filename: &Path) -> anyhow::Result<()> { - (**self).remove_file(file_id, filename) + async fn remove_file(&self, file_id: usize, filename: &Path) -> anyhow::Result<()> { + (**self).remove_file(file_id, filename).await } - fn ensure_file_length(&self, file_id: usize, length: u64) -> anyhow::Result<()> { - (**self).ensure_file_length(file_id, length) + async fn ensure_file_length(&self, file_id: usize, length: u64) -> anyhow::Result<()> { + (**self).ensure_file_length(file_id, length).await } - fn take(&self) -> anyhow::Result> { - (**self).take() + async fn take(&self) -> anyhow::Result> { + (**self).take().await } - fn remove_directory_if_empty(&self, path: &Path) -> anyhow::Result<()> { - (**self).remove_directory_if_empty(path) + async fn remove_directory_if_empty(&self, path: &Path) -> anyhow::Result<()> { + (**self).remove_directory_if_empty(path).await } - fn init( + async fn init( &mut self, shared: &ManagedTorrentShared, metadata: &TorrentMetadata, ) -> anyhow::Result<()> { - (**self).init(shared, metadata) + (**self).init(shared, metadata).await } - fn on_piece_completed(&self, piece_id: ValidPieceIndex) -> anyhow::Result<()> { - (**self).on_piece_completed(piece_id) + async fn on_piece_completed(&self, piece_id: ValidPieceIndex) -> anyhow::Result<()> { + (**self).on_piece_completed(piece_id).await } } diff --git a/crates/librqbit_core/Cargo.toml b/crates/librqbit_core/Cargo.toml index 1e931dc29..0f6a7ecd7 100644 --- a/crates/librqbit_core/Cargo.toml +++ b/crates/librqbit_core/Cargo.toml @@ -2,6 +2,8 @@ name = "librqbit-core" version = "5.0.0" edition = "2024" +# https://github.com/rust-lang/rust/issues/53667 +rust-version = "1.88" description = "Important utilities used throughout librqbit useful for working with torrents." license = "Apache-2.0" documentation = "https://docs.rs/librqbit-core" diff --git a/crates/rqbit/Cargo.toml b/crates/rqbit/Cargo.toml index 1a4f39a44..ffb494bd0 100644 --- a/crates/rqbit/Cargo.toml +++ b/crates/rqbit/Cargo.toml @@ -12,7 +12,7 @@ readme = "README.md" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] -default = ["default-tls", "postgres", "webui", "prometheus"] +default = ["default-tls", "postgres", "webui", "prometheus", "librqbit/storage_middleware"] openssl-vendored = ["openssl/vendored"] tokio-console = ["librqbit/tokio-console"] webui = ["librqbit/webui"]