Skip to content

Commit 65e6982

Browse files
ForeverAngryJanKaul
authored andcommitted
fix: update expire snapshots implementation to work with current iceberg-rust-spec APIs
- Fix imports for SnapshotReference, SnapshotRetention, and SnapshotBuilder - Update create_test_snapshot to use correct builder pattern - Update create_test_ref to match current SnapshotReference structure - Remove unused imports from test file - All tests passing (9 unit tests + 6 integration tests)
1 parent b081b9e commit 65e6982

2 files changed

Lines changed: 358 additions & 2 deletions

File tree

iceberg-rust/src/table/maintenance/expire_snapshots.rs

Lines changed: 357 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -517,6 +517,7 @@ mod tests {
517517
use super::*;
518518
use std::collections::HashMap;
519519
use iceberg_rust_spec::spec::table_metadata::FormatVersion;
520+
use iceberg_rust_spec::spec::snapshot::{Snapshot, SnapshotBuilder, SnapshotReference, SnapshotRetention};
520521

521522
#[test]
522523
fn test_expire_snapshots_selection_logic() {
@@ -647,4 +648,360 @@ mod tests {
647648
fn validate_criteria(older_than: Option<i64>, retain_last: Option<usize>) -> bool {
648649
older_than.is_some() || retain_last.is_some()
649650
}
651+
652+
fn create_test_metadata_with_snapshots() -> TableMetadata {
653+
let mut snapshots = HashMap::new();
654+
let now = chrono::Utc::now().timestamp_millis();
655+
656+
// Create snapshots with different timestamps
657+
// Snapshot 1: 5 days old
658+
snapshots.insert(1, create_test_snapshot(1, now - 5 * 86400 * 1000, "s3://bucket/manifest1.avro"));
659+
// Snapshot 2: 10 days old
660+
snapshots.insert(2, create_test_snapshot(2, now - 10 * 86400 * 1000, "s3://bucket/manifest2.avro"));
661+
// Snapshot 3: 15 days old
662+
snapshots.insert(3, create_test_snapshot(3, now - 15 * 86400 * 1000, "s3://bucket/manifest3.avro"));
663+
// Snapshot 4: 20 days old
664+
snapshots.insert(4, create_test_snapshot(4, now - 20 * 86400 * 1000, "s3://bucket/manifest4.avro"));
665+
// Snapshot 5: 25 days old
666+
snapshots.insert(5, create_test_snapshot(5, now - 25 * 86400 * 1000, "s3://bucket/manifest5.avro"));
667+
668+
// Create refs (branches/tags)
669+
let mut refs = HashMap::new();
670+
refs.insert("main".to_string(), create_test_ref(3)); // ref to snapshot 3
671+
refs.insert("tag-v1".to_string(), create_test_ref(4)); // ref to snapshot 4
672+
673+
TableMetadata {
674+
format_version: FormatVersion::V2,
675+
table_uuid: uuid::Uuid::new_v4(),
676+
location: "s3://test-bucket/test-table".to_string(),
677+
last_sequence_number: 5,
678+
last_updated_ms: now,
679+
last_column_id: 10,
680+
schemas: HashMap::new(),
681+
current_schema_id: 0,
682+
partition_specs: HashMap::new(),
683+
default_spec_id: 0,
684+
last_partition_id: 0,
685+
properties: HashMap::new(),
686+
current_snapshot_id: Some(1), // Most recent snapshot is current
687+
snapshots,
688+
snapshot_log: Vec::new(),
689+
metadata_log: Vec::new(),
690+
sort_orders: HashMap::new(),
691+
default_sort_order_id: 0,
692+
refs,
693+
}
694+
}
695+
696+
fn create_test_snapshot(id: i64, timestamp_ms: i64, manifest_list: &str) -> Snapshot {
697+
SnapshotBuilder::default()
698+
.with_snapshot_id(id)
699+
.with_timestamp_ms(timestamp_ms)
700+
.with_manifest_list(manifest_list.to_string())
701+
.with_sequence_number(id)
702+
.build()
703+
.unwrap()
704+
}
705+
706+
fn create_test_ref(snapshot_id: i64) -> SnapshotReference {
707+
SnapshotReference {
708+
snapshot_id,
709+
retention: SnapshotRetention::Branch {
710+
min_snapshots_to_keep: None,
711+
max_snapshot_age_ms: None,
712+
max_ref_age_ms: None,
713+
},
714+
}
715+
}
716+
717+
#[test]
718+
fn test_expire_snapshots_by_timestamp() {
719+
let metadata = create_test_metadata_with_snapshots();
720+
let now = chrono::Utc::now().timestamp_millis();
721+
722+
// Create test expiration with timestamp threshold of 14 days
723+
let test_expire = TestExpireSnapshots {
724+
older_than: Some(now - 14 * 86400 * 1000),
725+
retain_last: None,
726+
retain_ref_snapshots: false,
727+
};
728+
729+
let result = test_expire.select_snapshots_to_expire(&metadata).unwrap();
730+
let (expired, retained) = result;
731+
732+
// Snapshots 3, 4, and 5 should be expired (older than 14 days)
733+
assert_eq!(expired.len(), 3);
734+
assert!(expired.contains(&3));
735+
assert!(expired.contains(&4));
736+
assert!(expired.contains(&5));
737+
738+
// Snapshots 1 and 2 should be retained (newer than 14 days + current)
739+
assert_eq!(retained.len(), 2);
740+
assert!(retained.contains(&1));
741+
assert!(retained.contains(&2));
742+
}
743+
744+
#[test]
745+
fn test_retain_last_n_snapshots() {
746+
let metadata = create_test_metadata_with_snapshots();
747+
748+
// Create test expiration with retain_last = 2
749+
let test_expire = TestExpireSnapshots {
750+
older_than: None,
751+
retain_last: Some(2),
752+
retain_ref_snapshots: false,
753+
};
754+
755+
let result = test_expire.select_snapshots_to_expire(&metadata).unwrap();
756+
let (expired, retained) = result;
757+
758+
// Only the 2 most recent snapshots should be retained
759+
assert_eq!(retained.len(), 2);
760+
assert!(retained.contains(&1)); // Most recent
761+
assert!(retained.contains(&2)); // Second most recent
762+
763+
// Snapshots 3, 4, and 5 should be expired
764+
assert_eq!(expired.len(), 3);
765+
assert!(expired.contains(&3));
766+
assert!(expired.contains(&4));
767+
assert!(expired.contains(&5));
768+
}
769+
770+
#[test]
771+
fn test_protect_current_snapshot() {
772+
let metadata = create_test_metadata_with_snapshots();
773+
let now = chrono::Utc::now().timestamp_millis();
774+
775+
// Create test expiration with aggressive timestamp that would expire all snapshots
776+
let test_expire = TestExpireSnapshots {
777+
older_than: Some(now), // All snapshots are older than now
778+
retain_last: None,
779+
retain_ref_snapshots: false,
780+
};
781+
782+
let result = test_expire.select_snapshots_to_expire(&metadata).unwrap();
783+
let (expired, retained) = result;
784+
785+
// Current snapshot (1) should always be retained
786+
assert_eq!(retained.len(), 1);
787+
assert!(retained.contains(&1));
788+
789+
// All other snapshots should be expired
790+
assert_eq!(expired.len(), 4);
791+
assert!(expired.contains(&2));
792+
assert!(expired.contains(&3));
793+
assert!(expired.contains(&4));
794+
assert!(expired.contains(&5));
795+
}
796+
797+
#[test]
798+
fn test_protect_referenced_snapshots() {
799+
let metadata = create_test_metadata_with_snapshots();
800+
let now = chrono::Utc::now().timestamp_millis();
801+
802+
// Create test expiration that would expire all snapshots except for the refs
803+
let test_expire = TestExpireSnapshots {
804+
older_than: Some(now), // All snapshots are older than now
805+
retain_last: None,
806+
retain_ref_snapshots: true, // But we want to retain referenced snapshots
807+
};
808+
809+
let result = test_expire.select_snapshots_to_expire(&metadata).unwrap();
810+
let (expired, retained) = result;
811+
812+
// Referenced snapshots (3, 4) and current snapshot (1) should be retained
813+
assert_eq!(retained.len(), 3);
814+
assert!(retained.contains(&1)); // Current
815+
assert!(retained.contains(&3)); // Referenced by "main" branch
816+
assert!(retained.contains(&4)); // Referenced by "tag-v1"
817+
818+
// Other snapshots should be expired
819+
assert_eq!(expired.len(), 2);
820+
assert!(expired.contains(&2));
821+
assert!(expired.contains(&5));
822+
}
823+
824+
#[test]
825+
fn test_combined_criteria() {
826+
let metadata = create_test_metadata_with_snapshots();
827+
let now = chrono::Utc::now().timestamp_millis();
828+
829+
// Create test expiration with both timestamp and count criteria
830+
let test_expire = TestExpireSnapshots {
831+
older_than: Some(now - 12 * 86400 * 1000), // Expire older than 12 days
832+
retain_last: Some(3), // But always keep the 3 most recent
833+
retain_ref_snapshots: false,
834+
};
835+
836+
let result = test_expire.select_snapshots_to_expire(&metadata).unwrap();
837+
let (expired, retained) = result;
838+
839+
// The 3 most recent snapshots should be retained
840+
// even though snapshot 3 is older than 12 days
841+
assert_eq!(retained.len(), 3);
842+
assert!(retained.contains(&1));
843+
assert!(retained.contains(&2));
844+
assert!(retained.contains(&3));
845+
846+
// Snapshots 4 and 5 should be expired
847+
assert_eq!(expired.len(), 2);
848+
assert!(expired.contains(&4));
849+
assert!(expired.contains(&5));
850+
}
851+
852+
#[test]
853+
fn test_empty_metadata() {
854+
// Test with empty metadata
855+
let empty_metadata = TableMetadata {
856+
format_version: FormatVersion::V2,
857+
table_uuid: uuid::Uuid::new_v4(),
858+
location: "s3://test-bucket/test-table".to_string(),
859+
last_sequence_number: 0,
860+
last_updated_ms: 0,
861+
last_column_id: 0,
862+
schemas: HashMap::new(),
863+
current_schema_id: 0,
864+
partition_specs: HashMap::new(),
865+
default_spec_id: 0,
866+
last_partition_id: 0,
867+
properties: HashMap::new(),
868+
current_snapshot_id: None,
869+
snapshots: HashMap::new(),
870+
snapshot_log: Vec::new(),
871+
metadata_log: Vec::new(),
872+
sort_orders: HashMap::new(),
873+
default_sort_order_id: 0,
874+
refs: HashMap::new(),
875+
};
876+
877+
let test_expire = TestExpireSnapshots {
878+
older_than: Some(1000),
879+
retain_last: Some(5),
880+
retain_ref_snapshots: true,
881+
};
882+
883+
let result = test_expire.select_snapshots_to_expire(&empty_metadata).unwrap();
884+
let (expired, retained) = result;
885+
886+
// No snapshots to expire or retain
887+
assert!(expired.is_empty());
888+
assert!(retained.is_empty());
889+
}
890+
891+
#[test]
892+
fn test_identify_files_to_delete() {
893+
let metadata = create_test_metadata_with_snapshots();
894+
895+
// Create test expiration that will expire snapshots 4 and 5
896+
let test_expire = TestExpireSnapshots {
897+
older_than: None,
898+
retain_last: Some(3),
899+
retain_ref_snapshots: false,
900+
};
901+
902+
let (expired, retained) = test_expire.select_snapshots_to_expire(&metadata).unwrap();
903+
904+
// Function to identify files to delete
905+
let files_to_delete = identify_test_files_to_delete(&metadata, &expired, &retained);
906+
907+
// Manifest lists from expired snapshots should be included
908+
assert_eq!(files_to_delete.manifest_lists.len(), 2);
909+
assert!(files_to_delete.manifest_lists.contains(&"s3://bucket/manifest4.avro".to_string()));
910+
assert!(files_to_delete.manifest_lists.contains(&"s3://bucket/manifest5.avro".to_string()));
911+
912+
// In a real implementation, we would also check manifests and data files
913+
assert!(files_to_delete.manifests.is_empty());
914+
assert!(files_to_delete.data_files.is_empty());
915+
}
916+
917+
// Helper function to identify files to delete
918+
fn identify_test_files_to_delete(
919+
metadata: &TableMetadata,
920+
snapshots_to_expire: &[i64],
921+
_snapshots_to_retain: &[i64],
922+
) -> DeletedFiles {
923+
let mut deleted_files = DeletedFiles::default();
924+
925+
// In a basic implementation, just collect manifest lists from expired snapshots
926+
for snapshot_id in snapshots_to_expire {
927+
if let Some(snapshot) = metadata.snapshots.get(snapshot_id) {
928+
deleted_files.manifest_lists.push(snapshot.manifest_list().clone());
929+
}
930+
}
931+
932+
// In a complete implementation, we would also:
933+
// 1. Parse manifest lists to find manifest files
934+
// 2. Parse manifests to find data files
935+
// 3. Check which files are only referenced by expired snapshots
936+
937+
deleted_files
938+
}
939+
940+
// Helper struct for testing
941+
struct TestExpireSnapshots {
942+
older_than: Option<i64>,
943+
retain_last: Option<usize>,
944+
retain_ref_snapshots: bool,
945+
}
946+
947+
impl TestExpireSnapshots {
948+
fn select_snapshots_to_expire(&self, metadata: &TableMetadata) -> Result<(Vec<i64>, Vec<i64>), Error> {
949+
let mut snapshots_to_expire = Vec::new();
950+
let mut snapshots_to_retain = Vec::new();
951+
952+
// Get all snapshots sorted by timestamp (newest first)
953+
let mut all_snapshots: Vec<_> = metadata.snapshots.values().collect();
954+
all_snapshots.sort_by(|a, b| b.timestamp_ms().cmp(a.timestamp_ms()));
955+
956+
// Get current snapshot ID to ensure we never expire it
957+
let current_snapshot_id = metadata.current_snapshot_id;
958+
959+
// Get snapshot IDs referenced by branches/tags if we should preserve them
960+
let ref_snapshot_ids = if self.retain_ref_snapshots {
961+
metadata.refs.values()
962+
.map(|r| r.snapshot_id)
963+
.collect::<HashSet<_>>()
964+
} else {
965+
HashSet::new()
966+
};
967+
968+
// Apply retention logic
969+
for (index, snapshot) in all_snapshots.iter().enumerate() {
970+
let snapshot_id = *snapshot.snapshot_id();
971+
let mut should_retain = false;
972+
973+
// Never expire the current snapshot
974+
if Some(snapshot_id) == current_snapshot_id {
975+
should_retain = true;
976+
}
977+
// Never expire snapshots referenced by branches/tags if enabled
978+
else if self.retain_ref_snapshots && ref_snapshot_ids.contains(&snapshot_id) {
979+
should_retain = true;
980+
}
981+
// Keep the most recent N snapshots if retain_last is specified
982+
else if let Some(retain_count) = self.retain_last {
983+
if index < retain_count {
984+
should_retain = true;
985+
}
986+
}
987+
988+
// Apply older_than filter only if not already marked for retention
989+
if !should_retain {
990+
if let Some(threshold) = self.older_than {
991+
if *snapshot.timestamp_ms() >= threshold {
992+
should_retain = true;
993+
}
994+
}
995+
}
996+
997+
if should_retain {
998+
snapshots_to_retain.push(snapshot_id);
999+
} else {
1000+
snapshots_to_expire.push(snapshot_id);
1001+
}
1002+
}
1003+
1004+
Ok((snapshots_to_expire, snapshots_to_retain))
1005+
}
1006+
}
6501007
}

iceberg-rust/tests/snapshot_expiration.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,9 @@
33
//! These tests verify that the expire_snapshots API works correctly with real
44
//! table metadata structures and various expiration criteria.
55
6-
use iceberg_rust::table::maintenance::{ExpireSnapshots, ExpireSnapshotsResult};
76
use iceberg_rust_spec::spec::{
87
snapshot::{Snapshot, SnapshotBuilder, SnapshotReference, SnapshotRetention},
9-
table_metadata::{TableMetadata, TableMetadataBuilder, FormatVersion},
8+
table_metadata::{TableMetadata, FormatVersion},
109
};
1110
use std::collections::HashMap;
1211
use uuid::Uuid;

0 commit comments

Comments
 (0)