Skip to content

Commit dd17340

Browse files
Nits
1 parent 510b586 commit dd17340

1 file changed

Lines changed: 39 additions & 37 deletions

File tree

pyiceberg/table/__init__.py

Lines changed: 39 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1788,44 +1788,11 @@ def projection(self) -> Schema: ...
17881788
@abstractmethod
17891789
def plan_files(self) -> Iterable[ScanTask]: ...
17901790

1791-
def to_arrow(self) -> pa.Table:
1792-
"""Read an Arrow table eagerly from this scan.
1793-
1794-
All rows will be loaded into memory at once.
1795-
1796-
Returns:
1797-
pa.Table: Materialized Arrow Table from the Iceberg table scan.
1798-
"""
1799-
from pyiceberg.io.pyarrow import ArrowScan
1800-
1801-
return ArrowScan(
1802-
self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
1803-
).to_table(self.plan_files())
1804-
1805-
def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
1806-
"""Return an Arrow RecordBatchReader from this scan.
1807-
1808-
For large results, using a RecordBatchReader requires less memory than
1809-
loading an Arrow Table for the same scan, because a RecordBatch is read
1810-
one at a time.
1811-
1812-
Returns:
1813-
pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg table scan,
1814-
which can be used to read a stream of record batches one by one.
1815-
"""
1816-
import pyarrow as pa
1817-
1818-
from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow
1819-
1820-
target_schema = schema_to_pyarrow(self.projection())
1821-
batches = ArrowScan(
1822-
self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
1823-
).to_record_batches(self.plan_files())
1791+
@abstractmethod
1792+
def to_arrow(self) -> pa.Table: ...
18241793

1825-
return pa.RecordBatchReader.from_batches(
1826-
target_schema,
1827-
batches,
1828-
).cast(target_schema)
1794+
@abstractmethod
1795+
def to_arrow_batch_reader(self) -> pa.RecordBatchReader: ...
18291796

18301797
def update(self: A, **overrides: Any) -> A:
18311798
"""Create a copy of this table scan with updated fields."""
@@ -2089,6 +2056,29 @@ def _min_sequence_number(manifests: list[ManifestFile]) -> int:
20892056
return INITIAL_SEQUENCE_NUMBER
20902057

20912058

2059+
def _to_arrow_via_file_scan_tasks(scan: BaseScan, tasks: Iterable[FileScanTask]) -> pa.Table:
2060+
"""Materialize a scan into an Arrow table given its planned ``FileScanTask``s."""
2061+
from pyiceberg.io.pyarrow import ArrowScan
2062+
2063+
return ArrowScan(scan.table_metadata, scan.io, scan.projection(), scan.row_filter, scan.case_sensitive, scan.limit).to_table(
2064+
tasks
2065+
)
2066+
2067+
2068+
def _to_arrow_batch_reader_via_file_scan_tasks(scan: BaseScan, tasks: Iterable[FileScanTask]) -> pa.RecordBatchReader:
2069+
"""Stream a scan into an Arrow ``RecordBatchReader`` given its planned ``FileScanTask``s."""
2070+
import pyarrow as pa
2071+
2072+
from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow
2073+
2074+
target_schema = schema_to_pyarrow(scan.projection())
2075+
batches = ArrowScan(
2076+
scan.table_metadata, scan.io, scan.projection(), scan.row_filter, scan.case_sensitive, scan.limit
2077+
).to_record_batches(tasks)
2078+
2079+
return pa.RecordBatchReader.from_batches(target_schema, batches).cast(target_schema)
2080+
2081+
20922082
class DataScan(TableScan):
20932083
@cached_property
20942084
def _manifest_planner(self) -> ManifestGroupPlanner:
@@ -2189,6 +2179,12 @@ def plan_files(self) -> Iterable[FileScanTask]:
21892179
return self._plan_files_server_side()
21902180
return self._plan_files_local()
21912181

2182+
def to_arrow(self) -> pa.Table:
2183+
return _to_arrow_via_file_scan_tasks(self, self.plan_files())
2184+
2185+
def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
2186+
return _to_arrow_batch_reader_via_file_scan_tasks(self, self.plan_files())
2187+
21922188
def count(self) -> int:
21932189
from pyiceberg.io.pyarrow import ArrowScan
21942190

@@ -2342,6 +2338,12 @@ def plan_files(self) -> Iterable[FileScanTask]:
23422338
and manifest_entry.status == ManifestEntryStatus.ADDED,
23432339
)
23442340

2341+
def to_arrow(self) -> pa.Table:
2342+
return _to_arrow_via_file_scan_tasks(self, self.plan_files())
2343+
2344+
def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
2345+
return _to_arrow_batch_reader_via_file_scan_tasks(self, self.plan_files())
2346+
23452347
def _validate_and_resolve_snapshots(self) -> tuple[int, int]:
23462348
if self.from_snapshot_id_exclusive is None:
23472349
raise ValueError("Start snapshot is not set, please set from_snapshot_id_exclusive")

0 commit comments

Comments
 (0)