Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1625,8 +1625,12 @@ def _task_to_record_batches(
partition_spec: PartitionSpec | None = None,
format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION,
downcast_ns_timestamp_to_us: bool | None = None,
dictionary_columns: frozenset[str] = frozenset(),
) -> Iterator[pa.RecordBatch]:
arrow_format = _get_file_format(task.file.file_format, pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8))
format_kwargs: dict[str, Any] = {"pre_buffer": True, "buffer_size": ONE_MEGABYTE * 8}
if dictionary_columns and task.file.file_format == FileFormat.PARQUET:
format_kwargs["dictionary_columns"] = tuple(dictionary_columns)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we convert the frozenset back to a tuple

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Fokko I updated this to keep dictionary_columns as a tuple throughout the Arrow path, so we no longer convert frozenset -> tuple at read time. This is now in commit 33e4c6d.

arrow_format = _get_file_format(task.file.file_format, **format_kwargs)
with io.new_input(task.file.file_path).open() as fin:
fragment = arrow_format.make_fragment(fin)
physical_schema = fragment.physical_schema
Expand Down Expand Up @@ -1729,6 +1733,7 @@ class ArrowScan:
_case_sensitive: bool
_limit: int | None
_downcast_ns_timestamp_to_us: bool | None
_dictionary_columns: frozenset[str]
"""Scan the Iceberg Table and create an Arrow construct.

Attributes:
Expand All @@ -1738,6 +1743,7 @@ class ArrowScan:
_bound_row_filter: Schema bound row expression to filter the data with
_case_sensitive: Case sensitivity when looking up column names
_limit: Limit the number of records.
_dictionary_columns: Column names to read as dictionary-encoded arrays.
"""

def __init__(
Expand All @@ -1748,6 +1754,8 @@ def __init__(
row_filter: BooleanExpression,
case_sensitive: bool = True,
limit: int | None = None,
*,
dictionary_columns: tuple[str, ...] = (),
) -> None:
self._table_metadata = table_metadata
self._io = io
Expand All @@ -1756,6 +1764,7 @@ def __init__(
self._case_sensitive = case_sensitive
self._limit = limit
self._downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE)
self._dictionary_columns = frozenset(dictionary_columns)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not keep it a tuple here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated this to keep as a tuple.


@property
def _projected_field_ids(self) -> set[int]:
Expand Down Expand Up @@ -1866,6 +1875,7 @@ def _record_batches_from_scan_tasks_and_deletes(
self._table_metadata.specs().get(task.file.spec_id),
self._table_metadata.format_version,
self._downcast_ns_timestamp_to_us,
self._dictionary_columns,
)
for batch in batches:
if self._limit is not None:
Expand Down
42 changes: 33 additions & 9 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1219,6 +1219,7 @@ def scan(
snapshot_id: int | None = None,
options: Properties = EMPTY_DICT,
limit: int | None = None,
dictionary_columns: tuple[str, ...] = (),
) -> DataScan:
"""Fetch a DataScan based on the table's current metadata.

Expand All @@ -1245,6 +1246,14 @@ def scan(
An integer representing the number of rows to
return in the scan result. If None, fetches all
matching rows.
dictionary_columns:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm hesitant to add Arrow specific things to the public API

@GayathriSrividya GayathriSrividya Jun 6, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point @Fokko I have moved dictionary_columns off the public scan() API and onto the Arrow-specific output methods instead:

table.scan(...).to_arrow(dictionary_columns=("payload",))
table.scan(...).to_arrow_batch_reader(dictionary_columns=("payload",))

That way it doesn't disturb the general scan interface. ArrowScan still accepts it for lower-level use. Pushed in the latest commit. Let me know if you have any further suggestions.

A tuple of column names that PyArrow should read as
dictionary-encoded (``pa.DictionaryArray``). Dictionary
encoding can substantially reduce memory usage for columns
that contain large or frequently repeated string values
(e.g. large JSON blobs or low-cardinality categoricals).
Only applies to Parquet files; silently ignored for ORC.
Columns absent from the file are silently skipped.

Returns:
A DataScan based on the table's current metadata.
Expand All @@ -1260,6 +1269,7 @@ def scan(
limit=limit,
catalog=self.catalog,
table_identifier=self._identifier,
dictionary_columns=dictionary_columns,
)

@property
Expand Down Expand Up @@ -1775,6 +1785,7 @@ def scan(
snapshot_id: int | None = None,
options: Properties = EMPTY_DICT,
limit: int | None = None,
dictionary_columns: tuple[str, ...] = (),
) -> DataScan:
raise ValueError("Cannot scan a staged table")

Expand Down Expand Up @@ -1809,6 +1820,7 @@ class TableScan(ABC):
limit: int | None
catalog: Catalog | None
table_identifier: Identifier | None
dictionary_columns: tuple[str, ...]

def __init__(
self,
Expand All @@ -1822,6 +1834,7 @@ def __init__(
limit: int | None = None,
catalog: Catalog | None = None,
table_identifier: Identifier | None = None,
dictionary_columns: tuple[str, ...] = (),
):
self.table_metadata = table_metadata
self.io = io
Expand All @@ -1833,6 +1846,7 @@ def __init__(
self.limit = limit
self.catalog = catalog
self.table_identifier = table_identifier
self.dictionary_columns = dictionary_columns

def snapshot(self) -> Snapshot | None:
if self.snapshot_id:
Expand Down Expand Up @@ -2072,13 +2086,11 @@ def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], Residu
# The lambda created here is run in multiple threads.
# So we avoid creating _EvaluatorExpression methods bound to a single
# shared instance across multiple threads.
return lambda datafile: (
residual_evaluator_of(
spec=spec,
expr=self.row_filter,
case_sensitive=self.case_sensitive,
schema=self.table_metadata.schema(),
)
return lambda datafile: residual_evaluator_of(
spec=spec,
expr=self.row_filter,
case_sensitive=self.case_sensitive,
schema=self.table_metadata.schema(),
)

@staticmethod
Expand Down Expand Up @@ -2224,7 +2236,13 @@ def to_arrow(self) -> pa.Table:
from pyiceberg.io.pyarrow import ArrowScan

return ArrowScan(
self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
self.table_metadata,
self.io,
self.projection(),
self.row_filter,
self.case_sensitive,
self.limit,
dictionary_columns=self.dictionary_columns,
).to_table(self.plan_files())

def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
Expand All @@ -2244,7 +2262,13 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader:

target_schema = schema_to_pyarrow(self.projection())
batches = ArrowScan(
self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
self.table_metadata,
self.io,
self.projection(),
self.row_filter,
self.case_sensitive,
self.limit,
dictionary_columns=self.dictionary_columns,
).to_record_batches(self.plan_files())

return pa.RecordBatchReader.from_batches(
Expand Down
70 changes: 70 additions & 0 deletions tests/io/test_pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -5103,3 +5103,73 @@ def test_partition_column_projection_with_schema_evolution(catalog: InMemoryCata
result_sorted = result.sort_by("name")
assert result_sorted["name"].to_pylist() == ["Alice", "Bob", "Charlie", "David"]
assert result_sorted["new_column"].to_pylist() == [None, None, "new1", "new2"]


def test_dictionary_columns_produces_dict_encoded_output(tmpdir: str) -> None:
"""dictionary_columns passed to ArrowScan must yield dictionary-encoded arrays.

Verifies that:
1. The requested column is returned as a pa.DictionaryArray.
2. Values are identical to a plain (non-dict) scan.
3. A column NOT in dictionary_columns is still returned as a plain array.
"""
from pyiceberg.expressions import AlwaysTrue
from pyiceberg.io.pyarrow import ArrowScan, PyArrowFileIO
from pyiceberg.partitioning import PartitionSpec
from pyiceberg.table import FileScanTask
from pyiceberg.table.metadata import TableMetadataV2

arrow_schema = pa.schema(
[
pa.field("id", pa.int32(), nullable=True, metadata={PYARROW_PARQUET_FIELD_ID_KEY: "1"}),
pa.field("label", pa.string(), nullable=True, metadata={PYARROW_PARQUET_FIELD_ID_KEY: "2"}),
]
)
arrow_table = pa.table(
[pa.array([1, 2, 3, 4], type=pa.int32()), pa.array(["a", "b", "a", "b"], type=pa.string())],
schema=arrow_schema,
)
data_file = _write_table_to_data_file(f"{tmpdir}/test_dict_cols.parquet", arrow_schema, arrow_table)
data_file.spec_id = 0

iceberg_schema = Schema(
NestedField(1, "id", IntegerType(), required=False),
NestedField(2, "label", StringType(), required=False),
)
table_metadata = TableMetadataV2(
location=f"file://{tmpdir}",
last_column_id=2,
format_version=2,
schemas=[iceberg_schema],
partition_specs=[PartitionSpec()],
)
io = PyArrowFileIO()
task = FileScanTask(data_file)

scan_plain = ArrowScan(
table_metadata=table_metadata,
io=io,
projected_schema=iceberg_schema,
row_filter=AlwaysTrue(),
)
scan_dict = ArrowScan(
table_metadata=table_metadata,
io=io,
projected_schema=iceberg_schema,
row_filter=AlwaysTrue(),
dictionary_columns=("label",),
)

result_plain = scan_plain.to_table([task])
result_dict = scan_dict.to_table([task])

# id column is not in dictionary_columns — both scans should return int32
assert result_plain.schema.field("id").type == pa.int32()
assert result_dict.schema.field("id").type == pa.int32()

# label column: plain scan → string, dict scan → dictionary<values=string, indices=int32>
assert result_plain.schema.field("label").type == pa.string()
assert pa.types.is_dictionary(result_dict.schema.field("label").type)

# Values must be identical
assert result_plain.column("label").to_pylist() == result_dict.column("label").to_pylist()