Skip to content

Commit 27fabd2

Browse files
committed
Add support for write.metadata.path
1 parent 509713b commit 27fabd2

File tree

6 files changed

+53
-44
lines changed

6 files changed

+53
-44
lines changed

mkdocs/docs/configuration.md

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -54,19 +54,20 @@ Iceberg tables support table properties to configure table behavior.
5454

5555
### Write options
5656

57-
| Key | Options | Default | Description |
58-
|------------------------------------------|------------------------------------|----------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------|
59-
| `write.parquet.compression-codec` | `{uncompressed,zstd,gzip,snappy}` | zstd | Sets the Parquet compression codec. |
60-
| `write.parquet.compression-level` | Integer | null | Parquet compression level for the codec. If not set, it is up to PyIceberg |
61-
| `write.parquet.row-group-limit` | Number of rows | 1048576 | The upper bound of the number of entries within a single row group |
62-
| `write.parquet.page-size-bytes` | Size in bytes | 1MB | Set a target threshold for the approximate encoded size of data pages within a column chunk |
63-
| `write.parquet.page-row-limit` | Number of rows | 20000 | Set a target threshold for the maximum number of rows within a column chunk |
64-
| `write.parquet.dict-size-bytes` | Size in bytes | 2MB | Set the dictionary page size limit per row group |
65-
| `write.metadata.previous-versions-max` | Integer | 100 | The max number of previous version metadata files to keep before deleting after commit. |
66-
| `write.object-storage.enabled` | Boolean | True | Enables the [`ObjectStoreLocationProvider`](configuration.md#object-store-location-provider) that adds a hash component to file paths. Note: the default value of `True` differs from Iceberg's Java implementation |
67-
| `write.object-storage.partitioned-paths` | Boolean | True | Controls whether [partition values are included in file paths](configuration.md#partition-exclusion) when object storage is enabled |
68-
| `write.py-location-provider.impl` | String of form `module.ClassName` | null | Optional, [custom `LocationProvider`](configuration.md#loading-a-custom-location-provider) implementation |
69-
| `write.data.path` | String pointing to location | `{metadata.location}/data` | Sets the location under which data is written. |
57+
| Key | Options | Default | Description |
58+
|------------------------------------------|------------------------------------|--------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
59+
| `write.parquet.compression-codec` | `{uncompressed,zstd,gzip,snappy}` | zstd | Sets the Parquet compression codec. |
60+
| `write.parquet.compression-level` | Integer | null | Parquet compression level for the codec. If not set, it is up to PyIceberg |
61+
| `write.parquet.row-group-limit` | Number of rows | 1048576 | The upper bound of the number of entries within a single row group |
62+
| `write.parquet.page-size-bytes` | Size in bytes | 1MB | Set a target threshold for the approximate encoded size of data pages within a column chunk |
63+
| `write.parquet.page-row-limit` | Number of rows | 20000 | Set a target threshold for the maximum number of rows within a column chunk |
64+
| `write.parquet.dict-size-bytes` | Size in bytes | 2MB | Set the dictionary page size limit per row group |
65+
| `write.metadata.previous-versions-max` | Integer | 100 | The max number of previous version metadata files to keep before deleting after commit. |
66+
| `write.object-storage.enabled` | Boolean | True | Enables the [`ObjectStoreLocationProvider`](configuration.md#object-store-location-provider) that adds a hash component to file paths. Note: the default value of `True` differs from Iceberg's Java implementation |
67+
| `write.object-storage.partitioned-paths` | Boolean | True | Controls whether [partition values are included in file paths](configuration.md#partition-exclusion) when object storage is enabled |
68+
| `write.py-location-provider.impl` | String of form `module.ClassName` | null | Optional, [custom `LocationProvider`](configuration.md#loading-a-custom-location-provider) implementation |
69+
| `write.data.path` | String pointing to location | `{metadata.location}/data` | Sets the location under which data is written. |
70+
| `write.metadata.path` | String pointing to location | `{metadata.location}/metadata` | Sets the location under which metadata is written. |
7071

7172
### Table behavior options
7273

pyiceberg/catalog/__init__.py

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
CreateTableTransaction,
5656
StagedTable,
5757
Table,
58+
TableProperties,
5859
)
5960
from pyiceberg.table.metadata import TableMetadata, TableMetadataV1, new_table_metadata
6061
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
@@ -757,6 +758,24 @@ def _convert_schema_if_needed(schema: Union[Schema, "pa.Schema"]) -> Schema:
757758
pass
758759
raise ValueError(f"{type(schema)=}, but it must be pyiceberg.schema.Schema or pyarrow.Schema")
759760

761+
@staticmethod
762+
def metadata_file_location(table_location: str, file_name: str, properties: Properties = EMPTY_DICT) -> str:
763+
"""Get the full path for a metadata file.
764+
765+
Args:
766+
table_location (str): The base table location
767+
file_name (str): Name of the metadata file
768+
properties (Properties): Table properties that may contain custom metadata path
769+
770+
Returns:
771+
str: Full path where the metadata file should be stored
772+
"""
773+
if metadata_path := properties.get(TableProperties.WRITE_METADATA_PATH):
774+
base_path = metadata_path.rstrip("/")
775+
else:
776+
base_path = f"{table_location}/metadata"
777+
return f"{base_path}/{file_name}"
778+
760779
def __repr__(self) -> str:
761780
"""Return the string representation of the Catalog class."""
762781
return f"{self.name} ({self.__class__})"
@@ -840,7 +859,7 @@ def _create_staged_table(
840859
database_name, table_name = self.identifier_to_database_and_table(identifier)
841860

842861
location = self._resolve_table_location(location, database_name, table_name)
843-
metadata_location = self._get_metadata_location(location=location)
862+
metadata_location = self._get_metadata_location(table_location=location, properties=properties)
844863
metadata = new_table_metadata(
845864
location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties
846865
)
@@ -871,7 +890,9 @@ def _update_and_stage_table(
871890
)
872891

873892
new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 if current_table else 0
874-
new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version)
893+
new_metadata_location = self._get_metadata_location(
894+
updated_metadata.location, new_metadata_version, updated_metadata.properties
895+
)
875896

876897
return StagedTable(
877898
identifier=table_identifier,
@@ -929,11 +950,12 @@ def _write_metadata(metadata: TableMetadata, io: FileIO, metadata_path: str) ->
929950
ToOutputFile.table_metadata(metadata, io.new_output(metadata_path))
930951

931952
@staticmethod
932-
def _get_metadata_location(location: str, new_version: int = 0) -> str:
953+
def _get_metadata_location(table_location: str, new_version: int = 0, properties: Properties = EMPTY_DICT) -> str:
933954
if new_version < 0:
934955
raise ValueError(f"Table metadata version: `{new_version}` must be a non-negative integer")
935-
version_str = f"{new_version:05d}"
936-
return f"{location}/metadata/{version_str}-{uuid.uuid4()}.metadata.json"
956+
957+
file_name = f"{new_version:05d}-{uuid.uuid4()}.metadata.json"
958+
return Catalog.metadata_file_location(table_location, file_name, properties)
937959

938960
@staticmethod
939961
def _parse_metadata_version(metadata_location: str) -> int:

pyiceberg/catalog/dynamodb.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ def create_table(
173173
database_name, table_name = self.identifier_to_database_and_table(identifier)
174174

175175
location = self._resolve_table_location(location, database_name, table_name)
176-
metadata_location = self._get_metadata_location(location=location)
176+
metadata_location = self._get_metadata_location(table_location=location, properties=properties)
177177
metadata = new_table_metadata(
178178
location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties
179179
)

pyiceberg/catalog/sql.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ def create_table(
207207

208208
namespace = Catalog.namespace_to_string(namespace_identifier)
209209
location = self._resolve_table_location(location, namespace, table_name)
210-
metadata_location = self._get_metadata_location(location=location)
210+
metadata_location = self._get_metadata_location(table_location=location, properties=properties)
211211
metadata = new_table_metadata(
212212
location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties
213213
)

pyiceberg/table/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,7 @@ class TableProperties:
199199
WRITE_OBJECT_STORE_PARTITIONED_PATHS_DEFAULT = True
200200

201201
WRITE_DATA_PATH = "write.data.path"
202+
WRITE_METADATA_PATH = "write.metadata.path"
202203

203204
DELETE_MODE = "write.delete.mode"
204205
DELETE_MODE_COPY_ON_WRITE = "copy-on-write"
@@ -1212,6 +1213,10 @@ def to_daft(self) -> daft.DataFrame:
12121213

12131214
return daft.read_iceberg(self)
12141215

1216+
def metadata_file_location(self, file_name: str) -> str:
1217+
"""Get the metadata file location using write.metadata.path from properties if set."""
1218+
return self.catalog.metadata_file_location(self.metadata.location, file_name, self.metadata.properties)
1219+
12151220

12161221
class StaticTable(Table):
12171222
"""Load a table directly from a metadata file (i.e., without using a catalog)."""

pyiceberg/table/update/snapshot.py

Lines changed: 5 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -84,16 +84,6 @@
8484
from pyiceberg.table import Transaction
8585

8686

87-
def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str:
88-
return f"{location}/metadata/{commit_uuid}-m{num}.avro"
89-
90-
91-
def _generate_manifest_list_path(location: str, snapshot_id: int, attempt: int, commit_uuid: uuid.UUID) -> str:
92-
# Mimics the behavior in Java:
93-
# https://github.com/apache/iceberg/blob/c862b9177af8e2d83122220764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491
94-
return f"{location}/metadata/snap-{snapshot_id}-{attempt}-{commit_uuid}.avro"
95-
96-
9787
class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]):
9888
commit_uuid: uuid.UUID
9989
_io: FileIO
@@ -243,13 +233,8 @@ def _commit(self) -> UpdatesAndRequirements:
243233
next_sequence_number = self._transaction.table_metadata.next_sequence_number()
244234

245235
summary = self._summary(self.snapshot_properties)
246-
247-
manifest_list_file_path = _generate_manifest_list_path(
248-
location=self._transaction.table_metadata.location,
249-
snapshot_id=self._snapshot_id,
250-
attempt=0,
251-
commit_uuid=self.commit_uuid,
252-
)
236+
file_name = f"{self.commit_uuid}-m{self._snapshot_id}-a0.avro"
237+
manifest_list_file_path = self._transaction._table.metadata_file_location(file_name)
253238
with write_manifest_list(
254239
format_version=self._transaction.table_metadata.format_version,
255240
output_file=self._io.new_output(manifest_list_file_path),
@@ -295,13 +280,9 @@ def new_manifest_writer(self, spec: PartitionSpec) -> ManifestWriter:
295280
)
296281

297282
def new_manifest_output(self) -> OutputFile:
298-
return self._io.new_output(
299-
_new_manifest_path(
300-
location=self._transaction.table_metadata.location,
301-
num=next(self._manifest_num_counter),
302-
commit_uuid=self.commit_uuid,
303-
)
304-
)
283+
file_name = f"{self.commit_uuid}-m{next(self._manifest_num_counter)}.avro"
284+
file_path = self._transaction._table.metadata_file_location(file_name)
285+
return self._io.new_output(file_path)
305286

306287
def fetch_manifest_entry(self, manifest: ManifestFile, discard_deleted: bool = True) -> List[ManifestEntry]:
307288
return manifest.fetch_manifest_entry(io=self._io, discard_deleted=discard_deleted)

0 commit comments

Comments
 (0)