Skip to content

Commit 6e4cf3a

Browse files
Merge branch 'sm/table-scan-refactor' into sm/incremental-append-scan-v3
# Conflicts:
#   pyiceberg/table/__init__.py
2 parents acb10f7 + e1e47af commit 6e4cf3a

3 files changed

Lines changed: 128 additions & 18 deletions

File tree

pyiceberg/io/pyarrow.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1625,8 +1625,12 @@ def _task_to_record_batches(
16251625
partition_spec: PartitionSpec | None = None,
16261626
format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION,
16271627
downcast_ns_timestamp_to_us: bool | None = None,
1628+
dictionary_columns: tuple[str, ...] = (),
16281629
) -> Iterator[pa.RecordBatch]:
1629-
arrow_format = _get_file_format(task.file.file_format, pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8))
1630+
format_kwargs: dict[str, Any] = {"pre_buffer": True, "buffer_size": ONE_MEGABYTE * 8}
1631+
if dictionary_columns and task.file.file_format == FileFormat.PARQUET:
1632+
format_kwargs["dictionary_columns"] = dictionary_columns
1633+
arrow_format = _get_file_format(task.file.file_format, **format_kwargs)
16301634
with io.new_input(task.file.file_path).open() as fin:
16311635
fragment = arrow_format.make_fragment(fin)
16321636
physical_schema = fragment.physical_schema
@@ -1729,6 +1733,7 @@ class ArrowScan:
17291733
_case_sensitive: bool
17301734
_limit: int | None
17311735
_downcast_ns_timestamp_to_us: bool | None
1736+
_dictionary_columns: tuple[str, ...]
17321737
"""Scan the Iceberg Table and create an Arrow construct.
17331738
17341739
Attributes:
@@ -1738,6 +1743,7 @@ class ArrowScan:
17381743
_bound_row_filter: Schema bound row expression to filter the data with
17391744
_case_sensitive: Case sensitivity when looking up column names
17401745
_limit: Limit the number of records.
1746+
_dictionary_columns: Column names to read as dictionary-encoded arrays.
17411747
"""
17421748

17431749
def __init__(
@@ -1748,6 +1754,8 @@ def __init__(
17481754
row_filter: BooleanExpression,
17491755
case_sensitive: bool = True,
17501756
limit: int | None = None,
1757+
*,
1758+
dictionary_columns: tuple[str, ...] = (),
17511759
) -> None:
17521760
self._table_metadata = table_metadata
17531761
self._io = io
@@ -1756,6 +1764,7 @@ def __init__(
17561764
self._case_sensitive = case_sensitive
17571765
self._limit = limit
17581766
self._downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE)
1767+
self._dictionary_columns = dictionary_columns
17591768

17601769
@property
17611770
def _projected_field_ids(self) -> set[int]:
@@ -1866,6 +1875,7 @@ def _record_batches_from_scan_tasks_and_deletes(
18661875
self._table_metadata.specs().get(task.file.spec_id),
18671876
self._table_metadata.format_version,
18681877
self._downcast_ns_timestamp_to_us,
1878+
self._dictionary_columns,
18691879
)
18701880
for batch in batches:
18711881
if self._limit is not None:

pyiceberg/table/__init__.py

Lines changed: 47 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2163,24 +2163,40 @@ def _min_sequence_number(manifests: list[ManifestFile]) -> int:
21632163
return INITIAL_SEQUENCE_NUMBER
21642164

21652165

2166-
def _to_arrow_via_file_scan_tasks(scan: BaseScan, tasks: Iterable[FileScanTask]) -> pa.Table:
2166+
def _to_arrow_via_file_scan_tasks(
2167+
scan: BaseScan, tasks: Iterable[FileScanTask], dictionary_columns: tuple[str, ...] = ()
2168+
) -> pa.Table:
21672169
"""Materialize a scan into an Arrow table given its planned ``FileScanTask``s."""
21682170
from pyiceberg.io.pyarrow import ArrowScan
21692171

2170-
return ArrowScan(scan.table_metadata, scan.io, scan.projection(), scan.row_filter, scan.case_sensitive, scan.limit).to_table(
2171-
tasks
2172-
)
2172+
return ArrowScan(
2173+
scan.table_metadata,
2174+
scan.io,
2175+
scan.projection(),
2176+
scan.row_filter,
2177+
scan.case_sensitive,
2178+
scan.limit,
2179+
dictionary_columns=dictionary_columns,
2180+
).to_table(tasks)
21732181

21742182

2175-
def _to_arrow_batch_reader_via_file_scan_tasks(scan: BaseScan, tasks: Iterable[FileScanTask]) -> pa.RecordBatchReader:
2183+
def _to_arrow_batch_reader_via_file_scan_tasks(
2184+
scan: BaseScan, tasks: Iterable[FileScanTask], dictionary_columns: tuple[str, ...] = ()
2185+
) -> pa.RecordBatchReader:
21762186
"""Stream a scan into an Arrow ``RecordBatchReader`` given its planned ``FileScanTask``s."""
21772187
import pyarrow as pa
21782188

21792189
from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow
21802190

21812191
target_schema = schema_to_pyarrow(scan.projection())
21822192
batches = ArrowScan(
2183-
scan.table_metadata, scan.io, scan.projection(), scan.row_filter, scan.case_sensitive, scan.limit
2193+
scan.table_metadata,
2194+
scan.io,
2195+
scan.projection(),
2196+
scan.row_filter,
2197+
scan.case_sensitive,
2198+
scan.limit,
2199+
dictionary_columns=dictionary_columns,
21842200
).to_record_batches(tasks)
21852201

21862202
return pa.RecordBatchReader.from_batches(target_schema, batches).cast(target_schema)
@@ -2259,28 +2275,44 @@ def plan_files(self) -> Iterable[FileScanTask]:
22592275
return self._plan_files_server_side()
22602276
return self._plan_files_local()
22612277

2262-
def to_arrow(self) -> pa.Table:
2278+
def to_arrow(self, dictionary_columns: tuple[str, ...] = ()) -> pa.Table:
22632279
"""Read an Arrow table eagerly from this DataScan.
22642280
22652281
All rows will be loaded into memory at once.
22662282
2283+
Args:
2284+
dictionary_columns:
2285+
A tuple of column names that PyArrow should read as
2286+
dictionary-encoded (``pa.DictionaryArray``). Dictionary
2287+
encoding can substantially reduce memory usage for columns
2288+
with low-cardinality repeated string values.
2289+
Only applies to Parquet files; silently ignored for ORC.
2290+
22672291
Returns:
22682292
pa.Table: Materialized Arrow Table from the Iceberg table's DataScan
22692293
"""
2270-
return _to_arrow_via_file_scan_tasks(self, self.plan_files())
2294+
return _to_arrow_via_file_scan_tasks(self, self.plan_files(), dictionary_columns=dictionary_columns)
22712295

2272-
def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
2296+
def to_arrow_batch_reader(self, dictionary_columns: tuple[str, ...] = ()) -> pa.RecordBatchReader:
22732297
"""Return an Arrow RecordBatchReader from this DataScan.
22742298
22752299
For large results, using a RecordBatchReader requires less memory than
22762300
loading an Arrow Table for the same DataScan, because a RecordBatch
22772301
is read one at a time.
22782302
2303+
Args:
2304+
dictionary_columns:
2305+
A tuple of column names that PyArrow should read as
2306+
dictionary-encoded (``pa.DictionaryArray``). Dictionary
2307+
encoding can substantially reduce memory usage for columns
2308+
with low-cardinality repeated string values.
2309+
Only applies to Parquet files; silently ignored for ORC.
2310+
22792311
Returns:
22802312
pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg table's DataScan
22812313
which can be used to read a stream of record batches one by one.
22822314
"""
2283-
return _to_arrow_batch_reader_via_file_scan_tasks(self, self.plan_files())
2315+
return _to_arrow_batch_reader_via_file_scan_tasks(self, self.plan_files(), dictionary_columns=dictionary_columns)
22842316

22852317
def count(self) -> int:
22862318
from pyiceberg.io.pyarrow import ArrowScan
@@ -2637,13 +2669,11 @@ def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], Residu
26372669
# The lambda created here is run in multiple threads.
26382670
# So we avoid creating _EvaluatorExpression methods bound to a single
26392671
# shared instance across multiple threads.
2640-
return lambda datafile: (
2641-
residual_evaluator_of(
2642-
spec=spec,
2643-
expr=self.row_filter,
2644-
case_sensitive=self.case_sensitive,
2645-
schema=self.table_metadata.schema(),
2646-
)
2672+
return lambda datafile: residual_evaluator_of(
2673+
spec=spec,
2674+
expr=self.row_filter,
2675+
case_sensitive=self.case_sensitive,
2676+
schema=self.table_metadata.schema(),
26472677
)
26482678

26492679
@staticmethod

tests/io/test_pyarrow.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5103,3 +5103,73 @@ def test_partition_column_projection_with_schema_evolution(catalog: InMemoryCata
51035103
result_sorted = result.sort_by("name")
51045104
assert result_sorted["name"].to_pylist() == ["Alice", "Bob", "Charlie", "David"]
51055105
assert result_sorted["new_column"].to_pylist() == [None, None, "new1", "new2"]
5106+
5107+
5108+
def test_dictionary_columns_produces_dict_encoded_output(tmpdir: str) -> None:
5109+
"""dictionary_columns passed to ArrowScan must yield dictionary-encoded arrays.
5110+
5111+
Verifies that:
5112+
1. The requested column is returned as a pa.DictionaryArray.
5113+
2. Values are identical to a plain (non-dict) scan.
5114+
3. A column NOT in dictionary_columns is still returned as a plain array.
5115+
"""
5116+
from pyiceberg.expressions import AlwaysTrue
5117+
from pyiceberg.io.pyarrow import ArrowScan, PyArrowFileIO
5118+
from pyiceberg.partitioning import PartitionSpec
5119+
from pyiceberg.table import FileScanTask
5120+
from pyiceberg.table.metadata import TableMetadataV2
5121+
5122+
arrow_schema = pa.schema(
5123+
[
5124+
pa.field("id", pa.int32(), nullable=True, metadata={PYARROW_PARQUET_FIELD_ID_KEY: "1"}),
5125+
pa.field("label", pa.string(), nullable=True, metadata={PYARROW_PARQUET_FIELD_ID_KEY: "2"}),
5126+
]
5127+
)
5128+
arrow_table = pa.table(
5129+
[pa.array([1, 2, 3, 4], type=pa.int32()), pa.array(["a", "b", "a", "b"], type=pa.string())],
5130+
schema=arrow_schema,
5131+
)
5132+
data_file = _write_table_to_data_file(f"{tmpdir}/test_dict_cols.parquet", arrow_schema, arrow_table)
5133+
data_file.spec_id = 0
5134+
5135+
iceberg_schema = Schema(
5136+
NestedField(1, "id", IntegerType(), required=False),
5137+
NestedField(2, "label", StringType(), required=False),
5138+
)
5139+
table_metadata = TableMetadataV2(
5140+
location=f"file://{tmpdir}",
5141+
last_column_id=2,
5142+
format_version=2,
5143+
schemas=[iceberg_schema],
5144+
partition_specs=[PartitionSpec()],
5145+
)
5146+
io = PyArrowFileIO()
5147+
task = FileScanTask(data_file)
5148+
5149+
scan_plain = ArrowScan(
5150+
table_metadata=table_metadata,
5151+
io=io,
5152+
projected_schema=iceberg_schema,
5153+
row_filter=AlwaysTrue(),
5154+
)
5155+
scan_dict = ArrowScan(
5156+
table_metadata=table_metadata,
5157+
io=io,
5158+
projected_schema=iceberg_schema,
5159+
row_filter=AlwaysTrue(),
5160+
dictionary_columns=("label",),
5161+
)
5162+
5163+
result_plain = scan_plain.to_table([task])
5164+
result_dict = scan_dict.to_table([task])
5165+
5166+
# id column is not in dictionary_columns — both scans should return int32
5167+
assert result_plain.schema.field("id").type == pa.int32()
5168+
assert result_dict.schema.field("id").type == pa.int32()
5169+
5170+
# label column: plain scan → string, dict scan → dictionary<values=string, indices=int32>
5171+
assert result_plain.schema.field("label").type == pa.string()
5172+
assert pa.types.is_dictionary(result_dict.schema.field("label").type)
5173+
5174+
# Values must be identical
5175+
assert result_plain.column("label").to_pylist() == result_dict.column("label").to_pylist()

0 commit comments

Comments
 (0)