Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions dev/provision.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,3 +320,23 @@
spark.sql(f"ALTER TABLE {catalog_name}.default.test_table_add_column ADD COLUMN b string")

spark.sql(f"INSERT INTO {catalog_name}.default.test_table_add_column VALUES ('2', '2')")

spark.sql(
f"""
CREATE TABLE {catalog_name}.default.test_table_empty_list_and_map (
col_list array<int>,
col_map map<int, int>
)
USING iceberg
TBLPROPERTIES (
'format-version'='1'
);
"""
)

spark.sql(
f"""
INSERT INTO {catalog_name}.default.test_table_empty_list_and_map
VALUES (null, null)
"""
)
52 changes: 35 additions & 17 deletions pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@
LIST_ELEMENT_NAME = "element"
MAP_KEY_NAME = "key"
MAP_VALUE_NAME = "value"
DOC = "doc"

T = TypeVar("T")

Expand Down Expand Up @@ -1118,12 +1119,22 @@ class ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, Optional[pa.Arra
def __init__(self, file_schema: Schema):
    """Keep `file_schema` on the instance.

    Args:
        file_schema: Schema whose fields are later looked up by field id
            when deciding whether projected values need a cast.
    """
    self.file_schema = file_schema

def cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array:
def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array:
    """Cast `values` to the promoted type when the requested primitive type differs from the file's.

    Args:
        field: Requested field; its `field_id` is used to find the matching field in `self.file_schema`.
        values: Array holding the values for `field`.

    Returns:
        `values` unchanged when `field` is not primitive or its type equals the
        file field's type; otherwise `values` cast to the Arrow type of the
        promotion from the file type to the requested type.
    """
    source_field = self.file_schema.find_field(field.field_id)
    requested_type = field.field_type

    # Only primitives whose type changed relative to the file need work.
    if not requested_type.is_primitive or requested_type == source_field.field_type:
        return values

    target_type = schema_to_pyarrow(promote(source_field.field_type, requested_type))
    return values.cast(target_type)

def _construct_field(self, field: NestedField, arrow_type: pa.DataType) -> pa.Field:
    """Build the Arrow field for `field`, carrying its field id (and doc, when set) as metadata.

    Args:
        field: Source of the name, nullability (`optional`), field id and doc.
        arrow_type: Arrow data type to give the resulting field.

    Returns:
        A `pa.Field` whose metadata always holds the stringified field id and
        additionally holds the doc when `field.doc` is truthy.
    """
    field_metadata = {PYARROW_PARQUET_FIELD_ID_KEY: str(field.field_id)}
    if field.doc:
        # Keep the doc entry ahead of the field id, as in the two-key literal.
        field_metadata = {DOC: field.doc, **field_metadata}

    return pa.field(
        name=field.name,
        type=arrow_type,
        nullable=field.optional,
        metadata=field_metadata,
    )

def schema(self, schema: Schema, schema_partner: Optional[pa.Array], struct_result: Optional[pa.Array]) -> Optional[pa.Array]:
    """Return `struct_result` unchanged; `schema` and `schema_partner` are not used here."""
    return struct_result

Expand All @@ -1136,13 +1147,13 @@ def struct(
fields: List[pa.Field] = []
for field, field_array in zip(struct.fields, field_results):
if field_array is not None:
array = self.cast_if_needed(field, field_array)
array = self._cast_if_needed(field, field_array)
field_arrays.append(array)
fields.append(pa.field(field.name, array.type, field.optional))
fields.append(self._construct_field(field, array.type))
elif field.optional:
arrow_type = schema_to_pyarrow(field.field_type)
field_arrays.append(pa.nulls(len(struct_array), type=arrow_type))
fields.append(pa.field(field.name, arrow_type, field.optional))
fields.append(self._construct_field(field, arrow_type))
else:
raise ResolveError(f"Field is required, and could not be found in the file: {field}")

Expand All @@ -1152,24 +1163,31 @@ def field(self, field: NestedField, _: Optional[pa.Array], field_array: Optional
return field_array

def list(self, list_type: ListType, list_array: Optional[pa.Array], value_array: Optional[pa.Array]) -> Optional[pa.Array]:
return (
pa.ListArray.from_arrays(list_array.offsets, self.cast_if_needed(list_type.element_field, value_array))
if isinstance(list_array, pa.ListArray)
else None
)
if isinstance(list_array, pa.ListArray) and value_array is not None:
arrow_field = pa.list_(self._construct_field(list_type.element_field, value_array.type))
if isinstance(value_array, pa.StructArray):
# Arrow does not allow reordering of fields, therefore we have to copy the array :(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to confirm my understanding: another reason we have to copy the array is that Arrow also does not allow a field mismatch, which happens when the schema has an optional field and the file contains no values for that field.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And because of this limitation, the issue remains in this edge case (col_list array<struct<test:int>>):

spark.sql(
        f"""
    CREATE TABLE {catalog_name}.default.test_table_empty_list_and_map (
        col_list array<struct<test:int>>,
        col_map map<int, int>
    )
    USING iceberg
    TBLPROPERTIES (
        'format-version'='1'
    );
    """
    )

    spark.sql(
        f"""
    INSERT INTO {catalog_name}.default.test_table_empty_list_and_map
    VALUES (null, null)
    """
    )

 def test_null_list_and_map(catalog: Catalog) -> None:
        table_test_empty_list_and_map = catalog.load_table("default.test_table_empty_list_and_map")
        arrow_table = table_test_empty_list_and_map.scan().to_arrow()
>       assert arrow_table["col_list"].to_pylist() == [None]
E       assert [[]] == [None]

Copy link
Copy Markdown
Contributor

@HonahX HonahX Jan 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems adding the following before line 1167

            if list_array.is_null():
                return None

makes the above test pass.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the test case, let me add that one 👍 I don't think the last fix is going to work, since not all of the values are necessarily null 🤔

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've pushed a fix, but I don't think we can solve this yet without a fix in Arrow upstream 😢 apache/arrow#38809

return pa.ListArray.from_arrays(list_array.offsets, value_array, arrow_field)
else:
return list_array.cast(arrow_field)
else:
return None

def map(
self, map_type: MapType, map_array: Optional[pa.Array], key_result: Optional[pa.Array], value_result: Optional[pa.Array]
) -> Optional[pa.Array]:
return (
pa.MapArray.from_arrays(
map_array.offsets,
self.cast_if_needed(map_type.key_field, key_result),
self.cast_if_needed(map_type.value_field, value_result),
if isinstance(map_array, pa.MapArray) and key_result is not None and value_result is not None:
arrow_field = pa.map_(
self._construct_field(map_type.key_field, key_result.type),
self._construct_field(map_type.value_field, value_result.type),
)
if isinstance(map_array, pa.MapArray)
else None
)
if isinstance(value_result, pa.StructArray):
# Arrow does not allow reordering of fields, therefore we have to copy the array :(
return pa.MapArray.from_arrays(map_array.offsets, key_result, value_result, arrow_field)
else:
return map_array.cast(arrow_field)
else:
return None

def primitive(self, _: PrimitiveType, array: Optional[pa.Array]) -> Optional[pa.Array]:
    """Return the partner `array` as-is; the primitive type argument is ignored."""
    return array
Expand Down
9 changes: 9 additions & 0 deletions tests/integration/test_reads.py
Original file line number Diff line number Diff line change
Expand Up @@ -428,3 +428,12 @@ def test_sanitize_character(catalog: Catalog) -> None:
assert len(arrow_table.schema.names), 1
assert len(table_test_table_sanitized_character.schema().fields), 1
assert arrow_table.schema.names[0] == table_test_table_sanitized_character.schema().fields[0].name


@pytest.mark.integration
@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')])
def test_null_list_and_map(catalog: Catalog) -> None:
    """Both the list and the map column of the scanned table must come back as a single null."""
    scanned = catalog.load_table("default.test_table_empty_list_and_map").scan().to_arrow()
    for column_name in ("col_list", "col_map"):
        # A null must stay a null and not turn into an empty list/map.
        assert scanned[column_name].to_pylist() == [None]
154 changes: 145 additions & 9 deletions tests/io/test_pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,24 @@ def schema_list_of_structs() -> Schema:
)


@pytest.fixture
def schema_map_of_structs() -> Schema:
    """Schema with one optional `locations` column: a map from string to a `lat`/`long` struct."""
    coordinates = StructType(
        NestedField(511, "lat", DoubleType()),
        NestedField(512, "long", DoubleType()),
    )
    # NOTE(review): `element_required` looks like a ListType keyword; MapType
    # presumably expects `value_required` - confirm against pyiceberg.types.
    locations = MapType(
        key_id=51,
        value_id=52,
        key_type=StringType(),
        value_type=coordinates,
        element_required=False,
    )
    return Schema(NestedField(5, "locations", locations, required=False))


@pytest.fixture
def schema_map() -> Schema:
return Schema(
Expand Down Expand Up @@ -793,6 +811,25 @@ def file_list_of_structs(schema_list_of_structs: Schema, tmpdir: str) -> str:
)


@pytest.fixture
def file_map_of_structs(schema_map_of_structs: Schema, tmpdir: str) -> str:
    """Write three `locations` rows (the middle one an empty map) and return the result of `_write_table_to_file`."""
    iceberg_schema_json = bytes(schema_map_of_structs.model_dump_json(), UTF8)
    pyarrow_schema = schema_to_pyarrow(schema_map_of_structs, metadata={ICEBERG_SCHEMA: iceberg_schema_json})

    rows = [
        {"locations": {"1": {"lat": 52.371807, "long": 4.896029}, "2": {"lat": 52.387386, "long": 4.646219}}},
        {"locations": {}},
        {"locations": {"3": {"lat": 52.078663, "long": 4.288788}, "4": {"lat": 52.387386, "long": 4.646219}}},
    ]
    table = pa.Table.from_pylist(rows, schema=pyarrow_schema)

    return _write_table_to_file(f"file:{tmpdir}/e.parquet", pyarrow_schema, table)


@pytest.fixture
def file_map(schema_map: Schema, tmpdir: str) -> str:
pyarrow_schema = schema_to_pyarrow(schema_map, metadata={ICEBERG_SCHEMA: bytes(schema_map.model_dump_json(), UTF8)})
Expand Down Expand Up @@ -914,7 +951,13 @@ def test_read_list(schema_list: Schema, file_list: str) -> None:
for actual, expected in zip(result_table.columns[0], [list(range(1, 10)), list(range(2, 20)), list(range(3, 30))]):
assert actual.as_py() == expected

assert repr(result_table.schema) == "ids: list<item: int32>\n child 0, item: int32"
assert (
repr(result_table.schema)
== """ids: list<element: int32>
child 0, element: int32
-- field metadata --
PARQUET:field_id: '51'"""
)


def test_read_map(schema_map: Schema, file_map: str) -> None:
Expand All @@ -927,9 +970,13 @@ def test_read_map(schema_map: Schema, file_map: str) -> None:
assert (
repr(result_table.schema)
== """properties: map<string, string>
child 0, entries: struct<key: string not null, value: string> not null
child 0, entries: struct<key: string not null, value: string not null> not null
child 0, key: string not null
child 1, value: string"""
-- field metadata --
PARQUET:field_id: '51'
child 1, value: string not null
-- field metadata --
PARQUET:field_id: '52'"""
)


Expand Down Expand Up @@ -1063,7 +1110,13 @@ def test_projection_nested_struct_subset(file_struct: str) -> None:
assert actual.as_py() == {"lat": expected}

assert len(result_table.columns[0]) == 3
assert repr(result_table.schema) == "location: struct<lat: double not null> not null\n child 0, lat: double not null"
assert (
repr(result_table.schema)
== """location: struct<lat: double not null> not null
child 0, lat: double not null
-- field metadata --
PARQUET:field_id: '41'"""
)


def test_projection_nested_new_field(file_struct: str) -> None:
Expand All @@ -1082,7 +1135,13 @@ def test_projection_nested_new_field(file_struct: str) -> None:
for actual, expected in zip(result_table.columns[0], [None, None, None]):
assert actual.as_py() == {"null": expected}
assert len(result_table.columns[0]) == 3
assert repr(result_table.schema) == "location: struct<null: double> not null\n child 0, null: double"
assert (
repr(result_table.schema)
== """location: struct<null: double> not null
child 0, null: double
-- field metadata --
PARQUET:field_id: '43'"""
)


def test_projection_nested_struct(schema_struct: Schema, file_struct: str) -> None:
Expand Down Expand Up @@ -1111,7 +1170,16 @@ def test_projection_nested_struct(schema_struct: Schema, file_struct: str) -> No
assert len(result_table.columns[0]) == 3
assert (
repr(result_table.schema)
== "location: struct<lat: double, null: double, long: double> not null\n child 0, lat: double\n child 1, null: double\n child 2, long: double"
== """location: struct<lat: double, null: double, long: double> not null
child 0, lat: double
-- field metadata --
PARQUET:field_id: '41'
child 1, null: double
-- field metadata --
PARQUET:field_id: '43'
child 2, long: double
-- field metadata --
PARQUET:field_id: '42'"""
)


Expand Down Expand Up @@ -1153,11 +1221,79 @@ def test_projection_list_of_structs(schema_list_of_structs: Schema, file_list_of
assert actual.as_py() == expected
assert (
repr(result_table.schema)
== """locations: list<item: struct<latitude: double not null, longitude: double not null, altitude: double>>
child 0, item: struct<latitude: double not null, longitude: double not null, altitude: double>
== """locations: list<element: struct<latitude: double not null, longitude: double not null, altitude: double>>
child 0, element: struct<latitude: double not null, longitude: double not null, altitude: double>
child 0, latitude: double not null
-- field metadata --
PARQUET:field_id: '511'
child 1, longitude: double not null
child 2, altitude: double"""
-- field metadata --
PARQUET:field_id: '512'
child 2, altitude: double
-- field metadata --
PARQUET:field_id: '513'
-- field metadata --
PARQUET:field_id: '51'"""
)


def test_projection_maps_of_structs(schema_map_of_structs: Schema, file_map_of_structs: str) -> None:
    """Project a map-of-structs file onto a schema that renames and extends the value struct.

    The requested schema reuses field ids 511 and 512 under the names
    `latitude`/`longitude` and adds an optional field 513 (`altitude`), so the
    expected rows carry the renamed keys and `altitude` as None. The resulting
    Arrow schema `repr` is compared verbatim, including the field-id metadata.
    """
    # Requested (projection) schema: same ids as the file, new names, one extra optional field.
    schema = Schema(
        NestedField(
            5,
            "locations",
            MapType(
                key_id=51,
                value_id=52,
                key_type=StringType(),
                value_type=StructType(
                    NestedField(511, "latitude", DoubleType()),
                    NestedField(512, "longitude", DoubleType()),
                    NestedField(513, "altitude", DoubleType(), required=False),
                ),
                element_required=False,
            ),
            required=False,
        ),
    )

    result_table = project(schema, [file_map_of_structs])
    # One column, three rows.
    assert len(result_table.columns) == 1
    assert len(result_table.columns[0]) == 3
    # Each map row converts to a list of (key, value) tuples; the second row is an empty map.
    for actual, expected in zip(
        result_table.columns[0],
        [
            [
                ("1", {"latitude": 52.371807, "longitude": 4.896029, "altitude": None}),
                ("2", {"latitude": 52.387386, "longitude": 4.646219, "altitude": None}),
            ],
            [],
            [
                ("3", {"latitude": 52.078663, "longitude": 4.288788, "altitude": None}),
                ("4", {"latitude": 52.387386, "longitude": 4.646219, "altitude": None}),
            ],
        ],
    ):
        assert actual.as_py() == expected
    # The schema repr must list every nested field together with its PARQUET:field_id metadata.
    assert (
        repr(result_table.schema)
        == """locations: map<string, struct<latitude: double not null, longitude: double not null, altitude: double>>
child 0, entries: struct<key: string not null, value: struct<latitude: double not null, longitude: double not null, altitude: double> not null> not null
child 0, key: string not null
-- field metadata --
PARQUET:field_id: '51'
child 1, value: struct<latitude: double not null, longitude: double not null, altitude: double> not null
child 0, latitude: double not null
-- field metadata --
PARQUET:field_id: '511'
child 1, longitude: double not null
-- field metadata --
PARQUET:field_id: '512'
child 2, altitude: double
-- field metadata --
PARQUET:field_id: '513'
-- field metadata --
PARQUET:field_id: '52'"""
    )


Expand Down