Skip to content

Commit da1afdc

Browse files
Changes to upgrade datafusion and arrow to latest versions (opensearch-project#21590)
Signed-off-by: bharath-techie <bharath78910@gmail.com>
1 parent b960baa commit da1afdc

19 files changed

Lines changed: 244 additions & 318 deletions

File tree

sandbox/libs/dataformat-native/rust/Cargo.toml

Lines changed: 40 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -16,62 +16,62 @@ members = [
1616

1717
[workspace.dependencies]
1818
# Arrow / Parquet
19-
arrow = { version = "57.3.0", features = ["ffi"] }
20-
arrow-array = "57.3.0"
21-
arrow-ipc = "57.3.0"
22-
arrow-schema = "57.3.0"
23-
arrow-buffer = "57.3.0"
24-
parquet = "57.3.0"
19+
arrow = { version = "=58.2.0", features = ["ffi"] }
20+
arrow-array = "=58.2.0"
21+
arrow-ipc = "=58.2.0"
22+
arrow-schema = "=58.2.0"
23+
arrow-buffer = "=58.2.0"
24+
parquet = "=58.2.0"
2525

2626
# DataFusion
27-
datafusion = "52.1.0"
28-
datafusion-expr = "52.1.0"
29-
datafusion-datasource = "52.1.0"
30-
datafusion-common = "52.1.0"
31-
datafusion-execution = "52.1.0"
32-
datafusion-physical-expr = "52.1.0"
33-
datafusion-substrait = "52.1.0"
27+
datafusion = "=53.1.0"
28+
datafusion-expr = "=53.1.0"
29+
datafusion-datasource = "=53.1.0"
30+
datafusion-common = "=53.1.0"
31+
datafusion-execution = "=53.1.0"
32+
datafusion-physical-expr = "=53.1.0"
33+
datafusion-substrait = "=53.1.0"
3434

3535
# Async
36-
tokio = { version = "1.0", features = ["full"] }
37-
tokio-util = "0.7"
38-
futures = "0.3"
39-
tokio-stream = "0.1.17"
36+
tokio = { version = "=1.51.0", features = ["full"] }
37+
tokio-util = "=0.7.18"
38+
futures = "=0.3.32"
39+
tokio-stream = "=0.1.18"
4040

4141
# Serialization
42-
prost = "0.14"
43-
substrait = "=0.62.0"
44-
serde = { version = "1.0", features = ["derive"] }
45-
serde_json = "1.0"
42+
prost = "=0.14.3"
43+
substrait = "=0.62.2"
44+
serde = { version = "=1.0.228", features = ["derive"] }
45+
serde_json = "=1.0.149"
4646

4747
# Logging
48-
log = "0.4"
48+
log = "=0.4.29"
4949

5050
# Allocator
5151
# disable_initial_exec_tls: Required because this library is loaded at runtime via dlopen/JVM FFM.
5252
# Without it, jemalloc uses initial-exec TLS which fails on aarch64 Linux with:
5353
# "cannot allocate memory in static TLS block"
5454
# The feature switches to global-dynamic TLS model, compatible with runtime loading.
55-
tikv-jemallocator = { version = "0.6", features = ["disable_initial_exec_tls"] }
56-
tikv-jemalloc-ctl = { version = "0.6", features = ["stats"] }
55+
tikv-jemallocator = { version = "=0.6.1", features = ["disable_initial_exec_tls"] }
56+
tikv-jemalloc-ctl = { version = "=0.6.1", features = ["stats"] }
5757

5858
# Misc
59-
dashmap = "5.5"
60-
num_cpus = "1.16"
61-
object_store = "0.12.5"
62-
url = "2.0"
63-
tempfile = "3.0"
64-
chrono = "0.4"
65-
once_cell = "1.21.3"
66-
crc32fast = "1.4"
67-
parking_lot = "0.12.5"
68-
lazy_static = "1.4.0"
69-
rayon = "1.10"
70-
thiserror = "1.0"
71-
async-trait = "0.1"
72-
bytes = "1"
73-
criterion = { version = "0.5", features = ["async_tokio"] }
74-
tokio-metrics = { version = "0.5", features = ["rt"] }
59+
dashmap = "=5.5.3"
60+
num_cpus = "=1.17.0"
61+
object_store = "=0.13.2"
62+
url = "=2.5.8"
63+
tempfile = "=3.27.0"
64+
chrono = "=0.4.44"
65+
once_cell = "=1.21.4"
66+
crc32fast = "=1.5.0"
67+
parking_lot = "=0.12.5"
68+
lazy_static = "=1.5.0"
69+
rayon = "=1.11.0"
70+
thiserror = "=1.0.69"
71+
async-trait = "=0.1.89"
72+
bytes = "=1.11.1"
73+
criterion = { version = "=0.5.1", features = ["async_tokio"] }
74+
tokio-metrics = { version = "=0.5.0", features = ["rt"] }
7575

7676
# Internal
7777
native-bridge-common = { path = "common" }

sandbox/libs/dataformat-native/rust/macros/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,6 @@ license = "Apache-2.0"
88
proc-macro = true
99

1010
[dependencies]
11-
quote = "1"
12-
syn = { version = "2", features = ["full"] }
13-
proc-macro2 = "1"
11+
quote = "=1.0.45"
12+
syn = { version = "=2.0.117", features = ["full"] }
13+
proc-macro2 = "=1.0.106"

sandbox/libs/tiered-storage/src/main/rust/src/tiered_object_store.rs

Lines changed: 43 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,14 @@
1919
//! registry's atomics and DashMap — no locks are held during I/O.
2020
2121
use std::fmt;
22-
use std::ops::Range;
2322
use std::sync::Arc;
2423

2524
use async_trait::async_trait;
26-
use bytes::Bytes;
2725
use futures::stream::BoxStream;
2826
use futures::StreamExt;
2927
use object_store::{
30-
path::Path, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
31-
PutMultipartOptions, PutOptions, PutPayload, PutResult, Result as OsResult,
28+
path::Path, CopyOptions, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta,
29+
ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result as OsResult,
3230
};
3331

3432
use crate::registry::traits::FileRegistry;
@@ -220,9 +218,36 @@ impl ObjectStore for TieredObjectStore {
220218

221219
/// Primary read path: check registry for remote routing, otherwise local.
222220
/// If local read fails with NotFound and file transitioned to REMOTE, retries from remote.
221+
///
222+
/// Also handles head requests (options.head == true) by returning cached
223+
/// size from the registry when available — avoids I/O for the common case.
223224
async fn get_opts(&self, location: &Path, options: GetOptions) -> OsResult<GetResult> {
224225
let path_str = location.as_ref();
225226

227+
// Fast path for head: return cached size from registry if available
228+
if options.head {
229+
if let Some(guard) = self.registry.get(path_str) {
230+
let size = guard.size();
231+
if size > 0 {
232+
let meta = ObjectMeta {
233+
location: location.clone(),
234+
last_modified: chrono::DateTime::<chrono::Utc>::default(),
235+
size,
236+
e_tag: None,
237+
version: None,
238+
};
239+
return Ok(GetResult {
240+
payload: object_store::GetResultPayload::Stream(
241+
futures::stream::empty().boxed(),
242+
),
243+
meta,
244+
range: 0..size,
245+
attributes: Default::default(),
246+
});
247+
}
248+
}
249+
}
250+
226251
if let Some((rp, store)) = self.resolve_remote(path_str) {
227252
native_bridge_common::log_debug!(
228253
"TieredObjectStore: get_opts REMOTE path='{}'",
@@ -240,76 +265,20 @@ impl ObjectStore for TieredObjectStore {
240265
result
241266
}
242267

243-
/// Range read with local-fail-retry-remote.
244-
async fn get_range(&self, location: &Path, range: Range<u64>) -> OsResult<Bytes> {
245-
let path_str = location.as_ref();
246-
247-
if let Some((rp, store)) = self.resolve_remote(path_str) {
248-
return store.get_range(&rp, range).await;
249-
}
250-
251-
let result = self.local.get_range(location, range.clone()).await;
252-
if let Err(ref e) = result {
253-
if let Some((rp, store)) = self.should_retry_remote(path_str, e) {
254-
return store.get_range(&rp, range).await;
255-
}
256-
}
257-
result
258-
}
259-
260-
/// Multi-range read with local-fail-retry-remote.
261-
async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> OsResult<Vec<Bytes>> {
262-
let path_str = location.as_ref();
263-
264-
if let Some((rp, store)) = self.resolve_remote(path_str) {
265-
return store.get_ranges(&rp, ranges).await;
266-
}
267-
268-
let result = self.local.get_ranges(location, ranges).await;
269-
if let Err(ref e) = result {
270-
if let Some((rp, store)) = self.should_retry_remote(path_str, e) {
271-
return store.get_ranges(&rp, ranges).await;
272-
}
273-
}
274-
result
275-
}
276-
277-
/// Head: check registry first (cached size), then local, then remote.
278-
/// Head: check registry for cached size first (no I/O), then route via resolve_remote.
279-
async fn head(&self, location: &Path) -> OsResult<ObjectMeta> {
280-
let path_str = location.as_ref();
281-
282-
// Check registry for cached size — return immediately if available
283-
if let Some(guard) = self.registry.get(path_str) {
284-
let size = guard.size();
285-
if size > 0 {
286-
return Ok(ObjectMeta {
287-
location: location.clone(),
288-
last_modified: chrono::DateTime::<chrono::Utc>::default(),
289-
size,
290-
e_tag: None,
291-
version: None,
292-
});
268+
/// Delete stream: remove each path from registry only, NO local delete.
269+
/// Local file deletion is handled by the Java layer.
270+
fn delete_stream(
271+
&self,
272+
locations: BoxStream<'static, OsResult<Path>>,
273+
) -> BoxStream<'static, OsResult<Path>> {
274+
let registry = Arc::clone(&self.registry);
275+
let mapped = locations.map(move |result| {
276+
if let Ok(ref path) = result {
277+
registry.remove(path.as_ref(), true);
293278
}
294-
}
295-
296-
// Size not cached — route via resolve_remote (REMOTE → remote store)
297-
if let Some((rp, store)) = self.resolve_remote(path_str) {
298-
return store.head(&rp).await;
299-
}
300-
301-
// Not remote — try local
302-
self.local.head(location).await
303-
}
304-
305-
/// Delete: remove from registry only, NO local delete.
306-
/// Local file deletion is handled by the Java layer
307-
/// (TieredSubdirectoryAwareDirectory.deleteFile). Eviction (local copy
308-
/// removal after sync) is a writable warm concern — not implemented.
309-
async fn delete(&self, location: &Path) -> OsResult<()> {
310-
let path_str = location.as_ref();
311-
self.registry.remove(path_str, true);
312-
Ok(())
279+
result
280+
});
281+
Box::pin(mapped)
313282
}
314283

315284
/// List: local entries first, then remote-only entries from registry (deduplicated).
@@ -364,23 +333,11 @@ impl ObjectStore for TieredObjectStore {
364333
Ok(result)
365334
}
366335

367-
async fn copy(&self, _from: &Path, _to: &Path) -> OsResult<()> {
336+
async fn copy_opts(&self, _from: &Path, _to: &Path, _options: CopyOptions) -> OsResult<()> {
368337
Err(object_store::Error::NotSupported {
369338
source: "TieredObjectStore does not support copy".into(),
370339
})
371340
}
372-
373-
async fn copy_if_not_exists(&self, _from: &Path, _to: &Path) -> OsResult<()> {
374-
Err(object_store::Error::NotSupported {
375-
source: "TieredObjectStore does not support copy_if_not_exists".into(),
376-
})
377-
}
378-
379-
async fn rename_if_not_exists(&self, _from: &Path, _to: &Path) -> OsResult<()> {
380-
Err(object_store::Error::NotSupported {
381-
source: "TieredObjectStore does not support rename_if_not_exists".into(),
382-
})
383-
}
384341
}
385342

386343
// ---------------------------------------------------------------------------

sandbox/libs/tiered-storage/src/main/rust/src/tiered_object_store_tests.rs

Lines changed: 15 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use super::*;
22
use futures::StreamExt;
33
use object_store::memory::InMemory;
4-
use object_store::PutPayload;
4+
use object_store::{CopyOptions, ObjectStoreExt, PutPayload};
55
use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
66

77
/// Helper: create a registry + tiered store backed by in-memory stores.
@@ -514,10 +514,6 @@ impl ObjectStore for CallCountingStore {
514514
self.inner.get_opts(location, options).await
515515
}
516516

517-
async fn head(&self, location: &Path) -> OsResult<ObjectMeta> {
518-
self.inner.head(location).await
519-
}
520-
521517
async fn put_opts(
522518
&self,
523519
location: &Path,
@@ -535,8 +531,11 @@ impl ObjectStore for CallCountingStore {
535531
self.inner.put_multipart_opts(location, opts).await
536532
}
537533

538-
async fn delete(&self, location: &Path) -> OsResult<()> {
539-
self.inner.delete(location).await
534+
fn delete_stream(
535+
&self,
536+
locations: BoxStream<'static, OsResult<Path>>,
537+
) -> BoxStream<'static, OsResult<Path>> {
538+
self.inner.delete_stream(locations)
540539
}
541540

542541
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, OsResult<ObjectMeta>> {
@@ -547,16 +546,8 @@ impl ObjectStore for CallCountingStore {
547546
self.inner.list_with_delimiter(prefix).await
548547
}
549548

550-
async fn copy(&self, from: &Path, to: &Path) -> OsResult<()> {
551-
self.inner.copy(from, to).await
552-
}
553-
554-
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> OsResult<()> {
555-
self.inner.copy_if_not_exists(from, to).await
556-
}
557-
558-
async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> OsResult<()> {
559-
self.inner.rename_if_not_exists(from, to).await
549+
async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> OsResult<()> {
550+
self.inner.copy_opts(from, to, options).await
560551
}
561552
}
562553

@@ -617,13 +608,6 @@ impl ObjectStore for ErrorStore {
617608
})
618609
}
619610

620-
async fn head(&self, _location: &Path) -> OsResult<ObjectMeta> {
621-
Err(object_store::Error::Generic {
622-
store: "ErrorStore",
623-
source: "simulated error".into(),
624-
})
625-
}
626-
627611
async fn put_opts(
628612
&self,
629613
_location: &Path,
@@ -647,11 +631,14 @@ impl ObjectStore for ErrorStore {
647631
})
648632
}
649633

650-
async fn delete(&self, _location: &Path) -> OsResult<()> {
651-
Err(object_store::Error::Generic {
634+
fn delete_stream(
635+
&self,
636+
locations: BoxStream<'static, OsResult<Path>>,
637+
) -> BoxStream<'static, OsResult<Path>> {
638+
Box::pin(locations.map(|_| Err(object_store::Error::Generic {
652639
store: "ErrorStore",
653640
source: "simulated error".into(),
654-
})
641+
})))
655642
}
656643

657644
fn list(&self, _prefix: Option<&Path>) -> BoxStream<'static, OsResult<ObjectMeta>> {
@@ -665,19 +652,7 @@ impl ObjectStore for ErrorStore {
665652
})
666653
}
667654

668-
async fn copy(&self, _from: &Path, _to: &Path) -> OsResult<()> {
669-
Err(object_store::Error::NotSupported {
670-
source: "not supported".into(),
671-
})
672-
}
673-
674-
async fn copy_if_not_exists(&self, _from: &Path, _to: &Path) -> OsResult<()> {
675-
Err(object_store::Error::NotSupported {
676-
source: "not supported".into(),
677-
})
678-
}
679-
680-
async fn rename_if_not_exists(&self, _from: &Path, _to: &Path) -> OsResult<()> {
655+
async fn copy_opts(&self, _from: &Path, _to: &Path, _options: CopyOptions) -> OsResult<()> {
681656
Err(object_store::Error::NotSupported {
682657
source: "not supported".into(),
683658
})

0 commit comments

Comments
 (0)