Sanitized special character column name before writing to parquet #590
Changes from 17 commits
06dd647
d7b5147
d278ee5
168931f
ca11640
ce9a587
bf87a8a
25bf991
3b6ecad
f6a5ac2
b51b5ce
41f5354
d264ac3
5de0b1c
e81472e
f6b72e9
22be232
9ea64c6
e5f2611
177a6b7
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -122,6 +122,7 @@ | |
| pre_order_visit, | ||
| promote, | ||
| prune_columns, | ||
| sanitize_column_names, | ||
| visit, | ||
| visit_with_partner, | ||
| ) | ||
| @@ -1016,7 +1017,6 @@ def _task_to_table( | |
| if len(arrow_table) < 1: | ||
| return None | ||
||
| return to_requested_schema(projected_schema, file_project_schema, arrow_table) | ||
||
| @@ -1769,8 +1769,9 @@ def data_file_statistics_from_parquet_metadata( | |
| def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteTask]) -> Iterator[DataFile]: | ||
| - schema = table_metadata.schema() | ||
| - arrow_file_schema = schema.as_arrow() | ||
| + iceberg_table_schema = table_metadata.schema() | ||
| + parquet_schema = sanitize_column_names(iceberg_table_schema) | ||
| + arrow_file_schema = parquet_schema.as_arrow() | ||
| parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties) | ||
||
| row_group_size = PropertyUtil.property_as_int( | ||
| @@ -1780,16 +1781,17 @@ def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteT | |
| ) | ||
||
| def write_parquet(task: WriteTask) -> DataFile: | ||
| arrow_table = pa.Table.from_batches(task.record_batches) | ||
| df = to_requested_schema(requested_schema=parquet_schema, file_schema=iceberg_table_schema, table=arrow_table) | ||
Contributor
Do we know if [...]? The reason I'm asking is that we're introducing quite a bit of logic here, and I think the rewrites are only applicable to Avro: https://avro.apache.org/docs/1.8.1/spec.html#names
Contributor
Quick check, depending on how long the [...]
||
| file_path = f'{table_metadata.location}/data/{task.generate_data_file_path("parquet")}' | ||
| fo = io.new_output(file_path) | ||
| with fo.create(overwrite=True) as fos: | ||
| with pq.ParquetWriter(fos, schema=arrow_file_schema, **parquet_writer_kwargs) as writer: | ||
| - writer.write(pa.Table.from_batches(task.record_batches), row_group_size=row_group_size) | ||
| + writer.write(df, row_group_size=row_group_size) | ||
| statistics = data_file_statistics_from_parquet_metadata( | ||
| parquet_metadata=writer.writer.metadata, | ||
| - stats_columns=compute_statistics_plan(schema, table_metadata.properties), | ||
| - parquet_column_mapping=parquet_path_to_id_mapping(schema), | ||
| + stats_columns=compute_statistics_plan(parquet_schema, table_metadata.properties), | ||
| + parquet_column_mapping=parquet_path_to_id_mapping(parquet_schema), | ||
| ) | ||
| data_file = DataFile( | ||
| content=DataFileContent.DATA, | ||
Nit: I realize we have many names here, which might be confusing. `parquet_schema` is appropriate today since we only support Parquet, but we might also support ORC and Avro later.
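
For readers following the thread, here is a minimal sketch of the kind of rewrite that the Avro naming rules linked above imply: a name must start with `[A-Za-z_]` and continue with `[A-Za-z0-9_]`. This is not the `sanitize_column_names` implementation from this PR. The helper names `sanitize_name` and `sanitize_char` and the `_x<HEX>` escape scheme are assumptions made purely for illustration.

```python
import re

# Avro names must start with [A-Za-z_] and continue with [A-Za-z0-9_].
_VALID_AVRO_NAME = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def sanitize_char(ch: str) -> str:
    """Hypothetical escape: ASCII digits get a leading underscore, anything else becomes _x<HEX>."""
    if ch.isascii() and ch.isdigit():
        return "_" + ch
    return "_x" + format(ord(ch), "X")


def sanitize_name(name: str) -> str:
    """Return `name` unchanged if it is already a valid Avro name, else escape the offending characters."""
    if not name:
        raise ValueError("column name must not be empty")
    if _VALID_AVRO_NAME.fullmatch(name):
        return name
    out = []
    for i, ch in enumerate(name):
        # Letters and underscores are always allowed; digits are allowed everywhere except position 0.
        ok = ch == "_" or (ch.isascii() and (ch.isalpha() or (i > 0 and ch.isdigit())))
        out.append(ch if ok else sanitize_char(ch))
    return "".join(out)


# Build an original-name -> file-name mapping for a few example columns.
columns = ["id", "letter/abc", "1st"]
renames = {c: sanitize_name(c) for c in columns}
print(renames)  # {'id': 'id', 'letter/abc': 'letter_x2Fabc', '1st': '_1st'}
```

Under this sketch, only names that violate the rule are touched, so tables whose columns are already valid would be written with their original names.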