Skip to content

Commit 4f5190b

Browse files
committed
Add spark integration tests, fix duckdb integration test
1 parent ed75062 commit 4f5190b

1 file changed

Lines changed: 91 additions & 3 deletions

File tree

tests/integration/test_writes/test_writes.py

Lines changed: 91 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1237,11 +1237,99 @@ def test_sanitize_character_partitioned_avro_bug(catalog: Catalog) -> None:
12371237

12381238
assert len(tbl.scan().to_arrow()) == 22
12391239

1240-
con = tbl.scan().to_duckdb("table_test_debug")
1241-
result = con.query("SELECT * FROM table_test_debug").fetchall()
1240+
# verify that we can read the table with DuckDB
1241+
import duckdb
1242+
1243+
location = tbl.metadata_location
1244+
duckdb.sql("INSTALL iceberg; LOAD iceberg;")
1245+
# Configure S3 settings for DuckDB to match the catalog configuration
1246+
duckdb.sql("SET s3_endpoint='localhost:9000';")
1247+
duckdb.sql("SET s3_access_key_id='admin';")
1248+
duckdb.sql("SET s3_secret_access_key='password';")
1249+
duckdb.sql("SET s3_use_ssl=false;")
1250+
duckdb.sql("SET s3_url_style='path';")
1251+
result = duckdb.sql(f"SELECT * FROM iceberg_scan('{location}')").fetchall()
12421252
assert len(result) == 22
12431253

1244-
assert con.query("SHOW table_test_debug").fetchone() == ("😎", "VARCHAR", "YES", None, None, None)
1254+
1255+
@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_cross_platform_special_character_compatibility(
    spark: SparkSession, session_catalog: Catalog, format_version: int
) -> None:
    """Test cross-platform compatibility with special characters in column names.

    For each special-character column name, round-trips data in both
    directions — Spark writes / PyIceberg reads, then PyIceberg writes /
    Spark reads — asserting the column name and cell value survive intact,
    i.e. that the two implementations' name sanitization stays compatible.
    Tables are dropped (best-effort) before and after each iteration.
    """
    # Hoisted out of the per-character loop: the original re-executed these
    # imports on every iteration.
    import pyarrow as pa

    from pyiceberg.schema import Schema
    from pyiceberg.types import NestedField, StringType

    identifier = "default.test_cross_platform_special_characters"

    # Test various special characters that need sanitization.
    special_characters = [
        "😎",  # emoji - Java produces _xD83D_xDE0E, Python produces _x1F60E
        "a.b",  # dot - both should produce a_x2Eb
        "a#b",  # hash - both should produce a_x23b
        "9x",  # starts with digit - both should produce _9x
        "x_",  # valid - should remain unchanged
        "letter/abc",  # slash - both should produce letter_x2Fabc
    ]

    def _drop_quietly(table_identifier: str) -> None:
        # Best-effort cleanup: the table may not exist yet, or may already
        # be gone — either way the test should proceed.
        try:
            session_catalog.drop_table(table_identifier)
        except Exception:
            pass

    for i, special_char in enumerate(special_characters):
        table_name = f"{identifier}_{format_version}_{i}"
        pyiceberg_table_name = f"{identifier}_pyiceberg_{format_version}_{i}"

        _drop_quietly(table_name)
        _drop_quietly(pyiceberg_table_name)

        try:
            # Test 1: Spark writes, PyIceberg reads
            spark_df = spark.createDataFrame([("test_value",)], [special_char])
            spark_df.writeTo(table_name).using("iceberg").createOrReplace()

            # Read with PyIceberg table scan
            tbl = session_catalog.load_table(table_name)
            pyiceberg_df = tbl.scan().to_pandas()
            assert len(pyiceberg_df) == 1
            assert special_char in pyiceberg_df.columns
            assert pyiceberg_df.iloc[0][special_char] == "test_value"

            # Test 2: PyIceberg writes, Spark reads
            schema = Schema(NestedField(field_id=1, name=special_char, field_type=StringType(), required=True))

            tbl_pyiceberg = session_catalog.create_table(
                identifier=pyiceberg_table_name, schema=schema, properties={"format-version": str(format_version)}
            )

            # Create PyArrow schema with required (non-nullable) field to
            # match the required=True Iceberg field above.
            pa_schema = pa.schema([pa.field(special_char, pa.string(), nullable=False)])
            data = pa.Table.from_pydict({special_char: ["pyiceberg_value"]}, schema=pa_schema)
            tbl_pyiceberg.append(data)

            # Read with Spark
            spark_df_read = spark.table(pyiceberg_table_name)
            spark_result = spark_df_read.collect()

            # Verify data integrity
            assert len(spark_result) == 1
            assert special_char in spark_df_read.columns
            assert spark_result[0][special_char] == "pyiceberg_value"

        finally:
            _drop_quietly(table_name)
            _drop_quietly(pyiceberg_table_name)
12451333

12461334

12471335
@pytest.mark.integration

0 commit comments

Comments
 (0)