Skip to content

Commit 42c0fe7

Browse files
Clean up replace test suite
- test_replace_table_location split into three named tests (_inherits_existing_location, _uses_explicit_location, _strips_trailing_slash_from_location) — kills the in-body branching on the parametrize id. - _run_complete_replace returns a @DataClass _ReplaceFixture instead of a 5-tuple; callers name what they care about (fx.replaced etc.) instead of writing _, replaced, _, _, _. - snapshot_log assertion tightened from unordered membership to ordered prefix equality. - _rejects_format_version_downgrade / _rejects_non_numeric_format_version / _rejects_empty_location merged into one parametrized test_replace_table_rejects_invalid_inputs. - test_replace_table_upgrades_format_version split: the follow-up "stays at v2" half is now test_replace_table_keeps_upgraded_format_version_on_subsequent_replace. - concurrent-conflict tests now assert that last_column_id / last_partition_id did NOT advance after the failed commit, proving the conflict was a true no-op. - multi-suffix void test: 4-field schema trimmed to 3, columns renamed to name their role (current_source / collide_on_name / collide_on_fallback).
1 parent 0c3dde1 commit 42c0fe7

2 files changed

Lines changed: 100 additions & 64 deletions

File tree

tests/catalog/test_catalog_behaviors.py

Lines changed: 92 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
"""
2121

2222
import os
23+
from dataclasses import dataclass
2324
from pathlib import Path
2425
from typing import Any
2526

@@ -439,9 +440,10 @@ def test_replace_transaction(catalog: Catalog, test_table_identifier: Identifier
439440
assert replaced.metadata.table_uuid == original.metadata.table_uuid
440441
assert replaced.metadata.current_snapshot_id is None
441442
assert {f.name for f in replaced.schema().fields} == {"id", "data", "extra"}
442-
# Old snapshot kept by identity (not just count), and snapshot_log entries from before survive.
443+
# Old snapshot kept by identity (not just count), and snapshot_log entries from before survive
444+
# in order at the front of the log.
443445
assert any(s.snapshot_id == old_snapshot_id for s in replaced.metadata.snapshots)
444-
assert all(entry in replaced.metadata.snapshot_log for entry in snapshot_log_before)
446+
assert replaced.metadata.snapshot_log[: len(snapshot_log_before)] == snapshot_log_before
445447
# Old schema is still in the schemas list alongside the new one.
446448
schema_ids = sorted(s.schema_id for s in replaced.metadata.schemas)
447449
assert schema_ids == [0, 1]
@@ -450,14 +452,20 @@ def test_replace_transaction(catalog: Catalog, test_table_identifier: Identifier
450452
assert replaced.scan(snapshot_id=old_snapshot_id).to_arrow().equals(_simple_data())
451453

452454

453-
def _run_complete_replace(
454-
catalog: Catalog, identifier: Identifier, tmp_path: Path
455-
) -> tuple[Table, Table, SortOrder, pa.Table, int]:
456-
"""Set up a table, run a full-six-args RTAS replace, and return the handles needed for assertions.
455+
@dataclass
456+
class _ReplaceFixture:
457+
"""State produced by `_run_complete_replace`: the table before/after the replace plus
458+
the inputs needed to assert on the result."""
457459

458-
Returns:
459-
(original, replaced, new_sort, original_data, old_snapshot_id)
460-
"""
460+
original: Table
461+
replaced: Table
462+
new_sort: SortOrder
463+
original_data: pa.Table
464+
old_snapshot_id: int
465+
466+
467+
def _run_complete_replace(catalog: Catalog, identifier: Identifier, tmp_path: Path) -> _ReplaceFixture:
468+
"""Set up a table, run a full-six-args RTAS replace, and return the handles needed for assertions."""
461469
_create_simple_table(catalog, identifier, properties={"keep": "yes", "override": "old"})
462470
catalog.load_table(identifier).append(_simple_data())
463471
original = catalog.load_table(identifier)
@@ -487,52 +495,58 @@ def _run_complete_replace(
487495
) as txn:
488496
txn.append(new_data)
489497

490-
return original, catalog.load_table(identifier), new_sort, original_data, old_snapshot_id
498+
return _ReplaceFixture(
499+
original=original,
500+
replaced=catalog.load_table(identifier),
501+
new_sort=new_sort,
502+
original_data=original_data,
503+
old_snapshot_id=old_snapshot_id,
504+
)
491505

492506

493507
def test_complete_replace_transaction_applies_new_schema_spec_and_sort(
494508
catalog: Catalog, test_table_identifier: Identifier, tmp_path: Path
495509
) -> None:
496-
original, replaced, new_sort, _, _ = _run_complete_replace(catalog, test_table_identifier, tmp_path)
510+
fx = _run_complete_replace(catalog, test_table_identifier, tmp_path)
497511
# Identity invariants.
498-
assert replaced.metadata.table_uuid == original.metadata.table_uuid
499-
assert replaced.metadata.location == f"file://{tmp_path}/replaced"
512+
assert fx.replaced.metadata.table_uuid == fx.original.metadata.table_uuid
513+
assert fx.replaced.metadata.location == f"file://{tmp_path}/replaced"
500514
# New schema / spec / sort applied; old entries retained in history.
501-
assert {f.name for f in replaced.schema().fields} == {"id", "data", "extra"}
502-
assert sorted(s.schema_id for s in replaced.metadata.schemas) == [0, 1]
503-
assert replaced.spec().fields[0].source_id == 1
504-
assert isinstance(replaced.spec().fields[0].transform, IdentityTransform)
505-
assert {s.spec_id for s in replaced.metadata.partition_specs} == {0, 1}
506-
assert replaced.sort_order().fields == new_sort.fields
507-
assert {s.order_id for s in replaced.metadata.sort_orders} == {0, replaced.metadata.default_sort_order_id}
515+
assert {f.name for f in fx.replaced.schema().fields} == {"id", "data", "extra"}
516+
assert sorted(s.schema_id for s in fx.replaced.metadata.schemas) == [0, 1]
517+
assert fx.replaced.spec().fields[0].source_id == 1
518+
assert isinstance(fx.replaced.spec().fields[0].transform, IdentityTransform)
519+
assert {s.spec_id for s in fx.replaced.metadata.partition_specs} == {0, 1}
520+
assert fx.replaced.sort_order().fields == fx.new_sort.fields
521+
assert {s.order_id for s in fx.replaced.metadata.sort_orders} == {0, fx.replaced.metadata.default_sort_order_id}
508522

509523

510524
def test_complete_replace_transaction_merges_properties(
511525
catalog: Catalog, test_table_identifier: Identifier, tmp_path: Path
512526
) -> None:
513-
_, replaced, _, _, _ = _run_complete_replace(catalog, test_table_identifier, tmp_path)
527+
fx = _run_complete_replace(catalog, test_table_identifier, tmp_path)
514528
# `keep` is preserved, `override` is updated, `added` is new, and `format-version` does not leak.
515-
assert replaced.properties["keep"] == "yes"
516-
assert replaced.properties["override"] == "new"
517-
assert replaced.properties["added"] == "v"
518-
assert "format-version" not in replaced.properties
529+
assert fx.replaced.properties["keep"] == "yes"
530+
assert fx.replaced.properties["override"] == "new"
531+
assert fx.replaced.properties["added"] == "v"
532+
assert "format-version" not in fx.replaced.properties
519533

520534

521535
def test_complete_replace_transaction_rtas_preserves_old_snapshot(
522536
catalog: Catalog, test_table_identifier: Identifier, tmp_path: Path
523537
) -> None:
524-
_, replaced, _, original_data, old_snapshot_id = _run_complete_replace(catalog, test_table_identifier, tmp_path)
538+
fx = _run_complete_replace(catalog, test_table_identifier, tmp_path)
525539
# New snapshot exists, has no parent (fresh start), old snapshot is still in the snapshot list.
526-
new_snapshot = replaced.current_snapshot()
540+
new_snapshot = fx.replaced.current_snapshot()
527541
assert new_snapshot is not None
528-
assert new_snapshot.snapshot_id != old_snapshot_id
542+
assert new_snapshot.snapshot_id != fx.old_snapshot_id
529543
assert new_snapshot.parent_snapshot_id is None
530-
assert any(s.snapshot_id == old_snapshot_id for s in replaced.metadata.snapshots)
531-
assert replaced.scan().to_arrow().num_rows == 2
544+
assert any(s.snapshot_id == fx.old_snapshot_id for s in fx.replaced.metadata.snapshots)
545+
assert fx.replaced.scan().to_arrow().num_rows == 2
532546
# Time-travel back to before the replace returns the original rows from the old schema.
533-
time_travel = replaced.scan(snapshot_id=old_snapshot_id).to_arrow()
534-
assert time_travel.num_rows == original_data.num_rows
535-
assert time_travel.column("id").to_pylist() == original_data.column("id").to_pylist()
547+
time_travel = fx.replaced.scan(snapshot_id=fx.old_snapshot_id).to_arrow()
548+
assert time_travel.num_rows == fx.original_data.num_rows
549+
assert time_travel.column("id").to_pylist() == fx.original_data.column("id").to_pylist()
536550

537551

538552
def test_replace_transaction_requires_table_exists(catalog: Catalog, test_table_identifier: Identifier) -> None:
@@ -644,47 +658,62 @@ def test_replace_table_upgrades_format_version(catalog: Catalog, test_table_iden
644658
# `format-version` is a control input, not a persisted property.
645659
assert "format-version" not in upgraded.properties
646660

647-
# A follow-up replace stays at the upgraded version.
661+
662+
def test_replace_table_keeps_upgraded_format_version_on_subsequent_replace(
663+
catalog: Catalog, test_table_identifier: Identifier
664+
) -> None:
665+
_, schema = _create_simple_table(catalog, test_table_identifier, format_version=1)
666+
catalog.replace_table_transaction(
667+
test_table_identifier, schema=schema, properties={"format-version": "2"}
668+
).commit_transaction()
648669
new_schema = Schema(*schema.fields, NestedField(field_id=3, name="extra", field_type=BooleanType(), required=False))
649670
catalog.replace_table_transaction(test_table_identifier, schema=new_schema).commit_transaction()
650671
replayed = catalog.load_table(test_table_identifier)
651672
assert replayed.format_version == 2
652673
assert {f.name for f in replayed.schema().fields} == {"id", "data", "extra"}
653674

654675

655-
def test_replace_table_rejects_format_version_downgrade(catalog: Catalog, test_table_identifier: Identifier) -> None:
676+
@pytest.mark.parametrize(
677+
"properties, location, expected_match",
678+
[
679+
pytest.param({"format-version": "1"}, None, "Cannot downgrade format-version", id="format-version-downgrade"),
680+
pytest.param({"format-version": "two"}, None, "Invalid format-version property", id="non-numeric-format-version"),
681+
pytest.param({}, "/", "location must not be empty", id="empty-location-after-rstrip"),
682+
],
683+
)
684+
def test_replace_table_rejects_invalid_inputs(
685+
catalog: Catalog,
686+
test_table_identifier: Identifier,
687+
properties: dict[str, str],
688+
location: str | None,
689+
expected_match: str,
690+
) -> None:
656691
_, schema = _create_simple_table(catalog, test_table_identifier, format_version=2)
657-
with pytest.raises(ValueError, match="Cannot downgrade format-version"):
658-
catalog.replace_table_transaction(test_table_identifier, schema=schema, properties={"format-version": "1"})
692+
with pytest.raises(ValueError, match=expected_match):
693+
catalog.replace_table_transaction(test_table_identifier, schema=schema, properties=properties, location=location)
659694

660695

661-
def test_replace_table_rejects_non_numeric_format_version(catalog: Catalog, test_table_identifier: Identifier) -> None:
696+
def test_replace_table_inherits_existing_location(catalog: Catalog, test_table_identifier: Identifier) -> None:
662697
_, schema = _create_simple_table(catalog, test_table_identifier)
663-
with pytest.raises(ValueError, match="Invalid format-version property"):
664-
catalog.replace_table_transaction(test_table_identifier, schema=schema, properties={"format-version": "two"})
698+
existing = catalog.load_table(test_table_identifier).metadata.location
699+
catalog.replace_table_transaction(test_table_identifier, schema=schema).commit_transaction()
700+
assert catalog.load_table(test_table_identifier).metadata.location == existing
665701

666702

667-
def test_replace_table_rejects_empty_location(catalog: Catalog, test_table_identifier: Identifier) -> None:
703+
def test_replace_table_uses_explicit_location(catalog: Catalog, test_table_identifier: Identifier, tmp_path: Path) -> None:
668704
_, schema = _create_simple_table(catalog, test_table_identifier)
669-
with pytest.raises(ValueError, match="location must not be empty"):
670-
catalog.replace_table_transaction(test_table_identifier, schema=schema, location="/")
705+
new_location = f"file://{tmp_path}/relocated"
706+
catalog.replace_table_transaction(test_table_identifier, schema=schema, location=new_location).commit_transaction()
707+
assert catalog.load_table(test_table_identifier).metadata.location == new_location
671708

672709

673-
@pytest.mark.parametrize("location_kind", ["inherit", "explicit", "trailing-slash"])
674-
def test_replace_table_location(catalog: Catalog, test_table_identifier: Identifier, tmp_path: Path, location_kind: str) -> None:
710+
def test_replace_table_strips_trailing_slash_from_location(
711+
catalog: Catalog, test_table_identifier: Identifier, tmp_path: Path
712+
) -> None:
675713
_, schema = _create_simple_table(catalog, test_table_identifier)
676-
existing = catalog.load_table(test_table_identifier).metadata.location
677-
678-
if location_kind == "inherit":
679-
catalog.replace_table_transaction(test_table_identifier, schema=schema).commit_transaction()
680-
replaced = catalog.load_table(test_table_identifier)
681-
assert replaced.metadata.location == existing
682-
else:
683-
bare = f"file://{tmp_path}/relocated"
684-
arg = bare + "/" if location_kind == "trailing-slash" else bare
685-
catalog.replace_table_transaction(test_table_identifier, schema=schema, location=arg).commit_transaction()
686-
replaced = catalog.load_table(test_table_identifier)
687-
assert replaced.metadata.location == bare
714+
bare = f"file://{tmp_path}/relocated"
715+
catalog.replace_table_transaction(test_table_identifier, schema=schema, location=bare + "/").commit_transaction()
716+
assert catalog.load_table(test_table_identifier).metadata.location == bare
688717

689718

690719
def test_replace_table_transaction_rolls_back_on_failure(catalog: Catalog, test_table_identifier: Identifier) -> None:
@@ -712,8 +741,11 @@ def test_concurrent_replace_transaction_schema_conflict(catalog: Catalog, test_t
712741
txn_b = catalog.replace_table_transaction(test_table_identifier, schema=_REPLACE_SCHEMA)
713742

714743
txn_a.commit_transaction()
744+
after_a = catalog.load_table(test_table_identifier).metadata
715745
with pytest.raises(CommitFailedException, match="last assigned field id"):
716746
txn_b.commit_transaction()
747+
# The failed commit must be a true no-op: no metadata advanced past where `txn_a` left things.
748+
assert catalog.load_table(test_table_identifier).metadata.last_column_id == after_a.last_column_id
717749

718750

719751
def test_concurrent_replace_transaction_partition_spec_conflict(catalog: Catalog, test_table_identifier: Identifier) -> None:
@@ -723,8 +755,11 @@ def test_concurrent_replace_transaction_partition_spec_conflict(catalog: Catalog
723755
txn_b = catalog.replace_table_transaction(test_table_identifier, schema=schema, partition_spec=new_spec)
724756

725757
txn_a.commit_transaction()
758+
after_a = catalog.load_table(test_table_identifier).metadata
726759
with pytest.raises(CommitFailedException, match="last assigned partition id"):
727760
txn_b.commit_transaction()
761+
# The failed commit must be a true no-op: no metadata advanced past where `txn_a` left things.
762+
assert catalog.load_table(test_table_identifier).metadata.last_partition_id == after_a.last_partition_id
728763

729764

730765
# Rename table tests

tests/table/test_partitioning.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -434,15 +434,16 @@ def test_assign_fresh_partition_spec_ids_for_replace_v1_keeps_field_preserves_id
434434

435435
def test_assign_fresh_partition_spec_ids_for_replace_v1_void_name_uses_multi_suffix_loop() -> None:
436436
"""When `name` and `name_<field_id>` are both already used, append `_2`, `_3`, ... until unique."""
437+
# Three columns, one role each: source for the current (about-to-be-voided) partition,
438+
# source for the new partition that collides on the void's preferred name, and source for
439+
# the new partition that collides on the void's first fallback name.
437440
schema = Schema(
438-
NestedField(field_id=1, name="foo", field_type=IntegerType(), required=False),
439-
NestedField(field_id=2, name="bar", field_type=IntegerType(), required=False),
440-
NestedField(field_id=3, name="baz", field_type=IntegerType(), required=False),
441-
NestedField(field_id=4, name="qux", field_type=IntegerType(), required=False),
441+
NestedField(field_id=1, name="current_source", field_type=IntegerType(), required=False),
442+
NestedField(field_id=2, name="collide_on_name", field_type=IntegerType(), required=False),
443+
NestedField(field_id=3, name="collide_on_fallback", field_type=IntegerType(), required=False),
442444
)
443-
# Current v1 spec partitions source=1 by bucket(4) at field_id=1000, named "p" — note
444-
# the partition NAME ("p") differs from the source COLUMN NAME ("foo") which is allowed
445-
# for non-identity transforms.
445+
# Current v1 spec partitions source=1 by bucket(4) at field_id=1000, named "p" — for
446+
# non-identity transforms the partition NAME doesn't have to match the source column name.
446447
current_spec = PartitionSpec(PartitionField(source_id=1, field_id=1000, transform=BucketTransform(4), name="p"), spec_id=0)
447448
# New spec has two partition fields named "p" and "p_1000" — colliding with both the
448449
# void's preferred name and its first fallback. Both are on different sources, so they

0 commit comments

Comments
 (0)