Skip to content

Commit b081b9e

Browse files
ForeverAngryJanKaul
authored andcommitted
feat: add snapshot expiration functionality and maintenance operations
- Implemented snapshot expiration logic in the maintenance module. - Added methods for time-based and count-based snapshot retention. - Created examples demonstrating the usage of snapshot expiration. - Updated README with new features and examples. - Added integration tests for snapshot expiration functionality.
1 parent 1aff500 commit b081b9e

7 files changed

Lines changed: 1121 additions & 1 deletion

File tree

README.md

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,13 @@ It provides an Iceberg integration for the [Datafusion](https://arrow.apache.org
2828
| Equality deletes | :white_check_mark: |
2929
| Positional deletes | |
3030

31+
### Table Maintenance
32+
33+
| Feature | Status |
34+
| --- | --- |
35+
| Expire snapshots | :white_check_mark: |
36+
| Orphan file cleanup | :white_check_mark: |
37+
3138
### Iceberg Views
3239

3340
| Feature | Status |
@@ -60,7 +67,9 @@ It provides an Iceberg integration for the [Datafusion](https://arrow.apache.org
6067

6168
## Example
6269

63-
Check out the [datafusion examples](datafusion_iceberg/examples).
70+
Check out the [datafusion examples](datafusion_iceberg/examples) and [maintenance examples](iceberg-rust/examples/).
71+
72+
### Basic Table Operations
6473

6574
```rust
6675
use datafusion::{arrow::array::Int64Array, prelude::SessionContext};
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*!
2+
Example demonstrating snapshot expiration functionality
3+
4+
This example shows how to use the expire_snapshots API to clean up old snapshots
5+
from an Iceberg table. Note that this is a conceptual example - in practice, you
6+
would need a fully configured table with a catalog and object store.
7+
*/
8+
9+
use std::error::Error;
10+
11+
#[tokio::main]
12+
async fn main() -> Result<(), Box<dyn Error>> {
13+
// Note: This is a conceptual example. In practice, you would load a real table:
14+
// let mut table = catalog.load_table(&identifier).await?;
15+
16+
println!("Iceberg Snapshot Expiration Example");
17+
println!("==================================");
18+
19+
// Example 1: Expire snapshots older than 30 days
20+
println!("\n1. Expire snapshots older than 30 days:");
21+
demonstrate_time_based_expiration().await;
22+
23+
// Example 2: Keep only the last 10 snapshots
24+
println!("\n2. Keep only the last 10 snapshots:");
25+
demonstrate_count_based_expiration().await;
26+
27+
// Example 3: Combined criteria with orphan file cleanup
28+
println!("\n3. Combined criteria with file cleanup:");
29+
demonstrate_combined_expiration().await;
30+
31+
// Example 4: Dry run to preview what would be expired
32+
println!("\n4. Dry run to preview expiration:");
33+
demonstrate_dry_run().await;
34+
35+
Ok(())
36+
}
37+
38+
async fn demonstrate_time_based_expiration() {
39+
// Calculate timestamp for 30 days ago
40+
let thirty_days_ago = chrono::Utc::now().timestamp_millis() - 30 * 24 * 60 * 60 * 1000;
41+
42+
println!(" expire_snapshots()");
43+
println!(" .expire_older_than({}) // 30 days ago", thirty_days_ago);
44+
println!(" .execute().await");
45+
println!(" // This would expire all snapshots older than 30 days");
46+
}
47+
48+
async fn demonstrate_count_based_expiration() {
49+
println!(" expire_snapshots()");
50+
println!(" .retain_last(10)");
51+
println!(" .execute().await");
52+
println!(" // This would keep only the 10 most recent snapshots");
53+
}
54+
55+
async fn demonstrate_combined_expiration() {
56+
let seven_days_ago = chrono::Utc::now().timestamp_millis() - 7 * 24 * 60 * 60 * 1000;
57+
58+
println!(" expire_snapshots()");
59+
println!(" .expire_older_than({}) // 7 days ago", seven_days_ago);
60+
println!(" .retain_last(5) // But keep at least 5");
61+
println!(" .clean_orphan_files(true) // Also delete unreferenced files");
62+
println!(" .execute().await");
63+
println!(" // This would expire snapshots older than 7 days, but always keep");
64+
println!(" // the 5 most recent snapshots and clean up orphaned files");
65+
}
66+
67+
async fn demonstrate_dry_run() {
68+
println!(" let result = expire_snapshots()");
69+
println!(" .retain_last(5)");
70+
println!(" .dry_run(true) // Preview mode");
71+
println!(" .execute().await?;");
72+
println!(" ");
73+
println!(" println!(\"Would expire {{}} snapshots\", result.expired_snapshot_ids.len());");
74+
println!(" println!(\"Would delete {{}} manifest files\", result.deleted_files.manifests.len());");
75+
println!(" // No actual changes made in dry run mode");
76+
}
77+
78+
// This function would show a real example if we had a table instance
79+
#[allow(dead_code)]
80+
async fn real_expiration_example() -> Result<(), Box<dyn Error>> {
81+
// In a real implementation, you would:
82+
// 1. Load a table from your catalog
83+
// 2. Call expire_snapshots with your desired criteria
84+
// 3. Handle the results
85+
86+
/* Example code (commented out since we don't have a real table):
87+
88+
let mut table = catalog.load_table(&table_identifier).await?;
89+
90+
let result = table.expire_snapshots()
91+
.expire_older_than(chrono::Utc::now().timestamp_millis() - 30 * 24 * 60 * 60 * 1000)
92+
.retain_last(10)
93+
.clean_orphan_files(true)
94+
.execute()
95+
.await?;
96+
97+
println!("Expired {} snapshots", result.expired_snapshot_ids.len());
98+
println!("Deleted {} manifest lists", result.deleted_files.manifest_lists.len());
99+
println!("Deleted {} manifest files", result.deleted_files.manifests.len());
100+
println!("Deleted {} data files", result.deleted_files.data_files.len());
101+
102+
if !result.expired_snapshot_ids.is_empty() {
103+
println!("Expired snapshot IDs: {:?}", result.expired_snapshot_ids);
104+
}
105+
*/
106+
107+
Ok(())
108+
}

iceberg-rust/src/lib.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
//! * Time travel and snapshot isolation
1414
//! * View and materialized view support
1515
//! * Multiple catalog implementations (REST, AWS Glue, File-based)
16+
//! * Table maintenance operations (snapshot expiration, orphan file cleanup)
1617
//!
1718
//! # Components
1819
//!
@@ -43,6 +44,15 @@
4344
//! .update_schema(new_schema)
4445
//! .commit()
4546
//! .await?;
47+
//!
48+
//! // Expire old snapshots for maintenance
49+
//! let result = table.expire_snapshots()
50+
//! .expire_older_than(chrono::Utc::now().timestamp_millis() - 30 * 24 * 60 * 60 * 1000)
51+
//! .retain_last(10)
52+
//! .execute()
53+
//! .await?;
54+
//!
55+
//! println!("Expired {} snapshots", result.expired_snapshot_ids.len());
4656
//! # Ok(())
4757
//! # }
4858
//! ```

0 commit comments

Comments
 (0)