-
Notifications
You must be signed in to change notification settings - Fork 479
partition field names validation against schema field conflicts #2305
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
92a29e8
284250b
6cf4a51
e63bedf
61b1b6d
d0b9053
252a4e6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -174,26 +174,18 @@ def _commit(self) -> UpdatesAndRequirements: | |
| return updates, requirements | ||
|
|
||
| def _apply(self) -> PartitionSpec: | ||
| def _check_and_add_partition_name(schema: Schema, name: str, source_id: int, partition_names: Set[str]) -> None: | ||
| try: | ||
| field = schema.find_field(name) | ||
| except ValueError: | ||
| field = None | ||
|
|
||
| if source_id is not None and field is not None and field.field_id != source_id: | ||
| raise ValueError(f"Cannot create identity partition from a different field in the schema {name}") | ||
| elif field is not None and source_id != field.field_id: | ||
| raise ValueError(f"Cannot create partition from name that exists in schema {name}") | ||
| if not name: | ||
| raise ValueError("Undefined name") | ||
| if name in partition_names: | ||
| raise ValueError(f"Partition name has to be unique: {name}") | ||
| def _check_and_add_partition_name( | ||
| schema: Schema, name: str, source_id: int, transform: Transform[Any, Any], partition_names: Set[str] | ||
| ) -> None: | ||
| from pyiceberg.partitioning import validate_partition_name | ||
|
|
||
| validate_partition_name(name, transform, source_id, schema, partition_names) | ||
| partition_names.add(name) | ||
|
|
||
| def _add_new_field( | ||
| schema: Schema, source_id: int, field_id: int, name: str, transform: Transform[Any, Any], partition_names: Set[str] | ||
| ) -> PartitionField: | ||
| _check_and_add_partition_name(schema, name, source_id, partition_names) | ||
| _check_and_add_partition_name(schema, name, source_id, transform, partition_names) | ||
| return PartitionField(source_id, field_id, transform, name) | ||
|
|
||
| partition_fields = [] | ||
|
|
@@ -244,6 +236,13 @@ def _add_new_field( | |
| partition_fields.append(new_field) | ||
|
|
||
| for added_field in self._adds: | ||
| _check_and_add_partition_name( | ||
| self._transaction.table_metadata.schema(), | ||
| added_field.name, | ||
| added_field.source_id, | ||
| added_field.transform, | ||
| partition_names, | ||
| ) | ||
|
Comment on lines
+239
to
+245
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good catch. just to confirm this covers the newly added partition fields?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, that's correct |
||
| new_field = PartitionField( | ||
| source_id=added_field.source_id, | ||
| field_id=added_field.field_id, | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -20,7 +20,7 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from pyiceberg.catalog import Catalog | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from pyiceberg.exceptions import NoSuchTableError | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from pyiceberg.partitioning import PartitionField, PartitionSpec | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from pyiceberg.schema import Schema | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from pyiceberg.table import Table | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from pyiceberg.transforms import ( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -63,13 +63,22 @@ def _table_v2(catalog: Catalog) -> Table: | |||||||||||||||||||||||||||||||||||||||||||||||||||||
| return _create_table_with_schema(catalog, schema_with_timestamp, "2") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def _create_table_with_schema(catalog: Catalog, schema: Schema, format_version: str) -> Table: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def _create_table_with_schema( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| catalog: Catalog, schema: Schema, format_version: str, partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) -> Table: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+67
to
+68
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. following other create table helpers in tests, for example iceberg-python/tests/integration/test_register_table.py Lines 40 to 59 in 8013545
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| tbl_name = "default.test_schema_evolution" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| catalog.drop_table(tbl_name) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| except NoSuchTableError: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| pass | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return catalog.create_table(identifier=tbl_name, schema=schema, properties={"format-version": format_version}) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if partition_spec: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return catalog.create_table( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| identifier=tbl_name, schema=schema, partition_spec=partition_spec, properties={"format-version": format_version} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return catalog.create_table( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| identifier=tbl_name, schema=schema, partition_spec=partition_spec, properties={"format-version": format_version} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
kevinjqliu marked this conversation as resolved.
Outdated
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| @pytest.mark.integration | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -564,3 +573,80 @@ def _validate_new_partition_fields( | |||||||||||||||||||||||||||||||||||||||||||||||||||||
| assert len(spec.fields) == len(expected_partition_fields) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for i in range(len(spec.fields)): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| assert spec.fields[i] == expected_partition_fields[i] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| @pytest.mark.integration | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def test_partition_schema_field_name_conflict(catalog: Catalog) -> None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| schema = Schema( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| NestedField(1, "id", LongType(), required=False), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| NestedField(2, "event_ts", TimestampType(), required=False), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| NestedField(3, "another_ts", TimestampType(), required=False), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| NestedField(4, "str", StringType(), required=False), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| table = _create_table_with_schema(catalog, schema, "2") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| with pytest.raises(ValueError, match="Cannot create partition from name that exists in schema: another_ts"): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| table.update_spec().add_field("event_ts", YearTransform(), "another_ts").commit() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| with pytest.raises(ValueError, match="Cannot create partition from name that exists in schema: id"): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| table.update_spec().add_field("event_ts", DayTransform(), "id").commit() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| with pytest.raises(ValueError, match="Cannot create identity partition sourced from different field in schema: another_ts"): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| table.update_spec().add_field("event_ts", IdentityTransform(), "another_ts").commit() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| with pytest.raises(ValueError, match="Cannot create identity partition sourced from different field in schema: str"): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| table.update_spec().add_field("id", IdentityTransform(), "str").commit() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| table.update_spec().add_field("id", IdentityTransform(), "id").commit() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| table.update_spec().add_field("event_ts", YearTransform(), "event_year").commit() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| @pytest.mark.integration | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def test_partition_validation_during_table_creation(catalog: Catalog) -> None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| schema = Schema( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| NestedField(1, "id", LongType(), required=False), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| NestedField(2, "event_ts", TimestampType(), required=False), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| NestedField(3, "another_ts", TimestampType(), required=False), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| NestedField(4, "str", StringType(), required=False), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| partition_spec = PartitionSpec( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| PartitionField(source_id=2, field_id=1000, transform=YearTransform(), name="another_ts"), spec_id=1 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| with pytest.raises(ValueError, match="Cannot create partition from name that exists in schema: another_ts"): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| _create_table_with_schema(catalog, schema, "2", partition_spec) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| partition_spec = PartitionSpec( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="id"), spec_id=1 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| _create_table_with_schema(catalog, schema, "2", partition_spec) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| @pytest.mark.integration | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def test_schema_evolution_partition_conflict(catalog: Catalog) -> None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| schema = Schema( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| NestedField(1, "id", LongType(), required=False), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| NestedField(2, "event_ts", TimestampType(), required=False), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| partition_spec = PartitionSpec( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| PartitionField(source_id=2, field_id=1000, transform=YearTransform(), name="event_year"), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| PartitionField(source_id=2, field_id=1001, transform=IdentityTransform(), name="first_name"), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| PartitionField(source_id=1, field_id=1002, transform=IdentityTransform(), name="id"), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| spec_id=1, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| table = _create_table_with_schema(catalog, schema, "2", partition_spec) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| with pytest.raises(ValueError, match="Cannot create partition from name that exists in schema: event_year"): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| table.update_schema().add_column("event_year", StringType()).commit() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| with pytest.raises(ValueError, match="Cannot create identity partition sourced from different field in schema: first_name"): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| table.update_schema().add_column("first_name", StringType()).commit() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| table.update_schema().add_column("other_field", StringType()).commit() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| with pytest.raises(ValueError, match="Cannot create partition from name that exists in schema: event_year"): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| table.update_schema().rename_column("other_field", "event_year").commit() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| with pytest.raises(ValueError, match="Cannot create identity partition sourced from different field in schema: first_name"): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| table.update_schema().rename_column("other_field", "first_name").commit() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| table.update_schema().rename_column("other_field", "valid_name").commit() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
Uh oh!
There was an error while loading. Please reload this page.