Skip to content

Commit ee5931e

Browse files
committed
Add tests, lint
1 parent d7c02c2 commit ee5931e

File tree

4 files changed

+428
-6
lines changed

4 files changed

+428
-6
lines changed

pyiceberg/schema.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@
7979
INITIAL_SCHEMA_ID = 0
8080

8181
FIELD_ID_PROP = "field-id"
82-
ICEBERG_FIELD_NAME_PROP = "iceberg-field-name"
82+
ICEBERG_FIELD_NAME_PROP = "iceberg-field-name"
83+
8384

8485
class Schema(IcebergBaseModel):
8586
"""A table Schema.
@@ -1359,17 +1360,17 @@ def primitive(self, primitive: PrimitiveType) -> PrimitiveType:
13591360
# Implementation copied from Apache Iceberg repo.
13601361
def make_compatible_name(name: str) -> str:
13611362
"""Make a field name compatible with Avro specification.
1362-
1363+
13631364
This function sanitizes field names to comply with Avro naming rules:
13641365
- Names must start with [A-Za-z_]
13651366
- Subsequent characters must be [A-Za-z0-9_]
1366-
1367+
13671368
Invalid characters are replaced with _xHHHH where HHHH is the hex code.
13681369
Names starting with digits get a leading underscore.
1369-
1370+
13701371
Args:
13711372
name: The original field name
1372-
1373+
13731374
Returns:
13741375
A sanitized name that complies with Avro specification
13751376
"""

pyiceberg/utils/schema_conversion.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,15 @@
2626
Union,
2727
)
2828

29-
from pyiceberg.schema import Schema, SchemaVisitorPerPrimitiveType, visit, ICEBERG_FIELD_NAME_PROP, FIELD_ID_PROP, make_compatible_name, _valid_avro_name
29+
from pyiceberg.schema import (
30+
FIELD_ID_PROP,
31+
ICEBERG_FIELD_NAME_PROP,
32+
Schema,
33+
SchemaVisitorPerPrimitiveType,
34+
_valid_avro_name,
35+
make_compatible_name,
36+
visit,
37+
)
3038
from pyiceberg.types import (
3139
BinaryType,
3240
BooleanType,
Lines changed: 335 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,335 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
import tempfile
19+
20+
import pytest
21+
from fastavro import reader
22+
23+
import pyiceberg.avro.file as avro
24+
from pyiceberg.io.pyarrow import PyArrowFileIO
25+
from pyiceberg.schema import Schema
26+
from pyiceberg.typedef import Record
27+
from pyiceberg.types import IntegerType, NestedField, StringType
28+
from pyiceberg.utils.schema_conversion import AvroSchemaConversion
29+
30+
31+
class AvroTestRecord(Record):
    """Record subclass used as the row type in the Avro sanitization tests.

    Each property is a positional accessor over ``self._data``. The return
    annotations mirror the five-column layout written by
    ``test_avro_compatibility`` (str, int, str, int, str); other tests in this
    module build records with fewer values or different types, so treat the
    annotations as descriptive of that layout only.
    """

    @property
    def valid_field(self) -> str:
        """Value stored at position 0."""
        return self._data[0]

    @property
    def invalid_field(self) -> int:
        """Value stored at position 1."""
        return self._data[1]

    @property
    def field_with_dot(self) -> str:
        """Value stored at position 2."""
        return self._data[2]

    @property
    def field_with_hash(self) -> int:
        """Value stored at position 3."""
        return self._data[3]

    @property
    def field_starting_with_digit(self) -> str:
        """Value stored at position 4."""
        return self._data[4]
53+
54+
55+
@pytest.mark.integration
def test_avro_compatibility() -> None:
    """Test that Avro files with sanitized names can be read by other tools.

    The file is written with PyIceberg's ``AvroOutputFile`` and read back with
    fastavro. The test checks the sanitized field names, the
    ``iceberg-field-name`` property carrying the original name, the record
    values and the custom file metadata.
    """
    import os

    schema = Schema(
        NestedField(field_id=1, name="valid_field", field_type=StringType(), required=True),
        NestedField(field_id=2, name="invalid.field", field_type=IntegerType(), required=True),
        # The next two names are already valid Avro names and must pass through unchanged.
        NestedField(field_id=3, name="field_with_dot", field_type=StringType(), required=True),
        NestedField(field_id=4, name="field_with_hash", field_type=IntegerType(), required=True),
        NestedField(field_id=5, name="9x", field_type=StringType(), required=True),
    )

    test_records = [
        AvroTestRecord("hello", 42, "world", 123, "test"),
        AvroTestRecord("goodbye", 99, "universe", 456, "example"),
    ]

    # Expected sanitized names (matching Java implementation)
    expected_field_names = [
        "valid_field",
        "invalid_x2Efield",
        "field_with_dot",
        "field_with_hash",
        "_9x",
    ]

    # Sanitized Avro name -> original Iceberg name, only for fields that get renamed.
    renamed_fields = {
        "invalid_x2Efield": "invalid.field",
        "_9x": "9x",
    }

    expected_records = [
        {
            "valid_field": "hello",
            "invalid_x2Efield": 42,
            "field_with_dot": "world",
            "field_with_hash": 123,
            "_9x": "test",
        },
        {
            "valid_field": "goodbye",
            "invalid_x2Efield": 99,
            "field_with_dot": "universe",
            "field_with_hash": 456,
            "_9x": "example",
        },
    ]

    # The temporary directory (and the file inside it) is removed on exit,
    # even when an assertion fails.
    with tempfile.TemporaryDirectory() as tmp_dir:
        tmp_avro_file = os.path.join(tmp_dir, "test.avro")

        with avro.AvroOutputFile[AvroTestRecord](
            output_file=PyArrowFileIO().new_output(tmp_avro_file),
            file_schema=schema,
            schema_name="test_schema",
            metadata={"test": "metadata"},
        ) as output_file:
            output_file.write_block(test_records)

        with open(tmp_avro_file, "rb") as fo:
            avro_reader = reader(fo)

            avro_schema = avro_reader.writer_schema
            field_names = [field["name"] for field in avro_schema["fields"]]
            assert field_names == expected_field_names

            for field in avro_schema["fields"]:
                original_name = renamed_fields.get(field["name"])
                if original_name is None:
                    assert "iceberg-field-name" not in field
                else:
                    assert "iceberg-field-name" in field
                    assert field["iceberg-field-name"] == original_name

            records = list(avro_reader)
            assert len(records) == 2
            assert records == expected_records

            assert avro_reader.metadata.get("test") == "metadata"
136+
137+
138+
@pytest.mark.integration
def test_avro_schema_conversion_sanitization() -> None:
    """Check the Avro field names produced by ``iceberg_to_avro`` for invalid Iceberg names."""
    # One valid name followed by four names that need sanitizing.
    schema = Schema(
        NestedField(field_id=1, name="valid_name", field_type=StringType(), required=True),
        NestedField(field_id=2, name="invalid.name", field_type=IntegerType(), required=True),
        NestedField(field_id=3, name="name#with#hash", field_type=StringType(), required=True),
        NestedField(field_id=4, name="☃", field_type=IntegerType(), required=True),  # snowman, U+2603
        NestedField(field_id=5, name="123number", field_type=StringType(), required=True),
    )

    converted = AvroSchemaConversion().iceberg_to_avro(schema, schema_name="test_schema")
    avro_fields = converted["fields"]

    assert [avro_field["name"] for avro_field in avro_fields] == [
        "valid_name",  # already valid: left untouched
        "invalid_x2Ename",  # "." -> _x2E
        "name_x23with_x23hash",  # "#" -> _x23
        "_x2603",  # "☃" -> _x2603
        "_123number",  # leading digit -> underscore prefix
    ]

    # Sanitized name -> original name that must be recorded on the Avro field.
    original_by_sanitized = {
        "invalid_x2Ename": "invalid.name",
        "name_x23with_x23hash": "name#with#hash",
        "_x2603": "☃",
        "_123number": "123number",
    }

    for avro_field in avro_fields:
        original_name = original_by_sanitized.get(avro_field["name"])
        if original_name is not None:
            assert avro_field["iceberg-field-name"] == original_name
        else:
            # Names that were not rewritten must not carry the property.
            assert "iceberg-field-name" not in avro_field
175+
176+
177+
@pytest.mark.integration
def test_avro_file_structure_verification() -> None:
    """Test that the Avro file structure is correct and can be parsed.

    Checks the raw header (magic bytes and a non-empty metadata map) and then
    reads the file back with fastavro to verify the sanitized field name, the
    ``iceberg-field-name`` property and the single record.
    """
    import os

    schema = Schema(
        NestedField(field_id=1, name="test.field", field_type=StringType(), required=True),
    )

    test_records = [AvroTestRecord("hello")]

    # The temporary directory (and the file inside it) is removed on exit,
    # even when an assertion fails.
    with tempfile.TemporaryDirectory() as tmp_dir:
        tmp_avro_file = os.path.join(tmp_dir, "simple_test.avro")

        with avro.AvroOutputFile[AvroTestRecord](
            output_file=PyArrowFileIO().new_output(tmp_avro_file),
            file_schema=schema,
            schema_name="simple_test",
        ) as output_file:
            output_file.write_block(test_records)

        with open(tmp_avro_file, "rb") as fo:
            # An Avro object container file starts with the 4 magic bytes.
            magic = fo.read(4)
            assert magic == b"Obj\x01"  # Avro magic bytes

            # The header metadata follows as an Avro map, whose first value is a
            # zigzag-varint block count (not a fixed-width integer). A single
            # 0x00 byte here would mean an empty map; the header must at least
            # carry the schema, so the first byte has to be non-zero.
            first_block_count_byte = fo.read(1)
            assert first_block_count_byte not in (b"", b"\x00")

            fo.seek(0)
            avro_reader = reader(fo)

            avro_schema = avro_reader.writer_schema

            assert len(avro_schema["fields"]) == 1
            field = avro_schema["fields"][0]
            assert field["name"] == "test_x2Efield"
            assert field["iceberg-field-name"] == "test.field"

            records = list(avro_reader)
            assert len(records) == 1
            assert records[0]["test_x2Efield"] == "hello"
229+
230+
231+
@pytest.mark.integration
def test_edge_cases_sanitization() -> None:
    """Test edge cases for field name sanitization.

    Each case is an ``(original_name, expected_sanitized)`` pair converted
    through a one-field schema. Assertion messages name the failing case,
    because all cases share a single test function.
    """
    test_cases = [
        ("123", "_123"),  # All digits
        ("_", "_"),  # Just underscore
        ("a", "a"),  # Single letter
        ("a1", "a1"),  # Letter followed by digit
        ("1a", "_1a"),  # Digit followed by letter
        ("a.b", "a_x2Eb"),  # Letter, dot, letter
        ("a#b", "a_x23b"),  # Letter, hash, letter
        ("☃", "_x2603"),  # Unicode character
        ("a☃b", "a_x2603b"),  # Letter, unicode, letter
    ]

    for original_name, expected_sanitized in test_cases:
        schema = Schema(
            NestedField(field_id=1, name=original_name, field_type=StringType(), required=True),
        )

        avro_schema = AvroSchemaConversion().iceberg_to_avro(schema, schema_name="edge_test")

        field = avro_schema["fields"][0]
        assert field["name"] == expected_sanitized, (
            f"{original_name!r} was sanitized to {field['name']!r}, expected {expected_sanitized!r}"
        )

        if original_name != expected_sanitized:
            # A rewritten name must keep the original in the field properties.
            assert field.get("iceberg-field-name") == original_name, (
                f"{original_name!r}: missing or wrong 'iceberg-field-name' property"
            )
        else:
            assert "iceberg-field-name" not in field, (
                f"{original_name!r}: unexpected 'iceberg-field-name' property on an unchanged name"
            )
261+
262+
263+
@pytest.mark.integration
def test_emoji_field_name_sanitization() -> None:
    """Test that emoji field names are properly sanitized according to Java implementation.

    The same expectations are checked twice: on the schema returned by
    ``iceberg_to_avro`` and on the writer schema read back with fastavro from
    a file written by ``AvroOutputFile``.
    """
    import os

    schema = Schema(
        NestedField(field_id=1, name="😎", field_type=IntegerType(), required=True),
        NestedField(field_id=2, name="valid_field", field_type=StringType(), required=True),
        NestedField(field_id=3, name="😎_with_text", field_type=StringType(), required=True),
    )

    expected_field_names = [
        "_x1F60E",  # 😎 becomes _x1F60E (Unicode 0x1F60E)
        "valid_field",
        "_x1F60E_with_text",
    ]

    # Sanitized Avro name -> original Iceberg name, only for fields that get renamed.
    renamed_fields = {
        "_x1F60E": "😎",
        "_x1F60E_with_text": "😎_with_text",
    }

    def assert_sanitized(avro_schema) -> None:  # type: ignore[no-untyped-def]
        """Check field names and ``iceberg-field-name`` properties of one Avro schema."""
        field_names = [field["name"] for field in avro_schema["fields"]]
        assert field_names == expected_field_names

        for field in avro_schema["fields"]:
            original_name = renamed_fields.get(field["name"])
            if original_name is None:
                assert "iceberg-field-name" not in field
            else:
                assert field["iceberg-field-name"] == original_name

    # First check the in-memory conversion.
    assert_sanitized(AvroSchemaConversion().iceberg_to_avro(schema, schema_name="emoji_test"))

    test_records = [
        AvroTestRecord(42, "hello", "world"),
    ]

    # Then check the round trip through a real file. The temporary directory
    # (and the file inside it) is removed on exit, even when an assertion fails.
    with tempfile.TemporaryDirectory() as tmp_dir:
        tmp_avro_file = os.path.join(tmp_dir, "emoji_test.avro")

        with avro.AvroOutputFile[AvroTestRecord](
            output_file=PyArrowFileIO().new_output(tmp_avro_file),
            file_schema=schema,
            schema_name="emoji_test",
        ) as output_file:
            output_file.write_block(test_records)

        with open(tmp_avro_file, "rb") as fo:
            avro_reader = reader(fo)

            assert_sanitized(avro_reader.writer_schema)

            records = list(avro_reader)
            assert len(records) == 1

            first_record = records[0]
            assert first_record["_x1F60E"] == 42
            assert first_record["valid_field"] == "hello"
            assert first_record["_x1F60E_with_text"] == "world"

0 commit comments

Comments
 (0)