|
16 | 16 | # under the License. |
17 | 17 | # pylint:disable=redefined-outer-name,eval-used |
18 | 18 | from typing import cast |
| 19 | +from unittest.mock import patch |
19 | 20 |
|
20 | | -from pyiceberg.manifest import ManifestContent |
| 21 | +from pyiceberg.io import FileIO |
| 22 | +from pyiceberg.manifest import ManifestContent, ManifestFile |
21 | 23 | from pyiceberg.table import Table |
22 | 24 | from pyiceberg.table.snapshots import Operation, Snapshot |
23 | 25 | from pyiceberg.table.update.validate import validation_history |
24 | 26 |
|
25 | 27 |
|
26 | 28 | def test_validation_history(table_v2_with_extensive_snapshots: Table) -> None: |
27 | 29 | """Test the validation history function.""" |
| 30 | + mock_manifests = {} |
| 31 | + |
| 32 | + for i, snapshot in enumerate(table_v2_with_extensive_snapshots.snapshots()): |
| 33 | + mock_manifest = ManifestFile( |
| 34 | + manifest_path=f"foo/bar/{i}", |
| 35 | + manifest_length=1, |
| 36 | + partition_spec_id=1, |
| 37 | + content=ManifestContent.DATA if i % 2 == 0 else ManifestContent.DELETES, |
| 38 | + sequence_number=1, |
| 39 | + min_sequence_number=1, |
| 40 | + added_snapshot_id=snapshot.snapshot_id, |
| 41 | + ) |
| 42 | + |
| 43 | + # Store the manifest for this specific snapshot |
| 44 | + mock_manifests[snapshot.snapshot_id] = [mock_manifest] |
| 45 | + |
| 46 | + expected_manifest_data_counts = len([m for m in mock_manifests.values() if m[0].content == ManifestContent.DATA]) - 1 |
| 47 | + |
28 | 48 | oldest_snapshot = table_v2_with_extensive_snapshots.snapshots()[0] |
29 | 49 | newest_snapshot = cast(Snapshot, table_v2_with_extensive_snapshots.current_snapshot()) |
30 | | - manifests, snapshots = validation_history( |
31 | | - table_v2_with_extensive_snapshots, |
32 | | - newest_snapshot, |
33 | | - oldest_snapshot, |
34 | | - {Operation.APPEND}, |
35 | | - ManifestContent.DATA, |
36 | | - ) |
37 | | - assert len(snapshots) == 2 |
| 50 | + |
| 51 | + def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestFile]: |
| 52 | + """Mock the manifests method to use the snapshot_id for lookup.""" |
| 53 | + snapshot_id = self.snapshot_id |
| 54 | + if snapshot_id in mock_manifests: |
| 55 | + return mock_manifests[snapshot_id] |
| 56 | + return [] |
| 57 | + |
| 58 | + with patch("pyiceberg.table.snapshots.Snapshot.manifests", new=mock_read_manifest_side_effect): |
| 59 | + manifests, snapshots = validation_history( |
| 60 | + table_v2_with_extensive_snapshots, |
| 61 | + newest_snapshot, |
| 62 | + oldest_snapshot, |
| 63 | + {Operation.APPEND}, |
| 64 | + ManifestContent.DATA, |
| 65 | + ) |
| 66 | + |
| 67 | + assert len(manifests) == expected_manifest_data_counts |
0 commit comments