Allow writing pa.Table that are either a subset of table schema or in arbitrary order, and support type promotion on write (#921)
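As a minimal sketch of what this PR enables (the catalog name, table name, and schema below are hypothetical, not taken from this PR), an appended dataframe may now omit optional columns, list columns in any order, and use types that are promotable to the table's types:

```python
import pyarrow as pa
from pyiceberg.catalog import load_catalog

# Hypothetical catalog and table: schema is id (required long), name (optional string).
catalog = load_catalog("default")
tbl = catalog.load_table("default.users")

# "name" is omitted and "id" arrives as int32: the missing optional column is
# written as nulls, and int32 is promoted to long on write.
df = pa.table({"id": pa.array([1, 2, 3], type=pa.int32())})
tbl.append(df)
```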
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -120,6 +120,7 @@ | |
| Schema, | ||
| SchemaVisitorPerPrimitiveType, | ||
| SchemaWithPartnerVisitor, | ||
| _check_schema_compatible, | ||
| pre_order_visit, | ||
| promote, | ||
| prune_columns, | ||
|
|
@@ -1450,14 +1451,17 @@ def field_partner(self, partner_struct: Optional[pa.Array], field_id: int, _: st | |
| except ValueError: | ||
| return None | ||
|
|
||
| if isinstance(partner_struct, pa.StructArray): | ||
| return partner_struct.field(name) | ||
| elif isinstance(partner_struct, pa.Table): | ||
| return partner_struct.column(name).combine_chunks() | ||
| elif isinstance(partner_struct, pa.RecordBatch): | ||
| return partner_struct.column(name) | ||
| else: | ||
| raise ValueError(f"Cannot find {name} in expected partner_struct type {type(partner_struct)}") | ||
| try: | ||
| if isinstance(partner_struct, pa.StructArray): | ||
| return partner_struct.field(name) | ||
| elif isinstance(partner_struct, pa.Table): | ||
| return partner_struct.column(name).combine_chunks() | ||
| elif isinstance(partner_struct, pa.RecordBatch): | ||
| return partner_struct.column(name) | ||
| else: | ||
| raise ValueError(f"Cannot find {name} in expected partner_struct type {type(partner_struct)}") | ||
| except KeyError: | ||
| return None | ||
|
|
||
| return None | ||
|
|
||
|
|
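For context (an aside, not part of the diff): PyArrow's name-based accessors raise `KeyError` for a missing column, which is what the `try`/`except KeyError` above converts into a `None` partner so the projection can later null-fill optional fields. A minimal illustration:

```python
import pyarrow as pa

table = pa.table({"id": [1, 2, 3]})
try:
    table.column("name")  # no such column in this table
except KeyError:
    # The accessor in this PR returns None here instead of propagating,
    # leaving ArrowProjectionVisitor to null-fill the field if it is optional.
    print("missing column -> None partner")
```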
@@ -1998,8 +2002,7 @@ def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteT | |
| ) | ||
|
|
||
| def write_parquet(task: WriteTask) -> DataFile: | ||
| table_schema = task.schema | ||
|
|
||
|
||
| table_schema = table_metadata.schema() | ||
| # if schema needs to be transformed, use the transformed schema and adjust the arrow table accordingly | ||
| # otherwise use the original schema | ||
| if (sanitized_schema := sanitize_column_names(table_schema)) != table_schema: | ||
|
|
@@ -2011,7 +2014,7 @@ def write_parquet(task: WriteTask) -> DataFile: | |
| batches = [ | ||
| _to_requested_schema( | ||
| requested_schema=file_schema, | ||
| file_schema=table_schema, | ||
| file_schema=task.schema, | ||
| batch=batch, | ||
| downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, | ||
| include_field_ids=True, | ||
|
|
@@ -2070,47 +2073,30 @@ def bin_pack_arrow_table(tbl: pa.Table, target_file_size: int) -> Iterator[List[ | |
| return bin_packed_record_batches | ||
|
|
||
|
|
||
| def _check_schema_compatible(table_schema: Schema, other_schema: pa.Schema, downcast_ns_timestamp_to_us: bool = False) -> None: | ||
| def _check_pyarrow_schema_compatible( | ||
| requested_schema: Schema, provided_schema: pa.Schema, downcast_ns_timestamp_to_us: bool = False | ||
| ) -> None: | ||
| """ | ||
| Check if the `table_schema` is compatible with `other_schema`. | ||
| Check if the `requested_schema` is compatible with `provided_schema`. | ||
|
|
||
| Two schemas are considered compatible when they are equal in terms of the Iceberg Schema type. | ||
|
|
||
| Raises: | ||
| ValueError: If the schemas are not compatible. | ||
| """ | ||
| name_mapping = table_schema.name_mapping | ||
| name_mapping = requested_schema.name_mapping | ||
|
||
| try: | ||
| task_schema = pyarrow_to_schema( | ||
| other_schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us | ||
| provided_schema = pyarrow_to_schema( | ||
| provided_schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us | ||
| ) | ||
| except ValueError as e: | ||
| other_schema = _pyarrow_to_schema_without_ids(other_schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us) | ||
| additional_names = set(other_schema.column_names) - set(table_schema.column_names) | ||
| provided_schema = _pyarrow_to_schema_without_ids(provided_schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us) | ||
| additional_names = provided_schema.field_names - requested_schema.field_names | ||
| raise ValueError( | ||
|
||
| f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. Update the schema first (hint, use union_by_name)." | ||
| ) from e | ||
|
|
||
| if table_schema.as_struct() != task_schema.as_struct(): | ||
| from rich.console import Console | ||
| from rich.table import Table as RichTable | ||
|
|
||
| console = Console(record=True) | ||
|
|
||
| rich_table = RichTable(show_header=True, header_style="bold") | ||
| rich_table.add_column("") | ||
| rich_table.add_column("Table field") | ||
| rich_table.add_column("Dataframe field") | ||
|
|
||
| for lhs in table_schema.fields: | ||
| try: | ||
| rhs = task_schema.find_field(lhs.field_id) | ||
| rich_table.add_row("✅" if lhs == rhs else "❌", str(lhs), str(rhs)) | ||
| except ValueError: | ||
| rich_table.add_row("❌", str(lhs), "Missing") | ||
|
|
||
| console.print(rich_table) | ||
| raise ValueError(f"Mismatch in fields:\n{console.export_text()}") | ||
| _check_schema_compatible(requested_schema, provided_schema) | ||
|
|
||
|
|
||
| def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, file_paths: Iterator[str]) -> Iterator[DataFile]: | ||
|
|
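A hedged usage sketch of the renamed helper (the import path assumes `pyiceberg.io.pyarrow` as in this diff; the schemas are illustrative):

```python
import pyarrow as pa
from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible
from pyiceberg.schema import Schema
from pyiceberg.types import LongType, NestedField

iceberg_schema = Schema(NestedField(field_id=1, name="id", field_type=LongType(), required=True))

# Passes: the Arrow field is non-nullable and int32 promotes to long.
_check_pyarrow_schema_compatible(iceberg_schema, pa.schema([pa.field("id", pa.int32(), nullable=False)]))

# Raises: a nullable Arrow column cannot satisfy a required Iceberg field.
try:
    _check_pyarrow_schema_compatible(iceberg_schema, pa.schema([pa.field("id", pa.int32(), nullable=True)]))
except ValueError as exc:
    print(exc)
```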
@@ -2124,7 +2110,7 @@ def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, file_ | |
| f"Cannot add file {file_path} because it has field IDs. `add_files` only supports addition of files without field_ids" | ||
| ) | ||
| schema = table_metadata.schema() | ||
| _check_schema_compatible(schema, parquet_metadata.schema.to_arrow_schema()) | ||
| _check_pyarrow_schema_compatible(schema, parquet_metadata.schema.to_arrow_schema()) | ||
|
|
||
| statistics = data_file_statistics_from_parquet_metadata( | ||
| parquet_metadata=parquet_metadata, | ||
|
|
@@ -2205,7 +2191,7 @@ def _dataframe_to_data_files( | |
| Returns: | ||
| An iterable that supplies datafiles that represent the table. | ||
| """ | ||
| from pyiceberg.table import PropertyUtil, TableProperties, WriteTask | ||
| from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, PropertyUtil, TableProperties, WriteTask | ||
|
|
||
| counter = counter or itertools.count(0) | ||
| write_uuid = write_uuid or uuid.uuid4() | ||
|
|
@@ -2214,13 +2200,16 @@ def _dataframe_to_data_files( | |
| property_name=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES, | ||
| default=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT, | ||
| ) | ||
| name_mapping = table_metadata.schema().name_mapping | ||
|
Contributor: Shall we use the table's `schema.name-mapping.default` here?

Collaborator (Author): Good question - I'm not sure actually. When we are writing a dataframe into an Iceberg table, I think we are making the assumption that its names match the current names of the Iceberg table, so I think using the current schema's name mapping makes sense here.
I'm curious to hear what others' thoughts are, and whether anyone has a workflow in mind that would benefit from this change!

Contributor: Sounds great! I initially raised this because we're assigning field IDs for the input dataframe, which aligns with the general purpose of name mapping - to provide fallback IDs. On second thought, `schema.name-mapping.default` is more for the read side, so using it here may silently introduce unwanted side effects during write. I agree, let's hold off on this for a while and wait for more discussions.

Collaborator (Author): Sounds great 👍 thank you for the review!
||
| downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False | ||
| task_schema = pyarrow_to_schema(df.schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us) | ||
|
Collaborator (Author): This change is necessary to ensure that we are comparing the Schema that matches the arrow table's schema versus the Table Schema in order to properly invoke …

Contributor: Great catch 👍
||
|
|
||
| if table_metadata.spec().is_unpartitioned(): | ||
| yield from write_file( | ||
| io=io, | ||
| table_metadata=table_metadata, | ||
| tasks=iter([ | ||
| WriteTask(write_uuid=write_uuid, task_id=next(counter), record_batches=batches, schema=table_metadata.schema()) | ||
| WriteTask(write_uuid=write_uuid, task_id=next(counter), record_batches=batches, schema=task_schema) | ||
| for batches in bin_pack_arrow_table(df, target_file_size) | ||
| ]), | ||
| ) | ||
|
|
@@ -2235,7 +2224,7 @@ def _dataframe_to_data_files( | |
| task_id=next(counter), | ||
| record_batches=batches, | ||
| partition_key=partition.partition_key, | ||
| schema=table_metadata.schema(), | ||
| schema=task_schema, | ||
| ) | ||
| for partition in partitions | ||
| for batches in bin_pack_arrow_table(partition.arrow_table_partition, target_file_size) | ||
|
|
||
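For context, a sketch of the name-mapping step introduced above: `pyarrow_to_schema` uses the table schema's name mapping to assign fallback field IDs to an Arrow schema that carries no field-ID metadata, so the resulting `task_schema` mirrors the dataframe's column order and types while keeping the table's IDs (the schema below is illustrative):

```python
import pyarrow as pa
from pyiceberg.io.pyarrow import pyarrow_to_schema
from pyiceberg.schema import Schema
from pyiceberg.types import LongType, NestedField, StringType

iceberg_schema = Schema(
    NestedField(field_id=1, name="id", field_type=LongType(), required=True),
    NestedField(field_id=2, name="name", field_type=StringType(), required=False),
)

# Arrow schema without field IDs, columns in a different order than the table.
arrow_schema = pa.schema([pa.field("name", pa.string()), pa.field("id", pa.int64())])

task_schema = pyarrow_to_schema(arrow_schema, name_mapping=iceberg_schema.name_mapping)
# task_schema lists name (ID 2) before id (ID 1), matching the dataframe.
```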
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -324,6 +324,11 @@ def field_ids(self) -> Set[int]: | |||||||||||
| """Return the IDs of the current schema.""" | ||||||||||||
| return set(self._name_to_id.values()) | ||||||||||||
|
|
||||||||||||
| @property | ||||||||||||
| def field_names(self) -> Set[str]: | ||||||||||||
|
||||||||||||
| """Return the Names of the current schema.""" | ||||||||||||
| return set(self._name_to_id.keys()) | ||||||||||||
|
|
||||||||||||
| def _validate_identifier_field(self, field_id: int) -> None: | ||||||||||||
| """Validate that the field with the given ID is a valid identifier field. | ||||||||||||
|
|
||||||||||||
|
|
@@ -1616,3 +1621,145 @@ def _(file_type: FixedType, read_type: IcebergType) -> IcebergType: | |||||||||||
| return read_type | ||||||||||||
| else: | ||||||||||||
| raise ResolveError(f"Cannot promote {file_type} to {read_type}") | ||||||||||||
|
|
||||||||||||
|
|
||||||||||||
| def _check_schema_compatible(requested_schema: Schema, provided_schema: Schema) -> None: | ||||||||||||
| """ | ||||||||||||
| Check if the `provided_schema` is compatible with `requested_schema`. | ||||||||||||
|
|
||||||||||||
| Both Schemas must have valid IDs and share the same ID for the same field names. | ||||||||||||
|
|
||||||||||||
| Two schemas are considered compatible when: | ||||||||||||
| 1. All `required` fields in `requested_schema` are present and are also `required` in the `provided_schema` | ||||||||||||
| 2. Field Types are consistent for fields that are present in both schemas. I.e. the field type | ||||||||||||
| in the `provided_schema` can be promoted to the field type of the same field ID in `requested_schema` | ||||||||||||
|
|
||||||||||||
| Raises: | ||||||||||||
| ValueError: If the schemas are not compatible. | ||||||||||||
| """ | ||||||||||||
| visit(requested_schema, _SchemaCompatibilityVisitor(provided_schema)) | ||||||||||||
|
|
||||||||||||
| # from rich.console import Console | ||||||||||||
| # from rich.table import Table as RichTable | ||||||||||||
|
|
||||||||||||
| # console = Console(record=True) | ||||||||||||
|
|
||||||||||||
| # rich_table = RichTable(show_header=True, header_style="bold") | ||||||||||||
| # rich_table.add_column("") | ||||||||||||
| # rich_table.add_column("Table field") | ||||||||||||
| # rich_table.add_column("Dataframe field") | ||||||||||||
|
|
||||||||||||
| # is_compatible = True | ||||||||||||
|
|
||||||||||||
| # for field_id in requested_schema.field_ids: | ||||||||||||
| # lhs = requested_schema.find_field(field_id) | ||||||||||||
| # try: | ||||||||||||
| # rhs = provided_schema.find_field(field_id) | ||||||||||||
| # except ValueError: | ||||||||||||
| # if lhs.required: | ||||||||||||
| # rich_table.add_row("❌", str(lhs), "Missing") | ||||||||||||
| # is_compatible = False | ||||||||||||
| # else: | ||||||||||||
| # rich_table.add_row("✅", str(lhs), "Missing") | ||||||||||||
| # continue | ||||||||||||
|
|
||||||||||||
| # if lhs.required and not rhs.required: | ||||||||||||
| # rich_table.add_row("❌", str(lhs), "Missing") | ||||||||||||
| # is_compatible = False | ||||||||||||
|
|
||||||||||||
| # if lhs.field_type == rhs.field_type: | ||||||||||||
| # rich_table.add_row("✅", str(lhs), str(rhs)) | ||||||||||||
| # continue | ||||||||||||
| # elif any( | ||||||||||||
| # (isinstance(lhs.field_type, container_type) and isinstance(rhs.field_type, container_type)) | ||||||||||||
| # for container_type in {StructType, MapType, ListType} | ||||||||||||
| # ): | ||||||||||||
| # rich_table.add_row("✅", str(lhs), str(rhs)) | ||||||||||||
| # continue | ||||||||||||
| # else: | ||||||||||||
| # try: | ||||||||||||
| # promote(rhs.field_type, lhs.field_type) | ||||||||||||
| # rich_table.add_row("✅", str(lhs), str(rhs)) | ||||||||||||
| # except ResolveError: | ||||||||||||
| # rich_table.add_row("❌", str(lhs), str(rhs)) | ||||||||||||
| # is_compatible = False | ||||||||||||
|
|
||||||||||||
| # if not is_compatible: | ||||||||||||
| # console.print(rich_table) | ||||||||||||
| # raise ValueError(f"Mismatch in fields:\n{console.export_text()}") | ||||||||||||
|
|
||||||||||||
|
|
||||||||||||
| class _SchemaCompatibilityVisitor(SchemaVisitor[bool]): | ||||||||||||
|
||||||||||||
| provided_schema: Schema | ||||||||||||
|
|
||||||||||||
| def __init__(self, provided_schema: Schema): | ||||||||||||
| from rich.console import Console | ||||||||||||
| from rich.table import Table as RichTable | ||||||||||||
|
|
||||||||||||
| self.provided_schema = provided_schema | ||||||||||||
| self.rich_table = RichTable(show_header=True, header_style="bold") | ||||||||||||
| self.rich_table.add_column("") | ||||||||||||
| self.rich_table.add_column("Table field") | ||||||||||||
| self.rich_table.add_column("Dataframe field") | ||||||||||||
| self.console = Console(record=True) | ||||||||||||
|
|
||||||||||||
| def _is_field_compatible(self, lhs: NestedField) -> bool: | ||||||||||||
| # Check required field exists as required field first | ||||||||||||
| try: | ||||||||||||
| rhs = self.provided_schema.find_field(lhs.field_id) | ||||||||||||
| except ValueError: | ||||||||||||
| if lhs.required: | ||||||||||||
| self.rich_table.add_row("❌", str(lhs), "Missing") | ||||||||||||
| return False | ||||||||||||
| else: | ||||||||||||
| self.rich_table.add_row("✅", str(lhs), "Missing") | ||||||||||||
| return True | ||||||||||||
|
|
||||||||||||
| if lhs.required and not rhs.required: | ||||||||||||
| self.rich_table.add_row("❌", str(lhs), "Missing") | ||||||||||||
| return False | ||||||||||||
|
|
||||||||||||
| # Check type compatibility | ||||||||||||
| if lhs.field_type == rhs.field_type: | ||||||||||||
| self.rich_table.add_row("✅", str(lhs), str(rhs)) | ||||||||||||
| return True | ||||||||||||
| elif any( | ||||||||||||
| (isinstance(lhs.field_type, container_type) and isinstance(rhs.field_type, container_type)) | ||||||||||||
| for container_type in {StructType, MapType, ListType} | ||||||||||||
| ): | ||||||||||||
|
||||||||||||
| self.rich_table.add_row("✅", str(lhs), str(rhs)) | ||||||||||||
| return True | ||||||||||||
| else: | ||||||||||||
| try: | ||||||||||||
| promote(rhs.field_type, lhs.field_type) | ||||||||||||
|
Collaborator (Author): This succeeds for …

Contributor: Do you have a test to reproduce this? This is interesting since for Python …

Collaborator (Author): I'll write up a test 👍 The comparison isn't between python types, but between parquet physical types: https://github.com/apache/iceberg-python/blob/main/pyiceberg/io/pyarrow.py#L1503-L1507

Collaborator (Author): Maybe we could get away with just removing this check, since we are running a comprehensive type compatibility check already? (iceberg-python/pyiceberg/io/pyarrow.py, lines 1550 to 1554 in e27cd90)

Collaborator (Author): Test added that demonstrates this issue: https://github.com/apache/iceberg-python/pull/921/files#diff-8ca7e967a2c2ef394c75f707879f1b7e6d09226c321643140b9325f742041d67R669-R713

Collaborator (Author): Confirmed that this would work @Fokko let me know if we are good to move forward with this change!
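For reference, a small illustration of the `promote` behavior under discussion (imports assume `pyiceberg.schema.promote` and `pyiceberg.exceptions.ResolveError`, consistent with this diff):

```python
from pyiceberg.exceptions import ResolveError
from pyiceberg.schema import promote
from pyiceberg.types import DoubleType, FloatType, IntegerType, LongType, StringType

print(promote(IntegerType(), LongType()))  # int -> long is a valid promotion
print(promote(FloatType(), DoubleType()))  # float -> double is a valid promotion

try:
    promote(StringType(), IntegerType())   # no promotion path: raises
except ResolveError as err:
    print(err)
```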
||||||||||||
| self.rich_table.add_row("✅", str(lhs), str(rhs)) | ||||||||||||
| return True | ||||||||||||
| except ResolveError: | ||||||||||||
| self.rich_table.add_row("❌", str(lhs), str(rhs)) | ||||||||||||
| return False | ||||||||||||
|
|
||||||||||||
| def schema(self, schema: Schema, struct_result: bool) -> bool: | ||||||||||||
| if not struct_result: | ||||||||||||
| self.console.print(self.rich_table) | ||||||||||||
| raise ValueError(f"Mismatch in fields:\n{self.console.export_text()}") | ||||||||||||
|
||||||||||||
| return struct_result | ||||||||||||
|
|
||||||||||||
| def struct(self, struct: StructType, field_results: List[bool]) -> bool: | ||||||||||||
| return all(field_results) | ||||||||||||
|
|
||||||||||||
| def field(self, field: NestedField, field_result: bool) -> bool: | ||||||||||||
| return all([self._is_field_compatible(field), field_result]) | ||||||||||||
|
|
||||||||||||
| def list(self, list_type: ListType, element_result: bool) -> bool: | ||||||||||||
| return element_result and self._is_field_compatible(list_type.element_field) | ||||||||||||
|
|
||||||||||||
| def map(self, map_type: MapType, key_result: bool, value_result: bool) -> bool: | ||||||||||||
| return all([ | ||||||||||||
| self._is_field_compatible(map_type.key_field), | ||||||||||||
| self._is_field_compatible(map_type.value_field), | ||||||||||||
| key_result, | ||||||||||||
| value_result, | ||||||||||||
| ]) | ||||||||||||
|
|
||||||||||||
| def primitive(self, primitive: PrimitiveType) -> bool: | ||||||||||||
| return True | ||||||||||||
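To make the docstring's rules concrete, a sketch of `_check_schema_compatible` (assuming it lives in `pyiceberg.schema` as this diff suggests; the schemas are illustrative):

```python
from pyiceberg.schema import Schema, _check_schema_compatible
from pyiceberg.types import IntegerType, LongType, NestedField, StringType

requested = Schema(
    NestedField(field_id=1, name="id", field_type=LongType(), required=True),
    NestedField(field_id=2, name="name", field_type=StringType(), required=False),
)

# Compatible: optional "name" may be absent, and int promotes to long.
_check_schema_compatible(requested, Schema(NestedField(field_id=1, name="id", field_type=IntegerType(), required=True)))

# Incompatible: required "id" is missing from the provided schema.
try:
    _check_schema_compatible(requested, Schema(NestedField(field_id=2, name="name", field_type=StringType(), required=False)))
except ValueError as exc:
    print(exc)  # rich-rendered table pinpointing the missing required field
```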
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -501,14 +501,11 @@ def test_add_files_fails_on_schema_mismatch(spark: SparkSession, session_catalog | |
| ) | ||
|
|
||
| expected = """Mismatch in fields: | ||
| ┏━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━┓ | ||
| ┃ ┃ Table field ┃ Dataframe field ┃ | ||
| ┡━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━┩ | ||
| │ ✅ │ 1: foo: optional boolean │ 1: foo: optional boolean │ | ||
│ ✅ │ 2: bar: optional string │ 2: bar: optional string │ ||
| │ ❌ │ 3: baz: optional int │ 3: baz: optional string │ | ||
| │ ✅ │ 4: qux: optional date │ 4: qux: optional date │ | ||
| └────┴──────────────────────────┴──────────────────────────┘ | ||
| ┏━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━┓ | ||
|
||
| ┃ Field Name ┃ Category ┃ Table field ┃ Dataframe field ┃ | ||
| ┡━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━┩ | ||
| │ baz │ Type │ optional int │ optional string │ | ||
| └────────────┴──────────┴──────────────┴─────────────────┘ | ||
| """ | ||
|
|
||
| with pytest.raises(ValueError, match=expected): | ||
|
|
||
This change is necessary to support writing dataframes / recordbatches with a subset of the schema. Otherwise, the `ArrowAccessor` throws a `KeyError`. This way, we return a `None` and the `ArrowProjectionVisitor` is responsible for checking if the field is nullable, and can be filled in with a null array.

Is this change responsible for schema projection / writing a subset of the schema? Do you mind expanding on the mechanism behind how this works? I'm curious.

Yes, that's right - the `ArrowProjectionVisitor` is responsible for detecting that the `field_partner` is `None` and then checking if the table field is also optional before filling it in with a null array. This change is necessary so that the `ArrowAccessor` doesn't throw an exception if the field can't be found in the arrow component, and enables `ArrowProjectionVisitor` to make use of a code pathway it wasn't able to make use of before: iceberg-python/pyiceberg/io/pyarrow.py, lines 1388 to 1395 in b11cdb5.

Above we have the `file_schema` that should correspond with the `partner_struct`. I expect that when looking up the field-id, it should already `return None`.

Yeah, as I pointed out in this comment: #921 (comment), I think `write_parquet` is using the Table Schema, instead of the Schema corresponding to the data types of the PyArrow construct. I will take that to mean that this isn't intended, and making sure that we use the Schema corresponding to the data types of the PyArrow construct is what we intend to do here.

Thanks for the context. This isn't intended, the schema should align with the data. I checked against the last commit, and it doesn't throw the `KeyError` anymore because of your fix. Thanks 👍

Thank you for the suggestion - I've removed this try exception block in the latest update.
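To make the mechanism above concrete, a toy re-statement (explicitly not pyiceberg's actual code) of the projection rule: when the accessor yields no partner for a field, the column is null-filled only if the requested field is optional:

```python
from typing import Optional

import pyarrow as pa

def project_field(partner: Optional[pa.Array], optional: bool, arrow_type: pa.DataType, length: int) -> pa.Array:
    if partner is not None:
        return partner
    if optional:
        # Missing optional field: substitute a null-filled column.
        return pa.nulls(length, type=arrow_type)
    raise ValueError("Field is required and missing")

print(project_field(None, optional=True, arrow_type=pa.string(), length=3))  # 3 nulls
```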