12 changes: 12 additions & 0 deletions pyiceberg/catalog/rest/__init__.py
@@ -96,6 +96,8 @@ class Endpoints:
list_views: str = "namespaces/{namespace}/views"
drop_view: str = "namespaces/{namespace}/views/{view}"
view_exists: str = "namespaces/{namespace}/views/{view}"
plan_table_scan: str = "namespaces/{namespace}/tables/{table}/plan"
fetch_scan_tasks: str = "namespaces/{namespace}/tables/{table}/tasks"


class IdentifierKind(Enum):
@@ -130,6 +132,8 @@ class IdentifierKind(Enum):
SNAPSHOT_LOADING_MODE = "snapshot-loading-mode"
AUTH = "auth"
CUSTOM = "custom"
REST_SCAN_PLANNING_ENABLED = "rest-scan-planning-enabled"
REST_SCAN_PLANNING_ENABLED_DEFAULT = False

NAMESPACE_SEPARATOR = b"\x1f".decode(UTF8)

@@ -269,6 +273,14 @@ def _create_session(self) -> Session:

return session

def is_rest_scan_planning_enabled(self) -> bool:
"""Check if rest server-side scan planning is enabled.

Returns:
True if enabled, False otherwise.
"""
return property_as_bool(self.properties, REST_SCAN_PLANNING_ENABLED, REST_SCAN_PLANNING_ENABLED_DEFAULT)

def _create_legacy_oauth2_auth_manager(self, session: Session) -> AuthManager:
"""Create the LegacyOAuth2AuthManager by fetching required properties.

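A minimal sketch of how a client could opt in to the new behavior via catalog properties. Only the "rest-scan-planning-enabled" key and the is_rest_scan_planning_enabled() accessor come from this diff; the catalog name, URI, and token are placeholders.

from pyiceberg.catalog import load_catalog

# Placeholder connection details; only the scan-planning property key is from this PR.
catalog = load_catalog(
    "rest",
    **{
        "uri": "http://localhost:8181",
        "token": "dummy-token",
        "rest-scan-planning-enabled": "true",  # parsed with property_as_bool
    },
)

# The accessor reflects the property and falls back to False when it is unset.
assert catalog.is_rest_scan_planning_enabled()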
4 changes: 2 additions & 2 deletions pyiceberg/expressions/parser.py
@@ -69,7 +69,7 @@
from pyiceberg.typedef import L
from pyiceberg.types import strtobool

ParserElement.enablePackrat()
ParserElement.enable_packrat()

AND = CaselessKeyword("and")
OR = CaselessKeyword("or")
@@ -82,7 +82,7 @@
BETWEEN = CaselessKeyword("between")

unquoted_identifier = Word(alphas + "_", alphanums + "_$")
quoted_identifier = QuotedString('"', escChar="\\", unquoteResults=True)
quoted_identifier = QuotedString('"', esc_char="\\", unquote_results=True)


@quoted_identifier.set_parse_action
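For reference, a standalone sketch of the renamed pyparsing 3 calls used above; the sample inputs are illustrative only.

from pyparsing import ParserElement, QuotedString, Word, alphanums, alphas

ParserElement.enable_packrat()  # snake_case spelling of the old enablePackrat()

unquoted_identifier = Word(alphas + "_", alphanums + "_$")
quoted_identifier = QuotedString('"', esc_char="\\", unquote_results=True)

# unquote_results strips the surrounding quotes from the match.
print(quoted_identifier.parse_string('"my column"')[0])    # my column
print(unquoted_identifier.parse_string("my_column$1")[0])  # my_column$1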
2 changes: 1 addition & 1 deletion pyiceberg/io/pyarrow.py
@@ -2681,7 +2681,7 @@ def bin_pack_arrow_table(tbl: pa.Table, target_file_size: int) -> Iterator[list[
from pyiceberg.utils.bin_packing import PackingIterator

avg_row_size_bytes = tbl.nbytes / tbl.num_rows
target_rows_per_file = target_file_size // avg_row_size_bytes
target_rows_per_file = max(1, int(target_file_size / avg_row_size_bytes))
Review comment (severity: medium):
This change is a great improvement, as it correctly handles cases where target_file_size is smaller than avg_row_size_bytes, preventing target_rows_per_file from becoming zero.

However, there is a related edge case to consider: if tbl.num_rows is zero, the preceding line already raises a ZeroDivisionError, and if tbl.nbytes is zero, avg_row_size_bytes becomes zero and this line raises one instead.

While this PR doesn't introduce the issue, it would be more robust to handle it at the beginning of bin_pack_arrow_table, for example by checking tbl.num_rows == 0 and returning an empty iterator; see the sketch below this diff.

batches = tbl.to_batches(max_chunksize=target_rows_per_file)
bin_packed_record_batches = PackingIterator(
items=batches,
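A sketch of the guard suggested in the review comment above. bin_pack_or_skip is a hypothetical stand-in, not the pyiceberg implementation; it only illustrates where the empty-table check would sit relative to the average-row-size division.

import pyarrow as pa

def bin_pack_or_skip(tbl: pa.Table, target_file_size: int):
    # Suggested early exit: an empty (or zero-byte) table has nothing to pack,
    # and the division below would otherwise raise ZeroDivisionError.
    if tbl.num_rows == 0 or tbl.nbytes == 0:
        return iter([])

    avg_row_size_bytes = tbl.nbytes / tbl.num_rows
    target_rows_per_file = max(1, int(target_file_size / avg_row_size_bytes))
    # The real function hands these batches to PackingIterator; returning them
    # directly keeps this sketch self-contained.
    return iter([tbl.to_batches(max_chunksize=target_rows_per_file)])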
10 changes: 5 additions & 5 deletions pyproject.toml
@@ -55,7 +55,7 @@ pyiceberg = "pyiceberg.cli.console:run"
[project.optional-dependencies]
pyarrow = [
"pyarrow>=17.0.0",
"pyiceberg-core>=0.5.1,<0.8.0",
"pyiceberg-core==0.8.0rc1",
]
pandas = [
"pandas>=1.0.0,<3.0.0",
@@ -93,7 +93,7 @@ sql-sqlite = ["sqlalchemy>=2.0.18,<3"]
gcsfs = ["gcsfs>=2023.1.0"]
rest-sigv4 = ["boto3>=1.24.59"]
hf = ["huggingface-hub>=0.24.0"]
pyiceberg-core = ["pyiceberg-core>=0.5.1,<0.8.0"]
pyiceberg-core = ["pyiceberg-core==0.8.0rc1"]
datafusion = ["datafusion>=45,<49"]
gcp-auth = ["google-auth>=2.4.0"]

@@ -112,7 +112,7 @@ dev = [
"pyspark[connect]==4.0.1",
"protobuf==6.33.2", # match Spark Connect's gencode
"cython>=3.0.0",
"deptry>=0.14,<0.24",
"deptry>=0.14,<0.25",
"docutils!=0.21.post1",
"mypy-boto3-glue>=1.28.18",
"mypy-boto3-dynamodb>=1.28.18",
@@ -123,12 +123,12 @@ docs = [
"mkdocs==1.6.1",
"griffe==1.15.0",
"jinja2==3.1.6",
"mkdocstrings==0.30.1",
"mkdocstrings==1.0.0",
"mkdocstrings-python==2.0.1",
"mkdocs-literate-nav==0.6.2",
"mkdocs-autorefs==1.4.3",
"mkdocs-gen-files==0.6.0",
"mkdocs-material==9.7.0",
"mkdocs-material==9.7.1",
"mkdocs-material-extensions==1.3.1",
"mkdocs-section-index==0.3.10",
]
50 changes: 50 additions & 0 deletions tests/catalog/test_rest.py
@@ -1993,3 +1993,53 @@ def test_rest_catalog_context_manager_with_exception_sigv4(self, rest_mock: Mock

assert catalog is not None and hasattr(catalog, "_session")
assert len(catalog._session.adapters) == self.EXPECTED_ADAPTERS_SIGV4

def test_rest_scan_planning_disabled_by_default(self, rest_mock: Mocker) -> None:
rest_mock.get(
f"{TEST_URI}v1/config",
json={"defaults": {}, "overrides": {}},
status_code=200,
)
catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)

assert catalog.is_rest_scan_planning_enabled() is False

def test_rest_scan_planning_enabled_by_property(self, rest_mock: Mocker) -> None:
rest_mock.get(
f"{TEST_URI}v1/config",
json={"defaults": {}, "overrides": {}},
status_code=200,
)
catalog = RestCatalog(
"rest",
uri=TEST_URI,
token=TEST_TOKEN,
**{"rest-scan-planning-enabled": "true"},
)

assert catalog.is_rest_scan_planning_enabled() is True

def test_rest_scan_planning_explicitly_disabled(self, rest_mock: Mocker) -> None:
rest_mock.get(
f"{TEST_URI}v1/config",
json={"defaults": {}, "overrides": {}},
status_code=200,
)
catalog = RestCatalog(
"rest",
uri=TEST_URI,
token=TEST_TOKEN,
**{"rest-scan-planning-enabled": "false"},
)

assert catalog.is_rest_scan_planning_enabled() is False

def test_rest_scan_planning_enabled_from_server_config(self, rest_mock: Mocker) -> None:
rest_mock.get(
f"{TEST_URI}v1/config",
json={"defaults": {"rest-scan-planning-enabled": "true"}, "overrides": {}},
status_code=200,
)
catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)

assert catalog.is_rest_scan_planning_enabled() is True
24 changes: 12 additions & 12 deletions tests/integration/test_writes/test_writes.py
@@ -1650,18 +1650,18 @@ def test_merge_manifests_file_content(session_catalog: Catalog, arrow_table_with
for i in range(3):
tbl_a_data_file = tbl_a_entries["data_file"][i]
assert tbl_a_data_file["column_sizes"] == [
(1, 49),
(2, 78),
(3, 128),
(4, 94),
(5, 118),
(6, 94),
(7, 118),
(8, 118),
(9, 118),
(10, 94),
(11, 78),
(12, 109),
(1, 51),
(2, 80),
(3, 130),
(4, 96),
(5, 120),
(6, 96),
(7, 120),
(8, 120),
(9, 120),
(10, 96),
(11, 80),
(12, 111),
]
assert tbl_a_data_file["content"] == 0
assert tbl_a_data_file["equality_ids"] is None
6 changes: 6 additions & 0 deletions tests/io/test_pyarrow.py
@@ -2248,6 +2248,12 @@ def test_bin_pack_arrow_table(arrow_table_with_null: pa.Table) -> None:
assert len(list(bin_packed)) == 5


def test_bin_pack_arrow_table_target_size_smaller_than_row(arrow_table_with_null: pa.Table) -> None:
bin_packed = list(bin_pack_arrow_table(arrow_table_with_null, target_file_size=1))
assert len(bin_packed) == arrow_table_with_null.num_rows
assert sum(batch.num_rows for bin_ in bin_packed for batch in bin_) == arrow_table_with_null.num_rows


def test_schema_mismatch_type(table_schema_simple: Schema) -> None:
other_schema = pa.schema(
(