diff --git a/src/aws/builder.rs b/src/aws/builder.rs index 5a6d2b49..2a807fb3 100644 --- a/src/aws/builder.rs +++ b/src/aws/builder.rs @@ -26,7 +26,9 @@ use crate::aws::{ }; use crate::client::{HttpConnector, TokenCredentialProvider, http_connector}; use crate::config::ConfigValue; -use crate::{ClientConfigKey, ClientOptions, Result, RetryConfig, StaticCredentialProvider}; +use crate::{ + Capabilities, ClientConfigKey, ClientOptions, Result, RetryConfig, StaticCredentialProvider, +}; use base64::Engine; use base64::prelude::BASE64_STANDARD; use itertools::Itertools; @@ -193,6 +195,8 @@ pub struct AmazonS3Builder { request_payer: ConfigValue, /// The [`HttpConnector`] to use http_connector: Option>, + /// Capabilities to advertise for this store instance + capabilities: Option>, } /// Configuration keys for [`AmazonS3Builder`] @@ -463,6 +467,9 @@ pub enum AmazonS3ConfigKey { /// Encryption options Encryption(S3EncryptionConfigKey), + + /// Override the capabilities advertised by this store. + Capabilities, } impl AsRef for AmazonS3ConfigKey { @@ -497,6 +504,7 @@ impl AsRef for AmazonS3ConfigKey { Self::RequestPayer => "aws_request_payer", Self::Client(opt) => opt.as_ref(), Self::Encryption(opt) => opt.as_ref(), + Self::Capabilities => "aws_capabilities", } } } @@ -557,6 +565,7 @@ impl FromStr for AmazonS3ConfigKey { "aws_sse_customer_key_base64" | "sse_customer_key_base64" => Ok(Self::Encryption( S3EncryptionConfigKey::CustomerEncryptionKey, )), + "aws_capabilities" => Ok(Self::Capabilities), _ => match s.strip_prefix("aws_").unwrap_or(s).parse() { Ok(key) => Ok(Self::Client(key)), Err(_) => Err(Error::UnknownConfigurationKey { key: s.into() }.into()), @@ -709,6 +718,9 @@ impl AmazonS3Builder { self.encryption_customer_key_base64 = Some(value.into()) } }, + AmazonS3ConfigKey::Capabilities => { + self.capabilities = Some(ConfigValue::Deferred(value.into())) + } }; self } @@ -765,6 +777,7 @@ impl AmazonS3Builder { AmazonS3ConfigKey::DisableTagging => Some(self.disable_tagging.to_string()), AmazonS3ConfigKey::DisableBulkDelete => Some(self.disable_bulk_delete.to_string()), AmazonS3ConfigKey::RequestPayer => Some(self.request_payer.to_string()), + AmazonS3ConfigKey::Capabilities => self.capabilities.as_ref().map(ToString::to_string), AmazonS3ConfigKey::Encryption(key) => match key { S3EncryptionConfigKey::ServerSideEncryption => { self.encryption_type.as_ref().map(ToString::to_string) @@ -1105,6 +1118,12 @@ impl AmazonS3Builder { self } + /// Override the [`Capabilities`] advertised by this store. + pub fn with_capabilities(mut self, capabilities: Capabilities) -> Self { + self.capabilities = Some(ConfigValue::Parsed(capabilities)); + self + } + /// Create a [`AmazonS3`] instance from the provided values, /// consuming `self`. pub fn build(mut self) -> Result { @@ -1286,7 +1305,10 @@ impl AmazonS3Builder { let http_client = http.connect(&config.client_options)?; let client = Arc::new(S3Client::new(config, http_client)); - Ok(AmazonS3 { client }) + Ok(AmazonS3 { + client, + capabilities: self.capabilities.map(|x| x.get()).transpose()?, + }) } } @@ -1535,6 +1557,7 @@ impl From for HeaderMap { #[cfg(test)] mod tests { use super::*; + use crate::Capability; use std::collections::HashMap; #[test] @@ -1552,6 +1575,7 @@ mod tests { ("aws_session_token", aws_session_token.clone()), ("aws_unsigned_payload", "true".to_string()), ("aws_checksum_algorithm", "sha256".to_string()), + ("aws_capabilities", "ordered_listing".to_string()), ]); let builder = options @@ -1571,6 +1595,14 @@ mod tests { Checksum::SHA256 ); assert!(builder.unsigned_payload.get().unwrap()); + assert!( + builder + .capabilities + .unwrap() + .get() + .unwrap() + .has(Capability::OrderedListing) + ); } #[test] @@ -1625,7 +1657,8 @@ mod tests { .with_config( "aws_sse_customer_key_base64".parse().unwrap(), "some_customer_key", - ); + ) + .with_config(AmazonS3ConfigKey::Capabilities, "ordered_listing"); assert_eq!( builder @@ -1685,6 +1718,12 @@ mod tests { .unwrap(), "some_customer_key" ); + assert_eq!( + builder + .get_config_value(&"aws_capabilities".parse().unwrap()) + .unwrap(), + "ordered_listing" + ); } #[test] @@ -1908,6 +1947,26 @@ mod tests { assert!(s3.client.config.request_payer); } + #[test] + fn test_parse_capabilities() { + // Default: ordered listing disabled + let s3 = AmazonS3Builder::new() + .with_bucket_name("bucket") + .with_region("region") + .build() + .unwrap(); + assert!(!s3.capabilities.is_some()); + + // Explicit override via with_capabilities: no capabilities + let s3 = AmazonS3Builder::new() + .with_capabilities(Capabilities::new([Capability::OrderedListing])) + .with_bucket_name("bucket") + .with_region("region") + .build() + .unwrap(); + assert!(s3.capabilities.unwrap().has(Capability::OrderedListing)); + } + #[test] fn test_parse_bucket_az() { let cases = [ diff --git a/src/aws/mod.rs b/src/aws/mod.rs index e1cdb065..293bfcb6 100644 --- a/src/aws/mod.rs +++ b/src/aws/mod.rs @@ -44,9 +44,9 @@ use crate::multipart::{MultipartStore, PartId}; use crate::signer::Signer; use crate::util::STRICT_ENCODE_SET; use crate::{ - CopyMode, CopyOptions, Error, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, - ObjectMeta, ObjectStore, Path, PutMode, PutMultipartOptions, PutOptions, PutPayload, PutResult, - Result, UploadPart, + Capabilities, CopyMode, CopyOptions, Error, GetOptions, GetResult, ListResult, MultipartId, + MultipartUpload, ObjectMeta, ObjectStore, Path, PutMode, PutMultipartOptions, PutOptions, + PutPayload, PutResult, Result, UploadPart, }; static TAGS_HEADER: HeaderName = HeaderName::from_static("x-amz-tagging"); @@ -79,10 +79,17 @@ use crate::client::parts::Parts; use crate::list::{PaginatedListOptions, PaginatedListResult, PaginatedListStore}; pub use credential::{AwsAuthorizer, AwsCredential}; +// OrderedListing capability depends on the bucket type, it's not enabled for directory bucket. +// https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html +fn get_default_capabilities() -> Capabilities { + return Capabilities::new([]); +} + /// Interface for [Amazon S3](https://aws.amazon.com/s3/). #[derive(Debug, Clone)] pub struct AmazonS3 { client: Arc, + capabilities: Option, } impl std::fmt::Display for AmazonS3 { @@ -413,6 +420,12 @@ impl ObjectStore for AmazonS3 { } } } + + fn capabilities(&self) -> Capabilities { + self.capabilities + .clone() + .unwrap_or_else(get_default_capabilities) + } } #[derive(Debug)] @@ -707,6 +720,7 @@ mod tests { tagging( Arc::new(AmazonS3 { client: Arc::clone(&integration.client), + capabilities: None, }), !config.disable_tagging, |p| { diff --git a/src/azure/builder.rs b/src/azure/builder.rs index 1f57fac5..da63d449 100644 --- a/src/azure/builder.rs +++ b/src/azure/builder.rs @@ -23,7 +23,10 @@ use crate::azure::credential::{ use crate::azure::{AzureCredential, AzureCredentialProvider, MicrosoftAzure, STORE}; use crate::client::{HttpConnector, TokenCredentialProvider, http_connector}; use crate::config::ConfigValue; -use crate::{ClientConfigKey, ClientOptions, Result, RetryConfig, StaticCredentialProvider}; +use crate::{ + Capabilities, ClientConfigKey, ClientOptions, ObjectStoreExt, Result, RetryConfig, + StaticCredentialProvider, +}; use percent_encoding::percent_decode_str; use serde::{Deserialize, Serialize}; use std::str::FromStr; @@ -180,6 +183,8 @@ pub struct MicrosoftAzureBuilder { fabric_cluster_identifier: Option, /// The [`HttpConnector`] to use http_connector: Option>, + /// Capabilities to advertise for this store instance + capabilities: Option>, } /// Configuration keys for [`MicrosoftAzureBuilder`] @@ -382,6 +387,9 @@ pub enum AzureConfigKey { /// Client options Client(ClientConfigKey), + + /// Override the capabilities advertised by this store. + Capabilities, } impl AsRef for AzureConfigKey { @@ -411,6 +419,7 @@ impl AsRef for AzureConfigKey { Self::FabricSessionToken => "azure_fabric_session_token", Self::FabricClusterIdentifier => "azure_fabric_cluster_identifier", Self::Client(key) => key.as_ref(), + Self::Capabilities => "azure_capabilities", } } } @@ -468,6 +477,7 @@ impl FromStr for AzureConfigKey { } // Backwards compatibility "azure_allow_http" => Ok(Self::Client(ClientConfigKey::AllowHttp)), + "azure_capabilities" => Ok(Self::Capabilities), _ => match s.strip_prefix("azure_").unwrap_or(s).parse() { Ok(key) => Ok(Self::Client(key)), Err(_) => Err(Error::UnknownConfigurationKey { key: s.into() }.into()), @@ -594,6 +604,9 @@ impl MicrosoftAzureBuilder { AzureConfigKey::FabricClusterIdentifier => { self.fabric_cluster_identifier = Some(value.into()) } + AzureConfigKey::Capabilities => { + self.capabilities = Some(ConfigValue::Deferred(value.into())) + } }; self } @@ -635,6 +648,7 @@ impl MicrosoftAzureBuilder { AzureConfigKey::FabricWorkloadHost => self.fabric_workload_host.clone(), AzureConfigKey::FabricSessionToken => self.fabric_session_token.clone(), AzureConfigKey::FabricClusterIdentifier => self.fabric_cluster_identifier.clone(), + AzureConfigKey::Capabilities => self.capabilities.as_ref().map(ToString::to_string), } } @@ -906,6 +920,12 @@ impl MicrosoftAzureBuilder { self } + /// Override the [`Capabilities`] advertised by this store. + pub fn with_capabilities(mut self, capabilities: Capabilities) -> Self { + self.capabilities = Some(ConfigValue::Parsed(capabilities)); + self + } + /// Configure a connection to container with given name on Microsoft Azure Blob store. pub fn build(mut self) -> Result { if let Some(url) = self.url.take() { @@ -1054,7 +1074,10 @@ impl MicrosoftAzureBuilder { let http_client = http.connect(&config.client_options)?; let client = Arc::new(AzureClient::new(config, http_client)); - Ok(MicrosoftAzure { client }) + Ok(MicrosoftAzure { + client, + capabilities: self.capabilities.map(|x| x.get()).transpose()?, + }) } } @@ -1097,6 +1120,7 @@ pub fn split_sas(sas: &str) -> Result> { #[cfg(test)] mod tests { use super::*; + use crate::Capability; use std::collections::HashMap; #[test] @@ -1244,6 +1268,7 @@ mod tests { ("azure_client_id", azure_client_id), ("azure_storage_account_name", azure_storage_account_name), ("azure_storage_token", azure_storage_token), + ("azure_capabilities", "ordered_listing"), ]); let builder = options @@ -1254,6 +1279,26 @@ mod tests { assert_eq!(builder.client_id.unwrap(), azure_client_id); assert_eq!(builder.account_name.unwrap(), azure_storage_account_name); assert_eq!(builder.bearer_token.unwrap(), azure_storage_token); + assert!( + builder + .capabilities + .unwrap() + .get() + .unwrap() + .has(Capability::OrderedListing) + ); + } + + #[test] + fn azure_test_config_get_value() { + let builder = MicrosoftAzureBuilder::new() + .with_config(AzureConfigKey::Capabilities, "ordered_listing"); + assert_eq!( + builder + .get_config_value(&"azure_capabilities".parse().unwrap()) + .unwrap(), + "ordered_listing" + ); } #[test] diff --git a/src/azure/mod.rs b/src/azure/mod.rs index 1429bec9..4264004f 100644 --- a/src/azure/mod.rs +++ b/src/azure/mod.rs @@ -24,9 +24,9 @@ //! Unused blocks will automatically be dropped after 7 days. //! use crate::{ - CopyMode, CopyOptions, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, - ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, - UploadPart, + Capabilities, CopyMode, CopyOptions, GetOptions, GetResult, ListResult, + MultipartId, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, + PutPayload, PutResult, Result, UploadPart, multipart::{MultipartStore, PartId}, path::Path, signer::Signer, @@ -58,10 +58,17 @@ pub use credential::AzureCredential; const STORE: &str = "MicrosoftAzure"; +// OrderedListing capability is not supported by with Azure Storage Hierarchical Namespace is enabled. +// https://learn.microsoft.com/en-us/rest/api/storageservices/list-blobs +fn get_default_capabilities() -> Capabilities { + Capabilities::new([]) +} + /// Interface for [Microsoft Azure Blob Storage](https://azure.microsoft.com/en-us/services/storage/blobs/). #[derive(Debug)] pub struct MicrosoftAzure { client: Arc, + capabilities: Option, } impl MicrosoftAzure { @@ -180,6 +187,12 @@ impl ObjectStore for MicrosoftAzure { CopyMode::Create => self.client.copy_request(from, to, false).await, } } + + fn capabilities(&self) -> Capabilities { + self.capabilities + .clone() + .unwrap_or_else(get_default_capabilities) + } } #[async_trait] @@ -363,6 +376,7 @@ mod tests { tagging( Arc::new(MicrosoftAzure { client: Arc::clone(&integration.client), + capabilities: None, }), validate, |p| { diff --git a/src/capabilities.rs b/src/capabilities.rs new file mode 100644 index 00000000..77762541 --- /dev/null +++ b/src/capabilities.rs @@ -0,0 +1,167 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Capability advertisement for [`ObjectStore`](crate::ObjectStore) implementations. +//! +//! See [`Capabilities`] and [`Capability`] for details. +use crate::Error; +use std::collections::HashSet; + +const ORDERED_LISTING: &str = "ordered_listing"; + +/// An individual capability that an [`ObjectStore`] implementation may support. +/// +/// Used together with [`Capabilities`] to advertise optional backend features. +/// Get the set of supported capabilities via [`ObjectStore::capabilities`]. +#[derive(Hash, Eq, PartialEq, Copy, Clone, Debug)] +#[non_exhaustive] +pub enum Capability { + /// List results from [`ObjectStore::list`] and + /// [`ObjectStore::list_with_offset`] are returned in ascending + /// lexicographic order by [`Path`]. + /// + /// When this capability is present, callers may rely on the ordering and + /// avoid buffering all results solely for sorting purposes. + OrderedListing, +} + +impl std::str::FromStr for Capability { + type Err = Error; + + /// Parses a capability from its snake_case string representation. + fn from_str(s: &str) -> Result { + match s { + ORDERED_LISTING => Ok(Self::OrderedListing), + cap => Err(Error::Generic { + store: "Config", + source: format!("invalid capability: {cap}").into(), + }), + } + } +} + +impl std::fmt::Display for Capability { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::OrderedListing => write!(f, "{}", ORDERED_LISTING), + } + } +} + +/// Optional features supported by an [`ObjectStore`] implementation. +/// +/// Get the capabilities of a store by calling [`ObjectStore::capabilities`]. +/// Check whether [`Capability`] is supported by calling [`Capabilities::has`] method. +/// +/// The struct is `#[non_exhaustive]` so that new capability flags can be added +/// in future versions without breaking existing code. +/// +/// # Example +/// +/// ``` +/// # use object_store::{ObjectStore, memory::InMemory, Capability}; +/// let store = InMemory::new(); +/// if store.capabilities().has(Capability::OrderedListing) { +/// println!("list() results are in lexicographic order — no need to sort"); +/// } +/// ``` +#[derive(Debug, PartialEq, Clone, Default)] +pub struct Capabilities { + supported: HashSet, +} + +impl Capabilities { + /// Create a [`Capabilities`] from an explicit list of supported [`Capability`] values. + /// + /// Any capability not included in `capabilities` is considered unsupported. + /// + /// # Example + /// + /// ``` + /// # use object_store::{Capabilities, Capability}; + /// let caps = Capabilities::new([Capability::OrderedListing]); + /// assert!(caps.has(Capability::OrderedListing)); + /// ``` + pub fn new(capabilities: impl IntoIterator) -> Self { + Self { + supported: capabilities.into_iter().collect(), + } + } + + /// Returns `true` if the given [`Capability`] is supported by this store. + pub fn has(&self, capability: Capability) -> bool { + self.supported.contains(&capability) + } +} + +impl std::str::FromStr for Capabilities { + type Err = Error; + + /// Parses a comma-separated list of capability names into a [`Capabilities`]. + fn from_str(s: &str) -> crate::Result { + let mut capabilities: Vec = Vec::new(); + for mut cap in s.split(',') { + cap = cap.trim(); + if cap.is_empty() { + continue; + } + capabilities.push(cap.parse::()?); + } + Ok(Self::new(capabilities)) + } +} + +impl std::fmt::Display for Capabilities { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let mut caps: Vec<_> = self.supported.iter().map(ToString::to_string).collect(); + caps.sort(); + write!(f, "{}", caps.join(", ")) + } +} + +#[cfg(test)] +mod tests { + use super::{Capabilities, Capability}; + + #[test] + fn test_capability() { + assert_eq!(format!("{}", Capability::OrderedListing), "ordered_listing"); + assert_eq!( + Capability::OrderedListing, + "ordered_listing".parse::().unwrap() + ); + assert_eq!("invalid".parse::().is_ok(), false); + } + + #[test] + fn test_capabilities() { + assert_eq!("invalid".parse::().is_err(), true); + assert_eq!( + "".parse::() + .unwrap() + .has(Capability::OrderedListing), + false + ); + assert_eq!( + "ordered_listing" + .parse::() + .unwrap() + .has(Capability::OrderedListing), + true + ); + } +} diff --git a/src/chunked.rs b/src/chunked.rs index 870540a2..58a026e4 100644 --- a/src/chunked.rs +++ b/src/chunked.rs @@ -28,8 +28,9 @@ use futures_util::stream::BoxStream; use crate::path::Path; use crate::{ - CopyOptions, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta, - ObjectStore, PutMultipartOptions, PutOptions, PutResult, RenameOptions, + Capabilities, CopyOptions, GetOptions, GetResult, GetResultPayload, ListResult, + MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutResult, + RenameOptions, }; use crate::{PutPayload, Result}; @@ -169,6 +170,10 @@ impl ObjectStore for ChunkedStore { async fn rename_opts(&self, from: &Path, to: &Path, options: RenameOptions) -> Result<()> { self.inner.rename_opts(from, to, options).await } + + fn capabilities(&self) -> Capabilities { + self.inner.capabilities() + } } #[cfg(test)] diff --git a/src/config.rs b/src/config.rs index 29a389d4..871248f9 100644 --- a/src/config.rs +++ b/src/config.rs @@ -21,7 +21,7 @@ use std::time::Duration; use humantime::{format_duration, parse_duration}; use reqwest::header::HeaderValue; -use crate::{Error, Result}; +use crate::{Capabilities, Error, Result}; /// Provides deferred parsing of a value /// @@ -121,6 +121,12 @@ impl Parse for HeaderValue { } } +impl Parse for Capabilities { + fn parse(v: &str) -> Result { + Self::from_str(v) + } +} + pub(crate) fn fmt_duration(duration: &ConfigValue) -> String { match duration { ConfigValue::Parsed(v) => format_duration(*v).to_string(), diff --git a/src/gcp/builder.rs b/src/gcp/builder.rs index 82752b05..154bd86c 100644 --- a/src/gcp/builder.rs +++ b/src/gcp/builder.rs @@ -26,7 +26,9 @@ use crate::gcp::{ GcpCredential, GcpCredentialProvider, GcpSigningCredential, GcpSigningCredentialProvider, GoogleCloudStorage, STORE, credential, }; -use crate::{ClientConfigKey, ClientOptions, Result, RetryConfig, StaticCredentialProvider}; +use crate::{ + Capabilities, ClientConfigKey, ClientOptions, Result, RetryConfig, StaticCredentialProvider, +}; use serde::{Deserialize, Serialize}; use std::str::FromStr; use std::sync::Arc; @@ -120,6 +122,8 @@ pub struct GoogleCloudStorageBuilder { signing_credentials: Option, /// The [`HttpConnector`] to use http_connector: Option>, + /// Capabilities to advertise for this store instance + capabilities: Option>, } /// Configuration keys for [`GoogleCloudStorageBuilder`] @@ -199,6 +203,9 @@ pub enum GoogleConfigKey { /// Client options Client(ClientConfigKey), + + /// Override the capabilities advertised by this store. + Capabilities, } impl AsRef for GoogleConfigKey { @@ -212,6 +219,7 @@ impl AsRef for GoogleConfigKey { Self::BearerToken => "google_bearer_token", Self::SkipSignature => "google_skip_signature", Self::Client(key) => key.as_ref(), + Self::Capabilities => "google_capabilities", } } } @@ -233,6 +241,7 @@ impl FromStr for GoogleConfigKey { } "google_bearer_token" | "bearer_token" => Ok(Self::BearerToken), "google_skip_signature" | "skip_signature" => Ok(Self::SkipSignature), + "google_capabilities" => Ok(Self::Capabilities), _ => match s.strip_prefix("google_").unwrap_or(s).parse() { Ok(key) => Ok(Self::Client(key)), Err(_) => Err(Error::UnknownConfigurationKey { key: s.into() }.into()), @@ -257,6 +266,7 @@ impl Default for GoogleCloudStorageBuilder { skip_signature: Default::default(), signing_credentials: None, http_connector: None, + capabilities: None, } } } @@ -340,7 +350,10 @@ impl GoogleCloudStorageBuilder { GoogleConfigKey::SkipSignature => self.skip_signature.parse(value), GoogleConfigKey::Client(key) => { self.client_options = self.client_options.with_config(key, value) - } + }, + GoogleConfigKey::Capabilities => { + self.capabilities = Some(ConfigValue::Deferred(value.into())) + }, }; self } @@ -366,6 +379,7 @@ impl GoogleCloudStorageBuilder { GoogleConfigKey::BearerToken => self.bearer_token.clone(), GoogleConfigKey::SkipSignature => Some(self.skip_signature.to_string()), GoogleConfigKey::Client(key) => self.client_options.get_config_value(key), + GoogleConfigKey::Capabilities => self.capabilities.as_ref().map(|v| v.to_string()), } } @@ -534,6 +548,12 @@ impl GoogleCloudStorageBuilder { self } + /// Override the [`Capabilities`] advertised by this store. + pub fn with_capabilities(mut self, capabilities: Capabilities) -> Self { + self.capabilities = Some(ConfigValue::Parsed(capabilities)); + self + } + /// Configure a connection to Google Cloud Storage, returning a /// new [`GoogleCloudStorage`] and consuming `self` pub fn build(mut self) -> Result { @@ -669,6 +689,7 @@ impl GoogleCloudStorageBuilder { let http_client = http.connect(&config.client_options)?; Ok(GoogleCloudStorage { client: Arc::new(GoogleCloudStorageClient::new(config, http_client)?), + capabilities: self.capabilities.map(|x| x.get()).transpose()?, }) } } @@ -676,6 +697,7 @@ impl GoogleCloudStorageBuilder { #[cfg(test)] mod tests { use super::*; + use crate::Capability; use std::collections::HashMap; use std::io::Write; use tempfile::NamedTempFile; @@ -702,6 +724,7 @@ mod tests { let options = HashMap::from([ ("google_service_account", google_service_account.clone()), ("google_bucket_name", google_bucket_name.clone()), + ("google_capabilities", "ordered_listing".to_string()), ]); let builder = options @@ -715,6 +738,14 @@ mod tests { google_service_account.as_str() ); assert_eq!(builder.bucket_name.unwrap(), google_bucket_name.as_str()); + assert!( + builder + .capabilities + .unwrap() + .get() + .unwrap() + .has(Capability::OrderedListing) + ); } #[tokio::test] @@ -873,6 +904,12 @@ mod tests { .unwrap(), google_bearer_token ); + assert_eq!( + builder + .get_config_value(&"google_capabilities".parse().unwrap()) + .unwrap(), + "ordered_listing" + ); } #[test] diff --git a/src/gcp/mod.rs b/src/gcp/mod.rs index 51e85ae6..f7407210 100644 --- a/src/gcp/mod.rs +++ b/src/gcp/mod.rs @@ -37,15 +37,15 @@ use std::sync::Arc; use std::time::Duration; -use crate::CopyOptions; use crate::client::CredentialProvider; use crate::gcp::credential::GCSAuthorizer; use crate::signer::Signer; use crate::{ - GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta, ObjectStore, - PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, UploadPart, multipart::PartId, - path::Path, + Capabilities, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta, + ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, UploadPart, + multipart::PartId, path::Path, }; +use crate::{Capability, CopyOptions}; use async_trait::async_trait; use client::GoogleCloudStorageClient; use futures_util::stream::{BoxStream, StreamExt}; @@ -66,6 +66,12 @@ mod credential; const STORE: &str = "GCS"; +// OrderedListing is supported by all GCP bucket types. +// https://docs.cloud.google.com/storage/docs/listing-objects +fn get_default_capabilities() -> Capabilities { + Capabilities::new([Capability::OrderedListing]) +} + /// [`CredentialProvider`] for [`GoogleCloudStorage`] pub type GcpCredentialProvider = Arc>; @@ -77,6 +83,7 @@ pub type GcpSigningCredentialProvider = #[derive(Debug, Clone)] pub struct GoogleCloudStorage { client: Arc, + capabilities: Option, } impl std::fmt::Display for GoogleCloudStorage { @@ -223,6 +230,12 @@ impl ObjectStore for GoogleCloudStorage { self.client.copy_request(from, to, mode).await } + + fn capabilities(&self) -> Capabilities { + self.capabilities + .clone() + .unwrap_or_else(get_default_capabilities) + } } #[async_trait] diff --git a/src/integration.rs b/src/integration.rs index e68837c5..6949a69c 100644 --- a/src/integration.rs +++ b/src/integration.rs @@ -24,6 +24,7 @@ //! //! They are intended solely for testing purposes. +use crate::capabilities::Capability; use crate::list::{PaginatedListOptions, PaginatedListStore}; use crate::multipart::MultipartStore; use crate::path::Path; @@ -398,6 +399,27 @@ pub async fn put_get_delete_list(storage: &DynObjectStore) { assert_eq!(actual, expected, "{prefix:?} - {offset:?}"); } + if storage.capabilities().has(Capability::OrderedListing) { + let mut sorted_files = files.clone(); + sorted_files.sort(); + + let actual: Vec<_> = storage + .list(None) + .map_ok(|x| x.location) + .try_collect::>() + .await + .unwrap(); + assert_eq!(actual, sorted_files); + + let actual: Vec<_> = storage + .list_with_offset(None, &sorted_files[1]) + .map_ok(|x| x.location) + .try_collect::>() + .await + .unwrap(); + assert_eq!(actual, sorted_files[2..]); + } + // Test bulk delete let paths = vec![ Path::from("a/a.file"), diff --git a/src/lib.rs b/src/lib.rs index d3ea9ee2..4676e667 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -582,6 +582,7 @@ mod tags; pub use tags::TagSet; +pub mod capabilities; pub mod list; pub mod multipart; mod parse; @@ -609,6 +610,7 @@ use crate::path::Path; use crate::util::maybe_spawn_blocking; use async_trait::async_trait; use bytes::Bytes; +pub use capabilities::{Capabilities, Capability}; use chrono::{DateTime, Utc}; use futures_util::{StreamExt, TryStreamExt, stream::BoxStream}; use std::fmt::{Debug, Formatter}; @@ -1131,6 +1133,14 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static { self.delete(from).await?; Ok(()) } + + /// Return the [`Capabilities`] supported by this store. + /// + /// By default, an empty set of capabilities is returned. Individual store + /// implementations override this to advertise the features they support. + fn capabilities(&self) -> Capabilities { + Capabilities::new([]) + } } macro_rules! as_ref_impl { @@ -1202,6 +1212,10 @@ macro_rules! as_ref_impl { ) -> Result<()> { self.as_ref().rename_opts(from, to, options).await } + + fn capabilities(&self) -> Capabilities { + self.as_ref().capabilities() + } } }; } diff --git a/src/limit.rs b/src/limit.rs index fa29d1b2..ae099ca2 100644 --- a/src/limit.rs +++ b/src/limit.rs @@ -18,9 +18,9 @@ //! An object store that limits the maximum concurrency of the wrapped implementation use crate::{ - BoxStream, CopyOptions, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, - ObjectMeta, ObjectStore, Path, PutMultipartOptions, PutOptions, PutPayload, PutResult, - RenameOptions, Result, StreamExt, UploadPart, + BoxStream, Capabilities, CopyOptions, GetOptions, GetResult, GetResultPayload, ListResult, + MultipartUpload, ObjectMeta, ObjectStore, Path, PutMultipartOptions, PutOptions, PutPayload, + PutResult, RenameOptions, Result, StreamExt, UploadPart, }; use async_trait::async_trait; use bytes::Bytes; @@ -162,6 +162,10 @@ impl ObjectStore for LimitStore { let _permit = self.semaphore.acquire().await.unwrap(); self.inner.rename_opts(from, to, options).await } + + fn capabilities(&self) -> Capabilities { + self.inner.capabilities() + } } fn permit_get_result(r: GetResult, permit: OwnedSemaphorePermit) -> GetResult { diff --git a/src/memory.rs b/src/memory.rs index 383c553e..0e91db84 100644 --- a/src/memory.rs +++ b/src/memory.rs @@ -26,12 +26,13 @@ use chrono::{DateTime, Utc}; use futures_util::{StreamExt, stream::BoxStream}; use parking_lot::RwLock; +use crate::capabilities::Capability; use crate::multipart::{MultipartStore, PartId}; use crate::util::InvalidGetRange; use crate::{ - Attributes, GetRange, GetResult, GetResultPayload, ListResult, MultipartId, MultipartUpload, - ObjectMeta, ObjectStore, PutMode, PutMultipartOptions, PutOptions, PutResult, Result, - UpdateVersion, UploadPart, path::Path, + Attributes, Capabilities, GetRange, GetResult, GetResultPayload, ListResult, MultipartId, + MultipartUpload, ObjectMeta, ObjectStore, PutMode, PutMultipartOptions, PutOptions, PutResult, + Result, UpdateVersion, UploadPart, path::Path, }; use crate::{CopyMode, CopyOptions, GetOptions, PutPayload}; @@ -412,6 +413,10 @@ impl ObjectStore for InMemory { Ok(()) } + + fn capabilities(&self) -> Capabilities { + Capabilities::new([Capability::OrderedListing]) + } } #[async_trait] @@ -546,6 +551,7 @@ mod tests { #[tokio::test] async fn in_memory_test() { let integration = InMemory::new(); + assert!(integration.capabilities().has(Capability::OrderedListing)); put_get_delete_list(&integration).await; list_with_offset_exclusivity(&integration).await; diff --git a/src/prefix.rs b/src/prefix.rs index 46cd6816..abadc2b1 100644 --- a/src/prefix.rs +++ b/src/prefix.rs @@ -24,10 +24,7 @@ use crate::multipart::{MultipartStore, PartId}; use crate::path::Path; #[cfg(feature = "cloud")] use crate::signer::Signer; -use crate::{ - CopyOptions, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta, - ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, RenameOptions, Result, -}; +use crate::{Capabilities, CopyOptions, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, RenameOptions, Result}; /// Store wrapper that applies a constant prefix to all paths handled by the store. #[derive(Debug, Clone)] @@ -200,6 +197,10 @@ impl ObjectStore for PrefixStore { let full_to = self.full_path(to); self.inner.rename_opts(&full_from, &full_to, options).await } + + fn capabilities(&self) -> Capabilities { + self.inner.capabilities() + } } #[async_trait::async_trait] diff --git a/src/throttle.rs b/src/throttle.rs index 695afe40..e6378265 100644 --- a/src/throttle.rs +++ b/src/throttle.rs @@ -21,7 +21,7 @@ use std::ops::Range; use std::{convert::TryInto, sync::Arc}; use crate::multipart::{MultipartStore, PartId}; -use crate::{CopyOptions, GetOptions, RenameOptions, UploadPart}; +use crate::{Capabilities, CopyOptions, GetOptions, RenameOptions, UploadPart}; use crate::{ GetResult, GetResultPayload, ListResult, MultipartId, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, path::Path, @@ -263,6 +263,10 @@ impl ObjectStore for ThrottledStore { self.inner.rename_opts(from, to, options).await } + + fn capabilities(&self) -> Capabilities { + self.inner.capabilities() + } } /// Saturated `usize` to `u32` cast.