Skip to content

Commit fc14260

Browse files
authored
Merge branch 'JanKaul:main' into main
2 parents 9815427 + b162972 commit fc14260

3 files changed

Lines changed: 48 additions & 26 deletions

File tree

iceberg-rust/src/table/transaction/append.rs

Lines changed: 24 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,8 @@
11
use std::cmp::Ordering;
2-
3-
use iceberg_rust_spec::{manifest::ManifestEntry, manifest_list::ManifestListEntry};
2+
use std::collections::HashMap;
43
use smallvec::SmallVec;
4+
use iceberg_rust_spec::{
5+
manifest::ManifestEntry, manifest_list::ManifestListEntry, manifest::Content, manifest::DataFile};
56

67
use crate::{
78
error::Error,
@@ -189,3 +190,24 @@ pub(crate) fn select_manifest_unpartitioned(
189190
})
190191
.ok_or(Error::NotFound("Manifest for insert".to_owned()))
191192
}
193+
194+
195+
/// Builds summary key/value pairs describing a batch of appended files.
///
/// Returns `None` when `files` is empty. Otherwise returns a map with three
/// entries, each value rendered with `to_string()`, computed only over the
/// files whose `content()` equals `Content::Data`:
///
/// * `"added-data-files"` - how many such files were seen,
/// * `"added-records"`    - the sum of their `record_count()`,
/// * `"added-files-size"` - the sum of their `file_size_in_bytes()`.
///
/// Files with any other content value are skipped. Note that a non-empty
/// slice containing no `Content::Data` files still yields `Some(..)` with all
/// three values equal to `"0"`, not `None`.
pub(crate) fn append_summary(files: &[DataFile]) -> Option<HashMap<String, String>> {
196+
if files.is_empty() {
197+
return None;
198+
}
199+
200+
// Running totals: file count as `usize`, record and byte totals as `i64`.
let (mut added_data_files, mut added_records, mut added_files_size) = (0usize, 0i64, 0i64);
201+
202+
// Only entries whose content is `Content::Data` contribute to the totals.
for file in files.iter().filter(|f| *f.content() == Content::Data) {
203+
added_data_files += 1;
204+
added_records += file.record_count();
205+
added_files_size += file.file_size_in_bytes();
206+
}
207+
208+
// `.into()` converts the `&str` keys to the `String` keys required by the return type.
Some(HashMap::from([
209+
("added-files-size".into(), added_files_size.to_string()),
210+
("added-records".into(), added_records.to_string()),
211+
("added-data-files".into(), added_data_files.to_string()),
212+
]))
213+
}

iceberg-rust/src/table/transaction/mod.rs

Lines changed: 9 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,7 @@ use std::collections::HashMap;
2020
use iceberg_rust_spec::spec::{manifest::DataFile, schema::Schema, snapshot::SnapshotReference};
2121

2222
use crate::{catalog::commit::CommitTable, error::Error, table::Table};
23+
use crate::table::transaction::append::append_summary;
2324

2425
use self::operation::Operation;
2526

@@ -116,15 +117,12 @@ impl<'table> TableTransaction<'table> {
116117
/// .await?;
117118
/// ```
118119
pub fn append_data(mut self, files: Vec<DataFile>) -> Self {
120+
let summary = append_summary(&files);
121+
119122
self.operations
120123
.entry(APPEND_KEY.to_owned())
121124
.and_modify(|mut x| {
122-
if let Operation::Append {
123-
branch: _,
124-
data_files: old,
125-
delete_files: _,
126-
additional_summary: None,
127-
} = &mut x
125+
if let Operation::Append { data_files: old, ..} = &mut x
128126
{
129127
old.extend_from_slice(&files)
130128
}
@@ -133,7 +131,7 @@ impl<'table> TableTransaction<'table> {
133131
branch: self.branch.clone(),
134132
data_files: files,
135133
delete_files: Vec::new(),
136-
additional_summary: None,
134+
additional_summary: summary,
137135
});
138136
self
139137
}
@@ -346,6 +344,10 @@ impl<'table> TableTransaction<'table> {
346344
updates.extend(update);
347345
}
348346

347+
if updates.is_empty() {
348+
return Ok(());
349+
}
350+
349351
let new_table = catalog
350352
.clone()
351353
.update_table(CommitTable {

iceberg-rust/src/table/transaction/operation.rs

Lines changed: 15 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -106,10 +106,8 @@ impl Operation {
106106
let old_snapshot = table_metadata.current_snapshot(branch.as_deref())?;
107107

108108
let snapshot_operation = match (data_files.len(), delete_files.len()) {
109-
(0, 0) => Err(Error::InvalidFormat(
110-
"Empty data and delete files".to_string(),
111-
)),
112-
(_, 0) => Ok(SnapshotOperation::Append),
109+
(0, 0) => return Ok((None, Vec::new())),
110+
(_, 0) => Ok::<_, Error>(SnapshotOperation::Append),
113111
(0, _) => Ok(SnapshotOperation::Delete),
114112
(_, _) => Ok(SnapshotOperation::Overwrite),
115113
}?;
@@ -279,18 +277,17 @@ impl Operation {
279277
(selected_manifest_opt, selected_manifest_bytes_opt)
280278
{
281279
let manifest_bytes = manifest_bytes.await??;
282-
let manifest_reader = ManifestReader::new(&*manifest_bytes)?
283-
.map(|entry| {
284-
let mut entry = entry?;
285-
*entry.status_mut() = Status::Existing;
286-
if entry.sequence_number().is_none() {
287-
*entry.sequence_number_mut() = Some(manifest.sequence_number);
288-
}
289-
if entry.snapshot_id().is_none() {
290-
*entry.snapshot_id_mut() = Some(manifest.added_snapshot_id);
291-
}
292-
Ok(entry)
293-
});
280+
let manifest_reader = ManifestReader::new(&*manifest_bytes)?.map(|entry| {
281+
let mut entry = entry?;
282+
*entry.status_mut() = Status::Existing;
283+
if entry.sequence_number().is_none() {
284+
*entry.sequence_number_mut() = Some(manifest.sequence_number);
285+
}
286+
if entry.snapshot_id().is_none() {
287+
*entry.snapshot_id_mut() = Some(manifest.added_snapshot_id);
288+
}
289+
Ok(entry)
290+
});
294291

295292
split_datafiles(
296293
new_datafile_iter.chain(manifest_reader),
@@ -459,7 +456,8 @@ impl Operation {
459456
let bounding_partition_values = files
460457
.iter()
461458
.try_fold(None, |acc, x| {
462-
let node = partition_struct_to_vec(x.partition(), &partition_column_names)?;
459+
let node =
460+
partition_struct_to_vec(x.partition(), &partition_column_names)?;
463461
let Some(mut acc) = acc else {
464462
return Ok::<_, Error>(Some(Rectangle::new(node.clone(), node)));
465463
};

0 commit comments

Comments
 (0)