Merged

Changes from 2 commits
6 changes: 5 additions & 1 deletion mkdocs/docs/configuration.md
@@ -299,4 +299,8 @@ PyIceberg uses multiple threads to parallelize operations. The number of workers

# Backward Compatibility

- Previous versions of Java (`<1.4.0`) implementations incorrectly assume the optional attribute `current-snapshot-id` to be a required attribute in TableMetadata. This means that if `current-snapshot-id` is missing in the metadata file (e.g. on table creation), the application will throw an exception without being able to load the table. This assumption has been corrected in more recent Iceberg versions. However, it is possible to force PyIceberg to create a table with a metadata file that will be compatible with previous versions. This can be configured by setting the `legacy-current-snapshot-id` entry as "True" in the configuration file, or by setting the `PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID` environment variable. Refer to the [PR discussion](https://github.com/apache/iceberg-python/pull/473) for more details on the issue
+ Previous versions of Java (`<1.4.0`) implementations incorrectly assume the optional attribute `current-snapshot-id` to be a required attribute in TableMetadata. This means that if `current-snapshot-id` is missing in the metadata file (e.g. on table creation), the application will throw an exception without being able to load the table. This assumption has been corrected in more recent Iceberg versions. However, it is possible to force PyIceberg to create a table with a metadata file that will be compatible with previous versions. This can be configured by setting the `legacy-current-snapshot-id` property as "True" in the configuration file, or by setting the `PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID` environment variable. Refer to the [PR discussion](https://github.com/apache/iceberg-python/pull/473) for more details on the issue
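Either setting can be sketched as follows (illustrative; by default PyIceberg reads its YAML configuration from `~/.pyiceberg.yaml`):

```python
import os

# Equivalent to `legacy-current-snapshot-id: "True"` in the configuration file;
# must be set before PyIceberg loads its configuration.
os.environ["PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID"] = "True"
```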

# Nanoseconds Support

PyIceberg currently only supports up to microsecond precision in its TimestampType. PyArrow timestamp types in 's' and 'ms' precision will be upcast automatically to 'us' precision timestamps on write. Timestamps in 'ns' precision can also be downcast automatically on write if desired. This can be configured by setting the `downcast-ns-timestamp-on-write` property as "True" in the configuration file, or by setting the `PYICEBERG_DOWNCAST_NS_TIMESTAMP_ON_WRITE` environment variable. Refer to the [nanoseconds timestamp proposal document](https://docs.google.com/document/d/1bE1DcEGNzZAMiVJSZ0X1wElKLNkT9kRkk0hDlfkXzvU/edit#heading=h.ibflcctc9i1d) for more details on the long term roadmap for nanoseconds support
24 changes: 19 additions & 5 deletions pyiceberg/io/pyarrow.py
@@ -154,6 +154,7 @@
UUIDType,
)
from pyiceberg.utils.concurrent import ExecutorFactory
from pyiceberg.utils.config import Config
from pyiceberg.utils.datetime import millis_to_datetime
from pyiceberg.utils.singleton import Singleton
from pyiceberg.utils.truncate import truncate_upper_bound_binary_string, truncate_upper_bound_text_string
@@ -918,11 +919,24 @@ def primitive(self, primitive: pa.DataType) -> PrimitiveType:
return TimeType()
elif pa.types.is_timestamp(primitive):
primitive = cast(pa.TimestampType, primitive)
if primitive.unit == "us":
if primitive.tz == "UTC" or primitive.tz == "+00:00":
return TimestamptzType()
elif primitive.tz is None:
return TimestampType()
if primitive.unit in ("s", "ms", "us"):
# Supported types, will be upcast automatically to 'us'
Contributor:
This is nice 👍

pass
elif primitive.unit == "ns":
if Config().get_bool("downcast-ns-timestamp-on-write"):
Contributor @HonahX (Jun 24, 2024):

How about making downcast_ns_timestamp a parameter of pyarrow_to_schema, and reading the Config from the yml when we use this API on write? pyarrow_to_schema itself seems to be a useful public API so it may be good to explicitly reveal the optional downcast. This will also help mitigate an edge case:

Since pyarrow_to_schema is used for both read/write, enabling this option also allows unit ns to pass the schema conversion when reading. For example, If users add a parquet file with ns timestamp and try to read the table as arrow, they will find the read process pass the pyarrow_to_schema check and stops at to_request_schema with

 pyarrow.lib.ArrowInvalid: Casting from timestamp[ns] to timestamp[us] would lose data:

Collaborator (Author):
Thank you for raising this @HonahX - I think this is an important bad case to consider

I actually don't think it'll stop when it reads through to_requested_schema: it will detect that the pyarrow types are different, but their IcebergTypes are the same, and it will cast on read, silently dropping the precision:

elif (target_type := schema_to_pyarrow(field.field_type, include_field_ids=False)) != values.type:

This logic was introduced to support casting small and large types interchangeably: different pyarrow types that can be mapped to the same IcebergType (string, large_string) can be cast and read through as the same PyArrow type

The only thing that blocks this write from succeeding currently is the pyarrow_to_schema call which fails to generate a corresponding IcebergSchema based on the provided pyarrow schema, which this PR seeks to fix.

I do think that the silent downcasting of data is problematic - but that isn't the only problematic aspect of the add_files API. add_files does not check for the validity of the schema, because we provide a list of files into the API. Currently, it is up to the user to ensure that the file they want to add is in the correct format, and own the risk of potentially introducing wrong data files into the table. We note that the API is only intended for expert users, which is similar to the warnings we have for the other existing Table migration procedures.

Do you think it would be helpful to decouple this concern to that of the idea of introducing an optional schema check for the add_files procedure?

Collaborator (Author):
On a tangent, I'd like to raise another point for discussion:

If we are aware that nanoseconds will be introduced as a separate IcebergType, would introducing a pa.timestamp(unit="ns") -> TimestampType introduce too much complexity, since we will have to maintain the logic for one to many mapping for pa.timestamp(unit="ns") -> TimestampType, TimestampNsType based on the format-version of the Iceberg table? Is introducing automated conversion of ns precision timestamp really worth the complexity we are introducing in the near future?

@corleyma (Jun 27, 2024):
I still think @HonahX raises a good point about the schema_to_pyarrow method being a useful public API, and it would be nice for its behavior to not be too tightly coupled to pyiceberg config. I.e., I agree that it's wiser to parameterize the behavior and determine the correct parameter to use via config where it's called.

Collaborator (Author) @sungwy (Jun 28, 2024):
Thank you for your input, @corleyma. Just clarifying here - what enables us to write ns into TimestampType in PyIceberg is this proposed change in ConvertToIceberg, which is not in schema_to_pyarrow. It is actually in pyarrow_to_schema which is used to check schema compatibility on write. Once the data file is written, we are making the assumption that TimestampType is all in 'us' precision, or that it is safe to cast to 'us' precision, because the writer has already made the decision to write into 'us' precision timestamps.

> If we are aware that nanoseconds will be introduced as a separate IcebergType, would introducing a pa.timestamp(unit="ns") -> TimestampType introduce too much complexity, since we will have to maintain the logic for one to many mapping for pa.timestamp(unit="ns") -> TimestampType, TimestampNsType based on the format-version of the Iceberg table? Is introducing automated conversion of ns precision timestamp really worth the complexity we are introducing in the near future?

@Fokko , @HonahX and @corleyma : I'd like to gather some feedback on this point before committing to introducing this flag. My worry is that since there's a new type that's being introduced in V3 Spec that will actually be in 'ns', enabling 'ns' casting on the existing 'us' precision TimestampType will complicate the type conversions, dooming us to have to check the type (TimestampType, TimestampNsType), downcast-to-ns boolean flag, and the format-version whenever we are casting timestamps. I'd like for us to weigh that trade off carefully and decide on whether supporting this conversion is worth the complexity we are introducing into the conversion functions.

Contributor:

Thanks @HonahX for giving the example, I just gave this a spin and ran into the following:

@pytest.mark.integration
def test_timestamp_tz(
    session_catalog: Catalog, format_version: int, mocker: MockerFixture
) -> None:
    nanoseconds_schema_iceberg = Schema(
        NestedField(1, "quux", TimestamptzType())
    )

    nanoseconds_schema = pa.schema([
        ("quux", pa.timestamp("ns", tz="UTC")),
    ])

    arrow_table = pa.Table.from_pylist(
        [
            {
                "quux": 1615967687249846175,  # 2021-03-17 07:54:47.249846175
            }
        ],
        schema=nanoseconds_schema,
    )
    mocker.patch.dict(os.environ, values={"PYICEBERG_DOWNCAST_NS_TIMESTAMP_ON_WRITE": "True"})

    identifier = f"default.abccccc{format_version}"

    try:
        session_catalog.drop_table(identifier=identifier)
    except NoSuchTableError:
        pass

    tbl = session_catalog.create_table(
        identifier=identifier,
        schema=nanoseconds_schema_iceberg,
        properties={"format-version": str(format_version)},
        partition_spec=PartitionSpec(),
    )

    file_paths = [f"s3://warehouse/default/test_timestamp_tz/v{format_version}/test-{i}.parquet" for i in range(5)]
    # write parquet files
    for file_path in file_paths:
        fo = tbl.io.new_output(file_path)
        with fo.create(overwrite=True) as fos:
            with pq.ParquetWriter(fos, schema=nanoseconds_schema) as writer:
                writer.write_table(arrow_table)

    # add the parquet files as data files
    tbl.add_files(file_paths=file_paths)

    print(tbl.scan().to_arrow())

I think we can force the cast to be unsafe:

return values.cast(target_type, safe=False)

We might want to check if we only apply this when doing the nanos to micros. I'm not sure what will happen when we do other lossy conversions.

Contributor @Fokko (Jul 5, 2024):

I also got some issues with the nanosecond timestamp when collecting statistics:

>   ???
E   ValueError: Nanosecond resolution temporal type 1615967687249846175 is not safely convertible to microseconds to convert to datetime.datetime. Install pandas to return as Timestamp with nanosecond support or access the .value attribute.

At the lines:

col_aggs[field_id].update_min(statistics.min)
col_aggs[field_id].update_max(statistics.max)

This got fixed after updating this to:

                    col_aggs[field_id].update_min(statistics.min_raw)
                    col_aggs[field_id].update_max(statistics.max_raw)

Collaborator (Author):

Hi folks - thank you all for the valuable feedback. So it sounds like what we want is for the behavior to be controlled by the configuration flag, but for that flag to be passed as a parameter to the pyarrow_to_schema API so that its behavior can be fully controlled by its input parameters.

I've made the following changes:

  1. Introduced downcast_ns_timestamp_to_us as a new input parameter to pyarrow_to_schema and to_requested_schema public APIs
  2. Now table and catalog level functions infer the flag from the Config on write. (e.g. _check_schema_compatible and _convert_schema_if_needed)
  3. Always downcast ns to us on read, if there is ns timestamp in the parquet file (we will want to revise this behavior when we introduce nanosecond support in V3 spec, but until then, I think it's a reasonable assumption that data files that are in Iceberg will only be read with microseconds precision). https://github.com/apache/iceberg-python/pull/848/files#diff-8d5e63f2a87ead8cebe2fd8ac5dcf2198d229f01e16bb9e06e21f7277c328abdR1030-R1033

Collaborator (Author):

> I also got some issues with the nanosecond timestamp when collecting statistics:
>
> >   ???
> E   ValueError: Nanosecond resolution temporal type 1615967687249846175 is not safely convertible to microseconds to convert to datetime.datetime. Install pandas to return as Timestamp with nanosecond support or access the .value attribute.
>
> At the lines:
>
> col_aggs[field_id].update_min(statistics.min)
> col_aggs[field_id].update_max(statistics.max)
>
> This got fixed after updating this to:
>
> col_aggs[field_id].update_min(statistics.min_raw)
> col_aggs[field_id].update_max(statistics.max_raw)

I tried making this change and realized that this causes our serialization to break because it introduces bytes values in our statistics, which cannot be serialized (since it already is). I will need to spend a bit more time to figure out the right change to StatsAggregator to support this change. I also failed to reproduce this issue in my environment (possibly because it has pandas installed) so I'm reverting this change for now.

Contributor @Fokko (Jul 6, 2024):
> I also failed to reproduce this issue in my environment (possibly because it has pandas installed) so I'm reverting this change for now.

Ah, of course. One of the few upsides of having a fresh Macbook.

logger.warning("Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'.")
else:
raise TypeError(
"Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-on-write' configuration property to automatically downcast 'ns' to 'us' on write."
)
else:
raise TypeError(f"Unsupported precision for timestamp type: {primitive.unit}")

if primitive.tz == "UTC" or primitive.tz == "+00:00":
return TimestamptzType()
elif primitive.tz is None:
return TimestampType()

elif pa.types.is_binary(primitive) or pa.types.is_large_binary(primitive):
return BinaryType()
elif pa.types.is_fixed_size_binary(primitive):
69 changes: 68 additions & 1 deletion tests/integration/test_writes/test_writes.py
@@ -18,7 +18,7 @@
import math
import os
import time
- from datetime import date, datetime
+ from datetime import date, datetime, timezone
from pathlib import Path
from typing import Any, Dict
from urllib.parse import urlparse
@@ -925,3 +925,70 @@ def table_write_subset_of_schema(session_catalog: Catalog, arrow_table_with_null
tbl.append(arrow_table_without_some_columns)
# overwrite and then append should produce twice the data
assert len(tbl.scan().to_arrow()) == len(arrow_table_without_some_columns) * 2


@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_write_all_timestamp_precision(mocker: MockerFixture, session_catalog: Catalog, format_version: int) -> None:
identifier = "default.table_all_timestamp_precision"
arrow_table_schema_with_all_timestamp_precisions = pa.schema([
("timestamp_s", pa.timestamp(unit="s")),
("timestamptz_s", pa.timestamp(unit="s", tz="UTC")),
("timestamp_ms", pa.timestamp(unit="ms")),
("timestamptz_ms", pa.timestamp(unit="ms", tz="UTC")),
("timestamp_us", pa.timestamp(unit="us")),
("timestamptz_us", pa.timestamp(unit="us", tz="UTC")),
("timestamp_ns", pa.timestamp(unit="ns")),
("timestamptz_ns", pa.timestamp(unit="ns", tz="UTC")),
])
TEST_DATA_WITH_NULL = {
"timestamp_s": [datetime(2023, 1, 1, 19, 25, 00), None, datetime(2023, 3, 1, 19, 25, 00)],
"timestamptz_s": [
datetime(2023, 1, 1, 19, 25, 00, tzinfo=timezone.utc),
None,
datetime(2023, 3, 1, 19, 25, 00, tzinfo=timezone.utc),
],
"timestamp_ms": [datetime(2023, 1, 1, 19, 25, 00), None, datetime(2023, 3, 1, 19, 25, 00)],
"timestamptz_ms": [
datetime(2023, 1, 1, 19, 25, 00, tzinfo=timezone.utc),
None,
datetime(2023, 3, 1, 19, 25, 00, tzinfo=timezone.utc),
],
"timestamp_us": [datetime(2023, 1, 1, 19, 25, 00), None, datetime(2023, 3, 1, 19, 25, 00)],
"timestamptz_us": [
datetime(2023, 1, 1, 19, 25, 00, tzinfo=timezone.utc),
None,
datetime(2023, 3, 1, 19, 25, 00, tzinfo=timezone.utc),
],
"timestamp_ns": [datetime(2023, 1, 1, 19, 25, 00), None, datetime(2023, 3, 1, 19, 25, 00)],
"timestamptz_ns": [
datetime(2023, 1, 1, 19, 25, 00, tzinfo=timezone.utc),
None,
datetime(2023, 3, 1, 19, 25, 00, tzinfo=timezone.utc),
],
}
input_arrow_table = pa.Table.from_pydict(TEST_DATA_WITH_NULL, schema=arrow_table_schema_with_all_timestamp_precisions)
mocker.patch.dict(os.environ, values={"PYICEBERG_DOWNCAST_NS_TIMESTAMP_ON_WRITE": "True"})

tbl = _create_table(
session_catalog,
identifier,
{"format-version": format_version},
data=[input_arrow_table],
schema=arrow_table_schema_with_all_timestamp_precisions,
)
tbl.overwrite(input_arrow_table)
written_arrow_table = tbl.scan().to_arrow()

expected_schema_in_all_us = pa.schema([
("timestamp_s", pa.timestamp(unit="us")),
("timestamptz_s", pa.timestamp(unit="us", tz="UTC")),
("timestamp_ms", pa.timestamp(unit="us")),
("timestamptz_ms", pa.timestamp(unit="us", tz="UTC")),
("timestamp_us", pa.timestamp(unit="us")),
("timestamptz_us", pa.timestamp(unit="us", tz="UTC")),
("timestamp_ns", pa.timestamp(unit="us")),
("timestamptz_ns", pa.timestamp(unit="us", tz="UTC")),
])
assert written_arrow_table.schema == expected_schema_in_all_us
assert written_arrow_table == input_arrow_table.cast(expected_schema_in_all_us)
38 changes: 21 additions & 17 deletions tests/io/test_pyarrow_visitor.py
@@ -15,10 +15,12 @@
# specific language governing permissions and limitations
# under the License.
# pylint: disable=protected-access,unused-argument,redefined-outer-name
import os
import re

import pyarrow as pa
import pytest
from pytest_mock.plugin import MockerFixture

from pyiceberg.io.pyarrow import (
_ConvertToArrowSchema,
@@ -161,22 +163,25 @@ def test_pyarrow_time64_ns_to_iceberg() -> None:
visit_pyarrow(pyarrow_type, _ConvertToIceberg())


def test_pyarrow_timestamp_to_iceberg() -> None:
pyarrow_type = pa.timestamp(unit="us")
@pytest.mark.parametrize("precision", ["s", "ms", "us", "ns"])
def test_pyarrow_timestamp_to_iceberg(mocker: MockerFixture, precision: str) -> None:
mocker.patch.dict(os.environ, values={"PYICEBERG_DOWNCAST_NS_TIMESTAMP_ON_WRITE": "True"})

pyarrow_type = pa.timestamp(unit=precision)
converted_iceberg_type = visit_pyarrow(pyarrow_type, _ConvertToIceberg())
assert converted_iceberg_type == TimestampType()
assert visit(converted_iceberg_type, _ConvertToArrowSchema()) == pyarrow_type
# all timestamp types are converted to 'us' precision
assert visit(converted_iceberg_type, _ConvertToArrowSchema()) == pa.timestamp(unit="us")


def test_pyarrow_timestamp_invalid_units() -> None:
pyarrow_type = pa.timestamp(unit="ms")
with pytest.raises(TypeError, match=re.escape("Unsupported type: timestamp[ms]")):
visit_pyarrow(pyarrow_type, _ConvertToIceberg())
pyarrow_type = pa.timestamp(unit="s")
with pytest.raises(TypeError, match=re.escape("Unsupported type: timestamp[s]")):
visit_pyarrow(pyarrow_type, _ConvertToIceberg())
pyarrow_type = pa.timestamp(unit="ns")
with pytest.raises(TypeError, match=re.escape("Unsupported type: timestamp[ns]")):
with pytest.raises(
TypeError,
match=re.escape(
"Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-on-write' configuration property to automatically downcast 'ns' to 'us' on write."
),
):
visit_pyarrow(pyarrow_type, _ConvertToIceberg())


@@ -192,14 +197,13 @@ def test_pyarrow_timestamp_tz_to_iceberg() -> None:


def test_pyarrow_timestamp_tz_invalid_units() -> None:
pyarrow_type = pa.timestamp(unit="ms", tz="UTC")
with pytest.raises(TypeError, match=re.escape("Unsupported type: timestamp[ms, tz=UTC]")):
visit_pyarrow(pyarrow_type, _ConvertToIceberg())
pyarrow_type = pa.timestamp(unit="s", tz="UTC")
with pytest.raises(TypeError, match=re.escape("Unsupported type: timestamp[s, tz=UTC]")):
visit_pyarrow(pyarrow_type, _ConvertToIceberg())
pyarrow_type = pa.timestamp(unit="ns", tz="UTC")
with pytest.raises(TypeError, match=re.escape("Unsupported type: timestamp[ns, tz=UTC]")):
with pytest.raises(
TypeError,
match=re.escape(
"Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-on-write' configuration property to automatically downcast 'ns' to 'us' on write."
),
):
visit_pyarrow(pyarrow_type, _ConvertToIceberg())

