Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 139 additions & 2 deletions src/azure/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,20 @@ impl ObjectStore for MicrosoftAzure {
prefix: Option<&Path>,
offset: &Path,
) -> BoxStream<'static, Result<ObjectMeta>> {
if self.client.config().is_emulator {
// Azurite doesn't support the startFrom query parameter,
let disable_start_from = self.client.config().is_emulator
|| self
.client
.config()
.service
.host_str()
.is_some_and(|h| h.ends_with(".fabric.microsoft.com"));

if disable_start_from {
// Azurite and OneLake don't support the startFrom query parameter,
// fall back to client-side filtering
//
// See https://github.com/Azure/Azurite/issues/2619#issuecomment-3660701055
// See https://github.com/apache/arrow-rs-object-store/issues/695
let offset = offset.clone();
self.list(prefix)
.try_filter(move |f| futures_util::future::ready(f.location > offset))
Expand Down Expand Up @@ -410,6 +419,134 @@ mod tests {
assert_eq!(data, loaded);
}

/// Verifies that `list_with_offset` works against OneLake (Fabric) endpoints.
///
/// OneLake silently ignores the `startFrom` query parameter when using
/// friendly-name URLs (e.g. `.../MyWorkspace/lakehouse.Lakehouse/...`),
/// returning 200 OK with zero results.
/// GUID-based URLs handle `startFrom` correctly.
///
/// Set these env vars before running:
/// - `AZURE_STORAGE_TOKEN`: bearer token (e.g. from `az account get-access-token`)
/// - `ONELAKE_URL`: full OneLake URL with friendly names, e.g.
/// `https://onelake.blob.fabric.microsoft.com/<workspace>/<item>.Lakehouse/`
///
/// See <https://github.com/apache/arrow-rs-object-store/issues/695>
#[ignore = "Used for manual testing against a real OneLake endpoint."]
#[tokio::test]
async fn test_onelake_list_with_offset() {
let url = std::env::var("ONELAKE_URL").unwrap();
let token = std::env::var("AZURE_STORAGE_TOKEN").unwrap();

let store = MicrosoftAzureBuilder::new()
.with_url(&url)
.with_config(AzureConfigKey::Token, token)
.build()
.unwrap();

// Derive a writable path prefix from the URL
// (skip workspace segment which becomes the container)
let parsed: Url = url.parse().unwrap();
let mut segments = parsed.path_segments().unwrap();
let _workspace = segments.next().unwrap();
let base: String = segments.collect::<Vec<_>>().join("/");
let test_dir = format!("{base}/test_onelake_offset");

// Create test files with predictable ordering
let prefix = Path::from(test_dir.as_str());
let files: Vec<Path> = (b'a'..=b'e')
.map(|c| Path::from(format!("{test_dir}/file_{}.txt", c as char)))
.collect();
let data = Bytes::from("test data");
for file in &files {
store.put(file, data.clone().into()).await.unwrap();
}

// Test 1: Offset at file_b → should return c, d, e (not b)
let offset = Path::from(format!("{test_dir}/file_b.txt"));
let result: Vec<Path> = store
.list_with_offset(Some(&prefix), &offset)
.map_ok(|m| m.location)
.try_collect()
.await
.unwrap();
assert!(
!result.contains(&offset),
"offset file_b should be excluded, got: {result:?}"
);
assert_eq!(
result.len(),
3,
"expected c/d/e after file_b, got: {result:?}"
);

// Test 2: Offset at file_a → should return b, c, d, e
let offset = Path::from(format!("{test_dir}/file_a.txt"));
let result: Vec<Path> = store
.list_with_offset(Some(&prefix), &offset)
.map_ok(|m| m.location)
.try_collect()
.await
.unwrap();
assert!(
!result.contains(&offset),
"offset file_a should be excluded"
);
assert_eq!(
result.len(),
4,
"expected b/c/d/e after file_a, got: {result:?}"
);

// Test 3: Offset at file_e (last) → should return empty
let offset = Path::from(format!("{test_dir}/file_e.txt"));
let result: Vec<Path> = store
.list_with_offset(Some(&prefix), &offset)
.map_ok(|m| m.location)
.try_collect()
.await
.unwrap();
assert!(
result.is_empty(),
"offset at last file should return empty, got: {result:?}"
);

// Test 4: Offset before all files → should return all 5
let offset = Path::from(format!("{test_dir}/file"));
let result: Vec<Path> = store
.list_with_offset(Some(&prefix), &offset)
.map_ok(|m| m.location)
.try_collect()
.await
.unwrap();
assert_eq!(
result.len(),
5,
"offset before all files should return all, got: {result:?}"
);

// Test 5: Every returned entry is strictly greater than offset
let offset = Path::from(format!("{test_dir}/file_c.txt"));
let result: Vec<ObjectMeta> = store
.list_with_offset(Some(&prefix), &offset)
.try_collect()
.await
.unwrap();
for meta in &result {
assert!(
meta.location > offset,
"entry {} should be > offset {}",
meta.location,
offset
);
}

// Cleanup
for file in &files {
let _ = store.delete(file).await;
}
}

#[test]
fn azure_test_config_get_value() {
let azure_client_id = "object_store:fake_access_key_id".to_string();
Expand Down