-
Notifications
You must be signed in to change notification settings - Fork 466
Add concurrency safety validation checks #3049
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,14 +19,23 @@ | |
| from pyiceberg.exceptions import ValidationException | ||
| from pyiceberg.expressions import BooleanExpression | ||
| from pyiceberg.expressions.visitors import ROWS_CANNOT_MATCH, _InclusiveMetricsEvaluator | ||
| from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile | ||
| from pyiceberg.manifest import ( | ||
| INITIAL_SEQUENCE_NUMBER, | ||
| DataFile, | ||
| ManifestContent, | ||
| ManifestEntry, | ||
| ManifestEntryStatus, | ||
| ManifestFile, | ||
| ) | ||
| from pyiceberg.schema import Schema | ||
| from pyiceberg.table import Table | ||
| from pyiceberg.table.delete_file_index import DeleteFileIndex | ||
| from pyiceberg.table.snapshots import Operation, Snapshot, ancestors_between | ||
| from pyiceberg.typedef import Record | ||
|
|
||
# Snapshot operation sets used by the validation helpers in this module; each
# set is named after the check it serves.
VALIDATE_DATA_FILES_EXIST_OPERATIONS: set[Operation] = {
    Operation.OVERWRITE,
    Operation.REPLACE,
    Operation.DELETE,
}
VALIDATE_ADDED_DATA_FILES_OPERATIONS: set[Operation] = {
    Operation.APPEND,
    Operation.OVERWRITE,
}
# Passed to _validation_history by _added_delete_files as the operations to match.
VALIDATE_ADDED_DELETE_FILES_OPERATIONS: set[Operation] = {
    Operation.DELETE,
    Operation.OVERWRITE,
}
|
|
||
|
|
||
| def _validation_history( | ||
|
|
@@ -216,6 +225,60 @@ def _added_data_files( | |
| yield entry | ||
|
|
||
|
|
||
def _added_delete_files(
    table: Table,
    starting_snapshot: Snapshot,
    data_filter: BooleanExpression | None,
    partition_set: dict[int, set[Record]] | None,
    parent_snapshot: Snapshot | None,
) -> DeleteFileIndex:
    """Build an index of the matching delete files added since a starting snapshot.

    Args:
        table: Table whose history is inspected
        starting_snapshot: Snapshot marking the start of the history to inspect
        data_filter: Optional expression used to filter the manifest entries
        partition_set: Optional mapping of partition spec id to the partition records to match
        parent_snapshot: Snapshot marking the end of the history to inspect

    Returns:
        A DeleteFileIndex holding every matching added delete file. The index is
        empty when there is no parent snapshot or the table format version is below 2.
    """
    index = DeleteFileIndex()

    # Without a parent snapshot, or on a table below format version 2, nothing is scanned
    if parent_snapshot is None or table.format_version < 2:
        return index

    delete_manifests, new_snapshot_ids = _validation_history(
        table,
        parent_snapshot,
        starting_snapshot,
        VALIDATE_ADDED_DELETE_FILES_OPERATIONS,
        ManifestContent.DELETES,
    )

    for delete_manifest in delete_manifests:
        for manifest_entry in delete_manifest.fetch_manifest_entry(table.io, discard_deleted=True):
            is_match = _filter_manifest_entries(
                manifest_entry,
                new_snapshot_ids,
                data_filter,
                partition_set,
                ManifestEntryStatus.ADDED,
                table.schema(),
            )
            if not is_match:
                continue
            # Index each delete file under the partition of its own data file record
            index.add_delete_file(manifest_entry, manifest_entry.data_file.partition)

    return index
|
|
||
|
|
||
def _starting_sequence_number(table: Table, starting_snapshot: Snapshot | None) -> int:
    """Resolve the sequence number to start validating from.

    Args:
        table: Table the snapshot belongs to (not consulted by the current implementation)
        starting_snapshot: Snapshot from where to start looking, if any

    Returns:
        The snapshot's sequence number when it is set and non-zero, otherwise
        INITIAL_SEQUENCE_NUMBER.
    """
    snapshot_seq = None if starting_snapshot is None else starting_snapshot.sequence_number
    # A missing or falsy sequence number falls back to the initial sequence number
    return snapshot_seq or INITIAL_SEQUENCE_NUMBER
|
|
||
|
|
||
| def _validate_added_data_files( | ||
| table: Table, | ||
| starting_snapshot: Snapshot, | ||
|
|
@@ -235,3 +298,60 @@ def _validate_added_data_files( | |
| if any(conflicting_entries): | ||
| conflicting_snapshots = {entry.snapshot_id for entry in conflicting_entries if entry.snapshot_id is not None} | ||
| raise ValidationException(f"Added data files were found matching the filter for snapshots {conflicting_snapshots}!") | ||
|
|
||
|
|
||
def _validate_no_new_delete_files(
    table: Table,
    starting_snapshot: Snapshot,
    data_filter: BooleanExpression | None,
    partition_set: dict[int, set[Record]] | None,
    parent_snapshot: Snapshot | None,
) -> None:
    """Validate that no matching delete files have been added to the table since a starting snapshot.

    Args:
        table: Table to validate
        starting_snapshot: Snapshot current at the start of the operation
        data_filter: Expression forwarded to the lookup of added delete files
        partition_set: Dictionary of partition spec to set of partition records
        parent_snapshot: Ending snapshot on the branch being validated

    Raises:
        ValidationException: If at least one matching delete file was added.
    """
    added_deletes = _added_delete_files(table, starting_snapshot, data_filter, partition_set, parent_snapshot)

    # Any indexed delete file is a conflict; report the paths of all of them
    if not added_deletes.is_empty():
        conflicting_delete_paths = [delete_file.file_path for delete_file in added_deletes.referenced_delete_files()]
        raise ValidationException(
            f"Found new conflicting delete files that can apply to records matching {data_filter}: {conflicting_delete_paths}"
        )
|
|
||
|
|
||
def _validate_no_new_deletes_for_data_files(
    table: Table,
    starting_snapshot: Snapshot,
    data_filter: BooleanExpression | None,
    data_files: set[DataFile],
    parent_snapshot: Snapshot | None,
) -> None:
    """Validate that no newly added delete file applies to any of the given data files.

    Args:
        table: Table to validate
        starting_snapshot: Snapshot current at the start of the operation
        data_filter: Expression forwarded to the lookup of added delete files
        data_files: Data files that must have no new deletes
        parent_snapshot: Ending snapshot on the branch being validated

    Raises:
        ValidationException: If an added delete file applies to one of ``data_files``.
    """
    # Only a table at format version 2+ with a parent snapshot is checked
    can_have_new_deletes = parent_snapshot is not None and table.format_version >= 2
    if not can_have_new_deletes:
        return

    # No partition restriction here: candidates are narrowed per data file below
    added_deletes = _added_delete_files(table, starting_snapshot, data_filter, None, parent_snapshot)
    starting_seq = _starting_sequence_number(table, starting_snapshot)

    # Fail on any delete file that applies to files written in or before the starting snapshot
    for data_file in data_files:
        applicable_deletes = added_deletes.for_data_file(starting_seq, data_file, data_file.partition)
        if applicable_deletes:
            raise ValidationException(f"Cannot commit, found new delete for replace data file {data_file}")
Uh oh!
There was an error while loading. Please reload this page.