Skip to content

Commit eb9af1a

Browse files
DataFileWithIncrement
1 parent a7879be commit eb9af1a

3 files changed

Lines changed: 71 additions & 34 deletions

File tree

iceberg-rust/src/materialized_view/transaction/mod.rs

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@ use crate::{
1414
error::Error,
1515
table::{
1616
delete_all_table_files,
17-
transaction::{operation::Operation as TableOperation, APPEND_INDEX, REPLACE_INDEX},
17+
transaction::{
18+
operation::{DataFileWithIncrement, Operation as TableOperation},
19+
APPEND_INDEX, REPLACE_INDEX,
20+
},
1821
},
1922
view::transaction::operation::Operation as ViewOperation,
2023
};
@@ -102,16 +105,22 @@ impl<'view> Transaction<'view> {
102105
refresh_state: RefreshState,
103106
) -> Result<Self, Error> {
104107
let refresh_state = serde_json::to_string(&refresh_state)?;
108+
let files_with_increments: Vec<DataFileWithIncrement> = files
109+
.into_iter()
110+
.map(|f| DataFileWithIncrement {
111+
data_file: f,
112+
dsn_increment: None,
113+
})
114+
.collect();
105115
if let Some(ref mut operation) = self.storage_table_operations[APPEND_INDEX] {
106116
if let TableOperation::Append {
107117
branch: _,
108118
data_files: old,
109119
delete_files: _,
110120
additional_summary: old_lineage,
111-
dsn_increment: _,
112121
} = operation
113122
{
114-
old.extend_from_slice(&files);
123+
old.extend_from_slice(&files_with_increments);
115124
*old_lineage = Some(HashMap::from_iter(vec![(
116125
REFRESH_STATE.to_owned(),
117126
refresh_state.clone(),
@@ -120,13 +129,12 @@ impl<'view> Transaction<'view> {
120129
} else {
121130
self.storage_table_operations[APPEND_INDEX] = Some(TableOperation::Append {
122131
branch: self.branch.clone(),
123-
data_files: files,
132+
data_files: files_with_increments,
124133
delete_files: Vec::new(),
125134
additional_summary: Some(HashMap::from_iter(vec![(
126135
REFRESH_STATE.to_owned(),
127136
refresh_state,
128137
)])),
129-
dsn_increment: None,
130138
});
131139
}
132140
Ok(self)
@@ -139,16 +147,22 @@ impl<'view> Transaction<'view> {
139147
refresh_state: RefreshState,
140148
) -> Result<Self, Error> {
141149
let refresh_state = serde_json::to_string(&refresh_state)?;
150+
let files_with_increments: Vec<DataFileWithIncrement> = files
151+
.into_iter()
152+
.map(|f| DataFileWithIncrement {
153+
data_file: f,
154+
dsn_increment: None,
155+
})
156+
.collect();
142157
if let Some(ref mut operation) = self.storage_table_operations[APPEND_INDEX] {
143158
if let TableOperation::Append {
144159
branch: _,
145160
data_files: _,
146161
delete_files: old,
147162
additional_summary: old_lineage,
148-
dsn_increment: _,
149163
} = operation
150164
{
151-
old.extend_from_slice(&files);
165+
old.extend_from_slice(&files_with_increments);
152166
*old_lineage = Some(HashMap::from_iter(vec![(
153167
REFRESH_STATE.to_owned(),
154168
refresh_state.clone(),
@@ -158,12 +172,11 @@ impl<'view> Transaction<'view> {
158172
self.storage_table_operations[APPEND_INDEX] = Some(TableOperation::Append {
159173
branch: self.branch.clone(),
160174
data_files: Vec::new(),
161-
delete_files: files,
175+
delete_files: files_with_increments,
162176
additional_summary: Some(HashMap::from_iter(vec![(
163177
REFRESH_STATE.to_owned(),
164178
refresh_state,
165179
)])),
166-
dsn_increment: None,
167180
});
168181
}
169182
Ok(self)

iceberg-rust/src/table/transaction/mod.rs

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use std::collections::HashMap;
2020
use iceberg_rust_spec::spec::{manifest::DataFile, schema::Schema, snapshot::SnapshotReference};
2121

2222
use crate::table::transaction::append::append_summary;
23+
use crate::table::transaction::operation::DataFileWithIncrement;
2324
use crate::{catalog::commit::CommitTable, error::Error, table::Table};
2425

2526
use self::operation::Operation;
@@ -122,27 +123,33 @@ impl<'table> TableTransaction<'table> {
122123

123124
/// Appends data files to the table, increasing the Data Sequence Number by a given amount
124125
///
125-
pub fn append_data_with_dsn_increment(mut self, files: Vec<DataFile>, dsn_increment: Option<u64>) -> Self {
126+
pub fn append_data_with_dsn_increment(
127+
mut self,
128+
files: Vec<DataFile>,
129+
dsn_increment: Option<u64>,
130+
) -> Self {
126131
let summary = append_summary(&files);
132+
let files_with_increments: Vec<DataFileWithIncrement> = files
133+
.into_iter()
134+
.map(|f| DataFileWithIncrement {
135+
data_file: f,
136+
dsn_increment,
137+
})
138+
.collect();
127139

128140
if let Some(ref mut operation) = self.operations[APPEND_INDEX] {
129141
if let Operation::Append {
130-
data_files: old,
131-
dsn_increment: old_dsn_increment,
132-
..
142+
data_files: old, ..
133143
} = operation
134144
{
135-
if dsn_increment == *old_dsn_increment {
136-
old.extend_from_slice(&files);
137-
}
145+
old.extend_from_slice(&files_with_increments);
138146
}
139147
} else {
140148
self.operations[APPEND_INDEX] = Some(Operation::Append {
141149
branch: self.branch.clone(),
142-
data_files: files,
150+
data_files: files_with_increments,
143151
delete_files: Vec::new(),
144152
additional_summary: summary,
145-
dsn_increment,
146153
});
147154
}
148155
self
@@ -172,27 +179,34 @@ impl<'table> TableTransaction<'table> {
172179

173180
/// Appends delete files to the table, increasing the Data Sequence Number by a given amount
174181
///
175-
pub fn append_delete_with_dsn_increment(mut self, files: Vec<DataFile>, dsn_increment: Option<u64>) -> Self {
182+
pub fn append_delete_with_dsn_increment(
183+
mut self,
184+
files: Vec<DataFile>,
185+
dsn_increment: Option<u64>,
186+
) -> Self {
187+
let files_with_increments: Vec<DataFileWithIncrement> = files
188+
.into_iter()
189+
.map(|f| DataFileWithIncrement {
190+
data_file: f,
191+
dsn_increment,
192+
})
193+
.collect();
176194
if let Some(ref mut operation) = self.operations[APPEND_INDEX] {
177195
if let Operation::Append {
178196
branch: _,
179197
data_files: _,
180198
delete_files: old,
181199
additional_summary: None,
182-
dsn_increment: old_dsn_increment,
183200
} = operation
184201
{
185-
if dsn_increment == *old_dsn_increment {
186-
old.extend_from_slice(&files);
187-
}
202+
old.extend_from_slice(&files_with_increments);
188203
}
189204
} else {
190205
self.operations[APPEND_INDEX] = Some(Operation::Append {
191206
branch: self.branch.clone(),
192207
data_files: Vec::new(),
193-
delete_files: files,
208+
delete_files: files_with_increments,
194209
additional_summary: None,
195-
dsn_increment,
196210
});
197211
}
198212
self

iceberg-rust/src/table/transaction/operation.rs

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,13 @@ use super::append::split_datafiles;
3737
/// The target number of datafiles per manifest is dynamic, but we don't want to go below this number.
3838
static MIN_DATAFILES_PER_MANIFEST: usize = 4;
3939

40+
#[derive(Debug, Clone)]
41+
/// A data or delete file paired with the optional data sequence number increment to apply when it is appended
42+
pub struct DataFileWithIncrement {
43+
pub data_file: DataFile, // the file that is written into an added manifest entry
44+
pub dsn_increment: Option<u64>, // when `Some`, added to the table's last sequence number to set this entry's sequence number; `None` sets no explicit sequence number
45+
}
46+
4047
#[derive(Debug)]
4148
///Table operations
4249
pub enum Operation {
@@ -55,10 +62,9 @@ pub enum Operation {
5562
/// Append new files to the table
5663
Append {
5764
branch: Option<String>,
58-
data_files: Vec<DataFile>,
59-
delete_files: Vec<DataFile>,
65+
data_files: Vec<DataFileWithIncrement>,
66+
delete_files: Vec<DataFileWithIncrement>,
6067
additional_summary: Option<HashMap<String, String>>,
61-
dsn_increment: Option<u64>,
6268
},
6369
// /// Quickly append new files to the table
6470
// NewFastAppend {
@@ -102,7 +108,6 @@ impl Operation {
102108
data_files,
103109
delete_files,
104110
additional_summary,
105-
dsn_increment,
106111
} => {
107112
let old_snapshot = table_metadata.current_snapshot(branch.as_deref())?;
108113

@@ -118,7 +123,10 @@ impl Operation {
118123
return Ok((None, Vec::new()));
119124
}
120125

121-
let data_files_iter = delete_files.iter().chain(data_files.iter());
126+
let data_files_iter = delete_files
127+
.iter()
128+
.chain(data_files.iter())
129+
.map(|f| &f.data_file);
122130

123131
let manifest_list_writer = if let Some(manifest_list_bytes) =
124132
prefetch_manifest_list(old_snapshot, &object_store)
@@ -146,14 +154,16 @@ impl Operation {
146154
delete_files
147155
.into_iter()
148156
.chain(data_files.into_iter())
149-
.map(|data_file| {
157+
.map(|dfi| {
150158
let mut builder = ManifestEntry::builder();
151159
builder
152160
.with_format_version(table_metadata.format_version)
153161
.with_status(Status::Added)
154-
.with_data_file(data_file);
155-
if let Some(dsn_increment) = dsn_increment {
156-
builder.with_sequence_number(table_metadata.last_sequence_number + (dsn_increment as i64));
162+
.with_data_file(dfi.data_file);
163+
if let Some(dsn_increment) = dfi.dsn_increment {
164+
builder.with_sequence_number(
165+
table_metadata.last_sequence_number + (dsn_increment as i64),
166+
);
157167
}
158168

159169
builder

0 commit comments

Comments
 (0)