Cast 's', 'ms' and 'ns' PyArrow timestamp to 'us' precision on write #848
```diff
@@ -154,6 +154,7 @@
     UUIDType,
 )
 from pyiceberg.utils.concurrent import ExecutorFactory
+from pyiceberg.utils.config import Config
 from pyiceberg.utils.datetime import millis_to_datetime
 from pyiceberg.utils.singleton import Singleton
 from pyiceberg.utils.truncate import truncate_upper_bound_binary_string, truncate_upper_bound_text_string
```
```diff
@@ -918,11 +919,24 @@ def primitive(self, primitive: pa.DataType) -> PrimitiveType:
             return TimeType()
         elif pa.types.is_timestamp(primitive):
             primitive = cast(pa.TimestampType, primitive)
-            if primitive.unit == "us":
-                if primitive.tz == "UTC" or primitive.tz == "+00:00":
-                    return TimestamptzType()
-                elif primitive.tz is None:
-                    return TimestampType()
+            if primitive.unit in ("s", "ms", "us"):
+                # Supported types, will be upcast automatically to 'us'
+                pass
+            elif primitive.unit == "ns":
+                if Config().get_bool("downcast-ns-timestamp-on-write"):
```
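The `downcast-ns-timestamp-on-write` flag is read through PyIceberg's `Config`; the integration test later in this thread supplies it via the `PYICEBERG_DOWNCAST_NS_TIMESTAMP_ON_WRITE` environment variable. A minimal sketch of how such an env-var-backed boolean lookup can work (`get_bool_config` is a hypothetical stand-in, not PyIceberg's actual implementation):

```python
import os

def get_bool_config(key: str, prefix: str = "PYICEBERG_") -> bool:
    # Hypothetical stand-in for Config().get_bool(): map a kebab-case key
    # like 'downcast-ns-timestamp-on-write' to an environment variable name
    # and interpret common truthy spellings.
    env_name = prefix + key.upper().replace("-", "_")
    return os.environ.get(env_name, "").strip().lower() in ("1", "true", "yes")

os.environ["PYICEBERG_DOWNCAST_NS_TIMESTAMP_ON_WRITE"] = "True"
print(get_bool_config("downcast-ns-timestamp-on-write"))  # True
```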
|
Contributor

How about making […]? Since […]
Collaborator Author

Thank you for raising this @HonahX - I think this is an important bad case to consider. I actually don't think it'll stop when it reads through:

iceberg-python/pyiceberg/io/pyarrow.py, line 1274 in 0e381fa
This logic was introduced to support casting small and large types interchangeably: different PyArrow types that map to the same IcebergType (e.g. string and large_string) can be cast and read through as the same PyArrow type.

The only thing that currently blocks this write from succeeding is the […]

I do think that the silent downcasting of data is problematic - but that isn't the only problematic aspect of the […]

Do you think it would be helpful to decouple this concern from the idea of introducing an optional schema check for the […]?
Collaborator Author

On a tangent, I'd like to raise another point for discussion: if we are aware that nanoseconds will be introduced as a separate IcebergType, would introducing a pa.timestamp(unit="ns") -> TimestampType mapping add too much complexity? We would have to maintain a one-to-many mapping of pa.timestamp(unit="ns") -> TimestampType, TimestampNsType based on the format-version of the Iceberg table. Is automated conversion of ns-precision timestamps really worth the complexity we would be introducing in the near future?

I still think @HonahX raises a good point about the […]
Collaborator Author

Thank you for your input, @corleyma. Just clarifying here - what enables us to write […]
@Fokko, @HonahX and @corleyma: I'd like to gather some feedback on this point before committing to introducing this flag. My worry is that since there's a new type being introduced in the V3 spec that will actually be in 'ns' precision, enabling 'ns' casting onto the existing 'us'-precision TimestampType will complicate the type conversions, dooming us to check the type (TimestampType, TimestampNsType), the downcast boolean flag, and the format-version whenever we cast timestamps. I'd like us to weigh that trade-off carefully and decide whether supporting this conversion is worth the complexity it introduces into the conversion functions.
Contributor

Thanks @HonahX for giving the example, I just gave this a spin and ran into the following:

```python
@pytest.mark.integration
def test_timestamp_tz(session_catalog: Catalog, format_version: int, mocker: MockerFixture) -> None:
    nanoseconds_schema_iceberg = Schema(NestedField(1, "quux", TimestamptzType()))
    nanoseconds_schema = pa.schema([
        ("quux", pa.timestamp("ns", tz="UTC")),
    ])
    arrow_table = pa.Table.from_pylist(
        [
            {
                "quux": 1615967687249846175,  # 2021-03-17 07:54:47.249846159
            }
        ],
        schema=nanoseconds_schema,
    )
    mocker.patch.dict(os.environ, values={"PYICEBERG_DOWNCAST_NS_TIMESTAMP_ON_WRITE": "True"})
    identifier = f"default.abccccc{format_version}"

    try:
        session_catalog.drop_table(identifier=identifier)
    except NoSuchTableError:
        pass

    tbl = session_catalog.create_table(
        identifier=identifier,
        schema=nanoseconds_schema_iceberg,
        properties={"format-version": str(format_version)},
        partition_spec=PartitionSpec(),
    )

    file_paths = [f"s3://warehouse/default/test_timestamp_tz/v{format_version}/test-{i}.parquet" for i in range(5)]
    # write parquet files
    for file_path in file_paths:
        fo = tbl.io.new_output(file_path)
        with fo.create(overwrite=True) as fos:
            with pq.ParquetWriter(fos, schema=nanoseconds_schema) as writer:
                writer.write_table(arrow_table)

    # add the parquet files as data files
    tbl.add_files(file_paths=file_paths)

    print(tbl.scan().to_arrow())
```

I think we can force the cast to be unsafe:

```python
return values.cast(target_type, safe=False)
```

We might want to check that we only apply this when doing the nanos-to-micros conversion. I'm not sure what will happen when we do other lossy conversions.
Contributor

I also got some issues with the nanosecond timestamp when collecting statistics, at these lines:

iceberg-python/pyiceberg/io/pyarrow.py, lines 1870 to 1871 in 7afd6d6

This got fixed after updating them to:

```python
col_aggs[field_id].update_min(statistics.min_raw)
col_aggs[field_id].update_max(statistics.max_raw)
```
Collaborator Author

Hi folks - thank you all for the valuable feedback. It sounds like what we want is for the downcast behavior to be controlled by the configuration flag, but for that flag to be passed as a parameter to the […]

I've made the following changes: […]
Collaborator Author

I tried making this change and realized that it causes our serialization to break, because it introduces […]
Contributor

Ah, of course. One of the few upsides of having a fresh MacBook.
```diff
+                    logger.warning("Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'.")
+                else:
+                    raise TypeError(
+                        "Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-on-write' configuration property to automatically downcast 'ns' to 'us' on write."
+                    )
+            else:
+                raise TypeError(f"Unsupported precision for timestamp type: {primitive.unit}")
+
+            if primitive.tz == "UTC" or primitive.tz == "+00:00":
+                return TimestamptzType()
+            elif primitive.tz is None:
+                return TimestampType()
+
         elif pa.types.is_binary(primitive) or pa.types.is_large_binary(primitive):
             return BinaryType()
         elif pa.types.is_fixed_size_binary(primitive):
```
This is nice 👍