Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,23 @@ edition.workspace = true
[lib]
path = "src/download_manager.rs"

[features]
default = ["reqwest"]
reqwest = ["dep:reqwest"]

[dependencies]
reqwest.workspace = true
reqwest = { workspace = true, optional = true }

tokio.workspace = true
tokio-stream.workspace = true
tokio-util.workspace = true
futures-core.workspace = true
futures-util.workspace = true

url.workspace = true
http.workspace = true
bytes.workspace = true

thiserror.workspace = true
anyhow.workspace = true
derive_builder.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion examples/usage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::path::PathBuf;

use bottles_download_manager::{DownloadManager, prelude::*};
use futures_util::StreamExt;
use reqwest::Url;
use url::Url;
// use std::fmt::Debug;
use tracing::{error, info};
use tracing_subscriber::EnvFilter;
Expand Down
57 changes: 57 additions & 0 deletions src/backend/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use thiserror::Error;

/// Transport-agnostic error type returned by [`super::DownloadBackend`] implementations.
///
/// Backends are responsible for converting their native error types into this
/// enum so that the rest of the download pipeline can reason about
/// retryability without coupling itself to any particular HTTP library.
#[derive(Debug, Error)]
pub enum BackendError {
/// TCP / socket-level connection to the remote host failed.
#[error("Connection failed: {0}")]
Connect(String),

/// The request or response exceeded a time limit.
#[error("Request timed out: {0}")]
Timeout(String),

/// A well-formed request was sent but could not be completed.
#[error("Request error: {0}")]
Request(String),

/// The server returned an HTTP 5xx status code.
#[error("HTTP server error {status}: {message}")]
ServerError { status: u16, message: String },

/// The operation was aborted via the download's [`tokio_util::sync::CancellationToken`].
#[error("Cancelled")]
Cancelled,

/// Any other transport-layer error that does not fit a specific category.
#[error("Network error: {0}")]
Other(String),
}

impl BackendError {
/// Returns `true` if the scheduler should schedule a retry after this error.
///
/// The following variants are considered **retryable** (transient failures
/// that have a reasonable chance of succeeding on a subsequent attempt):
///
/// - [`BackendError::Connect`]
/// - [`BackendError::Timeout`]
/// - [`BackendError::Request`]
/// - [`BackendError::ServerError`] (HTTP 5xx)
///
/// [`BackendError::Cancelled`] and [`BackendError::Other`] are **not**
/// retryable.
pub fn is_retryable(&self) -> bool {
matches!(
self,
BackendError::Connect(_)
| BackendError::Timeout(_)
| BackendError::Request(_)
| BackendError::ServerError { .. }
)
}
}
127 changes: 127 additions & 0 deletions src/backend/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
mod error;
#[cfg(feature = "reqwest")]
mod reqwest_backend;

pub use error::BackendError;
#[cfg(feature = "reqwest")]
pub use reqwest_backend::ReqwestBackend;

use crate::download::RemoteInfo;
use bytes::Bytes;
use futures_core::Stream;
use http::HeaderMap;
use std::{future::Future, pin::Pin};
use tokio_util::sync::CancellationToken;
use url::Url;

/// The response yielded by a successful [`DownloadBackend::fetch`] call.
///
/// Carries the optional total size advertised by the server and a pinned
/// stream of raw byte chunks.
pub struct BackendResponse {
/// Total byte count as reported by the server's `Content-Length` header,
/// or `None` when the server did not include one.
pub content_length: Option<u64>,

/// Stream of raw byte chunks arriving from the server.
///
/// Each item is `Ok(Bytes)` on success or `Err(BackendError)` when the
/// underlying transport encounters an error mid-stream.
pub stream: Pin<Box<dyn Stream<Item = Result<Bytes, BackendError>> + Send + 'static>>,
}

/// Abstraction over the transport layer used to perform downloads.
///
/// Implement this trait to provide a custom HTTP (or non-HTTP) backend.
/// The [`ReqwestBackend`] provided behind the `reqwest` Cargo feature is the
/// built-in default.
///
/// # Object safety
///
/// The trait is object-safe when called through `Arc<dyn DownloadBackend>`.
/// Both methods return boxed futures so that the trait can be used as a
/// dynamic dispatch target without requiring `async_trait`.
///
/// # Cancellation
///
/// Both methods receive a [`CancellationToken`] for the in-flight download.
/// Implementations should honour cancellation as early as possible (e.g.
/// aborting the TCP connection rather than waiting for a response). When a
/// probe is cancelled `probe_head` should return `None`; when a fetch is
/// cancelled before the response headers arrive `fetch` should return
/// `Err(BackendError::Cancelled)`. Chunk-level cancellation during streaming
/// is handled by the worker via `tokio::select!` so backends are not required
/// to poll the token inside the returned stream.
///
/// # Example — minimal backend stub
///
/// ```rust,ignore
/// use std::{future::Future, pin::Pin};
/// use bytes::Bytes;
/// use http::HeaderMap;
/// use tokio_util::sync::CancellationToken;
/// use url::Url;
/// use bottles_download_manager::backend::{
/// BackendError, BackendResponse, DownloadBackend,
/// };
/// use bottles_download_manager::download::RemoteInfo;
///
/// struct MyBackend;
///
/// impl DownloadBackend for MyBackend {
/// fn probe_head<'a>(
/// &'a self,
/// _url: &'a Url,
/// _headers: &'a HeaderMap,
/// _cancel: &'a CancellationToken,
/// ) -> Pin<Box<dyn Future<Output = Option<RemoteInfo>> + Send + 'a>> {
/// Box::pin(async { None })
/// }
///
/// fn fetch<'a>(
/// &'a self,
/// _url: &'a Url,
/// _headers: &'a HeaderMap,
/// _cancel: &'a CancellationToken,
/// ) -> Pin<Box<dyn Future<Output = Result<BackendResponse, BackendError>> + Send + 'a>> {
/// Box::pin(async {
/// Err(BackendError::Other("not implemented".into()))
/// })
/// }
/// }
/// ```
pub trait DownloadBackend: Send + Sync + 'static {
/// Perform a best-effort `HEAD` probe and return metadata about the remote
/// resource.
///
/// Implementations should return `None` when:
/// - the server does not support `HEAD`,
/// - the response indicates an error,
/// - or the cancellation token is triggered before a response arrives.
///
/// Returning `None` is safe; the download will proceed without pre-flight
/// metadata and no [`crate::events::Event::Probed`] event will be emitted.
fn probe_head<'a>(
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we rather abstract the HttpClient itself rather than abstracting specific requests?

What if we want to make other requests in the future, that means we would have to update this trait and and its implementor

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fair point

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How would you propose we do that? I could use some ideas.

&'a self,
url: &'a Url,
headers: &'a HeaderMap,
cancel: &'a CancellationToken,
) -> Pin<Box<dyn Future<Output = Option<RemoteInfo>> + Send + 'a>>;

/// Issue a `GET` request and return a streaming response.
///
/// The implementation is responsible for:
/// - establishing the connection,
/// - reading and surfacing the `Content-Length` header (if present),
/// - returning a `Stream` of `Bytes` chunks.
///
/// Returns `Err(BackendError::Cancelled)` if the cancellation token fires
/// before the response headers are received. Mid-stream cancellation is
/// managed by the caller.
fn fetch<'a>(
&'a self,
url: &'a Url,
headers: &'a HeaderMap,
cancel: &'a CancellationToken,
) -> Pin<Box<dyn Future<Output = Result<BackendResponse, BackendError>> + Send + 'a>>;
}
Loading