Skip to content

Commit 5c341b5

Browse files
committed
merge main, fix integration and unit tests
1 parent bfbd52c commit 5c341b5

5 files changed

Lines changed: 76 additions & 37 deletions

File tree

dev/Dockerfile

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ WORKDIR ${SPARK_HOME}
4040
ENV SPARK_VERSION=3.5.6
4141
ENV ICEBERG_SPARK_RUNTIME_VERSION=3.5_2.12
4242
ENV ICEBERG_VERSION=1.9.1
43-
ENV PYICEBERG_VERSION=0.9.1
4443

4544
RUN curl --retry 5 -s -C - https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz -o spark-${SPARK_VERSION}-bin-hadoop3.tgz \
4645
&& tar xzf spark-${SPARK_VERSION}-bin-hadoop3.tgz --directory /opt/spark --strip-components 1 \
@@ -55,18 +54,30 @@ RUN curl --retry 5 -s https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-
5554
RUN curl --retry 5 -s https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-aws-bundle/${ICEBERG_VERSION}/iceberg-aws-bundle-${ICEBERG_VERSION}.jar \
5655
-Lo /opt/spark/jars/iceberg-aws-bundle-${ICEBERG_VERSION}.jar
5756

58-
COPY spark-defaults.conf /opt/spark/conf
57+
COPY dev/spark-defaults.conf /opt/spark/conf
5958
ENV PATH="/opt/spark/sbin:/opt/spark/bin:${PATH}"
6059

6160
RUN chmod u+x /opt/spark/sbin/* && \
6261
chmod u+x /opt/spark/bin/*
6362

6463
RUN pip3 install -q ipython
6564

66-
RUN pip3 install "pyiceberg[s3fs,hive,pyarrow]==${PYICEBERG_VERSION}"
65+
# Copy the local pyiceberg source code and install locally
66+
COPY pyiceberg/ /tmp/pyiceberg/pyiceberg
67+
COPY pyproject.toml /tmp/pyiceberg/
68+
COPY build-module.py /tmp/pyiceberg/
69+
COPY vendor/ /tmp/pyiceberg/vendor
70+
COPY README.md /tmp/pyiceberg/
71+
COPY NOTICE /tmp/pyiceberg/
6772

68-
COPY entrypoint.sh .
69-
COPY provision.py .
73+
# Install pyiceberg from the copied source
74+
RUN cd /tmp/pyiceberg && pip3 install ".[s3fs,hive,pyarrow]"
75+
76+
# Clean up
77+
RUN rm -rf /tmp/pyiceberg
78+
79+
COPY dev/entrypoint.sh ${SPARK_HOME}/
80+
COPY dev/provision.py ${SPARK_HOME}/
7081

7182
ENTRYPOINT ["./entrypoint.sh"]
7283
CMD ["notebook"]

dev/docker-compose-integration.yml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ services:
1919
spark-iceberg:
2020
image: python-integration
2121
container_name: pyiceberg-spark
22-
build: .
22+
build:
23+
context: ..
24+
dockerfile: dev/Dockerfile
2325
networks:
2426
iceberg_net:
2527
depends_on:
@@ -53,7 +55,6 @@ services:
5355
- CATALOG_WAREHOUSE=s3://warehouse/
5456
- CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
5557
- CATALOG_S3_ENDPOINT=http://minio:9000
56-
- CATALOG_JDBC_STRICT__MODE=true
5758
minio:
5859
image: minio/minio
5960
container_name: pyiceberg-minio

pyiceberg/catalog/hive.py

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import logging
2020
import socket
2121
import time
22+
from collections import namedtuple
2223
from types import TracebackType
2324
from typing import (
2425
TYPE_CHECKING,
@@ -33,8 +34,8 @@
3334
)
3435
from urllib.parse import urlparse
3536

36-
from hive_metastore.v4.ThriftHiveMetastore import Client
37-
from hive_metastore.v4.ttypes import (
37+
from hive_metastore.v3.ThriftHiveMetastore import Client
38+
from hive_metastore.v3.ttypes import (
3839
AlreadyExistsException,
3940
CheckLockRequest,
4041
EnvironmentContext,
@@ -54,10 +55,10 @@
5455
StorageDescriptor,
5556
UnlockRequest,
5657
)
57-
from hive_metastore.v4.ttypes import (
58+
from hive_metastore.v3.ttypes import (
5859
Database as HiveDatabase,
5960
)
60-
from hive_metastore.v4.ttypes import (
61+
from hive_metastore.v3.ttypes import (
6162
Table as HiveTable,
6263
)
6364
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential
@@ -148,6 +149,7 @@
148149
DEFAULT_LOCK_CHECK_RETRIES = 4
149150
DO_NOT_UPDATE_STATS = "DO_NOT_UPDATE_STATS"
150151
DO_NOT_UPDATE_STATS_DEFAULT = "true"
152+
HiveVersion = namedtuple("HiveVersion", "major minor patch")
151153

152154
logger = logging.getLogger(__name__)
153155

@@ -157,7 +159,7 @@ class _HiveClient:
157159

158160
_transport: TTransport
159161
_ugi: Optional[List[str]]
160-
_hive_version: int = 4
162+
_hive_version: HiveVersion = HiveVersion(4, 0, 0)
161163
_hms_v3: object
162164
_hms_v4: object
163165

@@ -177,13 +179,15 @@ def __init__(
177179
self.hms_v4 = importlib.import_module("hive_metastore.v4.ThriftHiveMetastore")
178180
self._hive_version = self._get_hive_version()
179181

180-
def _get_hive_version(self) -> int:
182+
def _get_hive_version(self) -> HiveVersion:
181183
with self as open_client:
182-
major, *_ = open_client.getVersion().split(".")
183-
return int(major)
184+
version = map(int, open_client.getVersion().split("."))
185+
return HiveVersion(*version)
184186

185187
def _init_thrift_transport(self) -> TTransport:
186188
url_parts = urlparse(self._uri)
189+
if not url_parts.hostname or not url_parts.port:
190+
raise ValueError("hive hostname and port must be set")
187191
socket = TSocket.TSocket(url_parts.hostname, url_parts.port)
188192
if not self._kerberos_auth:
189193
return TTransport.TBufferedTransport(socket)
@@ -192,7 +196,7 @@ def _init_thrift_transport(self) -> TTransport:
192196

193197
def _client(self) -> Client:
194198
protocol = TBinaryProtocol.TBinaryProtocol(self._transport)
195-
hms = self.hms_v3 if self._hive_version < 4 else self.hms_v4
199+
hms = self.hms_v4 if all((self._hive_version.major >= 4, self._hive_version.patch > 0)) else self.hms_v3
196200
client: Client = hms.Client(protocol)
197201
if self._ugi:
198202
client.set_ugi(*self._ugi)
@@ -407,14 +411,17 @@ def _create_hive_table(self, open_client: Client, hive_table: HiveTable) -> None
407411
raise TableAlreadyExistsError(f"Table {hive_table.dbName}.{hive_table.tableName} already exists") from e
408412

409413
def _get_hive_table(self, open_client: Client, *, dbname: str, tbl_name: str) -> HiveTable:
410-
if open_client._hive_version < 4:
414+
try:
415+
if all((self._client._hive_version.major >= 4, self._client._hive_version.patch > 0)):
416+
return open_client.get_table_req(GetTableRequest(dbName=dbname, tblName=tbl_name)).table
411417
return open_client.get_table(dbname=dbname, tbl_name=tbl_name)
412-
return open_client.get_table_req(GetTableRequest(dbName=dbname, tblName=tbl_name)).table
418+
except NoSuchObjectException as e:
419+
raise NoSuchTableError(f"Table does not exists: {tbl_name}") from e
413420

414421
def _get_table_objects_by_name(self, open_client: Client, *, dbname: str, tbl_names: list[str]) -> list[HiveTable]:
415-
if open_client._hive_version < 4:
416-
return open_client.get_table_objects_by_name(dbname=dbname, tbl_names=tbl_names)
417-
return open_client.get_table_objects_by_name_req(GetTablesRequest(dbName=dbname, tblNames=tbl_names)).tables
422+
if all((self._client._hive_version.major >= 4, self._client._hive_version.patch > 0)):
423+
return open_client.get_table_objects_by_name_req(GetTablesRequest(dbName=dbname, tblNames=tbl_names)).tables
424+
return open_client.get_table_objects_by_name(dbname=dbname, tbl_names=tbl_names)
418425

419426
def create_table(
420427
self,
@@ -458,7 +465,7 @@ def create_table(
458465

459466
with self._client as open_client:
460467
self._create_hive_table(open_client, tbl)
461-
hive_table: HiveTable = self._get_hive_table(open_client, dbname=database_name, tbl_name=table_name)
468+
hive_table = self._get_hive_table(open_client, dbname=database_name, tbl_name=table_name)
462469

463470
return self._convert_hive_into_iceberg(hive_table)
464471

@@ -488,7 +495,7 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:
488495
tbl = self._convert_iceberg_into_hive(staged_table)
489496
with self._client as open_client:
490497
self._create_hive_table(open_client, tbl)
491-
hive_table: HiveTable = self._get_hive_table(open_client, dbname=database_name, tbl_name=table_name)
498+
hive_table = self._get_hive_table(open_client, dbname=database_name, tbl_name=table_name)
492499

493500
return self._convert_hive_into_iceberg(hive_table)
494501

@@ -674,12 +681,17 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
674681
ValueError: When from table identifier is invalid.
675682
NoSuchTableError: When a table with the name does not exist.
676683
NoSuchNamespaceError: When the destination namespace doesn't exist.
684+
TableAlreadyExistsError: When the destination table already exists.
677685
"""
678686
from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier, NoSuchTableError)
679687
to_database_name, to_table_name = self.identifier_to_database_and_table(to_identifier)
688+
689+
if self.table_exists(to_identifier):
690+
raise TableAlreadyExistsError(f"Table already exists: {to_table_name}")
691+
680692
try:
681693
with self._client as open_client:
682-
tbl: HiveTable = self._get_hive_table(open_client, dbname=from_database_name, tbl_name=from_table_name)
694+
tbl = self._get_hive_table(open_client, dbname=from_database_name, tbl_name=from_table_name)
683695
tbl.dbName = to_database_name
684696
tbl.tableName = to_table_name
685697
open_client.alter_table_with_environment_context(
@@ -823,7 +835,7 @@ def update_namespace_properties(
823835
if removals:
824836
for key in removals:
825837
if key in parameters:
826-
parameters[key] = None
838+
parameters.pop(key)
827839
removed.add(key)
828840
if updates:
829841
for key, value in updates.items():

tests/catalog/test_hive.py

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727

2828
import pytest
2929
import thrift.transport.TSocket
30-
from hive_metastore.v4.ttypes import (
30+
from hive_metastore.v3.ttypes import (
3131
AlreadyExistsException,
3232
EnvironmentContext,
3333
FieldSchema,
@@ -40,10 +40,10 @@
4040
SkewedInfo,
4141
StorageDescriptor,
4242
)
43-
from hive_metastore.v4.ttypes import (
43+
from hive_metastore.v3.ttypes import (
4444
Database as HiveDatabase,
4545
)
46-
from hive_metastore.v4.ttypes import (
46+
from hive_metastore.v3.ttypes import (
4747
Table as HiveTable,
4848
)
4949

@@ -57,6 +57,7 @@
5757
LOCK_CHECK_MIN_WAIT_TIME,
5858
LOCK_CHECK_RETRIES,
5959
HiveCatalog,
60+
HiveVersion,
6061
_construct_hive_storage_descriptor,
6162
_HiveClient,
6263
)
@@ -65,6 +66,7 @@
6566
NamespaceNotEmptyError,
6667
NoSuchNamespaceError,
6768
NoSuchTableError,
69+
TableAlreadyExistsError,
6870
WaitingForLockException,
6971
)
7072
from pyiceberg.partitioning import PartitionField, PartitionSpec
@@ -258,7 +260,7 @@ def test_no_uri_supplied() -> None:
258260

259261

260262
def test_check_number_of_namespaces(table_schema_simple: Schema) -> None:
261-
_HiveClient._get_hive_version = MagicMock(return_value=3) # type: ignore
263+
_HiveClient._get_hive_version = MagicMock(return_value=HiveVersion(4, 0, 0)) # type: ignore
262264
catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL)
263265

264266
with pytest.raises(ValueError):
@@ -889,6 +891,7 @@ def test_load_table_from_self_identifier(hive_table: HiveTable) -> None:
889891

890892
def test_rename_table(hive_table: HiveTable) -> None:
891893
catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL)
894+
catalog.table_exists = MagicMock(return_value=False) # type: ignore[method-assign]
892895

893896
renamed_table = copy.deepcopy(hive_table)
894897
renamed_table.dbName = "default"
@@ -919,6 +922,7 @@ def test_rename_table(hive_table: HiveTable) -> None:
919922

920923
def test_rename_table_from_self_identifier(hive_table: HiveTable) -> None:
921924
catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL)
925+
catalog.table_exists = MagicMock(return_value=False) # type: ignore[method-assign]
922926

923927
catalog._client = MagicMock()
924928
catalog._get_hive_table = MagicMock(return_value=hive_table) # type: ignore
@@ -953,9 +957,10 @@ def test_rename_table_from_self_identifier(hive_table: HiveTable) -> None:
953957

954958
def test_rename_table_from_does_not_exists() -> None:
955959
catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL)
960+
catalog.table_exists = MagicMock(return_value=False) # type: ignore[method-assign]
956961

957962
catalog._client = MagicMock()
958-
catalog._client.__enter__()._hive_version = 3
963+
catalog._client._hive_version = HiveVersion(4, 0, 0)
959964
catalog._client.__enter__().alter_table_with_environment_context.side_effect = NoSuchObjectException(
960965
message="hive.default.does_not_exists table not found"
961966
)
@@ -966,11 +971,22 @@ def test_rename_table_from_does_not_exists() -> None:
966971
assert "Table does not exist: does_not_exists" in str(exc_info.value)
967972

968973

974+
def test_rename_table_to_table_already_exists(hive_table: HiveTable) -> None:
975+
catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL)
976+
catalog.load_table = MagicMock(return_value=hive_table) # type: ignore[method-assign]
977+
978+
with pytest.raises(TableAlreadyExistsError) as exc_info:
979+
catalog.rename_table(("default", "some_table"), ("default", "new_tabl2e"))
980+
981+
assert "Table already exists: new_tabl2e" in str(exc_info.value)
982+
983+
969984
def test_rename_table_to_namespace_does_not_exists() -> None:
970985
catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL)
986+
catalog.table_exists = MagicMock(return_value=False) # type: ignore[method-assign]
971987

972988
catalog._client = MagicMock()
973-
catalog._client.__enter__()._hive_version = 3
989+
catalog._client._hive_version = HiveVersion(4, 0, 0)
974990
catalog._client.__enter__().alter_table_with_environment_context.side_effect = InvalidOperationException(
975991
message="Unable to change partition or table. Database default does not exist Check metastore logs for detailed stack.does_not_exists"
976992
)
@@ -1171,7 +1187,7 @@ def test_update_namespace_properties(hive_database: HiveDatabase) -> None:
11711187
name="default",
11721188
description=None,
11731189
locationUri=hive_database.locationUri,
1174-
parameters={"test": None, "label": "core"},
1190+
parameters={"label": "core"},
11751191
privileges=None,
11761192
ownerName=None,
11771193
ownerType=1,

tests/integration/test_reads.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import pyarrow as pa
2727
import pyarrow.parquet as pq
2828
import pytest
29-
from hive_metastore.v4.ttypes import LockRequest, LockResponse, LockState, UnlockRequest
29+
from hive_metastore.v3.ttypes import LockRequest, LockResponse, LockState, UnlockRequest
3030
from pyarrow.fs import S3FileSystem
3131
from pydantic_core import ValidationError
3232
from pyspark.sql import SparkSession
@@ -115,22 +115,21 @@ def test_table_properties(catalog: Catalog) -> None:
115115

116116
@pytest.mark.integration
117117
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive")])
118-
def test_hive_properties(catalog: Catalog) -> None:
118+
def test_hive_properties(catalog: HiveCatalog) -> None:
119119
table = create_table(catalog)
120120
table.transaction().set_properties({"abc": "def", "p1": "123"}).commit_transaction()
121-
122121
hive_client: _HiveClient = _HiveClient(catalog.properties["uri"])
123122

124123
with hive_client as open_client:
125-
hive_table = open_client.get_table(*TABLE_NAME)
124+
hive_table = catalog._get_hive_table(open_client, dbname="default", tbl_name="t1")
126125
assert hive_table.parameters.get("abc") == "def"
127126
assert hive_table.parameters.get("p1") == "123"
128127
assert hive_table.parameters.get("not_exist_parameter") is None
129128

130129
table.transaction().remove_properties("abc").commit_transaction()
131130

132131
with hive_client as open_client:
133-
hive_table = open_client.get_table(*TABLE_NAME)
132+
hive_table = catalog._get_hive_table(open_client, dbname="default", tbl_name="t1")
134133
assert hive_table.parameters.get("abc") is None
135134

136135

0 commit comments

Comments (0)