@@ -93,7 +93,7 @@ use super::transaction::{
9393 Operation , RewriteGroup , RewrittenIndex , Transaction , TransactionBuilder ,
9494} ;
9595use super :: utils:: make_rowid_capture_stream;
96- use super :: { WriteMode , WriteParams , write_fragments_internal} ;
96+ use super :: { WriteMode , WriteParams , cleanup_data_fragments , write_fragments_internal} ;
9797use crate :: Dataset ;
9898use crate :: Result ;
9999use crate :: dataset:: utils:: CapturedRowIds ;
@@ -1263,26 +1263,37 @@ async fn rewrite_files(
12631263
12641264 log:: info!( "Compaction task {}: file written" , task_id) ;
12651265
1266- let row_addrs = if let Some ( row_ids_rx) = row_ids_rx {
1267- let captured_ids = row_ids_rx
1268- . try_recv ( )
1269- . map_err ( |err| Error :: internal ( format ! ( "Failed to receive row ids: {}" , err) ) ) ?;
1270- let row_addrs = captured_ids. row_addrs ( None ) . into_owned ( ) ;
1271- let mut serialized = Vec :: with_capacity ( row_addrs. serialized_size ( ) ) ;
1272- row_addrs. serialize_into ( & mut serialized) ?;
1273- Some ( serialized)
1274- } else {
1275- if dataset. manifest . uses_stable_row_ids ( ) {
1276- log:: info!( "Compaction task {}: rechunking stable row ids" , task_id) ;
1277- rechunk_stable_row_ids ( dataset. as_ref ( ) , & mut new_fragments, & fragments) . await ?;
1278- recalc_versions_for_rewritten_fragments (
1279- dataset. as_ref ( ) ,
1280- & mut new_fragments,
1281- & fragments,
1282- )
1283- . await ?;
1266+ let row_addrs_result: Result < Option < Vec < u8 > > > = async {
1267+ if let Some ( row_ids_rx) = row_ids_rx {
1268+ let captured_ids = row_ids_rx
1269+ . try_recv ( )
1270+ . map_err ( |err| Error :: internal ( format ! ( "Failed to receive row ids: {}" , err) ) ) ?;
1271+ let row_addrs = captured_ids. row_addrs ( None ) . into_owned ( ) ;
1272+ let mut serialized = Vec :: with_capacity ( row_addrs. serialized_size ( ) ) ;
1273+ row_addrs. serialize_into ( & mut serialized) ?;
1274+ Ok ( Some ( serialized) )
1275+ } else {
1276+ if dataset. manifest . uses_stable_row_ids ( ) {
1277+ log:: info!( "Compaction task {}: rechunking stable row ids" , task_id) ;
1278+ rechunk_stable_row_ids ( dataset. as_ref ( ) , & mut new_fragments, & fragments) . await ?;
1279+ recalc_versions_for_rewritten_fragments (
1280+ dataset. as_ref ( ) ,
1281+ & mut new_fragments,
1282+ & fragments,
1283+ )
1284+ . await ?;
1285+ }
1286+ Ok ( None )
1287+ }
1288+ }
1289+ . await ;
1290+
1291+ let row_addrs = match row_addrs_result {
1292+ Ok ( v) => v,
1293+ Err ( e) => {
1294+ cleanup_data_fragments ( & dataset. object_store , & dataset. base , & new_fragments) . await ;
1295+ return Err ( e) ;
12841296 }
1285- None
12861297 } ;
12871298
12881299 metrics. files_removed = task
@@ -1585,6 +1596,13 @@ pub async fn commit_compaction(
15851596 None
15861597 } ;
15871598
1599+ // Clone the new fragments before rewrite_groups is moved into the transaction,
1600+ // so they can be handed to cleanup_data_fragments if the commit fails.
1601+ let all_new_fragments: Vec < Fragment > = rewrite_groups
1602+ . iter ( )
1603+ . flat_map ( |g| g. new_fragments . iter ( ) . cloned ( ) )
1604+ . collect ( ) ;
1605+
15881606 let transaction = TransactionBuilder :: new (
15891607 dataset. manifest . version ,
15901608 Operation :: Rewrite {
@@ -1596,9 +1614,13 @@ pub async fn commit_compaction(
15961614 . transaction_properties ( options. transaction_properties . clone ( ) )
15971615 . build ( ) ;
15981616
1599- dataset
1617+ if let Err ( e ) = dataset
16001618 . apply_commit ( transaction, & Default :: default ( ) , & Default :: default ( ) )
1601- . await ?;
1619+ . await
1620+ {
1621+ cleanup_data_fragments ( & dataset. object_store , & dataset. base , & all_new_fragments) . await ;
1622+ return Err ( e) ;
1623+ }
16021624
16031625 Ok ( metrics)
16041626}
0 commit comments