Skip to content

Commit 960c4aa

Browse files
committed
just need a new test
1 parent 45f66da commit 960c4aa

File tree

6 files changed

+57
-21
lines changed

6 files changed

+57
-21
lines changed

pyiceberg/catalog/__init__.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
Identifier,
7171
Properties,
7272
RecursiveDict,
73+
TableVersion,
7374
)
7475
from pyiceberg.utils.config import Config, merge_config
7576
from pyiceberg.utils.properties import property_as_bool
@@ -743,7 +744,7 @@ def _load_file_io(self, properties: Properties = EMPTY_DICT, location: Optional[
743744
return load_file_io({**self.properties, **properties}, location)
744745

745746
@staticmethod
746-
def _convert_schema_if_needed(schema: Union[Schema, "pa.Schema"]) -> Schema:
747+
def _convert_schema_if_needed(schema: Union[Schema, "pa.Schema"], format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION) -> Schema:
747748
if isinstance(schema, Schema):
748749
return schema
749750
try:
@@ -754,7 +755,7 @@ def _convert_schema_if_needed(schema: Union[Schema, "pa.Schema"]) -> Schema:
754755
downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
755756
if isinstance(schema, pa.Schema):
756757
schema: Schema = visit_pyarrow( # type: ignore
757-
schema, _ConvertToIcebergWithoutIDs(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
758+
schema, _ConvertToIcebergWithoutIDs(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version)
758759
)
759760
return schema
760761
except ModuleNotFoundError:
@@ -847,7 +848,7 @@ def _create_staged_table(
847848
Returns:
848849
StagedTable: the created staged table instance.
849850
"""
850-
schema: Schema = self._convert_schema_if_needed(schema) # type: ignore
851+
schema: Schema = self._convert_schema_if_needed(schema, properties.get(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION)) # type: ignore
851852

852853
database_name, table_name = self.identifier_to_database_and_table(identifier)
853854

pyiceberg/catalog/rest/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
StagedTable,
6565
Table,
6666
TableIdentifier,
67+
TableProperties,
6768
)
6869
from pyiceberg.table.metadata import TableMetadata
6970
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder, assign_fresh_sort_order_ids
@@ -498,7 +499,7 @@ def _create_table(
498499
properties: Properties = EMPTY_DICT,
499500
stage_create: bool = False,
500501
) -> TableResponse:
501-
iceberg_schema = self._convert_schema_if_needed(schema)
502+
iceberg_schema = self._convert_schema_if_needed(schema, properties.get(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION))
502503
fresh_schema = assign_fresh_schema_ids(iceberg_schema)
503504
fresh_partition_spec = assign_fresh_partition_spec_ids(partition_spec, iceberg_schema, fresh_schema)
504505
fresh_sort_order = assign_fresh_sort_order_ids(sort_order, iceberg_schema, fresh_schema)

pyiceberg/io/pyarrow.py

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -146,12 +146,13 @@
146146
visit,
147147
visit_with_partner,
148148
)
149+
from pyiceberg.table import TableProperties
149150
from pyiceberg.table.locations import load_location_provider
150151
from pyiceberg.table.metadata import TableMetadata
151152
from pyiceberg.table.name_mapping import NameMapping, apply_name_mapping
152153
from pyiceberg.table.puffin import PuffinFile
153154
from pyiceberg.transforms import IdentityTransform, TruncateTransform
154-
from pyiceberg.typedef import EMPTY_DICT, Properties, Record
155+
from pyiceberg.typedef import EMPTY_DICT, Properties, Record, TableVersion
155156
from pyiceberg.types import (
156157
BinaryType,
157158
BooleanType,
@@ -1018,22 +1019,22 @@ def _combine_positional_deletes(positional_deletes: List[pa.ChunkedArray], start
10181019

10191020

10201021
def pyarrow_to_schema(
1021-
schema: pa.Schema, name_mapping: Optional[NameMapping] = None, downcast_ns_timestamp_to_us: bool = False
1022+
schema: pa.Schema, name_mapping: Optional[NameMapping] = None, downcast_ns_timestamp_to_us: bool = False, format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION
10221023
) -> Schema:
10231024
has_ids = visit_pyarrow(schema, _HasIds())
10241025
if has_ids:
1025-
return visit_pyarrow(schema, _ConvertToIceberg(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us))
1026+
return visit_pyarrow(schema, _ConvertToIceberg(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version))
10261027
elif name_mapping is not None:
1027-
schema_without_ids = _pyarrow_to_schema_without_ids(schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
1028+
schema_without_ids = _pyarrow_to_schema_without_ids(schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version)
10281029
return apply_name_mapping(schema_without_ids, name_mapping)
10291030
else:
10301031
raise ValueError(
10311032
"Parquet file does not have field-ids and the Iceberg table does not have 'schema.name-mapping.default' defined"
10321033
)
10331034

10341035

1035-
def _pyarrow_to_schema_without_ids(schema: pa.Schema, downcast_ns_timestamp_to_us: bool = False) -> Schema:
1036-
return visit_pyarrow(schema, _ConvertToIcebergWithoutIDs(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us))
1036+
def _pyarrow_to_schema_without_ids(schema: pa.Schema, downcast_ns_timestamp_to_us: bool = False, format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION) -> Schema:
1037+
return visit_pyarrow(schema, _ConvertToIcebergWithoutIDs(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version))
10371038

10381039

10391040
def _pyarrow_schema_ensure_large_types(schema: pa.Schema) -> pa.Schema:
@@ -1111,6 +1112,8 @@ def _(obj: pa.Field, visitor: PyArrowSchemaVisitor[T]) -> T:
11111112

11121113
visitor.before_field(obj)
11131114
try:
1115+
if obj.name == "timestamp_ns":
1116+
print('alexstephen')
11141117
result = visit_pyarrow(field_type, visitor)
11151118
except TypeError as e:
11161119
raise UnsupportedPyArrowTypeException(obj, f"Column '{obj.name}' has an unsupported type: {field_type}") from e
@@ -1215,9 +1218,10 @@ class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]):
12151218

12161219
_field_names: List[str]
12171220

1218-
def __init__(self, downcast_ns_timestamp_to_us: bool = False) -> None:
1221+
def __init__(self, downcast_ns_timestamp_to_us: bool = False, format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION) -> None: # noqa: F821
12191222
self._field_names = []
12201223
self._downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us
1224+
self._format_version = format_version
12211225

12221226
def _field_id(self, field: pa.Field) -> int:
12231227
if (field_id := _get_field_id(field)) is not None:
@@ -1288,6 +1292,11 @@ def primitive(self, primitive: pa.DataType) -> PrimitiveType:
12881292
elif primitive.unit == "ns":
12891293
if self._downcast_ns_timestamp_to_us:
12901294
logger.warning("Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'.")
1295+
elif self._format_version == 3:
1296+
if primitive.tz in UTC_ALIASES:
1297+
return TimestamptzNanoType()
1298+
else:
1299+
return TimestampNanoType()
12911300
else:
12921301
raise TypeError(
12931302
"Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-to-us-on-write' configuration property to automatically downcast 'ns' to 'us' on write.",
@@ -2540,7 +2549,8 @@ def bin_pack_arrow_table(tbl: pa.Table, target_file_size: int) -> Iterator[List[
25402549

25412550

25422551
def _check_pyarrow_schema_compatible(
2543-
requested_schema: Schema, provided_schema: pa.Schema, downcast_ns_timestamp_to_us: bool = False
2552+
requested_schema: Schema, provided_schema: pa.Schema, downcast_ns_timestamp_to_us: bool = False,
2553+
format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION
25442554
) -> None:
25452555
"""
25462556
Check if the `requested_schema` is compatible with `provided_schema`.
@@ -2553,10 +2563,10 @@ def _check_pyarrow_schema_compatible(
25532563
name_mapping = requested_schema.name_mapping
25542564
try:
25552565
provided_schema = pyarrow_to_schema(
2556-
provided_schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
2566+
provided_schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version
25572567
)
25582568
except ValueError as e:
2559-
provided_schema = _pyarrow_to_schema_without_ids(provided_schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
2569+
provided_schema = _pyarrow_to_schema_without_ids(provided_schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version)
25602570
additional_names = set(provided_schema._name_to_id.keys()) - set(requested_schema._name_to_id.keys())
25612571
raise ValueError(
25622572
f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. Update the schema first (hint, use union_by_name)."
@@ -2582,7 +2592,7 @@ def parquet_file_to_data_file(io: FileIO, table_metadata: TableMetadata, file_pa
25822592
)
25832593

25842594
schema = table_metadata.schema()
2585-
_check_pyarrow_schema_compatible(schema, arrow_schema)
2595+
_check_pyarrow_schema_compatible(schema, arrow_schema, format_version=table_metadata.format_version)
25862596

25872597
statistics = data_file_statistics_from_parquet_metadata(
25882598
parquet_metadata=parquet_metadata,
@@ -2673,7 +2683,7 @@ def _dataframe_to_data_files(
26732683
)
26742684
name_mapping = table_metadata.schema().name_mapping
26752685
downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
2676-
task_schema = pyarrow_to_schema(df.schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
2686+
task_schema = pyarrow_to_schema(df.schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=table_metadata.format_version)
26772687

26782688
if table_metadata.spec().is_unpartitioned():
26792689
yield from write_file(

pyiceberg/table/__init__.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -477,7 +477,8 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT,
477477
)
478478
downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
479479
_check_pyarrow_schema_compatible(
480-
self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
480+
self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
481+
format_version=self.table_metadata.format_version
481482
)
482483

483484
with self._append_snapshot_producer(snapshot_properties, branch=branch) as append_files:
@@ -527,7 +528,8 @@ def dynamic_partition_overwrite(
527528

528529
downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
529530
_check_pyarrow_schema_compatible(
530-
self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
531+
self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
532+
format_version=self.table_metadata.format_version
531533
)
532534

533535
# If dataframe does not have data, there is no need to overwrite
@@ -593,7 +595,8 @@ def overwrite(
593595
)
594596
downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
595597
_check_pyarrow_schema_compatible(
596-
self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
598+
self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
599+
format_version=self.table_metadata.format_version
597600
)
598601

599602
if overwrite_filter != AlwaysFalse():
@@ -789,7 +792,8 @@ def upsert(
789792

790793
downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
791794
_check_pyarrow_schema_compatible(
792-
self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
795+
self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
796+
format_version=self.table_metadata.format_version
793797
)
794798

795799
# get list of rows that exist so we don't have to load the entire target table

tests/conftest.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2809,6 +2809,27 @@ def arrow_table_schema_with_all_microseconds_timestamp_precisions() -> "pa.Schem
28092809
)
28102810

28112811

2812+
@pytest.fixture(scope="session")
2813+
def arrow_table_schema_with_nanoseconds_timestamp_precisions() -> "pa.Schema":
2814+
"""Pyarrow Schema with nanosecond timestamp precisions."""
2815+
import pyarrow as pa
2816+
2817+
return pa.schema(
2818+
[
2819+
("timestamp_s", pa.timestamp(unit="us")),
2820+
("timestamptz_s", pa.timestamp(unit="us", tz="UTC")),
2821+
("timestamp_ms", pa.timestamp(unit="us")),
2822+
("timestamptz_ms", pa.timestamp(unit="us", tz="UTC")),
2823+
("timestamp_us", pa.timestamp(unit="us")),
2824+
("timestamptz_us", pa.timestamp(unit="us", tz="UTC")),
2825+
("timestamp_ns", pa.timestamp(unit="us")),
2826+
("timestamptz_ns", pa.timestamp(unit="ns", tz="UTC")),
2827+
("timestamptz_us_etc_utc", pa.timestamp(unit="us", tz="UTC")),
2828+
("timestamptz_ns_z", pa.timestamp(unit="ns", tz="UTC")),
2829+
("timestamptz_s_0000", pa.timestamp(unit="us", tz="UTC")),
2830+
]
2831+
)
2832+
28122833
@pytest.fixture(scope="session")
28132834
def table_schema_with_all_microseconds_timestamp_precision() -> Schema:
28142835
"""Iceberg table Schema with only date, timestamp and timestamptz values."""

tests/integration/test_writes/test_writes.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1355,7 +1355,6 @@ def test_write_all_timestamp_precision(
13551355
# and supports up to microsecond precision
13561356
assert left.timestamp() == right.timestamp(), f"Difference in column {column}: {left} != {right}"
13571357

1358-
13591358
@pytest.mark.integration
13601359
@pytest.mark.parametrize("format_version", [1, 2])
13611360
def test_merge_manifests(session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int) -> None:

0 commit comments

Comments
 (0)