Skip to content

Commit cf8b46e

Browse files
authored
fix: allow reading pyarrow timestamptz as timestamp (#2708)
<!-- Thanks for opening a pull request! --> <!-- In the case this PR will resolve an issue, please replace ${GITHUB_ISSUE_ID} below with the actual GitHub issue id. --> <!-- Closes #${GITHUB_ISSUE_ID} --> # Rationale for this change Closes #2663 Relates to #2333, which allows reading a PyArrow timestamp as an Iceberg timestamptz. This PR allows PyArrow timestamptz to be read as Iceberg timestamp. Although this configuration does not conform to the Iceberg spec, the change aligns PyIceberg's behavior with Spark when reading mismatched types (timestamptz as Iceberg timestamp and timestamp as Iceberg timestamptz). **The write path remains strict and will reject this type mismatch.** ## Are these changes tested? Yes ## Are there any user-facing changes? No <!-- In the case of user-facing changes, please add the changelog label. -->
1 parent 0618b66 commit cf8b46e

File tree

2 files changed

+120
-4
lines changed

2 files changed

+120
-4
lines changed

pyiceberg/io/pyarrow.py

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1656,6 +1656,7 @@ def _task_to_record_batches(
16561656
current_batch,
16571657
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
16581658
projected_missing_fields=projected_missing_fields,
1659+
allow_timestamp_tz_mismatch=True,
16591660
)
16601661

16611662

@@ -1849,13 +1850,18 @@ def _to_requested_schema(
18491850
downcast_ns_timestamp_to_us: bool = False,
18501851
include_field_ids: bool = False,
18511852
projected_missing_fields: dict[int, Any] = EMPTY_DICT,
1853+
allow_timestamp_tz_mismatch: bool = False,
18521854
) -> pa.RecordBatch:
18531855
# We could reuse some of these visitors
18541856
struct_array = visit_with_partner(
18551857
requested_schema,
18561858
batch,
18571859
ArrowProjectionVisitor(
1858-
file_schema, downcast_ns_timestamp_to_us, include_field_ids, projected_missing_fields=projected_missing_fields
1860+
file_schema,
1861+
downcast_ns_timestamp_to_us,
1862+
include_field_ids,
1863+
projected_missing_fields=projected_missing_fields,
1864+
allow_timestamp_tz_mismatch=allow_timestamp_tz_mismatch,
18591865
),
18601866
ArrowAccessor(file_schema),
18611867
)
@@ -1868,6 +1874,7 @@ class ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, pa.Array | None]
18681874
_downcast_ns_timestamp_to_us: bool
18691875
_use_large_types: bool | None
18701876
_projected_missing_fields: dict[int, Any]
1877+
_allow_timestamp_tz_mismatch: bool
18711878

18721879
def __init__(
18731880
self,
@@ -1876,12 +1883,16 @@ def __init__(
18761883
include_field_ids: bool = False,
18771884
use_large_types: bool | None = None,
18781885
projected_missing_fields: dict[int, Any] = EMPTY_DICT,
1886+
allow_timestamp_tz_mismatch: bool = False,
18791887
) -> None:
18801888
self._file_schema = file_schema
18811889
self._include_field_ids = include_field_ids
18821890
self._downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us
18831891
self._use_large_types = use_large_types
18841892
self._projected_missing_fields = projected_missing_fields
1893+
# When True, allows projecting timestamptz (UTC) to timestamp (no tz).
1894+
# Allowed for reading (aligns with Spark); disallowed for writing to enforce Iceberg spec's strict typing.
1895+
self._allow_timestamp_tz_mismatch = allow_timestamp_tz_mismatch
18851896

18861897
if use_large_types is not None:
18871898
deprecation_message(
@@ -1896,16 +1907,19 @@ def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array:
18961907
if field.field_type.is_primitive:
18971908
if (target_type := schema_to_pyarrow(field.field_type, include_field_ids=self._include_field_ids)) != values.type:
18981909
if field.field_type == TimestampType():
1899-
# Downcasting of nanoseconds to microseconds
1910+
source_tz_compatible = values.type.tz is None or (
1911+
self._allow_timestamp_tz_mismatch and values.type.tz in UTC_ALIASES
1912+
)
19001913
if (
19011914
pa.types.is_timestamp(target_type)
19021915
and not target_type.tz
19031916
and pa.types.is_timestamp(values.type)
1904-
and not values.type.tz
1917+
and source_tz_compatible
19051918
):
1919+
# Downcasting of nanoseconds to microseconds
19061920
if target_type.unit == "us" and values.type.unit == "ns" and self._downcast_ns_timestamp_to_us:
19071921
return values.cast(target_type, safe=False)
1908-
elif target_type.unit == "us" and values.type.unit in {"s", "ms"}:
1922+
elif target_type.unit == "us" and values.type.unit in {"s", "ms", "us"}:
19091923
return values.cast(target_type)
19101924
raise ValueError(f"Unsupported schema projection from {values.type} to {target_type}")
19111925
elif field.field_type == TimestamptzType():
@@ -1915,6 +1929,7 @@ def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array:
19151929
and pa.types.is_timestamp(values.type)
19161930
and (values.type.tz in UTC_ALIASES or values.type.tz is None)
19171931
):
1932+
# Downcasting of nanoseconds to microseconds
19181933
if target_type.unit == "us" and values.type.unit == "ns" and self._downcast_ns_timestamp_to_us:
19191934
return values.cast(target_type, safe=False)
19201935
elif target_type.unit == "us" and values.type.unit in {"s", "ms", "us"}:

tests/io/test_pyarrow.py

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
expression_to_pyarrow,
8282
parquet_path_to_id_mapping,
8383
schema_to_pyarrow,
84+
write_file,
8485
)
8586
from pyiceberg.manifest import DataFile, DataFileContent, FileFormat
8687
from pyiceberg.partitioning import PartitionField, PartitionSpec
@@ -2725,6 +2726,106 @@ def test__to_requested_schema_timestamp_to_timestamptz_projection() -> None:
27252726
assert expected.equals(actual_result)
27262727

27272728

2729+
def test__to_requested_schema_timestamptz_to_timestamp_projection() -> None:
2730+
# file is written with timestamp with timezone
2731+
file_schema = Schema(NestedField(1, "ts_field", TimestamptzType(), required=False))
2732+
batch = pa.record_batch(
2733+
[
2734+
pa.array(
2735+
[
2736+
datetime(2025, 8, 14, 12, 0, 0, tzinfo=timezone.utc),
2737+
datetime(2025, 8, 14, 13, 0, 0, tzinfo=timezone.utc),
2738+
],
2739+
type=pa.timestamp("us", tz="UTC"),
2740+
)
2741+
],
2742+
names=["ts_field"],
2743+
)
2744+
2745+
# table schema expects timestamp without timezone
2746+
table_schema = Schema(NestedField(1, "ts_field", TimestampType(), required=False))
2747+
2748+
# allow_timestamp_tz_mismatch=True enables reading timestamptz as timestamp
2749+
actual_result = _to_requested_schema(
2750+
table_schema, file_schema, batch, downcast_ns_timestamp_to_us=True, allow_timestamp_tz_mismatch=True
2751+
)
2752+
expected = pa.record_batch(
2753+
[
2754+
pa.array(
2755+
[
2756+
datetime(2025, 8, 14, 12, 0, 0),
2757+
datetime(2025, 8, 14, 13, 0, 0),
2758+
],
2759+
type=pa.timestamp("us"),
2760+
)
2761+
],
2762+
names=["ts_field"],
2763+
)
2764+
2765+
# expect actual_result to have no timezone
2766+
assert expected.equals(actual_result)
2767+
2768+
2769+
def test__to_requested_schema_timestamptz_to_timestamp_write_rejects() -> None:
2770+
"""Test that the write path (default) rejects timestamptz to timestamp casting.
2771+
2772+
This ensures we enforce the Iceberg spec distinction between timestamp and timestamptz on writes,
2773+
while the read path can be more permissive (like Spark) via allow_timestamp_tz_mismatch=True.
2774+
"""
2775+
# file is written with timestamp with timezone
2776+
file_schema = Schema(NestedField(1, "ts_field", TimestamptzType(), required=False))
2777+
batch = pa.record_batch(
2778+
[
2779+
pa.array(
2780+
[
2781+
datetime(2025, 8, 14, 12, 0, 0, tzinfo=timezone.utc),
2782+
datetime(2025, 8, 14, 13, 0, 0, tzinfo=timezone.utc),
2783+
],
2784+
type=pa.timestamp("us", tz="UTC"),
2785+
)
2786+
],
2787+
names=["ts_field"],
2788+
)
2789+
2790+
# table schema expects timestamp without timezone
2791+
table_schema = Schema(NestedField(1, "ts_field", TimestampType(), required=False))
2792+
2793+
# allow_timestamp_tz_mismatch=False (default, used in write path) should raise
2794+
with pytest.raises(ValueError, match="Unsupported schema projection"):
2795+
_to_requested_schema(
2796+
table_schema, file_schema, batch, downcast_ns_timestamp_to_us=True, allow_timestamp_tz_mismatch=False
2797+
)
2798+
2799+
2800+
def test_write_file_rejects_timestamptz_to_timestamp(tmp_path: Path) -> None:
2801+
"""Test that write_file rejects writing timestamptz data to a timestamp column."""
2802+
from pyiceberg.table import WriteTask
2803+
2804+
# Table expects timestamp (no tz), but data has timestamptz
2805+
table_schema = Schema(NestedField(1, "ts_field", TimestampType(), required=False))
2806+
task_schema = Schema(NestedField(1, "ts_field", TimestamptzType(), required=False))
2807+
2808+
arrow_data = pa.table({"ts_field": [datetime(2025, 8, 14, 12, 0, 0, tzinfo=timezone.utc)]})
2809+
2810+
table_metadata = TableMetadataV2(
2811+
location=f"file://{tmp_path}",
2812+
last_column_id=1,
2813+
format_version=2,
2814+
schemas=[table_schema],
2815+
partition_specs=[PartitionSpec()],
2816+
)
2817+
2818+
task = WriteTask(
2819+
write_uuid=uuid.uuid4(),
2820+
task_id=0,
2821+
record_batches=arrow_data.to_batches(),
2822+
schema=task_schema,
2823+
)
2824+
2825+
with pytest.raises(ValueError, match="Unsupported schema projection"):
2826+
list(write_file(io=PyArrowFileIO(), table_metadata=table_metadata, tasks=iter([task])))
2827+
2828+
27282829
def test__to_requested_schema_timestamps(
27292830
arrow_table_schema_with_all_timestamp_precisions: pa.Schema,
27302831
arrow_table_with_all_timestamp_precisions: pa.Table,

0 commit comments

Comments
 (0)