diff --git a/dev/provision.py b/dev/provision.py
index 231f5123ce..b5918d481f 100644
--- a/dev/provision.py
+++ b/dev/provision.py
@@ -409,3 +409,58 @@
     )
     spark.sql(f"ALTER TABLE {catalog_name}.default.test_empty_scan_ordered_str WRITE ORDERED BY id")
     spark.sql(f"INSERT INTO {catalog_name}.default.test_empty_scan_ordered_str VALUES 'a', 'c'")
+
+    spark.sql(
+        f"""
+        CREATE OR REPLACE TABLE {catalog_name}.default.test_incremental_read (
+            dt     date,
+            number integer,
+            letter string
+        )
+        USING iceberg
+        TBLPROPERTIES (
+            'format-version'='2'
+        );
+        """
+    )
+
+    spark.sql(
+        f"""
+        INSERT INTO {catalog_name}.default.test_incremental_read
+        VALUES (CAST('2022-03-01' AS date), 1, 'a')
+        """
+    )
+
+    spark.sql(
+        f"""
+        INSERT INTO {catalog_name}.default.test_incremental_read
+        VALUES (CAST('2022-03-01' AS date), 2, 'b')
+        """
+    )
+
+    spark.sql(
+        f"""
+        INSERT INTO {catalog_name}.default.test_incremental_read
+        VALUES (CAST('2022-03-02' AS date), 3, 'c'), (CAST('2022-03-02' AS date), 4, 'b')
+        """
+    )
+
+    spark.sql(
+        f"""
+        DELETE FROM {catalog_name}.default.test_incremental_read
+        WHERE number = 2
+        """
+    )
+
+    # https://github.com/apache/iceberg/issues/1092#issuecomment-638432848 / https://github.com/apache/iceberg/issues/3747#issuecomment-1145419407
+    # Don't do replace for Hive catalog as REPLACE TABLE requires certain Hive server configuration
+    if catalog_name != "hive":
+        # Replace to break snapshot lineage:
+        spark.sql(
+            f"""
+            REPLACE TABLE {catalog_name}.default.test_incremental_read
+            USING iceberg
+            TBLPROPERTIES ('format-version'='2')
+            AS SELECT number, letter FROM {catalog_name}.default.test_incremental_read
+            """
+        )
diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py
index 96844fd995..9713570025 100644
--- a/tests/integration/test_reads.py
+++ b/tests/integration/test_reads.py
@@ -1070,3 +1070,191 @@ def test_filter_after_arrow_scan(catalog: Catalog) -> None:
 
     scan = scan.filter("ts >= '2023-03-05T00:00:00+00:00'")
     assert len(scan.to_arrow()) > 0
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
+def test_incremental_append_scan_append_only(catalog: Catalog) -> None:
+    test_table = catalog.load_table("default.test_incremental_read")
+
+    scan = (
+        test_table.incremental_append_scan()
+        # Only "append"-operation snapshots occurred in this range
+        .from_snapshot_exclusive(test_table.snapshots()[0].snapshot_id)
+        .to_snapshot_inclusive(test_table.snapshots()[2].snapshot_id)
+    )
+
+    assert len(list(scan.plan_files())) == 2
+
+    # Check various read methods
+    assert len(scan.to_arrow()) == 3
+    assert len(scan.to_arrow_batch_reader().read_all()) == 3
+    assert len(scan.to_pandas()) == 3
+    assert len(scan.to_polars()) == 3
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
+def test_incremental_append_scan_ignores_non_append_snapshots(catalog: Catalog) -> None:
+    test_table = catalog.load_table("default.test_incremental_read")
+
+    scan = test_table.incremental_append_scan(
+        from_snapshot_id_exclusive=test_table.snapshots()[0].snapshot_id,
+        # This is a "delete"-operation snapshot, that should be ignored by the append scan
+        to_snapshot_id_inclusive=test_table.snapshots()[3].snapshot_id,
+    )
+
+    assert len(list(scan.plan_files())) == 2
+    assert len(scan.to_arrow()) == 3
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog")])
+def test_incremental_append_scan_uses_current_schema(catalog: Catalog) -> None:
+    test_table = catalog.load_table("default.test_incremental_read")
+
+    scan = (
+        test_table.incremental_append_scan()
+        .from_snapshot_exclusive(test_table.snapshots()[0].snapshot_id)
+        .to_snapshot_inclusive(test_table.snapshots()[2].snapshot_id)
+    )
+
+    # The schema within the snapshot range above included an extra date field, but the table was then replaced,
+    # removing it. An append scan always uses the current schema of the table.
+    expected_schema = pa.schema(
+        [
+            pa.field("number", pa.int32()),
+            pa.field("letter", pa.string()),
+        ]
+    )
+
+    result_table = scan.to_arrow()
+    assert result_table.schema.equals(expected_schema)
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
+def test_incremental_append_scan_row_filter(catalog: Catalog) -> None:
+    test_table = catalog.load_table("default.test_incremental_read")
+
+    scan = (
+        test_table.incremental_append_scan(row_filter=EqualTo("letter", "b"))
+        .from_snapshot_exclusive(test_table.snapshots()[0].snapshot_id)
+        .to_snapshot_inclusive(test_table.snapshots()[2].snapshot_id)
+    )
+
+    # This filter should match against the only row added in snapshots[1] and one of the two rows added in snapshots[2]
+    assert len(list(scan.plan_files())) == 2
+    assert len(scan.to_arrow()) == 2
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
+def test_incremental_append_scan_selected_fields(catalog: Catalog) -> None:
+    test_table = catalog.load_table("default.test_incremental_read")
+
+    scan = (
+        test_table.incremental_append_scan(selected_fields=("number",))
+        .from_snapshot_exclusive(test_table.snapshots()[0].snapshot_id)
+        .to_snapshot_inclusive(test_table.snapshots()[2].snapshot_id)
+    )
+
+    expected_schema = pa.schema(
+        [
+            pa.field("number", pa.int32()),
+        ]
+    )
+
+    result_table = scan.to_arrow()
+    assert result_table.schema.equals(expected_schema)
+    assert sorted(result_table["number"].to_pylist()) == [2, 3, 4]
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
+def test_incremental_append_scan_limit(catalog: Catalog) -> None:
+    test_table = catalog.load_table("default.test_incremental_read")
+
+    scan = (
+        test_table.incremental_append_scan(limit=2)
+        .from_snapshot_exclusive(test_table.snapshots()[0].snapshot_id)
+        .to_snapshot_inclusive(test_table.snapshots()[2].snapshot_id)
+    )
+
+    # Although three rows were added in the range, the limit of 2 should be applied
+    assert len(scan.to_arrow()) == 2
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive")])
+def test_incremental_append_scan_to_snapshot_defaults_to_current(catalog: Catalog) -> None:
+    test_table = catalog.load_table("default.test_incremental_read")
+
+    assert (
+        len(test_table.incremental_append_scan().from_snapshot_exclusive(test_table.snapshots()[0].snapshot_id).to_arrow()) == 3
+    )
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
+def test_incremental_append_scan_equal_from_and_to_snapshots(catalog: Catalog) -> None:
+    test_table = catalog.load_table("default.test_incremental_read")
+    snapshot_id = test_table.snapshots()[0].snapshot_id
+
+    # Exclusive-inclusive semantics mean an empty table should be returned if equal from and to snapshots are specified
+    assert (
+        len(
+            test_table.incremental_append_scan()
+            .from_snapshot_exclusive(snapshot_id)
+            .to_snapshot_inclusive(snapshot_id)
+            .to_arrow()
+        )
+        == 0
+    )
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog")])
+def test_incremental_append_scan_throws_on_disconnected_snapshots(catalog: Catalog) -> None:
+    test_table = catalog.load_table("default.test_incremental_read")
+
+    from_id = test_table.snapshots()[0].snapshot_id
+    to_id = test_table.snapshots()[4].snapshot_id
+
+    with pytest.raises(ValueError) as e:
+        test_table.incremental_append_scan(
+            from_snapshot_id_exclusive=from_id,
+            # A table replace occurred just before this snapshot, breaking snapshot lineage / incremental-ity
+            to_snapshot_id_inclusive=to_id,
+        ).plan_files()
+
+    assert f"Append scan's start snapshot {from_id} is not an ancestor of end snapshot {to_id}" in str(e.value)
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
+def test_incremental_append_scan_throws_on_missing_snapshot_ids(catalog: Catalog) -> None:
+    test_table = catalog.load_table("default.test_incremental_read")
+
+    # from_snapshot_id not specified
+    with pytest.raises(ValueError) as e:
+        test_table.incremental_append_scan(
+            to_snapshot_id_inclusive=test_table.snapshots()[0].snapshot_id,
+        ).plan_files()
+    assert "Start snapshot of append scan unspecified, please set from_snapshot_id_exclusive" in str(e.value)
+
+    # from_snapshot_id missing from metadata
+    with pytest.raises(ValueError) as e:
+        test_table.incremental_append_scan(
+            from_snapshot_id_exclusive=42,
+            to_snapshot_id_inclusive=test_table.snapshots()[0].snapshot_id,
+        ).plan_files()
+    assert "Start snapshot of append scan not found on table metadata: 42" in str(e.value)
+
+    # to_snapshot_id missing from metadata
+    with pytest.raises(ValueError) as e:
+        test_table.incremental_append_scan(
+            from_snapshot_id_exclusive=test_table.snapshots()[0].snapshot_id,
+            to_snapshot_id_inclusive=42,
+        ).plan_files()
+    assert "End snapshot of append scan not found on table metadata: 42" in str(e.value)