Skip to content

Commit fd26ee4

Browse files
committed
Trino: Restructure to make Trino integration optional and modular
Address reviewer feedback from kevinjqliu:

1. Consolidated Trino infrastructure:
   - All Trino config files remain in dev/trino/ directory
   - docker-compose-trino.yml moved to dev/ (alongside integration compose)
   - run-trino.sh moved to dev/ (alongside other run scripts)

2. Removed Trino from main integration docker-compose:
   - Trino service removed from dev/docker-compose-integration.yml
   - Trino can now be spun up separately alongside main integration
   - Keeps Trino testing optional and not part of CI

3. Created dedicated test file:
   - tests/integration/test_trino.py for all Trino-specific tests
   - Moved test_schema_exists_in_trino from test_rest_catalog.py
   - Moved test_uuid_partitioning_with_trino from test_writes.py
   - Better separation of concerns and easier to maintain

4. Simplified pytest marker:
   - Changed from @pytest.mark.integration_trino to @pytest.mark.trino
   - Updated Makefile target: test-integration-trino -> test-trino
   - Updated pyproject.toml and conftest.py references

This makes Trino integration testing opt-in and follows the same pattern as other optional test suites (s3, adls, gcs).
1 parent 311f7b6 commit fd26ee4

File tree

7 files changed

+121
-100
lines changed

7 files changed

+121
-100
lines changed

Makefile

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,9 @@ test-integration-rebuild: ## Rebuild integration Docker services from scratch
117117
docker compose -f dev/docker-compose-integration.yml rm -f
118118
docker compose -f dev/docker-compose-integration.yml build --no-cache
119119

120-
test-integration-trino: ## Run tests marked with @pytest.mark.integration_trino
120+
test-trino: ## Run tests marked with @pytest.mark.trino
121121
sh ./dev/run-trino.sh
122-
$(TEST_RUNNER) pytest tests/ -m integration_trino $(PYTEST_ARGS)
122+
$(TEST_RUNNER) pytest tests/ -m trino $(PYTEST_ARGS)
123123

124124
test-s3: ## Run tests marked with @pytest.mark.s3
125125
sh ./dev/run-minio.sh
@@ -134,7 +134,7 @@ test-gcs: ## Run tests marked with @pytest.mark.gcs
134134
$(TEST_RUNNER) pytest tests/ -m gcs $(PYTEST_ARGS)
135135

136136
test-coverage: COVERAGE=1
137-
test-coverage: test test-integration test-integration-trino test-s3 test-adls test-gcs coverage-report ## Run all tests with coverage and report
137+
test-coverage: test test-integration test-trino test-s3 test-adls test-gcs coverage-report ## Run all tests with coverage and report
138138

139139
coverage-report: ## Combine and report coverage
140140
uv run $(PYTHON_ARG) coverage combine

dev/docker-compose-integration.yml

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -58,20 +58,6 @@ services:
5858
- CATALOG_S3_ENDPOINT=http://minio:9000
5959
- CATALOG_JDBC_STRICT__MODE=true
6060

61-
trino:
62-
image: trinodb/trino:478
63-
container_name: pyiceberg-trino
64-
networks:
65-
iceberg_net:
66-
ports:
67-
- 8082:8080
68-
environment:
69-
- CATALOG_MANAGEMENT=dynamic
70-
depends_on:
71-
- rest
72-
- hive
73-
volumes:
74-
- ./trino/catalog:/etc/trino/catalog
7561
minio:
7662
image: minio/minio
7763
container_name: pyiceberg-minio

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ markers = [
154154
"s3: marks a test as requiring access to s3 compliant storage (use with --aws-access-key-id, --aws-secret-access-key, and --endpoint args)",
155155
"adls: marks a test as requiring access to adls compliant storage (use with --adls.account-name, --adls.account-key, and --adls.endpoint args)",
156156
"integration: marks integration tests against Apache Spark",
157-
"integration_trino: marks integration tests against Trino",
157+
"trino: marks integration tests against Trino",
158158
"gcs: marks a test as requiring access to gcs compliant storage (use with --gs.token, --gs.project, and --gs.endpoint)",
159159
"benchmark: collection of tests to validate read/write performance before and after a change",
160160
]

tests/conftest.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,13 +148,13 @@ def pytest_addoption(parser: pytest.Parser) -> None:
148148
"--trino.rest.endpoint",
149149
action="store",
150150
default="trino://test@localhost:8082/warehouse_rest",
151-
help="The Trino REST endpoint URL for tests marked as integration_trino",
151+
help="The Trino REST endpoint URL for tests marked as trino",
152152
)
153153
parser.addoption(
154154
"--trino.hive.endpoint",
155155
action="store",
156156
default="trino://test@localhost:8082/warehouse_hive",
157-
help="The Trino Hive endpoint URL for tests marked as integration_trino",
157+
help="The Trino Hive endpoint URL for tests marked as trino",
158158
)
159159

160160

tests/integration/test_rest_catalog.py

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -62,22 +62,3 @@ def test_create_namespace_if_already_existing(catalog: RestCatalog) -> None:
6262
catalog.create_namespace_if_not_exists(TEST_NAMESPACE_IDENTIFIER)
6363

6464
assert catalog.namespace_exists(TEST_NAMESPACE_IDENTIFIER)
65-
66-
67-
@pytest.mark.integration
68-
@pytest.mark.integration_trino
69-
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog")])
70-
def test_schema_exists_in_trino(trino_rest_conn: Connection, catalog: RestCatalog) -> None:
71-
"""Verifies that an Iceberg namespace correctly appears as a schema in Trino.
72-
73-
This test ensures the synchronization between Iceberg's namespace concept and
74-
Trino's schema concept, confirming that after creating a namespace in the Iceberg
75-
catalog, it becomes visible as a schema in the Trino environment.
76-
"""
77-
78-
if catalog.namespace_exists(TEST_NAMESPACE_IDENTIFIER):
79-
catalog.drop_namespace(TEST_NAMESPACE_IDENTIFIER)
80-
catalog.create_namespace_if_not_exists(TEST_NAMESPACE_IDENTIFIER)
81-
82-
assert catalog.namespace_exists(TEST_NAMESPACE_IDENTIFIER)
83-
assert TEST_NAMESPACE_IDENTIFIER.lower() in inspect(trino_rest_conn).get_schema_names()

tests/integration/test_trino.py

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
"""Integration tests for Trino engine."""
18+
19+
import uuid
20+
21+
import pyarrow as pa
22+
import pytest
23+
from sqlalchemy import Connection, inspect, text
24+
from sqlalchemy.engine import Engine
25+
26+
from pyiceberg.catalog import Catalog
27+
from pyiceberg.catalog.rest import RestCatalog
28+
from pyiceberg.exceptions import NoSuchTableError
29+
from pyiceberg.partitioning import PartitionField, PartitionSpec
30+
from pyiceberg.schema import Schema
31+
from pyiceberg.transforms import BucketTransform, IdentityTransform, Transform
32+
from pyiceberg.types import NestedField, UUIDType
33+
34+
TEST_NAMESPACE = "test_trino_namespace"
35+
TEST_NAMESPACE_IDENTIFIER = (TEST_NAMESPACE,)
36+
37+
38+
@pytest.mark.trino
39+
def test_schema_exists_in_trino(trino_rest_conn: Connection, catalog: RestCatalog) -> None:
40+
"""Verifies that an Iceberg namespace correctly appears as a schema in Trino.
41+
42+
This test ensures the synchronization between Iceberg's namespace concept and
43+
Trino's schema concept, confirming that after creating a namespace in the Iceberg
44+
catalog, it becomes visible as a schema in the Trino environment.
45+
"""
46+
if catalog.namespace_exists(TEST_NAMESPACE_IDENTIFIER):
47+
catalog.drop_namespace(TEST_NAMESPACE_IDENTIFIER)
48+
catalog.create_namespace_if_not_exists(TEST_NAMESPACE_IDENTIFIER)
49+
50+
assert catalog.namespace_exists(TEST_NAMESPACE_IDENTIFIER)
51+
assert TEST_NAMESPACE_IDENTIFIER[0].lower() in inspect(trino_rest_conn).get_schema_names()
52+
53+
54+
@pytest.mark.trino
55+
@pytest.mark.parametrize(
56+
"transform",
57+
[
58+
IdentityTransform(),
59+
BucketTransform(num_buckets=32),
60+
],
61+
)
62+
@pytest.mark.parametrize(
63+
"catalog,trino_conn",
64+
[
65+
(pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("trino_hive_conn")),
66+
(pytest.lazy_fixture("session_catalog"), pytest.lazy_fixture("trino_rest_conn")),
67+
],
68+
)
69+
def test_uuid_partitioning_with_trino(catalog: Catalog, trino_conn: Connection, transform: Transform) -> None: # type: ignore
70+
"""Test UUID partitioning using Trino engine.
71+
72+
This test verifies that UUID-partitioned tables created via PyIceberg can be
73+
correctly queried through Trino. It tests both Identity and Bucket transforms
74+
on UUID columns, which are not fully supported in Spark but work in Trino.
75+
"""
76+
identifier = f"default.test_uuid_partitioning_{str(transform).replace('[32]', '')}"
77+
78+
schema = Schema(NestedField(field_id=1, name="uuid", field_type=UUIDType(), required=True))
79+
80+
try:
81+
catalog.drop_table(identifier=identifier)
82+
except NoSuchTableError:
83+
pass
84+
85+
partition_spec = PartitionSpec(
86+
PartitionField(source_id=1, field_id=1000, transform=transform, name=f"uuid_{str(transform).replace('[32]', '')}")
87+
)
88+
89+
arr_table = pa.Table.from_pydict(
90+
{
91+
"uuid": [
92+
uuid.UUID("00000000-0000-0000-0000-000000000000").bytes,
93+
uuid.UUID("11111111-1111-1111-1111-111111111111").bytes,
94+
],
95+
},
96+
schema=pa.schema(
97+
[
98+
# Uuid not yet supported, so we have to stick with `binary(16)`
99+
# https://github.com/apache/arrow/issues/46468
100+
pa.field("uuid", pa.binary(16), nullable=False),
101+
]
102+
),
103+
)
104+
105+
tbl = catalog.create_table(
106+
identifier=identifier,
107+
schema=schema,
108+
partition_spec=partition_spec,
109+
)
110+
111+
tbl.append(arr_table)
112+
rows = trino_conn.execute(text(f"SELECT * FROM {identifier}")).fetchall()
113+
lhs = sorted([r[0] for r in rows])
114+
rhs = sorted([u.as_py() for u in tbl.scan().to_arrow()["uuid"].combine_chunks()])
115+
assert lhs == rhs

tests/integration/test_writes/test_writes.py

Lines changed: 0 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -2148,67 +2148,6 @@ def test_uuid_partitioning(session_catalog: Catalog, spark: SparkSession, transf
21482148
lhs = [r[0] for r in spark.table(identifier).collect()]
21492149
rhs = [str(u.as_py()) for u in tbl.scan().to_arrow()["uuid"].combine_chunks()]
21502150
assert lhs == rhs
2151-
2152-
2153-
@pytest.mark.integration_trino
2154-
@pytest.mark.integration
2155-
@pytest.mark.parametrize(
2156-
"transform",
2157-
[IdentityTransform(), BucketTransform(32)],
2158-
)
2159-
@pytest.mark.parametrize(
2160-
"catalog, trino_conn",
2161-
[
2162-
(pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("trino_hive_conn")),
2163-
(pytest.lazy_fixture("session_catalog"), pytest.lazy_fixture("trino_rest_conn")),
2164-
],
2165-
)
2166-
def test_uuid_partitioning_with_trino(catalog: Catalog, trino_conn: Connection, transform: Transform) -> None: # type: ignore
2167-
identifier = f"default.test_uuid_partitioning_{str(transform).replace('[32]', '')}"
2168-
2169-
schema = Schema(NestedField(field_id=1, name="uuid", field_type=UUIDType(), required=True))
2170-
2171-
try:
2172-
catalog.drop_table(identifier=identifier)
2173-
except NoSuchTableError:
2174-
pass
2175-
2176-
partition_spec = PartitionSpec(
2177-
PartitionField(source_id=1, field_id=1000, transform=transform, name=f"uuid_{str(transform).replace('[32]', '')}")
2178-
)
2179-
2180-
import pyarrow as pa
2181-
2182-
arr_table = pa.Table.from_pydict(
2183-
{
2184-
"uuid": [
2185-
uuid.UUID("00000000-0000-0000-0000-000000000000").bytes,
2186-
uuid.UUID("11111111-1111-1111-1111-111111111111").bytes,
2187-
],
2188-
},
2189-
schema=pa.schema(
2190-
[
2191-
# Uuid not yet supported, so we have to stick with `binary(16)`
2192-
# https://github.com/apache/arrow/issues/46468
2193-
pa.field("uuid", pa.binary(16), nullable=False),
2194-
]
2195-
),
2196-
)
2197-
2198-
tbl = catalog.create_table(
2199-
identifier=identifier,
2200-
schema=schema,
2201-
partition_spec=partition_spec,
2202-
)
2203-
2204-
tbl.append(arr_table)
2205-
rows = trino_conn.execute(text(f"SELECT * FROM {identifier}")).fetchall()
2206-
lhs = sorted([r[0] for r in rows])
2207-
rhs = sorted([u.as_py() for u in tbl.scan().to_arrow()["uuid"].combine_chunks()])
2208-
assert lhs == rhs
2209-
2210-
2211-
@pytest.mark.integration
22122151
def test_avro_compression_codecs(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
22132152
identifier = "default.test_avro_compression_codecs"
22142153
tbl = _create_table(session_catalog, identifier, schema=arrow_table_with_null.schema, data=[arrow_table_with_null])

0 commit comments

Comments (0)