Skip to content

Commit 85d44b6

Browse files
yanghuajackye1995
andauthored
feat: support tracking newly inserted and updated rows between versions (#4741)
Co-authored-by: Jack Ye <yezhaoqin@gmail.com>
1 parent 0826f3f commit 85d44b6

36 files changed

Lines changed: 3307 additions & 98 deletions

File tree

java/lance-jni/Cargo.lock

Lines changed: 10 additions & 16 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

java/lance-jni/src/fragment.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -633,6 +633,8 @@ impl FromJObjectWithEnv<Fragment> for JObject<'_> {
633633
deletion_file,
634634
physical_rows: Some(physical_rows),
635635
row_id_meta,
636+
created_at_version_meta: None,
637+
last_updated_at_version_meta: None,
636638
})
637639
}
638640
}

protos/rowids.proto

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,4 +95,19 @@ message EncodedU64Array {
9595
U32Array u32_array = 2;
9696
U64Array u64_array = 3;
9797
}
98-
}
98+
}
99+
100+
/// A sequence of dataset versions. Similar to RowIdSequence but tracks
101+
/// version runs. It uses RLE (Run-Length Encoding) to efficiently
102+
// represent consecutive rows with the same version.
103+
message RowDatasetVersionSequence {
104+
repeated RowDatasetVersionRun runs = 1;
105+
}
106+
107+
/// A run of rows with the same version.
108+
message RowDatasetVersionRun {
109+
/// The number of consecutive rows with the same version.
110+
U64Segment span = 1;
111+
112+
uint64 version = 2;
113+
}

protos/table.proto

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,20 @@ message DataFragment {
277277
ExternalFile external_row_ids = 6;
278278
} // row_id_sequence
279279

280+
oneof last_updated_at_version_sequence {
281+
// If small (< 200KB), the row latest updated versions are stored inline.
282+
bytes inline_last_updated_at_versions = 7;
283+
// Otherwise, stored as part of a file.
284+
ExternalFile external_last_updated_at_versions = 8;
285+
} // last_updated_at_version_sequence
286+
287+
oneof created_at_version_sequence {
288+
// If small (< 200KB), the row created at versions are stored inline.
289+
bytes inline_created_at_versions = 9;
290+
// Otherwise, stored as part of a file.
291+
ExternalFile external_created_at_versions = 10;
292+
} // created_at_version_sequence
293+
280294
// Number of original rows in the fragment, this includes rows that are now marked with
281295
// deletion tombstones. To compute the current number of rows, subtract
282296
// `deletion_file.num_deleted_rows` from this value.

python/python/lance/fragment.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@
2727
from .lance import (
2828
DeletionFile as DeletionFile,
2929
)
30+
from .lance import (
31+
RowDatasetVersionMeta as RowDatasetVersionMeta,
32+
)
3033
from .lance import (
3134
RowIdMeta as RowIdMeta,
3235
)
@@ -67,13 +70,19 @@ class FragmentMetadata:
6770
The deletion file, if any.
6871
row_id_meta : Optional[RowIdMeta]
6972
The row id metadata, if any.
73+
created_at_version_meta : Optional[RowDatasetVersionMeta]
74+
The row created at version metadata, if any.
75+
last_updated_at_version_meta : Optional[RowDatasetVersionMeta]
76+
The row last updated at version metadata, if any.
7077
"""
7178

7279
id: int
7380
files: List[DataFile]
7481
physical_rows: int
7582
deletion_file: Optional[DeletionFile] = None
7683
row_id_meta: Optional[RowIdMeta] = None
84+
created_at_version_meta: Optional[RowDatasetVersionMeta] = None
85+
last_updated_at_version_meta: Optional[RowDatasetVersionMeta] = None
7786

7887
@property
7988
def num_deletions(self) -> int:
@@ -110,6 +119,16 @@ def to_json(self) -> dict:
110119
row_id_meta=(
111120
self.row_id_meta.asdict() if self.row_id_meta is not None else None
112121
),
122+
created_at_version_meta=(
123+
json.loads(self.created_at_version_meta.json())
124+
if self.created_at_version_meta is not None
125+
else None
126+
),
127+
last_updated_at_version_meta=(
128+
json.loads(self.last_updated_at_version_meta.json())
129+
if self.last_updated_at_version_meta is not None
130+
else None
131+
),
113132
)
114133

115134
@staticmethod
@@ -124,12 +143,26 @@ def from_json(json_data: str) -> FragmentMetadata:
124143
if row_id_meta is not None:
125144
row_id_meta = RowIdMeta(**row_id_meta)
126145

146+
created_at_version_meta = json_data.get("created_at_version_meta")
147+
if created_at_version_meta is not None:
148+
created_at_version_meta = RowDatasetVersionMeta.from_json(
149+
json.dumps(created_at_version_meta)
150+
)
151+
152+
last_updated_at_version_meta = json_data.get("last_updated_at_version_meta")
153+
if last_updated_at_version_meta is not None:
154+
last_updated_at_version_meta = RowDatasetVersionMeta.from_json(
155+
json.dumps(last_updated_at_version_meta)
156+
)
157+
127158
return FragmentMetadata(
128159
id=json_data["id"],
129160
files=[DataFile(**f) for f in json_data["files"]],
130161
physical_rows=json_data["physical_rows"],
131162
deletion_file=deletion_file,
132163
row_id_meta=row_id_meta,
164+
created_at_version_meta=created_at_version_meta,
165+
last_updated_at_version_meta=last_updated_at_version_meta,
133166
)
134167

135168

python/python/tests/test_fragment.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,8 @@ def test_fragment_meta():
278278
"column_indices=[], file_major_version=0, file_minor_version=0, "
279279
"file_size_bytes=100), DataFile(path='1.lance', fields=[1], column_indices=[], "
280280
"file_major_version=0, file_minor_version=0, file_size_bytes=None)], "
281-
"physical_rows=100, deletion_file=None, row_id_meta=None)"
281+
"physical_rows=100, deletion_file=None, row_id_meta=None, "
282+
"created_at_version_meta=None, last_updated_at_version_meta=None)"
282283
)
283284

284285

python/src/fragment.rs

Lines changed: 75 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@ use lance::dataset::transaction::{Operation, Transaction};
2626
use lance::dataset::{InsertBuilder, NewColumnTransform};
2727
use lance::Error;
2828
use lance_io::utils::CachedFileSize;
29-
use lance_table::format::{DataFile, DeletionFile, DeletionFileType, Fragment, RowIdMeta};
29+
use lance_table::format::{
30+
DataFile, DeletionFile, DeletionFileType, Fragment, RowDatasetVersionMeta, RowIdMeta,
31+
};
3032
use lance_table::io::deletion::deletion_file_path;
3133
use object_store::path::Path;
3234
use pyo3::basic::CompareOp;
@@ -593,6 +595,9 @@ impl PyDeletionFile {
593595
#[pyclass(name = "RowIdMeta", module = "lance.fragment")]
594596
pub struct PyRowIdMeta(pub RowIdMeta);
595597

598+
#[pyclass(name = "RowDatasetVersionMeta", module = "lance.fragment")]
599+
pub struct PyRowDatasetVersionMeta(pub RowDatasetVersionMeta);
600+
596601
#[pymethods]
597602
impl PyRowIdMeta {
598603
fn asdict(&self) -> PyResult<Bound<'_, PyDict>> {
@@ -639,6 +644,55 @@ impl PyRowIdMeta {
639644
}
640645
}
641646

647+
#[pymethods]
648+
impl PyRowDatasetVersionMeta {
649+
fn asdict(&self) -> PyResult<Bound<'_, PyDict>> {
650+
Err(PyNotImplementedError::new_err(
651+
"PyRowDatasetVersionMeta.asdict is not yet supported.",
652+
))
653+
}
654+
655+
pub fn json(&self) -> PyResult<String> {
656+
serde_json::to_string(&self.0).map_err(|err| {
657+
PyValueError::new_err(format!(
658+
"Could not serialize RowDatasetVersionMeta due to error: {}",
659+
err
660+
))
661+
})
662+
}
663+
664+
#[staticmethod]
665+
pub fn from_json(json: String) -> PyResult<Self> {
666+
let dataset_version_meta = serde_json::from_str(&json).map_err(|err| {
667+
PyValueError::new_err(format!(
668+
"Could not load RowDatasetVersionMeta due to error: {}",
669+
err
670+
))
671+
})?;
672+
Ok(Self(dataset_version_meta))
673+
}
674+
675+
fn __reduce__(&self, py: Python<'_>) -> PyResult<(PyObject, PyObject)> {
676+
let state = self.json()?;
677+
let state = PyTuple::new(py, vec![state])?.extract()?;
678+
let from_json = PyModule::import(py, "lance.fragment")?
679+
.getattr("RowDatasetVersionMeta")?
680+
.getattr("from_json")?
681+
.extract()?;
682+
Ok((from_json, state))
683+
}
684+
685+
pub fn __richcmp__(&self, other: PyRef<'_, Self>, op: CompareOp) -> PyResult<bool> {
686+
match op {
687+
CompareOp::Eq => Ok(self.0 == other.0),
688+
CompareOp::Ne => Ok(self.0 != other.0),
689+
_ => Err(PyNotImplementedError::new_err(
690+
"Only == and != are supported for RowDatasetVersionMeta",
691+
)),
692+
}
693+
}
694+
}
695+
642696
#[pyclass(name = "FragmentSession", module = "_lib", subclass)]
643697
#[derive(Clone)]
644698
pub struct FragmentSession {
@@ -670,13 +724,21 @@ impl FromPyObject<'_> for PyLance<Fragment> {
670724

671725
let row_id_meta: Option<PyRef<PyRowIdMeta>> = ob.getattr("row_id_meta")?.extract()?;
672726
let row_id_meta = row_id_meta.map(|r| r.0.clone());
727+
let last_updated_at_version_meta: Option<PyRef<PyRowDatasetVersionMeta>> =
728+
ob.getattr("last_updated_at_version_meta")?.extract()?;
729+
let last_updated_at_version_meta = last_updated_at_version_meta.map(|r| r.0.clone());
730+
let created_at_version_meta: Option<PyRef<PyRowDatasetVersionMeta>> =
731+
ob.getattr("created_at_version_meta")?.extract()?;
732+
let created_at_version_meta = created_at_version_meta.map(|r| r.0.clone());
673733

674734
Ok(Self(Fragment {
675735
id: ob.getattr("id")?.extract()?,
676736
files,
677737
deletion_file,
678738
physical_rows: ob.getattr("physical_rows")?.extract()?,
679739
row_id_meta,
740+
last_updated_at_version_meta,
741+
created_at_version_meta,
680742
}))
681743
}
682744
}
@@ -699,13 +761,25 @@ impl<'py> IntoPyObject<'py> for PyLance<&Fragment> {
699761
.as_ref()
700762
.map(|f| PyDeletionFile(f.clone()));
701763
let row_id_meta = self.0.row_id_meta.as_ref().map(|r| PyRowIdMeta(r.clone()));
764+
let last_updated_at_version_meta = self
765+
.0
766+
.last_updated_at_version_meta
767+
.as_ref()
768+
.map(|r| PyRowDatasetVersionMeta(r.clone()));
769+
let created_at_version_meta = self
770+
.0
771+
.created_at_version_meta
772+
.as_ref()
773+
.map(|r| PyRowDatasetVersionMeta(r.clone()));
702774

703775
cls.call1((
704776
self.0.id,
705777
files,
706778
self.0.physical_rows,
707779
deletion_file,
708780
row_id_meta,
781+
created_at_version_meta,
782+
last_updated_at_version_meta,
709783
))
710784
}
711785
}

python/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ use crate::utils::Hnsw;
8989
use crate::utils::KMeans;
9090
pub use dataset::write_dataset;
9191
pub use dataset::Dataset;
92-
use fragment::{FileFragment, PyDeletionFile, PyRowIdMeta};
92+
use fragment::{FileFragment, PyDeletionFile, PyRowDatasetVersionMeta, PyRowIdMeta};
9393
pub use indices::register_indices;
9494
pub use reader::LanceReader;
9595
pub use scanner::Scanner;
@@ -244,6 +244,7 @@ fn lance(py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
244244
m.add_class::<FileFragment>()?;
245245
m.add_class::<PyDeletionFile>()?;
246246
m.add_class::<PyRowIdMeta>()?;
247+
m.add_class::<PyRowDatasetVersionMeta>()?;
247248
m.add_class::<MergeInsertBuilder>()?;
248249
m.add_class::<LanceBlobFile>()?;
249250
m.add_class::<LanceFileReader>()?;

python/src/reader.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ pub struct LanceReader {
3838
}
3939

4040
impl LanceReader {
41-
pub async fn try_new(scanner: Arc<LanceScanner>) -> ::lance::error::Result<Self> {
42-
let stream = scanner.try_into_stream().await?;
41+
pub async fn try_new(mut scanner: Arc<LanceScanner>) -> ::lance::error::Result<Self> {
42+
let stream = Arc::make_mut(&mut scanner).try_into_stream().await?;
4343
let schema = stream.schema();
4444
Ok(Self {
4545
schema,

0 commit comments

Comments
 (0)