Skip to content

Commit 21b2d5a

Browse files
committed
Add test for migrated tables
Identified two issues that can be worked on in parallel
1 parent 5c604c2 commit 21b2d5a

File tree

2 files changed

+100
-1
lines changed

2 files changed

+100
-1
lines changed

tests/conftest.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2502,9 +2502,14 @@ def spark() -> "SparkSession":
25022502
spark_version = ".".join(importlib.metadata.version("pyspark").split(".")[:2])
25032503
scala_version = "2.12"
25042504
iceberg_version = "1.9.0"
2505+
# Should match with Spark:
2506+
hadoop_version = "3.3.4"
2507+
aws_sdk_version = "1.12.753"
25052508

25062509
os.environ["PYSPARK_SUBMIT_ARGS"] = (
25072510
f"--packages org.apache.iceberg:iceberg-spark-runtime-{spark_version}_{scala_version}:{iceberg_version},"
2511+
f"org.apache.hadoop:hadoop-aws:{hadoop_version},"
2512+
f"com.amazonaws:aws-java-sdk-bundle:{aws_sdk_version},"
25082513
f"org.apache.iceberg:iceberg-aws-bundle:{iceberg_version} pyspark-shell"
25092514
)
25102515
os.environ["AWS_REGION"] = "us-east-1"
@@ -2518,6 +2523,8 @@ def spark() -> "SparkSession":
25182523
.config("spark.sql.shuffle.partitions", "1")
25192524
.config("spark.default.parallelism", "1")
25202525
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
2526+
.config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000")
2527+
.config("spark.hadoop.fs.s3a.path.style.access", "true")
25212528
.config("spark.sql.catalog.integration", "org.apache.iceberg.spark.SparkCatalog")
25222529
.config("spark.sql.catalog.integration.catalog-impl", "org.apache.iceberg.rest.RESTCatalog")
25232530
.config("spark.sql.catalog.integration.cache-enabled", "false")
@@ -2526,14 +2533,22 @@ def spark() -> "SparkSession":
25262533
.config("spark.sql.catalog.integration.warehouse", "s3://warehouse/wh/")
25272534
.config("spark.sql.catalog.integration.s3.endpoint", "http://localhost:9000")
25282535
.config("spark.sql.catalog.integration.s3.path-style-access", "true")
2529-
.config("spark.sql.defaultCatalog", "integration")
25302536
.config("spark.sql.catalog.hive", "org.apache.iceberg.spark.SparkCatalog")
25312537
.config("spark.sql.catalog.hive.type", "hive")
25322538
.config("spark.sql.catalog.hive.uri", "http://localhost:9083")
25332539
.config("spark.sql.catalog.hive.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
25342540
.config("spark.sql.catalog.hive.warehouse", "s3://warehouse/hive/")
25352541
.config("spark.sql.catalog.hive.s3.endpoint", "http://localhost:9000")
25362542
.config("spark.sql.catalog.hive.s3.path-style-access", "true")
2543+
.config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
2544+
.config("spark.sql.catalog.spark_catalog.type", "hive")
2545+
.config("spark.sql.catalog.spark_catalog.uri", "http://localhost:9083")
2546+
.config("spark.sql.catalog.spark_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
2547+
.config("spark.sql.catalog.spark_catalog.warehouse", "s3://warehouse/hive/")
2548+
.config("spark.sql.catalog.spark_catalog.s3.endpoint", "http://localhost:9000")
2549+
.config("spark.sql.catalog.spark_catalog.s3.path-style-access", "true")
2550+
.config("spark.sql.catalogImplementation", "hive")
2551+
.config("spark.sql.defaultCatalog", "integration")
25372552
.config("spark.sql.execution.arrow.pyspark.enabled", "true")
25382553
.getOrCreate()
25392554
)
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
import time

import pytest
from pyspark.sql import SparkSession

from pyiceberg.catalog import Catalog


@pytest.mark.integration
def test_migrate_table(
    session_catalog_hive: Catalog,
    spark: SparkSession,
) -> None:
    """
    Imported tables are an edge case since the partition column is not stored
    in the Parquet files:

    test_migrate_table_hive_1754486926/dt=2022-01-01/part-00000-30a9798b-7597-4027-86d9-79d7c529bc87.c000.snappy.parquet
    {
      "type" : "record",
      "name" : "spark_schema",
      "fields" : [ {
        "name" : "number",
        "type" : "int"
      } ]
    }

    PyIceberg will project this column when the table is being read
    """
    # Create new tables to avoid complex cleanup; the timestamp suffix keeps
    # repeated runs from colliding on the same table name.
    src_table_identifier = f"spark_catalog.default.test_migrate_table_hive_{int(time.time())}"
    dst_table_identifier = f"default.test_migrate_table_{int(time.time())}"

    # Create a plain (non-Iceberg) Hive table, partitioned on `dt`, so the
    # partition value lives in the directory path rather than in the files.
    spark.sql(f"""
        CREATE TABLE {src_table_identifier} (
            number INTEGER
        )
        PARTITIONED BY (dt date)
        STORED AS parquet
    """)

    spark.sql(f"""
        INSERT OVERWRITE TABLE {src_table_identifier}
        PARTITION (dt='2022-01-01')
        VALUES (1), (2), (3)
    """)

    spark.sql(f"""
        INSERT OVERWRITE TABLE {src_table_identifier}
        PARTITION (dt='2023-01-01')
        VALUES (4), (5), (6)
    """)

    # Snapshot the Hive table into an Iceberg table via the Iceberg Spark
    # `snapshot` procedure; the source data files are imported as-is.
    spark.sql(f"""
        CALL hive.system.snapshot('{src_table_identifier}', 'hive.{dst_table_identifier}')
    """)

    tbl = session_catalog_hive.load_table(dst_table_identifier)
    # The imported schema must include the partition column even though it is
    # absent from the Parquet files themselves.
    assert tbl.schema().column_names == ["number", "dt"]

    # TODO: Returns the primitive type (int), rather than the logical type
    # assert set(tbl.scan().to_arrow().column(1).combine_chunks().tolist()) == {'2022-01-01', '2023-01-01'}

    # Filtering on a column that is physically present in the files works.
    assert tbl.scan(row_filter="number > 3").to_arrow().column(0).combine_chunks().tolist() == [4, 5, 6]
    # NOTE(review): this documents current (buggy) behavior — filtering on the
    # projected partition column returns no rows; see the TODO below.
    assert tbl.scan(row_filter="dt == '2023-01-01'").to_arrow().column(0).combine_chunks().tolist() == []

    # TODO: Issue with filtering the projected column
    # assert tbl.scan(row_filter="dt == '2022-01-01'").to_arrow().column(0).combine_chunks().tolist() == [1, 2, 3]

0 commit comments

Comments
 (0)