120120 Schema ,
121121 SchemaVisitorPerPrimitiveType ,
122122 SchemaWithPartnerVisitor ,
123+ _check_schema_compatible ,
123124 pre_order_visit ,
124125 promote ,
125126 prune_columns ,
@@ -1397,7 +1398,7 @@ def list(self, list_type: ListType, list_array: Optional[pa.Array], value_array:
13971398 # This can be removed once this has been fixed:
13981399 # https://github.com/apache/arrow/issues/38809
13991400 list_array = pa .LargeListArray .from_arrays (list_array .offsets , value_array )
1400-
1401+ value_array = self . _cast_if_needed ( list_type . element_field , value_array )
14011402 arrow_field = pa .large_list (self ._construct_field (list_type .element_field , value_array .type ))
14021403 return list_array .cast (arrow_field )
14031404 else :
@@ -1407,6 +1408,8 @@ def map(
14071408 self , map_type : MapType , map_array : Optional [pa .Array ], key_result : Optional [pa .Array ], value_result : Optional [pa .Array ]
14081409 ) -> Optional [pa .Array ]:
14091410 if isinstance (map_array , pa .MapArray ) and key_result is not None and value_result is not None :
1411+ key_result = self ._cast_if_needed (map_type .key_field , key_result )
1412+ value_result = self ._cast_if_needed (map_type .value_field , value_result )
14101413 arrow_field = pa .map_ (
14111414 self ._construct_field (map_type .key_field , key_result .type ),
14121415 self ._construct_field (map_type .value_field , value_result .type ),
@@ -1539,9 +1542,16 @@ def __init__(self, iceberg_type: PrimitiveType, physical_type_string: str, trunc
15391542
15401543 expected_physical_type = _primitive_to_physical (iceberg_type )
15411544 if expected_physical_type != physical_type_string :
1542- raise ValueError (
1543- f"Unexpected physical type { physical_type_string } for { iceberg_type } , expected { expected_physical_type } "
1544- )
1545+ # Allow promotable physical types
1546+ # INT32 -> INT64 and FLOAT -> DOUBLE are safe type casts
1547+ if (physical_type_string == "INT32" and expected_physical_type == "INT64" ) or (
1548+ physical_type_string == "FLOAT" and expected_physical_type == "DOUBLE"
1549+ ):
1550+ pass
1551+ else :
1552+ raise ValueError (
1553+ f"Unexpected physical type { physical_type_string } for { iceberg_type } , expected { expected_physical_type } "
1554+ )
15451555
15461556 self .primitive_type = iceberg_type
15471557
@@ -1886,16 +1896,6 @@ def data_file_statistics_from_parquet_metadata(
18861896 set the mode for column metrics collection
18871897 parquet_column_mapping (Dict[str, int]): The mapping of the parquet file name to the field ID
18881898 """
1889- if parquet_metadata .num_columns != len (stats_columns ):
1890- raise ValueError (
1891- f"Number of columns in statistics configuration ({ len (stats_columns )} ) is different from the number of columns in pyarrow table ({ parquet_metadata .num_columns } )"
1892- )
1893-
1894- if parquet_metadata .num_columns != len (parquet_column_mapping ):
1895- raise ValueError (
1896- f"Number of columns in column mapping ({ len (parquet_column_mapping )} ) is different from the number of columns in pyarrow table ({ parquet_metadata .num_columns } )"
1897- )
1898-
18991899 column_sizes : Dict [int , int ] = {}
19001900 value_counts : Dict [int , int ] = {}
19011901 split_offsets : List [int ] = []
@@ -1988,8 +1988,7 @@ def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteT
19881988 )
19891989
19901990 def write_parquet (task : WriteTask ) -> DataFile :
1991- table_schema = task .schema
1992-
1991+ table_schema = table_metadata .schema ()
19931992 # if schema needs to be transformed, use the transformed schema and adjust the arrow table accordingly
19941993 # otherwise use the original schema
19951994 if (sanitized_schema := sanitize_column_names (table_schema )) != table_schema :
@@ -2001,7 +2000,7 @@ def write_parquet(task: WriteTask) -> DataFile:
20012000 batches = [
20022001 _to_requested_schema (
20032002 requested_schema = file_schema ,
2004- file_schema = table_schema ,
2003+ file_schema = task . schema ,
20052004 batch = batch ,
20062005 downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us ,
20072006 include_field_ids = True ,
@@ -2060,47 +2059,30 @@ def bin_pack_arrow_table(tbl: pa.Table, target_file_size: int) -> Iterator[List[
20602059 return bin_packed_record_batches
20612060
20622061
def _check_pyarrow_schema_compatible(
    requested_schema: Schema, provided_schema: pa.Schema, downcast_ns_timestamp_to_us: bool = False
) -> None:
    """Check if the `requested_schema` is compatible with `provided_schema`.

    Two schemas are considered compatible when they are equal in terms of the Iceberg Schema type.

    Args:
        requested_schema (Schema): The Iceberg schema the incoming data is expected to conform to.
        provided_schema (pa.Schema): The PyArrow schema of the incoming data.
        downcast_ns_timestamp_to_us (bool): Whether nanosecond timestamps should be downcast to
            microseconds when converting the PyArrow schema to Iceberg.

    Raises:
        ValueError: If the schemas are not compatible.
    """
    name_mapping = requested_schema.name_mapping
    try:
        # Convert the PyArrow schema into an Iceberg schema, resolving field IDs through the
        # table's name mapping. Bound to a fresh name (the original rebound the `provided_schema`
        # parameter to a different type, which made the except-branch below hard to follow).
        converted_schema = pyarrow_to_schema(
            provided_schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
        )
    except ValueError as e:
        # Conversion fails when the PyArrow schema has fields the name mapping does not know.
        # Re-convert without assigning field IDs so we can report exactly which names are extra.
        schema_without_ids = _pyarrow_to_schema_without_ids(
            provided_schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
        )
        additional_names = set(schema_without_ids._name_to_id.keys()) - set(requested_schema._name_to_id.keys())
        raise ValueError(
            f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. Update the schema first (hint, use union_by_name)."
        ) from e

    # Delegate the field-by-field compatibility check (including legal type promotions)
    # to the shared Iceberg-schema comparison.
    _check_schema_compatible(requested_schema, converted_schema)
21052087
21062088def parquet_files_to_data_files (io : FileIO , table_metadata : TableMetadata , file_paths : Iterator [str ]) -> Iterator [DataFile ]:
@@ -2114,7 +2096,7 @@ def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, file_
21142096 f"Cannot add file { file_path } because it has field IDs. `add_files` only supports addition of files without field_ids"
21152097 )
21162098 schema = table_metadata .schema ()
2117- _check_schema_compatible (schema , parquet_metadata .schema .to_arrow_schema ())
2099+ _check_pyarrow_schema_compatible (schema , parquet_metadata .schema .to_arrow_schema ())
21182100
21192101 statistics = data_file_statistics_from_parquet_metadata (
21202102 parquet_metadata = parquet_metadata ,
@@ -2195,7 +2177,7 @@ def _dataframe_to_data_files(
21952177 Returns:
21962178 An iterable that supplies datafiles that represent the table.
21972179 """
2198- from pyiceberg .table import PropertyUtil , TableProperties , WriteTask
2180+ from pyiceberg .table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE , PropertyUtil , TableProperties , WriteTask
21992181
22002182 counter = counter or itertools .count (0 )
22012183 write_uuid = write_uuid or uuid .uuid4 ()
@@ -2204,13 +2186,16 @@ def _dataframe_to_data_files(
22042186 property_name = TableProperties .WRITE_TARGET_FILE_SIZE_BYTES ,
22052187 default = TableProperties .WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT ,
22062188 )
2189+ name_mapping = table_metadata .schema ().name_mapping
2190+ downcast_ns_timestamp_to_us = Config ().get_bool (DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE ) or False
2191+ task_schema = pyarrow_to_schema (df .schema , name_mapping = name_mapping , downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us )
22072192
22082193 if table_metadata .spec ().is_unpartitioned ():
22092194 yield from write_file (
22102195 io = io ,
22112196 table_metadata = table_metadata ,
22122197 tasks = iter ([
2213- WriteTask (write_uuid = write_uuid , task_id = next (counter ), record_batches = batches , schema = table_metadata . schema () )
2198+ WriteTask (write_uuid = write_uuid , task_id = next (counter ), record_batches = batches , schema = task_schema )
22142199 for batches in bin_pack_arrow_table (df , target_file_size )
22152200 ]),
22162201 )
@@ -2225,7 +2210,7 @@ def _dataframe_to_data_files(
22252210 task_id = next (counter ),
22262211 record_batches = batches ,
22272212 partition_key = partition .partition_key ,
2228- schema = table_metadata . schema () ,
2213+ schema = task_schema ,
22292214 )
22302215 for partition in partitions
22312216 for batches in bin_pack_arrow_table (partition .arrow_table_partition , target_file_size )
0 commit comments