Skip to content

Commit 39928c6

Browse files
committed
Add support for write.metadata.path
1 parent 38d57ea commit 39928c6

6 files changed

Lines changed: 39 additions & 31 deletions

File tree

mkdocs/docs/configuration.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ Iceberg tables support table properties to configure table behavior.
6868
| `write.object-storage.partitioned-paths` | Boolean | True | Controls whether [partition values are included in file paths](configuration.md#partition-exclusion) when object storage is enabled |
6969
| `write.py-location-provider.impl` | String of form `module.ClassName` | null | Optional, [custom `LocationProvider`](configuration.md#loading-a-custom-location-provider) implementation |
7070
| `write.data.path` | String pointing to location | `{metadata.location}/data` | Sets the location under which data is written. |
71+
| `write.metadata.path` | String pointing to location | `{metadata.location}/metadata` | Sets the location under which metadata is written. |
7172

7273
### Table behavior options
7374

pyiceberg/catalog/__init__.py

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -759,6 +759,24 @@ def _convert_schema_if_needed(schema: Union[Schema, "pa.Schema"]) -> Schema:
759759
pass
760760
raise ValueError(f"{type(schema)=}, but it must be pyiceberg.schema.Schema or pyarrow.Schema")
761761

762+
@staticmethod
def metadata_file_location(table_location: str, file_name: str, properties: Properties = EMPTY_DICT) -> str:
    """Get the full path for a metadata file.

    When the ``write.metadata.path`` table property is set to a non-empty value,
    the file is placed directly under that path; otherwise it is placed under
    ``<table_location>/metadata``. Trailing slashes on either base are removed
    so the result never contains an empty path segment.

    Args:
        table_location (str): The base table location.
        file_name (str): Name of the metadata file.
        properties (Properties): Table properties that may contain a custom metadata path.

    Returns:
        str: Full path where the metadata file should be stored.
    """
    if metadata_path := properties.get(TableProperties.WRITE_METADATA_PATH):
        base_path = metadata_path.rstrip("/")
    else:
        # Normalise the table location the same way as the custom path above:
        # a location such as "s3://bucket/tbl/" would otherwise produce
        # "s3://bucket/tbl//metadata/...", which is a different object key.
        base_path = f"{table_location.rstrip('/')}/metadata"
    return f"{base_path}/{file_name}"
779+
762780
@staticmethod
763781
def _delete_old_metadata(io: FileIO, base: TableMetadata, metadata: TableMetadata) -> None:
764782
"""Delete oldest metadata if config is set to true."""
@@ -857,7 +875,7 @@ def _create_staged_table(
857875
database_name, table_name = self.identifier_to_database_and_table(identifier)
858876

859877
location = self._resolve_table_location(location, database_name, table_name)
860-
metadata_location = self._get_metadata_location(location=location)
878+
metadata_location = self._get_metadata_location(table_location=location, properties=properties)
861879
metadata = new_table_metadata(
862880
location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties
863881
)
@@ -888,7 +906,9 @@ def _update_and_stage_table(
888906
)
889907

890908
new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 if current_table else 0
891-
new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version)
909+
new_metadata_location = self._get_metadata_location(
910+
updated_metadata.location, new_metadata_version, updated_metadata.properties
911+
)
892912

893913
return StagedTable(
894914
identifier=table_identifier,
@@ -946,11 +966,12 @@ def _write_metadata(metadata: TableMetadata, io: FileIO, metadata_path: str) ->
946966
ToOutputFile.table_metadata(metadata, io.new_output(metadata_path))
947967

948968
@staticmethod
949-
def _get_metadata_location(location: str, new_version: int = 0) -> str:
969+
def _get_metadata_location(table_location: str, new_version: int = 0, properties: Properties = EMPTY_DICT) -> str:
950970
if new_version < 0:
951971
raise ValueError(f"Table metadata version: `{new_version}` must be a non-negative integer")
952-
version_str = f"{new_version:05d}"
953-
return f"{location}/metadata/{version_str}-{uuid.uuid4()}.metadata.json"
972+
973+
file_name = f"{new_version:05d}-{uuid.uuid4()}.metadata.json"
974+
return Catalog.metadata_file_location(table_location, file_name, properties)
954975

955976
@staticmethod
956977
def _parse_metadata_version(metadata_location: str) -> int:

pyiceberg/catalog/dynamodb.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ def create_table(
173173
database_name, table_name = self.identifier_to_database_and_table(identifier)
174174

175175
location = self._resolve_table_location(location, database_name, table_name)
176-
metadata_location = self._get_metadata_location(location=location)
176+
metadata_location = self._get_metadata_location(table_location=location, properties=properties)
177177
metadata = new_table_metadata(
178178
location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties
179179
)

pyiceberg/catalog/sql.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ def create_table(
207207

208208
namespace = Catalog.namespace_to_string(namespace_identifier)
209209
location = self._resolve_table_location(location, namespace, table_name)
210-
metadata_location = self._get_metadata_location(location=location)
210+
metadata_location = self._get_metadata_location(table_location=location, properties=properties)
211211
metadata = new_table_metadata(
212212
location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties
213213
)

pyiceberg/table/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,7 @@ class TableProperties:
200200
WRITE_OBJECT_STORE_PARTITIONED_PATHS_DEFAULT = True
201201

202202
WRITE_DATA_PATH = "write.data.path"
203+
WRITE_METADATA_PATH = "write.metadata.path"
203204

204205
DELETE_MODE = "write.delete.mode"
205206
DELETE_MODE_COPY_ON_WRITE = "copy-on-write"
@@ -1236,6 +1237,10 @@ def to_polars(self) -> pl.LazyFrame:
12361237

12371238
return pl.scan_iceberg(self)
12381239

1240+
def metadata_file_location(self, file_name: str) -> str:
    """Return the full location of a metadata file for this table.

    The work is delegated to the catalog's ``metadata_file_location`` helper,
    which receives this table's location and properties (and therefore any
    ``write.metadata.path`` override present in those properties).
    """
    resolve_location = self.catalog.metadata_file_location
    table_location = self.metadata.location
    table_properties = self.metadata.properties
    return resolve_location(table_location, file_name, table_properties)
1243+
12391244

12401245
class StaticTable(Table):
12411246
"""Load a table directly from a metadata file (i.e., without using a catalog)."""

pyiceberg/table/update/snapshot.py

Lines changed: 5 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -84,16 +84,6 @@
8484
from pyiceberg.table import Transaction
8585

8686

87-
def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str:
88-
return f"{location}/metadata/{commit_uuid}-m{num}.avro"
89-
90-
91-
def _generate_manifest_list_path(location: str, snapshot_id: int, attempt: int, commit_uuid: uuid.UUID) -> str:
92-
# Mimics the behavior in Java:
93-
# https://github.com/apache/iceberg/blob/c862b9177af8e2d83122220764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491
94-
return f"{location}/metadata/snap-{snapshot_id}-{attempt}-{commit_uuid}.avro"
95-
96-
9787
class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]):
9888
commit_uuid: uuid.UUID
9989
_io: FileIO
@@ -243,13 +233,8 @@ def _commit(self) -> UpdatesAndRequirements:
243233
next_sequence_number = self._transaction.table_metadata.next_sequence_number()
244234

245235
summary = self._summary(self.snapshot_properties)
246-
247-
manifest_list_file_path = _generate_manifest_list_path(
248-
location=self._transaction.table_metadata.location,
249-
snapshot_id=self._snapshot_id,
250-
attempt=0,
251-
commit_uuid=self.commit_uuid,
252-
)
236+
file_name = f"{self.commit_uuid}-m{self._snapshot_id}-a0.avro"
237+
manifest_list_file_path = self._transaction._table.metadata_file_location(file_name)
253238
with write_manifest_list(
254239
format_version=self._transaction.table_metadata.format_version,
255240
output_file=self._io.new_output(manifest_list_file_path),
@@ -295,13 +280,9 @@ def new_manifest_writer(self, spec: PartitionSpec) -> ManifestWriter:
295280
)
296281

297282
def new_manifest_output(self) -> OutputFile:
298-
return self._io.new_output(
299-
_new_manifest_path(
300-
location=self._transaction.table_metadata.location,
301-
num=next(self._manifest_num_counter),
302-
commit_uuid=self.commit_uuid,
303-
)
304-
)
283+
file_name = f"{self.commit_uuid}-m{next(self._manifest_num_counter)}.avro"
284+
file_path = self._transaction._table.metadata_file_location(file_name)
285+
return self._io.new_output(file_path)
305286

306287
def fetch_manifest_entry(self, manifest: ManifestFile, discard_deleted: bool = True) -> List[ManifestEntry]:
307288
return manifest.fetch_manifest_entry(io=self._io, discard_deleted=discard_deleted)

0 commit comments

Comments
 (0)