Skip to content

Commit f0150df

Browse files
committed
Add support for write.data.path
1 parent 4a055d7 commit f0150df

3 files changed

Lines changed: 40 additions & 18 deletions

File tree

mkdocs/docs/configuration.md

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -54,18 +54,19 @@ Iceberg tables support table properties to configure table behavior.
5454

5555
### Write options
5656

57-
| Key | Options | Default | Description |
58-
|------------------------------------------|-----------------------------------|---------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
59-
| `write.parquet.compression-codec` | `{uncompressed,zstd,gzip,snappy}` | zstd | Sets the Parquet compression coddec. |
60-
| `write.parquet.compression-level` | Integer | null | Parquet compression level for the codec. If not set, it is up to PyIceberg |
61-
| `write.parquet.row-group-limit` | Number of rows | 1048576 | The upper bound of the number of entries within a single row group |
62-
| `write.parquet.page-size-bytes` | Size in bytes | 1MB | Set a target threshold for the approximate encoded size of data pages within a column chunk |
63-
| `write.parquet.page-row-limit` | Number of rows | 20000 | Set a target threshold for the maximum number of rows within a column chunk |
64-
| `write.parquet.dict-size-bytes` | Size in bytes | 2MB | Set the dictionary page size limit per row group |
65-
| `write.metadata.previous-versions-max` | Integer | 100 | The max number of previous version metadata files to keep before deleting after commit. |
66-
| `write.object-storage.enabled` | Boolean | True | Enables the [`ObjectStoreLocationProvider`](configuration.md#object-store-location-provider) that adds a hash component to file paths. Note: the default value of `True` differs from Iceberg's Java implementation |
67-
| `write.object-storage.partitioned-paths` | Boolean | True | Controls whether [partition values are included in file paths](configuration.md#partition-exclusion) when object storage is enabled |
68-
| `write.py-location-provider.impl` | String of form `module.ClassName` | null | Optional, [custom `LocationProvider`](configuration.md#loading-a-custom-location-provider) implementation |
57+
| Key | Options | Default | Description |
58+
|------------------------------------------|------------------------------------|---------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
59+
| `write.parquet.compression-codec` | `{uncompressed,zstd,gzip,snappy}` | zstd | Sets the Parquet compression codec. |
60+
| `write.parquet.compression-level` | Integer | null | Parquet compression level for the codec. If not set, it is up to PyIceberg |
61+
| `write.parquet.row-group-limit` | Number of rows | 1048576 | The upper bound of the number of entries within a single row group |
62+
| `write.parquet.page-size-bytes` | Size in bytes | 1MB | Set a target threshold for the approximate encoded size of data pages within a column chunk |
63+
| `write.parquet.page-row-limit` | Number of rows | 20000 | Set a target threshold for the maximum number of rows within a column chunk |
64+
| `write.parquet.dict-size-bytes` | Size in bytes | 2MB | Set the dictionary page size limit per row group |
65+
| `write.metadata.previous-versions-max` | Integer | 100 | The max number of previous version metadata files to keep before deleting after commit. |
66+
| `write.object-storage.enabled` | Boolean | True | Enables the [`ObjectStoreLocationProvider`](configuration.md#object-store-location-provider) that adds a hash component to file paths. Note: the default value of `True` differs from Iceberg's Java implementation |
67+
| `write.object-storage.partitioned-paths` | Boolean | True | Controls whether [partition values are included in file paths](configuration.md#partition-exclusion) when object storage is enabled |
68+
| `write.py-location-provider.impl` | String of form `module.ClassName` | null | Optional, [custom `LocationProvider`](configuration.md#loading-a-custom-location-provider) implementation |
69+
| `write.data.path` | String pointing to location | null | Sets the location where data files are written. If not set, the table location suffixed with `/data` is used. |
6970

7071
### Table behavior options
7172

pyiceberg/table/locations.py

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828

2929
logger = logging.getLogger(__name__)
3030

31+
WRITE_DATA_PATH = "write.data.path"
32+
3133

3234
class LocationProvider(ABC):
3335
"""A base class for location providers, that provide data file locations for a table's write tasks.
@@ -40,10 +42,17 @@ class LocationProvider(ABC):
4042
table_location: str
4143
table_properties: Properties
4244

45+
data_path: str
46+
4347
def __init__(self, table_location: str, table_properties: Properties):
4448
self.table_location = table_location
4549
self.table_properties = table_properties
4650

51+
if path := table_properties.get(WRITE_DATA_PATH):
52+
self.data_path = path.rstrip("/")
53+
else:
54+
self.data_path = f"{self.table_location.rstrip('/')}/data"
55+
4756
@abstractmethod
4857
def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str:
4958
"""Return a fully-qualified data file location for the given filename.
@@ -62,8 +71,11 @@ def __init__(self, table_location: str, table_properties: Properties):
6271
super().__init__(table_location, table_properties)
6372

6473
def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str:
65-
prefix = f"{self.table_location}/data"
66-
return f"{prefix}/{partition_key.to_path()}/{data_file_name}" if partition_key else f"{prefix}/{data_file_name}"
74+
return (
75+
f"{self.data_path}/{partition_key.to_path()}/{data_file_name}"
76+
if partition_key
77+
else f"{self.data_path}/{data_file_name}"
78+
)
6779

6880

6981
class ObjectStoreLocationProvider(LocationProvider):
@@ -85,13 +97,12 @@ def new_data_location(self, data_file_name: str, partition_key: Optional[Partiti
8597
if self._include_partition_paths and partition_key:
8698
return self.new_data_location(f"{partition_key.to_path()}/{data_file_name}")
8799

88-
prefix = f"{self.table_location}/data"
89100
hashed_path = self._compute_hash(data_file_name)
90101

91102
return (
92-
f"{prefix}/{hashed_path}/{data_file_name}"
103+
f"{self.data_path}/{hashed_path}/{data_file_name}"
93104
if self._include_partition_paths
94-
else f"{prefix}/{hashed_path}-{data_file_name}"
105+
else f"{self.data_path}/{hashed_path}-{data_file_name}"
95106
)
96107

97108
@staticmethod

tests/table/test_locations.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
from pyiceberg.partitioning import PartitionField, PartitionFieldValue, PartitionKey, PartitionSpec
2222
from pyiceberg.schema import Schema
23-
from pyiceberg.table.locations import LocationProvider, load_location_provider
23+
from pyiceberg.table.locations import WRITE_DATA_PATH, LocationProvider, load_location_provider
2424
from pyiceberg.transforms import IdentityTransform
2525
from pyiceberg.typedef import EMPTY_DICT
2626
from pyiceberg.types import NestedField, StringType
@@ -133,3 +133,13 @@ def test_hash_injection(data_file_name: str, expected_hash: str) -> None:
133133
provider = load_location_provider(table_location="table_location", table_properties=EMPTY_DICT)
134134

135135
assert provider.new_data_location(data_file_name) == f"table_location/data/{expected_hash}/{data_file_name}"
136+
137+
138+
def test_write_data_path() -> None:
139+
provider = load_location_provider(
140+
table_location="s3://table-location/table", table_properties={WRITE_DATA_PATH: "s3://table-location/custom/data/path"}
141+
)
142+
143+
assert (
144+
provider.new_data_location("file.parquet") == "s3://table-location/custom/data/path/0010/1111/0101/11011101/file.parquet"
145+
)

0 commit comments

Comments
 (0)