Skip to content

Commit 23e2cb2

Browse files
committed
Test written and linter run
1 parent 960c4aa commit 23e2cb2

File tree

6 files changed

+86
-24
lines changed

6 files changed

+86
-24
lines changed

pyiceberg/catalog/__init__.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -744,7 +744,9 @@ def _load_file_io(self, properties: Properties = EMPTY_DICT, location: Optional[
744744
return load_file_io({**self.properties, **properties}, location)
745745

746746
@staticmethod
747-
def _convert_schema_if_needed(schema: Union[Schema, "pa.Schema"], format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION) -> Schema:
747+
def _convert_schema_if_needed(
748+
schema: Union[Schema, "pa.Schema"], format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION
749+
) -> Schema:
748750
if isinstance(schema, Schema):
749751
return schema
750752
try:
@@ -755,7 +757,10 @@ def _convert_schema_if_needed(schema: Union[Schema, "pa.Schema"], format_version
755757
downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
756758
if isinstance(schema, pa.Schema):
757759
schema: Schema = visit_pyarrow( # type: ignore
758-
schema, _ConvertToIcebergWithoutIDs(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version)
760+
schema,
761+
_ConvertToIcebergWithoutIDs(
762+
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version
763+
),
759764
)
760765
return schema
761766
except ModuleNotFoundError:
@@ -848,7 +853,9 @@ def _create_staged_table(
848853
Returns:
849854
StagedTable: the created staged table instance.
850855
"""
851-
schema: Schema = self._convert_schema_if_needed(schema, properties.get(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION)) # type: ignore
856+
schema: Schema = self._convert_schema_if_needed(
857+
schema, int(properties.get(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION))
858+
) # type: ignore
852859

853860
database_name, table_name = self.identifier_to_database_and_table(identifier)
854861

pyiceberg/catalog/rest/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -499,7 +499,9 @@ def _create_table(
499499
properties: Properties = EMPTY_DICT,
500500
stage_create: bool = False,
501501
) -> TableResponse:
502-
iceberg_schema = self._convert_schema_if_needed(schema, properties.get(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION))
502+
iceberg_schema = self._convert_schema_if_needed(
503+
schema, int(properties.get(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION))
504+
)
503505
fresh_schema = assign_fresh_schema_ids(iceberg_schema)
504506
fresh_partition_spec = assign_fresh_partition_spec_ids(partition_spec, iceberg_schema, fresh_schema)
505507
fresh_sort_order = assign_fresh_sort_order_ids(sort_order, iceberg_schema, fresh_schema)

pyiceberg/io/pyarrow.py

Lines changed: 40 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1019,22 +1019,36 @@ def _combine_positional_deletes(positional_deletes: List[pa.ChunkedArray], start
10191019

10201020

10211021
def pyarrow_to_schema(
1022-
schema: pa.Schema, name_mapping: Optional[NameMapping] = None, downcast_ns_timestamp_to_us: bool = False, format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION
1022+
schema: pa.Schema,
1023+
name_mapping: Optional[NameMapping] = None,
1024+
downcast_ns_timestamp_to_us: bool = False,
1025+
format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION,
10231026
) -> Schema:
10241027
has_ids = visit_pyarrow(schema, _HasIds())
10251028
if has_ids:
1026-
return visit_pyarrow(schema, _ConvertToIceberg(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version))
1029+
return visit_pyarrow(
1030+
schema, _ConvertToIceberg(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version)
1031+
)
10271032
elif name_mapping is not None:
1028-
schema_without_ids = _pyarrow_to_schema_without_ids(schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version)
1033+
schema_without_ids = _pyarrow_to_schema_without_ids(
1034+
schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version
1035+
)
10291036
return apply_name_mapping(schema_without_ids, name_mapping)
10301037
else:
10311038
raise ValueError(
10321039
"Parquet file does not have field-ids and the Iceberg table does not have 'schema.name-mapping.default' defined"
10331040
)
10341041

10351042

1036-
def _pyarrow_to_schema_without_ids(schema: pa.Schema, downcast_ns_timestamp_to_us: bool = False, format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION) -> Schema:
1037-
return visit_pyarrow(schema, _ConvertToIcebergWithoutIDs(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version))
1043+
def _pyarrow_to_schema_without_ids(
1044+
schema: pa.Schema,
1045+
downcast_ns_timestamp_to_us: bool = False,
1046+
format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION,
1047+
) -> Schema:
1048+
return visit_pyarrow(
1049+
schema,
1050+
_ConvertToIcebergWithoutIDs(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version),
1051+
)
10381052

10391053

10401054
def _pyarrow_schema_ensure_large_types(schema: pa.Schema) -> pa.Schema:
@@ -1113,7 +1127,7 @@ def _(obj: pa.Field, visitor: PyArrowSchemaVisitor[T]) -> T:
11131127
visitor.before_field(obj)
11141128
try:
11151129
if obj.name == "timestamp_ns":
1116-
print('alexstephen')
1130+
print("alexstephen")
11171131
result = visit_pyarrow(field_type, visitor)
11181132
except TypeError as e:
11191133
raise UnsupportedPyArrowTypeException(obj, f"Column '{obj.name}' has an unsupported type: {field_type}") from e
@@ -1218,7 +1232,9 @@ class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]):
12181232

12191233
_field_names: List[str]
12201234

1221-
def __init__(self, downcast_ns_timestamp_to_us: bool = False, format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION) -> None: # noqa: F821
1235+
def __init__(
1236+
self, downcast_ns_timestamp_to_us: bool = False, format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION
1237+
) -> None: # noqa: F821
12221238
self._field_names = []
12231239
self._downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us
12241240
self._format_version = format_version
@@ -2549,8 +2565,10 @@ def bin_pack_arrow_table(tbl: pa.Table, target_file_size: int) -> Iterator[List[
25492565

25502566

25512567
def _check_pyarrow_schema_compatible(
2552-
requested_schema: Schema, provided_schema: pa.Schema, downcast_ns_timestamp_to_us: bool = False,
2553-
format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION
2568+
requested_schema: Schema,
2569+
provided_schema: pa.Schema,
2570+
downcast_ns_timestamp_to_us: bool = False,
2571+
format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION,
25542572
) -> None:
25552573
"""
25562574
Check if the `requested_schema` is compatible with `provided_schema`.
@@ -2563,10 +2581,15 @@ def _check_pyarrow_schema_compatible(
25632581
name_mapping = requested_schema.name_mapping
25642582
try:
25652583
provided_schema = pyarrow_to_schema(
2566-
provided_schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version
2584+
provided_schema,
2585+
name_mapping=name_mapping,
2586+
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
2587+
format_version=format_version,
25672588
)
25682589
except ValueError as e:
2569-
provided_schema = _pyarrow_to_schema_without_ids(provided_schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version)
2590+
provided_schema = _pyarrow_to_schema_without_ids(
2591+
provided_schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version
2592+
)
25702593
additional_names = set(provided_schema._name_to_id.keys()) - set(requested_schema._name_to_id.keys())
25712594
raise ValueError(
25722595
f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. Update the schema first (hint, use union_by_name)."
@@ -2683,7 +2706,12 @@ def _dataframe_to_data_files(
26832706
)
26842707
name_mapping = table_metadata.schema().name_mapping
26852708
downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
2686-
task_schema = pyarrow_to_schema(df.schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=table_metadata.format_version)
2709+
task_schema = pyarrow_to_schema(
2710+
df.schema,
2711+
name_mapping=name_mapping,
2712+
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
2713+
format_version=table_metadata.format_version,
2714+
)
26872715

26882716
if table_metadata.spec().is_unpartitioned():
26892717
yield from write_file(

pyiceberg/table/__init__.py

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -477,8 +477,10 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT,
477477
)
478478
downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
479479
_check_pyarrow_schema_compatible(
480-
self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
481-
format_version=self.table_metadata.format_version
480+
self.table_metadata.schema(),
481+
provided_schema=df.schema,
482+
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
483+
format_version=self.table_metadata.format_version,
482484
)
483485

484486
with self._append_snapshot_producer(snapshot_properties, branch=branch) as append_files:
@@ -528,8 +530,10 @@ def dynamic_partition_overwrite(
528530

529531
downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
530532
_check_pyarrow_schema_compatible(
531-
self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
532-
format_version=self.table_metadata.format_version
533+
self.table_metadata.schema(),
534+
provided_schema=df.schema,
535+
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
536+
format_version=self.table_metadata.format_version,
533537
)
534538

535539
# If dataframe does not have data, there is no need to overwrite
@@ -595,8 +599,10 @@ def overwrite(
595599
)
596600
downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
597601
_check_pyarrow_schema_compatible(
598-
self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
599-
format_version=self.table_metadata.format_version
602+
self.table_metadata.schema(),
603+
provided_schema=df.schema,
604+
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
605+
format_version=self.table_metadata.format_version,
600606
)
601607

602608
if overwrite_filter != AlwaysFalse():
@@ -792,8 +798,10 @@ def upsert(
792798

793799
downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
794800
_check_pyarrow_schema_compatible(
795-
self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
796-
format_version=self.table_metadata.format_version
801+
self.table_metadata.schema(),
802+
provided_schema=df.schema,
803+
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
804+
format_version=self.table_metadata.format_version,
797805
)
798806

799807
# get list of rows that exist so we don't have to load the entire target table

tests/conftest.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2830,6 +2830,7 @@ def arrow_table_schema_with_nanoseconds_timestamp_precisions() -> "pa.Schema":
28302830
]
28312831
)
28322832

2833+
28332834
@pytest.fixture(scope="session")
28342835
def table_schema_with_all_microseconds_timestamp_precision() -> Schema:
28352836
"""Iceberg table Schema with only date, timestamp and timestamptz values."""

tests/integration/test_writes/test_writes.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1355,6 +1355,7 @@ def test_write_all_timestamp_precision(
13551355
# and supports up to microsecond precision
13561356
assert left.timestamp() == right.timestamp(), f"Difference in column {column}: {left} != {right}"
13571357

1358+
13581359
@pytest.mark.integration
13591360
@pytest.mark.parametrize("format_version", [1, 2])
13601361
def test_merge_manifests(session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int) -> None:
@@ -2114,3 +2115,18 @@ def test_branch_py_write_spark_read(session_catalog: Catalog, spark: SparkSessio
21142115
)
21152116
assert main_df.count() == 3
21162117
assert branch_df.count() == 2
2118+
2119+
2120+
@pytest.mark.integration
2121+
def test_nanosecond_support_on_catalog(session_catalog: Catalog) -> None:
2122+
identifier = "default.test_nanosecond_support_on_catalog"
2123+
# Create a pyarrow table with a nanosecond timestamp column
2124+
table = pa.Table.from_arrays(
2125+
[
2126+
pa.array([datetime.now()], type=pa.timestamp("ns")),
2127+
pa.array([datetime.now()], type=pa.timestamp("ns", tz="America/New_York")),
2128+
],
2129+
names=["timestamp_ns", "timestamptz_ns"],
2130+
)
2131+
2132+
_create_table(session_catalog, identifier, {"format-version": "3"}, schema=table.schema)

0 commit comments

Comments
 (0)