Skip to content

Commit e1e47af

Browse files
Merge remote-tracking branch 'origin/main' into sm/table-scan-refactor
# Conflicts: # pyiceberg/table/__init__.py
2 parents df00b11 + 6da06ad commit e1e47af

3 files changed

Lines changed: 118 additions & 12 deletions

File tree

pyiceberg/io/pyarrow.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1625,8 +1625,12 @@ def _task_to_record_batches(
16251625
partition_spec: PartitionSpec | None = None,
16261626
format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION,
16271627
downcast_ns_timestamp_to_us: bool | None = None,
1628+
dictionary_columns: tuple[str, ...] = (),
16281629
) -> Iterator[pa.RecordBatch]:
1629-
arrow_format = _get_file_format(task.file.file_format, pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8))
1630+
format_kwargs: dict[str, Any] = {"pre_buffer": True, "buffer_size": ONE_MEGABYTE * 8}
1631+
if dictionary_columns and task.file.file_format == FileFormat.PARQUET:
1632+
format_kwargs["dictionary_columns"] = dictionary_columns
1633+
arrow_format = _get_file_format(task.file.file_format, **format_kwargs)
16301634
with io.new_input(task.file.file_path).open() as fin:
16311635
fragment = arrow_format.make_fragment(fin)
16321636
physical_schema = fragment.physical_schema
@@ -1729,6 +1733,7 @@ class ArrowScan:
17291733
_case_sensitive: bool
17301734
_limit: int | None
17311735
_downcast_ns_timestamp_to_us: bool | None
1736+
_dictionary_columns: tuple[str, ...]
17321737
"""Scan the Iceberg Table and create an Arrow construct.
17331738
17341739
Attributes:
@@ -1738,6 +1743,7 @@ class ArrowScan:
17381743
_bound_row_filter: Schema bound row expression to filter the data with
17391744
_case_sensitive: Case sensitivity when looking up column names
17401745
_limit: Limit the number of records.
1746+
_dictionary_columns: Column names to read as dictionary-encoded arrays.
17411747
"""
17421748

17431749
def __init__(
@@ -1748,6 +1754,8 @@ def __init__(
17481754
row_filter: BooleanExpression,
17491755
case_sensitive: bool = True,
17501756
limit: int | None = None,
1757+
*,
1758+
dictionary_columns: tuple[str, ...] = (),
17511759
) -> None:
17521760
self._table_metadata = table_metadata
17531761
self._io = io
@@ -1756,6 +1764,7 @@ def __init__(
17561764
self._case_sensitive = case_sensitive
17571765
self._limit = limit
17581766
self._downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE)
1767+
self._dictionary_columns = dictionary_columns
17591768

17601769
@property
17611770
def _projected_field_ids(self) -> set[int]:
@@ -1866,6 +1875,7 @@ def _record_batches_from_scan_tasks_and_deletes(
18661875
self._table_metadata.specs().get(task.file.spec_id),
18671876
self._table_metadata.format_version,
18681877
self._downcast_ns_timestamp_to_us,
1878+
self._dictionary_columns,
18691879
)
18701880
for batch in batches:
18711881
if self._limit is not None:

pyiceberg/table/__init__.py

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2171,27 +2171,49 @@ def plan_files(self) -> Iterable[FileScanTask]:
21712171
return self._plan_files_server_side()
21722172
return self._plan_files_local()
21732173

2174-
def to_arrow(self) -> pa.Table:
2174+
def to_arrow(self, dictionary_columns: tuple[str, ...] = ()) -> pa.Table:
21752175
"""Read an Arrow table eagerly from this DataScan.
21762176
21772177
All rows will be loaded into memory at once.
21782178
2179+
Args:
2180+
dictionary_columns:
2181+
A tuple of column names that PyArrow should read as
2182+
dictionary-encoded (``pa.DictionaryArray``). Dictionary
2183+
encoding can substantially reduce memory usage for columns
2184+
with low-cardinality repeated string values.
2185+
Only applies to Parquet files; silently ignored for ORC.
2186+
21792187
Returns:
21802188
pa.Table: Materialized Arrow Table from the Iceberg table's DataScan
21812189
"""
21822190
from pyiceberg.io.pyarrow import ArrowScan
21832191

21842192
return ArrowScan(
2185-
self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
2193+
self.table_metadata,
2194+
self.io,
2195+
self.projection(),
2196+
self.row_filter,
2197+
self.case_sensitive,
2198+
self.limit,
2199+
dictionary_columns=dictionary_columns,
21862200
).to_table(self.plan_files())
21872201

2188-
def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
2202+
def to_arrow_batch_reader(self, dictionary_columns: tuple[str, ...] = ()) -> pa.RecordBatchReader:
21892203
"""Return an Arrow RecordBatchReader from this DataScan.
21902204
21912205
For large results, using a RecordBatchReader requires less memory than
21922206
loading an Arrow Table for the same DataScan, because a RecordBatch
21932207
is read one at a time.
21942208
2209+
Args:
2210+
dictionary_columns:
2211+
A tuple of column names that PyArrow should read as
2212+
dictionary-encoded (``pa.DictionaryArray``). Dictionary
2213+
encoding can substantially reduce memory usage for columns
2214+
with low-cardinality repeated string values.
2215+
Only applies to Parquet files; silently ignored for ORC.
2216+
21952217
Returns:
21962218
pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg table's DataScan
21972219
which can be used to read a stream of record batches one by one.
@@ -2202,7 +2224,13 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
22022224

22032225
target_schema = schema_to_pyarrow(self.projection())
22042226
batches = ArrowScan(
2205-
self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
2227+
self.table_metadata,
2228+
self.io,
2229+
self.projection(),
2230+
self.row_filter,
2231+
self.case_sensitive,
2232+
self.limit,
2233+
dictionary_columns=dictionary_columns,
22062234
).to_record_batches(self.plan_files())
22072235

22082236
return pa.RecordBatchReader.from_batches(
@@ -2381,13 +2409,11 @@ def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], Residu
23812409
# The lambda created here is run in multiple threads.
23822410
# So we avoid creating _EvaluatorExpression methods bound to a single
23832411
# shared instance across multiple threads.
2384-
return lambda datafile: (
2385-
residual_evaluator_of(
2386-
spec=spec,
2387-
expr=self.row_filter,
2388-
case_sensitive=self.case_sensitive,
2389-
schema=self.table_metadata.schema(),
2390-
)
2412+
return lambda datafile: residual_evaluator_of(
2413+
spec=spec,
2414+
expr=self.row_filter,
2415+
case_sensitive=self.case_sensitive,
2416+
schema=self.table_metadata.schema(),
23912417
)
23922418

23932419
@staticmethod

tests/io/test_pyarrow.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5103,3 +5103,73 @@ def test_partition_column_projection_with_schema_evolution(catalog: InMemoryCata
51035103
result_sorted = result.sort_by("name")
51045104
assert result_sorted["name"].to_pylist() == ["Alice", "Bob", "Charlie", "David"]
51055105
assert result_sorted["new_column"].to_pylist() == [None, None, "new1", "new2"]
5106+
5107+
5108+
def test_dictionary_columns_produces_dict_encoded_output(tmpdir: str) -> None:
5109+
"""dictionary_columns passed to ArrowScan must yield dictionary-encoded arrays.
5110+
5111+
Verifies that:
5112+
1. The requested column is returned as a pa.DictionaryArray.
5113+
2. Values are identical to a plain (non-dict) scan.
5114+
3. A column NOT in dictionary_columns is still returned as a plain array.
5115+
"""
5116+
from pyiceberg.expressions import AlwaysTrue
5117+
from pyiceberg.io.pyarrow import ArrowScan, PyArrowFileIO
5118+
from pyiceberg.partitioning import PartitionSpec
5119+
from pyiceberg.table import FileScanTask
5120+
from pyiceberg.table.metadata import TableMetadataV2
5121+
5122+
arrow_schema = pa.schema(
5123+
[
5124+
pa.field("id", pa.int32(), nullable=True, metadata={PYARROW_PARQUET_FIELD_ID_KEY: "1"}),
5125+
pa.field("label", pa.string(), nullable=True, metadata={PYARROW_PARQUET_FIELD_ID_KEY: "2"}),
5126+
]
5127+
)
5128+
arrow_table = pa.table(
5129+
[pa.array([1, 2, 3, 4], type=pa.int32()), pa.array(["a", "b", "a", "b"], type=pa.string())],
5130+
schema=arrow_schema,
5131+
)
5132+
data_file = _write_table_to_data_file(f"{tmpdir}/test_dict_cols.parquet", arrow_schema, arrow_table)
5133+
data_file.spec_id = 0
5134+
5135+
iceberg_schema = Schema(
5136+
NestedField(1, "id", IntegerType(), required=False),
5137+
NestedField(2, "label", StringType(), required=False),
5138+
)
5139+
table_metadata = TableMetadataV2(
5140+
location=f"file://{tmpdir}",
5141+
last_column_id=2,
5142+
format_version=2,
5143+
schemas=[iceberg_schema],
5144+
partition_specs=[PartitionSpec()],
5145+
)
5146+
io = PyArrowFileIO()
5147+
task = FileScanTask(data_file)
5148+
5149+
scan_plain = ArrowScan(
5150+
table_metadata=table_metadata,
5151+
io=io,
5152+
projected_schema=iceberg_schema,
5153+
row_filter=AlwaysTrue(),
5154+
)
5155+
scan_dict = ArrowScan(
5156+
table_metadata=table_metadata,
5157+
io=io,
5158+
projected_schema=iceberg_schema,
5159+
row_filter=AlwaysTrue(),
5160+
dictionary_columns=("label",),
5161+
)
5162+
5163+
result_plain = scan_plain.to_table([task])
5164+
result_dict = scan_dict.to_table([task])
5165+
5166+
# id column is not in dictionary_columns — both scans should return int32
5167+
assert result_plain.schema.field("id").type == pa.int32()
5168+
assert result_dict.schema.field("id").type == pa.int32()
5169+
5170+
# label column: plain scan → string, dict scan → dictionary<values=string, indices=int32>
5171+
assert result_plain.schema.field("label").type == pa.string()
5172+
assert pa.types.is_dictionary(result_dict.schema.field("label").type)
5173+
5174+
# Values must be identical
5175+
assert result_plain.column("label").to_pylist() == result_dict.column("label").to_pylist()

0 commit comments

Comments
 (0)