Skip to content

Commit 43c3780

Browse files
jerryjch and Jing chen He authored
fix: propagate update_columns offsets and partial last_updated for RewriteColumns (#6650)
## Summary * Fixes #6505 * `FileFragment::update_columns` returns `Result<(Fragment, Vec<u32>)>` (unchanged public shape). `update_columns_with_offsets` returns `FragmentUpdateColumnsResult` (fragment, `fields_modified`, `matched_offsets: RoaringBitmap`) for callers that need physical row indices for stable row-id metadata. * `HashJoiner::matched_join_rows` — boolean mask for hash hits; used by `update_columns_with_offsets` and covered by `test_matched_join_rows`. * `Operation::Update`: optional `updated_fragment_offsets: Option<UpdatedFragmentOffsets>` where `UpdatedFragmentOffsets` wraps `HashMap<u64, RoaringBitmap>` (newtype with `Default`, `PartialEq`, manual `DeepSizeOf`). `None` means the caller did not supply offsets. * Proto (`transaction.proto`): backward-compatible `map<uint64, UInt32List> updated_fragment_offsets = 9` on `Update`; serde round-trip preserves semantics. * `build_manifest`: when stable row IDs are enabled, `update_mode == RewriteColumns`, and `Some(UpdatedFragmentOffsets(..))` includes a non-empty bitmap for a fragment, calls `refresh_row_latest_update_meta_for_partial_frag_rewrite_cols` for those offsets only — unmatched rows and untouched fragments are left unchanged. * JNI / Java: `FragmentUpdateResult` includes matched row offsets; the 2-arg constructor `(FragmentMetadata, long[])` delegates to the 3-arg form with an empty offset array for compatibility. JNI uses `update_columns_with_offsets`. * Python: `update_columns` binding correctly destructures the `(Fragment, Vec<u32>)` tuple. ## Root cause For `Operation::Update` with `RewriteColumns`, commits could advance the dataset version without advancing `_row_last_updated_at_version` for the rows that were actually rewritten. `update_columns` did not report which physical offsets matched, and `build_manifest` had no per-fragment offset map to drive the partial refresh. 
Without that information the transaction layer cannot distinguish which rows changed, so the version metadata is not updated. ## Implementation notes * `RoaringBitmap` iteration is ascending and duplicate-free; redundant `sort` / `dedup` when building proto lists or offset vectors from bitmaps were removed. * Call sites that do not populate offsets use `updated_fragment_offsets: None`. ## Why the protobuf field exists lance-spark passes `Transaction` through JNI as a protobuf blob: Java builds a `Transaction` proto, Rust deserializes it and runs `build_manifest`. Without `updated_fragment_offsets` on the wire, the decoded `Operation::Update` would always have `updated_fragment_offsets: None` even when matched offsets were computed on the JVM side, and the partial refresh in `build_manifest` would silently do nothing. ## Test plan * `cargo test -p lance test_matched_join_rows` — `HashJoiner::matched_join_rows`. * `cargo test -p lance test_build_manifest_partial_last_updated_rewrite_columns_stable_row_ids` — `Dataset::commit` -> `build_manifest`: two fragments, partial `update_columns_with_offsets`, `Operation::Update` with `RewriteColumns` and an offset map; asserts matched vs unmatched vs untouched row version metadata. * `cargo test -p lance test_fragment_update` — fragment path with `Operation::Update` and offsets. * `cargo test -p lance --tests` (or at least `cargo check -p lance --tests`) and `cargo check --manifest-path java/lance-jni/Cargo.toml`. The `pylance` crate is excluded from the root workspace; validate Python bindings in the usual `maturin` / CI flow if you touch `python/`. ## Compatibility * Rust: `update_columns` signature unchanged; `update_columns_with_offsets` is additive. * Java: 2-arg `FragmentUpdateResult` constructor preserved. * Proto: field 9; older clients ignore unknown fields. --------- Co-authored-by: Jing chen He <jingh@adobe.com>
1 parent 75ddf97 commit 43c3780

15 files changed

Lines changed: 529 additions & 20 deletions

File tree

java/lance-jni/src/fragment.rs

Lines changed: 11 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -48,6 +48,8 @@ pub(crate) struct FragmentMergeResult {
4848
pub(crate) struct FragmentUpdateResult {
4949
updated_fragment: Fragment,
5050
fields_modified: Vec<u32>,
51+
/// Physical row offsets that received column updates (from `_rowaddr` low bits).
52+
updated_row_offsets: Vec<i64>,
5153
}
5254

5355
//////////////////
@@ -490,11 +492,13 @@ fn inner_update_column<'local>(
490492
let reader = unsafe { ArrowArrayStreamReader::from_raw(stream_ptr) }?;
491493
let left_on_str: String = left_on.extract(env)?;
492494
let right_on_str: String = right_on.extract(env)?;
493-
let (updated_fragment, fields_modified) =
494-
RT.block_on(fragment.update_columns(reader, &left_on_str, &right_on_str))?;
495+
let r =
496+
RT.block_on(fragment.update_columns_with_offsets(reader, &left_on_str, &right_on_str))?;
497+
let updated_row_offsets: Vec<i64> = r.matched_offsets.iter().map(|o| o as i64).collect();
495498
let result = FragmentUpdateResult {
496-
updated_fragment,
497-
fields_modified,
499+
updated_fragment: r.fragment,
500+
fields_modified: r.fields_modified,
501+
updated_row_offsets,
498502
};
499503
result.into_java(env)
500504
}
@@ -542,7 +546,7 @@ const FRAGMENT_MERGE_RESULT_CLASS: &str = "org/lance/fragment/FragmentMergeResult";
542546
const FRAGMENT_MERGE_RESULT_CONSTRUCTOR_SIG: &str =
543547
"(Lorg/lance/FragmentMetadata;Lorg/lance/schema/LanceSchema;)V";
544548
const FRAGMENT_UPDATE_RESULT_CLASS: &str = "org/lance/fragment/FragmentUpdateResult";
545-
const FRAGMENT_UPDATE_RESULT_CONSTRUCTOR_SIG: &str = "(Lorg/lance/FragmentMetadata;[J)V";
549+
const FRAGMENT_UPDATE_RESULT_CONSTRUCTOR_SIG: &str = "(Lorg/lance/FragmentMetadata;[J[J)V";
546550

547551
impl IntoJava for &FragmentMergeResult {
548552
fn into_java<'a>(self, env: &mut JNIEnv<'a>) -> Result<JObject<'a>> {
@@ -563,12 +567,14 @@ impl IntoJava for &FragmentUpdateResult {
563567
fn into_java<'a>(self, env: &mut JNIEnv<'a>) -> Result<JObject<'a>> {
564568
let java_updated_fragment = self.updated_fragment.into_java(env)?;
565569
let java_fields_modified = JLance(self.fields_modified.clone()).into_java(env)?;
570+
let java_updated_row_offsets = JLance(self.updated_row_offsets.clone()).into_java(env)?;
566571
Ok(env.new_object(
567572
FRAGMENT_UPDATE_RESULT_CLASS,
568573
FRAGMENT_UPDATE_RESULT_CONSTRUCTOR_SIG,
569574
&[
570575
JValueGen::Object(&java_updated_fragment),
571576
JValueGen::Object(&java_fields_modified),
577+
JValueGen::Object(&java_updated_row_offsets),
572578
],
573579
)?)
574580
}

java/lance-jni/src/traits.rs

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -202,6 +202,14 @@ impl IntoJava for JLance<Vec<u32>> {
202202
}
203203
}
204204

205+
impl IntoJava for JLance<Vec<i64>> {
206+
fn into_java<'a>(self, env: &mut JNIEnv<'a>) -> Result<JObject<'a>> {
207+
let arr = env.new_long_array(self.0.len() as i32)?;
208+
env.set_long_array_region(&arr, 0, &self.0)?;
209+
Ok(arr.into())
210+
}
211+
}
212+
205213
impl IntoJava for JLance<usize> {
206214
fn into_java<'a>(self, env: &mut JNIEnv<'a>) -> Result<JObject<'a>> {
207215
Ok(env.new_object("java/lang/Long", "(J)V", &[JValueGen::Long(self.0 as i64)])?)

java/lance-jni/src/transaction.rs

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -433,6 +433,7 @@ fn convert_to_java_operation_inner<'local>(
433433
fields_for_preserving_frag_bitmap,
434434
update_mode,
435435
inserted_rows_filter: _,
436+
updated_fragment_offsets: _,
436437
} => {
437438
let removed_ids: Vec<JLance<i64>> = removed_fragment_ids
438439
.iter()
@@ -1222,6 +1223,7 @@ fn convert_to_rust_operation(
12221223
fields_for_preserving_frag_bitmap,
12231224
update_mode,
12241225
inserted_rows_filter: None,
1226+
updated_fragment_offsets: None,
12251227
}
12261228
}
12271229
"DataReplacement" => {

java/src/main/java/org/lance/fragment/FragmentUpdateResult.java

Lines changed: 16 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -26,9 +26,19 @@ public class FragmentUpdateResult {
2626
private final FragmentMetadata updatedFragment;
2727
private final long[] fieldsModified;
2828

29+
/** Local physical row offsets within the fragment that received updates (see RowAddress). */
30+
private final long[] updatedRowOffsets;
31+
32+
/** Two-argument form for callers that do not track per-row offsets; offsets default to empty. */
2933
public FragmentUpdateResult(FragmentMetadata updatedFragment, long[] updatedFieldIds) {
34+
this(updatedFragment, updatedFieldIds, new long[0]);
35+
}
36+
37+
public FragmentUpdateResult(
38+
FragmentMetadata updatedFragment, long[] updatedFieldIds, long[] updatedRowOffsets) {
3039
this.updatedFragment = updatedFragment;
3140
this.fieldsModified = updatedFieldIds;
41+
this.updatedRowOffsets = updatedRowOffsets;
3242
}
3343

3444
public FragmentMetadata getUpdatedFragment() {
@@ -39,11 +49,17 @@ public long[] getFieldsModified() {
3949
return fieldsModified;
4050
}
4151

52+
/** Physical row offsets (0-based within the fragment) whose columns were rewritten. */
53+
public long[] getUpdatedRowOffsets() {
54+
return updatedRowOffsets;
55+
}
56+
4257
@Override
4358
public String toString() {
4459
return MoreObjects.toStringHelper(this)
4560
.add("fragmentMetadata", updatedFragment)
4661
.add("updatedFieldIds", fieldsModified)
62+
.add("updatedRowOffsets", updatedRowOffsets)
4763
.toString();
4864
}
4965
}

protos/transaction.proto

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -227,6 +227,11 @@ message Transaction {
227227
}
228228
}
229229

230+
// Serialized as sorted distinct local physical row offsets within the fragment (0-based).
231+
message UInt32List {
232+
repeated uint32 values = 1;
233+
}
234+
230235
// An operation that updates rows but does not add or remove rows.
231236
message Update {
232237
// The fragments that have been removed. These are fragments where all rows
@@ -248,6 +253,8 @@ message Transaction {
248253
// Filter for checking existence of keys in newly inserted rows, used for conflict detection.
249254
// Only tracks keys from INSERT operations during merge insert, not updates.
250255
optional KeyExistenceFilter inserted_rows = 8;
256+
// Per-fragment physical row offsets that matched an update_columns hash join (RewriteColumns).
257+
map<uint64, UInt32List> updated_fragment_offsets = 9;
251258
}
252259

253260
// The mode of update operation

python/src/transaction.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -299,6 +299,7 @@ impl FromPyObject<'_, '_> for PyLance<Operation> {
299299
fields_for_preserving_frag_bitmap,
300300
update_mode,
301301
inserted_rows_filter: None,
302+
updated_fragment_offsets: None,
302303
};
303304
Ok(Self(op))
304305
}

rust/lance/src/dataset/fragment.rs

Lines changed: 90 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -13,8 +13,9 @@ use std::sync::Arc;
1313

1414
use arrow::compute::concat_batches;
1515
use arrow_array::cast::as_primitive_array;
16+
use arrow_array::types::UInt64Type;
1617
use arrow_array::{
17-
RecordBatch, RecordBatchReader, StructArray, UInt32Array, UInt64Array, new_null_array,
18+
Array, RecordBatch, RecordBatchReader, StructArray, UInt32Array, UInt64Array, new_null_array,
1819
};
1920
use arrow_schema::Schema as ArrowSchema;
2021
use datafusion::logical_expr::Expr;
@@ -23,6 +24,7 @@ use futures::future::try_join_all;
2324
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, join, stream};
2425
use lance_arrow::{RecordBatchExt, SchemaExt};
2526
use lance_core::datatypes::{OnMissing, OnTypeMismatch, SchemaCompareOptions};
27+
use lance_core::utils::address::RowAddress;
2628
use lance_core::utils::deletion::DeletionVector;
2729
use lance_core::utils::tokio::get_num_compute_intensive_cpus;
2830
use lance_core::{Error, Result, cache::CacheKey, datatypes::Schema};
@@ -48,6 +50,7 @@ use lance_table::utils::stream::{
4850
ReadBatchFutStream, ReadBatchTask, ReadBatchTaskStream, RowIdAndDeletesConfig,
4951
wrap_with_row_id_and_delete,
5052
};
53+
use roaring::RoaringBitmap;
5154

5255
use self::write::FragmentCreateBuilder;
5356

@@ -61,6 +64,16 @@ use crate::dataset::Dataset;
6164
use crate::dataset::fragment::session::FragmentSession;
6265
use crate::io::deletion::read_dataset_deletion_file;
6366

67+
/// Result of [`FileFragment::update_columns_with_offsets`]: updated fragment metadata, modified field ids,
68+
/// and physical row offsets that matched the join (for stable row-id version metadata).
69+
#[derive(Debug, Clone)]
70+
pub struct FragmentUpdateColumnsResult {
71+
pub fragment: Fragment,
72+
pub fields_modified: Vec<u32>,
73+
/// Physical row offsets (0-based within this fragment) whose columns were rewritten from the right-hand stream.
74+
pub matched_offsets: RoaringBitmap,
75+
}
76+
6477
/// A Fragment of a Lance [`Dataset`].
6578
///
6679
/// The interface is modeled after `pyarrow.dataset.Fragment`.
@@ -1602,12 +1615,27 @@ impl FileFragment {
16021615
Ok(self)
16031616
}
16041617

1618+
/// Same as [`Self::update_columns_with_offsets`] but discards the matched row offsets.
1619+
/// Use [`Self::update_columns_with_offsets`] if you need per-row version metadata for stable row IDs.
16051620
pub async fn update_columns(
16061621
&mut self,
16071622
right_stream: impl RecordBatchReader + Send + 'static,
16081623
left_on: &str,
16091624
right_on: &str,
16101625
) -> Result<(Fragment, Vec<u32>)> {
1626+
let r = self
1627+
.update_columns_with_offsets(right_stream, left_on, right_on)
1628+
.await?;
1629+
Ok((r.fragment, r.fields_modified))
1630+
}
1631+
1632+
/// Same operation as [`Self::update_columns`], and also returns matched physical row offsets for stable row IDs.
1633+
pub async fn update_columns_with_offsets(
1634+
&mut self,
1635+
right_stream: impl RecordBatchReader + Send + 'static,
1636+
left_on: &str,
1637+
right_on: &str,
1638+
) -> Result<FragmentUpdateColumnsResult> {
16111639
if self.schema().field(left_on).is_none() && left_on != ROW_ID && left_on != ROW_ADDR {
16121640
return Err(Error::invalid_input(format!(
16131641
"Column {} does not exist in the left side fragment",
@@ -1647,18 +1675,44 @@ impl FileFragment {
16471675
let mut read_columns: Vec<String> =
16481676
write_schema.fields.iter().map(|f| f.name.clone()).collect();
16491677
read_columns.push(left_on.to_string());
1678+
// Physical positions for matched rows are taken from `_rowaddr` (fragment id + row offset).
1679+
// The updater scans live rows in physical order; `_rowaddr` encodes the slot index used by row-level version metadata.
1680+
if !read_columns.iter().any(|n| n.as_str() == ROW_ADDR) {
1681+
read_columns.push(ROW_ADDR.to_string());
1682+
}
16501683
let mut updater = self
16511684
.updater(
16521685
Some(&read_columns),
16531686
Some((write_schema.clone(), self.schema().clone())),
16541687
None,
16551688
)
16561689
.await?;
1657-
// Hash join
1690+
// Hash join: rows matched on the right-hand stream rewrite columns; track physical offsets via `_rowaddr`.
16581691
let joiner = Arc::new(HashJoiner::try_new(right_stream, right_on).await?);
1692+
let mut matched_offsets = RoaringBitmap::new();
1693+
let frag_id_u32 = u32::try_from(self.metadata.id).map_err(|_| {
1694+
Error::invalid_input(format!(
1695+
"Fragment id {} does not fit RowAddress fragment id",
1696+
self.metadata.id
1697+
))
1698+
})?;
16591699
while let Some(batch) = updater.next().await? {
1700+
let index_column = batch[left_on].clone();
1701+
let matched = joiner.matched_join_rows(index_column.clone())?;
1702+
if let Some(addr_col) = batch.column_by_name(ROW_ADDR) {
1703+
let addrs = as_primitive_array::<UInt64Type>(addr_col.as_ref());
1704+
for (row_idx, &is_matched) in matched.iter().enumerate().take(batch.num_rows()) {
1705+
if !is_matched || addrs.is_null(row_idx) {
1706+
continue;
1707+
}
1708+
let addr = RowAddress::from(addrs.value(row_idx));
1709+
if addr.fragment_id() == frag_id_u32 {
1710+
matched_offsets.insert(addr.row_offset());
1711+
}
1712+
}
1713+
}
16601714
let updated_batch = joiner
1661-
.collect_with_fallback(batch, batch[left_on].clone(), self.dataset())
1715+
.collect_with_fallback(batch, index_column, self.dataset())
16621716
.await?;
16631717
updater.update(updated_batch).await?;
16641718
}
@@ -1689,8 +1743,11 @@ impl FileFragment {
16891743
.iter()
16901744
.filter_map(|&i| u32::try_from(i).ok())
16911745
.collect();
1692-
// Note: updated field should be returned when committing, waiting to be done
1693-
Ok((updated_fragment, updated_fields))
1746+
Ok(FragmentUpdateColumnsResult {
1747+
fragment: updated_fragment,
1748+
fields_modified: updated_fields,
1749+
matched_offsets,
1750+
})
16941751
}
16951752

16961753
/// Append new columns to the fragment
@@ -2647,12 +2704,13 @@ mod tests {
26472704
use lance_io::{assert_io_eq, assert_io_lt, object_store::ObjectStore};
26482705
use pretty_assertions::assert_eq;
26492706
use rstest::rstest;
2707+
use std::collections::HashMap;
26502708

26512709
use super::*;
26522710
use crate::{
26532711
dataset::{
26542712
InsertBuilder,
2655-
transaction::{Operation, UpdateMode},
2713+
transaction::{Operation, UpdateMode, UpdatedFragmentOffsets},
26562714
},
26572715
session::Session,
26582716
utils::test::TestDatasetGenerator,
@@ -2851,19 +2909,29 @@ mod tests {
28512909
vec![Ok(update_batch1)].into_iter(),
28522910
schema1,
28532911
));
2854-
let (updated_fragment1, fields_modified1) = fragment1
2855-
.update_columns(right_stream1, ROW_ID, ROW_ID)
2912+
let u1 = fragment1
2913+
.update_columns_with_offsets(right_stream1, ROW_ID, ROW_ID)
28562914
.await
28572915
.unwrap();
2916+
assert_eq!(u1.matched_offsets.iter().count(), 38);
2917+
assert!(!u1.matched_offsets.contains(0));
2918+
assert!(!u1.matched_offsets.contains(3));
2919+
assert!(u1.matched_offsets.contains(1));
2920+
assert!(u1.matched_offsets.contains(39));
2921+
let frag_id_1 = u1.fragment.id;
2922+
let matched_1 = u1.matched_offsets;
28582923
let op1 = Operation::Update {
28592924
removed_fragment_ids: vec![],
2860-
updated_fragments: vec![updated_fragment1],
2925+
updated_fragments: vec![u1.fragment],
28612926
new_fragments: vec![],
2862-
fields_modified: fields_modified1,
2927+
fields_modified: u1.fields_modified,
28632928
merged_generations: Vec::new(),
28642929
fields_for_preserving_frag_bitmap: vec![],
28652930
update_mode: Some(UpdateMode::RewriteColumns),
28662931
inserted_rows_filter: None,
2932+
updated_fragment_offsets: Some(UpdatedFragmentOffsets(HashMap::from([(
2933+
frag_id_1, matched_1,
2934+
)]))),
28672935
};
28682936
let mut dataset1 = Dataset::commit(
28692937
test_uri,
@@ -2924,19 +2992,27 @@ mod tests {
29242992
vec![Ok(update_batch2)].into_iter(),
29252993
schema2,
29262994
));
2927-
let (updated_fragment2, fields_modified2) = fragment2
2928-
.update_columns(right_stream2, "i", "i1")
2995+
let u2 = fragment2
2996+
.update_columns_with_offsets(right_stream2, "i", "i1")
29292997
.await
29302998
.unwrap();
2999+
assert_eq!(u2.matched_offsets.iter().count(), 38);
3000+
assert!(!u2.matched_offsets.contains(0));
3001+
assert!(!u2.matched_offsets.contains(3));
3002+
let frag_id_2 = u2.fragment.id;
3003+
let matched_2 = u2.matched_offsets;
29313004
let op = Operation::Update {
29323005
removed_fragment_ids: vec![],
2933-
updated_fragments: vec![updated_fragment2],
3006+
updated_fragments: vec![u2.fragment],
29343007
new_fragments: vec![],
2935-
fields_modified: fields_modified2,
3008+
fields_modified: u2.fields_modified,
29363009
merged_generations: Vec::new(),
29373010
fields_for_preserving_frag_bitmap: vec![],
29383011
update_mode: Some(UpdateMode::RewriteColumns),
29393012
inserted_rows_filter: None,
3013+
updated_fragment_offsets: Some(UpdatedFragmentOffsets(HashMap::from([(
3014+
frag_id_2, matched_2,
3015+
)]))),
29403016
};
29413017
let dataset2 = Dataset::commit(
29423018
test_uri,

0 commit comments

Comments (0)