Skip to content

Commit 72be520

Browse files
authored
Merge branch 'main' into fix-2544-rest-signer-token
2 parents a99dcad + 2d8397e commit 72be520

File tree

19 files changed

+1317
-196
lines changed

19 files changed

+1317
-196
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ setup-venv: ## Create virtual environment
7070
uv venv $(PYTHON_ARG)
7171

7272
install-dependencies: setup-venv ## Install all dependencies including extras
73-
uv sync $(PYTHON_ARG) --all-extras
73+
uv sync $(PYTHON_ARG) --all-extras --reinstall
7474

7575
install: install-uv install-dependencies ## Install uv and dependencies
7676

pyiceberg/catalog/rest/__init__.py

Lines changed: 160 additions & 3 deletions
Large diffs are not rendered by default.
Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
from __future__ import annotations
18+
19+
from datetime import date, datetime, time
20+
from decimal import Decimal
21+
from typing import Annotated, Generic, Literal, TypeAlias, TypeVar
22+
from uuid import UUID
23+
24+
from pydantic import Field, model_validator
25+
26+
from pyiceberg.catalog.rest.response import ErrorResponseMessage
27+
from pyiceberg.expressions import BooleanExpression, SerializableBooleanExpression
28+
from pyiceberg.manifest import FileFormat
29+
from pyiceberg.typedef import IcebergBaseModel
30+
31+
# Primitive types that can appear in partition values and statistic bounds.
PrimitiveTypeValue: TypeAlias = bool | int | float | str | Decimal | UUID | date | time | datetime | bytes

# Value type variable for the generic KeyValueMap.
V = TypeVar("V")
35+
36+
37+
class KeyValueMap(IcebergBaseModel, Generic[V]):
    """Map serialized as parallel key/value arrays for column statistics."""

    keys: list[int] = Field(default_factory=list)
    values: list[V] = Field(default_factory=list)

    @model_validator(mode="after")
    def _validate_lengths_match(self) -> KeyValueMap[V]:
        """Reject parallel arrays of differing length."""
        n_keys = len(self.keys)
        n_values = len(self.values)
        if n_keys != n_values:
            raise ValueError(f"keys and values must have same length: {n_keys} != {n_values}")
        return self

    def to_dict(self) -> dict[int, V]:
        """Convert to dictionary mapping field ID to value."""
        return {key: value for key, value in zip(self.keys, self.values, strict=True)}
52+
53+
54+
class CountMap(KeyValueMap[int]):
    """Map of field IDs to counts (parallel-array encoded, see KeyValueMap)."""
56+
57+
58+
class ValueMap(KeyValueMap[PrimitiveTypeValue]):
    """Map of field IDs to primitive values (for lower/upper bounds)."""
60+
61+
62+
class StorageCredential(IcebergBaseModel):
    """Storage credential for accessing content files."""

    # Storage location prefix this credential applies to.
    prefix: str = Field(description="Storage location prefix this credential applies to")
    # Credential configuration keyed by property name.
    # NOTE(review): contents are server-defined (e.g. tokens/keys) — confirm against the REST spec.
    config: dict[str, str] = Field(default_factory=dict)
67+
68+
69+
class RESTContentFile(IcebergBaseModel):
    """Base model for data and delete files from REST API.

    Field aliases follow the kebab-case property names used on the wire.
    """

    # Partition spec the file was written with.
    spec_id: int = Field(alias="spec-id")
    # Partition values for this file.
    # NOTE(review): ordering is presumably the spec's field order — confirm against the server response.
    partition: list[PrimitiveTypeValue] = Field(default_factory=list)
    # Discriminator field; concrete subclasses pin this to a single literal.
    content: Literal["data", "position-deletes", "equality-deletes"]
    file_path: str = Field(alias="file-path")
    file_format: FileFormat = Field(alias="file-format")
    file_size_in_bytes: int = Field(alias="file-size-in-bytes")
    record_count: int = Field(alias="record-count")
    # NOTE(review): presumably encryption key metadata for encrypted files — confirm with spec.
    key_metadata: str | None = Field(alias="key-metadata", default=None)
    split_offsets: list[int] | None = Field(alias="split-offsets", default=None)
    sort_order_id: int | None = Field(alias="sort-order-id", default=None)
82+
83+
84+
class RESTDataFile(RESTContentFile):
    """Data file from REST API."""

    # Pins the discriminator for this subclass.
    content: Literal["data"] = Field(default="data")
    first_row_id: int | None = Field(alias="first-row-id", default=None)
    # Per-column statistics keyed by field ID (parallel-array encoding, see CountMap/ValueMap).
    column_sizes: CountMap | None = Field(alias="column-sizes", default=None)
    value_counts: CountMap | None = Field(alias="value-counts", default=None)
    null_value_counts: CountMap | None = Field(alias="null-value-counts", default=None)
    nan_value_counts: CountMap | None = Field(alias="nan-value-counts", default=None)
    lower_bounds: ValueMap | None = Field(alias="lower-bounds", default=None)
    upper_bounds: ValueMap | None = Field(alias="upper-bounds", default=None)
95+
96+
97+
class RESTPositionDeleteFile(RESTContentFile):
    """Position delete file from REST API."""

    # Pins the discriminator for this subclass.
    content: Literal["position-deletes"] = Field(default="position-deletes")
    # NOTE(review): presumably the single data file these deletes target, when scoped — confirm with spec.
    referenced_data_file: str | None = Field(alias="referenced-data-file", default=None)
    content_offset: int | None = Field(alias="content-offset", default=None)
    content_size_in_bytes: int | None = Field(alias="content-size-in-bytes", default=None)
104+
105+
106+
class RESTEqualityDeleteFile(RESTContentFile):
    """Equality delete file from REST API."""

    # Pins the discriminator for this subclass.
    content: Literal["equality-deletes"] = Field(default="equality-deletes")
    # Field IDs the equality comparison is performed on.
    equality_ids: list[int] | None = Field(alias="equality-ids", default=None)
111+
112+
113+
# Discriminated union for delete files: pydantic selects the concrete model
# from the literal value of the "content" field on each subclass.
RESTDeleteFile = Annotated[
    RESTPositionDeleteFile | RESTEqualityDeleteFile,
    Field(discriminator="content"),
]
118+
119+
120+
class RESTFileScanTask(IcebergBaseModel):
    """A file scan task from the REST server."""

    # The data file to scan.
    data_file: RESTDataFile = Field(alias="data-file")
    # Indexes into the surrounding response's delete-files list.
    delete_file_references: list[int] | None = Field(alias="delete-file-references", default=None)
    # NOTE(review): presumably the residual filter after partition pruning; None when absent — confirm with spec.
    residual_filter: BooleanExpression | None = Field(alias="residual-filter", default=None)
126+
127+
128+
class ScanTasks(IcebergBaseModel):
    """Container for scan tasks returned by the server."""

    delete_files: list[RESTDeleteFile] = Field(alias="delete-files", default_factory=list)
    file_scan_tasks: list[RESTFileScanTask] = Field(alias="file-scan-tasks", default_factory=list)
    plan_tasks: list[str] = Field(alias="plan-tasks", default_factory=list)

    @model_validator(mode="after")
    def _validate_delete_file_references(self) -> ScanTasks:
        """Check cross-references between file scan tasks and the delete-file list."""
        # Every reference must be a valid index into self.delete_files.
        max_idx = len(self.delete_files) - 1
        for scan_task in self.file_scan_tasks:
            for reference in scan_task.delete_file_references or []:
                if not 0 <= reference <= max_idx:
                    raise ValueError(f"Invalid delete file reference: {reference} (valid range: 0-{max_idx})")

        # Delete files must not appear without tasks that could reference them.
        if self.delete_files and not self.file_scan_tasks:
            raise ValueError("Invalid response: deleteFiles should only be returned with fileScanTasks that reference them")

        return self
148+
149+
150+
class PlanCompleted(ScanTasks):
    """Completed scan plan result, carrying the planned scan tasks."""

    # Discriminator value for the PlanningResponse union.
    status: Literal["completed"] = "completed"
    plan_id: str | None = Field(alias="plan-id", default=None)
    # Optional credentials for accessing the returned content files.
    storage_credentials: list[StorageCredential] | None = Field(alias="storage-credentials", default=None)
156+
157+
158+
class PlanSubmitted(IcebergBaseModel):
    """Scan plan submitted; poll for completion using the plan ID."""

    # Discriminator value for the PlanningResponse union.
    status: Literal["submitted"] = "submitted"
    plan_id: str | None = Field(alias="plan-id", default=None)
163+
164+
165+
class PlanCancelled(IcebergBaseModel):
    """Planning was cancelled."""

    # Discriminator value for the PlanningResponse union.
    status: Literal["cancelled"] = "cancelled"
169+
170+
171+
class PlanFailed(IcebergBaseModel):
    """Planning failed with an error returned by the server."""

    # Discriminator value for the PlanningResponse union.
    status: Literal["failed"] = "failed"
    error: ErrorResponseMessage
176+
177+
178+
# Discriminated union over the planning "status" field: the server reports
# one of completed, submitted, cancelled, or failed.
PlanningResponse = Annotated[
    PlanCompleted | PlanSubmitted | PlanCancelled | PlanFailed,
    Field(discriminator="status"),
]
182+
183+
184+
class PlanTableScanRequest(IcebergBaseModel):
    """Request body for planning a REST scan."""

    snapshot_id: int | None = Field(alias="snapshot-id", default=None)
    select: list[str] | None = Field(default=None)
    filter: SerializableBooleanExpression | None = Field(default=None)
    case_sensitive: bool = Field(alias="case-sensitive", default=True)
    use_snapshot_schema: bool = Field(alias="use-snapshot-schema", default=False)
    start_snapshot_id: int | None = Field(alias="start-snapshot-id", default=None)
    end_snapshot_id: int | None = Field(alias="end-snapshot-id", default=None)
    stats_fields: list[str] | None = Field(alias="stats-fields", default=None)
    min_rows_requested: int | None = Field(alias="min-rows-requested", default=None)

    @model_validator(mode="after")
    def _validate_snapshot_fields(self) -> PlanTableScanRequest:
        """Ensure snapshot-selection fields are used consistently.

        An incremental scan (start-snapshot-id) requires an end snapshot and
        is mutually exclusive with a point-in-time snapshot-id.
        """
        incremental = self.start_snapshot_id is not None
        if incremental and self.end_snapshot_id is None:
            raise ValueError("end-snapshot-id is required when start-snapshot-id is specified")
        if self.snapshot_id is not None and incremental:
            raise ValueError("Cannot specify both snapshot-id and start-snapshot-id")
        return self
204+
205+
206+
class FetchScanTasksRequest(IcebergBaseModel):
    """Request body for fetching scan tasks endpoint."""

    # NOTE(review): presumably one of the opaque tokens from ScanTasks.plan_tasks — confirm with spec.
    plan_task: str = Field(alias="plan-task")

pyiceberg/expressions/parser.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@
6969
from pyiceberg.typedef import L
7070
from pyiceberg.types import strtobool
7171

72-
ParserElement.enablePackrat()
72+
ParserElement.enable_packrat()
7373

7474
AND = CaselessKeyword("and")
7575
OR = CaselessKeyword("or")
@@ -82,7 +82,7 @@
8282
BETWEEN = CaselessKeyword("between")
8383

8484
unquoted_identifier = Word(alphas + "_", alphanums + "_$")
85-
quoted_identifier = QuotedString('"', escChar="\\", unquoteResults=True)
85+
quoted_identifier = QuotedString('"', esc_quote="\\", unquote_results=True)
8686

8787

8888
@quoted_identifier.set_parse_action

pyiceberg/io/pyarrow.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2681,7 +2681,7 @@ def bin_pack_arrow_table(tbl: pa.Table, target_file_size: int) -> Iterator[list[
26812681
from pyiceberg.utils.bin_packing import PackingIterator
26822682

26832683
avg_row_size_bytes = tbl.nbytes / tbl.num_rows
2684-
target_rows_per_file = target_file_size // avg_row_size_bytes
2684+
target_rows_per_file = max(1, int(target_file_size / avg_row_size_bytes))
26852685
batches = tbl.to_batches(max_chunksize=target_rows_per_file)
26862686
bin_packed_record_batches = PackingIterator(
26872687
items=batches,

pyiceberg/table/__init__.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -734,6 +734,7 @@ def upsert(
734734
when_not_matched_insert_all: bool = True,
735735
case_sensitive: bool = True,
736736
branch: str | None = MAIN_BRANCH,
737+
snapshot_properties: dict[str, str] = EMPTY_DICT,
737738
) -> UpsertResult:
738739
"""Shorthand API for performing an upsert to an iceberg table.
739740
@@ -745,6 +746,7 @@ def upsert(
745746
when_not_matched_insert_all: Bool indicating new rows to be inserted that do not match any existing rows in the table
746747
case_sensitive: Bool indicating if the match should be case-sensitive
747748
branch: Branch Reference to run the upsert operation
749+
snapshot_properties: Custom properties to be added to the snapshot summary
748750
749751
To learn more about the identifier-field-ids: https://iceberg.apache.org/spec/#identifier-field-ids
750752
@@ -861,12 +863,13 @@ def upsert(
861863
rows_to_update,
862864
overwrite_filter=Or(*overwrite_predicates) if len(overwrite_predicates) > 1 else overwrite_predicates[0],
863865
branch=branch,
866+
snapshot_properties=snapshot_properties,
864867
)
865868

866869
if when_not_matched_insert_all:
867870
insert_row_cnt = len(rows_to_insert)
868871
if rows_to_insert:
869-
self.append(rows_to_insert, branch=branch)
872+
self.append(rows_to_insert, branch=branch, snapshot_properties=snapshot_properties)
870873

871874
return UpsertResult(rows_updated=update_row_cnt, rows_inserted=insert_row_cnt)
872875

@@ -1327,6 +1330,7 @@ def upsert(
13271330
when_not_matched_insert_all: bool = True,
13281331
case_sensitive: bool = True,
13291332
branch: str | None = MAIN_BRANCH,
1333+
snapshot_properties: dict[str, str] = EMPTY_DICT,
13301334
) -> UpsertResult:
13311335
"""Shorthand API for performing an upsert to an iceberg table.
13321336
@@ -1338,6 +1342,7 @@ def upsert(
13381342
when_not_matched_insert_all: Bool indicating new rows to be inserted that do not match any existing rows in the table
13391343
case_sensitive: Bool indicating if the match should be case-sensitive
13401344
branch: Branch Reference to run the upsert operation
1345+
snapshot_properties: Custom properties to be added to the snapshot summary
13411346
13421347
To learn more about the identifier-field-ids: https://iceberg.apache.org/spec/#identifier-field-ids
13431348
@@ -1371,6 +1376,7 @@ def upsert(
13711376
when_not_matched_insert_all=when_not_matched_insert_all,
13721377
case_sensitive=case_sensitive,
13731378
branch=branch,
1379+
snapshot_properties=snapshot_properties,
13741380
)
13751381

13761382
def append(self, df: pa.Table, snapshot_properties: dict[str, str] = EMPTY_DICT, branch: str | None = MAIN_BRANCH) -> None:

pyiceberg/table/inspect.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,9 @@ def partitions(
285285
]
286286
)
287287

288-
partition_record = self.tbl.metadata.specs_struct()
288+
snapshot = self._get_snapshot(snapshot_id)
289+
spec_ids = {manifest.partition_spec_id for manifest in snapshot.manifests(self.tbl.io)}
290+
partition_record = self.tbl.metadata.specs_struct(spec_ids=spec_ids)
289291
has_partitions = len(partition_record.fields) > 0
290292

291293
if has_partitions:
@@ -299,8 +301,6 @@ def partitions(
299301

300302
table_schema = pa.unify_schemas([partitions_schema, table_schema])
301303

302-
snapshot = self._get_snapshot(snapshot_id)
303-
304304
scan = DataScan(
305305
table_metadata=self.tbl.metadata,
306306
io=self.tbl.io,

pyiceberg/table/metadata.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import datetime
2020
import uuid
21+
from collections.abc import Iterable
2122
from copy import copy
2223
from typing import Annotated, Any, Literal
2324

@@ -262,18 +263,23 @@ def specs(self) -> dict[int, PartitionSpec]:
262263
"""Return a dict the partition specs this table."""
263264
return {spec.spec_id: spec for spec in self.partition_specs}
264265

265-
def specs_struct(self) -> StructType:
266-
"""Produce a struct of all the combined PartitionSpecs.
266+
def specs_struct(self, spec_ids: Iterable[int] | None = None) -> StructType:
267+
"""Produce a struct of the combined PartitionSpecs.
267268
268269
The partition fields should be optional: Partition fields may be added later,
269270
in which case not all files would have the result field, and it may be null.
270271
271-
:return: A StructType that represents all the combined PartitionSpecs of the table
272+
Args:
273+
spec_ids: Optional iterable of spec IDs to include. When not provided,
274+
all table specs are used.
275+
276+
:return: A StructType that represents the combined PartitionSpecs of the table
272277
"""
273278
specs = self.specs()
279+
selected_specs = specs.values() if spec_ids is None else [specs[spec_id] for spec_id in spec_ids if spec_id in specs]
274280

275281
# Collect all the fields
276-
struct_fields = {field.field_id: field for spec in specs.values() for field in spec.fields}
282+
struct_fields = {field.field_id: field for spec in selected_specs for field in spec.fields}
277283

278284
schema = self.schema()
279285

pyiceberg/table/update/__init__.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -181,9 +181,15 @@ class SetStatisticsUpdate(IcebergBaseModel):
181181

182182
@model_validator(mode="before")
183183
def validate_snapshot_id(cls, data: dict[str, Any]) -> dict[str, Any]:
184-
stats = cast(StatisticsFile, data["statistics"])
185-
186-
data["snapshot_id"] = stats.snapshot_id
184+
stats = data["statistics"]
185+
if isinstance(stats, StatisticsFile):
186+
snapshot_id = stats.snapshot_id
187+
elif isinstance(stats, dict):
188+
snapshot_id = cast(int, stats.get("snapshot-id"))
189+
else:
190+
snapshot_id = None
191+
192+
data["snapshot_id"] = snapshot_id
187193

188194
return data
189195

pyiceberg/table/update/snapshot.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -880,7 +880,7 @@ def create_tag(self, snapshot_id: int, tag_name: str, max_ref_age_ms: int | None
880880
update, requirement = self._transaction._set_ref_snapshot(
881881
snapshot_id=snapshot_id,
882882
ref_name=tag_name,
883-
type="tag",
883+
type=SnapshotRefType.TAG,
884884
max_ref_age_ms=max_ref_age_ms,
885885
)
886886
self._updates += update
@@ -921,7 +921,7 @@ def create_branch(
921921
update, requirement = self._transaction._set_ref_snapshot(
922922
snapshot_id=snapshot_id,
923923
ref_name=branch_name,
924-
type="branch",
924+
type=SnapshotRefType.BRANCH,
925925
max_ref_age_ms=max_ref_age_ms,
926926
max_snapshot_age_ms=max_snapshot_age_ms,
927927
min_snapshots_to_keep=min_snapshots_to_keep,

0 commit comments

Comments
 (0)