Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions dev/provision.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,3 +409,58 @@
)
spark.sql(f"ALTER TABLE {catalog_name}.default.test_empty_scan_ordered_str WRITE ORDERED BY id")
spark.sql(f"INSERT INTO {catalog_name}.default.test_empty_scan_ordered_str VALUES 'a', 'c'")

spark.sql(
f"""
CREATE OR REPLACE TABLE {catalog_name}.default.test_incremental_read (
dt date,
number integer,
letter string
)
USING iceberg
TBLPROPERTIES (
'format-version'='2'
);
"""
)

spark.sql(
f"""
INSERT INTO {catalog_name}.default.test_incremental_read
VALUES (CAST('2022-03-01' AS date), 1, 'a')
"""
)

spark.sql(
f"""
INSERT INTO {catalog_name}.default.test_incremental_read
VALUES (CAST('2022-03-01' AS date), 2, 'b')
"""
)

spark.sql(
f"""
INSERT INTO {catalog_name}.default.test_incremental_read
VALUES (CAST('2022-03-02' AS date), 3, 'c'), (CAST('2022-03-02' AS date), 4, 'b')
"""
)

spark.sql(
f"""
DELETE FROM {catalog_name}.default.test_incremental_read
WHERE number = 2
"""
)

# https://github.com/apache/iceberg/issues/1092#issuecomment-638432848 / https://github.com/apache/iceberg/issues/3747#issuecomment-1145419407
# Don't do replace for Hive catalog as REPLACE TABLE requires certain Hive server configuration
if catalog_name != "hive":
# Replace to break snapshot lineage:
spark.sql(
f"""
REPLACE TABLE {catalog_name}.default.test_incremental_read
USING iceberg
TBLPROPERTIES ('format-version'='2')
AS SELECT number, letter FROM {catalog_name}.default.test_incremental_read
"""
)
188 changes: 188 additions & 0 deletions tests/integration/test_reads.py
Original file line number Diff line number Diff line change
Expand Up @@ -1070,3 +1070,191 @@ def test_filter_after_arrow_scan(catalog: Catalog) -> None:

scan = scan.filter("ts >= '2023-03-05T00:00:00+00:00'")
assert len(scan.to_arrow()) > 0


@pytest.mark.integration
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
def test_incremental_append_scan_append_only(catalog: Catalog) -> None:
test_table = catalog.load_table("default.test_incremental_read")

scan = (
test_table.incremental_append_scan()
# Only "append"-operation snapshots occurred in this range
.from_snapshot_exclusive(test_table.snapshots()[0].snapshot_id)
.to_snapshot_inclusive(test_table.snapshots()[2].snapshot_id)
)

assert len(list(scan.plan_files())) == 2

# Check various read methods
assert len(scan.to_arrow()) == 3
assert len(scan.to_arrow_batch_reader().read_all()) == 3
assert len(scan.to_pandas()) == 3
assert len(scan.to_polars()) == 3


@pytest.mark.integration
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
def test_incremental_append_scan_ignores_non_append_snapshots(catalog: Catalog) -> None:
test_table = catalog.load_table("default.test_incremental_read")

scan = test_table.incremental_append_scan(
from_snapshot_id_exclusive=test_table.snapshots()[0].snapshot_id,
# This is a "delete"-operation snapshot, that should be ignored by the append scan
to_snapshot_id_inclusive=test_table.snapshots()[3].snapshot_id,
)

assert len(list(scan.plan_files())) == 2
assert len(scan.to_arrow()) == 3


@pytest.mark.integration
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog")])
def test_incremental_append_scan_uses_current_schema(catalog: Catalog) -> None:
test_table = catalog.load_table("default.test_incremental_read")

scan = (
test_table.incremental_append_scan()
.from_snapshot_exclusive(test_table.snapshots()[0].snapshot_id)
.to_snapshot_inclusive(test_table.snapshots()[2].snapshot_id)
)

# The schema within the snapshot range above included an extra date field, but the table was then replaced,
# removing it. An append scan always uses the current schema of the table.
expected_schema = pa.schema(
[
pa.field("number", pa.int32()),
pa.field("letter", pa.string()),
]
)

result_table = scan.to_arrow()
assert result_table.schema.equals(expected_schema)


@pytest.mark.integration
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
def test_incremental_append_scan_row_filter(catalog: Catalog) -> None:
test_table = catalog.load_table("default.test_incremental_read")

scan = (
test_table.incremental_append_scan(row_filter=EqualTo("letter", "b"))
.from_snapshot_exclusive(test_table.snapshots()[0].snapshot_id)
.to_snapshot_inclusive(test_table.snapshots()[2].snapshot_id)
)

# This filter should match against the only row added in snapshots[1] and one of the two rows added in snapshots[2]
assert len(list(scan.plan_files())) == 2
assert len(scan.to_arrow()) == 2


@pytest.mark.integration
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
def test_incremental_append_scan_selected_fields(catalog: Catalog) -> None:
test_table = catalog.load_table("default.test_incremental_read")

scan = (
test_table.incremental_append_scan(selected_fields=("number",))
.from_snapshot_exclusive(test_table.snapshots()[0].snapshot_id)
.to_snapshot_inclusive(test_table.snapshots()[2].snapshot_id)
)

expected_schema = pa.schema(
[
pa.field("number", pa.int32()),
]
)

result_table = scan.to_arrow()
assert result_table.schema.equals(expected_schema)
assert sorted(result_table["number"].to_pylist()) == [2, 3, 4]


@pytest.mark.integration
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
def test_incremental_append_scan_limit(catalog: Catalog) -> None:
test_table = catalog.load_table("default.test_incremental_read")

scan = (
test_table.incremental_append_scan(limit=2)
.from_snapshot_exclusive(test_table.snapshots()[0].snapshot_id)
.to_snapshot_inclusive(test_table.snapshots()[2].snapshot_id)
)

# Although three rows were added in the range, the limit of 2 should be applied
assert len(scan.to_arrow()) == 2


@pytest.mark.integration
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive")])
def test_incremental_append_scan_to_snapshot_defaults_to_current(catalog: Catalog) -> None:
test_table = catalog.load_table("default.test_incremental_read")

assert (
len(test_table.incremental_append_scan().from_snapshot_exclusive(test_table.snapshots()[0].snapshot_id).to_arrow()) == 3
)


@pytest.mark.integration
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
def test_incremental_append_scan_equal_from_and_to_snapshots(catalog: Catalog) -> None:
test_table = catalog.load_table("default.test_incremental_read")
snapshot_id = test_table.snapshots()[0].snapshot_id

# Exclusive-inclusive semantics mean an empty table should be returned if equal from and to snapshots are specified
assert (
len(
test_table.incremental_append_scan()
.from_snapshot_exclusive(snapshot_id)
.to_snapshot_inclusive(snapshot_id)
.to_arrow()
)
== 0
)


@pytest.mark.integration
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog")])
def test_incremental_append_scan_throws_on_disconnected_snapshots(catalog: Catalog) -> None:
test_table = catalog.load_table("default.test_incremental_read")

from_id = test_table.snapshots()[0].snapshot_id
to_id = test_table.snapshots()[4].snapshot_id

with pytest.raises(ValueError) as e:
test_table.incremental_append_scan(
from_snapshot_id_exclusive=from_id,
# A table replace occurred just before this snapshot, breaking snapshot lineage / incremental-ity
to_snapshot_id_inclusive=to_id,
).plan_files()

assert f"Append scan's start snapshot {from_id} is not an ancestor of end snapshot {to_id}" in str(e.value)


@pytest.mark.integration
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
def test_incremental_append_scan_throws_on_missing_snapshot_ids(catalog: Catalog) -> None:
test_table = catalog.load_table("default.test_incremental_read")

# from_snapshot_id not specified
with pytest.raises(ValueError) as e:
test_table.incremental_append_scan(
to_snapshot_id_inclusive=test_table.snapshots()[0].snapshot_id,
).plan_files()
assert "Start snapshot of append scan unspecified, please set from_snapshot_id_exclusive" in str(e.value)

# from_snapshot_id missing from metadata
with pytest.raises(ValueError) as e:
test_table.incremental_append_scan(
from_snapshot_id_exclusive=42,
to_snapshot_id_inclusive=test_table.snapshots()[0].snapshot_id,
).plan_files()
assert "Start snapshot of append scan not found on table metadata: 42" in str(e.value)

# to_snapshot_id missing from metadata
with pytest.raises(ValueError) as e:
test_table.incremental_append_scan(
from_snapshot_id_exclusive=test_table.snapshots()[0].snapshot_id,
to_snapshot_id_inclusive=42,
).plan_files()
assert "End snapshot of append scan not found on table metadata: 42" in str(e.value)