|
34 | 34 | NoSuchNamespaceError, |
35 | 35 | NoSuchTableError, |
36 | 36 | TableAlreadyExistsError, |
| 37 | + ValidationError, |
37 | 38 | ) |
38 | 39 | from pyiceberg.io import WAREHOUSE |
39 | 40 | from pyiceberg.partitioning import PartitionField, PartitionSpec |
@@ -635,3 +636,56 @@ def test_rest_custom_namespace_separator(rest_catalog: RestCatalog, table_schema |
635 | 636 |
|
636 | 637 | loaded_table = rest_catalog.load_table(identifier=full_table_identifier_tuple) |
637 | 638 | assert loaded_table.name() == full_table_identifier_tuple |
| 639 | + |
| 640 | + |
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_incompatible_partitioned_schema_evolution(
    test_catalog: Catalog,
    test_schema: Schema,
    test_partition_spec: PartitionSpec,
    database_name: str,
    table_name: str,
) -> None:
    """Check that dropping "VendorID" is rejected until its partition field is removed.

    The test first expects a plain schema update that deletes "VendorID" to raise
    ``ValidationError`` and leave the column in place. It then removes the
    "VendorID" partition field and deletes the column inside a single transaction,
    and verifies the resulting spec and schema.

    NOTE(review): the contents of ``test_schema`` and ``test_partition_spec`` come
    from fixtures that are not visible here; the final assertions imply a two-column
    schema partitioned on both columns -- confirm against the fixtures.
    """
    if isinstance(test_catalog, HiveCatalog):
        pytest.skip("HiveCatalog does not support schema evolution")

    table_id = (database_name, table_name)
    test_catalog.create_namespace(database_name)
    table = test_catalog.create_table(table_id, test_schema, partition_spec=test_partition_spec)
    assert test_catalog.table_exists(table_id)

    # The schema update is expected to fail while the partition field still exists.
    with pytest.raises(ValidationError), table.update_schema() as rejected_update:
        rejected_update.delete_column("VendorID")

    # Assert column was not dropped
    surviving_columns = table.schema().column_names
    assert "VendorID" in surviving_columns

    # Remove the partition field first, then the column, within one transaction.
    with table.transaction() as txn:
        with txn.update_spec() as spec_change:
            spec_change.remove_field("VendorID")

        with txn.update_schema() as schema_change:
            schema_change.delete_column("VendorID")

    remaining_partition_field = PartitionField(2, 1001, DayTransform(), "tpep_pickup_day")
    assert table.spec() == PartitionSpec(remaining_partition_field, spec_id=1)

    remaining_column = NestedField(2, "tpep_pickup_datetime", TimestampType(), False)
    assert table.schema() == Schema(remaining_column)
| 670 | + |
| 671 | + |
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_incompatible_sorted_schema_evolution(
    test_catalog: Catalog,
    test_schema: Schema,
    test_sort_order: SortOrder,
    database_name: str,
    table_name: str,
) -> None:
    """Check that dropping "VendorID" from a table created with a sort order is rejected.

    The schema update is expected to raise ``ValidationError``, after which the
    table schema must still contain both original columns.

    NOTE(review): ``test_sort_order`` is a fixture defined elsewhere; presumably it
    sorts on "VendorID" -- confirm against the fixture.
    """
    if isinstance(test_catalog, HiveCatalog):
        pytest.skip("HiveCatalog does not support schema evolution")

    table_id = (database_name, table_name)
    test_catalog.create_namespace(database_name)
    table = test_catalog.create_table(table_id, test_schema, sort_order=test_sort_order)
    assert test_catalog.table_exists(table_id)

    # The delete must be refused when the update is committed on context exit.
    with pytest.raises(ValidationError), table.update_schema() as rejected_update:
        rejected_update.delete_column("VendorID")

    # The schema must be unchanged after the failed update.
    vendor_id_column = NestedField(1, "VendorID", IntegerType(), False)
    pickup_time_column = NestedField(2, "tpep_pickup_datetime", TimestampType(), False)
    assert table.schema() == Schema(vendor_id_column, pickup_time_column)
0 commit comments