Skip to content

Commit 0847d20

Browse files
Apply test review findings
Critical:
- test_concurrent_replace_table now uses identical new schemas across the two transactions and matches on 'last assigned field id' — this is the only path that actually validates AssertLastAssignedFieldId.

Coverage:
- Add test_replace_table_with_sort_order_changes: unsorted → sorted → unsorted with order_id reuse from history.
- Extend the v2 partition-spec helper test with a reuse-by-bucket case (matching field_id under a renamed partition field).

Brittleness fixes:
- Drop dead HEAD mock in test_replace_table_transaction_404_raises; no view-exists pre-flight to mock anymore.
- Pin wire-payload assertions to fixture metadata fields (no more magic 999 / spec_id / sort_order_id constants).
- Tighten set-default-spec / set-default-sort-order to check the emitted id, not just existence.
- Split test_replace_table_location_resolution into two non-stringly-discriminated tests.

Removed:
- test_replace_table_accepts_pyarrow_schema: redundant with the RTAS test which already exercises pa.Schema via df.schema.
1 parent a620054 commit 0847d20

3 files changed

Lines changed: 78 additions & 62 deletions

File tree

tests/catalog/test_catalog_behaviors.py

Lines changed: 41 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -474,14 +474,6 @@ def test_replace_table_schema_id_reuse(
474474
assert replaced.metadata.last_column_id == expected_last_col_id
475475

476476

477-
def test_replace_table_accepts_pyarrow_schema(catalog: Catalog, test_table_identifier: Identifier) -> None:
478-
"""The schema argument may be a PyArrow schema; it must be converted and have IDs assigned."""
479-
_create_simple_table(catalog, test_table_identifier)
480-
pa_schema = pa.schema([pa.field("id", pa.int64()), pa.field("data", pa.large_string()), pa.field("extra", pa.bool_())])
481-
replaced = catalog.replace_table(test_table_identifier, schema=pa_schema)
482-
assert {f.name for f in replaced.schema().fields} == {"id", "data", "extra"}
483-
484-
485477
def test_replace_table_preserves_identifier_field_ids(catalog: Catalog, test_table_identifier: Identifier) -> None:
486478
"""Identifier-field IDs on the new schema are honored after reuse-by-name."""
487479
schema = Schema(
@@ -509,37 +501,42 @@ def test_replace_table_reuses_partition_spec_id(catalog: Catalog, test_table_ide
509501
assert replaced.metadata.default_spec_id == 0
510502

511503

512-
@pytest.mark.parametrize(
513-
"location_kwarg, expected",
514-
[
515-
("explicit", "explicit"), # explicit location is used verbatim (minus any trailing slash)
516-
(None, "existing"), # no location → existing location is preserved
517-
("explicit-with-trailing-slash", "explicit-with-trailing-slash-stripped"),
518-
],
519-
ids=["explicit", "inherited", "trailing-slash-stripped"],
520-
)
521-
def test_replace_table_location_resolution(
522-
catalog: Catalog,
523-
test_table_identifier: Identifier,
524-
tmp_path: Path,
525-
location_kwarg: str | None,
526-
expected: str,
527-
) -> None:
528-
"""`location=None` keeps the existing location; an explicit location is used (sans trailing slash)."""
504+
def test_replace_table_with_sort_order_changes(catalog: Catalog, test_table_identifier: Identifier) -> None:
505+
"""Replace can change the sort order. The new sort order is appended to the history and
506+
becomes the default; a follow-up replace back to unsorted reuses the unsorted order_id
507+
rather than appending a duplicate."""
508+
_, schema = _create_simple_table(catalog, test_table_identifier)
509+
sort = SortOrder(SortField(source_id=1, transform=IdentityTransform(), direction=SortDirection.ASC))
510+
511+
# unsorted → sorted: a new order is added and becomes the default.
512+
sorted_table = catalog.replace_table(test_table_identifier, schema=schema, sort_order=sort)
513+
assert sorted_table.sort_order().fields == sort.fields
514+
assert sorted_table.metadata.default_sort_order_id != 0
515+
516+
# sorted → unsorted: reuses the unsorted order_id 0 from history.
517+
unsorted_table = catalog.replace_table(test_table_identifier, schema=schema)
518+
assert unsorted_table.sort_order().is_unsorted
519+
assert unsorted_table.metadata.default_sort_order_id == 0
520+
521+
522+
def test_replace_table_inherits_existing_location(catalog: Catalog, test_table_identifier: Identifier) -> None:
523+
"""`location=None` keeps the existing table's location."""
529524
_, schema = _create_simple_table(catalog, test_table_identifier)
530525
existing = catalog.load_table(test_table_identifier).metadata.location
526+
replaced = catalog.replace_table(test_table_identifier, schema=schema)
527+
assert replaced.metadata.location == existing
531528

532-
if location_kwarg is None:
533-
replaced = catalog.replace_table(test_table_identifier, schema=schema)
534-
assert replaced.metadata.location == existing
535-
elif location_kwarg == "explicit":
536-
new_location = f"file://{tmp_path}/relocated"
537-
replaced = catalog.replace_table(test_table_identifier, schema=schema, location=new_location)
538-
assert replaced.metadata.location == new_location
539-
else:
540-
new_location = f"file://{tmp_path}/with-slash"
541-
replaced = catalog.replace_table(test_table_identifier, schema=schema, location=new_location + "/")
542-
assert replaced.metadata.location == new_location
529+
530+
@pytest.mark.parametrize("trailing_slash", [False, True], ids=["no-slash", "trailing-slash"])
531+
def test_replace_table_uses_explicit_location(
532+
catalog: Catalog, test_table_identifier: Identifier, tmp_path: Path, trailing_slash: bool
533+
) -> None:
534+
"""An explicit `location` is used verbatim; trailing slash is stripped."""
535+
_, schema = _create_simple_table(catalog, test_table_identifier)
536+
bare = f"file://{tmp_path}/relocated"
537+
arg = bare + "/" if trailing_slash else bare
538+
replaced = catalog.replace_table(test_table_identifier, schema=schema, location=arg)
539+
assert replaced.metadata.location == bare
543540

544541

545542
def test_replace_table_merges_properties_with_overrides_and_additions(
@@ -657,23 +654,22 @@ def run_failing_replace() -> None:
657654

658655

659656
def test_concurrent_replace_table(catalog: Catalog, test_table_identifier: Identifier) -> None:
660-
"""Two replace_table calls staged from the same base metadata cannot both commit.
661-
662-
The first replace updates the catalog pointer to a new metadata location; the second
663-
replace was built against the now-stale base, so its commit must be rejected with
664-
`CommitFailedException`."""
665-
_, schema = _create_simple_table(catalog, test_table_identifier)
657+
"""Two concurrent replace_table calls staged from the same base both adding a column
658+
must fail on the second commit with an `assert-last-assigned-field-id` violation —
659+
proving the `AssertLastAssignedFieldId` requirement actually guards against duplicate
660+
field-id assignment under concurrent writers."""
661+
_create_simple_table(catalog, test_table_identifier)
666662
new_schema = Schema(
667663
NestedField(field_id=1, name="id", field_type=LongType(), required=False),
668664
NestedField(field_id=2, name="data", field_type=StringType(), required=False),
669665
NestedField(field_id=3, name="extra", field_type=BooleanType(), required=False),
670666
)
671-
# Both transactions are built from the same base metadata.
667+
# Both transactions build from the same base metadata with the same new schema.
672668
txn_a = catalog.replace_table_transaction(test_table_identifier, schema=new_schema)
673-
txn_b = catalog.replace_table_transaction(test_table_identifier, schema=schema)
669+
txn_b = catalog.replace_table_transaction(test_table_identifier, schema=new_schema)
674670

675671
txn_a.commit_transaction()
676-
with pytest.raises(CommitFailedException):
672+
with pytest.raises(CommitFailedException, match="last assigned field id"):
677673
txn_b.commit_transaction()
678674

679675

tests/catalog/test_rest.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2948,10 +2948,13 @@ def test_replace_table_transaction_wire_payload(
29482948
catalog.replace_table_transaction(identifier=("fokko", "fokko2"), schema=new_schema).commit_transaction()
29492949
request = rest_mock.last_request.json()
29502950

2951+
# Pin the requirement values to the fixture's last-column-id / last-partition-id so the
2952+
# assertion fails loudly if the fixture changes underneath us.
2953+
fixture_metadata = example_table_metadata_with_snapshot_v1_rest_json["metadata"]
29512954
assert request["requirements"] == [
29522955
{"type": "assert-table-uuid", "uuid": table_uuid},
2953-
{"type": "assert-last-assigned-field-id", "last-assigned-field-id": 2},
2954-
{"type": "assert-last-assigned-partition-id", "last-assigned-partition-id": 999},
2956+
{"type": "assert-last-assigned-field-id", "last-assigned-field-id": fixture_metadata["last-column-id"]},
2957+
{"type": "assert-last-assigned-partition-id", "last-assigned-partition-id": fixture_metadata["last-partition-id"]},
29552958
]
29562959

29572960
# No duplicate actions in the payload (the dict-by-action collapses them — guard against it).
@@ -2969,18 +2972,14 @@ def test_replace_table_transaction_wire_payload(
29692972
# Set* updates are emitted unconditionally even when their resulting id is reused.
29702973
# schema-id=-1 is the wire sentinel meaning "the schema we just added in this commit".
29712974
assert updates_by_action["set-current-schema"]["schema-id"] == -1
2972-
assert "set-default-spec" in updates_by_action
2973-
assert "set-default-sort-order" in updates_by_action
2975+
# Spec/sort-order are reused from the fixture (default ids) since the replace doesn't change them.
2976+
assert updates_by_action["set-default-spec"]["spec-id"] == fixture_metadata["default-spec-id"]
2977+
assert updates_by_action["set-default-sort-order"]["sort-order-id"] == fixture_metadata["default-sort-order-id"]
29742978

29752979

29762980
def test_replace_table_transaction_404_raises(
29772981
rest_mock: Mocker,
29782982
) -> None:
2979-
rest_mock.head(
2980-
f"{TEST_URI}v1/namespaces/fokko/views/missing",
2981-
status_code=404,
2982-
request_headers=TEST_HEADERS,
2983-
)
29842983
rest_mock.get(
29852984
f"{TEST_URI}v1/namespaces/fokko/tables/missing",
29862985
json={"error": {"message": "Table not found", "type": "NoSuchTableException", "code": 404}},

tests/table/test_partitioning.py

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -313,29 +313,50 @@ def test_incompatible_transform_source_type() -> None:
313313

314314

315315
@pytest.mark.parametrize(
316-
"existing_specs, last_partition_id, expected_field_id, expected_last_partition_id",
316+
"new_spec, existing_specs, last_partition_id, expected_field_id, expected_last_partition_id",
317317
[
318-
# Reuse: same (source_id, transform) already in an existing spec → reuse its field_id.
319-
(
318+
# Reuse-by-identity: same (source_id, IdentityTransform) already in an existing spec.
319+
pytest.param(
320+
PartitionSpec(PartitionField(source_id=1, field_id=999, transform=IdentityTransform(), name="id")),
320321
[PartitionSpec(PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="id"), spec_id=0)],
321322
1000,
322323
1000,
323324
1000,
325+
id="reuse-identity",
326+
),
327+
# Reuse-by-(source,bucket): same source_id + same BucketTransform, even under a renamed field.
328+
pytest.param(
329+
PartitionSpec(PartitionField(source_id=1, field_id=999, transform=BucketTransform(8), name="id_bucket_renamed")),
330+
[
331+
PartitionSpec(
332+
PartitionField(source_id=1, field_id=1042, transform=BucketTransform(8), name="id_bucket"), spec_id=0
333+
)
334+
],
335+
1042,
336+
1042,
337+
1042,
338+
id="reuse-bucket-under-rename",
339+
),
340+
# No match: fresh id above last_partition_id.
341+
pytest.param(
342+
PartitionSpec(PartitionField(source_id=1, field_id=999, transform=IdentityTransform(), name="id")),
343+
[PartitionSpec(spec_id=0)],
344+
999,
345+
1000,
346+
1000,
347+
id="new-field-above-last-partition-id",
324348
),
325-
# New: no matching (source_id, transform) → fresh id above last_partition_id.
326-
([PartitionSpec(spec_id=0)], 999, 1000, 1000),
327349
],
328-
ids=["reuse-from-existing-spec", "new-field-above-last-partition-id"],
329350
)
330351
def test_assign_fresh_partition_spec_ids_for_replace_v2(
352+
new_spec: PartitionSpec,
331353
existing_specs: list[PartitionSpec],
332354
last_partition_id: int,
333355
expected_field_id: int,
334356
expected_last_partition_id: int,
335357
) -> None:
336-
spec = PartitionSpec(PartitionField(source_id=1, field_id=999, transform=IdentityTransform(), name="id"))
337358
fresh_spec, new_last_pid = assign_fresh_partition_spec_ids_for_replace(
338-
spec, _REPLACE_SCHEMA_FOR_PARTITION, _REPLACE_SCHEMA_FOR_PARTITION, existing_specs, last_partition_id
359+
new_spec, _REPLACE_SCHEMA_FOR_PARTITION, _REPLACE_SCHEMA_FOR_PARTITION, existing_specs, last_partition_id
339360
)
340361
assert fresh_spec.fields[0].field_id == expected_field_id
341362
assert new_last_pid == expected_last_partition_id

0 commit comments

Comments
 (0)