
Scan with filtering on projected field returns empty table #2028

@Erigara

Description


Apache Iceberg version

0.9.1 (latest release)

Please describe the bug 🐞

I've noticed that when a scan filters on projected fields (fields not stored in the Parquet data files), it returns an empty result.

The Spark implementation produces the correct result here.

I did a bit of digging, and it looks like the problem is somewhere in the ds.Scanner.from_fragment call here, because it produces an empty batch iterator.
I've noticed that translated_row_filter and bound_file_filter are both AlwaysFalse (because VendorID is missing from file_schema).
Maybe predicates on projected columns could somehow be excluded from the file-level filter.
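A minimal, library-independent sketch of that idea, assuming a toy predicate type (the `EqualTo` dataclass and the `bind_naively` / `bind_with_partition` helpers below are hypothetical, not PyIceberg's expression classes or binding functions). Binding a predicate on a column absent from the file collapses it to AlwaysFalse; resolving it first against the file's identity-partition value lets it become AlwaysTrue or AlwaysFalse per file before any rows are read.

```python
from dataclasses import dataclass
from typing import Any, Mapping, Set, Union

# Hypothetical string stand-ins for AlwaysTrue/AlwaysFalse (not PyIceberg's classes).
ALWAYS_TRUE = "AlwaysTrue"
ALWAYS_FALSE = "AlwaysFalse"


@dataclass(frozen=True)
class EqualTo:
    """Toy equality predicate: column == value."""
    column: str
    value: Any


Residual = Union[EqualTo, str]


def bind_naively(pred: EqualTo, file_columns: Set[str]) -> Residual:
    """Mimics the reported behaviour: a column missing from the file
    schema makes the predicate unsatisfiable for that file."""
    if pred.column not in file_columns:
        return ALWAYS_FALSE
    return pred


def bind_with_partition(
    pred: EqualTo,
    file_columns: Set[str],
    partition_values: Mapping[str, Any],
) -> Residual:
    """Resolve predicates on identity-partition columns that are missing
    from the file against the partition value; otherwise bind as usual."""
    if pred.column not in file_columns and pred.column in partition_values:
        matches = partition_values[pred.column] == pred.value
        return ALWAYS_TRUE if matches else ALWAYS_FALSE
    return bind_naively(pred, file_columns)


# VendorID was dropped from the data file; its value lives only in the partition tuple.
file_columns = {"tpep_pickup_datetime", "fare_amount"}
partition = {"VendorID": 1}
pred = EqualTo("VendorID", 1)

print(bind_naively(pred, file_columns))                    # AlwaysFalse
print(bind_with_partition(pred, file_columns, partition))  # AlwaysTrue
```

A file whose partition value does not match (for example `{"VendorID": 2}`) would resolve to AlwaysFalse and could be skipped entirely, while predicates on columns that do exist in the file are left for row-level filtering.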

A second issue might be that each batch is filtered first, and only then is the projected column added.
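A toy model of why that ordering matters, assuming rows are plain dicts (the helper names are hypothetical and this is not PyIceberg's actual read path): filtering on VendorID before the column is filled in from the partition value drops every row, while projecting first and then filtering keeps them.

```python
from typing import Any, Callable, Dict, List, Mapping

Row = Dict[str, Any]


def add_partition_columns(rows: List[Row], partition_values: Mapping[str, Any]) -> List[Row]:
    """Fill identity-partition columns missing from the file rows.
    Values already present in a row take precedence over the partition value."""
    return [{**partition_values, **row} for row in rows]


def filter_rows(rows: List[Row], predicate: Callable[[Row], bool]) -> List[Row]:
    return [row for row in rows if predicate(row)]


def vendor_is_1(row: Row) -> bool:
    # A row without a VendorID key never matches, which is what happens
    # when the filter runs before the column has been projected.
    return row.get("VendorID") == 1


file_rows: List[Row] = [{"fare_amount": 10.0}, {"fare_amount": 7.5}]  # no VendorID in the file
partition = {"VendorID": 1}

filter_then_project = add_partition_columns(filter_rows(file_rows, vendor_is_1), partition)
project_then_filter = filter_rows(add_partition_columns(file_rows, partition), vendor_is_1)

print(len(filter_then_project))  # 0
print(len(project_then_filter))  # 2
```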

I would like to contribute, but some guidance would be appreciated.

Here is a self-contained script to reproduce the issue:

#!/usr/bin/env python
import polars as pl
from pyiceberg.catalog import load_catalog
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg import expressions as expr
from pyiceberg.transforms import IdentityTransform
from pyiceberg.table import _parquet_files_to_data_files
from pyiceberg.table.name_mapping import NameMapping, MappedField
from pyiceberg.io.pyarrow import pyarrow_to_schema

catalog = load_catalog(
    "default",
    **{
        "type": "in-memory",
        "warehouse": "warehouse/",
    }
)

catalog.create_namespace_if_not_exists("default")

# write only the VendorID == 1 rows, with the VendorID column dropped from the file
file = "warehouse/VendorID=1_yellow_tripdata_2025-01.parquet"
df = pl.read_parquet("https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet")
df.filter(pl.col("VendorID") == 1).drop("VendorID").write_parquet(file)

# create iceberg table
df = df.to_arrow()
schema = df.schema
mapping = NameMapping([MappedField(field_id=i,names=[name]) for i, name in enumerate(schema.names, 1)])
schema = pyarrow_to_schema(schema, mapping)
table = catalog.create_table_if_not_exists(
    "default.taxi",
    schema=schema,
    partition_spec=PartitionSpec(
        PartitionField(source_id=schema.find_field("VendorID").field_id, field_id=schema.find_field("VendorID").field_id, transform=IdentityTransform(), name="VendorID"),
    ),
    properties={"schema.name-mapping.default": mapping.model_dump_json()},
)

# add_files
files = [file]
with table.transaction() as tx:
    with tx.update_snapshot().fast_append() as fast_append:
        data_files = _parquet_files_to_data_files(
            table_metadata=tx.table_metadata, file_paths=files, io=tx._table.io
        )
        for data_file in data_files:
            # set partition for VendorID
            # current file has only one partition anyway
            # VendorID = 1
            data_file.partition[0] = 1
            fast_append.append_data_file(data_file)

# query with projected field in predicate
scan = table.scan(row_filter=expr.EqualTo("VendorID", 1))
print(f"WITH PROJECTED COLUMN LEN: {len(scan.to_arrow())}")

# query without projected field in predicate
scan = table.scan()
print(f"WITHOUT PROJECTED COLUMN LEN: {len(scan.to_arrow())}")

Willingness to contribute

  • I can contribute a fix for this bug independently
  • I would be willing to contribute a fix for this bug with guidance from the Iceberg community
  • I cannot contribute a fix for this bug at this time
