Skip to content

Commit 4b42e40

Browse files
Gayathri Srividya RajavarapuGayathri Srividya Rajavarapu
authored andcommitted
fix: filter iceberg_type in all SqlCatalog table operations
1 parent 6da06ad commit 4b42e40

2 files changed

Lines changed: 108 additions & 1 deletion

File tree

pyiceberg/catalog/sql.py

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@
2626
create_engine,
2727
delete,
2828
insert,
29+
or_,
2930
select,
31+
text,
3032
union,
3133
update,
3234
)
@@ -85,6 +87,9 @@ class SqlCatalogBaseTable(MappedAsDataclass, DeclarativeBase):
8587
pass
8688

8789

90+
ICEBERG_TABLE_TYPE = "TABLE"
91+
92+
8893
class IcebergTables(SqlCatalogBaseTable):
8994
__tablename__ = "iceberg_tables"
9095

@@ -93,6 +98,7 @@ class IcebergTables(SqlCatalogBaseTable):
9398
table_name: Mapped[str] = mapped_column(String(255), nullable=False, primary_key=True)
9499
metadata_location: Mapped[str | None] = mapped_column(String(1000), nullable=True)
95100
previous_metadata_location: Mapped[str | None] = mapped_column(String(1000), nullable=True)
101+
iceberg_type: Mapped[str | None] = mapped_column(String(255), nullable=True)
96102

97103

98104
class IcebergNamespaceProperties(SqlCatalogBaseTable):
@@ -147,6 +153,17 @@ def _ensure_tables_exist(self) -> None:
147153
self.create_tables()
148154
return
149155

156+
# Idempotently add iceberg_type column if it does not exist yet
157+
# (backward-compatible migration for databases created before this column was introduced).
158+
# Older SQLite versions do not support "ADD COLUMN IF NOT EXISTS", so we catch and ignore
159+
# the error if the column is already present.
160+
with Session(self.engine) as session:
161+
try:
162+
session.execute(text("ALTER TABLE iceberg_tables ADD COLUMN iceberg_type VARCHAR(255)"))
163+
session.commit()
164+
except (OperationalError, ProgrammingError):
165+
pass # column already exists
166+
150167
def create_tables(self) -> None:
151168
SqlCatalogBaseTable.metadata.create_all(self.engine)
152169

@@ -231,6 +248,7 @@ def create_table(
231248
table_name=table_name,
232249
metadata_location=metadata_location,
233250
previous_metadata_location=None,
251+
iceberg_type=ICEBERG_TABLE_TYPE,
234252
)
235253
)
236254
session.commit()
@@ -273,6 +291,7 @@ def register_table(self, identifier: str | Identifier, metadata_location: str, o
273291
table_name=table_name,
274292
metadata_location=metadata_location,
275293
previous_metadata_location=None,
294+
iceberg_type=ICEBERG_TABLE_TYPE,
276295
)
277296
)
278297
session.commit()
@@ -305,6 +324,7 @@ def load_table(self, identifier: str | Identifier) -> Table:
305324
IcebergTables.catalog_name == self.name,
306325
IcebergTables.table_namespace == namespace,
307326
IcebergTables.table_name == table_name,
327+
or_(IcebergTables.iceberg_type == ICEBERG_TABLE_TYPE, IcebergTables.iceberg_type.is_(None)),
308328
)
309329
result = session.scalar(stmt)
310330
if result:
@@ -331,6 +351,7 @@ def drop_table(self, identifier: str | Identifier) -> None:
331351
IcebergTables.catalog_name == self.name,
332352
IcebergTables.table_namespace == namespace,
333353
IcebergTables.table_name == table_name,
354+
or_(IcebergTables.iceberg_type == ICEBERG_TABLE_TYPE, IcebergTables.iceberg_type.is_(None)),
334355
)
335356
)
336357
if res.rowcount < 1:
@@ -344,6 +365,7 @@ def drop_table(self, identifier: str | Identifier) -> None:
344365
IcebergTables.catalog_name == self.name,
345366
IcebergTables.table_namespace == namespace,
346367
IcebergTables.table_name == table_name,
368+
or_(IcebergTables.iceberg_type == ICEBERG_TABLE_TYPE, IcebergTables.iceberg_type.is_(None)),
347369
)
348370
.one()
349371
)
@@ -385,6 +407,7 @@ def rename_table(self, from_identifier: str | Identifier, to_identifier: str | I
385407
IcebergTables.catalog_name == self.name,
386408
IcebergTables.table_namespace == from_namespace,
387409
IcebergTables.table_name == from_table_name,
410+
or_(IcebergTables.iceberg_type == ICEBERG_TABLE_TYPE, IcebergTables.iceberg_type.is_(None)),
388411
)
389412
.values(table_namespace=to_namespace, table_name=to_table_name)
390413
)
@@ -400,6 +423,7 @@ def rename_table(self, from_identifier: str | Identifier, to_identifier: str | I
400423
IcebergTables.catalog_name == self.name,
401424
IcebergTables.table_namespace == from_namespace,
402425
IcebergTables.table_name == from_table_name,
426+
or_(IcebergTables.iceberg_type == ICEBERG_TABLE_TYPE, IcebergTables.iceberg_type.is_(None)),
403427
)
404428
.one()
405429
)
@@ -462,6 +486,7 @@ def commit_table(
462486
IcebergTables.table_namespace == namespace,
463487
IcebergTables.table_name == table_name,
464488
IcebergTables.metadata_location == current_table.metadata_location,
489+
or_(IcebergTables.iceberg_type == ICEBERG_TABLE_TYPE, IcebergTables.iceberg_type.is_(None)),
465490
)
466491
.values(
467492
metadata_location=updated_staged_table.metadata_location,
@@ -481,6 +506,7 @@ def commit_table(
481506
IcebergTables.table_namespace == namespace,
482507
IcebergTables.table_name == table_name,
483508
IcebergTables.metadata_location == current_table.metadata_location,
509+
or_(IcebergTables.iceberg_type == ICEBERG_TABLE_TYPE, IcebergTables.iceberg_type.is_(None)),
484510
)
485511
.one()
486512
)
@@ -499,6 +525,7 @@ def commit_table(
499525
table_name=table_name,
500526
metadata_location=updated_staged_table.metadata_location,
501527
previous_metadata_location=None,
528+
iceberg_type=ICEBERG_TABLE_TYPE,
502529
)
503530
)
504531
session.commit()
@@ -615,7 +642,11 @@ def list_tables(self, namespace: str | Identifier) -> list[Identifier]:
615642
raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}")
616643

617644
namespace = Catalog.namespace_to_string(namespace)
618-
stmt = select(IcebergTables).where(IcebergTables.catalog_name == self.name, IcebergTables.table_namespace == namespace)
645+
stmt = select(IcebergTables).where(
646+
IcebergTables.catalog_name == self.name,
647+
IcebergTables.table_namespace == namespace,
648+
or_(IcebergTables.iceberg_type == ICEBERG_TABLE_TYPE, IcebergTables.iceberg_type.is_(None)),
649+
)
619650
with Session(self.engine) as session:
620651
result = session.scalars(stmt)
621652
return [(Catalog.identifier_to_tuple(table.table_namespace) + (table.table_name,)) for table in result]

tests/catalog/test_sql.py

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
from pyiceberg.catalog.sql import (
2828
DEFAULT_ECHO_VALUE,
2929
DEFAULT_POOL_PRE_PING_VALUE,
30+
ICEBERG_TABLE_TYPE,
3031
IcebergTables,
3132
SqlCatalog,
3233
SqlCatalogBaseTable,
@@ -261,3 +262,78 @@ def test_sql_catalog_multiple_close_calls(self, catalog_sqlite: SqlCatalog) -> N
261262

262263
# Second close should not raise an exception
263264
catalog_sqlite.close()
265+
266+
267+
class TestIcebergTypeFilter:
268+
"""Verify that table operations filter on iceberg_type and ignore view rows."""
269+
270+
def _insert_view_row(self, catalog: SqlCatalog, namespace: str, name: str) -> None:
271+
"""Directly insert a row with iceberg_type='VIEW' to simulate a view written by another engine."""
272+
from sqlalchemy.orm import Session
273+
274+
with Session(catalog.engine) as session:
275+
session.add(
276+
IcebergTables(
277+
catalog_name=catalog.name,
278+
table_namespace=namespace,
279+
table_name=name,
280+
metadata_location=None,
281+
previous_metadata_location=None,
282+
iceberg_type="VIEW",
283+
)
284+
)
285+
session.commit()
286+
287+
def test_iceberg_type_set_on_create(self, catalog_memory: SqlCatalog) -> None:
288+
"""Tables created by SqlCatalog should have iceberg_type='TABLE'."""
289+
from sqlalchemy import select
290+
from sqlalchemy.orm import Session
291+
292+
catalog_memory.create_namespace("iceberg_type_ns")
293+
schema = Schema(NestedField(1, "id", StringType(), required=True))
294+
catalog_memory.create_table("iceberg_type_ns.tbl_type_check", schema)
295+
296+
with Session(catalog_memory.engine) as session:
297+
row = session.scalar(
298+
select(IcebergTables).where(
299+
IcebergTables.catalog_name == catalog_memory.name,
300+
IcebergTables.table_namespace == "iceberg_type_ns",
301+
IcebergTables.table_name == "tbl_type_check",
302+
)
303+
)
304+
assert row is not None
305+
assert row.iceberg_type == ICEBERG_TABLE_TYPE
306+
307+
def test_list_tables_excludes_view_rows(self, catalog_memory: SqlCatalog) -> None:
308+
"""list_tables must not return rows with iceberg_type='VIEW'."""
309+
catalog_memory.create_namespace("view_filter_ns")
310+
self._insert_view_row(catalog_memory, "view_filter_ns", "my_view")
311+
tables = catalog_memory.list_tables("view_filter_ns")
312+
assert ("view_filter_ns", "my_view") not in tables
313+
314+
def test_load_table_ignores_view_rows(self, catalog_memory: SqlCatalog) -> None:
315+
"""load_table must raise NoSuchTableError for rows with iceberg_type='VIEW'."""
316+
from pyiceberg.exceptions import NoSuchTableError
317+
318+
catalog_memory.create_namespace("load_view_ns")
319+
self._insert_view_row(catalog_memory, "load_view_ns", "a_view")
320+
with pytest.raises(NoSuchTableError):
321+
catalog_memory.load_table("load_view_ns.a_view")
322+
323+
def test_drop_table_ignores_view_rows(self, catalog_memory: SqlCatalog) -> None:
324+
"""drop_table must raise NoSuchTableError for rows with iceberg_type='VIEW'."""
325+
from pyiceberg.exceptions import NoSuchTableError
326+
327+
catalog_memory.create_namespace("drop_view_ns")
328+
self._insert_view_row(catalog_memory, "drop_view_ns", "droppable_view")
329+
with pytest.raises(NoSuchTableError):
330+
catalog_memory.drop_table("drop_view_ns.droppable_view")
331+
332+
def test_rename_table_ignores_view_rows(self, catalog_memory: SqlCatalog) -> None:
333+
"""rename_table must raise NoSuchTableError for rows with iceberg_type='VIEW'."""
334+
from pyiceberg.exceptions import NoSuchTableError
335+
336+
catalog_memory.create_namespace("rename_view_ns")
337+
self._insert_view_row(catalog_memory, "rename_view_ns", "renamed_view")
338+
with pytest.raises(NoSuchTableError):
339+
catalog_memory.rename_table("rename_view_ns.renamed_view", "rename_view_ns.new_name")

0 commit comments

Comments
 (0)