-
Notifications
You must be signed in to change notification settings - Fork 519
feat: add dictionary_columns to to_arrow() / to_arrow_batch_reader() for memory-efficient reads #3461
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add dictionary_columns to to_arrow() / to_arrow_batch_reader() for memory-efficient reads #3461
Changes from 1 commit
7830bfb
c9a8132
e429e48
33e4c6d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1625,8 +1625,12 @@ def _task_to_record_batches( | |
| partition_spec: PartitionSpec | None = None, | ||
| format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION, | ||
| downcast_ns_timestamp_to_us: bool | None = None, | ||
| dictionary_columns: frozenset[str] = frozenset(), | ||
| ) -> Iterator[pa.RecordBatch]: | ||
| arrow_format = _get_file_format(task.file.file_format, pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8)) | ||
| format_kwargs: dict[str, Any] = {"pre_buffer": True, "buffer_size": ONE_MEGABYTE * 8} | ||
| if dictionary_columns and task.file.file_format == FileFormat.PARQUET: | ||
| format_kwargs["dictionary_columns"] = tuple(dictionary_columns) | ||
| arrow_format = _get_file_format(task.file.file_format, **format_kwargs) | ||
| with io.new_input(task.file.file_path).open() as fin: | ||
| fragment = arrow_format.make_fragment(fin) | ||
| physical_schema = fragment.physical_schema | ||
|
|
@@ -1729,6 +1733,7 @@ class ArrowScan: | |
| _case_sensitive: bool | ||
| _limit: int | None | ||
| _downcast_ns_timestamp_to_us: bool | None | ||
| _dictionary_columns: frozenset[str] | ||
| """Scan the Iceberg Table and create an Arrow construct. | ||
|
|
||
| Attributes: | ||
|
|
@@ -1738,6 +1743,7 @@ class ArrowScan: | |
| _bound_row_filter: Schema bound row expression to filter the data with | ||
| _case_sensitive: Case sensitivity when looking up column names | ||
| _limit: Limit the number of records. | ||
| _dictionary_columns: Column names to read as dictionary-encoded arrays. | ||
| """ | ||
|
|
||
| def __init__( | ||
|
|
@@ -1748,6 +1754,8 @@ def __init__( | |
| row_filter: BooleanExpression, | ||
| case_sensitive: bool = True, | ||
| limit: int | None = None, | ||
| *, | ||
| dictionary_columns: tuple[str, ...] = (), | ||
| ) -> None: | ||
| self._table_metadata = table_metadata | ||
| self._io = io | ||
|
|
@@ -1756,6 +1764,7 @@ def __init__( | |
| self._case_sensitive = case_sensitive | ||
| self._limit = limit | ||
| self._downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) | ||
| self._dictionary_columns = frozenset(dictionary_columns) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not keep it a tuple here?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I updated this to keep as a tuple. |
||
|
|
||
| @property | ||
| def _projected_field_ids(self) -> set[int]: | ||
|
|
@@ -1866,6 +1875,7 @@ def _record_batches_from_scan_tasks_and_deletes( | |
| self._table_metadata.specs().get(task.file.spec_id), | ||
| self._table_metadata.format_version, | ||
| self._downcast_ns_timestamp_to_us, | ||
| self._dictionary_columns, | ||
| ) | ||
| for batch in batches: | ||
| if self._limit is not None: | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1219,6 +1219,7 @@ def scan( | |
| snapshot_id: int | None = None, | ||
| options: Properties = EMPTY_DICT, | ||
| limit: int | None = None, | ||
| dictionary_columns: tuple[str, ...] = (), | ||
| ) -> DataScan: | ||
| """Fetch a DataScan based on the table's current metadata. | ||
|
|
||
|
|
@@ -1245,6 +1246,14 @@ def scan( | |
| An integer representing the number of rows to | ||
| return in the scan result. If None, fetches all | ||
| matching rows. | ||
| dictionary_columns: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm hesitant to add Arrow specific things to the public API
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point @Fokko I have moved table.scan(...).to_arrow(dictionary_columns=("payload",))
table.scan(...).to_arrow_batch_reader(dictionary_columns=("payload",))That way it doesn't disturb the general scan interface. |
||
| A tuple of column names that PyArrow should read as | ||
| dictionary-encoded (``pa.DictionaryArray``). Dictionary | ||
| encoding can substantially reduce memory usage for columns | ||
| that contain large or frequently repeated string values | ||
| (e.g. large JSON blobs or low-cardinality categoricals). | ||
| Only applies to Parquet files; silently ignored for ORC. | ||
| Columns absent from the file are silently skipped. | ||
|
|
||
| Returns: | ||
| A DataScan based on the table's current metadata. | ||
|
|
@@ -1260,6 +1269,7 @@ def scan( | |
| limit=limit, | ||
| catalog=self.catalog, | ||
| table_identifier=self._identifier, | ||
| dictionary_columns=dictionary_columns, | ||
| ) | ||
|
|
||
| @property | ||
|
|
@@ -1775,6 +1785,7 @@ def scan( | |
| snapshot_id: int | None = None, | ||
| options: Properties = EMPTY_DICT, | ||
| limit: int | None = None, | ||
| dictionary_columns: tuple[str, ...] = (), | ||
| ) -> DataScan: | ||
| raise ValueError("Cannot scan a staged table") | ||
|
|
||
|
|
@@ -1809,6 +1820,7 @@ class TableScan(ABC): | |
| limit: int | None | ||
| catalog: Catalog | None | ||
| table_identifier: Identifier | None | ||
| dictionary_columns: tuple[str, ...] | ||
|
|
||
| def __init__( | ||
| self, | ||
|
|
@@ -1822,6 +1834,7 @@ def __init__( | |
| limit: int | None = None, | ||
| catalog: Catalog | None = None, | ||
| table_identifier: Identifier | None = None, | ||
| dictionary_columns: tuple[str, ...] = (), | ||
| ): | ||
| self.table_metadata = table_metadata | ||
| self.io = io | ||
|
|
@@ -1833,6 +1846,7 @@ def __init__( | |
| self.limit = limit | ||
| self.catalog = catalog | ||
| self.table_identifier = table_identifier | ||
| self.dictionary_columns = dictionary_columns | ||
|
|
||
| def snapshot(self) -> Snapshot | None: | ||
| if self.snapshot_id: | ||
|
|
@@ -2072,13 +2086,11 @@ def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], Residu | |
| # The lambda created here is run in multiple threads. | ||
| # So we avoid creating _EvaluatorExpression methods bound to a single | ||
| # shared instance across multiple threads. | ||
| return lambda datafile: ( | ||
| residual_evaluator_of( | ||
| spec=spec, | ||
| expr=self.row_filter, | ||
| case_sensitive=self.case_sensitive, | ||
| schema=self.table_metadata.schema(), | ||
| ) | ||
| return lambda datafile: residual_evaluator_of( | ||
| spec=spec, | ||
| expr=self.row_filter, | ||
| case_sensitive=self.case_sensitive, | ||
| schema=self.table_metadata.schema(), | ||
| ) | ||
|
|
||
| @staticmethod | ||
|
|
@@ -2224,7 +2236,13 @@ def to_arrow(self) -> pa.Table: | |
| from pyiceberg.io.pyarrow import ArrowScan | ||
|
|
||
| return ArrowScan( | ||
| self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit | ||
| self.table_metadata, | ||
| self.io, | ||
| self.projection(), | ||
| self.row_filter, | ||
| self.case_sensitive, | ||
| self.limit, | ||
| dictionary_columns=self.dictionary_columns, | ||
| ).to_table(self.plan_files()) | ||
|
|
||
| def to_arrow_batch_reader(self) -> pa.RecordBatchReader: | ||
|
|
@@ -2244,7 +2262,13 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader: | |
|
|
||
| target_schema = schema_to_pyarrow(self.projection()) | ||
| batches = ArrowScan( | ||
| self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit | ||
| self.table_metadata, | ||
| self.io, | ||
| self.projection(), | ||
| self.row_filter, | ||
| self.case_sensitive, | ||
| self.limit, | ||
| dictionary_columns=self.dictionary_columns, | ||
| ).to_record_batches(self.plan_files()) | ||
|
|
||
| return pa.RecordBatchReader.from_batches( | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here we convert the frozenset back to a tuple
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Fokko I updated this to keep dictionary_columns as a tuple throughout the Arrow path, so we no longer convert frozenset -> tuple at read time. This is now in commit 33e4c6d.