4 changes: 2 additions & 2 deletions pyiceberg/catalog/hive.py
@@ -67,7 +67,7 @@
 from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
 from pyiceberg.schema import Schema, SchemaVisitor, visit
 from pyiceberg.serializers import FromInputFile
-from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, update_table_metadata
+from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, TableProperties, update_table_metadata
 from pyiceberg.table.metadata import new_table_metadata
 from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
 from pyiceberg.typedef import EMPTY_DICT
@@ -155,7 +155,7 @@ def _construct_hive_storage_descriptor(schema: Schema, location: Optional[str])
 PROP_TABLE_TYPE = "table_type"
 PROP_METADATA_LOCATION = "metadata_location"
 PROP_PREVIOUS_METADATA_LOCATION = "previous_metadata_location"
-DEFAULT_PROPERTIES = {'write.parquet.compression-codec': 'zstd'}
+DEFAULT_PROPERTIES = {TableProperties.PARQUET_COMPRESSION: TableProperties.PARQUET_COMPRESSION_DEFAULT}


 def _construct_parameters(metadata_location: str, previous_metadata_location: Optional[str] = None) -> Dict[str, Any]:
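Both hunks above swap a hard-coded string for the new TableProperties constants added below in pyiceberg/table/__init__.py. A minimal sketch (not part of the PR) showing the resulting dict is unchanged, with a simplified stand-in for the class:

class TableProperties:  # stand-in for the class added in pyiceberg/table/__init__.py
    PARQUET_COMPRESSION = "write.parquet.compression-codec"
    PARQUET_COMPRESSION_DEFAULT = "zstd"


DEFAULT_PROPERTIES = {TableProperties.PARQUET_COMPRESSION: TableProperties.PARQUET_COMPRESSION_DEFAULT}
assert DEFAULT_PROPERTIES == {"write.parquet.compression-codec": "zstd"}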
67 changes: 33 additions & 34 deletions pyiceberg/io/pyarrow.py
@@ -124,7 +124,7 @@
     visit,
     visit_with_partner,
 )
-from pyiceberg.table import WriteTask
+from pyiceberg.table import PropertyUtil, TableProperties, WriteTask
 from pyiceberg.table.name_mapping import NameMapping
 from pyiceberg.transforms import TruncateTransform
 from pyiceberg.typedef import EMPTY_DICT, Properties, Record
@@ -1384,19 +1384,12 @@ class MetricModeTypes(Enum):
     FULL = "full"


-DEFAULT_METRICS_MODE_KEY = "write.metadata.metrics.default"
-COLUMN_METRICS_MODE_KEY_PREFIX = "write.metadata.metrics.column"
-
-
 @dataclass(frozen=True)
 class MetricsMode(Singleton):
     type: MetricModeTypes
     length: Optional[int] = None


-_DEFAULT_METRICS_MODE = MetricsMode(MetricModeTypes.TRUNCATE, DEFAULT_TRUNCATION_LENGTH)
-
-
 def match_metrics_mode(mode: str) -> MetricsMode:
     sanitized_mode = mode.strip().lower()
     if sanitized_mode.startswith("truncate"):
@@ -1430,12 +1423,14 @@ class PyArrowStatisticsCollector(PreOrderSchemaVisitor[List[StatisticsCollector]
     _field_id: int = 0
     _schema: Schema
     _properties: Dict[str, str]
-    _default_mode: Optional[str]
+    _default_mode: str

     def __init__(self, schema: Schema, properties: Dict[str, str]):
         self._schema = schema
         self._properties = properties
-        self._default_mode = self._properties.get(DEFAULT_METRICS_MODE_KEY)
+        self._default_mode = self._properties.get(
+            TableProperties.DEFAULT_WRITE_METRICS_MODE, TableProperties.DEFAULT_WRITE_METRICS_MODE_DEFAULT
+        )

     def schema(self, schema: Schema, struct_result: Callable[[], List[StatisticsCollector]]) -> List[StatisticsCollector]:
         return struct_result()
@@ -1470,12 +1465,9 @@ def primitive(self, primitive: PrimitiveType) -> List[StatisticsCollector]:
         if column_name is None:
             return []

-        metrics_mode = _DEFAULT_METRICS_MODE
-
-        if self._default_mode:
-            metrics_mode = match_metrics_mode(self._default_mode)
+        metrics_mode = match_metrics_mode(self._default_mode)

-        col_mode = self._properties.get(f"{COLUMN_METRICS_MODE_KEY_PREFIX}.{column_name}")
+        col_mode = self._properties.get(f"{TableProperties.METRICS_MODE_COLUMN_CONF_PREFIX}.{column_name}")
         if col_mode:
             metrics_mode = match_metrics_mode(col_mode)

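The two hunks above replace the module-level metrics-mode constants with lookups through TableProperties: the default is now the property string "truncate(16)" parsed per column, and a per-column setting still overrides the table-wide default. A minimal self-contained sketch of that resolution order (resolve_metrics_mode is a hypothetical helper for illustration, not pyiceberg API):

from typing import Dict

DEFAULT_WRITE_METRICS_MODE = "write.metadata.metrics.default"
METRICS_MODE_COLUMN_CONF_PREFIX = "write.metadata.metrics.column"


def resolve_metrics_mode(properties: Dict[str, str], column_name: str) -> str:
    # Table-wide default now falls back to "truncate(16)" instead of a module-level sentinel.
    mode = properties.get(DEFAULT_WRITE_METRICS_MODE, "truncate(16)")
    # A per-column property still wins over the table-wide default.
    return properties.get(f"{METRICS_MODE_COLUMN_CONF_PREFIX}.{column_name}", mode)


props = {"write.metadata.metrics.column.id": "full"}
assert resolve_metrics_mode(props, "id") == "full"
assert resolve_metrics_mode(props, "name") == "truncate(16)"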
@@ -1762,33 +1754,40 @@ def write_file(table: Table, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
     return iter([data_file])


-def _get_parquet_writer_kwargs(table_properties: Properties) -> Dict[str, Any]:
-    def _get_int(key: str, default: Optional[int] = None) -> Optional[int]:
-        if value := table_properties.get(key):
-            try:
-                return int(value)
-            except ValueError as e:
-                raise ValueError(f"Could not parse table property {key} to an integer: {value}") from e
-        else:
-            return default
-
+ICEBERG_UNCOMPRESSED_CODEC = "uncompressed"
+PYARROW_UNCOMPRESSED_CODEC = "none"
+
+
+def _get_parquet_writer_kwargs(table_properties: Properties) -> Dict[str, Any]:
     for key_pattern in [
-        "write.parquet.row-group-size-bytes",
-        "write.parquet.page-row-limit",
-        "write.parquet.bloom-filter-max-bytes",
-        "write.parquet.bloom-filter-enabled.column.*",
+        TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
+        TableProperties.PARQUET_PAGE_ROW_LIMIT,
+        TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES,
+        f"{TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX}.*",
     ]:
         if unsupported_keys := fnmatch.filter(table_properties, key_pattern):
             raise NotImplementedError(f"Parquet writer option(s) {unsupported_keys} not implemented")

-    compression_codec = table_properties.get("write.parquet.compression-codec", "zstd")
-    compression_level = _get_int("write.parquet.compression-level")
-    if compression_codec == "uncompressed":
-        compression_codec = "none"
+    compression_codec = table_properties.get(TableProperties.PARQUET_COMPRESSION, TableProperties.PARQUET_COMPRESSION_DEFAULT)
+    compression_level = PropertyUtil.property_as_int(
+        properties=table_properties,
+        property_name=TableProperties.PARQUET_COMPRESSION_LEVEL,
+        default=TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT,
+    )
+    if compression_codec == ICEBERG_UNCOMPRESSED_CODEC:
+        compression_codec = PYARROW_UNCOMPRESSED_CODEC

     return {
         "compression": compression_codec,
         "compression_level": compression_level,
-        "data_page_size": _get_int("write.parquet.page-size-bytes"),
-        "dictionary_pagesize_limit": _get_int("write.parquet.dict-size-bytes", default=2 * 1024 * 1024),
+        "data_page_size": PropertyUtil.property_as_int(
+            properties=table_properties,
+            property_name=TableProperties.PARQUET_PAGE_SIZE_BYTES,
+            default=TableProperties.PARQUET_PAGE_SIZE_BYTES_DEFAULT,
+        ),
+        "dictionary_pagesize_limit": PropertyUtil.property_as_int(
+            properties=table_properties,
+            property_name=TableProperties.PARQUET_DICT_SIZE_BYTES,
+            default=TableProperties.PARQUET_DICT_SIZE_BYTES_DEFAULT,
+        ),
     }
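A hedged illustration of the rewritten _get_parquet_writer_kwargs, assuming the TableProperties defaults added below in pyiceberg/table/__init__.py; importing the private helper is for demonstration only:

from pyiceberg.io.pyarrow import _get_parquet_writer_kwargs

kwargs = _get_parquet_writer_kwargs({"write.parquet.compression-codec": "uncompressed"})
assert kwargs["compression"] == "none"  # Iceberg's "uncompressed" maps to PyArrow's "none"
assert kwargs["compression_level"] is None  # PARQUET_COMPRESSION_LEVEL_DEFAULT
assert kwargs["data_page_size"] == 1024 * 1024  # PARQUET_PAGE_SIZE_BYTES_DEFAULT
assert kwargs["dictionary_pagesize_limit"] == 2 * 1024 * 1024  # PARQUET_DICT_SIZE_BYTES_DEFAULT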
50 changes: 47 additions & 3 deletions pyiceberg/table/__init__.py
@@ -85,7 +85,6 @@
     TableMetadataUtil,
 )
 from pyiceberg.table.name_mapping import (
-    SCHEMA_NAME_MAPPING_DEFAULT,
     NameMapping,
     create_mapping_from_schema,
     parse_mapping_from_json,
@@ -134,6 +133,50 @@
 _JAVA_LONG_MAX = 9223372036854775807


+class TableProperties:
+    PARQUET_ROW_GROUP_SIZE_BYTES = "write.parquet.row-group-size-bytes"
+    PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT = 128 * 1024 * 1024  # 128 MB
+
+    PARQUET_PAGE_SIZE_BYTES = "write.parquet.page-size-bytes"
+    PARQUET_PAGE_SIZE_BYTES_DEFAULT = 1024 * 1024  # 1 MB
+
+    PARQUET_PAGE_ROW_LIMIT = "write.parquet.page-row-limit"
+    PARQUET_PAGE_ROW_LIMIT_DEFAULT = 20000
+
+    PARQUET_DICT_SIZE_BYTES = "write.parquet.dict-size-bytes"
+    PARQUET_DICT_SIZE_BYTES_DEFAULT = 2 * 1024 * 1024  # 2 MB
+
+    PARQUET_COMPRESSION = "write.parquet.compression-codec"
+    PARQUET_COMPRESSION_DEFAULT = "zstd"
+
+    PARQUET_COMPRESSION_LEVEL = "write.parquet.compression-level"
+    PARQUET_COMPRESSION_LEVEL_DEFAULT = None
+
+    PARQUET_BLOOM_FILTER_MAX_BYTES = "write.parquet.bloom-filter-max-bytes"
+    PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT = 1024 * 1024
+
+    PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX = "write.parquet.bloom-filter-enabled.column"
+
+    DEFAULT_WRITE_METRICS_MODE = "write.metadata.metrics.default"
+    DEFAULT_WRITE_METRICS_MODE_DEFAULT = "truncate(16)"
+
+    METRICS_MODE_COLUMN_CONF_PREFIX = "write.metadata.metrics.column"
+
+    DEFAULT_NAME_MAPPING = "schema.name-mapping.default"
+
+
+class PropertyUtil:
+    @staticmethod
+    def property_as_int(properties: Dict[str, str], property_name: str, default: Optional[int] = None) -> Optional[int]:
+        if value := properties.get(property_name):
+            try:
+                return int(value)
+            except ValueError as e:
+                raise ValueError(f"Could not parse table property {property_name} to an integer: {value}") from e
+        else:
+            return default
+
+
 class Transaction:
     _table: Table
     _updates: Tuple[TableUpdate, ...]
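The new helper's behavior, sketched directly from the code above; note the walrus test checks truthiness, so an empty-string value falls back to the default rather than raising:

from pyiceberg.table import PropertyUtil

assert PropertyUtil.property_as_int({"k": "42"}, "k") == 42
assert PropertyUtil.property_as_int({}, "k", default=7) == 7
assert PropertyUtil.property_as_int({"k": ""}, "k", default=7) == 7  # empty string is falsy
try:
    PropertyUtil.property_as_int({"k": "not-an-int"}, "k")
except ValueError as e:
    print(e)  # Could not parse table property k to an integer: not-an-int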
@@ -921,7 +964,7 @@ def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive

     def name_mapping(self) -> NameMapping:
         """Return the table's field-id NameMapping."""
-        if name_mapping_json := self.properties.get(SCHEMA_NAME_MAPPING_DEFAULT):
+        if name_mapping_json := self.properties.get(TableProperties.DEFAULT_NAME_MAPPING):
             return parse_mapping_from_json(name_mapping_json)
         else:
             return create_mapping_from_schema(self.schema())
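With the constant in place, name_mapping() resolves as in this sketch; props stands in for table.properties, and the JSON payload is illustrative:

from pyiceberg.table import TableProperties
from pyiceberg.table.name_mapping import parse_mapping_from_json

props = {TableProperties.DEFAULT_NAME_MAPPING: '[{"field-id": 1, "names": ["id"]}]'}
if name_mapping_json := props.get(TableProperties.DEFAULT_NAME_MAPPING):
    mapping = parse_mapping_from_json(name_mapping_json)  # an explicit mapping wins
# otherwise the mapping is derived with create_mapping_from_schema(table.schema())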
@@ -1493,7 +1536,8 @@ def union_by_name(self, new_schema: Union[Schema, "pa.Schema"]) -> UpdateSchema:
         visit_with_partner(
             Catalog._convert_schema_if_needed(new_schema),
             -1,
-            UnionByNameVisitor(update_schema=self, existing_schema=self._schema, case_sensitive=self._case_sensitive),  # type: ignore
+            UnionByNameVisitor(update_schema=self, existing_schema=self._schema, case_sensitive=self._case_sensitive),
+            # type: ignore
             PartnerIdByNameAccessor(partner_schema=self._schema, case_sensitive=self._case_sensitive),
         )
         return self
2 changes: 0 additions & 2 deletions pyiceberg/table/name_mapping.py
@@ -34,8 +34,6 @@
 from pyiceberg.typedef import IcebergBaseModel, IcebergRootModel
 from pyiceberg.types import ListType, MapType, NestedField, PrimitiveType, StructType

-SCHEMA_NAME_MAPPING_DEFAULT = "schema.name-mapping.default"
-

 class MappedField(IcebergBaseModel):
     field_id: int = Field(alias="field-id")
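For downstream code that imported the removed constant, the equivalent key now lives on TableProperties (a migration sketch, assuming this PR's definitions):

# before: from pyiceberg.table.name_mapping import SCHEMA_NAME_MAPPING_DEFAULT
from pyiceberg.table import TableProperties

assert TableProperties.DEFAULT_NAME_MAPPING == "schema.name-mapping.default"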