Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from dataclasses import dataclass
from enum import Enum
from typing import (
TYPE_CHECKING,
Callable,
Dict,
List,
Expand Down Expand Up @@ -56,6 +57,9 @@
)
from pyiceberg.utils.config import Config, merge_config

if TYPE_CHECKING:
import pyarrow as pa

logger = logging.getLogger(__name__)

_ENV_CONFIG = Config()
Expand Down Expand Up @@ -288,7 +292,7 @@ def _load_file_io(self, properties: Properties = EMPTY_DICT, location: Optional[
def create_table(
self,
identifier: Union[str, Identifier],
schema: Schema,
schema: Union[Schema, "pa.Schema"],
location: Optional[str] = None,
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
sort_order: SortOrder = UNSORTED_SORT_ORDER,
Expand Down
14 changes: 13 additions & 1 deletion pyiceberg/catalog/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import uuid
from time import time
from typing import (
TYPE_CHECKING,
Any,
Dict,
List,
Expand Down Expand Up @@ -57,6 +58,9 @@
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT

if TYPE_CHECKING:
import pyarrow as pa

DYNAMODB_CLIENT = "dynamodb"

DYNAMODB_COL_IDENTIFIER = "identifier"
Expand Down Expand Up @@ -127,7 +131,7 @@ def _dynamodb_table_exists(self) -> bool:
def create_table(
self,
identifier: Union[str, Identifier],
schema: Schema,
schema: Union[Schema, "pa.Schema"],
location: Optional[str] = None,
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
sort_order: SortOrder = UNSORTED_SORT_ORDER,
Expand All @@ -152,6 +156,14 @@ def create_table(
ValueError: If the identifier is invalid, or no path is given to store metadata.

"""
if not isinstance(schema, Schema):
import pyarrow as pa

from pyiceberg.io.pyarrow import _ConvertToIcebergWithFreshIds, pre_order_visit_pyarrow

if isinstance(schema, pa.Schema):
schema: Schema = pre_order_visit_pyarrow(schema, _ConvertToIcebergWithFreshIds()) # type: ignore
Comment thread
Fokko marked this conversation as resolved.
Outdated

database_name, table_name = self.identifier_to_database_and_table(identifier)

location = self._resolve_table_location(location, database_name, table_name)
Expand Down
14 changes: 13 additions & 1 deletion pyiceberg/catalog/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@


from typing import (
TYPE_CHECKING,
Any,
Dict,
List,
Expand Down Expand Up @@ -88,6 +89,9 @@
UUIDType,
)

if TYPE_CHECKING:
import pyarrow as pa

# If Glue should skip archiving an old table version when creating a new version in a commit. By
# default, Glue archives all old table versions after an UpdateTable call, but Glue has a default
# max number of archived table versions (can be increased). So for streaming use case with lots
Expand Down Expand Up @@ -329,7 +333,7 @@ def _get_glue_table(self, database_name: str, table_name: str) -> TableTypeDef:
def create_table(
self,
identifier: Union[str, Identifier],
schema: Schema,
schema: Union[Schema, "pa.Schema"],
location: Optional[str] = None,
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
sort_order: SortOrder = UNSORTED_SORT_ORDER,
Expand All @@ -354,6 +358,14 @@ def create_table(
ValueError: If the identifier is invalid, or no path is given to store metadata.

"""
if not isinstance(schema, Schema):
import pyarrow as pa

from pyiceberg.io.pyarrow import _ConvertToIcebergWithFreshIds, pre_order_visit_pyarrow

if isinstance(schema, pa.Schema):
schema: Schema = pre_order_visit_pyarrow(schema, _ConvertToIcebergWithFreshIds()) # type: ignore

database_name, table_name = self.identifier_to_database_and_table(identifier)

location = self._resolve_table_location(location, database_name, table_name)
Expand Down
15 changes: 14 additions & 1 deletion pyiceberg/catalog/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import time
from types import TracebackType
from typing import (
TYPE_CHECKING,
Any,
Dict,
List,
Expand Down Expand Up @@ -91,6 +92,10 @@
UUIDType,
)

if TYPE_CHECKING:
import pyarrow as pa


# Replace by visitor
hive_types = {
BooleanType: "boolean",
Expand Down Expand Up @@ -250,7 +255,7 @@ def _convert_hive_into_iceberg(self, table: HiveTable, io: FileIO) -> Table:
def create_table(
self,
identifier: Union[str, Identifier],
schema: Schema,
schema: Union[Schema, "pa.Schema"],
location: Optional[str] = None,
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
sort_order: SortOrder = UNSORTED_SORT_ORDER,
Expand All @@ -273,6 +278,14 @@ def create_table(
AlreadyExistsError: If a table with the name already exists.
ValueError: If the identifier is invalid.
"""
if not isinstance(schema, Schema):
import pyarrow as pa

from pyiceberg.io.pyarrow import _ConvertToIcebergWithFreshIds, pre_order_visit_pyarrow

if isinstance(schema, pa.Schema):
schema: Schema = pre_order_visit_pyarrow(schema, _ConvertToIcebergWithFreshIds()) # type: ignore

properties = {**DEFAULT_PROPERTIES, **properties}
database_name, table_name = self.identifier_to_database_and_table(identifier)
current_time_millis = int(time.time() * 1000)
Expand Down
6 changes: 5 additions & 1 deletion pyiceberg/catalog/noop.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.
from typing import (
TYPE_CHECKING,
List,
Optional,
Set,
Expand All @@ -33,12 +34,15 @@
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties

if TYPE_CHECKING:
import pyarrow as pa


class NoopCatalog(Catalog):
def create_table(
self,
identifier: Union[str, Identifier],
schema: Schema,
schema: Union[Schema, "pa.Schema"],
location: Optional[str] = None,
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
sort_order: SortOrder = UNSORTED_SORT_ORDER,
Expand Down
14 changes: 13 additions & 1 deletion pyiceberg/catalog/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.
from json import JSONDecodeError
from typing import (
TYPE_CHECKING,
Any,
Dict,
List,
Expand Down Expand Up @@ -68,6 +69,9 @@
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT, UTF8, IcebergBaseModel

if TYPE_CHECKING:
import pyarrow as pa

ICEBERG_REST_SPEC_VERSION = "0.14.1"


Expand Down Expand Up @@ -437,12 +441,20 @@ def _response_to_table(self, identifier_tuple: Tuple[str, ...], table_response:
def create_table(
self,
identifier: Union[str, Identifier],
schema: Schema,
schema: Union[Schema, "pa.Schema"],
location: Optional[str] = None,
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
sort_order: SortOrder = UNSORTED_SORT_ORDER,
properties: Properties = EMPTY_DICT,
) -> Table:
if not isinstance(schema, Schema):
import pyarrow as pa

from pyiceberg.io.pyarrow import _ConvertToIcebergWithFreshIds, pre_order_visit_pyarrow

if isinstance(schema, pa.Schema):
schema: Schema = pre_order_visit_pyarrow(schema, _ConvertToIcebergWithFreshIds()) # type: ignore

namespace_and_table = self._split_identifier_for_path(identifier)
request = CreateTableRequest(
name=namespace_and_table["table"],
Expand Down
14 changes: 13 additions & 1 deletion pyiceberg/catalog/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.

from typing import (
TYPE_CHECKING,
List,
Optional,
Set,
Expand Down Expand Up @@ -65,6 +66,9 @@
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT

if TYPE_CHECKING:
import pyarrow as pa


class SqlCatalogBaseTable(MappedAsDataclass, DeclarativeBase):
pass
Expand Down Expand Up @@ -140,7 +144,7 @@ def _convert_orm_to_iceberg(self, orm_table: IcebergTables) -> Table:
def create_table(
self,
identifier: Union[str, Identifier],
schema: Schema,
schema: Union[Schema, "pa.Schema"],
location: Optional[str] = None,
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
sort_order: SortOrder = UNSORTED_SORT_ORDER,
Expand All @@ -165,6 +169,14 @@ def create_table(
ValueError: If the identifier is invalid, or no path is given to store metadata.

"""
if not isinstance(schema, Schema):
import pyarrow as pa

from pyiceberg.io.pyarrow import _ConvertToIcebergWithFreshIds, pre_order_visit_pyarrow

if isinstance(schema, pa.Schema):
schema: Schema = pre_order_visit_pyarrow(schema, _ConvertToIcebergWithFreshIds()) # type: ignore

database_name, table_name = self.identifier_to_database_and_table(identifier)
if not self._namespace_exists(database_name):
raise NoSuchNamespaceError(f"Namespace does not exist: {database_name}")
Expand Down
Loading