Skip to content

Commit f2ef422

Browse files
ForeverAngryJanKaul
authored andcommitted
refactor: simplify snapshot expiration implementation and remove builder struct
1 parent 314406d commit f2ef422

3 files changed

Lines changed: 149 additions & 119 deletions

File tree

iceberg-rust/src/lib.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,17 @@
4646
//! .await?;
4747
//!
4848
//! // Expire old snapshots for maintenance
49-
//! let expired_snapshot_ids = table.expire_snapshots()
50-
//! .expire_older_than(chrono::Utc::now().timestamp_millis() - 30 * 24 * 60 * 60 * 1000)
51-
//! .retain_last(10)
52-
//! .execute()
49+
//! table
50+
//! .new_transaction(None)
51+
//! .expire_snapshots(
52+
//! Some(chrono::Utc::now().timestamp_millis() - 30 * 24 * 60 * 60 * 1000),
53+
//! Some(10),
54+
//! true,
55+
//! true,
56+
//! false,
57+
//! )
58+
//! .commit()
5359
//! .await?;
54-
//!
55-
//! println!("Expired {} snapshots", expired_snapshot_ids.len());
5660
//! # Ok(())
5761
//! # }
5862
//! ```

iceberg-rust/src/table/mod.rs

Lines changed: 0 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -45,86 +45,6 @@ pub mod manifest;
4545
pub mod manifest_list;
4646
pub mod transaction;
4747

48-
/// Builder for configuring and executing snapshot expiration operations
49-
///
50-
/// This builder provides a fluent API for configuring how snapshots should be expired:
51-
/// * [`expire_older_than`](ExpireSnapshotsBuilder::expire_older_than) - Remove snapshots older than a timestamp
52-
/// * [`retain_last`](ExpireSnapshotsBuilder::retain_last) - Keep only the most recent N snapshots
53-
/// * [`clean_orphan_files`](ExpireSnapshotsBuilder::clean_orphan_files) - Also remove unreferenced data files
54-
/// * [`dry_run`](ExpireSnapshotsBuilder::dry_run) - Preview what would be deleted without actually deleting
55-
pub struct ExpireSnapshotsBuilder<'a> {
56-
table: &'a mut Table,
57-
older_than: Option<i64>,
58-
retain_last: Option<usize>,
59-
clean_orphan_files: bool,
60-
retain_ref_snapshots: bool,
61-
dry_run: bool,
62-
}
63-
64-
impl<'a> ExpireSnapshotsBuilder<'a> {
65-
/// Create a new snapshot expiration builder for the given table
66-
fn new(table: &'a mut Table) -> Self {
67-
Self {
68-
table,
69-
older_than: None,
70-
retain_last: None,
71-
clean_orphan_files: false,
72-
retain_ref_snapshots: true,
73-
dry_run: false,
74-
}
75-
}
76-
77-
/// Expire snapshots older than the given timestamp (in milliseconds since Unix epoch)
78-
pub fn expire_older_than(mut self, timestamp_ms: i64) -> Self {
79-
self.older_than = Some(timestamp_ms);
80-
self
81-
}
82-
83-
/// Retain only the most recent N snapshots, expiring all others
84-
pub fn retain_last(mut self, count: usize) -> Self {
85-
self.retain_last = Some(count);
86-
self
87-
}
88-
89-
/// Enable or disable cleanup of orphaned data files
90-
pub fn clean_orphan_files(mut self, enabled: bool) -> Self {
91-
self.clean_orphan_files = enabled;
92-
self
93-
}
94-
95-
/// Control whether snapshots referenced by branches/tags should be preserved
96-
pub fn retain_ref_snapshots(mut self, enabled: bool) -> Self {
97-
self.retain_ref_snapshots = enabled;
98-
self
99-
}
100-
101-
/// Enable dry run mode to preview what would be deleted without actually deleting
102-
pub fn dry_run(mut self, enabled: bool) -> Self {
103-
self.dry_run = enabled;
104-
self
105-
}
106-
107-
/// Execute the snapshot expiration operation
108-
pub async fn execute(self) -> Result<Vec<i64>, Error> {
109-
let _result = self.table.new_transaction(None)
110-
.expire_snapshots(
111-
self.older_than,
112-
self.retain_last,
113-
self.clean_orphan_files,
114-
self.retain_ref_snapshots,
115-
self.dry_run,
116-
)
117-
.commit()
118-
.await?;
119-
120-
// Extract the expired snapshot IDs from the commit result
121-
// For now, we'll need to return empty vec since the transaction commit
122-
// doesn't directly return the expired snapshot IDs
123-
// TODO: Enhance transaction result to include operation-specific details
124-
Ok(vec![])
125-
}
126-
}
127-
12848
#[derive(Debug, Clone)]
12949
/// Iceberg table
13050
pub struct Table {
@@ -378,39 +298,6 @@ impl Table {
378298
TableTransaction::new(self, branch)
379299
}
380300

381-
/// Configures snapshot expiration for this table
382-
///
383-
/// Returns a builder that allows configuring snapshot expiration policies:
384-
/// * Time-based expiration: Remove snapshots older than a timestamp
385-
/// * Count-based retention: Keep only the most recent N snapshots
386-
/// * Orphan file cleanup: Remove data files no longer referenced by any snapshot
387-
/// * Reference preservation: Protect snapshots referenced by branches/tags
388-
/// * Dry run mode: Preview what would be deleted without actually deleting
389-
///
390-
/// The operation is executed through the table's transaction system, ensuring
391-
/// atomicity and consistency with other table operations.
392-
///
393-
/// # Returns
394-
/// * `ExpireSnapshotsBuilder` - A builder for configuring expiration parameters
395-
///
396-
/// # Examples
397-
/// ```rust,no_run
398-
/// # async fn example(table: &mut Table) -> Result<(), Box<dyn std::error::Error>> {
399-
/// // Expire snapshots older than 7 days but keep at least 5 snapshots
400-
/// let result = table.expire_snapshots()
401-
/// .expire_older_than(chrono::Utc::now().timestamp_millis() - 7 * 24 * 60 * 60 * 1000)
402-
/// .retain_last(5)
403-
/// .clean_orphan_files(true)
404-
/// .execute()
405-
/// .await?;
406-
///
407-
/// println!("Expired {} snapshots", result.len());
408-
/// # Ok(())
409-
/// # }
410-
/// ```
411-
pub fn expire_snapshots(&mut self) -> ExpireSnapshotsBuilder<'_> {
412-
ExpireSnapshotsBuilder::new(self)
413-
}
414301
}
415302

416303
/// Path of a Manifest file

iceberg-rust/src/table/transaction/operation.rs

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -941,6 +941,145 @@ impl Operation {
941941
}
942942
}
943943

944+
#[cfg(test)]
945+
mod tests {
946+
use super::*;
947+
use futures::executor::block_on;
948+
use iceberg_rust_spec::spec::schema::SchemaBuilder;
949+
use iceberg_rust_spec::spec::table_metadata::TableMetadataBuilder;
950+
use iceberg_rust_spec::spec::types::{PrimitiveType, StructField, Type};
951+
use object_store::memory::InMemory;
952+
953+
fn sample_metadata(
954+
snapshot_defs: &[(i64, i64)],
955+
current_snapshot: Option<i64>,
956+
refs: &[(&str, i64)],
957+
) -> TableMetadata {
958+
let schema = SchemaBuilder::default()
959+
.with_schema_id(0)
960+
.with_struct_field(StructField {
961+
id: 1,
962+
name: "id".to_string(),
963+
required: true,
964+
field_type: Type::Primitive(PrimitiveType::Long),
965+
doc: None,
966+
})
967+
.build()
968+
.unwrap();
969+
970+
let snapshots = snapshot_defs
971+
.iter()
972+
.enumerate()
973+
.map(|(idx, (snapshot_id, timestamp))| {
974+
let snapshot = SnapshotBuilder::default()
975+
.with_snapshot_id(*snapshot_id)
976+
.with_sequence_number((idx + 1) as i64)
977+
.with_timestamp_ms(*timestamp)
978+
.with_manifest_list(format!("manifest-{snapshot_id}.avro"))
979+
.with_summary(Summary {
980+
operation: SnapshotOperation::Append,
981+
other: HashMap::new(),
982+
})
983+
.with_schema_id(0)
984+
.build()
985+
.unwrap();
986+
(*snapshot_id, snapshot)
987+
})
988+
.collect::<HashMap<_, _>>();
989+
990+
let refs = refs
991+
.iter()
992+
.map(|(name, snapshot_id)| {
993+
(
994+
(*name).to_string(),
995+
SnapshotReference {
996+
snapshot_id: *snapshot_id,
997+
retention: SnapshotRetention::default(),
998+
},
999+
)
1000+
})
1001+
.collect::<HashMap<_, _>>();
1002+
1003+
TableMetadataBuilder::default()
1004+
.location("s3://tests/table".to_owned())
1005+
.current_schema_id(0)
1006+
.schemas(HashMap::from_iter(vec![(0, schema)]))
1007+
.snapshots(snapshots)
1008+
.current_snapshot_id(current_snapshot)
1009+
.last_sequence_number(snapshot_defs.len() as i64)
1010+
.refs(refs)
1011+
.build()
1012+
.unwrap()
1013+
}
1014+
1015+
fn execute_operation(
1016+
metadata: &TableMetadata,
1017+
older_than: Option<i64>,
1018+
retain_last: Option<usize>,
1019+
retain_refs: bool,
1020+
dry_run: bool,
1021+
) -> Result<Vec<TableUpdate>, Error> {
1022+
let op = Operation::ExpireSnapshots {
1023+
older_than,
1024+
retain_last,
1025+
clean_orphan_files: false,
1026+
retain_ref_snapshots: retain_refs,
1027+
dry_run,
1028+
};
1029+
let store = Arc::new(InMemory::new());
1030+
block_on(op.execute(metadata, store)).map(|(_, updates)| updates)
1031+
}
1032+
1033+
fn collect_snapshot_ids(updates: &[TableUpdate]) -> Vec<i64> {
1034+
updates
1035+
.iter()
1036+
.flat_map(|update| match update {
1037+
TableUpdate::RemoveSnapshots { snapshot_ids } => snapshot_ids.clone(),
1038+
_ => Vec::new(),
1039+
})
1040+
.collect()
1041+
}
1042+
1043+
#[test]
1044+
fn snapshot_expiration_requires_policy() {
1045+
let metadata = sample_metadata(&[(1, 1_000)], Some(1), &[]);
1046+
let result = execute_operation(&metadata, None, None, true, false);
1047+
assert!(matches!(result, Err(Error::InvalidFormat(_))));
1048+
}
1049+
1050+
#[test]
1051+
fn snapshot_expiration_applies_time_and_count_filters() {
1052+
let metadata = sample_metadata(
1053+
&[(1, 1_000), (2, 2_000), (3, 3_000), (4, 4_000)],
1054+
Some(4),
1055+
&[],
1056+
);
1057+
let updates = execute_operation(&metadata, Some(2_500), Some(2), true, false).unwrap();
1058+
let mut expired = collect_snapshot_ids(&updates);
1059+
expired.sort();
1060+
assert_eq!(expired, vec![1, 2]);
1061+
}
1062+
1063+
#[test]
1064+
fn snapshot_expiration_preserves_current_and_refs() {
1065+
let metadata = sample_metadata(
1066+
&[(10, 1_000), (20, 2_000), (30, 3_000)],
1067+
Some(30),
1068+
&[ ("branch", 20) ],
1069+
);
1070+
let updates = execute_operation(&metadata, Some(1_500), None, true, false).unwrap();
1071+
// Snapshot 10 is the only candidate because 20 is referenced and 30 is current.
1072+
assert_eq!(collect_snapshot_ids(&updates), vec![10]);
1073+
}
1074+
1075+
#[test]
1076+
fn snapshot_expiration_supports_dry_run() {
1077+
let metadata = sample_metadata(&[(1, 1_000), (2, 900)], Some(1), &[]);
1078+
let updates = execute_operation(&metadata, Some(950), None, true, true).unwrap();
1079+
assert!(updates.is_empty());
1080+
}
1081+
}
1082+
9441083
pub fn bounding_partition_values<'a>(
9451084
mut iter: impl Iterator<Item = &'a DataFile>,
9461085
partition_column_names: &SmallVec<[&str; 4]>,

0 commit comments

Comments
 (0)