Skip to content

Commit 8675256

Browse files
Test RTAS: replace_table_transaction with atomic write
Adds two new behavior tests and one REST integration test:
- test_replace_table_transaction_with_write_atomic_rtas (memory + sql): replace + fast_append in one transaction lands the schema swap and the new data atomically. The new snapshot is current, and the old snapshot is preserved in history.
- test_replace_table_followed_by_separate_append (memory + sql): replace_table clears the current snapshot; a subsequent append restores the main ref with the new data only.
- test_replace_table_transaction_rtas_against_rest_server: the same RTAS flow exercised end-to-end against the REST docker stack.

The bare replace_table() is the DDL-only form (it clears the current snapshot and preserves history). RTAS via replace_table_transaction is the primary use case for atomic schema-and-data swaps.
1 parent e649f95 commit 8675256

2 files changed

Lines changed: 113 additions & 0 deletions

File tree

tests/catalog/test_catalog_behaviors.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -536,6 +536,74 @@ def test_replace_table_transaction_can_stage_additional_changes(
536536
assert replaced.properties.get("staged") == "yes"
537537

538538

539+
def test_replace_table_transaction_with_write_atomic_rtas(
    catalog: Catalog, test_table_identifier: Identifier
) -> None:
    """Replace a table and write its new data inside one transaction (RTAS, Replace Table As Select).

    Exercises the primary use case for `replace_table_transaction`: the replacement schema and
    the replacement rows are committed together. After the commit the test expects a current
    snapshot that differs from the pre-replace one, the pre-replace snapshot still listed in the
    table metadata, the schema to expose only the new column names, and a scan to return only
    the newly written rows.
    """
    _create_simple_table(catalog, test_table_identifier)

    # Seed the original table with one row so a pre-replace snapshot exists.
    seed_arrow_schema = pa.schema([pa.field("id", pa.int64()), pa.field("data", pa.large_string())])
    seed_rows = pa.Table.from_pydict({"id": [1], "data": ["old"]}, schema=seed_arrow_schema)
    catalog.load_table(test_table_identifier).append(seed_rows)
    old_snapshot_id = catalog.load_table(test_table_identifier).current_snapshot().snapshot_id  # type: ignore[union-attr]

    # The replacement swaps the "data" column for a "name" column.
    replacement_schema = Schema(
        NestedField(field_id=1, name="id", field_type=LongType(), required=False),
        NestedField(field_id=2, name="name", field_type=StringType(), required=False),
    )
    replacement_arrow_schema = pa.schema([pa.field("id", pa.int64()), pa.field("name", pa.large_string())])
    replacement_rows = pa.Table.from_pydict(
        {"id": [10, 20], "name": ["alice", "bob"]},
        schema=replacement_arrow_schema,
    )

    # Both context managers exit (and therefore commit) when this single `with` block ends:
    # the fast-append first, then the enclosing replace transaction.
    with (
        catalog.replace_table_transaction(test_table_identifier, schema=replacement_schema) as txn,
        txn.update_snapshot().fast_append() as snap,
    ):
        for data_file in _dataframe_to_data_files(
            table_metadata=txn.table_metadata, df=replacement_rows, io=txn._table.io
        ):
            snap.append_data_file(data_file)

    replaced = catalog.load_table(test_table_identifier)

    # A new snapshot is current, and the pre-replace snapshot is still recorded in metadata.
    current = replaced.current_snapshot()
    assert current is not None
    assert current.snapshot_id != old_snapshot_id
    recorded_snapshot_ids = {s.snapshot_id for s in replaced.metadata.snapshots}
    assert old_snapshot_id in recorded_snapshot_ids

    # The replacement schema is the active one.
    assert {f.name for f in replaced.schema().fields} == {"id", "name"}

    # Only the two replacement rows are visible; the seeded "old" row is not part of the scan.
    assert replaced.scan().to_arrow().num_rows == 2
579+
580+
581+
def test_replace_table_followed_by_separate_append(
    catalog: Catalog, test_table_identifier: Identifier
) -> None:
    """Check that `replace_table` leaves no current snapshot and that a later `append` creates one.

    The table is seeded with one row, replaced using its own schema, and then appended to
    again. The final scan is expected to return only the row written after the replace.
    """
    _, schema = _create_simple_table(catalog, test_table_identifier)

    # Both appends use the same Arrow layout, so build it once.
    arrow_schema = pa.schema([pa.field("id", pa.int64()), pa.field("data", pa.large_string())])

    rows_before_replace = pa.Table.from_pydict({"id": [1], "data": ["x"]}, schema=arrow_schema)
    catalog.load_table(test_table_identifier).append(rows_before_replace)

    # DDL-only replace: the returned table must have no current snapshot.
    replaced = catalog.replace_table(test_table_identifier, schema=schema)
    assert replaced.current_snapshot() is None

    rows_after_replace = pa.Table.from_pydict({"id": [42], "data": ["after-replace"]}, schema=arrow_schema)
    replaced.append(rows_after_replace)

    # Reload from the catalog so the assertions see committed state.
    after = catalog.load_table(test_table_identifier)
    assert after.current_snapshot() is not None
    visible_rows = after.scan().to_arrow().num_rows
    assert visible_rows == 1  # Only the post-replace row is visible.
605+
606+
539607
# Rename table tests
540608

541609

tests/integration/test_rest_catalog.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,51 @@ def test_replace_table_end_to_end_against_rest_server(catalog: Catalog) -> None:
117117
catalog.drop_table(identifier)
118118

119119

120+
@pytest.mark.integration
@pytest.mark.parametrize("catalog", [lf("session_catalog")])
def test_replace_table_transaction_rtas_against_rest_server(catalog: Catalog) -> None:
    """Run the RTAS (Replace Table As Select) flow against a real REST server.

    The schema swap and the new-data write must land atomically. After the commit the test
    expects a current snapshot that differs from the pre-replace one, the pre-replace snapshot
    still listed in the table metadata, the replacement schema to be the active schema, and a
    scan to return only the newly written rows.
    """
    identifier = f"default.test_replace_rtas_{catalog.name}"

    # Start from a clean slate: make sure the namespace exists and drop any table left behind
    # by an earlier run.
    if not catalog.namespace_exists("default"):
        catalog.create_namespace("default")
    if catalog.table_exists(identifier):
        catalog.drop_table(identifier)

    original_schema = Schema(
        NestedField(field_id=1, name="id", field_type=LongType(), required=False),
        NestedField(field_id=2, name="data", field_type=StringType(), required=False),
    )
    original = catalog.create_table(identifier, schema=original_schema)

    # Seed one row so a pre-replace snapshot exists.
    original.append(
        pa.Table.from_pydict(
            {"id": [1], "data": ["old"]},
            schema=pa.schema([pa.field("id", pa.int64()), pa.field("data", pa.large_string())]),
        )
    )
    old_snapshot_id = catalog.load_table(identifier).current_snapshot().snapshot_id  # type: ignore[union-attr]

    # The replacement swaps the "data" column for a "name" column.
    new_schema = Schema(
        NestedField(field_id=1, name="id", field_type=LongType(), required=False),
        NestedField(field_id=2, name="name", field_type=StringType(), required=False),
    )
    new_data = pa.Table.from_pydict(
        {"id": [10, 20], "name": ["alice", "bob"]},
        schema=pa.schema([pa.field("id", pa.int64()), pa.field("name", pa.large_string())]),
    )

    # The fast-append is staged inside the replace transaction so both commit together.
    with catalog.replace_table_transaction(identifier, schema=new_schema) as txn:
        with txn.update_snapshot().fast_append() as snap:
            for data_file in _dataframe_to_data_files(table_metadata=txn.table_metadata, df=new_data, io=txn._table.io):
                snap.append_data_file(data_file)

    replaced = catalog.load_table(identifier)
    assert replaced.current_snapshot() is not None
    assert replaced.current_snapshot().snapshot_id != old_snapshot_id  # type: ignore[union-attr]
    assert any(s.snapshot_id == old_snapshot_id for s in replaced.metadata.snapshots)
    # The schema swap itself must be observable, not just the snapshot change; this mirrors the
    # schema assertion in the memory/sql behavior test for the same flow.
    assert {f.name for f in replaced.schema().fields} == {"id", "name"}
    assert replaced.scan().to_arrow().num_rows == 2
    catalog.drop_table(identifier)
163+
164+
120165
@pytest.mark.integration
121166
@pytest.mark.parametrize("catalog", [lf("session_catalog")])
122167
def test_load_view(catalog: RestCatalog, table_schema_nested: Schema, database_name: str, view_name: str) -> None:

0 commit comments

Comments
 (0)