
Commit f7cbe3f

add version tuple
1 parent 4bc8189 commit f7cbe3f

4 files changed: 36 additions & 21 deletions

dev/Dockerfile

Lines changed: 16 additions & 5 deletions
@@ -40,7 +40,6 @@ WORKDIR ${SPARK_HOME}
 ENV SPARK_VERSION=3.5.6
 ENV ICEBERG_SPARK_RUNTIME_VERSION=3.5_2.12
 ENV ICEBERG_VERSION=1.9.1
-ENV PYICEBERG_VERSION=0.9.1
 
 RUN curl --retry 5 -s -C - https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz -o spark-${SPARK_VERSION}-bin-hadoop3.tgz \
     && tar xzf spark-${SPARK_VERSION}-bin-hadoop3.tgz --directory /opt/spark --strip-components 1 \
@@ -55,18 +54,30 @@ RUN curl --retry 5 -s https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-
 RUN curl --retry 5 -s https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-aws-bundle/${ICEBERG_VERSION}/iceberg-aws-bundle-${ICEBERG_VERSION}.jar \
     -Lo /opt/spark/jars/iceberg-aws-bundle-${ICEBERG_VERSION}.jar
 
-COPY spark-defaults.conf /opt/spark/conf
+COPY dev/spark-defaults.conf /opt/spark/conf
 ENV PATH="/opt/spark/sbin:/opt/spark/bin:${PATH}"
 
 RUN chmod u+x /opt/spark/sbin/* && \
     chmod u+x /opt/spark/bin/*
 
 RUN pip3 install -q ipython
 
-RUN pip3 install "pyiceberg[s3fs,hive,pyarrow]==${PYICEBERG_VERSION}"
+# Copy the local pyiceberg source code and install locally
+COPY pyiceberg/ /tmp/pyiceberg/pyiceberg
+COPY pyproject.toml /tmp/pyiceberg/
+COPY build-module.py /tmp/pyiceberg/
+COPY vendor/ /tmp/pyiceberg/vendor
+COPY README.md /tmp/pyiceberg/
+COPY NOTICE /tmp/pyiceberg/
 
-COPY entrypoint.sh .
-COPY provision.py .
+# Install pyiceberg from the copied source
+RUN cd /tmp/pyiceberg && pip3 install ".[s3fs,hive,pyarrow]"
+
+# Clean up
+RUN rm -rf /tmp/pyiceberg
+
+COPY dev/entrypoint.sh ${SPARK_HOME}/
+COPY dev/provision.py ${SPARK_HOME}/
 
 ENTRYPOINT ["./entrypoint.sh"]
 CMD ["notebook"]
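Note: the image previously installed a pinned PyPI release (pyiceberg==0.9.1); it now builds pyiceberg from the copied source tree, so the integration image always exercises the current checkout. A minimal smoke test, run inside the built container, can confirm which distribution got installed. This is an illustrative sketch using only the standard library; the printed version string depends on the checkout:

# Illustrative check (run inside the container): report the installed
# pyiceberg distribution version. A local source build typically carries
# a dev version rather than the previously pinned 0.9.1.
from importlib.metadata import version

print(version("pyiceberg"))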

dev/docker-compose-integration.yml

Lines changed: 3 additions & 2 deletions
@@ -19,7 +19,9 @@ services:
   spark-iceberg:
     image: python-integration
     container_name: pyiceberg-spark
-    build: .
+    build:
+      context: ..
+      dockerfile: dev/Dockerfile
     networks:
       iceberg_net:
     depends_on:
@@ -53,7 +55,6 @@ services:
       - CATALOG_WAREHOUSE=s3://warehouse/
       - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
       - CATALOG_S3_ENDPOINT=http://minio:9000
-      - CATALOG_JDBC_STRICT__MODE=true
   minio:
     image: minio/minio
     container_name: pyiceberg-minio
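Note: the build context moves from dev/ to the repository root (context: ..), which is what lets the Dockerfile above COPY pyiceberg/, pyproject.toml, vendor/, and the dev/-prefixed files. The CATALOG_JDBC_STRICT__MODE=true entry is also dropped from the catalog service's environment.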

pyiceberg/catalog/hive.py

Lines changed: 13 additions & 11 deletions
@@ -19,6 +19,7 @@
 import logging
 import socket
 import time
+from collections import namedtuple
 from types import TracebackType
 from typing import (
     TYPE_CHECKING,
@@ -148,6 +149,7 @@
 DEFAULT_LOCK_CHECK_RETRIES = 4
 DO_NOT_UPDATE_STATS = "DO_NOT_UPDATE_STATS"
 DO_NOT_UPDATE_STATS_DEFAULT = "true"
+HiveVersion = namedtuple("HiveVersion", "major minor patch")
 
 logger = logging.getLogger(__name__)
 
@@ -157,7 +159,7 @@ class _HiveClient:
 
     _transport: TTransport
     _ugi: Optional[List[str]]
-    _hive_version: int = 4
+    _hive_version: HiveVersion = HiveVersion(4, 0, 0)
     _hms_v3: object
     _hms_v4: object
 
@@ -177,10 +179,10 @@ def __init__(
         self.hms_v4 = importlib.import_module("hive_metastore.v4.ThriftHiveMetastore")
         self._hive_version = self._get_hive_version()
 
-    def _get_hive_version(self) -> int:
+    def _get_hive_version(self) -> HiveVersion:
         with self as open_client:
-            major, *_ = open_client.getVersion().split(".")
-            return int(major)
+            version = map(int, open_client.getVersion().split("."))
+            return HiveVersion(*version)
 
     def _init_thrift_transport(self) -> TTransport:
         url_parts = urlparse(self._uri)
@@ -192,7 +194,7 @@ def _init_thrift_transport(self) -> TTransport:
 
     def _client(self) -> Client:
         protocol = TBinaryProtocol.TBinaryProtocol(self._transport)
-        hms = self.hms_v3 if self._hive_version < 4 else self.hms_v4
+        hms = self.hms_v4 if all((self._hive_version.major >= 4, self._hive_version.minor > 0)) else self.hms_v3
         client: Client = hms.Client(protocol)
         if self._ugi:
             client.set_ugi(*self._ugi)
@@ -407,14 +409,14 @@ def _create_hive_table(self, open_client: Client, hive_table: HiveTable) -> None
             raise TableAlreadyExistsError(f"Table {hive_table.dbName}.{hive_table.tableName} already exists") from e
 
     def _get_hive_table(self, open_client: Client, *, dbname: str, tbl_name: str) -> HiveTable:
-        if self._client._hive_version < 4:
-            return open_client.get_table(dbname=dbname, tbl_name=tbl_name)
-        return open_client.get_table_req(GetTableRequest(dbName=dbname, tblName=tbl_name)).table
+        if all((self._client._hive_version.major >= 4, self._client._hive_version.minor > 0)):
+            return open_client.get_table_req(GetTableRequest(dbName=dbname, tblName=tbl_name)).table
+        return open_client.get_table(dbname=dbname, tbl_name=tbl_name)
 
     def _get_table_objects_by_name(self, open_client: Client, *, dbname: str, tbl_names: list[str]) -> list[HiveTable]:
-        if self._client._hive_version < 4:
-            return open_client.get_table_objects_by_name(dbname=dbname, tbl_names=tbl_names)
-        return open_client.get_table_objects_by_name_req(GetTablesRequest(dbName=dbname, tblNames=tbl_names)).tables
+        if all((self._client._hive_version.major >= 4, self._client._hive_version.minor > 0)):
+            return open_client.get_table_objects_by_name_req(GetTablesRequest(dbName=dbname, tblNames=tbl_names)).tables
+        return open_client.get_table_objects_by_name(dbname=dbname, tbl_names=tbl_names)
 
     def create_table(
         self,
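This is the heart of the change: the detected metastore version becomes a HiveVersion namedtuple instead of a bare major int, and the v3/v4 Thrift client selection now keys off major and minor together. A minimal standard-library sketch of the parsing and gating logic as written above (the version strings are illustrative):

from collections import namedtuple

HiveVersion = namedtuple("HiveVersion", "major minor patch")

def parse_version(raw: str) -> HiveVersion:
    # Mirrors _get_hive_version: assumes exactly three numeric components;
    # a suffix like "4.0.0-beta-1" would raise ValueError from int().
    return HiveVersion(*map(int, raw.split(".")))

def uses_v4_api(v: HiveVersion) -> bool:
    # Mirrors the gate in _client, _get_hive_table, and
    # _get_table_objects_by_name: the v4 bindings are selected only when
    # major >= 4 AND minor > 0, so a 4.0.x metastore stays on the v3 path.
    return all((v.major >= 4, v.minor > 0))

assert parse_version("4.1.0") == HiveVersion(4, 1, 0)
assert uses_v4_api(HiveVersion(4, 1, 0))
assert not uses_v4_api(HiveVersion(4, 0, 0))  # the class default above
assert not uses_v4_api(HiveVersion(3, 1, 3))

Because a namedtuple compares like a plain tuple, HiveVersion(4, 1, 0) > HiveVersion(4, 0, 9) also holds, which keeps ordered comparisons available if later code needs them.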

tests/catalog/test_hive.py

Lines changed: 4 additions & 3 deletions
@@ -57,6 +57,7 @@
     LOCK_CHECK_MIN_WAIT_TIME,
     LOCK_CHECK_RETRIES,
     HiveCatalog,
+    HiveVersion,
     _construct_hive_storage_descriptor,
     _HiveClient,
 )
@@ -258,7 +259,7 @@ def test_no_uri_supplied() -> None:
 
 
 def test_check_number_of_namespaces(table_schema_simple: Schema) -> None:
-    _HiveClient._get_hive_version = MagicMock(return_value=3)  # type: ignore
+    _HiveClient._get_hive_version = MagicMock(return_value=HiveVersion(4, 0, 0))  # type: ignore
     catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL)
 
     with pytest.raises(ValueError):
@@ -955,7 +956,7 @@ def test_rename_table_from_does_not_exists() -> None:
     catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL)
 
     catalog._client = MagicMock()
-    catalog._client._hive_version = 3
+    catalog._client._hive_version = HiveVersion(4, 0, 0)
     catalog._client.__enter__().alter_table_with_environment_context.side_effect = NoSuchObjectException(
         message="hive.default.does_not_exists table not found"
     )
@@ -970,7 +971,7 @@ def test_rename_table_to_namespace_does_not_exists() -> None:
     catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL)
 
     catalog._client = MagicMock()
-    catalog._client._hive_version = 3
+    catalog._client._hive_version = HiveVersion(4, 0, 0)
    catalog._client.__enter__().alter_table_with_environment_context.side_effect = InvalidOperationException(
         message="Unable to change partition or table. Database default does not exist Check metastore logs for detailed stack.does_not_exists"
     )
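The tests pin the detected version by mocking _get_hive_version at the class level, so no Thrift connection is ever opened. A self-contained sketch of that pattern; the catalog name and URL are illustrative stand-ins for the test module's constants:

from unittest.mock import MagicMock

from pyiceberg.catalog.hive import HiveCatalog, HiveVersion, _HiveClient

HIVE_CATALOG_NAME = "hive"                          # illustrative
HIVE_METASTORE_FAKE_URL = "thrift://unknown:9083"   # illustrative, never contacted

# Patch before constructing the catalog: _HiveClient.__init__ calls
# _get_hive_version(), which would otherwise open a connection.
_HiveClient._get_hive_version = MagicMock(return_value=HiveVersion(4, 0, 0))  # type: ignore
catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL)
assert catalog._client._hive_version == HiveVersion(4, 0, 0)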
