|
| 1 | +// SPDX-License-Identifier: Apache-2.0 |
| 2 | +// SPDX-FileCopyrightText: Copyright the Vortex contributors |
| 3 | + |
| 4 | +mod operations; |
| 5 | +mod validity; |
| 6 | + |
| 7 | +use std::hash::Hasher; |
| 8 | + |
| 9 | +use prost::Message; |
| 10 | +use vortex_error::VortexExpect; |
| 11 | +use vortex_error::VortexResult; |
| 12 | +use vortex_error::vortex_ensure_eq; |
| 13 | +use vortex_error::vortex_err; |
| 14 | +use vortex_error::vortex_panic; |
| 15 | +use vortex_session::VortexSession; |
| 16 | + |
| 17 | +use crate::ArrayEq; |
| 18 | +use crate::ArrayHash; |
| 19 | +use crate::ArrayRef; |
| 20 | +use crate::ExecutionCtx; |
| 21 | +use crate::ExecutionResult; |
| 22 | +use crate::IntoArray; |
| 23 | +use crate::Precision; |
| 24 | +use crate::array::Array; |
| 25 | +use crate::array::ArrayId; |
| 26 | +use crate::array::ArrayView; |
| 27 | +use crate::array::VTable; |
| 28 | +use crate::array::ValidityVTableFromChild; |
| 29 | +use crate::arrays::PatchedArray; |
| 30 | +use crate::buffer::BufferHandle; |
| 31 | +use crate::dtype::DType; |
| 32 | +use crate::dtype::PType; |
| 33 | +use crate::patches::Patches; |
| 34 | +use crate::serde::ArrayChildren; |
| 35 | +use crate::vtable; |
| 36 | + |
| 37 | +#[derive(Clone, Debug)] |
| 38 | +pub struct LazyPatched; |
| 39 | + |
| 40 | +vtable!(LazyPatched, LazyPatched, LazyPatchedData); |
| 41 | + |
| 42 | +#[derive(Clone, prost::Message)] |
| 43 | +pub struct LazyPatchedMetadata { |
| 44 | + #[prost(uint32, tag = "1")] |
| 45 | + pub(crate) num_patches: u32, |
| 46 | + #[prost(uint32, tag = "2")] |
| 47 | + pub(crate) offset: u32, |
| 48 | + #[prost(enumeration = "PType", tag = "3")] |
| 49 | + pub(crate) indices_ptype: i32, |
| 50 | +} |
| 51 | + |
| 52 | +impl LazyPatched { |
| 53 | + pub const ID: ArrayId = ArrayId::new_ref("vortex.patched_lazy"); |
| 54 | +} |
| 55 | + |
/// Slot index of the inner (unpatched) array.
const INNER_SLOT: usize = 0;
/// Slot index of the patch indices array.
const PATCH_INDICES_SLOT: usize = 1;
/// Slot index of the patch values array.
const PATCH_VALUES_SLOT: usize = 2;
/// Total number of child slots held by a lazily patched array.
const NUM_SLOTS: usize = 3;
/// Human-readable slot names, indexed by the slot constants above.
const SLOT_NAMES: [&str; NUM_SLOTS] = ["inner", "patch_indices", "patch_values"];
| 61 | + |
| 62 | +impl VTable for LazyPatched { |
| 63 | + type ArrayData = LazyPatchedData; |
| 64 | + |
| 65 | + type OperationsVTable = Self; |
| 66 | + type ValidityVTable = ValidityVTableFromChild; |
| 67 | + |
| 68 | + fn id(&self) -> ArrayId { |
| 69 | + Self::ID |
| 70 | + } |
| 71 | + |
| 72 | + fn validate(&self, data: &Self::ArrayData, dtype: &DType, len: usize) -> VortexResult<()> { |
| 73 | + vortex_ensure_eq!(data.inner().len(), len); |
| 74 | + vortex_ensure_eq!(data.patches().dtype(), dtype); |
| 75 | + Ok(()) |
| 76 | + } |
| 77 | + |
| 78 | + fn array_hash<H: Hasher>(array: &Self::ArrayData, state: &mut H, precision: Precision) { |
| 79 | + array.slots[INNER_SLOT] |
| 80 | + .as_ref() |
| 81 | + .vortex_expect("present") |
| 82 | + .array_hash(state, precision); |
| 83 | + array.slots[PATCH_INDICES_SLOT] |
| 84 | + .as_ref() |
| 85 | + .vortex_expect("present") |
| 86 | + .array_hash(state, precision); |
| 87 | + array.slots[PATCH_VALUES_SLOT] |
| 88 | + .as_ref() |
| 89 | + .vortex_expect("present") |
| 90 | + .array_hash(state, precision); |
| 91 | + } |
| 92 | + |
| 93 | + fn array_eq(array: &Self::ArrayData, other: &Self::ArrayData, precision: Precision) -> bool { |
| 94 | + array.inner().array_eq(other.inner(), precision) |
| 95 | + && array.patches().array_eq(&other.patches(), precision) |
| 96 | + } |
| 97 | + |
| 98 | + fn nbuffers(_array: ArrayView<'_, Self>) -> usize { |
| 99 | + 0 |
| 100 | + } |
| 101 | + |
| 102 | + fn buffer(_array: ArrayView<'_, Self>, _idx: usize) -> BufferHandle { |
| 103 | + vortex_panic!("LazyPatched array holds no buffers") |
| 104 | + } |
| 105 | + |
| 106 | + fn buffer_name(_array: ArrayView<'_, Self>, _idx: usize) -> Option<String> { |
| 107 | + vortex_panic!("LazyPatched array holds no buffers") |
| 108 | + } |
| 109 | + |
| 110 | + fn serialize(array: ArrayView<'_, Self>) -> VortexResult<Option<Vec<u8>>> { |
| 111 | + let num_patches = u32::try_from(array.num_patches())?; |
| 112 | + let offset = u32::try_from(array.offset)?; |
| 113 | + let indices_ptype = array.patch_indices_ptype() as i32; |
| 114 | + |
| 115 | + Ok(Some( |
| 116 | + LazyPatchedMetadata { |
| 117 | + num_patches, |
| 118 | + offset, |
| 119 | + indices_ptype, |
| 120 | + } |
| 121 | + .encode_to_vec(), |
| 122 | + )) |
| 123 | + } |
| 124 | + |
| 125 | + fn deserialize( |
| 126 | + &self, |
| 127 | + dtype: &DType, |
| 128 | + len: usize, |
| 129 | + metadata: &[u8], |
| 130 | + _buffers: &[BufferHandle], |
| 131 | + children: &dyn ArrayChildren, |
| 132 | + _session: &VortexSession, |
| 133 | + ) -> VortexResult<LazyPatchedData> { |
| 134 | + let metadata = LazyPatchedMetadata::decode(metadata)?; |
| 135 | + |
| 136 | + // Convert into PType |
| 137 | + let indices_ptype = PType::try_from(100i32)?; |
| 138 | + let num_patches = metadata.num_patches as usize; |
| 139 | + |
| 140 | + // Child must have expected DType. |
| 141 | + let inner = children.get(0, dtype, len)?; |
| 142 | + let patch_indices = children.get(1, &DType::from(indices_ptype), num_patches)?; |
| 143 | + let patch_values = children.get(2, dtype, num_patches)?; |
| 144 | + |
| 145 | + Ok(LazyPatchedData { |
| 146 | + offset: metadata.offset as usize, |
| 147 | + slots: vec![Some(inner), Some(patch_indices), Some(patch_values)], |
| 148 | + }) |
| 149 | + } |
| 150 | + |
| 151 | + fn slots(array: ArrayView<'_, Self>) -> &[Option<ArrayRef>] { |
| 152 | + &array.data().slots |
| 153 | + } |
| 154 | + |
| 155 | + fn slot_name(_array: ArrayView<'_, Self>, idx: usize) -> String { |
| 156 | + SLOT_NAMES[idx].to_string() |
| 157 | + } |
| 158 | + |
| 159 | + fn with_slots( |
| 160 | + array: &mut Self::ArrayData, |
| 161 | + mut slots: Vec<Option<ArrayRef>>, |
| 162 | + ) -> VortexResult<()> { |
| 163 | + vortex_ensure_eq!(slots.len(), NUM_SLOTS); |
| 164 | + |
| 165 | + array.slots[INNER_SLOT] = Some( |
| 166 | + slots |
| 167 | + .remove(0) |
| 168 | + .ok_or_else(|| vortex_err!("inner slot required"))?, |
| 169 | + ); |
| 170 | + |
| 171 | + array.slots[PATCH_INDICES_SLOT] = Some( |
| 172 | + slots |
| 173 | + .remove(0) |
| 174 | + .ok_or_else(|| vortex_err!("patch_indices slot required"))?, |
| 175 | + ); |
| 176 | + array.slots[PATCH_VALUES_SLOT] = Some( |
| 177 | + slots |
| 178 | + .remove(0) |
| 179 | + .ok_or_else(|| vortex_err!("patch_values slot required"))?, |
| 180 | + ); |
| 181 | + |
| 182 | + Ok(()) |
| 183 | + } |
| 184 | + |
| 185 | + fn execute(array: Array<Self>, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> { |
| 186 | + // Execution => actually transpose the patches, get back a `PatchedArray`. |
| 187 | + let patched = |
| 188 | + PatchedArray::from_array_and_patches(array.inner().clone(), &array.patches(), ctx)? |
| 189 | + .into_array(); |
| 190 | + |
| 191 | + Ok(ExecutionResult::done(patched)) |
| 192 | + } |
| 193 | +} |
| 194 | + |
| 195 | +#[derive(Debug, Clone)] |
| 196 | +pub struct LazyPatchedData { |
| 197 | + /// Slots. Contains the inner, the patch_indices and patch_values. |
| 198 | + /// All slots must be occupied. |
| 199 | + pub(crate) slots: Vec<Option<ArrayRef>>, |
| 200 | + /// Offset into the patches. |
| 201 | + pub(crate) offset: usize, |
| 202 | +} |
| 203 | + |
| 204 | +impl LazyPatchedData { |
| 205 | + /// Create a new `LazyPatchedData` from an inner array and an aligned set of [`Patches`]. |
| 206 | + /// |
| 207 | + /// # Errors |
| 208 | + /// |
| 209 | + /// Returns an error if the patches are not aligned to the array, i.e. the `array_len` of |
| 210 | + /// the patches does not equal the length of the inner array. |
| 211 | + pub fn try_new(inner: ArrayRef, patches: Patches) -> VortexResult<Self> { |
| 212 | + vortex_ensure_eq!( |
| 213 | + inner.len(), |
| 214 | + patches.array_len(), |
| 215 | + "Patches array_len does not match array len" |
| 216 | + ); |
| 217 | + |
| 218 | + vortex_ensure_eq!( |
| 219 | + inner.dtype(), |
| 220 | + patches.dtype(), |
| 221 | + "Array and Patches types must match" |
| 222 | + ); |
| 223 | + |
| 224 | + let offset = patches.offset(); |
| 225 | + let slots = vec![ |
| 226 | + Some(inner), |
| 227 | + Some(patches.indices().clone()), |
| 228 | + Some(patches.values().clone()), |
| 229 | + ]; |
| 230 | + |
| 231 | + Ok(Self { slots, offset }) |
| 232 | + } |
| 233 | + |
| 234 | + pub(crate) fn inner(&self) -> &ArrayRef { |
| 235 | + self.slots[INNER_SLOT] |
| 236 | + .as_ref() |
| 237 | + .vortex_expect("always occupied") |
| 238 | + } |
| 239 | + |
| 240 | + pub(crate) fn patch_indices_ptype(&self) -> PType { |
| 241 | + self.slots[PATCH_INDICES_SLOT] |
| 242 | + .as_ref() |
| 243 | + .vortex_expect("must be occupied") |
| 244 | + .dtype() |
| 245 | + .as_ptype() |
| 246 | + } |
| 247 | + |
| 248 | + pub(crate) fn patches(&self) -> Patches { |
| 249 | + let patch_indices = self.slots[PATCH_INDICES_SLOT] |
| 250 | + .clone() |
| 251 | + .vortex_expect("must be occupied"); |
| 252 | + let patch_values = self.slots[PATCH_VALUES_SLOT] |
| 253 | + .clone() |
| 254 | + .vortex_expect("must be occupied"); |
| 255 | + |
| 256 | + // SAFETY: the components are shredded from an original Patches at construction time, |
| 257 | + // we are just re-assembling them without modification. |
| 258 | + unsafe { |
| 259 | + Patches::new_unchecked( |
| 260 | + self.inner().len(), |
| 261 | + self.offset, |
| 262 | + patch_indices, |
| 263 | + patch_values, |
| 264 | + None, |
| 265 | + None, |
| 266 | + ) |
| 267 | + } |
| 268 | + } |
| 269 | + |
| 270 | + pub(crate) fn num_patches(&self) -> usize { |
| 271 | + self.slots[PATCH_INDICES_SLOT] |
| 272 | + .as_ref() |
| 273 | + .vortex_expect("must be occupied") |
| 274 | + .len() |
| 275 | + } |
| 276 | +} |
0 commit comments