Skip to content

Commit 49df2ad

Browse files
authored
Merge branch 'main' into fd-1-11
2 parents ab93c5d + 75cd77f commit 49df2ad

51 files changed

Lines changed: 2639 additions & 355 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/workflows/codeql.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,11 @@ jobs:
4646
persist-credentials: false
4747

4848
- name: Initialize CodeQL
49-
uses: github/codeql-action/init@e46ed2cbd01164d986452f91f178727624ae40d7 # v4.35.3
49+
uses: github/codeql-action/init@9e0d7b8d25671d64c341c19c0152d693099fb5ba # v4.35.5
5050
with:
5151
languages: actions
5252

5353
- name: Perform CodeQL Analysis
54-
uses: github/codeql-action/analyze@e46ed2cbd01164d986452f91f178727624ae40d7 # v4.35.3
54+
uses: github/codeql-action/analyze@9e0d7b8d25671d64c341c19c0152d693099fb5ba # v4.35.5
5555
with:
5656
category: "/language:actions"

.github/workflows/pypi-build-artifacts.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ jobs:
8989
if: matrix.os == 'ubuntu-latest'
9090
run: ls -lah dist/* && cp dist/* wheelhouse/
9191

92-
- uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2
92+
- uses: actions/upload-artifact@043fb46d1a93c77aae656e7c1c64a875d1fc6a0a # v7.0.1
9393
with:
9494
name: "pypi-release-candidate-${{ matrix.os }}"
9595
path: ./wheelhouse/*

.github/workflows/python-ci.yml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ jobs:
101101
if: ${{ failure() }}
102102
run: docker compose -f dev/docker-compose-integration.yml logs
103103
- name: Upload coverage data
104-
uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2
104+
uses: actions/upload-artifact@043fb46d1a93c77aae656e7c1c64a875d1fc6a0a # v7.0.1
105105
with:
106106
name: coverage-integration
107107
path: .coverage*
@@ -130,7 +130,7 @@ jobs:
130130
if: ${{ failure() }}
131131
run: docker compose -f dev/docker-compose.yml logs
132132
- name: Upload coverage data
133-
uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2
133+
uses: actions/upload-artifact@043fb46d1a93c77aae656e7c1c64a875d1fc6a0a # v7.0.1
134134
with:
135135
name: coverage-s3
136136
path: .coverage*
@@ -159,7 +159,7 @@ jobs:
159159
if: ${{ failure() }}
160160
run: docker compose -f dev/docker-compose-azurite.yml logs
161161
- name: Upload coverage data
162-
uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2
162+
uses: actions/upload-artifact@043fb46d1a93c77aae656e7c1c64a875d1fc6a0a # v7.0.1
163163
with:
164164
name: coverage-adls
165165
path: .coverage*
@@ -188,7 +188,7 @@ jobs:
188188
if: ${{ failure() }}
189189
run: docker compose -f dev/docker-compose-gcs-server.yml logs
190190
- name: Upload coverage data
191-
uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2
191+
uses: actions/upload-artifact@043fb46d1a93c77aae656e7c1c64a875d1fc6a0a # v7.0.1
192192
with:
193193
name: coverage-gcs
194194
path: .coverage*

.github/workflows/svn-build-artifacts.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ jobs:
8080
if: matrix.os == 'ubuntu-latest'
8181
run: ls -lah dist/* && cp dist/* wheelhouse/
8282

83-
- uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2
83+
- uses: actions/upload-artifact@043fb46d1a93c77aae656e7c1c64a875d1fc6a0a # v7.0.1
8484
with:
8585
name: "svn-release-candidate-${{ matrix.os }}"
8686
path: ./wheelhouse/*

Makefile

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
# under the License.
1717
.PHONY: help install install-uv check-license lint \
1818
test test-integration test-integration-setup test-integration-exec test-integration-cleanup test-integration-rebuild \
19-
test-s3 test-adls test-gcs test-coverage coverage-report \
19+
test-s3 test-adls test-gcs test-coverage coverage-report test test-notebook\
2020
docs-serve docs-build notebook notebook-infra \
2121
clean
2222

@@ -150,6 +150,9 @@ coverage-report: ## Combine and report coverage
150150
uv run $(PYTHON_ARG) coverage html
151151
uv run $(PYTHON_ARG) coverage xml
152152

153+
test-notebook: ## Run notebook tests (pyiceberg_example and spark_integration_example) via papermill
154+
$(TEST_RUNNER) pytest tests/notebooks/test_pyiceberg_example.py tests/notebooks/test_spark_integration_example.py -m notebook $(PYTEST_ARGS)
155+
153156
# ================
154157
# Documentation
155158
# ================

mkdocs/docs/api.md

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,17 @@ for buf in tbl.scan().to_arrow_batch_reader():
365365
print(f"Buffer contains {len(buf)} rows")
366366
```
367367

368+
### Streaming writes from a `RecordBatchReader`
369+
370+
`tbl.append()` and `tbl.overwrite()` also accept a `pyarrow.RecordBatchReader` directly, which lets you write datasets that don't fit in memory without materialising them as a `pa.Table` first. PyIceberg consumes the reader once and microbatches it into Parquet files of approximately `write.target-file-size-bytes` (default 512 MiB), keeping memory usage bounded by the target size. All files are committed in a single snapshot.
371+
372+
```python
373+
reader = pa.RecordBatchReader.from_batches(schema, batch_iter)
374+
tbl.append(reader)
375+
```
376+
377+
Streaming writes are currently only supported on **unpartitioned** tables. For a partitioned table, materialise the reader as a `pa.Table` first, or follow [#2152](https://github.com/apache/iceberg-python/issues/2152) for the partitioned support tracked as a follow-up.
378+
368379
To avoid any type inconsistencies during writing, you can convert the Iceberg table schema to Arrow:
369380

370381
```python
@@ -425,7 +436,7 @@ You can overwrite the record of `Paris` with a record of `New York`:
425436
from pyiceberg.expressions import EqualTo
426437
df = pa.Table.from_pylist(
427438
[
428-
{"city": "New York", "lat": 40.7128, "long": 74.0060},
439+
{"city": "New York", "lat": 40.7128, "long": -74.0060},
429440
]
430441
)
431442
tbl.overwrite(df, overwrite_filter=EqualTo('city', "Paris"))
@@ -441,7 +452,7 @@ long: double
441452
----
442453
city: [["New York"],["Amsterdam","San Francisco","Drachten"]]
443454
lat: [[40.7128],[52.371807,37.773972,53.11254]]
444-
long: [[74.006],[4.896029,-122.431297,6.0989]]
455+
long: [[-74.006],[4.896029,-122.431297,6.0989]]
445456
```
446457

447458
If the PyIceberg table is partitioned, you can use `tbl.dynamic_partition_overwrite(df)` to replace the existing partitions with new ones provided in the dataframe. The partitions to be replaced are detected automatically from the provided arrow table.
@@ -1529,6 +1540,17 @@ catalog = load_catalog("default")
15291540
catalog.view_exists("default.bar")
15301541
```
15311542

1543+
## Register a view
1544+
1545+
To register a view using existing metadata:
1546+
1547+
```python
1548+
catalog.register_view(
1549+
identifier="docs_example.bids",
1550+
metadata_location="s3://warehouse/path/to/metadata.json"
1551+
)
1552+
```
1553+
15321554
## Table Statistics Management
15331555

15341556
Manage table statistics with operations through the `Table` API:

pyiceberg/catalog/__init__.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
cast,
3232
)
3333

34+
from typing_extensions import override
35+
3436
from pyiceberg.exceptions import (
3537
NamespaceAlreadyExistsError,
3638
NoSuchNamespaceError,
@@ -690,6 +692,22 @@ def update_namespace_properties(
690692
ValueError: If removals and updates have overlapping keys.
691693
"""
692694

695+
@abstractmethod
696+
def register_view(self, identifier: str | Identifier, metadata_location: str) -> View:
697+
"""Register a new view using existing metadata.
698+
699+
Args:
700+
identifier (Union[str, Identifier]): View identifier for the view
701+
metadata_location (str): The location to the metadata
702+
703+
Returns:
704+
View: The newly registered view
705+
706+
Raises:
707+
ViewAlreadyExistsError: If the view already exists.
708+
TableAlreadyExistsError: If a table with the same name already exists.
709+
"""
710+
693711
@abstractmethod
694712
def drop_view(self, identifier: str | Identifier) -> None:
695713
"""Drop a view.
@@ -888,9 +906,11 @@ class MetastoreCatalog(Catalog, ABC):
888906
def __init__(self, name: str, **properties: str):
889907
super().__init__(name, **properties)
890908

909+
@override
891910
def supports_server_side_planning(self) -> bool:
892911
return False
893912

913+
@override
894914
def create_table_transaction(
895915
self,
896916
identifier: str | Identifier,
@@ -904,13 +924,15 @@ def create_table_transaction(
904924
self._create_staged_table(identifier, schema, location, partition_spec, sort_order, properties)
905925
)
906926

927+
@override
907928
def table_exists(self, identifier: str | Identifier) -> bool:
908929
try:
909930
self.load_table(identifier)
910931
return True
911932
except NoSuchTableError:
912933
return False
913934

935+
@override
914936
def namespace_exists(self, namespace: str | Identifier) -> bool:
915937
"""Check if a namespace exists.
916938
@@ -926,6 +948,7 @@ def namespace_exists(self, namespace: str | Identifier) -> bool:
926948
except NoSuchNamespaceError:
927949
return False
928950

951+
@override
929952
def purge_table(self, identifier: str | Identifier) -> None:
930953
table = self.load_table(identifier)
931954
self.drop_table(identifier)
@@ -946,6 +969,7 @@ def purge_table(self, identifier: str | Identifier) -> None:
946969
delete_files(io, prev_metadata_files, PREVIOUS_METADATA)
947970
delete_files(io, {table.metadata_location}, METADATA)
948971

972+
@override
949973
def create_view(
950974
self,
951975
identifier: str | Identifier,

pyiceberg/catalog/bigquery_metastore.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from google.cloud.bigquery.schema import SerDeInfo, StorageDescriptor
2727
from google.cloud.exceptions import Conflict
2828
from google.oauth2 import service_account
29+
from typing_extensions import override
2930

3031
from pyiceberg.catalog import WAREHOUSE_LOCATION, MetastoreCatalog, PropertiesUpdateSummary
3132
from pyiceberg.exceptions import NamespaceAlreadyExistsError, NoSuchNamespaceError, NoSuchTableError, TableAlreadyExistsError
@@ -101,6 +102,7 @@ def __init__(self, name: str, **properties: str):
101102
self.location = location
102103
self.project_id = project_id
103104

105+
@override
104106
def create_table(
105107
self,
106108
identifier: str | Identifier,
@@ -156,6 +158,7 @@ def create_table(
156158

157159
return self.load_table(identifier=identifier)
158160

161+
@override
159162
def create_namespace(self, namespace: str | Identifier, properties: Properties = EMPTY_DICT) -> None:
160163
"""Create a namespace in the catalog.
161164
@@ -177,6 +180,7 @@ def create_namespace(self, namespace: str | Identifier, properties: Properties =
177180
except Conflict as e:
178181
raise NamespaceAlreadyExistsError("Namespace {database_name} already exists") from e
179182

183+
@override
180184
def load_table(self, identifier: str | Identifier) -> Table:
181185
"""
182186
Load the table's metadata and returns the table instance.
@@ -205,6 +209,7 @@ def load_table(self, identifier: str | Identifier) -> Table:
205209
except NotFound as e:
206210
raise NoSuchTableError(f"Table does not exist: {dataset_name}.{table_name}") from e
207211

212+
@override
208213
def drop_table(self, identifier: str | Identifier) -> None:
209214
"""Drop a table.
210215
@@ -225,14 +230,17 @@ def drop_table(self, identifier: str | Identifier) -> None:
225230
except NoSuchTableError as e:
226231
raise NoSuchTableError(f"Table does not exist: {dataset_name}.{table_name}") from e
227232

233+
@override
228234
def commit_table(
229235
self, table: Table, requirements: tuple[TableRequirement, ...], updates: tuple[TableUpdate, ...]
230236
) -> CommitTableResponse:
231237
raise NotImplementedError
232238

239+
@override
233240
def rename_table(self, from_identifier: str | Identifier, to_identifier: str | Identifier) -> Table:
234241
raise NotImplementedError
235242

243+
@override
236244
def drop_namespace(self, namespace: str | Identifier) -> None:
237245
database_name = self.identifier_to_database(namespace)
238246

@@ -243,6 +251,7 @@ def drop_namespace(self, namespace: str | Identifier) -> None:
243251
except NotFound as e:
244252
raise NoSuchNamespaceError(f"Namespace {namespace} does not exist.") from e
245253

254+
@override
246255
def list_tables(self, namespace: str | Identifier) -> list[Identifier]:
247256
database_name = self.identifier_to_database(namespace)
248257
iceberg_tables: list[Identifier] = []
@@ -257,6 +266,7 @@ def list_tables(self, namespace: str | Identifier) -> list[Identifier]:
257266
raise NoSuchNamespaceError(f"Namespace (dataset) '{database_name}' not found.") from None
258267
return iceberg_tables
259268

269+
@override
260270
def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]:
261271
# Since this catalog only supports one-level namespaces, it always returns an empty list unless
262272
# passed an empty namespace to list all namespaces within the catalog.
@@ -267,6 +277,7 @@ def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]:
267277
datasets_iterator = self.client.list_datasets()
268278
return [(dataset.dataset_id,) for dataset in datasets_iterator]
269279

280+
@override
270281
def register_table(self, identifier: str | Identifier, metadata_location: str, overwrite: bool = False) -> Table:
271282
"""Register a new table using existing metadata.
272283
@@ -302,18 +313,27 @@ def register_table(self, identifier: str | Identifier, metadata_location: str, o
302313

303314
return self.load_table(identifier=identifier)
304315

316+
@override
305317
def list_views(self, namespace: str | Identifier) -> list[Identifier]:
306318
raise NotImplementedError
307319

320+
@override
321+
def register_view(self, identifier: str | Identifier, metadata_location: str) -> View:
322+
raise NotImplementedError
323+
324+
@override
308325
def drop_view(self, identifier: str | Identifier) -> None:
309326
raise NotImplementedError
310327

328+
@override
311329
def view_exists(self, identifier: str | Identifier) -> bool:
312330
raise NotImplementedError
313331

332+
@override
314333
def load_view(self, identifier: str | Identifier) -> View:
315334
raise NotImplementedError
316335

336+
@override
317337
def load_namespace_properties(self, namespace: str | Identifier) -> Properties:
318338
dataset_name = self.identifier_to_database(namespace)
319339

@@ -326,6 +346,7 @@ def load_namespace_properties(self, namespace: str | Identifier) -> Properties:
326346
raise NoSuchNamespaceError(f"Namespace {namespace} not found") from e
327347
return {}
328348

349+
@override
329350
def update_namespace_properties(
330351
self, namespace: str | Identifier, removals: set[str] | None = None, updates: Properties = EMPTY_DICT
331352
) -> PropertiesUpdateSummary:

0 commit comments

Comments
 (0)