|
39 | 39 | from pydantic_core import ValidationError |
40 | 40 | from pyspark.sql import SparkSession |
41 | 41 | from pytest_mock.plugin import MockerFixture |
| 42 | +from sqlalchemy import Connection |
| 43 | +from sqlalchemy.sql.expression import text |
42 | 44 |
|
43 | 45 | from pyiceberg.catalog import Catalog, load_catalog |
44 | 46 | from pyiceberg.catalog.hive import HiveCatalog |
@@ -2089,6 +2091,7 @@ def test_read_write_decimals(session_catalog: Catalog) -> None: |
2089 | 2091 | assert tbl.scan().to_arrow() == arrow_table |
2090 | 2092 |
|
2091 | 2093 |
|
| 2094 | +@pytest.mark.skip("UUID BucketTransform is not supported in Spark Iceberg 1.9.2 yet") |
2092 | 2095 | @pytest.mark.integration |
2093 | 2096 | @pytest.mark.parametrize( |
2094 | 2097 | "transform", |
@@ -2142,6 +2145,64 @@ def test_uuid_partitioning(session_catalog: Catalog, spark: SparkSession, transf |
2142 | 2145 | assert lhs == rhs |
2143 | 2146 |
|
2144 | 2147 |
|
@pytest.mark.integration_trino
@pytest.mark.integration
@pytest.mark.parametrize(
    "transform",
    [IdentityTransform(), BucketTransform(32)],
)
@pytest.mark.parametrize(
    "catalog, trino_conn",
    [
        (pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("trino_hive_conn")),
        (pytest.lazy_fixture("session_catalog"), pytest.lazy_fixture("trino_rest_conn")),
    ],
)
def test_uuid_partitioning_with_trino(catalog: Catalog, trino_conn: Connection, transform: Transform) -> None:  # type: ignore
    """Round-trip UUID-partitioned data written by PyIceberg and read via Trino.

    Creates a table partitioned on a UUID column (identity or bucket transform),
    appends two fixed UUIDs through PyArrow, then asserts that the rows Trino
    returns match what PyIceberg's own table scan produces.
    """
    import pyarrow as pa

    # "bucket[32]" -> "bucket", "identity" stays as-is; computed once and
    # reused for both the table identifier and the partition-field name.
    transform_name = str(transform).replace("[32]", "")
    identifier = f"default.test_uuid_partitioning_{transform_name}"

    schema = Schema(NestedField(field_id=1, name="uuid", field_type=UUIDType(), required=True))

    # Start from a clean slate; the table may linger from a previous run.
    try:
        catalog.drop_table(identifier=identifier)
    except NoSuchTableError:
        pass

    partition_spec = PartitionSpec(
        PartitionField(source_id=1, field_id=1000, transform=transform, name=f"uuid_{transform_name}")
    )

    arr_table = pa.Table.from_pydict(
        {
            "uuid": [
                uuid.UUID("00000000-0000-0000-0000-000000000000").bytes,
                uuid.UUID("11111111-1111-1111-1111-111111111111").bytes,
            ],
        },
        schema=pa.schema(
            [
                # Uuid not yet supported, so we have to stick with `binary(16)`
                # https://github.com/apache/arrow/issues/46468
                pa.field("uuid", pa.binary(16), nullable=False),
            ]
        ),
    )

    tbl = catalog.create_table(
        identifier=identifier,
        schema=schema,
        partition_spec=partition_spec,
    )

    tbl.append(arr_table)

    # Compare Trino's view of the table against PyIceberg's own scan; sort
    # both sides since row order is not guaranteed by either engine.
    rows = trino_conn.execute(text(f"SELECT * FROM {identifier}")).fetchall()
    lhs = sorted([r[0] for r in rows])
    rhs = sorted([u.as_py() for u in tbl.scan().to_arrow()["uuid"].combine_chunks()])
    assert lhs == rhs
| 2205 | + |
2145 | 2206 | @pytest.mark.integration |
2146 | 2207 | def test_avro_compression_codecs(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: |
2147 | 2208 | identifier = "default.test_avro_compression_codecs" |
|
0 commit comments