-
Notifications
You must be signed in to change notification settings - Fork 172
Expand file tree
/
Copy patharray.rs
More file actions
106 lines (92 loc) · 3.44 KB
/
Copy patharray.rs
File metadata and controls
106 lines (92 loc) · 3.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors
use std::future::Future;
use std::sync::Arc;
use std::sync::OnceLock;
use async_lock::Mutex as AsyncMutex;
use vortex_error::SharedVortexResult;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use crate::ArrayRef;
use crate::Canonical;
use crate::IntoArray;
use crate::dtype::DType;
use crate::stats::ArrayStats;
/// Slot index of the source array (the array that is shared and lazily computed).
pub(super) const SOURCE_SLOT: usize = 0;
/// Number of child slots a `SharedArray` has.
pub(super) const NUM_SLOTS: usize = 1;
/// Name of each slot, indexed by slot position (`SLOT_NAMES[SOURCE_SLOT] == "source"`).
pub(super) const SLOT_NAMES: [&str; NUM_SLOTS] = ["source"];
/// A lazily-executing array wrapper with a one-way transition from source to cached form.
///
/// Before materialization, operations delegate to the source array.
/// After successful materialization (via `get_or_compute` or `get_or_compute_async`),
/// operations delegate to the cached result. If materialization fails, the error is cached
/// and operations keep delegating to the source.
///
/// The cache and the async compute lock live behind `Arc`, so clones of a `SharedArray`
/// share the same materialization state.
#[derive(Debug, Clone)]
pub struct SharedArray {
/// Child slots; index `SOURCE_SLOT` holds the source array (populated by `new`).
pub(super) slots: Vec<Option<ArrayRef>>,
/// Materialization result (success or shared error), written at most once.
pub(super) cached: Arc<OnceLock<SharedVortexResult<ArrayRef>>>,
/// Serializes `get_or_compute_async` callers so concurrent async calls don't redo the work.
pub(super) async_compute_lock: Arc<AsyncMutex<()>>,
/// Data type of the array, copied from the source array in `new`.
pub(super) dtype: DType,
/// Statistics for this array; initialized to `ArrayStats::default()` in `new`.
pub(super) stats: ArrayStats,
}
impl SharedArray {
pub fn new(source: ArrayRef) -> Self {
Self {
dtype: source.dtype().clone(),
slots: vec![Some(source)],
cached: Arc::new(OnceLock::new()),
async_compute_lock: Arc::new(AsyncMutex::new(())),
stats: ArrayStats::default(),
}
}
/// Returns the source array reference.
pub(super) fn source(&self) -> &ArrayRef {
self.slots[SOURCE_SLOT]
.as_ref()
.vortex_expect("SharedArray source slot")
}
/// Returns the current array reference.
///
/// After materialization, returns the cached result. Otherwise, returns the source.
/// If materialization failed, falls back to the source.
pub(super) fn current_array_ref(&self) -> &ArrayRef {
match self.cached.get() {
Some(Ok(arr)) => arr,
_ => self.source(),
}
}
/// Compute and cache the result. The computation runs exactly once via `OnceLock`.
///
/// If the computation fails, the error is cached and returned on all subsequent calls.
pub fn get_or_compute(
&self,
f: impl FnOnce(&ArrayRef) -> VortexResult<Canonical>,
) -> VortexResult<ArrayRef> {
let result = self
.cached
.get_or_init(|| f(self.source()).map(|c| c.into_array()).map_err(Arc::new));
result.clone().map_err(Into::into)
}
/// Async version of `get_or_compute`.
pub async fn get_or_compute_async<F, Fut>(&self, f: F) -> VortexResult<ArrayRef>
where
F: FnOnce(ArrayRef) -> Fut,
Fut: Future<Output = VortexResult<Canonical>>,
{
// Fast path: already computed.
if let Some(result) = self.cached.get() {
return result.clone().map_err(Into::into);
}
// Serialize async computation to prevent redundant work.
let _guard = self.async_compute_lock.lock().await;
// Double-check after acquiring the lock.
if let Some(result) = self.cached.get() {
return result.clone().map_err(Into::into);
}
let computed = f(self.source().clone())
.await
.map(|c| c.into_array())
.map_err(Arc::new);
let result = self.cached.get_or_init(|| computed);
result.clone().map_err(Into::into)
}
}