Skip to content

Commit 0e85068

Browse files
committed
Add support for write.metadata.path
1 parent 7808121 commit 0e85068

File tree

7 files changed

+90
-46
lines changed

7 files changed

+90
-46
lines changed

mkdocs/docs/configuration.md

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -54,19 +54,20 @@ Iceberg tables support table properties to configure table behavior.
5454

5555
### Write options
5656

57-
| Key | Options | Default | Description |
58-
|------------------------------------------|------------------------------------|----------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------|
59-
| `write.parquet.compression-codec` | `{uncompressed,zstd,gzip,snappy}` | zstd | Sets the Parquet compression coddec. |
60-
| `write.parquet.compression-level` | Integer | null | Parquet compression level for the codec. If not set, it is up to PyIceberg |
61-
| `write.parquet.row-group-limit` | Number of rows | 1048576 | The upper bound of the number of entries within a single row group |
62-
| `write.parquet.page-size-bytes` | Size in bytes | 1MB | Set a target threshold for the approximate encoded size of data pages within a column chunk |
63-
| `write.parquet.page-row-limit` | Number of rows | 20000 | Set a target threshold for the maximum number of rows within a column chunk |
64-
| `write.parquet.dict-size-bytes` | Size in bytes | 2MB | Set the dictionary page size limit per row group |
65-
| `write.metadata.previous-versions-max` | Integer | 100 | The max number of previous version metadata files to keep before deleting after commit. |
66-
| `write.object-storage.enabled` | Boolean | True | Enables the [`ObjectStoreLocationProvider`](configuration.md#object-store-location-provider) that adds a hash component to file paths. Note: the default value of `True` differs from Iceberg's Java implementation |
67-
| `write.object-storage.partitioned-paths` | Boolean | True | Controls whether [partition values are included in file paths](configuration.md#partition-exclusion) when object storage is enabled |
68-
| `write.py-location-provider.impl` | String of form `module.ClassName` | null | Optional, [custom `LocationProvider`](configuration.md#loading-a-custom-location-provider) implementation |
69-
| `write.data.path` | String pointing to location | `{metadata.location}/data` | Sets the location under which data is written. |
57+
| Key | Options | Default | Description |
58+
|------------------------------------------|------------------------------------|--------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
59+
| `write.parquet.compression-codec`        | `{uncompressed,zstd,gzip,snappy}`  | zstd                           | Sets the Parquet compression codec.                                                                                                                                                                                 |
60+
| `write.parquet.compression-level` | Integer | null | Parquet compression level for the codec. If not set, it is up to PyIceberg |
61+
| `write.parquet.row-group-limit` | Number of rows | 1048576 | The upper bound of the number of entries within a single row group |
62+
| `write.parquet.page-size-bytes` | Size in bytes | 1MB | Set a target threshold for the approximate encoded size of data pages within a column chunk |
63+
| `write.parquet.page-row-limit` | Number of rows | 20000 | Set a target threshold for the maximum number of rows within a column chunk |
64+
| `write.parquet.dict-size-bytes` | Size in bytes | 2MB | Set the dictionary page size limit per row group |
65+
| `write.metadata.previous-versions-max` | Integer | 100 | The max number of previous version metadata files to keep before deleting after commit. |
66+
| `write.object-storage.enabled` | Boolean | True | Enables the [`ObjectStoreLocationProvider`](configuration.md#object-store-location-provider) that adds a hash component to file paths. Note: the default value of `True` differs from Iceberg's Java implementation |
67+
| `write.object-storage.partitioned-paths` | Boolean | True | Controls whether [partition values are included in file paths](configuration.md#partition-exclusion) when object storage is enabled |
68+
| `write.py-location-provider.impl` | String of form `module.ClassName` | null | Optional, [custom `LocationProvider`](configuration.md#loading-a-custom-location-provider) implementation |
69+
| `write.data.path` | String pointing to location | `{metadata.location}/data` | Sets the location under which data is written. |
70+
| `write.metadata.path` | String pointing to location | `{metadata.location}/metadata` | Sets the location under which metadata is written. |
7071

7172
### Table behavior options
7273

pyiceberg/catalog/__init__.py

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
CreateTableTransaction,
5656
StagedTable,
5757
Table,
58+
TableProperties,
5859
)
5960
from pyiceberg.table.metadata import TableMetadata, TableMetadataV1, new_table_metadata
6061
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
@@ -746,6 +747,24 @@ def _convert_schema_if_needed(schema: Union[Schema, "pa.Schema"]) -> Schema:
746747
pass
747748
raise ValueError(f"{type(schema)=}, but it must be pyiceberg.schema.Schema or pyarrow.Schema")
748749

750+
@staticmethod
def metadata_file_location(table_location: str, file_name: str, properties: Properties = EMPTY_DICT) -> str:
    """Resolve the full path at which a metadata file should be written.

    Honors the ``write.metadata.path`` table property when it is set to a
    non-empty value; otherwise falls back to ``<table_location>/metadata``.

    Args:
        table_location (str): The base table location.
        file_name (str): Name of the metadata file.
        properties (Properties): Table properties that may contain a custom metadata path.

    Returns:
        str: Full path where the metadata file should be stored.
    """
    custom_metadata_path = properties.get(TableProperties.WRITE_METADATA_PATH)
    if custom_metadata_path:
        # Normalize a trailing slash so we never produce a double "//" separator.
        base_path = custom_metadata_path.rstrip("/")
    else:
        base_path = f"{table_location}/metadata"
    return f"{base_path}/{file_name}"
767+
749768
def __repr__(self) -> str:
750769
"""Return the string representation of the Catalog class."""
751770
return f"{self.name} ({self.__class__})"
@@ -829,7 +848,7 @@ def _create_staged_table(
829848
database_name, table_name = self.identifier_to_database_and_table(identifier)
830849

831850
location = self._resolve_table_location(location, database_name, table_name)
832-
metadata_location = self._get_metadata_location(location=location)
851+
metadata_location = self._get_metadata_location(table_location=location, properties=properties)
833852
metadata = new_table_metadata(
834853
location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties
835854
)
@@ -860,7 +879,9 @@ def _update_and_stage_table(
860879
)
861880

862881
new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 if current_table else 0
863-
new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version)
882+
new_metadata_location = self._get_metadata_location(
883+
updated_metadata.location, new_metadata_version, updated_metadata.properties
884+
)
864885

865886
return StagedTable(
866887
identifier=table_identifier,
@@ -918,11 +939,12 @@ def _write_metadata(metadata: TableMetadata, io: FileIO, metadata_path: str) ->
918939
ToOutputFile.table_metadata(metadata, io.new_output(metadata_path))
919940

920941
@staticmethod
def _get_metadata_location(table_location: str, new_version: int = 0, properties: Properties = EMPTY_DICT) -> str:
    """Build the location for a new table-metadata file version.

    Args:
        table_location (str): The base table location.
        new_version (int): The metadata version number to encode in the file name.
        properties (Properties): Table properties that may override the metadata path.

    Returns:
        str: Full path of the new ``*.metadata.json`` file.

    Raises:
        ValueError: If ``new_version`` is negative.
    """
    if new_version < 0:
        raise ValueError(f"Table metadata version: `{new_version}` must be a non-negative integer")

    metadata_file_name = f"{new_version:05d}-{uuid.uuid4()}.metadata.json"
    return Catalog.metadata_file_location(table_location, metadata_file_name, properties)
926948

927949
@staticmethod
928950
def _parse_metadata_version(metadata_location: str) -> int:

pyiceberg/catalog/dynamodb.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ def create_table(
173173
database_name, table_name = self.identifier_to_database_and_table(identifier)
174174

175175
location = self._resolve_table_location(location, database_name, table_name)
176-
metadata_location = self._get_metadata_location(location=location)
176+
metadata_location = self._get_metadata_location(table_location=location, properties=properties)
177177
metadata = new_table_metadata(
178178
location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties
179179
)

pyiceberg/catalog/sql.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ def create_table(
207207

208208
namespace = Catalog.namespace_to_string(namespace_identifier)
209209
location = self._resolve_table_location(location, namespace, table_name)
210-
metadata_location = self._get_metadata_location(location=location)
210+
metadata_location = self._get_metadata_location(table_location=location, properties=properties)
211211
metadata = new_table_metadata(
212212
location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties
213213
)

pyiceberg/table/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,7 @@ class TableProperties:
197197
WRITE_OBJECT_STORE_PARTITIONED_PATHS_DEFAULT = True
198198

199199
WRITE_DATA_PATH = "write.data.path"
200+
WRITE_METADATA_PATH = "write.metadata.path"
200201

201202
DELETE_MODE = "write.delete.mode"
202203
DELETE_MODE_COPY_ON_WRITE = "copy-on-write"
@@ -1210,6 +1211,10 @@ def to_daft(self) -> daft.DataFrame:
12101211

12111212
return daft.read_iceberg(self)
12121213

1214+
def metadata_file_location(self, file_name: str) -> str:
    """Return the full location for a metadata file of this table.

    Delegates to the catalog, which honors the ``write.metadata.path``
    table property when it is present in the table's properties.
    """
    table_location = self.metadata.location
    table_properties = self.metadata.properties
    return self.catalog.metadata_file_location(table_location, file_name, table_properties)
1217+
12131218

12141219
class StaticTable(Table):
12151220
"""Load a table directly from a metadata file (i.e., without using a catalog)."""

pyiceberg/table/update/snapshot.py

Lines changed: 5 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -84,16 +84,6 @@
8484
from pyiceberg.table import Transaction
8585

8686

87-
def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str:
88-
return f"{location}/metadata/{commit_uuid}-m{num}.avro"
89-
90-
91-
def _generate_manifest_list_path(location: str, snapshot_id: int, attempt: int, commit_uuid: uuid.UUID) -> str:
92-
# Mimics the behavior in Java:
93-
# https://github.com/apache/iceberg/blob/c862b9177af8e2d83122220764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491
94-
return f"{location}/metadata/snap-{snapshot_id}-{attempt}-{commit_uuid}.avro"
95-
96-
9787
class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]):
9888
commit_uuid: uuid.UUID
9989
_io: FileIO
@@ -243,13 +233,8 @@ def _commit(self) -> UpdatesAndRequirements:
243233
next_sequence_number = self._transaction.table_metadata.next_sequence_number()
244234

245235
summary = self._summary(self.snapshot_properties)
246-
247-
manifest_list_file_path = _generate_manifest_list_path(
248-
location=self._transaction.table_metadata.location,
249-
snapshot_id=self._snapshot_id,
250-
attempt=0,
251-
commit_uuid=self.commit_uuid,
252-
)
236+
file_name = f"{self.commit_uuid}-m{self._snapshot_id}-a0.avro"
237+
manifest_list_file_path = self._transaction._table.metadata_file_location(file_name)
253238
with write_manifest_list(
254239
format_version=self._transaction.table_metadata.format_version,
255240
output_file=self._io.new_output(manifest_list_file_path),
@@ -295,13 +280,9 @@ def new_manifest_writer(self, spec: PartitionSpec) -> ManifestWriter:
295280
)
296281

297282
def new_manifest_output(self) -> OutputFile:
    """Create an output file for the next manifest, under the table's metadata path."""
    manifest_file_name = f"{self.commit_uuid}-m{next(self._manifest_num_counter)}.avro"
    manifest_path = self._transaction._table.metadata_file_location(manifest_file_name)
    return self._io.new_output(manifest_path)
305286

306287
def fetch_manifest_entry(self, manifest: ManifestFile, discard_deleted: bool = True) -> List[ManifestEntry]:
307288
return manifest.fetch_manifest_entry(io=self._io, discard_deleted=discard_deleted)

tests/catalog/test_base.py

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
TableAlreadyExistsError,
4343
)
4444
from pyiceberg.io import WAREHOUSE, load_file_io
45+
from pyiceberg.io.pyarrow import schema_to_pyarrow
4546
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec
4647
from pyiceberg.schema import Schema
4748
from pyiceberg.table import (
@@ -105,7 +106,7 @@ def create_table(
105106
location = f"{self._warehouse_location}/{'/'.join(identifier)}"
106107
location = location.rstrip("/")
107108

108-
metadata_location = self._get_metadata_location(location=location)
109+
metadata_location = self._get_metadata_location(table_location=location, properties=properties)
109110
metadata = new_table_metadata(
110111
schema=schema,
111112
partition_spec=partition_spec,
@@ -147,7 +148,9 @@ def commit_table(
147148

148149
# write new metadata
149150
new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1
150-
new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version)
151+
new_metadata_location = self._get_metadata_location(
152+
current_table.metadata.location, new_metadata_version, updated_metadata.properties
153+
)
151154
self._write_metadata(updated_metadata, current_table.io, new_metadata_location)
152155

153156
# update table state
@@ -769,3 +772,35 @@ def test_table_properties_raise_for_none_value(catalog: InMemoryCatalog) -> None
769772
with pytest.raises(ValidationError) as exc_info:
770773
_ = given_catalog_has_a_table(catalog, properties=property_with_none)
771774
assert "None type is not a supported value in properties: property_name" in str(exc_info.value)
775+
776+
777+
def test_table_writes_metadata_to_custom_location(catalog: InMemoryCatalog) -> None:
    """Metadata and manifests land under `write.metadata.path`; the table location itself is unchanged."""
    custom_metadata_path = f"{catalog._warehouse_location}/custom/path"
    table = catalog.create_table(
        identifier=TEST_TABLE_IDENTIFIER,
        schema=TEST_TABLE_SCHEMA,
        partition_spec=TEST_TABLE_PARTITION_SPEC,
        properties={"write.metadata.path": custom_metadata_path},
    )

    records = pa.Table.from_pylist([{"x": 123, "y": 456, "z": 789}], schema=schema_to_pyarrow(TEST_TABLE_SCHEMA))
    table.append(records)
    snapshot_manifests = table.current_snapshot().manifests(table.io)  # type: ignore

    assert snapshot_manifests[0].manifest_path.startswith(custom_metadata_path)
    assert table.location() != custom_metadata_path
    assert table.metadata_location.startswith(custom_metadata_path)
792+
793+
794+
def test_table_writes_metadata_to_default_path(catalog: InMemoryCatalog) -> None:
    """Without `write.metadata.path`, metadata and manifests go under the default `/metadata` directory."""
    table = catalog.create_table(
        identifier=TEST_TABLE_IDENTIFIER,
        schema=TEST_TABLE_SCHEMA,
        partition_spec=TEST_TABLE_PARTITION_SPEC,
        properties=TEST_TABLE_PROPERTIES,
    )

    records = pa.Table.from_pylist([{"x": 123, "y": 456, "z": 789}], schema=schema_to_pyarrow(TEST_TABLE_SCHEMA))
    table.append(records)
    snapshot_manifests = table.current_snapshot().manifests(table.io)  # type: ignore

    assert "/metadata" in snapshot_manifests[0].manifest_path
    assert "/metadata" in table.metadata_location

0 commit comments

Comments
 (0)