Skip to content

Commit 1c9f177

Browse files
committed
Move metadata path generation to the location provider
1 parent 6f88749 commit 1c9f177

File tree

8 files changed

+104
-53
lines changed

8 files changed

+104
-53
lines changed

pyiceberg/catalog/__init__.py

Lines changed: 5 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -57,6 +57,7 @@
5757
Table,
5858
TableProperties,
5959
)
60+
from pyiceberg.table.locations import load_location_provider
6061
from pyiceberg.table.metadata import TableMetadata, TableMetadataV1, new_table_metadata
6162
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
6263
from pyiceberg.table.update import (
@@ -857,7 +858,8 @@ def _create_staged_table(
857858
database_name, table_name = self.identifier_to_database_and_table(identifier)
858859

859860
location = self._resolve_table_location(location, database_name, table_name)
860-
metadata_location = Table.new_table_metadata_file_location(table_location=location, properties=properties)
861+
provider = load_location_provider(location, properties)
862+
metadata_location = provider.new_table_metadata_file_location()
861863
metadata = new_table_metadata(
862864
location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties
863865
)
@@ -888,9 +890,8 @@ def _update_and_stage_table(
888890
)
889891

890892
new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 if current_table else 0
891-
new_metadata_location = Table.new_table_metadata_file_location(
892-
updated_metadata.location, new_metadata_version, updated_metadata.properties
893-
)
893+
provider = load_location_provider(updated_metadata.location, updated_metadata.properties)
894+
new_metadata_location = provider.new_table_metadata_file_location(new_metadata_version)
894895

895896
return StagedTable(
896897
identifier=table_identifier,

pyiceberg/catalog/dynamodb.py

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -54,6 +54,7 @@
5454
from pyiceberg.schema import Schema
5555
from pyiceberg.serializers import FromInputFile
5656
from pyiceberg.table import CommitTableResponse, Table
57+
from pyiceberg.table.locations import load_location_provider
5758
from pyiceberg.table.metadata import new_table_metadata
5859
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
5960
from pyiceberg.table.update import (
@@ -173,7 +174,9 @@ def create_table(
173174
database_name, table_name = self.identifier_to_database_and_table(identifier)
174175

175176
location = self._resolve_table_location(location, database_name, table_name)
176-
metadata_location = Table.new_table_metadata_file_location(table_location=location, properties=properties)
177+
provider = load_location_provider(table_location=location, table_properties=properties)
178+
metadata_location = provider.new_table_metadata_file_location()
179+
177180
metadata = new_table_metadata(
178181
location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties
179182
)

pyiceberg/catalog/sql.py

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -62,6 +62,7 @@
6262
from pyiceberg.schema import Schema
6363
from pyiceberg.serializers import FromInputFile
6464
from pyiceberg.table import CommitTableResponse, Table
65+
from pyiceberg.table.locations import load_location_provider
6566
from pyiceberg.table.metadata import new_table_metadata
6667
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
6768
from pyiceberg.table.update import (
@@ -207,7 +208,8 @@ def create_table(
207208

208209
namespace = Catalog.namespace_to_string(namespace_identifier)
209210
location = self._resolve_table_location(location, namespace, table_name)
210-
metadata_location = Table.new_table_metadata_file_location(table_location=location, properties=properties)
211+
location_provider = load_location_provider(table_location=location, table_properties=properties)
212+
metadata_location = location_provider.new_table_metadata_file_location()
211213
metadata = new_table_metadata(
212214
location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties
213215
)

pyiceberg/table/__init__.py

Lines changed: 5 additions & 38 deletions
Original file line number | Diff line number | Diff line change
@@ -79,6 +79,7 @@
7979
)
8080
from pyiceberg.schema import Schema
8181
from pyiceberg.table.inspect import InspectTable
82+
from pyiceberg.table.locations import LocationProvider, load_location_provider
8283
from pyiceberg.table.metadata import (
8384
INITIAL_SEQUENCE_NUMBER,
8485
TableMetadata,
@@ -1001,6 +1002,10 @@ def location(self) -> str:
10011002
"""Return the table's base location."""
10021003
return self.metadata.location
10031004

1005+
def location_provider(self) -> LocationProvider:
1006+
"""Return the table's location provider."""
1007+
return load_location_provider(table_location=self.metadata.location, table_properties=self.metadata.properties)
1008+
10041009
@property
10051010
def last_sequence_number(self) -> int:
10061011
return self.metadata.last_sequence_number
@@ -1237,44 +1242,6 @@ def to_polars(self) -> pl.LazyFrame:
12371242

12381243
return pl.scan_iceberg(self)
12391244

1240-
@staticmethod
1241-
def new_table_metadata_file_location(table_location: str, new_version: int = 0, properties: Properties = EMPTY_DICT) -> str:
1242-
"""Return a fully-qualified metadata file location for a new table version.
1243-
1244-
Args:
1245-
table_location (str): the base table location.
1246-
new_version (int): Version number of the metadata file.
1247-
properties (Properties): Table properties that may contain a custom metadata path.
1248-
1249-
Returns:
1250-
str: fully-qualified URI for the new table metadata file.
1251-
1252-
Raises:
1253-
ValueError: If the version is negative.
1254-
"""
1255-
if new_version < 0:
1256-
raise ValueError(f"Table metadata version: `{new_version}` must be a non-negative integer")
1257-
1258-
file_name = f"{new_version:05d}-{uuid.uuid4()}.metadata.json"
1259-
return Table.new_metadata_location(table_location, file_name, properties)
1260-
1261-
@staticmethod
1262-
def new_metadata_location(table_location: str, file_name: str, properties: Properties = EMPTY_DICT) -> str:
1263-
"""Return a fully-qualified metadata file location for the given filename.
1264-
1265-
Args:
1266-
table_location (str): The base table location
1267-
file_name (str): Name of the metadata file
1268-
properties (Properties): Table properties that may contain a custom metadata path
1269-
1270-
Returns:
1271-
str: A fully-qualified location URI for the metadata file.
1272-
"""
1273-
if metadata_path := properties.get(TableProperties.WRITE_METADATA_PATH):
1274-
return f"{metadata_path.rstrip('/')}/{file_name}"
1275-
1276-
return f"{table_location}/metadata/{file_name}"
1277-
12781245

12791246
class StaticTable(Table):
12801247
"""Load a table directly from a metadata file (i.e., without using a catalog)."""

pyiceberg/table/locations.py

Lines changed: 37 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -16,6 +16,7 @@
1616
# under the License.
1717
import importlib
1818
import logging
19+
import uuid
1920
from abc import ABC, abstractmethod
2021
from typing import Optional
2122

@@ -29,7 +30,7 @@
2930

3031

3132
class LocationProvider(ABC):
32-
"""A base class for location providers, that provide data file locations for a table's write tasks.
33+
"""A base class for location providers, that provide file locations for a table's write tasks.
3334
3435
Args:
3536
table_location (str): The table's base storage location.
@@ -40,6 +41,7 @@ class LocationProvider(ABC):
4041
table_properties: Properties
4142

4243
data_path: str
44+
metadata_path: str
4345

4446
def __init__(self, table_location: str, table_properties: Properties):
4547
self.table_location = table_location
@@ -52,6 +54,11 @@ def __init__(self, table_location: str, table_properties: Properties):
5254
else:
5355
self.data_path = f"{self.table_location.rstrip('/')}/data"
5456

57+
if path := table_properties.get(TableProperties.WRITE_METADATA_PATH):
58+
self.metadata_path = path.rstrip("/")
59+
else:
60+
self.metadata_path = f"{self.table_location.rstrip('/')}/metadata"
61+
5562
@abstractmethod
5663
def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str:
5764
"""Return a fully-qualified data file location for the given filename.
@@ -64,6 +71,35 @@ def new_data_location(self, data_file_name: str, partition_key: Optional[Partiti
6471
str: A fully-qualified location URI for the data file.
6572
"""
6673

74+
def new_table_metadata_file_location(self, new_version: int = 0) -> str:
75+
"""Return a fully-qualified metadata file location for a new table version.
76+
77+
Args:
78+
new_version (int): Version number of the metadata file.
79+
80+
Returns:
81+
str: fully-qualified URI for the new table metadata file.
82+
83+
Raises:
84+
ValueError: If the version is negative.
85+
"""
86+
if new_version < 0:
87+
raise ValueError(f"Table metadata version: `{new_version}` must be a non-negative integer")
88+
89+
file_name = f"{new_version:05d}-{uuid.uuid4()}.metadata.json"
90+
return self.new_metadata_location(file_name)
91+
92+
def new_metadata_location(self, metadata_file_name: str) -> str:
93+
"""Return a fully-qualified metadata file location for the given filename.
94+
95+
Args:
96+
metadata_file_name (str): Name of the metadata file.
97+
98+
Returns:
99+
str: A fully-qualified location URI for the metadata file.
100+
"""
101+
return f"{self.metadata_path}/{metadata_file_name}"
102+
67103

68104
class SimpleLocationProvider(LocationProvider):
69105
def __init__(self, table_location: str, table_properties: Properties):

pyiceberg/table/update/snapshot.py

Lines changed: 4 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -243,14 +243,13 @@ def _commit(self) -> UpdatesAndRequirements:
243243
next_sequence_number = self._transaction.table_metadata.next_sequence_number()
244244

245245
summary = self._summary(self.snapshot_properties)
246-
table_location = self._transaction.table_metadata.location
247-
properties = self._transaction.table_metadata.properties
248246
file_name = _new_manifest_list_file_name(
249247
snapshot_id=self._snapshot_id,
250248
attempt=0,
251249
commit_uuid=self.commit_uuid,
252250
)
253-
manifest_list_file_path = self._transaction._table.new_metadata_location(table_location, file_name, properties)
251+
location_provider = self._transaction._table.location_provider()
252+
manifest_list_file_path = location_provider.new_metadata_location(file_name)
254253
with write_manifest_list(
255254
format_version=self._transaction.table_metadata.format_version,
256255
output_file=self._io.new_output(manifest_list_file_path),
@@ -296,10 +295,9 @@ def new_manifest_writer(self, spec: PartitionSpec) -> ManifestWriter:
296295
)
297296

298297
def new_manifest_output(self) -> OutputFile:
299-
table_location = self._transaction.table_metadata.location
300-
properties = self._transaction.table_metadata.properties
298+
location_provider = self._transaction._table.location_provider()
301299
file_name = _new_manifest_file_name(num=next(self._manifest_num_counter), commit_uuid=self.commit_uuid)
302-
file_path = self._transaction._table.new_metadata_location(table_location, file_name, properties)
300+
file_path = location_provider.new_metadata_location(file_name)
303301
return self._io.new_output(file_path)
304302

305303
def fetch_manifest_entry(self, manifest: ManifestFile, discard_deleted: bool = True) -> List[ManifestEntry]:

tests/catalog/test_base.py

Lines changed: 22 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -579,8 +579,9 @@ def test_table_writes_metadata_to_custom_location(catalog: InMemoryCatalog) -> N
579579
df = pa.Table.from_pylist([{"x": 123, "y": 456, "z": 789}], schema=schema_to_pyarrow(TEST_TABLE_SCHEMA))
580580
table.append(df)
581581
manifests = table.current_snapshot().manifests(table.io) # type: ignore
582+
location_provider = table.location_provider()
582583

583-
assert table.new_metadata_location(table.location(), "", table.properties).startswith(metadata_path)
584+
assert location_provider.new_metadata_location("").startswith(metadata_path)
584585
assert manifests[0].manifest_path.startswith(metadata_path)
585586
assert table.location() != metadata_path
586587
assert table.metadata_location.startswith(metadata_path)
@@ -598,7 +599,26 @@ def test_table_writes_metadata_to_default_path(catalog: InMemoryCatalog) -> None
598599
df = pa.Table.from_pylist([{"x": 123, "y": 456, "z": 789}], schema=schema_to_pyarrow(TEST_TABLE_SCHEMA))
599600
table.append(df)
600601
manifests = table.current_snapshot().manifests(table.io) # type: ignore
602+
location_provider = table.location_provider()
601603

602-
assert table.new_metadata_location(table.location(), "", table.properties).startswith(metadata_path)
604+
assert location_provider.new_metadata_location("").startswith(metadata_path)
603605
assert manifests[0].manifest_path.startswith(metadata_path)
604606
assert table.metadata_location.startswith(metadata_path)
607+
608+
609+
def test_table_metadata_writes_reflect_latest_path(catalog: InMemoryCatalog) -> None:
610+
catalog.create_namespace(TEST_TABLE_NAMESPACE)
611+
table = catalog.create_table(
612+
identifier=TEST_TABLE_IDENTIFIER,
613+
schema=TEST_TABLE_SCHEMA,
614+
partition_spec=TEST_TABLE_PARTITION_SPEC,
615+
)
616+
617+
initial_metadata_path = f"{table.location()}/metadata"
618+
assert table.location_provider().new_metadata_location("metadata.json") == f"{initial_metadata_path}/metadata.json"
619+
620+
# update table with new path for metadata
621+
new_metadata_path = f"{table.location()}/custom/path"
622+
table.transaction().set_properties({TableProperties.WRITE_METADATA_PATH: new_metadata_path}).commit_transaction()
623+
624+
assert table.location_provider().new_metadata_location("metadata.json") == f"{new_metadata_path}/metadata.json"

tests/table/test_locations.py

Lines changed: 24 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -157,3 +157,27 @@ def test_simple_location_provider_write_data_path() -> None:
157157
)
158158

159159
assert provider.new_data_location("file.parquet") == "s3://table-location/custom/data/path/file.parquet"
160+
161+
162+
def test_location_provider_metadata_default_location() -> None:
163+
provider = load_location_provider(table_location="table_location", table_properties=EMPTY_DICT)
164+
165+
assert provider.new_metadata_location("manifest.avro") == "table_location/metadata/manifest.avro"
166+
167+
168+
def test_location_provider_metadata_location_with_custom_path() -> None:
169+
provider = load_location_provider(
170+
table_location="table_location",
171+
table_properties={TableProperties.WRITE_METADATA_PATH: "s3://table-location/custom/path"},
172+
)
173+
174+
assert provider.new_metadata_location("metadata.json") == "s3://table-location/custom/path/metadata.json"
175+
176+
177+
def test_metadata_location_with_trailing_slash() -> None:
178+
provider = load_location_provider(
179+
table_location="table_location",
180+
table_properties={TableProperties.WRITE_METADATA_PATH: "s3://table-location/custom/path/"},
181+
)
182+
183+
assert provider.new_metadata_location("metadata.json") == "s3://table-location/custom/path/metadata.json"

0 commit comments

Comments
 (0)