-
Notifications
You must be signed in to change notification settings - Fork 464
partition field names validation against schema field conflicts #2305
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
92a29e8
284250b
6cf4a51
e63bedf
61b1b6d
d0b9053
252a4e6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -249,6 +249,26 @@ def partition_to_path(self, data: Record, schema: Schema) -> str: | |||||||||
| UNPARTITIONED_PARTITION_SPEC = PartitionSpec(spec_id=0) | ||||||||||
|
|
||||||||||
|
|
||||||||||
| def validate_partition_name( | ||||||||||
| field_name: str, | ||||||||||
| partition_transform: Transform[Any, Any], | ||||||||||
| source_id: int, | ||||||||||
| schema: Schema, | ||||||||||
| ) -> None: | ||||||||||
| """Validate that a partition field name doesn't conflict with schema field names.""" | ||||||||||
| try: | ||||||||||
| schema_field = schema.find_field(field_name) | ||||||||||
| except ValueError: | ||||||||||
| return # No conflict if field doesn't exist in schema | ||||||||||
|
|
||||||||||
| if isinstance(partition_transform, (IdentityTransform, VoidTransform)): | ||||||||||
| # For identity transforms, allow conflict only if sourced from the same schema field | ||||||||||
| if schema_field.field_id != source_id: | ||||||||||
| raise ValueError(f"Cannot create identity partition from a different source field in the schema: {field_name}") | ||||||||||
| else: | ||||||||||
|
||||||||||
| raise ValueError(f"Cannot create identity partition from a different source field in the schema: {field_name}") | |
| else: | |
| raise ValueError(f"Cannot create identity partition sourced from different field in schema: {field_name}") | |
| else: |
rutb327 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -658,6 +658,14 @@ def _apply(self) -> Schema: | |||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| # Check the field-ids | ||||||||||||||||||||||||||||||||
| new_schema = Schema(*struct.fields) | ||||||||||||||||||||||||||||||||
| if self._transaction is not None: | ||||||||||||||||||||||||||||||||
| from pyiceberg.partitioning import validate_partition_name | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| for spec in self._transaction.table_metadata.partition_specs: | ||||||||||||||||||||||||||||||||
| for partition_field in spec.fields: | ||||||||||||||||||||||||||||||||
| validate_partition_name( | ||||||||||||||||||||||||||||||||
| partition_field.name, partition_field.transform, partition_field.source_id, new_schema | ||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
| if self._transaction is not None: | |
| from pyiceberg.partitioning import validate_partition_name | |
| for spec in self._transaction.table_metadata.partition_specs: | |
| for partition_field in spec.fields: | |
| validate_partition_name( | |
| partition_field.name, partition_field.transform, partition_field.source_id, new_schema | |
| ) | |
| from pyiceberg.partitioning import validate_partition_name | |
| for spec in self._transaction.table_metadata.partition_specs: | |
| for partition_field in spec.fields: | |
| validate_partition_name( | |
| partition_field.name, partition_field.transform, partition_field.source_id, new_schema | |
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, I'll make the suggested changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some tests show that `transaction` can be `None` in some cases: after removing the check, tests in test_schema.py fail. They use: `UpdateSchema(transaction=None, schema=Schema())`
https://github.com/rutb327/iceberg-python/blob/24b12ddd8fdab4a62650786a2c3cdd56a53f8719/tests/test_schema.py#L933
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like everywhere else in the codebase we include a transaction in `UpdateSchema`.
Maybe we can update the tests like this:
def test_add_top_level_primitives(primitive_fields: List[NestedField], table_v2: Table) -> None:
for primitive_field in primitive_fields:
new_schema = Schema(primitive_field)
applied = UpdateSchema(transaction=Transaction(table_v2), schema=Schema()).union_by_name(new_schema)._apply() # type: ignore
assert applied == new_schema
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -174,16 +174,12 @@ def _commit(self) -> UpdatesAndRequirements: | |
| return updates, requirements | ||
|
|
||
| def _apply(self) -> PartitionSpec: | ||
| def _check_and_add_partition_name(schema: Schema, name: str, source_id: int, partition_names: Set[str]) -> None: | ||
| try: | ||
| field = schema.find_field(name) | ||
| except ValueError: | ||
| field = None | ||
|
|
||
| if source_id is not None and field is not None and field.field_id != source_id: | ||
| raise ValueError(f"Cannot create identity partition from a different field in the schema {name}") | ||
| elif field is not None and source_id != field.field_id: | ||
| raise ValueError(f"Cannot create partition from name that exists in schema {name}") | ||
| def _check_and_add_partition_name( | ||
| schema: Schema, name: str, source_id: int, transform: Transform[Any, Any], partition_names: Set[str] | ||
| ) -> None: | ||
| from pyiceberg.partitioning import validate_partition_name | ||
|
|
||
| validate_partition_name(name, transform, source_id, schema) | ||
| if not name: | ||
|
||
| raise ValueError("Undefined name") | ||
| if name in partition_names: | ||
|
|
@@ -193,7 +189,7 @@ def _check_and_add_partition_name(schema: Schema, name: str, source_id: int, par | |
| def _add_new_field( | ||
| schema: Schema, source_id: int, field_id: int, name: str, transform: Transform[Any, Any], partition_names: Set[str] | ||
| ) -> PartitionField: | ||
| _check_and_add_partition_name(schema, name, source_id, partition_names) | ||
| _check_and_add_partition_name(schema, name, source_id, transform, partition_names) | ||
| return PartitionField(source_id, field_id, transform, name) | ||
|
|
||
| partition_fields = [] | ||
|
|
@@ -244,6 +240,13 @@ def _add_new_field( | |
| partition_fields.append(new_field) | ||
|
|
||
| for added_field in self._adds: | ||
| _check_and_add_partition_name( | ||
| self._transaction.table_metadata.schema(), | ||
| added_field.name, | ||
| added_field.source_id, | ||
| added_field.transform, | ||
| partition_names, | ||
| ) | ||
|
Comment on lines
+239
to
+245
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Good catch. Just to confirm: this covers the newly added partition fields?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, that's correct. |
||
| new_field = PartitionField( | ||
| source_id=added_field.source_id, | ||
| field_id=added_field.field_id, | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -15,6 +15,7 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||
| # specific language governing permissions and limitations | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # under the License. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # pylint:disable=redefined-outer-name | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from typing import Optional | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import pytest | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -63,12 +64,19 @@ def _table_v2(catalog: Catalog) -> Table: | |||||||||||||||||||||||||||||||||||||||||||||||||||||
| return _create_table_with_schema(catalog, schema_with_timestamp, "2") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def _create_table_with_schema(catalog: Catalog, schema: Schema, format_version: str) -> Table: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def _create_table_with_schema( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| catalog: Catalog, schema: Schema, format_version: str, partition_spec: Optional[PartitionSpec] = None | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) -> Table: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+67
to
+68
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This follows the other create-table helpers in tests, for example iceberg-python/tests/integration/test_register_table.py (lines 40 to 59 in 8013545).
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| tbl_name = "default.test_schema_evolution" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| catalog.drop_table(tbl_name) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| except NoSuchTableError: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| pass | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if partition_spec: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return catalog.create_table( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| identifier=tbl_name, schema=schema, partition_spec=partition_spec, properties={"format-version": format_version} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return catalog.create_table(identifier=tbl_name, schema=schema, properties={"format-version": format_version}) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if partition_spec: | |
| return catalog.create_table( | |
| identifier=tbl_name, schema=schema, partition_spec=partition_spec, properties={"format-version": format_version} | |
| ) | |
| return catalog.create_table(identifier=tbl_name, schema=schema, properties={"format-version": format_version}) | |
| return catalog.create_table( | |
| identifier=tbl_name, schema=schema, partition_spec=partition_spec, properties={"format-version": format_version} | |
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.