Skip to content

Commit c4e6be9

Browse files
Merge branch 'main' into location-providers
2 parents 00917e9 + dbcf65b commit c4e6be9

11 files changed

Lines changed: 30 additions & 354 deletions

File tree

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ help: ## Display this help
2222
install-poetry: ## Install poetry if the user has not done that yet.
2323
@if ! command -v poetry &> /dev/null; then \
2424
echo "Poetry could not be found. Installing..."; \
25-
pip install --user poetry==1.8.4; \
25+
pip install --user poetry==1.8.5; \
2626
else \
2727
echo "Poetry is already installed."; \
2828
fi

dev/provision.py

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,17 @@
2222
from pyiceberg.schema import Schema
2323
from pyiceberg.types import FixedType, NestedField, UUIDType
2424

25-
spark = SparkSession.builder.getOrCreate()
25+
# The configuration is important, otherwise we get many small
26+
# parquet files with a single row. When a positional delete
27+
# hits the Parquet file with one row, the parquet file gets
28+
# dropped instead of having a merge-on-read delete file.
29+
spark = (
30+
SparkSession
31+
.builder
32+
.config("spark.sql.shuffle.partitions", "1")
33+
.config("spark.default.parallelism", "1")
34+
.getOrCreate()
35+
)
2636

2737
catalogs = {
2838
'rest': load_catalog(
@@ -120,10 +130,6 @@
120130
"""
121131
)
122132

123-
# Partitioning is not really needed, but there is a bug:
124-
# https://github.com/apache/iceberg/pull/7685
125-
spark.sql(f"ALTER TABLE {catalog_name}.default.test_positional_mor_deletes ADD PARTITION FIELD years(dt) AS dt_years")
126-
127133
spark.sql(
128134
f"""
129135
INSERT INTO {catalog_name}.default.test_positional_mor_deletes
@@ -168,10 +174,6 @@
168174
"""
169175
)
170176

171-
# Partitioning is not really needed, but there is a bug:
172-
# https://github.com/apache/iceberg/pull/7685
173-
spark.sql(f"ALTER TABLE {catalog_name}.default.test_positional_mor_double_deletes ADD PARTITION FIELD years(dt) AS dt_years")
174-
175177
spark.sql(
176178
f"""
177179
INSERT INTO {catalog_name}.default.test_positional_mor_double_deletes

poetry.lock

Lines changed: 12 additions & 12 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyiceberg/cli/console.py

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -34,34 +34,9 @@
3434
from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchPropertyException, NoSuchTableError
3535
from pyiceberg.table import TableProperties
3636
from pyiceberg.table.refs import SnapshotRef
37-
from pyiceberg.utils.deprecated import deprecated
3837
from pyiceberg.utils.properties import property_as_int
3938

4039

41-
class DeprecatedConstants:
42-
@property
43-
@deprecated(
44-
deprecated_in="0.8.0",
45-
removed_in="0.9.0",
46-
help_message="DEFAULT_MAX_SNAPSHOT_AGE_MS is deprecated. Use TableProperties.MAX_SNAPSHOT_AGE_MS_DEFAULT instead.",
47-
)
48-
def DEFAULT_MAX_SNAPSHOT_AGE_MS(self) -> int:
49-
return 432000000
50-
51-
@property
52-
@deprecated(
53-
deprecated_in="0.8.0",
54-
removed_in="0.9.0",
55-
help_message="DEFAULT_MIN_SNAPSHOTS_TO_KEEP is deprecated. Use TableProperties.MIN_SNAPSHOTS_TO_KEEP_DEFAULT instead.",
56-
)
57-
def DEFAULT_MIN_SNAPSHOTS_TO_KEEP(self) -> int:
58-
return 1
59-
60-
61-
DEFAULT_MIN_SNAPSHOTS_TO_KEEP = DeprecatedConstants().DEFAULT_MIN_SNAPSHOTS_TO_KEEP
62-
DEFAULT_MAX_SNAPSHOT_AGE_MS = DeprecatedConstants().DEFAULT_MAX_SNAPSHOT_AGE_MS
63-
64-
6540
def catch_exception() -> Callable: # type: ignore
6641
def decorator(func: Callable) -> Callable: # type: ignore
6742
@wraps(func)

pyiceberg/io/fsspec.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -94,13 +94,13 @@
9494

9595

9696
def s3v4_rest_signer(properties: Properties, request: AWSRequest, **_: Any) -> AWSRequest:
97-
if TOKEN not in properties:
98-
raise SignError("Signer set, but token is not available")
99-
10097
signer_url = properties.get(S3_SIGNER_URI, properties["uri"]).rstrip("/")
10198
signer_endpoint = properties.get(S3_SIGNER_ENDPOINT, S3_SIGNER_ENDPOINT_DEFAULT)
10299

103-
signer_headers = {"Authorization": f"Bearer {properties[TOKEN]}"}
100+
signer_headers = {}
101+
if token := properties.get(TOKEN):
102+
signer_headers = {"Authorization": f"Bearer {token}"}
103+
104104
signer_body = {
105105
"method": request.method,
106106
"region": request.context["client_region"],

pyiceberg/io/pyarrow.py

Lines changed: 1 addition & 182 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@
169169
from pyiceberg.utils.concurrent import ExecutorFactory
170170
from pyiceberg.utils.config import Config
171171
from pyiceberg.utils.datetime import millis_to_datetime
172-
from pyiceberg.utils.deprecated import deprecated, deprecation_message
172+
from pyiceberg.utils.deprecated import deprecation_message
173173
from pyiceberg.utils.properties import get_first_property_value, property_as_bool, property_as_int
174174
from pyiceberg.utils.singleton import Singleton
175175
from pyiceberg.utils.truncate import truncate_upper_bound_binary_string, truncate_upper_bound_text_string
@@ -1532,187 +1532,6 @@ def _record_batches_from_scan_tasks_and_deletes(
15321532
total_row_count += len(batch)
15331533

15341534

1535-
@deprecated(
1536-
deprecated_in="0.8.0",
1537-
removed_in="0.9.0",
1538-
help_message="project_table is deprecated. Use ArrowScan.to_table instead.",
1539-
)
1540-
def project_table(
1541-
tasks: Iterable[FileScanTask],
1542-
table_metadata: TableMetadata,
1543-
io: FileIO,
1544-
row_filter: BooleanExpression,
1545-
projected_schema: Schema,
1546-
case_sensitive: bool = True,
1547-
limit: Optional[int] = None,
1548-
) -> pa.Table:
1549-
"""Resolve the right columns based on the identifier.
1550-
1551-
Args:
1552-
tasks (Iterable[FileScanTask]): A URI or a path to a local file.
1553-
table_metadata (TableMetadata): The table metadata of the table that's being queried
1554-
io (FileIO): A FileIO to open streams to the object store
1555-
row_filter (BooleanExpression): The expression for filtering rows.
1556-
projected_schema (Schema): The output schema.
1557-
case_sensitive (bool): Case sensitivity when looking up column names.
1558-
limit (Optional[int]): Limit the number of records.
1559-
1560-
Raises:
1561-
ResolveError: When an incompatible query is done.
1562-
"""
1563-
scheme, netloc, _ = PyArrowFileIO.parse_location(table_metadata.location)
1564-
if isinstance(io, PyArrowFileIO):
1565-
fs = io.fs_by_scheme(scheme, netloc)
1566-
else:
1567-
try:
1568-
from pyiceberg.io.fsspec import FsspecFileIO
1569-
1570-
if isinstance(io, FsspecFileIO):
1571-
from pyarrow.fs import PyFileSystem
1572-
1573-
fs = PyFileSystem(FSSpecHandler(io.get_fs(scheme)))
1574-
else:
1575-
raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {io}")
1576-
except ModuleNotFoundError as e:
1577-
# When FsSpec is not installed
1578-
raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {io}") from e
1579-
1580-
use_large_types = property_as_bool(io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, True)
1581-
1582-
bound_row_filter = bind(table_metadata.schema(), row_filter, case_sensitive=case_sensitive)
1583-
1584-
projected_field_ids = {
1585-
id for id in projected_schema.field_ids if not isinstance(projected_schema.find_type(id), (MapType, ListType))
1586-
}.union(extract_field_ids(bound_row_filter))
1587-
1588-
deletes_per_file = _read_all_delete_files(fs, tasks)
1589-
executor = ExecutorFactory.get_or_create()
1590-
futures = [
1591-
executor.submit(
1592-
_task_to_table,
1593-
fs,
1594-
task,
1595-
bound_row_filter,
1596-
projected_schema,
1597-
projected_field_ids,
1598-
deletes_per_file.get(task.file.file_path),
1599-
case_sensitive,
1600-
table_metadata.name_mapping(),
1601-
use_large_types,
1602-
)
1603-
for task in tasks
1604-
]
1605-
total_row_count = 0
1606-
# for consistent ordering, we need to maintain future order
1607-
futures_index = {f: i for i, f in enumerate(futures)}
1608-
completed_futures: SortedList[Future[pa.Table]] = SortedList(iterable=[], key=lambda f: futures_index[f])
1609-
for future in concurrent.futures.as_completed(futures):
1610-
completed_futures.add(future)
1611-
if table_result := future.result():
1612-
total_row_count += len(table_result)
1613-
# stop early if limit is satisfied
1614-
if limit is not None and total_row_count >= limit:
1615-
break
1616-
1617-
# by now, we've either completed all tasks or satisfied the limit
1618-
if limit is not None:
1619-
_ = [f.cancel() for f in futures if not f.done()]
1620-
1621-
tables = [f.result() for f in completed_futures if f.result()]
1622-
1623-
if len(tables) < 1:
1624-
return pa.Table.from_batches([], schema=schema_to_pyarrow(projected_schema, include_field_ids=False))
1625-
1626-
result = pa.concat_tables(tables, promote_options="permissive")
1627-
1628-
if limit is not None:
1629-
return result.slice(0, limit)
1630-
1631-
return result
1632-
1633-
1634-
@deprecated(
1635-
deprecated_in="0.8.0",
1636-
removed_in="0.9.0",
1637-
help_message="project_table is deprecated. Use ArrowScan.to_record_batches instead.",
1638-
)
1639-
def project_batches(
1640-
tasks: Iterable[FileScanTask],
1641-
table_metadata: TableMetadata,
1642-
io: FileIO,
1643-
row_filter: BooleanExpression,
1644-
projected_schema: Schema,
1645-
case_sensitive: bool = True,
1646-
limit: Optional[int] = None,
1647-
) -> Iterator[pa.RecordBatch]:
1648-
"""Resolve the right columns based on the identifier.
1649-
1650-
Args:
1651-
tasks (Iterable[FileScanTask]): A URI or a path to a local file.
1652-
table_metadata (TableMetadata): The table metadata of the table that's being queried
1653-
io (FileIO): A FileIO to open streams to the object store
1654-
row_filter (BooleanExpression): The expression for filtering rows.
1655-
projected_schema (Schema): The output schema.
1656-
case_sensitive (bool): Case sensitivity when looking up column names.
1657-
limit (Optional[int]): Limit the number of records.
1658-
1659-
Raises:
1660-
ResolveError: When an incompatible query is done.
1661-
"""
1662-
scheme, netloc, _ = PyArrowFileIO.parse_location(table_metadata.location)
1663-
if isinstance(io, PyArrowFileIO):
1664-
fs = io.fs_by_scheme(scheme, netloc)
1665-
else:
1666-
try:
1667-
from pyiceberg.io.fsspec import FsspecFileIO
1668-
1669-
if isinstance(io, FsspecFileIO):
1670-
from pyarrow.fs import PyFileSystem
1671-
1672-
fs = PyFileSystem(FSSpecHandler(io.get_fs(scheme)))
1673-
else:
1674-
raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {io}")
1675-
except ModuleNotFoundError as e:
1676-
# When FsSpec is not installed
1677-
raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {io}") from e
1678-
1679-
use_large_types = property_as_bool(io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, True)
1680-
1681-
bound_row_filter = bind(table_metadata.schema(), row_filter, case_sensitive=case_sensitive)
1682-
1683-
projected_field_ids = {
1684-
id for id in projected_schema.field_ids if not isinstance(projected_schema.find_type(id), (MapType, ListType))
1685-
}.union(extract_field_ids(bound_row_filter))
1686-
1687-
deletes_per_file = _read_all_delete_files(fs, tasks)
1688-
1689-
total_row_count = 0
1690-
1691-
for task in tasks:
1692-
# stop early if limit is satisfied
1693-
if limit is not None and total_row_count >= limit:
1694-
break
1695-
batches = _task_to_record_batches(
1696-
fs,
1697-
task,
1698-
bound_row_filter,
1699-
projected_schema,
1700-
projected_field_ids,
1701-
deletes_per_file.get(task.file.file_path),
1702-
case_sensitive,
1703-
table_metadata.name_mapping(),
1704-
use_large_types,
1705-
)
1706-
for batch in batches:
1707-
if limit is not None:
1708-
if total_row_count >= limit:
1709-
break
1710-
elif total_row_count + len(batch) >= limit:
1711-
batch = batch.slice(0, limit - total_row_count)
1712-
yield batch
1713-
total_row_count += len(batch)
1714-
1715-
17161535
def _to_requested_schema(
17171536
requested_schema: Schema,
17181537
file_schema: Schema,

0 commit comments

Comments
 (0)