Skip to content

Commit ebaa128

Browse files
committed
Merge PR apache#1606: Add RewriteFilesAction for truncate support
2 parents 090cd3d + a37be14 commit ebaa128

11 files changed

Lines changed: 807 additions & 50 deletions

File tree

crates/iceberg/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,3 +101,5 @@ pub mod writer;
101101
mod delete_vector;
102102
pub mod metadata_columns;
103103
pub mod puffin;
104+
/// Utility functions and modules.
105+
pub mod util;

crates/iceberg/src/spec/manifest/writer.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -295,10 +295,6 @@ impl ManifestWriter {
295295
/// Add a delete manifest entry. This method will update the following fields of the entry:
296296
/// - Update the entry status to `Deleted`
297297
/// - Set the snapshot id to the current snapshot id
298-
///
299-
/// # TODO
300-
/// Remove this allow later
301-
#[allow(dead_code)]
302298
pub(crate) fn add_delete_entry(&mut self, mut entry: ManifestEntry) -> Result<()> {
303299
self.check_data_file(&entry.data_file)?;
304300
entry.status = ManifestStatus::Deleted;
@@ -341,7 +337,7 @@ impl ManifestWriter {
341337
Ok(())
342338
}
343339

344-
/// Add an file as existing manifest entry. The original data and file sequence numbers, snapshot ID,
340+
/// Add a file as existing manifest entry. The original data and file sequence numbers, snapshot ID,
345341
/// which were assigned at commit, must be preserved when adding an existing entry.
346342
pub fn add_existing_file(
347343
&mut self,

crates/iceberg/src/spec/snapshot.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ pub const UNASSIGNED_SNAPSHOT_ID: i64 = -1;
3838

3939
/// Reference to [`Snapshot`].
4040
pub type SnapshotRef = Arc<Snapshot>;
41-
#[derive(Debug, Default, Serialize, Deserialize, PartialEq, Eq, Clone)]
41+
#[derive(Debug, Default, Serialize, Deserialize, PartialEq, Eq, Clone, Hash)]
4242
#[serde(rename_all = "lowercase")]
4343
/// The operation field is used by some operations, like snapshot expiration, to skip processing certain snapshots.
4444
pub enum Operation {

crates/iceberg/src/spec/snapshot_summary.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,7 @@ pub(crate) fn update_snapshot_summaries(
339339
if summary.operation != Operation::Append
340340
&& summary.operation != Operation::Overwrite
341341
&& summary.operation != Operation::Delete
342+
&& summary.operation != Operation::Replace
342343
{
343344
return Err(Error::new(
344345
ErrorKind::DataInvalid,

crates/iceberg/src/transaction/append.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use crate::table::Table;
2727
use crate::transaction::snapshot::{
2828
DefaultManifestProcess, SnapshotProduceOperation, SnapshotProducer,
2929
};
30+
use crate::transaction::validate::SnapshotValidator;
3031
use crate::transaction::{ActionCommit, TransactionAction};
3132

3233
/// FastAppendAction is a transaction action for fast-appending data files to the table.
@@ -103,6 +104,8 @@ impl TransactionAction for FastAppendAction {
103104
self.snapshot_properties.clone(),
104105
self.added_data_files.clone(),
105106
self.added_delete_files.clone(),
107+
vec![],
108+
vec![],
106109
);
107110

108111
snapshot_producer.validate_added_data_files(&self.added_data_files)?;
@@ -121,6 +124,8 @@ impl TransactionAction for FastAppendAction {
121124

122125
struct FastAppendOperation;
123126

127+
impl SnapshotValidator for FastAppendOperation {}
128+
124129
impl SnapshotProduceOperation for FastAppendOperation {
125130
fn operation(&self) -> Operation {
126131
Operation::Append
@@ -135,7 +140,7 @@ impl SnapshotProduceOperation for FastAppendOperation {
135140

136141
async fn existing_manifest(
137142
&self,
138-
snapshot_produce: &SnapshotProducer<'_>,
143+
snapshot_produce: &mut SnapshotProducer<'_>,
139144
) -> Result<Vec<ManifestFile>> {
140145
let Some(snapshot) = snapshot_produce.table.metadata().current_snapshot() else {
141146
return Ok(vec![]);

crates/iceberg/src/transaction/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,15 @@ mod action;
5454

5555
pub use action::*;
5656
mod append;
57+
mod rewrite_files;
5758
mod snapshot;
5859
mod sort_order;
5960
mod update_location;
6061
mod update_properties;
6162
mod update_schema;
6263
mod update_statistics;
6364
mod upgrade_format_version;
65+
mod validate;
6466

6567
use std::sync::Arc;
6668
use std::time::Duration;
@@ -73,6 +75,7 @@ use crate::spec::TableProperties;
7375
use crate::table::Table;
7476
use crate::transaction::action::BoxedTransactionAction;
7577
use crate::transaction::append::FastAppendAction;
78+
use crate::transaction::rewrite_files::RewriteFilesAction;
7679
use crate::transaction::sort_order::ReplaceSortOrderAction;
7780
use crate::transaction::update_location::UpdateLocationAction;
7881
use crate::transaction::update_properties::UpdatePropertiesAction;
@@ -154,6 +157,11 @@ impl Transaction {
154157
ReplaceSortOrderAction::new()
155158
}
156159

160+
/// Rewrite a set of data files of the table
161+
pub fn rewrite_files(&self) -> RewriteFilesAction {
162+
RewriteFilesAction::new()
163+
}
164+
157165
/// Set the location of the table
158166
pub fn update_location(&self) -> UpdateLocationAction {
159167
UpdateLocationAction::new()

0 commit comments

Comments
 (0)