Skip to content

Commit fe6b6fc

Browse files
Block schema field drop if it is reference by an active partition or sort field (#2410)
<!-- Thanks for opening a pull request! --> <!-- In the case this PR will resolve an issue, please replace ${GITHUB_ISSUE_ID} below with the actual Github issue id. --> Closes #2166 # Rationale for this change We should block when an user wants to drop a column if that column is being referenced by either a active partition spec or sort order field. ## Are these changes tested? Yes, I added unit tests for every incompatible schema change in partitions and sort orders. Also added two new integration tests in `test_catalog` to test for this scenario ## Are there any user-facing changes? No <!-- In the case of user-facing changes, please add the changelog label. --> --------- Co-authored-by: Kevin Liu <kevinjqliu@users.noreply.github.com> Co-authored-by: Kevin Liu <kevin.jq.liu@gmail.com>
1 parent 382a15b commit fe6b6fc

File tree

8 files changed

+187
-1
lines changed

8 files changed

+187
-1
lines changed

pyiceberg/partitioning.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
model_validator,
3333
)
3434

35+
from pyiceberg.exceptions import ValidationError
3536
from pyiceberg.schema import Schema
3637
from pyiceberg.transforms import (
3738
BucketTransform,
@@ -249,6 +250,39 @@ def partition_to_path(self, data: Record, schema: Schema) -> str:
249250
path = "/".join([field_str + "=" + value_str for field_str, value_str in zip(field_strs, value_strs, strict=True)])
250251
return path
251252

253+
def check_compatible(self, schema: Schema, allow_missing_fields: bool = False) -> None:
254+
# if the underlying field is dropped, we cannot check they are compatible -- continue
255+
schema_fields = schema._lazy_id_to_field
256+
parents = schema._lazy_id_to_parent
257+
258+
for field in self.fields:
259+
source_field = schema_fields.get(field.source_id)
260+
261+
if allow_missing_fields and source_field is None:
262+
continue
263+
264+
if isinstance(field.transform, VoidTransform):
265+
continue
266+
267+
if not source_field:
268+
raise ValidationError(f"Cannot find source column for partition field: {field}")
269+
270+
source_type = source_field.field_type
271+
if not source_type.is_primitive:
272+
raise ValidationError(f"Cannot partition by non-primitive source field: {source_field}")
273+
if not field.transform.can_transform(source_type):
274+
raise ValidationError(
275+
f"Invalid source field {source_field.name} with type {source_type} " + f"for transform: {field.transform}"
276+
)
277+
278+
# The only valid parent types for a PartitionField are StructTypes. This must be checked recursively
279+
parent_id = parents.get(field.source_id)
280+
while parent_id:
281+
parent_type = schema.find_type(parent_id)
282+
if not parent_type.is_struct:
283+
raise ValidationError(f"Invalid partition field parent: {parent_type}")
284+
parent_id = parents.get(parent_id)
285+
252286

253287
UNPARTITIONED_PARTITION_SPEC = PartitionSpec(spec_id=0)
254288

pyiceberg/table/metadata.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,10 @@ def current_snapshot(self) -> Snapshot | None:
318318
def next_sequence_number(self) -> int:
319319
return self.last_sequence_number + 1 if self.format_version > 1 else INITIAL_SEQUENCE_NUMBER
320320

321+
def sort_order(self) -> SortOrder:
322+
"""Get the current sort order for this table, or UNSORTED_SORT_ORDER if there is no sort order."""
323+
return self.sort_order_by_id(self.default_sort_order_id) or UNSORTED_SORT_ORDER
324+
321325
def sort_order_by_id(self, sort_order_id: int) -> SortOrder | None:
322326
"""Get the sort order by sort_order_id."""
323327
return next((sort_order for sort_order in self.sort_orders if sort_order.order_id == sort_order_id), None)

pyiceberg/table/sorting.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
model_validator,
2828
)
2929

30+
from pyiceberg.exceptions import ValidationError
3031
from pyiceberg.schema import Schema
3132
from pyiceberg.transforms import IdentityTransform, Transform, parse_transform
3233
from pyiceberg.typedef import IcebergBaseModel
@@ -170,6 +171,19 @@ def __repr__(self) -> str:
170171
fields = f"{', '.join(repr(column) for column in self.fields)}, " if self.fields else ""
171172
return f"SortOrder({fields}order_id={self.order_id})"
172173

174+
def check_compatible(self, schema: Schema) -> None:
175+
for field in self.fields:
176+
source_field = schema._lazy_id_to_field.get(field.source_id)
177+
if source_field is None:
178+
raise ValidationError(f"Cannot find source column for sort field: {field}")
179+
if not source_field.field_type.is_primitive:
180+
raise ValidationError(f"Cannot sort by non-primitive source field: {source_field}")
181+
if not field.transform.can_transform(source_field.field_type):
182+
raise ValidationError(
183+
f"Invalid source field {source_field.name} with type {source_field.field_type} "
184+
+ f"for transform: {field.transform}"
185+
)
186+
173187

174188
UNSORTED_SORT_ORDER_ID = 0
175189
UNSORTED_SORT_ORDER = SortOrder(order_id=UNSORTED_SORT_ORDER_ID)

pyiceberg/table/update/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -708,6 +708,10 @@ def update_table_metadata(
708708
if base_metadata.last_updated_ms == new_metadata.last_updated_ms:
709709
new_metadata = new_metadata.model_copy(update={"last_updated_ms": datetime_to_millis(datetime.now().astimezone())})
710710

711+
# Check correctness of partition spec, and sort order
712+
new_metadata.spec().check_compatible(new_metadata.schema())
713+
new_metadata.sort_order().check_compatible(new_metadata.schema())
714+
711715
if enforce_validation:
712716
return TableMetadataUtil.parse_obj(new_metadata.model_dump())
713717
else:

tests/conftest.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
from pyiceberg.serializers import ToOutputFile
7474
from pyiceberg.table import FileScanTask, Table
7575
from pyiceberg.table.metadata import TableMetadataV1, TableMetadataV2, TableMetadataV3
76+
from pyiceberg.table.sorting import NullOrder, SortField, SortOrder
7677
from pyiceberg.transforms import DayTransform, IdentityTransform
7778
from pyiceberg.typedef import Identifier
7879
from pyiceberg.types import (
@@ -1893,6 +1894,11 @@ def test_partition_spec() -> PartitionSpec:
18931894
)
18941895

18951896

1897+
@pytest.fixture(scope="session")
1898+
def test_sort_order() -> SortOrder:
1899+
return SortOrder(SortField(source_id=1, transform=IdentityTransform(), null_order=NullOrder.NULLS_FIRST))
1900+
1901+
18961902
@pytest.fixture(scope="session")
18971903
def generated_manifest_entry_file(
18981904
avro_schema_manifest_entry: dict[str, Any], test_schema: Schema, test_partition_spec: PartitionSpec

tests/integration/test_catalog.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
NoSuchNamespaceError,
3535
NoSuchTableError,
3636
TableAlreadyExistsError,
37+
ValidationError,
3738
)
3839
from pyiceberg.io import WAREHOUSE
3940
from pyiceberg.partitioning import PartitionField, PartitionSpec
@@ -635,3 +636,56 @@ def test_rest_custom_namespace_separator(rest_catalog: RestCatalog, table_schema
635636

636637
loaded_table = rest_catalog.load_table(identifier=full_table_identifier_tuple)
637638
assert loaded_table.name() == full_table_identifier_tuple
639+
640+
641+
@pytest.mark.integration
642+
@pytest.mark.parametrize("test_catalog", CATALOGS)
643+
def test_incompatible_partitioned_schema_evolution(
644+
test_catalog: Catalog, test_schema: Schema, test_partition_spec: PartitionSpec, database_name: str, table_name: str
645+
) -> None:
646+
if isinstance(test_catalog, HiveCatalog):
647+
pytest.skip("HiveCatalog does not support schema evolution")
648+
649+
identifier = (database_name, table_name)
650+
test_catalog.create_namespace(database_name)
651+
table = test_catalog.create_table(identifier, test_schema, partition_spec=test_partition_spec)
652+
assert test_catalog.table_exists(identifier)
653+
654+
with pytest.raises(ValidationError):
655+
with table.update_schema() as update:
656+
update.delete_column("VendorID")
657+
658+
# Assert column was not dropped
659+
assert "VendorID" in table.schema().column_names
660+
661+
with table.transaction() as transaction:
662+
with transaction.update_spec() as spec_update:
663+
spec_update.remove_field("VendorID")
664+
665+
with transaction.update_schema() as schema_update:
666+
schema_update.delete_column("VendorID")
667+
668+
assert table.spec() == PartitionSpec(PartitionField(2, 1001, DayTransform(), "tpep_pickup_day"), spec_id=1)
669+
assert table.schema() == Schema(NestedField(2, "tpep_pickup_datetime", TimestampType(), False))
670+
671+
672+
@pytest.mark.integration
673+
@pytest.mark.parametrize("test_catalog", CATALOGS)
674+
def test_incompatible_sorted_schema_evolution(
675+
test_catalog: Catalog, test_schema: Schema, test_sort_order: SortOrder, database_name: str, table_name: str
676+
) -> None:
677+
if isinstance(test_catalog, HiveCatalog):
678+
pytest.skip("HiveCatalog does not support schema evolution")
679+
680+
identifier = (database_name, table_name)
681+
test_catalog.create_namespace(database_name)
682+
table = test_catalog.create_table(identifier, test_schema, sort_order=test_sort_order)
683+
assert test_catalog.table_exists(identifier)
684+
685+
with pytest.raises(ValidationError):
686+
with table.update_schema() as update:
687+
update.delete_column("VendorID")
688+
689+
assert table.schema() == Schema(
690+
NestedField(1, "VendorID", IntegerType(), False), NestedField(2, "tpep_pickup_datetime", TimestampType(), False)
691+
)

tests/table/test_partitioning.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import pytest
2323

24+
from pyiceberg.exceptions import ValidationError
2425
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec
2526
from pyiceberg.schema import Schema
2627
from pyiceberg.transforms import (
@@ -264,3 +265,36 @@ def test_deserialize_partition_field_v3() -> None:
264265

265266
field = PartitionField.model_validate_json(json_partition_spec)
266267
assert field == PartitionField(source_id=1, field_id=1000, transform=TruncateTransform(width=19), name="str_truncate")
268+
269+
270+
def test_incompatible_source_column_not_found() -> None:
271+
schema = Schema(NestedField(1, "foo", IntegerType()), NestedField(2, "bar", IntegerType()))
272+
273+
spec = PartitionSpec(PartitionField(3, 1000, IdentityTransform(), "some_partition"))
274+
275+
with pytest.raises(ValidationError) as exc:
276+
spec.check_compatible(schema)
277+
278+
assert "Cannot find source column for partition field: 1000: some_partition: identity(3)" in str(exc.value)
279+
280+
281+
def test_incompatible_non_primitive_type() -> None:
282+
schema = Schema(NestedField(1, "foo", StructType()), NestedField(2, "bar", IntegerType()))
283+
284+
spec = PartitionSpec(PartitionField(1, 1000, IdentityTransform(), "some_partition"))
285+
286+
with pytest.raises(ValidationError) as exc:
287+
spec.check_compatible(schema)
288+
289+
assert "Cannot partition by non-primitive source field: 1: foo: optional struct<>" in str(exc.value)
290+
291+
292+
def test_incompatible_transform_source_type() -> None:
293+
schema = Schema(NestedField(1, "foo", IntegerType()), NestedField(2, "bar", IntegerType()))
294+
295+
spec = PartitionSpec(PartitionField(1, 1000, YearTransform(), "some_partition"))
296+
297+
with pytest.raises(ValidationError) as exc:
298+
spec.check_compatible(schema)
299+
300+
assert "Invalid source field foo with type int for transform: year" in str(exc.value)

tests/table/test_sorting.py

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020

2121
import pytest
2222

23+
from pyiceberg.exceptions import ValidationError
24+
from pyiceberg.schema import Schema
2325
from pyiceberg.table.metadata import TableMetadataUtil
2426
from pyiceberg.table.sorting import (
2527
UNSORTED_SORT_ORDER,
@@ -28,7 +30,8 @@
2830
SortField,
2931
SortOrder,
3032
)
31-
from pyiceberg.transforms import BucketTransform, IdentityTransform, VoidTransform
33+
from pyiceberg.transforms import BucketTransform, IdentityTransform, VoidTransform, YearTransform
34+
from pyiceberg.types import IntegerType, NestedField, StructType
3235

3336

3437
@pytest.fixture
@@ -133,3 +136,36 @@ def test_serialize_sort_field_v3() -> None:
133136
expected = SortField(source_id=19, transform=IdentityTransform(), null_order=NullOrder.NULLS_FIRST)
134137
payload = '{"source-ids":[19],"transform":"identity","direction":"asc","null-order":"nulls-first"}'
135138
assert SortField.model_validate_json(payload) == expected
139+
140+
141+
def test_incompatible_source_column_not_found(sort_order: SortOrder) -> None:
142+
schema = Schema(NestedField(1, "foo", IntegerType()), NestedField(2, "bar", IntegerType()))
143+
144+
with pytest.raises(ValidationError) as exc:
145+
sort_order.check_compatible(schema)
146+
147+
assert "Cannot find source column for sort field: 19 ASC NULLS FIRST" in str(exc.value)
148+
149+
150+
def test_incompatible_non_primitive_type() -> None:
151+
schema = Schema(NestedField(1, "foo", StructType()), NestedField(2, "bar", IntegerType()))
152+
153+
sort_order = SortOrder(SortField(source_id=1, transform=IdentityTransform(), null_order=NullOrder.NULLS_FIRST))
154+
155+
with pytest.raises(ValidationError) as exc:
156+
sort_order.check_compatible(schema)
157+
158+
assert "Cannot sort by non-primitive source field: 1: foo: optional struct<>" in str(exc.value)
159+
160+
161+
def test_incompatible_transform_source_type() -> None:
162+
schema = Schema(NestedField(1, "foo", IntegerType()), NestedField(2, "bar", IntegerType()))
163+
164+
sort_order = SortOrder(
165+
SortField(source_id=1, transform=YearTransform(), null_order=NullOrder.NULLS_FIRST),
166+
)
167+
168+
with pytest.raises(ValidationError) as exc:
169+
sort_order.check_compatible(schema)
170+
171+
assert "Invalid source field foo with type int for transform: year" in str(exc.value)

0 commit comments

Comments
 (0)