|
20 | 20 | import tempfile |
21 | 21 | import uuid |
22 | 22 | import warnings |
23 | | -from datetime import date |
| 23 | +from datetime import date, datetime, timezone |
24 | 24 | from typing import Any, List, Optional |
25 | 25 | from unittest.mock import MagicMock, patch |
26 | 26 | from uuid import uuid4 |
|
61 | 61 | from pyiceberg.io import S3_RETRY_STRATEGY_IMPL, InputStream, OutputStream, load_file_io |
62 | 62 | from pyiceberg.io.pyarrow import ( |
63 | 63 | ICEBERG_SCHEMA, |
| 64 | + PYARROW_PARQUET_FIELD_ID_KEY, |
64 | 65 | ArrowScan, |
65 | 66 | PyArrowFile, |
66 | 67 | PyArrowFileIO, |
|
70 | 71 | _determine_partitions, |
71 | 72 | _primitive_to_physical, |
72 | 73 | _read_deletes, |
| 74 | + _task_to_record_batches, |
73 | 75 | _to_requested_schema, |
74 | 76 | bin_pack_arrow_table, |
75 | 77 | compute_statistics_plan, |
|
85 | 87 | from pyiceberg.table.metadata import TableMetadataV2 |
86 | 88 | from pyiceberg.table.name_mapping import create_mapping_from_schema |
87 | 89 | from pyiceberg.transforms import HourTransform, IdentityTransform |
88 | | -from pyiceberg.typedef import UTF8, Properties, Record |
| 90 | +from pyiceberg.typedef import UTF8, Properties, Record, TableVersion |
89 | 91 | from pyiceberg.types import ( |
90 | 92 | BinaryType, |
91 | 93 | BooleanType, |
|
102 | 104 | PrimitiveType, |
103 | 105 | StringType, |
104 | 106 | StructType, |
| 107 | + TimestampNanoType, |
105 | 108 | TimestampType, |
106 | 109 | TimestamptzType, |
107 | 110 | TimeType, |
@@ -873,6 +876,18 @@ def _write_table_to_file(filepath: str, schema: pa.Schema, table: pa.Table) -> s |
873 | 876 | return filepath |
874 | 877 |
|
875 | 878 |
|
def _write_table_to_data_file(filepath: str, schema: pa.Schema, table: pa.Table) -> DataFile:
    """Write ``table`` to a Parquet file at ``filepath`` and describe it as a DataFile.

    Args:
        filepath: Destination path for the Parquet file.
        schema: PyArrow schema to write the file with.
        table: The data to persist.

    Returns:
        A ``DataFile`` entry pointing at the freshly written file, suitable for
        building a ``FileScanTask`` in tests.
    """
    import os  # local import: only needed to stat the file we just wrote

    filepath = _write_table_to_file(filepath, schema, table)
    return DataFile.from_args(
        content=DataFileContent.DATA,
        file_path=filepath,
        file_format=FileFormat.PARQUET,
        partition={},
        record_count=len(table),
        # Report the real on-disk size rather than a placeholder constant, so the
        # metadata stays truthful if a future test starts relying on it.
        file_size_in_bytes=os.path.getsize(filepath),
    )
| 890 | + |
876 | 891 | @pytest.fixture |
877 | 892 | def file_int(schema_int: Schema, tmpdir: str) -> str: |
878 | 893 | pyarrow_schema = schema_to_pyarrow(schema_int, metadata={ICEBERG_SCHEMA: bytes(schema_int.model_dump_json(), UTF8)}) |
@@ -2551,8 +2566,6 @@ def test_initial_value() -> None: |
2551 | 2566 |
|
2552 | 2567 |
|
2553 | 2568 | def test__to_requested_schema_timestamp_to_timestamptz_projection() -> None: |
2554 | | - from datetime import datetime, timezone |
2555 | | - |
2556 | 2569 | # file is written with timestamp without timezone |
2557 | 2570 | file_schema = Schema(NestedField(1, "ts_field", TimestampType(), required=False)) |
2558 | 2571 | batch = pa.record_batch( |
@@ -2722,3 +2735,57 @@ def test_retry_strategy_not_found() -> None: |
2722 | 2735 | io = PyArrowFileIO(properties={S3_RETRY_STRATEGY_IMPL: "pyiceberg.DoesNotExist"}) |
2723 | 2736 | with pytest.warns(UserWarning, match="Could not initialize S3 retry strategy: pyiceberg.DoesNotExist"): |
2724 | 2737 | io.new_input("s3://bucket/path/to/file") |
| 2738 | + |
| 2739 | + |
@pytest.mark.parametrize("format_version", [1, 2, 3])
def test_task_to_record_batches_nanos(format_version: TableVersion, tmpdir: str) -> None:
    """Read back nanosecond timestamps through ``_task_to_record_batches``.

    On format v3 the table schema can use ``TimestampNanoType`` and the values
    come back with nanosecond precision; on v1/v2 the schema uses
    ``TimestampType`` and the values are returned at microsecond precision.
    """
    # One shared list of values so the input table and the expected batch
    # cannot drift apart.  (``datetime`` comes from the module-level import.)
    timestamps = [
        datetime(2025, 8, 14, 12, 0, 0),
        datetime(2025, 8, 14, 13, 0, 0),
    ]

    arrow_table = pa.table(
        [pa.array(timestamps, type=pa.timestamp("ns"))],
        pa.schema((pa.field("ts_field", pa.timestamp("ns"), nullable=True, metadata={PYARROW_PARQUET_FIELD_ID_KEY: "1"}),)),
    )

    data_file = _write_table_to_data_file(f"{tmpdir}/test_task_to_record_batches_nanos.parquet", arrow_table.schema, arrow_table)

    # v1/v2 schemas are limited to microsecond timestamps; v3 adds timestamp_ns.
    if format_version <= 2:
        table_schema = Schema(NestedField(1, "ts_field", TimestampType(), required=False))
    else:
        table_schema = Schema(NestedField(1, "ts_field", TimestampNanoType(), required=False))

    actual_result = list(
        _task_to_record_batches(
            PyArrowFileIO(),
            FileScanTask(data_file),
            bound_row_filter=AlwaysTrue(),
            projected_schema=table_schema,
            projected_field_ids={1},
            positional_deletes=None,
            case_sensitive=True,
            format_version=format_version,
        )
    )[0]

    expected_unit = "ns" if format_version > 2 else "us"
    expected_batch = pa.record_batch(
        [pa.array(timestamps, type=pa.timestamp(expected_unit))],
        names=["ts_field"],
    )
    assert expected_batch.equals(actual_result)
0 commit comments