-
Notifications
You must be signed in to change notification settings - Fork 467
Cast 's', 'ms' and 'ns' PyArrow timestamp to 'us' precision on write #848
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 6 commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
d5d31d7
checkpoint
sungwy ae6ea72
support more timestamps
sungwy 0d065af
Merge branch 'main' into timestamp-cast
sungwy e4471ab
adopt review feedback
sungwy e41a813
fix
sungwy d7483db
revert min_raw max_raw change
sungwy 97ce9a0
adopt nits
sungwy f8ec372
Merge branch 'main' into timestamp-cast
sungwy File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -154,6 +154,7 @@ | |
| UUIDType, | ||
| ) | ||
| from pyiceberg.utils.concurrent import ExecutorFactory | ||
| from pyiceberg.utils.config import Config | ||
| from pyiceberg.utils.datetime import millis_to_datetime | ||
| from pyiceberg.utils.singleton import Singleton | ||
| from pyiceberg.utils.truncate import truncate_upper_bound_binary_string, truncate_upper_bound_text_string | ||
|
|
@@ -470,7 +471,9 @@ def __setstate__(self, state: Dict[str, Any]) -> None: | |
|
|
||
|
|
||
| def schema_to_pyarrow( | ||
| schema: Union[Schema, IcebergType], metadata: Dict[bytes, bytes] = EMPTY_DICT, include_field_ids: bool = True | ||
| schema: Union[Schema, IcebergType], | ||
| metadata: Dict[bytes, bytes] = EMPTY_DICT, | ||
| include_field_ids: bool = True, | ||
| ) -> pa.schema: | ||
| return visit(schema, _ConvertToArrowSchema(metadata, include_field_ids)) | ||
|
|
||
|
|
@@ -663,21 +666,23 @@ def _combine_positional_deletes(positional_deletes: List[pa.ChunkedArray], start | |
| return np.subtract(np.setdiff1d(np.arange(start_index, end_index), all_chunks, assume_unique=False), start_index) | ||
|
|
||
|
|
||
| def pyarrow_to_schema(schema: pa.Schema, name_mapping: Optional[NameMapping] = None) -> Schema: | ||
| def pyarrow_to_schema( | ||
| schema: pa.Schema, name_mapping: Optional[NameMapping] = None, downcast_ns_timestamp_to_us: bool = False | ||
| ) -> Schema: | ||
| has_ids = visit_pyarrow(schema, _HasIds()) | ||
| if has_ids: | ||
| visitor = _ConvertToIceberg() | ||
| visitor = _ConvertToIceberg(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us) | ||
| elif name_mapping is not None: | ||
| visitor = _ConvertToIceberg(name_mapping=name_mapping) | ||
| visitor = _ConvertToIceberg(name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us) | ||
| else: | ||
| raise ValueError( | ||
| "Parquet file does not have field-ids and the Iceberg table does not have 'schema.name-mapping.default' defined" | ||
| ) | ||
| return visit_pyarrow(schema, visitor) | ||
|
|
||
|
|
||
| def _pyarrow_to_schema_without_ids(schema: pa.Schema) -> Schema: | ||
| return visit_pyarrow(schema, _ConvertToIcebergWithoutIDs()) | ||
| def _pyarrow_to_schema_without_ids(schema: pa.Schema, downcast_ns_timestamp_to_us: bool = False) -> Schema: | ||
| return visit_pyarrow(schema, _ConvertToIcebergWithoutIDs(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)) | ||
|
|
||
|
|
||
| def _pyarrow_schema_ensure_large_types(schema: pa.Schema) -> pa.Schema: | ||
|
|
@@ -849,9 +854,10 @@ class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]): | |
| _field_names: List[str] | ||
| _name_mapping: Optional[NameMapping] | ||
|
|
||
| def __init__(self, name_mapping: Optional[NameMapping] = None) -> None: | ||
| def __init__(self, name_mapping: Optional[NameMapping] = None, downcast_ns_timestamp_to_us: bool = False) -> None: | ||
| self._field_names = [] | ||
| self._name_mapping = name_mapping | ||
| self._downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us | ||
|
|
||
| def _field_id(self, field: pa.Field) -> int: | ||
| if self._name_mapping: | ||
|
|
@@ -918,11 +924,24 @@ def primitive(self, primitive: pa.DataType) -> PrimitiveType: | |
| return TimeType() | ||
| elif pa.types.is_timestamp(primitive): | ||
| primitive = cast(pa.TimestampType, primitive) | ||
| if primitive.unit == "us": | ||
| if primitive.tz == "UTC" or primitive.tz == "+00:00": | ||
| return TimestamptzType() | ||
| elif primitive.tz is None: | ||
| return TimestampType() | ||
| if primitive.unit in ("s", "ms", "us"): | ||
| # Supported types, will be upcast automatically to 'us' | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is nice 👍 |
||
| pass | ||
| elif primitive.unit == "ns": | ||
| if self._downcast_ns_timestamp_to_us: | ||
| logger.warning("Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'.") | ||
| else: | ||
| raise TypeError( | ||
| "Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-to-us-on-write' configuration property to automatically downcast 'ns' to 'us' on write." | ||
| ) | ||
| else: | ||
| raise TypeError(f"Unsupported precision for timestamp type: {primitive.unit}") | ||
|
|
||
| if primitive.tz == "UTC" or primitive.tz == "+00:00": | ||
| return TimestamptzType() | ||
| elif primitive.tz is None: | ||
| return TimestampType() | ||
|
|
||
| elif pa.types.is_binary(primitive) or pa.types.is_large_binary(primitive): | ||
| return BinaryType() | ||
| elif pa.types.is_fixed_size_binary(primitive): | ||
|
|
@@ -1010,8 +1029,11 @@ def _task_to_record_batches( | |
| with fs.open_input_file(path) as fin: | ||
| fragment = arrow_format.make_fragment(fin) | ||
| physical_schema = fragment.physical_schema | ||
| file_schema = pyarrow_to_schema(physical_schema, name_mapping) | ||
|
|
||
| # In V1 and V2 table formats, we only support Timestamp 'us' in Iceberg Schema | ||
| # Hence it is reasonable to always cast 'ns' timestamp to 'us' on read. | ||
| # When V3 support is introduced, we will update `downcast_ns_timestamp_to_us` flag based on | ||
| # the table format version. | ||
| file_schema = pyarrow_to_schema(physical_schema, name_mapping, downcast_ns_timestamp_to_us=True) | ||
| pyarrow_filter = None | ||
| if bound_row_filter is not AlwaysTrue(): | ||
| translated_row_filter = translate_column_names(bound_row_filter, file_schema, case_sensitive=case_sensitive) | ||
|
|
@@ -1049,7 +1071,7 @@ def _task_to_record_batches( | |
| arrow_table = pa.Table.from_batches([batch]) | ||
| arrow_table = arrow_table.filter(pyarrow_filter) | ||
| batch = arrow_table.to_batches()[0] | ||
| yield to_requested_schema(projected_schema, file_project_schema, batch) | ||
| yield to_requested_schema(projected_schema, file_project_schema, batch, downcast_ns_timestamp_to_us=True) | ||
| current_index += len(batch) | ||
|
|
||
|
|
||
|
|
@@ -1248,8 +1270,12 @@ def project_batches( | |
| total_row_count += len(batch) | ||
|
|
||
|
|
||
| def to_requested_schema(requested_schema: Schema, file_schema: Schema, batch: pa.RecordBatch) -> pa.RecordBatch: | ||
| struct_array = visit_with_partner(requested_schema, batch, ArrowProjectionVisitor(file_schema), ArrowAccessor(file_schema)) | ||
| def to_requested_schema( | ||
| requested_schema: Schema, file_schema: Schema, batch: pa.RecordBatch, downcast_ns_timestamp_to_us: bool = False | ||
| ) -> pa.RecordBatch: | ||
| struct_array = visit_with_partner( | ||
| requested_schema, batch, ArrowProjectionVisitor(file_schema, downcast_ns_timestamp_to_us), ArrowAccessor(file_schema) | ||
| ) | ||
|
|
||
| arrays = [] | ||
| fields = [] | ||
|
|
@@ -1263,8 +1289,9 @@ def to_requested_schema(requested_schema: Schema, file_schema: Schema, batch: pa | |
| class ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, Optional[pa.Array]]): | ||
| file_schema: Schema | ||
|
|
||
| def __init__(self, file_schema: Schema): | ||
| def __init__(self, file_schema: Schema, downcast_ns_timestamp_to_us: bool = False): | ||
| self.file_schema = file_schema | ||
| self.downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us | ||
|
|
||
| def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array: | ||
| file_field = self.file_schema.find_field(field.field_id) | ||
|
|
@@ -1275,7 +1302,15 @@ def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array: | |
| # if file_field and field_type (e.g. String) are the same | ||
| # but the pyarrow type of the array is different from the expected type | ||
| # (e.g. string vs larger_string), we want to cast the array to the larger type | ||
| return values.cast(target_type) | ||
| safe = True | ||
| if ( | ||
| pa.types.is_timestamp(target_type) | ||
| and target_type.unit == "us" | ||
| and pa.types.is_timestamp(values.type) | ||
| and values.type.unit == "ns" | ||
| ): | ||
| safe = False | ||
| return values.cast(target_type, safe=safe) | ||
| return values | ||
|
|
||
| def _construct_field(self, field: NestedField, arrow_type: pa.DataType) -> pa.Field: | ||
|
|
@@ -1912,8 +1947,14 @@ def write_parquet(task: WriteTask) -> DataFile: | |
| else: | ||
| file_schema = table_schema | ||
|
|
||
| downcast_ns_timestamp_to_us = Config().get_bool("downcast-ns-timestamp-to-us-on-write") or False | ||
| batches = [ | ||
| to_requested_schema(requested_schema=file_schema, file_schema=table_schema, batch=batch) | ||
| to_requested_schema( | ||
| requested_schema=file_schema, | ||
| file_schema=table_schema, | ||
| batch=batch, | ||
| downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, | ||
| ) | ||
| for batch in task.record_batches | ||
| ] | ||
| arrow_table = pa.Table.from_batches(batches) | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: we can move `"downcast-ns-timestamp-to-us-on-write"` into a constant, and reuse it in `pyarrow.py`
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for the review! I've adopted this in the new commits