Skip to content

Commit 4e47edc

Browse files
committed
Address PR comments
1 parent 27fabd2 commit 4e47edc

File tree

4 files changed

+78
-27
lines changed

4 files changed

+78
-27
lines changed

pyiceberg/catalog/__init__.py

Lines changed: 1 addition & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@
5555
CreateTableTransaction,
5656
StagedTable,
5757
Table,
58-
TableProperties,
5958
)
6059
from pyiceberg.table.metadata import TableMetadata, TableMetadataV1, new_table_metadata
6160
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
@@ -758,24 +757,6 @@ def _convert_schema_if_needed(schema: Union[Schema, "pa.Schema"]) -> Schema:
758757
pass
759758
raise ValueError(f"{type(schema)=}, but it must be pyiceberg.schema.Schema or pyarrow.Schema")
760759

761-
@staticmethod
762-
def metadata_file_location(table_location: str, file_name: str, properties: Properties = EMPTY_DICT) -> str:
763-
"""Get the full path for a metadata file.
764-
765-
Args:
766-
table_location (str): The base table location
767-
file_name (str): Name of the metadata file
768-
properties (Properties): Table properties that may contain custom metadata path
769-
770-
Returns:
771-
str: Full path where the metadata file should be stored
772-
"""
773-
if metadata_path := properties.get(TableProperties.WRITE_METADATA_PATH):
774-
base_path = metadata_path.rstrip("/")
775-
else:
776-
base_path = f"{table_location}/metadata"
777-
return f"{base_path}/{file_name}"
778-
779760
def __repr__(self) -> str:
780761
"""Return the string representation of the Catalog class."""
781762
return f"{self.name} ({self.__class__})"
@@ -955,7 +936,7 @@ def _get_metadata_location(table_location: str, new_version: int = 0, properties
955936
raise ValueError(f"Table metadata version: `{new_version}` must be a non-negative integer")
956937

957938
file_name = f"{new_version:05d}-{uuid.uuid4()}.metadata.json"
958-
return Catalog.metadata_file_location(table_location, file_name, properties)
939+
return Table.metadata_file_location(table_location, file_name, properties)
959940

960941
@staticmethod
961942
def _parse_metadata_version(metadata_location: str) -> int:

pyiceberg/table/__init__.py

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1213,9 +1213,22 @@ def to_daft(self) -> daft.DataFrame:
12131213

12141214
return daft.read_iceberg(self)
12151215

1216-
def metadata_file_location(self, file_name: str) -> str:
1217-
"""Get the metadata file location using write.metadata.path from properties if set."""
1218-
return self.catalog.metadata_file_location(self.metadata.location, file_name, self.metadata.properties)
1216+
@staticmethod
1217+
def metadata_file_location(table_location: str, file_name: str, properties: Properties = EMPTY_DICT) -> str:
1218+
"""Get the full path for a metadata file.
1219+
1220+
Args:
1221+
table_location (str): The base table location
1222+
file_name (str): Name of the metadata file
1223+
properties (Properties): Table properties that may contain custom metadata path
1224+
1225+
Returns:
1226+
str: Full path where the metadata file should be stored
1227+
"""
1228+
if metadata_path := properties.get(TableProperties.WRITE_METADATA_PATH):
1229+
return f"{metadata_path.rstrip('/')}/{file_name}"
1230+
1231+
return f"{table_location}/metadata/{file_name}"
12191232

12201233

12211234
class StaticTable(Table):

pyiceberg/table/update/snapshot.py

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,16 @@
8484
from pyiceberg.table import Transaction
8585

8686

87+
def _new_manifest_file_name(num: int, commit_uuid: uuid.UUID) -> str:
88+
return f"{commit_uuid}-m{num}.avro"
89+
90+
91+
def _generate_manifest_list_file_name(snapshot_id: int, attempt: int, commit_uuid: uuid.UUID) -> str:
92+
# Mimics the behavior in Java:
93+
# https://github.com/apache/iceberg/blob/c862b9177af8e2d83122220764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491
94+
return f"snap-{snapshot_id}-{attempt}-{commit_uuid}.avro"
95+
96+
8797
class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]):
8898
commit_uuid: uuid.UUID
8999
_io: FileIO
@@ -233,8 +243,14 @@ def _commit(self) -> UpdatesAndRequirements:
233243
next_sequence_number = self._transaction.table_metadata.next_sequence_number()
234244

235245
summary = self._summary(self.snapshot_properties)
236-
file_name = f"{self.commit_uuid}-m{self._snapshot_id}-a0.avro"
237-
manifest_list_file_path = self._transaction._table.metadata_file_location(file_name)
246+
table_location = self._transaction.table_metadata.location
247+
properties = self._transaction.table_metadata.properties
248+
file_name = _generate_manifest_list_file_name(
249+
snapshot_id=self._snapshot_id,
250+
attempt=0,
251+
commit_uuid=self.commit_uuid,
252+
)
253+
manifest_list_file_path = self._transaction._table.metadata_file_location(table_location, file_name, properties)
238254
with write_manifest_list(
239255
format_version=self._transaction.table_metadata.format_version,
240256
output_file=self._io.new_output(manifest_list_file_path),
@@ -280,8 +296,10 @@ def new_manifest_writer(self, spec: PartitionSpec) -> ManifestWriter:
280296
)
281297

282298
def new_manifest_output(self) -> OutputFile:
283-
file_name = f"{self.commit_uuid}-m{next(self._manifest_num_counter)}.avro"
284-
file_path = self._transaction._table.metadata_file_location(file_name)
299+
table_location = self._transaction.table_metadata.location
300+
properties = self._transaction.table_metadata.properties
301+
file_name = _new_manifest_file_name(num=next(self._manifest_num_counter), commit_uuid=self.commit_uuid)
302+
file_path = self._transaction._table.metadata_file_location(table_location, file_name, properties)
285303
return self._io.new_output(file_path)
286304

287305
def fetch_manifest_entry(self, manifest: ManifestFile, discard_deleted: bool = True) -> List[ManifestEntry]:

tests/catalog/test_base.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,12 @@
3535
TableAlreadyExistsError,
3636
)
3737
from pyiceberg.io import WAREHOUSE
38+
from pyiceberg.io.pyarrow import schema_to_pyarrow
3839
from pyiceberg.partitioning import PartitionField, PartitionSpec
3940
from pyiceberg.schema import Schema
4041
from pyiceberg.table import (
4142
Table,
43+
TableProperties,
4244
)
4345
from pyiceberg.table.update import (
4446
AddSchemaUpdate,
@@ -563,3 +565,40 @@ def test_table_properties_raise_for_none_value(catalog: InMemoryCatalog) -> None
563565
with pytest.raises(ValidationError) as exc_info:
564566
_ = given_catalog_has_a_table(catalog, properties=property_with_none)
565567
assert "None type is not a supported value in properties: property_name" in str(exc_info.value)
568+
569+
570+
def test_table_writes_metadata_to_custom_location(catalog: InMemoryCatalog) -> None:
571+
metadata_path = f"{catalog._warehouse_location}/custom/path"
572+
catalog.create_namespace(TEST_TABLE_NAMESPACE)
573+
table = catalog.create_table(
574+
identifier=TEST_TABLE_IDENTIFIER,
575+
schema=TEST_TABLE_SCHEMA,
576+
partition_spec=TEST_TABLE_PARTITION_SPEC,
577+
properties={TableProperties.WRITE_METADATA_PATH: metadata_path},
578+
)
579+
df = pa.Table.from_pylist([{"x": 123, "y": 456, "z": 789}], schema=schema_to_pyarrow(TEST_TABLE_SCHEMA))
580+
table.append(df)
581+
manifests = table.current_snapshot().manifests(table.io) # type: ignore
582+
583+
assert table.metadata_file_location(table.location(), "", table.properties).startswith(metadata_path)
584+
assert manifests[0].manifest_path.startswith(metadata_path)
585+
assert table.location() != metadata_path
586+
assert table.metadata_location.startswith(metadata_path)
587+
588+
589+
def test_table_writes_metadata_to_default_path(catalog: InMemoryCatalog) -> None:
590+
catalog.create_namespace(TEST_TABLE_NAMESPACE)
591+
table = catalog.create_table(
592+
identifier=TEST_TABLE_IDENTIFIER,
593+
schema=TEST_TABLE_SCHEMA,
594+
partition_spec=TEST_TABLE_PARTITION_SPEC,
595+
properties=TEST_TABLE_PROPERTIES,
596+
)
597+
metadata_path = f"{table.location()}/metadata"
598+
df = pa.Table.from_pylist([{"x": 123, "y": 456, "z": 789}], schema=schema_to_pyarrow(TEST_TABLE_SCHEMA))
599+
table.append(df)
600+
manifests = table.current_snapshot().manifests(table.io) # type: ignore
601+
602+
assert table.metadata_file_location(table.location(), "", table.properties).startswith(metadata_path)
603+
assert manifests[0].manifest_path.startswith(metadata_path)
604+
assert table.metadata_location.startswith(metadata_path)

0 commit comments

Comments
 (0)