Skip to content

Commit 83bb25a

Browse files
feat: Introduce Deduplicator trait to unify mutable and immutable deduplication (delta-io#1537)
## What changes are proposed in this pull request? <!-- Please clarify what changes you are proposing and why the changes are needed. The purpose of this section is to outline the changes, why they are needed, and how this PR fixes the issue. If the reason for the change is already explained clearly in an issue, then it does not need to be restated here. 1. If you propose a new API or feature, clarify the use case for a new API or feature. 2. If you fix a bug, you can clarify why it is a bug. --> This PR introduces the `Deduplicator` trait which will unify Commit and Checkpoint deduplication under a single API. This is done to move the `&mut HashSet<..>` out of `AddRemoveDedup` and behind the `Deduplicator` trait. That way, we can reuse `AddRemoveDedupVisitor` in an immutable scenario. <!-- Uncomment this section if there are any changes affecting public APIs: ### This PR affects the following public APIs If there are breaking changes, please ensure the `breaking-changes` label gets added by CI, and describe why the changes are needed. Note that _new_ public APIs are not considered breaking. --> ## How was this change tested? <!-- Please make sure to add test cases that check the changes thoroughly including negative and positive cases if possible. If it was tested in a way different from regular unit tests, please clarify how you tested, ideally via a reproducible test documented in the PR description. --> --------- Co-authored-by: Nick Lanham <nicklan@users.noreply.github.com>
1 parent 743edec commit 83bb25a

5 files changed

Lines changed: 113 additions & 90 deletions

File tree

kernel/src/action_reconciliation/log_replay.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
//! actions selected
3232
//!
3333
use crate::engine_data::{FilteredEngineData, GetData, RowVisitor, TypedGetData as _};
34+
use crate::log_replay::deduplicator::Deduplicator as _;
3435
use crate::log_replay::{
3536
ActionsBatch, FileActionDeduplicator, FileActionKey, HasSelectionVector, LogReplayProcessor,
3637
};

kernel/src/expressions/mod.rs

Lines changed: 7 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -273,14 +273,14 @@ fn fail_serialize_opaque_predicate<S>(
273273
where
274274
S: Serializer,
275275
{
276-
Err(ser::Error::custom("Cannot serialize Opaque Expression"))
276+
Err(ser::Error::custom("Cannot serialize an Opaque Predicate"))
277277
}
278278

279279
fn fail_deserialize_opaque_predicate<'de, D>(_deserializer: D) -> Result<OpaquePredicate, D::Error>
280280
where
281281
D: Deserializer<'de>,
282282
{
283-
Err(de::Error::custom("Cannot deserialize Opaque Expression"))
283+
Err(de::Error::custom("Cannot deserialize an Opaque Predicate"))
284284
}
285285

286286
impl OpaquePredicate {
@@ -316,7 +316,7 @@ fn fail_serialize_opaque_expression<S>(
316316
where
317317
S: Serializer,
318318
{
319-
Err(ser::Error::custom("Cannot serialize Opaque Expression"))
319+
Err(ser::Error::custom("Cannot serialize an Opaque Expression"))
320320
}
321321

322322
fn fail_deserialize_opaque_expression<'de, D>(
@@ -325,7 +325,7 @@ fn fail_deserialize_opaque_expression<'de, D>(
325325
where
326326
D: Deserializer<'de>,
327327
{
328-
Err(de::Error::custom("Cannot deserialize Opaque Expression"))
328+
Err(de::Error::custom("Cannot deserialize an Opaque Expression"))
329329
}
330330

331331
/// A transformation affecting a single field (one pieces of a [`Transform`]). The transformation
@@ -1161,6 +1161,7 @@ mod tests {
11611161
Expression, Predicate, Scalar, Transform, UnaryExpressionOp, VariadicExpressionOp,
11621162
};
11631163
use crate::schema::{ArrayType, DataType, DecimalType, MapType, StructField};
1164+
use crate::utils::test_utils::assert_result_error_with_message;
11641165

11651166
use super::assert_roundtrip;
11661167

@@ -1552,17 +1553,7 @@ mod tests {
15521553

15531554
let expr = Expression::opaque(TestOpaqueExprOp, [Expression::literal(1)]);
15541555
let result = serde_json::to_string(&expr);
1555-
assert!(
1556-
result.is_err(),
1557-
"Opaque expression serialization should fail"
1558-
);
1559-
assert!(
1560-
result
1561-
.unwrap_err()
1562-
.to_string()
1563-
.contains("Cannot serialize Opaque Expression"),
1564-
"Error should mention opaque expression"
1565-
);
1556+
assert_result_error_with_message(result, "Cannot serialize an Opaque Expression");
15661557
}
15671558

15681559
#[test]
@@ -1610,17 +1601,7 @@ mod tests {
16101601

16111602
let pred = Predicate::opaque(TestOpaquePredOp, [Expression::literal(1)]);
16121603
let result = serde_json::to_string(&pred);
1613-
assert!(
1614-
result.is_err(),
1615-
"Opaque predicate serialization should fail"
1616-
);
1617-
assert!(
1618-
result
1619-
.unwrap_err()
1620-
.to_string()
1621-
.contains("Cannot serialize Opaque Expression"),
1622-
"Error should mention opaque expression"
1623-
);
1604+
assert_result_error_with_message(result, "Cannot serialize an Opaque Predicate");
16241605
}
16251606
}
16261607
}

kernel/src/log_replay.rs

Lines changed: 11 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@
1313
//! This module provides structures for efficient batch processing, focusing on file action
1414
//! deduplication with `FileActionDeduplicator` which tracks unique files across log batches
1515
//! to minimize memory usage for tables with extensive history.
16-
use crate::actions::deletion_vector::DeletionVectorDescriptor;
17-
use crate::engine_data::{GetData, TypedGetData};
16+
use crate::engine_data::GetData;
17+
use crate::log_replay::deduplicator::Deduplicator;
1818
use crate::scan::data_skipping::DataSkippingFilter;
1919
use crate::{DeltaResult, EngineData};
2020

@@ -24,6 +24,8 @@ use std::collections::HashSet;
2424

2525
use tracing::debug;
2626

27+
pub(crate) mod deduplicator;
28+
2729
/// The subset of file action fields that uniquely identifies it in the log, used for deduplication
2830
/// of adds and removes during log replay.
2931
#[derive(Debug, Hash, Eq, PartialEq, serde::Serialize, serde::Deserialize, Clone)]
@@ -56,7 +58,8 @@ pub(crate) struct FileActionDeduplicator<'seen> {
5658
seen_file_keys: &'seen mut HashSet<FileActionKey>,
5759
// TODO: Consider renaming to `is_commit_batch`, `deduplicate_batch`, or `save_batch`
5860
// to better reflect its role in deduplication logic.
59-
/// Whether we're processing a log batch (as opposed to a checkpoint)
61+
/// Whether we're processing a commit log JSON file (`true`) or a checkpoint file (`false`).
62+
/// When `true`, file actions are added to `seen_file_keys` as they're processed.
6063
is_log_batch: bool,
6164
/// Index of the getter containing the add.path column
6265
add_path_index: usize,
@@ -86,12 +89,14 @@ impl<'seen> FileActionDeduplicator<'seen> {
8689
remove_dv_start_index,
8790
}
8891
}
92+
}
8993

94+
impl<'seen> Deduplicator for FileActionDeduplicator<'seen> {
9095
/// Checks if log replay already processed this logical file (in which case the current action
9196
/// should be ignored). If not already seen, register it so we can recognize future duplicates.
9297
/// Returns `true` if we have seen the file and should ignore it, `false` if we have not seen it
9398
/// and should process it.
94-
pub(crate) fn check_and_record_seen(&mut self, key: FileActionKey) -> bool {
99+
fn check_and_record_seen(&mut self, key: FileActionKey) -> bool {
95100
// Note: each (add.path + add.dv_unique_id()) pair has a
96101
// unique Add + Remove pair in the log. For example:
97102
// https://github.com/delta-io/delta/blob/master/spark/src/test/resources/delta/table-with-dv-large/_delta_log/00000000000000000001.json
@@ -117,35 +122,6 @@ impl<'seen> FileActionDeduplicator<'seen> {
117122
}
118123
}
119124

120-
/// Extracts the deletion vector unique ID if it exists.
121-
///
122-
/// This function retrieves the necessary fields for constructing a deletion vector unique ID
123-
/// by accessing `getters` at `dv_start_index` and the following two indices. Specifically:
124-
/// - `dv_start_index` retrieves the storage type (`deletionVector.storageType`).
125-
/// - `dv_start_index + 1` retrieves the path or inline deletion vector (`deletionVector.pathOrInlineDv`).
126-
/// - `dv_start_index + 2` retrieves the optional offset (`deletionVector.offset`).
127-
fn extract_dv_unique_id<'a>(
128-
&self,
129-
i: usize,
130-
getters: &[&'a dyn GetData<'a>],
131-
dv_start_index: usize,
132-
) -> DeltaResult<Option<String>> {
133-
match getters[dv_start_index].get_opt(i, "deletionVector.storageType")? {
134-
Some(storage_type) => {
135-
let path_or_inline =
136-
getters[dv_start_index + 1].get(i, "deletionVector.pathOrInlineDv")?;
137-
let offset = getters[dv_start_index + 2].get_opt(i, "deletionVector.offset")?;
138-
139-
Ok(Some(DeletionVectorDescriptor::unique_id_from_parts(
140-
storage_type,
141-
path_or_inline,
142-
offset,
143-
)))
144-
}
145-
None => Ok(None),
146-
}
147-
}
148-
149125
/// Extracts a file action key and determines if it's an add operation.
150126
/// This method examines the data at the given index using the provided getters
151127
/// to identify whether a file action exists and what type it is.
@@ -159,7 +135,7 @@ impl<'seen> FileActionDeduplicator<'seen> {
159135
/// - `Ok(Some((key, is_add)))`: When a file action is found, returns the key and whether it's an add operation
160136
/// - `Ok(None)`: When no file action is found
161137
/// - `Err(...)`: On any error during extraction
162-
pub(crate) fn extract_file_action<'a>(
138+
fn extract_file_action<'a>(
163139
&self,
164140
i: usize,
165141
getters: &[&'a dyn GetData<'a>],
@@ -190,7 +166,7 @@ impl<'seen> FileActionDeduplicator<'seen> {
190166
///
191167
/// `true` indicates we are processing a batch from a commit file.
192168
/// `false` indicates we are processing a batch from a checkpoint.
193-
pub(crate) fn is_log_batch(&self) -> bool {
169+
fn is_log_batch(&self) -> bool {
194170
self.is_log_batch
195171
}
196172
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
//! Deduplication abstraction for log replay processors.
2+
//!
3+
//! The [`Deduplicator`] trait supports two deduplication strategies:
4+
//!
5+
//! - **JSON commit files** (`is_log_batch = true`): Tracks (path, dv_unique_id) and updates
6+
//! the hashmap as files are seen. Implementation: [`FileActionDeduplicator`]
7+
//!
8+
//! - **Checkpoint files** (`is_log_batch = false`): Uses (path, dv_unique_id) to filter actions
9+
//! using a read-only hashmap pre-populated from the commit log phase. Future implementation.
10+
//!
11+
//! [`FileActionDeduplicator`]: crate::log_replay::FileActionDeduplicator
12+
13+
use crate::actions::deletion_vector::DeletionVectorDescriptor;
14+
use crate::engine_data::{GetData, TypedGetData};
15+
use crate::log_replay::FileActionKey;
16+
use crate::DeltaResult;
17+
18+
pub(crate) trait Deduplicator {
19+
/// Extracts a file action key from the data. Returns `(key, is_add)` if found.
20+
///
21+
/// TODO: Remove the skip_removes field in the future. The caller is responsible for using the
22+
/// correct Deduplicator instance depeding on whether the batch belongs to a commit or to a
23+
/// checkpoint.
24+
fn extract_file_action<'a>(
25+
&self,
26+
i: usize,
27+
getters: &[&'a dyn GetData<'a>],
28+
skip_removes: bool,
29+
) -> DeltaResult<Option<(FileActionKey, bool)>>;
30+
31+
/// Checks if this file has been seen. When `is_log_batch() = true`, updates the hashmap
32+
/// to track new files. Returns `true` if the file should be filtered out.
33+
fn check_and_record_seen(&mut self, key: FileActionKey) -> bool;
34+
35+
/// Returns `true` for commit log batches (updates hashmap), `false` for checkpoints (read-only).
36+
fn is_log_batch(&self) -> bool;
37+
38+
/// Extracts the deletion vector unique ID if it exists.
39+
///
40+
/// This function retrieves the necessary fields for constructing a deletion vector unique ID
41+
/// by accessing `getters` at `dv_start_index` and the following two indices. Specifically:
42+
/// - `dv_start_index` retrieves the storage type (`deletionVector.storageType`).
43+
/// - `dv_start_index + 1` retrieves the path or inline deletion vector (`deletionVector.pathOrInlineDv`).
44+
/// - `dv_start_index + 2` retrieves the optional offset (`deletionVector.offset`).
45+
fn extract_dv_unique_id<'a>(
46+
&self,
47+
i: usize,
48+
getters: &[&'a dyn GetData<'a>],
49+
dv_start_index: usize,
50+
) -> DeltaResult<Option<String>> {
51+
let Some(storage_type) =
52+
getters[dv_start_index].get_opt(i, "deletionVector.storageType")?
53+
else {
54+
return Ok(None);
55+
};
56+
let path_or_inline = getters[dv_start_index + 1].get(i, "deletionVector.pathOrInlineDv")?;
57+
let offset = getters[dv_start_index + 2].get_opt(i, "deletionVector.offset")?;
58+
59+
Ok(Some(DeletionVectorDescriptor::unique_id_from_parts(
60+
storage_type,
61+
path_or_inline,
62+
offset,
63+
)))
64+
}
65+
}

kernel/src/scan/log_replay.rs

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use crate::actions::get_log_add_schema;
1212
use crate::engine_data::{GetData, RowVisitor, TypedGetData as _};
1313
use crate::expressions::{column_name, ColumnName, Expression, ExpressionRef, PredicateRef};
1414
use crate::kernel_predicates::{DefaultKernelPredicateEvaluator, KernelPredicateEvaluator as _};
15+
use crate::log_replay::deduplicator::Deduplicator;
1516
use crate::log_replay::{ActionsBatch, FileActionDeduplicator, FileActionKey, LogReplayProcessor};
1617
use crate::scan::Scalar;
1718
use crate::schema::ToSchema as _;
@@ -80,6 +81,15 @@ pub(crate) struct ScanLogReplayProcessor {
8081
}
8182

8283
impl ScanLogReplayProcessor {
84+
// These index positions correspond to the order of columns defined in
85+
// `selected_column_names_and_types()`
86+
const ADD_PATH_INDEX: usize = 0; // Position of "add.path" in getters
87+
const ADD_PARTITION_VALUES_INDEX: usize = 1; // Position of "add.partitionValues" in getters
88+
const ADD_DV_START_INDEX: usize = 2; // Start position of add deletion vector columns
89+
const BASE_ROW_ID_INDEX: usize = 5; // Position of add.baseRowId in getters
90+
const REMOVE_PATH_INDEX: usize = 6; // Position of "remove.path" in getters
91+
const REMOVE_DV_START_INDEX: usize = 7; // Start position of remove deletion vector columns
92+
8393
/// Create a new [`ScanLogReplayProcessor`] instance
8494
pub(crate) fn new(engine: &dyn Engine, state_info: Arc<StateInfo>) -> DeltaResult<Self> {
8595
Self::new_with_seen_files(engine, state_info, Default::default())
@@ -234,40 +244,23 @@ impl ScanLogReplayProcessor {
234244
/// replay visits actions newest-first, so once we've seen a file action for a given (path, dvId)
235245
/// pair, we should ignore all subsequent (older) actions for that same (path, dvId) pair. If the
236246
/// first action for a given file is a remove, then that file does not show up in the result at all.
237-
struct AddRemoveDedupVisitor<'seen> {
238-
deduplicator: FileActionDeduplicator<'seen>,
247+
struct AddRemoveDedupVisitor<D: Deduplicator> {
248+
deduplicator: D,
239249
selection_vector: Vec<bool>,
240250
state_info: Arc<StateInfo>,
241251
partition_filter: Option<PredicateRef>,
242252
row_transform_exprs: Vec<Option<ExpressionRef>>,
243253
}
244254

245-
impl AddRemoveDedupVisitor<'_> {
246-
// These index positions correspond to the order of columns defined in
247-
// `selected_column_names_and_types()`
248-
const ADD_PATH_INDEX: usize = 0; // Position of "add.path" in getters
249-
const ADD_PARTITION_VALUES_INDEX: usize = 1; // Position of "add.partitionValues" in getters
250-
const ADD_DV_START_INDEX: usize = 2; // Start position of add deletion vector columns
251-
const BASE_ROW_ID_INDEX: usize = 5; // Position of add.baseRowId in getters
252-
const REMOVE_PATH_INDEX: usize = 6; // Position of "remove.path" in getters
253-
const REMOVE_DV_START_INDEX: usize = 7; // Start position of remove deletion vector columns
254-
255+
impl<D: Deduplicator> AddRemoveDedupVisitor<D> {
255256
fn new(
256-
seen: &mut HashSet<FileActionKey>,
257+
deduplicator: D,
257258
selection_vector: Vec<bool>,
258259
state_info: Arc<StateInfo>,
259260
partition_filter: Option<PredicateRef>,
260-
is_log_batch: bool,
261-
) -> AddRemoveDedupVisitor<'_> {
261+
) -> AddRemoveDedupVisitor<D> {
262262
AddRemoveDedupVisitor {
263-
deduplicator: FileActionDeduplicator::new(
264-
seen,
265-
is_log_batch,
266-
Self::ADD_PATH_INDEX,
267-
Self::REMOVE_PATH_INDEX,
268-
Self::ADD_DV_START_INDEX,
269-
Self::REMOVE_DV_START_INDEX,
270-
),
263+
deduplicator,
271264
selection_vector,
272265
state_info,
273266
partition_filter,
@@ -319,8 +312,8 @@ impl AddRemoveDedupVisitor<'_> {
319312
// encounter if the table's schema was replaced after the most recent checkpoint.
320313
let partition_values = match &self.state_info.transform_spec {
321314
Some(transform) if is_add => {
322-
let partition_values =
323-
getters[Self::ADD_PARTITION_VALUES_INDEX].get(i, "add.partitionValues")?;
315+
let partition_values = getters[ScanLogReplayProcessor::ADD_PARTITION_VALUES_INDEX]
316+
.get(i, "add.partitionValues")?;
324317
let partition_values = parse_partition_values(
325318
&self.state_info.logical_schema,
326319
transform,
@@ -340,7 +333,7 @@ impl AddRemoveDedupVisitor<'_> {
340333
return Ok(false);
341334
}
342335
let base_row_id: Option<i64> =
343-
getters[Self::BASE_ROW_ID_INDEX].get_opt(i, "add.baseRowId")?;
336+
getters[ScanLogReplayProcessor::BASE_ROW_ID_INDEX].get_opt(i, "add.baseRowId")?;
344337
let transform = self
345338
.state_info
346339
.transform_spec
@@ -363,7 +356,7 @@ impl AddRemoveDedupVisitor<'_> {
363356
}
364357
}
365358

366-
impl RowVisitor for AddRemoveDedupVisitor<'_> {
359+
impl<D: Deduplicator> RowVisitor for AddRemoveDedupVisitor<D> {
367360
fn selected_column_names_and_types(&self) -> (&'static [ColumnName], &'static [DataType]) {
368361
// NOTE: The visitor assumes a schema with adds first and removes optionally afterward.
369362
static NAMES_AND_TYPES: LazyLock<ColumnNamesAndTypes> = LazyLock::new(|| {
@@ -514,12 +507,19 @@ impl LogReplayProcessor for ScanLogReplayProcessor {
514507
let selection_vector = self.build_selection_vector(actions.as_ref())?;
515508
assert_eq!(selection_vector.len(), actions.len());
516509

517-
let mut visitor = AddRemoveDedupVisitor::new(
510+
let deduplicator = FileActionDeduplicator::new(
518511
&mut self.seen_file_keys,
512+
is_log_batch,
513+
Self::ADD_PATH_INDEX,
514+
Self::REMOVE_PATH_INDEX,
515+
Self::ADD_DV_START_INDEX,
516+
Self::REMOVE_DV_START_INDEX,
517+
);
518+
let mut visitor = AddRemoveDedupVisitor::new(
519+
deduplicator,
519520
selection_vector,
520521
self.state_info.clone(),
521522
self.partition_filter.clone(),
522-
is_log_batch,
523523
);
524524
visitor.visit_rows_of(actions.as_ref())?;
525525

0 commit comments

Comments
 (0)