Skip to content

Commit 254f413

Browse files
authored
feat(encoding): cache repetition index for FullZip encoding (lance-format#4104)
## Summary This PR introduces caching for repetition index in FullZip encoding to reduce I/O operations when reading variable-length data from remote storage. ## Why this change? When reading variable-length data (like strings) with FullZip encoding, the system needs to load a repetition index to determine byte offsets. Currently, this index is loaded from storage for every read operation, causing 2 additional I/O requests per query. For remote storage scenarios with sufficient RAM, caching this index can significantly improve performance. ## What's changed? - Added `FullZipCacheableState` struct to store decoded repetition index in memory - Implemented cache initialization in `FullZipScheduler::initialize()` that loads and decodes the repetition index once - Added `schedule_ranges_with_cached_rep()` method that uses cached data directly without I/O - Modified `schedule_ranges_rep()` to check for cached data first before falling back to disk reads - Integrated with existing `CachedPageData` trait system for cache management This is the foundation work for issue lance-format#3579. The caching is currently automatic when a repetition index exists, with configuration options coming in follow-up PRs. --- I changed this PR into disable repetition index cache by default so users won't be affected. I will re-add this part after configration from `field metadata` and `ReadParams` has been added. --------- Signed-off-by: Xuanwo <github@xuanwo.io>
1 parent 99d31ec commit 254f413

1 file changed

Lines changed: 313 additions & 9 deletions

File tree

rust/lance-encoding/src/encodings/logical/primitive.rs

Lines changed: 313 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1656,6 +1656,8 @@ pub struct FullZipScheduler {
16561656
rows_in_page: u64,
16571657
bits_per_offset: u8,
16581658
details: Arc<FullZipDecodeDetails>,
1659+
/// Cached state containing the decoded repetition index
1660+
cached_state: Option<Arc<FullZipCacheableState>>,
16591661
}
16601662

16611663
impl FullZipScheduler {
@@ -1742,6 +1744,7 @@ impl FullZipScheduler {
17421744
priority,
17431745
rows_in_page,
17441746
bits_per_offset,
1747+
cached_state: None,
17451748
})
17461749
}
17471750

@@ -1790,11 +1793,19 @@ impl FullZipScheduler {
17901793
match &details.value_decompressor {
17911794
PerValueDecompressor::Fixed(decompressor) => {
17921795
let bits_per_value = decompressor.bits_per_value();
1793-
assert!(bits_per_value > 0);
1796+
if bits_per_value == 0 {
1797+
return Err(lance_core::Error::Internal {
1798+
message: "Invalid encoding: bits_per_value must be greater than 0".into(),
1799+
location: location!(),
1800+
});
1801+
}
17941802
if bits_per_value % 8 != 0 {
17951803
// Unlikely we will ever want this since full-zip values are so large the few bits we shave off don't
17961804
// make much difference.
1797-
unimplemented!("Bit-packed full-zip");
1805+
return Err(lance_core::Error::NotSupported {
1806+
source: "Bit-packed full-zip encoding (non-byte-aligned values) is not yet implemented".into(),
1807+
location: location!(),
1808+
});
17981809
}
17991810
let bytes_per_value = bits_per_value / 8;
18001811
let total_bytes_per_value =
@@ -1822,13 +1833,114 @@ impl FullZipScheduler {
18221833
}
18231834
}
18241835

1836+
/// Schedules ranges directly using cached repetition index data
1837+
fn schedule_ranges_with_cached_rep(
1838+
&self,
1839+
ranges: &[Range<u64>],
1840+
io: &Arc<dyn EncodingsIo>,
1841+
cached_rep_data: &LanceBuffer,
1842+
bytes_per_value: u64,
1843+
) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
1844+
use crate::utils::bytepack::ByteUnpacker;
1845+
1846+
// Extract byte ranges directly from the cached repetition index
1847+
let byte_ranges: Vec<Range<u64>> = ranges
1848+
.iter()
1849+
.map(|r| {
1850+
// Get start and end values from the cached buffer
1851+
let start_offset = (r.start * bytes_per_value) as usize;
1852+
let end_offset = (r.end * bytes_per_value) as usize;
1853+
1854+
// Use ByteUnpacker to read single values
1855+
let start_slice =
1856+
&cached_rep_data[start_offset..start_offset + bytes_per_value as usize];
1857+
let start_val =
1858+
ByteUnpacker::new(start_slice.iter().copied(), bytes_per_value as usize)
1859+
.next()
1860+
.unwrap();
1861+
1862+
let end_slice = &cached_rep_data[end_offset..end_offset + bytes_per_value as usize];
1863+
let end_val =
1864+
ByteUnpacker::new(end_slice.iter().copied(), bytes_per_value as usize)
1865+
.next()
1866+
.unwrap();
1867+
1868+
(self.data_buf_position + start_val)..(self.data_buf_position + end_val)
1869+
})
1870+
.collect();
1871+
1872+
let data = io.submit_request(byte_ranges, self.priority);
1873+
let row_ranges = ranges.to_vec();
1874+
let details = self.details.clone();
1875+
let bits_per_offset = self.bits_per_offset;
1876+
1877+
Ok(async move {
1878+
let data = data.await?;
1879+
let data = data
1880+
.into_iter()
1881+
.map(|d| LanceBuffer::from_bytes(d, 1))
1882+
.collect();
1883+
let num_rows = row_ranges.into_iter().map(|r| r.end - r.start).sum();
1884+
1885+
match &details.value_decompressor {
1886+
PerValueDecompressor::Fixed(decompressor) => {
1887+
let bits_per_value = decompressor.bits_per_value();
1888+
if bits_per_value == 0 {
1889+
return Err(lance_core::Error::Internal {
1890+
message: "Invalid encoding: bits_per_value must be greater than 0".into(),
1891+
location: location!(),
1892+
});
1893+
}
1894+
if bits_per_value % 8 != 0 {
1895+
return Err(lance_core::Error::NotSupported {
1896+
source: "Bit-packed full-zip encoding (non-byte-aligned values) is not yet implemented".into(),
1897+
location: location!(),
1898+
});
1899+
}
1900+
let bytes_per_value = bits_per_value / 8;
1901+
let total_bytes_per_value =
1902+
bytes_per_value as usize + details.ctrl_word_parser.bytes_per_word();
1903+
Ok(Box::new(FixedFullZipDecoder {
1904+
details,
1905+
data,
1906+
num_rows,
1907+
offset_in_current: 0,
1908+
bytes_per_value: bytes_per_value as usize,
1909+
total_bytes_per_value,
1910+
}) as Box<dyn StructuralPageDecoder>)
1911+
}
1912+
PerValueDecompressor::Variable(_decompressor) => {
1913+
Ok(Box::new(VariableFullZipDecoder::new(
1914+
details,
1915+
data,
1916+
num_rows,
1917+
bits_per_offset,
1918+
bits_per_offset,
1919+
)) as Box<dyn StructuralPageDecoder>)
1920+
}
1921+
}
1922+
}
1923+
.boxed())
1924+
}
1925+
18251926
/// Schedules ranges in the presence of a repetition index
18261927
fn schedule_ranges_rep(
18271928
&self,
18281929
ranges: &[Range<u64>],
18291930
io: &Arc<dyn EncodingsIo>,
18301931
rep_index: &FullZipRepIndexDetails,
18311932
) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
1933+
// Check if we have cached repetition index data
1934+
if let Some(cached_state) = &self.cached_state {
1935+
return self.schedule_ranges_with_cached_rep(
1936+
ranges,
1937+
io,
1938+
&cached_state.rep_index_buffer,
1939+
rep_index.bytes_per_value,
1940+
);
1941+
}
1942+
1943+
// Fall back to loading from disk
18321944
let rep_index_ranges = ranges
18331945
.iter()
18341946
.flat_map(|r| {
@@ -1908,16 +2020,69 @@ impl FullZipScheduler {
19082020
}
19092021
}
19102022

2023+
/// Cacheable state for FullZip encoding, storing the decoded repetition index
2024+
#[derive(Debug)]
2025+
struct FullZipCacheableState {
2026+
/// The raw repetition index buffer for future decoding
2027+
rep_index_buffer: LanceBuffer,
2028+
}
2029+
2030+
impl DeepSizeOf for FullZipCacheableState {
2031+
fn deep_size_of_children(&self, _context: &mut Context) -> usize {
2032+
self.rep_index_buffer.len()
2033+
}
2034+
}
2035+
2036+
impl CachedPageData for FullZipCacheableState {
2037+
fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
2038+
self
2039+
}
2040+
}
2041+
19112042
impl StructuralPageScheduler for FullZipScheduler {
1912-
// TODO: Add opt-in caching of repetition index
2043+
/// Initializes the scheduler. If there's a repetition index, loads and caches it.
2044+
/// Otherwise returns NoCachedPageData.
19132045
fn initialize<'a>(
19142046
&'a mut self,
1915-
_io: &Arc<dyn EncodingsIo>,
2047+
io: &Arc<dyn EncodingsIo>,
19162048
) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
1917-
std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc<dyn CachedPageData>)).boxed()
2049+
// Check if we have a repetition index
2050+
if let Some(rep_index) = &self.rep_index {
2051+
// Calculate the total size of the repetition index
2052+
let total_size = (self.rows_in_page + 1) * rep_index.bytes_per_value;
2053+
let rep_index_range = rep_index.buf_position..(rep_index.buf_position + total_size);
2054+
2055+
// Load the repetition index buffer
2056+
let io_clone = io.clone();
2057+
let future = async move {
2058+
let rep_index_data = io_clone.submit_request(vec![rep_index_range], 0).await?;
2059+
let rep_index_buffer = LanceBuffer::from_bytes(rep_index_data[0].clone(), 1);
2060+
2061+
// Create and return the cacheable state
2062+
Ok(Arc::new(FullZipCacheableState { rep_index_buffer }) as Arc<dyn CachedPageData>)
2063+
};
2064+
2065+
future.boxed()
2066+
} else {
2067+
// No repetition index, skip caching
2068+
std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc<dyn CachedPageData>)).boxed()
2069+
}
19182070
}
19192071

1920-
fn load(&mut self, _cache: &Arc<dyn CachedPageData>) {}
2072+
/// Loads previously cached repetition index data from the cache system.
2073+
/// This method is called when a scheduler instance needs to use cached data
2074+
/// that was initialized by another instance or in a previous operation.
2075+
fn load(&mut self, cache: &Arc<dyn CachedPageData>) {
2076+
// Try to downcast to our specific cache type
2077+
if let Ok(cached_state) = cache
2078+
.clone()
2079+
.as_arc_any()
2080+
.downcast::<FullZipCacheableState>()
2081+
{
2082+
// Store the cached state for use in schedule_ranges
2083+
self.cached_state = Some(cached_state);
2084+
}
2085+
}
19212086

19222087
fn schedule_ranges(
19232088
&self,
@@ -4074,14 +4239,16 @@ impl FieldEncoder for PrimitiveStructuralEncoder {
40744239
mod tests {
40754240
use std::{collections::VecDeque, sync::Arc};
40764241

4077-
use arrow_array::{ArrayRef, Int8Array, StringArray};
4078-
40794242
use crate::encodings::logical::primitive::{
40804243
ChunkDrainInstructions, PrimitiveStructuralEncoder,
40814244
};
4245+
use arrow_array::{ArrayRef, Int8Array, StringArray};
40824246

40834247
use super::{
4084-
ChunkInstructions, DataBlock, DecodeMiniBlockTask, PreambleAction, RepetitionIndex,
4248+
ChunkInstructions, DataBlock, DecodeMiniBlockTask, FixedPerValueDecompressor,
4249+
FixedWidthDataBlock, FullZipCacheableState, FullZipDecodeDetails, FullZipRepIndexDetails,
4250+
FullZipScheduler, PerValueDecompressor, PreambleAction, RepetitionIndex,
4251+
StructuralPageScheduler,
40854252
};
40864253

40874254
#[test]
@@ -4690,4 +4857,141 @@ mod tests {
46904857
assert!(!need_preamble);
46914858
assert_eq!(skip_in_chunk, 0);
46924859
}
4860+
4861+
#[tokio::test]
4862+
async fn test_fullzip_repetition_index_caching() {
4863+
use crate::testing::SimulatedScheduler;
4864+
use lance_core::cache::LanceCache;
4865+
4866+
// Simplified FixedPerValueDecompressor for testing
4867+
#[derive(Debug)]
4868+
struct TestFixedDecompressor;
4869+
4870+
impl FixedPerValueDecompressor for TestFixedDecompressor {
4871+
fn decompress(
4872+
&self,
4873+
_data: FixedWidthDataBlock,
4874+
_num_rows: u64,
4875+
) -> crate::Result<DataBlock> {
4876+
unimplemented!("Test decompressor")
4877+
}
4878+
4879+
fn bits_per_value(&self) -> u64 {
4880+
32
4881+
}
4882+
}
4883+
4884+
// Create test repetition index data
4885+
let rows_in_page = 100u64;
4886+
let bytes_per_value = 4u64;
4887+
let _rep_index_size = (rows_in_page + 1) * bytes_per_value;
4888+
4889+
// Create mock repetition index data
4890+
let mut rep_index_data = Vec::new();
4891+
for i in 0..=rows_in_page {
4892+
let offset = (i * 100) as u32; // Each row starts at i * 100 bytes
4893+
rep_index_data.extend_from_slice(&offset.to_le_bytes());
4894+
}
4895+
4896+
// Simulate storage with the repetition index at position 1000
4897+
let mut full_data = vec![0u8; 1000];
4898+
full_data.extend_from_slice(&rep_index_data);
4899+
full_data.extend_from_slice(&vec![0u8; 10000]); // Add some data after
4900+
4901+
let data = bytes::Bytes::from(full_data);
4902+
let io = Arc::new(SimulatedScheduler::new(data));
4903+
let _cache = Arc::new(LanceCache::with_capacity(1024 * 1024));
4904+
4905+
// Create FullZipScheduler with repetition index
4906+
let mut scheduler = FullZipScheduler {
4907+
data_buf_position: 0,
4908+
rep_index: Some(FullZipRepIndexDetails {
4909+
buf_position: 1000,
4910+
bytes_per_value,
4911+
}),
4912+
priority: 0,
4913+
rows_in_page,
4914+
bits_per_offset: 32,
4915+
details: Arc::new(FullZipDecodeDetails {
4916+
value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
4917+
def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
4918+
ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
4919+
max_rep: 0,
4920+
max_visible_def: 0,
4921+
}),
4922+
cached_state: None,
4923+
};
4924+
4925+
// First initialization should load and cache the repetition index
4926+
let io_dyn: Arc<dyn crate::EncodingsIo> = io.clone();
4927+
let cached_data1 = scheduler.initialize(&io_dyn).await.unwrap();
4928+
4929+
// Verify that we got a FullZipCacheableState (not NoCachedPageData)
4930+
let is_cached = cached_data1
4931+
.clone()
4932+
.as_arc_any()
4933+
.downcast::<FullZipCacheableState>()
4934+
.is_ok();
4935+
assert!(
4936+
is_cached,
4937+
"Expected FullZipCacheableState, got NoCachedPageData"
4938+
);
4939+
4940+
// Load the cached data into the scheduler
4941+
scheduler.load(&cached_data1);
4942+
4943+
// Verify that cached_state is now populated
4944+
assert!(
4945+
scheduler.cached_state.is_some(),
4946+
"cached_state should be populated after load"
4947+
);
4948+
4949+
// Verify the cached data contains the repetition index
4950+
let cached_state = scheduler.cached_state.as_ref().unwrap();
4951+
4952+
// Test that schedule_ranges_rep uses the cached data
4953+
let ranges = vec![0..10, 20..30];
4954+
let result = scheduler.schedule_ranges_rep(
4955+
&ranges,
4956+
&io_dyn,
4957+
&FullZipRepIndexDetails {
4958+
buf_position: 1000,
4959+
bytes_per_value,
4960+
},
4961+
);
4962+
4963+
// The result should be OK (not an error)
4964+
assert!(
4965+
result.is_ok(),
4966+
"schedule_ranges_rep should succeed with cached data"
4967+
);
4968+
4969+
// Second scheduler instance should be able to use the cached data
4970+
let mut scheduler2 = FullZipScheduler {
4971+
data_buf_position: 0,
4972+
rep_index: Some(FullZipRepIndexDetails {
4973+
buf_position: 1000,
4974+
bytes_per_value,
4975+
}),
4976+
priority: 0,
4977+
rows_in_page,
4978+
bits_per_offset: 32,
4979+
details: scheduler.details.clone(),
4980+
cached_state: None,
4981+
};
4982+
4983+
// Load cached data from the first scheduler
4984+
scheduler2.load(&cached_data1);
4985+
assert!(
4986+
scheduler2.cached_state.is_some(),
4987+
"Second scheduler should have cached_state after load"
4988+
);
4989+
4990+
// Verify that both schedulers have the same cached data
4991+
let cached_state2 = scheduler2.cached_state.as_ref().unwrap();
4992+
assert!(
4993+
Arc::ptr_eq(cached_state, cached_state2),
4994+
"Both schedulers should share the same cached data"
4995+
);
4996+
}
46934997
}

0 commit comments

Comments
 (0)