145145 visit ,
146146 visit_with_partner ,
147147)
148+ from pyiceberg .table import TableProperties
148149from pyiceberg .table .locations import load_location_provider
149150from pyiceberg .table .metadata import TableMetadata
150151from pyiceberg .table .name_mapping import NameMapping , apply_name_mapping
151152from pyiceberg .table .puffin import PuffinFile
152153from pyiceberg .transforms import IdentityTransform , TruncateTransform
153- from pyiceberg .typedef import EMPTY_DICT , Properties , Record
154+ from pyiceberg .typedef import EMPTY_DICT , Properties , Record , TableVersion
154155from pyiceberg .types import (
155156 BinaryType ,
156157 BooleanType ,
@@ -1017,22 +1018,36 @@ def _combine_positional_deletes(positional_deletes: List[pa.ChunkedArray], start
10171018
10181019
def pyarrow_to_schema(
    schema: pa.Schema,
    name_mapping: Optional[NameMapping] = None,
    downcast_ns_timestamp_to_us: bool = False,
    format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION,
) -> Schema:
    """Convert a PyArrow schema into an Iceberg Schema.

    Args:
        schema: The PyArrow schema to convert.
        name_mapping: Fallback mapping used to assign field-ids when the Arrow
            schema does not carry them in its metadata.
        downcast_ns_timestamp_to_us: If True, 'ns' timestamps are downcast to 'us'
            during conversion.
        format_version: Iceberg table format version the conversion targets.

    Raises:
        ValueError: If the schema has no field-ids and no name mapping is provided.
    """
    if visit_pyarrow(schema, _HasIds()):
        # Field-ids are present in the Arrow metadata; convert directly.
        converter = _ConvertToIceberg(
            downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
            format_version=format_version,
        )
        return visit_pyarrow(schema, converter)

    if name_mapping is None:
        raise ValueError(
            "Parquet file does not have field-ids and the Iceberg table does not have 'schema.name-mapping.default' defined"
        )

    # No embedded ids: build an id-less schema, then assign ids via the name mapping.
    schema_without_ids = _pyarrow_to_schema_without_ids(
        schema,
        downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
        format_version=format_version,
    )
    return apply_name_mapping(schema_without_ids, name_mapping)
10321040
10331041
def _pyarrow_to_schema_without_ids(
    schema: pa.Schema,
    downcast_ns_timestamp_to_us: bool = False,
    format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION,
) -> Schema:
    """Convert *schema* to an Iceberg Schema, leaving every field-id unassigned.

    The caller is expected to assign real ids afterwards (e.g. via a name mapping).
    """
    visitor = _ConvertToIcebergWithoutIDs(
        downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
        format_version=format_version,
    )
    return visit_pyarrow(schema, visitor)
10361051
10371052
10381053def _pyarrow_schema_ensure_large_types (schema : pa .Schema ) -> pa .Schema :
@@ -1214,9 +1229,12 @@ class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]):
12141229
12151230 _field_names : List [str ]
12161231
1217- def __init__ (self , downcast_ns_timestamp_to_us : bool = False ) -> None :
1232+ def __init__ (
1233+ self , downcast_ns_timestamp_to_us : bool = False , format_version : TableVersion = TableProperties .DEFAULT_FORMAT_VERSION
1234+ ) -> None : # noqa: F821
12181235 self ._field_names = []
12191236 self ._downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us
1237+ self ._format_version = format_version
12201238
12211239 def _field_id (self , field : pa .Field ) -> int :
12221240 if (field_id := _get_field_id (field )) is not None :
@@ -1287,6 +1305,11 @@ def primitive(self, primitive: pa.DataType) -> PrimitiveType:
12871305 elif primitive .unit == "ns" :
12881306 if self ._downcast_ns_timestamp_to_us :
12891307 logger .warning ("Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'." )
1308+ elif self ._format_version >= 3 :
1309+ if primitive .tz in UTC_ALIASES :
1310+ return TimestamptzNanoType ()
1311+ else :
1312+ return TimestampNanoType ()
12901313 else :
12911314 raise TypeError (
12921315 "Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-to-us-on-write' configuration property to automatically downcast 'ns' to 'us' on write." ,
@@ -2519,7 +2542,10 @@ def bin_pack_arrow_table(tbl: pa.Table, target_file_size: int) -> Iterator[List[
25192542
25202543
25212544def _check_pyarrow_schema_compatible (
2522- requested_schema : Schema , provided_schema : pa .Schema , downcast_ns_timestamp_to_us : bool = False
2545+ requested_schema : Schema ,
2546+ provided_schema : pa .Schema ,
2547+ downcast_ns_timestamp_to_us : bool = False ,
2548+ format_version : TableVersion = TableProperties .DEFAULT_FORMAT_VERSION ,
25232549) -> None :
25242550 """
25252551 Check if the `requested_schema` is compatible with `provided_schema`.
@@ -2532,10 +2558,15 @@ def _check_pyarrow_schema_compatible(
25322558 name_mapping = requested_schema .name_mapping
25332559 try :
25342560 provided_schema = pyarrow_to_schema (
2535- provided_schema , name_mapping = name_mapping , downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us
2561+ provided_schema ,
2562+ name_mapping = name_mapping ,
2563+ downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us ,
2564+ format_version = format_version ,
25362565 )
25372566 except ValueError as e :
2538- provided_schema = _pyarrow_to_schema_without_ids (provided_schema , downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us )
2567+ provided_schema = _pyarrow_to_schema_without_ids (
2568+ provided_schema , downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us , format_version = format_version
2569+ )
25392570 additional_names = set (provided_schema ._name_to_id .keys ()) - set (requested_schema ._name_to_id .keys ())
25402571 raise ValueError (
25412572 f"PyArrow table contains more columns: { ', ' .join (sorted (additional_names ))} . Update the schema first (hint, use union_by_name)."
@@ -2561,7 +2592,7 @@ def parquet_file_to_data_file(io: FileIO, table_metadata: TableMetadata, file_pa
25612592 )
25622593
25632594 schema = table_metadata .schema ()
2564- _check_pyarrow_schema_compatible (schema , arrow_schema )
2595+ _check_pyarrow_schema_compatible (schema , arrow_schema , format_version = table_metadata . format_version )
25652596
25662597 statistics = data_file_statistics_from_parquet_metadata (
25672598 parquet_metadata = parquet_metadata ,
@@ -2652,7 +2683,12 @@ def _dataframe_to_data_files(
26522683 )
26532684 name_mapping = table_metadata .schema ().name_mapping
26542685 downcast_ns_timestamp_to_us = Config ().get_bool (DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE ) or False
2655- task_schema = pyarrow_to_schema (df .schema , name_mapping = name_mapping , downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us )
2686+ task_schema = pyarrow_to_schema (
2687+ df .schema ,
2688+ name_mapping = name_mapping ,
2689+ downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us ,
2690+ format_version = table_metadata .format_version ,
2691+ )
26562692
26572693 if table_metadata .spec ().is_unpartitioned ():
26582694 yield from write_file (
0 commit comments