-
Notifications
You must be signed in to change notification settings - Fork 468
V3: Fix invalid downcasting for nanos #2397
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 1 commit
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,7 +20,7 @@ | |
| import tempfile | ||
| import uuid | ||
| import warnings | ||
| from datetime import date | ||
| from datetime import date, datetime, timezone | ||
| from typing import Any, List, Optional | ||
| from unittest.mock import MagicMock, patch | ||
| from uuid import uuid4 | ||
|
|
@@ -61,6 +61,7 @@ | |
| from pyiceberg.io import S3_RETRY_STRATEGY_IMPL, InputStream, OutputStream, load_file_io | ||
| from pyiceberg.io.pyarrow import ( | ||
| ICEBERG_SCHEMA, | ||
| PYARROW_PARQUET_FIELD_ID_KEY, | ||
| ArrowScan, | ||
| PyArrowFile, | ||
| PyArrowFileIO, | ||
|
|
@@ -70,6 +71,7 @@ | |
| _determine_partitions, | ||
| _primitive_to_physical, | ||
| _read_deletes, | ||
| _task_to_record_batches, | ||
| _to_requested_schema, | ||
| bin_pack_arrow_table, | ||
| compute_statistics_plan, | ||
|
|
@@ -85,7 +87,7 @@ | |
| from pyiceberg.table.metadata import TableMetadataV2 | ||
| from pyiceberg.table.name_mapping import create_mapping_from_schema | ||
| from pyiceberg.transforms import HourTransform, IdentityTransform | ||
| from pyiceberg.typedef import UTF8, Properties, Record | ||
| from pyiceberg.typedef import UTF8, Properties, Record, TableVersion | ||
| from pyiceberg.types import ( | ||
| BinaryType, | ||
| BooleanType, | ||
|
|
@@ -102,6 +104,7 @@ | |
| PrimitiveType, | ||
| StringType, | ||
| StructType, | ||
| TimestampNanoType, | ||
| TimestampType, | ||
| TimestamptzType, | ||
| TimeType, | ||
|
|
@@ -873,6 +876,18 @@ def _write_table_to_file(filepath: str, schema: pa.Schema, table: pa.Table) -> s | |
| return filepath | ||
|
|
||
|
|
||
| def _write_table_to_data_file(filepath: str, schema: pa.Schema, table: pa.Table) -> DataFile: | ||
| filepath = _write_table_to_file(filepath, schema, table) | ||
| return DataFile.from_args( | ||
| content=DataFileContent.DATA, | ||
| file_path=filepath, | ||
| file_format=FileFormat.PARQUET, | ||
| partition={}, | ||
| record_count=len(table), | ||
| file_size_in_bytes=22, # This is not relevant for now | ||
| ) | ||
|
|
||
|
|
||
| @pytest.fixture | ||
| def file_int(schema_int: Schema, tmpdir: str) -> str: | ||
| pyarrow_schema = schema_to_pyarrow(schema_int, metadata={ICEBERG_SCHEMA: bytes(schema_int.model_dump_json(), UTF8)}) | ||
|
|
@@ -2551,8 +2566,6 @@ def test_initial_value() -> None: | |
|
|
||
|
|
||
| def test__to_requested_schema_timestamp_to_timestamptz_projection() -> None: | ||
| from datetime import datetime, timezone | ||
|
|
||
| # file is written with timestamp without timezone | ||
| file_schema = Schema(NestedField(1, "ts_field", TimestampType(), required=False)) | ||
| batch = pa.record_batch( | ||
|
|
@@ -2722,3 +2735,57 @@ def test_retry_strategy_not_found() -> None: | |
| io = PyArrowFileIO(properties={S3_RETRY_STRATEGY_IMPL: "pyiceberg.DoesNotExist"}) | ||
| with pytest.warns(UserWarning, match="Could not initialize S3 retry strategy: pyiceberg.DoesNotExist"): | ||
| io.new_input("s3://bucket/path/to/file") | ||
|
|
||
|
|
||
| @pytest.mark.parametrize("format_version", [1, 2, 3]) | ||
| def test_task_to_record_batches_nanos(format_version: TableVersion, tmpdir: str) -> None: | ||
| from datetime import datetime | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This import seems unnecessary now.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Good one, I expected the linter to clean that up 🚀 |
||
|
|
||
| arrow_table = pa.table( | ||
| [ | ||
| pa.array( | ||
| [ | ||
| datetime(2025, 8, 14, 12, 0, 0), | ||
| datetime(2025, 8, 14, 13, 0, 0), | ||
| ], | ||
| type=pa.timestamp("ns"), | ||
| ) | ||
| ], | ||
| pa.schema((pa.field("ts_field", pa.timestamp("ns"), nullable=True, metadata={PYARROW_PARQUET_FIELD_ID_KEY: "1"}),)), | ||
| ) | ||
|
|
||
| data_file = _write_table_to_data_file(f"{tmpdir}/test_task_to_record_batches_nanos.parquet", arrow_table.schema, arrow_table) | ||
|
|
||
| if format_version <= 2: | ||
| table_schema = Schema(NestedField(1, "ts_field", TimestampType(), required=False)) | ||
| else: | ||
| table_schema = Schema(NestedField(1, "ts_field", TimestampNanoType(), required=False)) | ||
|
|
||
| actual_result = list( | ||
| _task_to_record_batches( | ||
| PyArrowFileIO(), | ||
| FileScanTask(data_file), | ||
| bound_row_filter=AlwaysTrue(), | ||
| projected_schema=table_schema, | ||
| projected_field_ids={1}, | ||
| positional_deletes=None, | ||
| case_sensitive=True, | ||
| format_version=format_version, | ||
| ) | ||
| )[0] | ||
|
|
||
| def _expected_batch(unit: str) -> pa.RecordBatch: | ||
| return pa.record_batch( | ||
| [ | ||
| pa.array( | ||
| [ | ||
| datetime(2025, 8, 14, 12, 0, 0), | ||
| datetime(2025, 8, 14, 13, 0, 0), | ||
| ], | ||
| type=pa.timestamp(unit), | ||
| ) | ||
| ], | ||
| names=["ts_field"], | ||
| ) | ||
|
|
||
| assert _expected_batch("ns" if format_version > 2 else "us").equals(actual_result) | ||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to check the format version for downcasting? (We have the table_metadata already, so we have access to it)