Skip to content

Commit 60f95fb

Browse files
committed
Iterators all the way down
1 parent 9c99f32 commit 60f95fb

File tree

17 files changed

+135
-118
lines changed

17 files changed

+135
-118
lines changed

pyiceberg/catalog/__init__.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
TYPE_CHECKING,
2929
Callable,
3030
Dict,
31+
Iterator,
3132
List,
3233
Optional,
3334
Set,
@@ -575,42 +576,42 @@ def drop_namespace(self, namespace: Union[str, Identifier]) -> None:
575576
"""
576577

577578
@abstractmethod
578-
def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]:
579+
def list_tables(self, namespace: Union[str, Identifier]) -> Iterator[Identifier]:
579580
"""List tables under the given namespace in the catalog.
580581
581582
Args:
582583
namespace (str | Identifier): Namespace identifier to search.
583584
584585
Returns:
585-
List[Identifier]: list of table identifiers.
586+
Iterator[Identifier]: iterator of table identifiers.
586587
587588
Raises:
588589
NoSuchNamespaceError: If a namespace with the given name does not exist.
589590
"""
590591

591592
@abstractmethod
592-
def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]:
593+
def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> Iterator[Identifier]:
593594
"""List namespaces from the given namespace. If not given, list top-level namespaces from the catalog.
594595
595596
Args:
596597
namespace (str | Identifier): Namespace identifier to search.
597598
598599
Returns:
599-
List[Identifier]: a List of namespace identifiers.
600+
Iterator[Identifier]: iterator of namespace identifiers.
600601
601602
Raises:
602603
NoSuchNamespaceError: If a namespace with the given name does not exist.
603604
"""
604605

605606
@abstractmethod
606-
def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]:
607+
def list_views(self, namespace: Union[str, Identifier]) -> Iterator[Identifier]:
607608
"""List views under the given namespace in the catalog.
608609
609610
Args:
610611
namespace (str | Identifier): Namespace identifier to search.
611612
612613
Returns:
613-
List[Identifier]: list of table identifiers.
614+
Iterator[Identifier]: iterator of view identifiers.
614615
615616
Raises:
616617
NoSuchNamespaceError: If a namespace with the given name does not exist.

pyiceberg/catalog/dynamodb.py

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
TYPE_CHECKING,
2121
Any,
2222
Dict,
23-
List,
23+
Iterator,
2424
Optional,
2525
Set,
2626
Tuple,
@@ -385,7 +385,7 @@ def drop_namespace(self, namespace: Union[str, Identifier]) -> None:
385385
database_name = self.identifier_to_database(namespace, NoSuchNamespaceError)
386386
table_identifiers = self.list_tables(namespace=database_name)
387387

388-
if len(table_identifiers) > 0:
388+
if len(list(table_identifiers)) > 0:
389389
raise NamespaceNotEmptyError(f"Database {database_name} is not empty")
390390

391391
try:
@@ -397,14 +397,14 @@ def drop_namespace(self, namespace: Union[str, Identifier]) -> None:
397397
except ConditionalCheckFailedException as e:
398398
raise NoSuchNamespaceError(f"Database does not exist: {database_name}") from e
399399

400-
def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]:
400+
def list_tables(self, namespace: Union[str, Identifier]) -> Iterator[Identifier]:
401401
"""List Iceberg tables under the given namespace in the catalog.
402402
403403
Args:
404404
namespace (str | Identifier): Namespace identifier to search.
405405
406406
Returns:
407-
List[Identifier]: list of table identifiers.
407+
Iterator[Identifier]: iterator of table identifiers.
408408
"""
409409
database_name = self.identifier_to_database(namespace, NoSuchNamespaceError)
410410

@@ -429,29 +429,26 @@ def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]:
429429
) as e:
430430
raise GenericDynamoDbError(e.message) from e
431431

432-
table_identifiers = []
433432
for page in page_iterator:
434433
for item in page["Items"]:
435434
_dict = _convert_dynamo_item_to_regular_dict(item)
436435
identifier_col = _dict[DYNAMODB_COL_IDENTIFIER]
437436
if identifier_col == DYNAMODB_NAMESPACE:
438437
continue
439438

440-
table_identifiers.append(self.identifier_to_tuple(identifier_col))
439+
yield self.identifier_to_tuple(identifier_col)
441440

442-
return table_identifiers
443-
444-
def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]:
441+
def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> Iterator[Identifier]:
445442
"""List top-level namespaces from the catalog.
446443
447444
We do not support hierarchical namespace.
448445
449446
Returns:
450-
List[Identifier]: a List of namespace identifiers.
447+
Iterator[Identifier]: iterator of namespace identifiers.
451448
"""
452449
# Hierarchical namespace is not supported. Return an empty iterator
453450
if namespace:
454-
return []
451+
return
455452

456453
paginator = self.dynamodb.get_paginator("query")
457454

@@ -474,14 +471,11 @@ def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identi
474471
) as e:
475472
raise GenericDynamoDbError(e.message) from e
476473

477-
database_identifiers = []
478474
for page in page_iterator:
479475
for item in page["Items"]:
480476
_dict = _convert_dynamo_item_to_regular_dict(item)
481477
namespace_col = _dict[DYNAMODB_COL_NAMESPACE]
482-
database_identifiers.append(self.identifier_to_tuple(namespace_col))
483-
484-
return database_identifiers
478+
yield self.identifier_to_tuple(namespace_col)
485479

486480
def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties:
487481
"""
@@ -538,7 +532,7 @@ def update_namespace_properties(
538532

539533
return properties_update_summary
540534

541-
def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]:
535+
def list_views(self, namespace: Union[str, Identifier]) -> Iterator[Identifier]:
542536
raise NotImplementedError
543537

544538
def drop_view(self, identifier: Union[str, Identifier]) -> None:

pyiceberg/catalog/glue.py

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
TYPE_CHECKING,
2121
Any,
2222
Dict,
23+
Iterator,
2324
List,
2425
Optional,
2526
Set,
@@ -96,7 +97,6 @@
9697
from mypy_boto3_glue.type_defs import (
9798
ColumnTypeDef,
9899
DatabaseInputTypeDef,
99-
DatabaseTypeDef,
100100
StorageDescriptorTypeDef,
101101
TableInputTypeDef,
102102
TableTypeDef,
@@ -710,20 +710,19 @@ def drop_namespace(self, namespace: Union[str, Identifier]) -> None:
710710
)
711711
self.glue.delete_database(Name=database_name)
712712

713-
def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]:
713+
def list_tables(self, namespace: Union[str, Identifier]) -> Iterator[Identifier]:
714714
"""List Iceberg tables under the given namespace in the catalog.
715715
716716
Args:
717717
namespace (str | Identifier): Namespace identifier to search.
718718
719719
Returns:
720-
List[Identifier]: list of table identifiers.
720+
Iterator[Identifier]: iterator of table identifiers.
721721
722722
Raises:
723723
NoSuchNamespaceError: If a namespace with the given name does not exist, or the identifier is invalid.
724724
"""
725725
database_name = self.identifier_to_database(namespace, NoSuchNamespaceError)
726-
table_list: List["TableTypeDef"] = []
727726
next_token: Optional[str] = None
728727
try:
729728
while True:
@@ -732,37 +731,36 @@ def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]:
732731
if not next_token
733732
else self.glue.get_tables(DatabaseName=database_name, NextToken=next_token)
734733
)
735-
table_list.extend(table_list_response["TableList"])
734+
for table in table_list_response["TableList"]:
735+
if self.__is_iceberg_table(table):
736+
yield (database_name, table["Name"])
736737
next_token = table_list_response.get("NextToken")
737738
if not next_token:
738739
break
739740

740741
except self.glue.exceptions.EntityNotFoundException as e:
741742
raise NoSuchNamespaceError(f"Database does not exist: {database_name}") from e
742-
return [(database_name, table["Name"]) for table in table_list if self.__is_iceberg_table(table)]
743743

744-
def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]:
744+
def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> Iterator[Identifier]:
745745
"""List namespaces from the given namespace. If not given, list top-level namespaces from the catalog.
746746
747747
Returns:
748-
List[Identifier]: a List of namespace identifiers.
748+
Iterator[Identifier]: iterator of namespace identifiers.
749749
"""
750750
# Hierarchical namespace is not supported. Return an empty iterator
751751
if namespace:
752-
return []
752+
return
753753

754-
database_list: List["DatabaseTypeDef"] = []
755754
next_token: Optional[str] = None
756755

757756
while True:
758757
databases_response = self.glue.get_databases() if not next_token else self.glue.get_databases(NextToken=next_token)
759-
database_list.extend(databases_response["DatabaseList"])
758+
for database in databases_response["DatabaseList"]:
759+
yield self.identifier_to_tuple(database["Name"])
760760
next_token = databases_response.get("NextToken")
761761
if not next_token:
762762
break
763763

764-
return [self.identifier_to_tuple(database["Name"]) for database in database_list]
765-
766764
def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties:
767765
"""Get properties for a namespace.
768766
@@ -817,7 +815,7 @@ def update_namespace_properties(
817815

818816
return properties_update_summary
819817

820-
def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]:
818+
def list_views(self, namespace: Union[str, Identifier]) -> Iterator[Identifier]:
821819
raise NotImplementedError
822820

823821
def drop_view(self, identifier: Union[str, Identifier]) -> None:

pyiceberg/catalog/hive.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
TYPE_CHECKING,
2424
Any,
2525
Dict,
26+
Iterator,
2627
List,
2728
Optional,
2829
Set,
@@ -469,7 +470,7 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:
469470

470471
return self._convert_hive_into_iceberg(hive_table)
471472

472-
def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]:
473+
def list_views(self, namespace: Union[str, Identifier]) -> Iterator[Identifier]:
473474
raise NotImplementedError
474475

475476
def view_exists(self, identifier: Union[str, Identifier]) -> bool:
@@ -710,7 +711,7 @@ def drop_namespace(self, namespace: Union[str, Identifier]) -> None:
710711
except MetaException as e:
711712
raise NoSuchNamespaceError(f"Database does not exists: {database_name}") from e
712713

713-
def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]:
714+
def list_tables(self, namespace: Union[str, Identifier]) -> Iterator[Identifier]:
714715
"""List Iceberg tables under the given namespace in the catalog.
715716
716717
When the database doesn't exist, it will just return an empty iterator.
@@ -719,33 +720,33 @@ def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]:
719720
namespace: Database to list.
720721
721722
Returns:
722-
List[Identifier]: list of table identifiers.
723+
Iterator[Identifier]: iterator of table identifiers.
723724
724725
Raises:
725726
NoSuchNamespaceError: If a namespace with the given name does not exist, or the identifier is invalid.
726727
"""
727728
database_name = self.identifier_to_database(namespace, NoSuchNamespaceError)
728729
with self._client as open_client:
729-
return [
730+
yield from [
730731
(database_name, table.tableName)
731732
for table in open_client.get_table_objects_by_name(
732733
dbname=database_name, tbl_names=open_client.get_all_tables(db_name=database_name)
733734
)
734735
if table.parameters.get(TABLE_TYPE, "").lower() == ICEBERG
735736
]
736737

737-
def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]:
738+
def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> Iterator[Identifier]:
738739
"""List namespaces from the given namespace. If not given, list top-level namespaces from the catalog.
739740
740741
Returns:
741-
List[Identifier]: a List of namespace identifiers.
742+
Iterator[Identifier]: an iterator of namespace identifiers.
742743
"""
743744
# Hierarchical namespace is not supported. Return an empty iterator
744745
if namespace:
745-
return []
746+
return iter([])
746747

747748
with self._client as open_client:
748-
return list(map(self.identifier_to_tuple, open_client.get_all_databases()))
749+
return iter(map(self.identifier_to_tuple, open_client.get_all_databases()))
749750

750751
def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties:
751752
"""Get properties for a namespace.

pyiceberg/catalog/noop.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
# under the License.
1717
from typing import (
1818
TYPE_CHECKING,
19-
List,
19+
Iterator,
2020
Optional,
2121
Set,
2222
Tuple,
@@ -106,10 +106,10 @@ def create_namespace(self, namespace: Union[str, Identifier], properties: Proper
106106
def drop_namespace(self, namespace: Union[str, Identifier]) -> None:
107107
raise NotImplementedError
108108

109-
def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]:
109+
def list_tables(self, namespace: Union[str, Identifier]) -> Iterator[Identifier]:
110110
raise NotImplementedError
111111

112-
def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]:
112+
def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> Iterator[Identifier]:
113113
raise NotImplementedError
114114

115115
def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties:
@@ -120,7 +120,7 @@ def update_namespace_properties(
120120
) -> PropertiesUpdateSummary:
121121
raise NotImplementedError
122122

123-
def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]:
123+
def list_views(self, namespace: Union[str, Identifier]) -> Iterator[Identifier]:
124124
raise NotImplementedError
125125

126126
def view_exists(self, identifier: Union[str, Identifier]) -> bool:

0 commit comments

Comments
 (0)