Skip to content

Commit 6da06ad

Browse files
GayathriSrividyaGayathri Srividya Rajavarapuclaude
authored
feat: add dictionary_columns to Arrow scans (#3461)
Closes #3170 ## Rationale Columns that contain large or frequently repeated string values (e.g. JSON blobs, low-cardinality categoricals) can exhaust memory when PyArrow loads them as plain string arrays. PyArrow's Parquet reader natively supports dictionary-encoded reads via its `dictionary_columns` kwarg, which deduplicates values and can dramatically reduce peak memory usage. This was previously discussed in #3168 and a prior implementation (#3234) was closed as stale. ## Changes - Added `dictionary_columns: tuple[str, ...] = ()` to `Table.scan()`, `TableScan.__init__`, and `StagedTable.scan()`. - Forwarded through `DataScan.to_arrow()` and `to_arrow_batch_reader()` → `ArrowScan.__init__` → `_task_to_record_batches` → `_get_file_format()`. - Only applied when `task.file.file_format == FileFormat.PARQUET`; silently ignored for ORC (which does not support this kwarg). ## Usage ```python # Read the "payload" column as dictionary-encoded to save memory df = table.scan(dictionary_columns=("payload",)).to_arrow() ``` ## Verification - Added `test_dictionary_columns_produces_dict_encoded_output` — confirms the requested column is dict-encoded, non-requested columns are plain, and values are identical. - `make lint` ✓ - `pytest tests/table/ tests/io/test_pyarrow.py` ✓ --------- Co-authored-by: Gayathri Srividya Rajavarapu <gayathrir@Gayathris-MacBook-Air.local> Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent a1e12ad commit 6da06ad

3 files changed

Lines changed: 118 additions & 12 deletions

File tree

pyiceberg/io/pyarrow.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1625,8 +1625,12 @@ def _task_to_record_batches(
16251625
partition_spec: PartitionSpec | None = None,
16261626
format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION,
16271627
downcast_ns_timestamp_to_us: bool | None = None,
1628+
dictionary_columns: tuple[str, ...] = (),
16281629
) -> Iterator[pa.RecordBatch]:
1629-
arrow_format = _get_file_format(task.file.file_format, pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8))
1630+
format_kwargs: dict[str, Any] = {"pre_buffer": True, "buffer_size": ONE_MEGABYTE * 8}
1631+
if dictionary_columns and task.file.file_format == FileFormat.PARQUET:
1632+
format_kwargs["dictionary_columns"] = dictionary_columns
1633+
arrow_format = _get_file_format(task.file.file_format, **format_kwargs)
16301634
with io.new_input(task.file.file_path).open() as fin:
16311635
fragment = arrow_format.make_fragment(fin)
16321636
physical_schema = fragment.physical_schema
@@ -1729,6 +1733,7 @@ class ArrowScan:
17291733
_case_sensitive: bool
17301734
_limit: int | None
17311735
_downcast_ns_timestamp_to_us: bool | None
1736+
_dictionary_columns: tuple[str, ...]
17321737
"""Scan the Iceberg Table and create an Arrow construct.
17331738
17341739
Attributes:
@@ -1738,6 +1743,7 @@ class ArrowScan:
17381743
_bound_row_filter: Schema bound row expression to filter the data with
17391744
_case_sensitive: Case sensitivity when looking up column names
17401745
_limit: Limit the number of records.
1746+
_dictionary_columns: Column names to read as dictionary-encoded arrays.
17411747
"""
17421748

17431749
def __init__(
@@ -1748,6 +1754,8 @@ def __init__(
17481754
row_filter: BooleanExpression,
17491755
case_sensitive: bool = True,
17501756
limit: int | None = None,
1757+
*,
1758+
dictionary_columns: tuple[str, ...] = (),
17511759
) -> None:
17521760
self._table_metadata = table_metadata
17531761
self._io = io
@@ -1756,6 +1764,7 @@ def __init__(
17561764
self._case_sensitive = case_sensitive
17571765
self._limit = limit
17581766
self._downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE)
1767+
self._dictionary_columns = dictionary_columns
17591768

17601769
@property
17611770
def _projected_field_ids(self) -> set[int]:
@@ -1866,6 +1875,7 @@ def _record_batches_from_scan_tasks_and_deletes(
18661875
self._table_metadata.specs().get(task.file.spec_id),
18671876
self._table_metadata.format_version,
18681877
self._downcast_ns_timestamp_to_us,
1878+
self._dictionary_columns,
18691879
)
18701880
for batch in batches:
18711881
if self._limit is not None:

pyiceberg/table/__init__.py

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2072,13 +2072,11 @@ def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], Residu
20722072
# The lambda created here is run in multiple threads.
20732073
# So we avoid creating _EvaluatorExpression methods bound to a single
20742074
# shared instance across multiple threads.
2075-
return lambda datafile: (
2076-
residual_evaluator_of(
2077-
spec=spec,
2078-
expr=self.row_filter,
2079-
case_sensitive=self.case_sensitive,
2080-
schema=self.table_metadata.schema(),
2081-
)
2075+
return lambda datafile: residual_evaluator_of(
2076+
spec=spec,
2077+
expr=self.row_filter,
2078+
case_sensitive=self.case_sensitive,
2079+
schema=self.table_metadata.schema(),
20822080
)
20832081

20842082
@staticmethod
@@ -2213,27 +2211,49 @@ def plan_files(self) -> Iterable[FileScanTask]:
22132211
return self._plan_files_server_side()
22142212
return self._plan_files_local()
22152213

2216-
def to_arrow(self) -> pa.Table:
2214+
def to_arrow(self, dictionary_columns: tuple[str, ...] = ()) -> pa.Table:
22172215
"""Read an Arrow table eagerly from this DataScan.
22182216
22192217
All rows will be loaded into memory at once.
22202218
2219+
Args:
2220+
dictionary_columns:
2221+
A tuple of column names that PyArrow should read as
2222+
dictionary-encoded (``pa.DictionaryArray``). Dictionary
2223+
encoding can substantially reduce memory usage for columns
2224+
with low-cardinality repeated string values.
2225+
Only applies to Parquet files; silently ignored for ORC.
2226+
22212227
Returns:
22222228
pa.Table: Materialized Arrow Table from the Iceberg table's DataScan
22232229
"""
22242230
from pyiceberg.io.pyarrow import ArrowScan
22252231

22262232
return ArrowScan(
2227-
self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
2233+
self.table_metadata,
2234+
self.io,
2235+
self.projection(),
2236+
self.row_filter,
2237+
self.case_sensitive,
2238+
self.limit,
2239+
dictionary_columns=dictionary_columns,
22282240
).to_table(self.plan_files())
22292241

2230-
def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
2242+
def to_arrow_batch_reader(self, dictionary_columns: tuple[str, ...] = ()) -> pa.RecordBatchReader:
22312243
"""Return an Arrow RecordBatchReader from this DataScan.
22322244
22332245
For large results, using a RecordBatchReader requires less memory than
22342246
loading an Arrow Table for the same DataScan, because a RecordBatch
22352247
is read one at a time.
22362248
2249+
Args:
2250+
dictionary_columns:
2251+
A tuple of column names that PyArrow should read as
2252+
dictionary-encoded (``pa.DictionaryArray``). Dictionary
2253+
encoding can substantially reduce memory usage for columns
2254+
with low-cardinality repeated string values.
2255+
Only applies to Parquet files; silently ignored for ORC.
2256+
22372257
Returns:
22382258
pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg table's DataScan
22392259
which can be used to read a stream of record batches one by one.
@@ -2244,7 +2264,13 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
22442264

22452265
target_schema = schema_to_pyarrow(self.projection())
22462266
batches = ArrowScan(
2247-
self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
2267+
self.table_metadata,
2268+
self.io,
2269+
self.projection(),
2270+
self.row_filter,
2271+
self.case_sensitive,
2272+
self.limit,
2273+
dictionary_columns=dictionary_columns,
22482274
).to_record_batches(self.plan_files())
22492275

22502276
return pa.RecordBatchReader.from_batches(

tests/io/test_pyarrow.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5103,3 +5103,73 @@ def test_partition_column_projection_with_schema_evolution(catalog: InMemoryCata
51035103
result_sorted = result.sort_by("name")
51045104
assert result_sorted["name"].to_pylist() == ["Alice", "Bob", "Charlie", "David"]
51055105
assert result_sorted["new_column"].to_pylist() == [None, None, "new1", "new2"]
5106+
5107+
5108+
def test_dictionary_columns_produces_dict_encoded_output(tmpdir: str) -> None:
5109+
"""dictionary_columns passed to ArrowScan must yield dictionary-encoded arrays.
5110+
5111+
Verifies that:
5112+
1. The requested column is returned as a pa.DictionaryArray.
5113+
2. Values are identical to a plain (non-dict) scan.
5114+
3. A column NOT in dictionary_columns is still returned as a plain array.
5115+
"""
5116+
from pyiceberg.expressions import AlwaysTrue
5117+
from pyiceberg.io.pyarrow import ArrowScan, PyArrowFileIO
5118+
from pyiceberg.partitioning import PartitionSpec
5119+
from pyiceberg.table import FileScanTask
5120+
from pyiceberg.table.metadata import TableMetadataV2
5121+
5122+
arrow_schema = pa.schema(
5123+
[
5124+
pa.field("id", pa.int32(), nullable=True, metadata={PYARROW_PARQUET_FIELD_ID_KEY: "1"}),
5125+
pa.field("label", pa.string(), nullable=True, metadata={PYARROW_PARQUET_FIELD_ID_KEY: "2"}),
5126+
]
5127+
)
5128+
arrow_table = pa.table(
5129+
[pa.array([1, 2, 3, 4], type=pa.int32()), pa.array(["a", "b", "a", "b"], type=pa.string())],
5130+
schema=arrow_schema,
5131+
)
5132+
data_file = _write_table_to_data_file(f"{tmpdir}/test_dict_cols.parquet", arrow_schema, arrow_table)
5133+
data_file.spec_id = 0
5134+
5135+
iceberg_schema = Schema(
5136+
NestedField(1, "id", IntegerType(), required=False),
5137+
NestedField(2, "label", StringType(), required=False),
5138+
)
5139+
table_metadata = TableMetadataV2(
5140+
location=f"file://{tmpdir}",
5141+
last_column_id=2,
5142+
format_version=2,
5143+
schemas=[iceberg_schema],
5144+
partition_specs=[PartitionSpec()],
5145+
)
5146+
io = PyArrowFileIO()
5147+
task = FileScanTask(data_file)
5148+
5149+
scan_plain = ArrowScan(
5150+
table_metadata=table_metadata,
5151+
io=io,
5152+
projected_schema=iceberg_schema,
5153+
row_filter=AlwaysTrue(),
5154+
)
5155+
scan_dict = ArrowScan(
5156+
table_metadata=table_metadata,
5157+
io=io,
5158+
projected_schema=iceberg_schema,
5159+
row_filter=AlwaysTrue(),
5160+
dictionary_columns=("label",),
5161+
)
5162+
5163+
result_plain = scan_plain.to_table([task])
5164+
result_dict = scan_dict.to_table([task])
5165+
5166+
# id column is not in dictionary_columns — both scans should return int32
5167+
assert result_plain.schema.field("id").type == pa.int32()
5168+
assert result_dict.schema.field("id").type == pa.int32()
5169+
5170+
# label column: plain scan → string, dict scan → dictionary<values=string, indices=int32>
5171+
assert result_plain.schema.field("label").type == pa.string()
5172+
assert pa.types.is_dictionary(result_dict.schema.field("label").type)
5173+
5174+
# Values must be identical
5175+
assert result_plain.column("label").to_pylist() == result_dict.column("label").to_pylist()

0 commit comments

Comments
 (0)