@@ -2293,20 +2293,36 @@ def test_stage_only_delete(
     # a new delete snapshot is added
     snapshots = tbl.snapshots()
     assert len(snapshots) == 2
+    # snapshot main ref has not changed
+    assert current_snapshot == tbl.metadata.current_snapshot_id
+    assert len(tbl.scan().to_arrow()) == original_count
+
+    # Write to main branch
+    with tbl.transaction() as txn:
+        with txn.update_snapshot().fast_append() as fast_append:
+            for data_file in _dataframe_to_data_files(
+                table_metadata=txn.table_metadata, df=arrow_table_with_null, io=txn._table.io
+            ):
+                fast_append.append_data_file(data_file=data_file)
+
+    # Main ref has changed
+    assert current_snapshot != tbl.metadata.current_snapshot_id
+    assert len(tbl.scan().to_arrow()) == 6
+    snapshots = tbl.snapshots()
+    assert len(snapshots) == 3
 
     rows = spark.sql(
         f"""
-        SELECT operation, summary
-        FROM {identifier}.snapshots
-        ORDER BY committed_at ASC
-    """
+        SELECT operation, parent_id
+        FROM {identifier}.snapshots
+        ORDER BY committed_at ASC
+        """
     ).collect()
     operations = [row.operation for row in rows]
-    assert operations == ["append", "delete"]
-
-    # snapshot main ref has not changed
-    assert current_snapshot == tbl.metadata.current_snapshot_id
-    assert len(tbl.scan().to_arrow()) == original_count
+    parent_snapshot_id = [row.parent_id for row in rows]
+    assert operations == ["append", "delete", "append"]
+    # both subsequent parent ids should be the first snapshot id
+    assert parent_snapshot_id == [None, current_snapshot, current_snapshot]
 
 
 @pytest.mark.integration
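These stage-only tests all lean on the same invariant: a commit made with `branch=None` adds a snapshot without moving any ref. A minimal sketch of that invariant, assuming the PyIceberg `Table` API used in these tests, with `tbl` and `current_snapshot` named as above:

    # Sketch: a stage-only commit leaves the "main" ref pointing at the
    # original snapshot; only the snapshot list grows.
    assert tbl.metadata.refs["main"].snapshot_id == current_snapshot
    staged = [s for s in tbl.snapshots() if s.snapshot_id != current_snapshot]
    assert all(s.parent_snapshot_id == current_snapshot for s in staged)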
@@ -2323,6 +2339,7 @@ def test_stage_only_fast_append(
     original_count = len(tbl.scan().to_arrow())
     assert original_count == 3
 
+    # Write to staging branch
     with tbl.transaction() as txn:
         with txn.update_snapshot(branch=None).fast_append() as fast_append:
             for data_file in _dataframe_to_data_files(
@@ -2333,20 +2350,37 @@ def test_stage_only_fast_append(
     # Main ref has not changed and data is not yet appended
     assert current_snapshot == tbl.metadata.current_snapshot_id
     assert len(tbl.scan().to_arrow()) == original_count
-
     # There should be a new staged snapshot
     snapshots = tbl.snapshots()
     assert len(snapshots) == 2
 
+    # Write to main branch
+    with tbl.transaction() as txn:
+        with txn.update_snapshot().fast_append() as fast_append:
+            for data_file in _dataframe_to_data_files(
+                table_metadata=txn.table_metadata, df=arrow_table_with_null, io=txn._table.io
+            ):
+                fast_append.append_data_file(data_file=data_file)
+
+    # Main ref has changed
+    assert current_snapshot != tbl.metadata.current_snapshot_id
+    assert len(tbl.scan().to_arrow()) == 6
+    snapshots = tbl.snapshots()
+    assert len(snapshots) == 3
+
     rows = spark.sql(
         f"""
-        SELECT operation, summary
+        SELECT operation, parent_id
         FROM {identifier}.snapshots
         ORDER BY committed_at ASC
         """
     ).collect()
     operations = [row.operation for row in rows]
-    assert operations == ["append", "append"]
+    parent_snapshot_id = [row.parent_id for row in rows]
+    assert operations == ["append", "append", "append"]
+    # both subsequent parent ids should be the first snapshot id
+    assert parent_snapshot_id == [None, current_snapshot, current_snapshot]
+
 
 
 @pytest.mark.integration
@@ -2378,15 +2412,32 @@ def test_stage_only_merge_append(
     snapshots = tbl.snapshots()
     assert len(snapshots) == 2
 
+    # Write to main branch
+    with tbl.transaction() as txn:
+        with txn.update_snapshot().fast_append() as fast_append:
+            for data_file in _dataframe_to_data_files(
+                table_metadata=txn.table_metadata, df=arrow_table_with_null, io=txn._table.io
+            ):
+                fast_append.append_data_file(data_file=data_file)
+
+    # Main ref has changed
+    assert current_snapshot != tbl.metadata.current_snapshot_id
+    assert len(tbl.scan().to_arrow()) == 6
+    snapshots = tbl.snapshots()
+    assert len(snapshots) == 3
+
     rows = spark.sql(
         f"""
-        SELECT operation, summary
-        FROM {identifier}.snapshots
-        ORDER BY committed_at ASC
-    """
+        SELECT operation, parent_id
+        FROM {identifier}.snapshots
+        ORDER BY committed_at ASC
+        """
     ).collect()
     operations = [row.operation for row in rows]
-    assert operations == ["append", "append"]
+    parent_snapshot_id = [row.parent_id for row in rows]
+    assert operations == ["append", "append", "append"]
+    # both subsequent parent ids should be the first snapshot id
+    assert parent_snapshot_id == [None, current_snapshot, current_snapshot]
 
 
 @pytest.mark.integration
@@ -2418,16 +2469,32 @@ def test_stage_only_overwrite_files(
 
     assert current_snapshot == tbl.metadata.current_snapshot_id
     assert len(tbl.scan().to_arrow()) == original_count
-
     snapshots = tbl.snapshots()
     assert len(snapshots) == 2
 
+    # Write to main branch
+    with tbl.transaction() as txn:
+        with txn.update_snapshot().fast_append() as fast_append:
+            for data_file in _dataframe_to_data_files(
+                table_metadata=txn.table_metadata, df=arrow_table_with_null, io=txn._table.io
+            ):
+                fast_append.append_data_file(data_file=data_file)
+
+    # Main ref has changed
+    assert current_snapshot != tbl.metadata.current_snapshot_id
+    assert len(tbl.scan().to_arrow()) == 6
+    snapshots = tbl.snapshots()
+    assert len(snapshots) == 3
+
     rows = spark.sql(
         f"""
-        SELECT operation, summary
-        FROM {identifier}.snapshots
-        ORDER BY committed_at ASC
-    """
+        SELECT operation, parent_id
+        FROM {identifier}.snapshots
+        ORDER BY committed_at ASC
+        """
     ).collect()
     operations = [row.operation for row in rows]
-    assert operations == ["append", "overwrite"]
+    parent_snapshot_id = [row.parent_id for row in rows]
+    assert operations == ["append", "overwrite", "append"]
+    # both subsequent parent ids should be the first snapshot id
+    assert parent_snapshot_id == [None, current_snapshot, current_snapshot]
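The lineage assertion added across these tests can also be checked without Spark, straight from the table's own metadata; a minimal sketch under the same fixtures (`tbl` and `current_snapshot` as in the tests above):

    # Sketch: order snapshots by commit time and compare parent ids.
    snapshots = sorted(tbl.snapshots(), key=lambda s: s.timestamp_ms)
    parent_ids = [s.parent_snapshot_id for s in snapshots]
    # the staged snapshot and the later main-branch commit both branch off snapshot 1
    assert parent_ids == [None, current_snapshot, current_snapshot]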