Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1761,7 +1761,7 @@ def data_file_statistics_from_parquet_metadata(


def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
schema = table_metadata.schema()
schema = sanitize_column_names(table_metadata.schema())
arrow_file_schema = schema.as_arrow()
parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties)

Expand All @@ -1772,12 +1772,13 @@ def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteT
)

def write_parquet(task: WriteTask) -> DataFile:
df = pa.Table.from_batches(task.record_batches)
df = df.rename_columns(schema.column_names)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to change the schema (column names) of the Arrow data frame. If there's a better way to do this, please let me know.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we extend the integration test to test the nested schema case? For example,

        pa.field('name', pa.string()),
        pa.field('address', pa.struct([
            pa.field('street', pa.string()),
            pa.field('city', pa.string()),
            pa.field('zip', pa.int32())
        ]))

Updated: I got

pyarrow.lib.ArrowInvalid: tried to rename a table of 4 columns but only 7 names were provided

when trying with the following dataset

TEST_DATA_WITH_SPECIAL_CHARACTER_COLUMN = {
        column_name_with_special_character: ['a', None, 'z'],
        'id': [1, 2, 3],
        'name': ['AB', 'CD', 'EF'],
        'address': [
            {'street': '123', 'city': 'SFO', 'zip': 12345},
            {'street': '456', 'city': 'SW', 'zip': 67890},
            {'street': '789', 'city': 'Random', 'zip': 10112}
        ]
    }
    pa_schema = pa.schema([
        pa.field(column_name_with_special_character, pa.string()),
        pa.field('id', pa.int32()),
        pa.field('name', pa.string()),
        pa.field('address', pa.struct([
            pa.field('street', pa.string()),
            pa.field('city', pa.string()),
            pa.field('zip', pa.int32())
        ]))
    ])

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

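Good catch! I don't think rename_columns works well with a nested schema: it expects exactly one name per top-level column, but the name list passed here evidently also counts the nested struct fields (7 names for a table of 4 columns).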

file_path = f'{table_metadata.location}/data/{task.generate_data_file_path("parquet")}'
fo = io.new_output(file_path)
with fo.create(overwrite=True) as fos:
with pq.ParquetWriter(fos, schema=arrow_file_schema, **parquet_writer_kwargs) as writer:
writer.write(pa.Table.from_batches(task.record_batches), row_group_size=row_group_size)

writer.write(df, row_group_size=row_group_size)
statistics = data_file_statistics_from_parquet_metadata(
parquet_metadata=writer.writer.metadata,
stats_columns=compute_statistics_plan(schema, table_metadata.properties),
Expand Down
19 changes: 19 additions & 0 deletions tests/integration/test_writes/test_writes.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,25 @@ def get_current_snapshot_id(identifier: str) -> int:
assert tbl.current_snapshot().snapshot_id == get_current_snapshot_id(identifier) # type: ignore


@pytest.mark.integration
def test_python_writes_special_character_column_with_spark_reads(spark: SparkSession, session_catalog: Catalog) -> None:
identifier = "default.python_writes_special_character_column_with_spark_reads"
column_name_with_special_character = "letter/abc"
TEST_DATA_WITH_SPECIAL_CHARACTER_COLUMN = {
column_name_with_special_character: ['a', None, 'z'],
}
pa_schema = pa.schema([
(column_name_with_special_character, pa.string()),
])
arrow_table_with_special_character_column = pa.Table.from_pydict(TEST_DATA_WITH_SPECIAL_CHARACTER_COLUMN, schema=pa_schema)
tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, schema=pa_schema)
Comment thread
kevinjqliu marked this conversation as resolved.
Outdated

tbl.overwrite(arrow_table_with_special_character_column)
spark_df = spark.sql(f"SELECT * FROM {identifier}").toPandas()
pyiceberg_df = tbl.scan().to_pandas()
assert spark_df.equals(pyiceberg_df)


@pytest.mark.integration
def test_write_bin_pack_data_files(spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
identifier = "default.write_bin_pack_data_files"
Expand Down
7 changes: 4 additions & 3 deletions tests/integration/test_writes/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.
# pylint:disable=redefined-outer-name
from typing import List, Optional
from typing import List, Optional, Union

import pyarrow as pa

Expand Down Expand Up @@ -65,6 +65,7 @@ def _create_table(
properties: Properties,
data: Optional[List[pa.Table]] = None,
partition_spec: Optional[PartitionSpec] = None,
schema: Union[Schema, "pa.Schema"] = TABLE_SCHEMA,
) -> Table:
try:
session_catalog.drop_table(identifier=identifier)
Expand All @@ -73,10 +74,10 @@ def _create_table(

if partition_spec:
tbl = session_catalog.create_table(
identifier=identifier, schema=TABLE_SCHEMA, properties=properties, partition_spec=partition_spec
identifier=identifier, schema=schema, properties=properties, partition_spec=partition_spec
)
else:
tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties=properties)
tbl = session_catalog.create_table(identifier=identifier, schema=schema, properties=properties)

if data:
for d in data:
Expand Down