Skip to content

Commit 20a961a

Browse files
authored
feat: add SnapshotSummaryBuilder (#515)
Implement SnapshotSummaryBuilder class based on Java's SnapshotSummary.Builder to provide functionality for building snapshot summaries for Iceberg tables.
1 parent f159675 commit 20a961a

5 files changed

Lines changed: 834 additions & 1 deletion

File tree

src/iceberg/manifest/manifest_entry.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030

3131
#include "iceberg/file_format.h"
3232
#include "iceberg/iceberg_export.h"
33-
#include "iceberg/partition_spec.h"
3433
#include "iceberg/result.h"
3534
#include "iceberg/row/partition_values.h"
3635
#include "iceberg/schema_field.h"
@@ -294,6 +293,11 @@ struct ICEBERG_EXPORT DataFile {
294293

295294
/// \brief Get the schema of the data file with the given partition type.
296295
static std::shared_ptr<StructType> Type(std::shared_ptr<StructType> partition_type);
296+
297+
/// \brief Check if this data file is a deletion vector.
298+
bool IsDeletionVector() const {
299+
return content == Content::kPositionDeletes && file_format == FileFormatType::kPuffin;
300+
}
297301
};
298302

299303
/// \brief A manifest is an immutable Avro file that lists data files or delete files,

src/iceberg/snapshot.cc

Lines changed: 301 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,36 @@
2020
#include "iceberg/snapshot.h"

#include <map>
#include <memory>
#include <sstream>
#include <utility>

#include "iceberg/file_io.h"
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/manifest/manifest_list.h"
#include "iceberg/manifest/manifest_reader.h"
#include "iceberg/util/macros.h"
#include "iceberg/util/string_util.h"
2932

3033
namespace iceberg {
3134

35+
namespace {
36+
37+
/// \brief Conditionally store a property in a summary map.
///
/// Does nothing when `condition` is false. Otherwise `builder[property]` is
/// overwritten: string-like values (anything convertible to std::string_view,
/// which includes `const char*` and std::string) are stored as text, and every
/// other type is rendered with std::to_string.
///
/// \tparam T Type of the value; either string-like or accepted by std::to_string.
/// \param condition Whether the property should be written at all.
/// \param builder Map that receives the property.
/// \param property Key to set.
/// \param value Value to store. It is taken by value so that string arguments can
///        be moved into the map instead of being copied a second time.
template <typename T>
void SetIf(bool condition, std::unordered_map<std::string, std::string>& builder,
           const std::string& property, T value) {
  if (!condition) {
    return;
  }
  // Convertibility to std::string_view already covers `const char*` and
  // std::string, so a single trait check selects the string branch.
  if constexpr (std::is_convertible_v<T, std::string_view>) {
    builder[property] = std::move(value);
  } else {
    builder[property] = std::to_string(value);
  }
}
50+
51+
} // namespace
52+
3253
bool SnapshotRef::Branch::Equals(const SnapshotRef::Branch& other) const {
3354
return min_snapshots_to_keep == other.min_snapshots_to_keep &&
3455
max_snapshot_age_ms == other.max_snapshot_age_ms &&
@@ -255,4 +276,284 @@ Result<std::span<ManifestFile>> SnapshotCache::DeleteManifests(
255276
return std::span<ManifestFile>(cache.first.data() + delete_start, delete_count);
256277
}
257278

279+
// SnapshotSummaryBuilder::UpdateMetrics implementation
280+
281+
/// \brief Reset every counter to zero and mark size/delete counts as trusted again.
void SnapshotSummaryBuilder::UpdateMetrics::Clear() {
  // Data file counters.
  added_files_ = 0;
  removed_files_ = 0;

  // Delete file counters, in total and per kind.
  added_delete_files_ = 0;
  removed_delete_files_ = 0;
  added_eq_delete_files_ = 0;
  removed_eq_delete_files_ = 0;
  added_pos_delete_files_ = 0;
  removed_pos_delete_files_ = 0;
  added_dvs_ = 0;
  removed_dvs_ = 0;

  // Row-level counters.
  added_records_ = 0;
  deleted_records_ = 0;
  added_pos_deletes_ = 0;
  removed_pos_deletes_ = 0;
  added_eq_deletes_ = 0;
  removed_eq_deletes_ = 0;

  // Byte totals.
  added_size_ = 0;
  removed_size_ = 0;

  trust_size_and_delete_counts_ = true;
}
302+
303+
void SnapshotSummaryBuilder::UpdateMetrics::AddTo(
304+
std::unordered_map<std::string, std::string>& builder) const {
305+
SetIf(added_files_ > 0, builder, SnapshotSummaryFields::kAddedDataFiles, added_files_);
306+
SetIf(removed_files_ > 0, builder, SnapshotSummaryFields::kDeletedDataFiles,
307+
removed_files_);
308+
SetIf(added_eq_delete_files_ > 0, builder, SnapshotSummaryFields::kAddedEqDeleteFiles,
309+
added_eq_delete_files_);
310+
SetIf(removed_eq_delete_files_ > 0, builder,
311+
SnapshotSummaryFields::kRemovedEqDeleteFiles, removed_eq_delete_files_);
312+
SetIf(added_pos_delete_files_ > 0, builder, SnapshotSummaryFields::kAddedPosDeleteFiles,
313+
added_pos_delete_files_);
314+
SetIf(removed_pos_delete_files_ > 0, builder,
315+
SnapshotSummaryFields::kRemovedPosDeleteFiles, removed_pos_delete_files_);
316+
SetIf(added_delete_files_ > 0, builder, SnapshotSummaryFields::kAddedDeleteFiles,
317+
added_delete_files_);
318+
SetIf(removed_delete_files_ > 0, builder, SnapshotSummaryFields::kRemovedDeleteFiles,
319+
removed_delete_files_);
320+
SetIf(added_dvs_ > 0, builder, SnapshotSummaryFields::kAddedDVs, added_dvs_);
321+
SetIf(removed_dvs_ > 0, builder, SnapshotSummaryFields::kRemovedDVs, removed_dvs_);
322+
SetIf(added_records_ > 0, builder, SnapshotSummaryFields::kAddedRecords,
323+
added_records_);
324+
SetIf(deleted_records_ > 0, builder, SnapshotSummaryFields::kDeletedRecords,
325+
deleted_records_);
326+
327+
if (trust_size_and_delete_counts_) {
328+
SetIf(added_size_ > 0, builder, SnapshotSummaryFields::kAddedFileSize, added_size_);
329+
SetIf(removed_size_ > 0, builder, SnapshotSummaryFields::kRemovedFileSize,
330+
removed_size_);
331+
SetIf(added_pos_deletes_ > 0, builder, SnapshotSummaryFields::kAddedPosDeletes,
332+
added_pos_deletes_);
333+
SetIf(removed_pos_deletes_ > 0, builder, SnapshotSummaryFields::kRemovedPosDeletes,
334+
removed_pos_deletes_);
335+
SetIf(added_eq_deletes_ > 0, builder, SnapshotSummaryFields::kAddedEqDeletes,
336+
added_eq_deletes_);
337+
SetIf(removed_eq_deletes_ > 0, builder, SnapshotSummaryFields::kRemovedEqDeletes,
338+
removed_eq_deletes_);
339+
}
340+
}
341+
342+
void SnapshotSummaryBuilder::UpdateMetrics::AddedFile(const DataFile& file) {
343+
added_size_ += file.file_size_in_bytes;
344+
345+
switch (file.content) {
346+
case DataFile::Content::kData:
347+
added_files_ += 1;
348+
added_records_ += file.record_count;
349+
break;
350+
case DataFile::Content::kPositionDeletes:
351+
if (file.IsDeletionVector()) {
352+
added_dvs_ += 1;
353+
} else {
354+
added_pos_delete_files_ += 1;
355+
}
356+
added_delete_files_ += 1;
357+
added_pos_deletes_ += file.record_count;
358+
break;
359+
case DataFile::Content::kEqualityDeletes:
360+
added_delete_files_ += 1;
361+
added_eq_delete_files_ += 1;
362+
added_eq_deletes_ += file.record_count;
363+
break;
364+
default:
365+
std::unreachable();
366+
}
367+
}
368+
369+
void SnapshotSummaryBuilder::UpdateMetrics::RemovedFile(const DataFile& file) {
370+
removed_size_ += file.file_size_in_bytes;
371+
372+
switch (file.content) {
373+
case DataFile::Content::kData:
374+
removed_files_ += 1;
375+
deleted_records_ += file.record_count;
376+
break;
377+
case DataFile::Content::kPositionDeletes:
378+
if (file.IsDeletionVector()) {
379+
removed_dvs_ += 1;
380+
} else {
381+
removed_pos_delete_files_ += 1;
382+
}
383+
removed_delete_files_ += 1;
384+
removed_pos_deletes_ += file.record_count;
385+
break;
386+
case DataFile::Content::kEqualityDeletes:
387+
removed_delete_files_ += 1;
388+
removed_eq_delete_files_ += 1;
389+
removed_eq_deletes_ += file.record_count;
390+
break;
391+
default:
392+
std::unreachable();
393+
}
394+
}
395+
396+
void SnapshotSummaryBuilder::UpdateMetrics::AddedManifest(const ManifestFile& manifest) {
397+
switch (manifest.content) {
398+
case ManifestContent::kData:
399+
added_files_ += manifest.added_files_count.value_or(0);
400+
added_records_ += manifest.added_rows_count.value_or(0);
401+
removed_files_ += manifest.deleted_files_count.value_or(0);
402+
deleted_records_ += manifest.deleted_rows_count.value_or(0);
403+
break;
404+
case ManifestContent::kDeletes:
405+
added_delete_files_ += manifest.added_files_count.value_or(0);
406+
removed_delete_files_ += manifest.deleted_files_count.value_or(0);
407+
trust_size_and_delete_counts_ = false;
408+
break;
409+
default:
410+
std::unreachable();
411+
}
412+
}
413+
414+
/// \brief Add every counter of `other` to this instance.
///
/// Size and delete counts remain trusted only if both sides trusted them.
///
/// \param other Metrics to accumulate into this object.
void SnapshotSummaryBuilder::UpdateMetrics::Merge(const UpdateMetrics& other) {
  // Byte totals.
  added_size_ += other.added_size_;
  removed_size_ += other.removed_size_;

  // Data file counters.
  added_files_ += other.added_files_;
  removed_files_ += other.removed_files_;

  // Delete file counters, in total and per kind.
  added_delete_files_ += other.added_delete_files_;
  removed_delete_files_ += other.removed_delete_files_;
  added_eq_delete_files_ += other.added_eq_delete_files_;
  removed_eq_delete_files_ += other.removed_eq_delete_files_;
  added_pos_delete_files_ += other.added_pos_delete_files_;
  removed_pos_delete_files_ += other.removed_pos_delete_files_;
  added_dvs_ += other.added_dvs_;
  removed_dvs_ += other.removed_dvs_;

  // Row-level counters.
  added_records_ += other.added_records_;
  deleted_records_ += other.deleted_records_;
  added_pos_deletes_ += other.added_pos_deletes_;
  removed_pos_deletes_ += other.removed_pos_deletes_;
  added_eq_deletes_ += other.added_eq_deletes_;
  removed_eq_deletes_ += other.removed_eq_deletes_;

  if (!other.trust_size_and_delete_counts_) {
    trust_size_and_delete_counts_ = false;
  }
}
436+
437+
// SnapshotSummaryBuilder implementation
438+
439+
/// \brief Drop all accumulated metrics and mark partition metrics as trusted again.
///
/// Custom properties and the partition summary limit are not touched here.
void SnapshotSummaryBuilder::Clear() {
  metrics_.Clear();
  partition_metrics_.clear();
  trust_partition_metrics_ = true;
  deleted_duplicate_files_ = 0;
}
445+
446+
void SnapshotSummaryBuilder::SetPartitionSummaryLimit(int32_t max) {
447+
max_changed_partitions_for_summaries_ = max;
448+
}
449+
450+
void SnapshotSummaryBuilder::IncrementDuplicateDeletes(int32_t increment) {
451+
deleted_duplicate_files_ += increment;
452+
}
453+
454+
/// \brief Record a file added by the operation being summarized.
///
/// Updates the per-partition metrics (while they are still trusted) and the
/// table-level metrics.
///
/// \param spec Partition spec used to derive the file's partition path.
/// \param file The added file.
/// \return An error if the partition metrics could not be updated; in that case
///         the table-level metrics are left unchanged as well.
Status SnapshotSummaryBuilder::AddedFile(const PartitionSpec& spec,
                                         const DataFile& file) {
  // Run the fallible partition update first so that an error does not leave the
  // table-level totals counting a file that has no partition-level record.
  ICEBERG_RETURN_UNEXPECTED(UpdatePartitions(spec, file, /*is_addition=*/true));
  metrics_.AddedFile(file);
  return {};
}
460+
461+
/// \brief Record a file removed by the operation being summarized.
///
/// Updates the per-partition metrics (while they are still trusted) and the
/// table-level metrics.
///
/// \param spec Partition spec used to derive the file's partition path.
/// \param file The removed file.
/// \return An error if the partition metrics could not be updated; in that case
///         the table-level metrics are left unchanged as well.
Status SnapshotSummaryBuilder::DeletedFile(const PartitionSpec& spec,
                                           const DataFile& file) {
  // Run the fallible partition update first so that an error does not leave the
  // table-level totals counting a removal that has no partition-level record.
  ICEBERG_RETURN_UNEXPECTED(UpdatePartitions(spec, file, /*is_addition=*/false));
  metrics_.RemovedFile(file);
  return {};
}
467+
468+
/// \brief Record a whole manifest added by the operation being summarized.
///
/// The manifest's counts are folded into the table-level metrics. Partition
/// metrics collected so far are discarded and marked as no longer trusted.
///
/// \param manifest The added manifest.
void SnapshotSummaryBuilder::AddedManifest(const ManifestFile& manifest) {
  metrics_.AddedManifest(manifest);

  partition_metrics_.clear();
  trust_partition_metrics_ = false;
}
473+
474+
/// \brief Set a custom summary property, replacing any previous value for the key.
///
/// \param property Property name.
/// \param value Property value.
void SnapshotSummaryBuilder::Set(const std::string& property, const std::string& value) {
  properties_.insert_or_assign(property, value);
}
477+
478+
void SnapshotSummaryBuilder::Merge(const SnapshotSummaryBuilder& other) {
479+
for (const auto& [key, value] : other.properties_) {
480+
properties_[key] = value;
481+
}
482+
metrics_.Merge(other.metrics_);
483+
484+
trust_partition_metrics_ = trust_partition_metrics_ && other.trust_partition_metrics_;
485+
if (trust_partition_metrics_) {
486+
for (const auto& [key, value] : other.partition_metrics_) {
487+
partition_metrics_[key].Merge(value);
488+
}
489+
} else {
490+
partition_metrics_.clear();
491+
}
492+
493+
deleted_duplicate_files_ += other.deleted_duplicate_files_;
494+
}
495+
496+
std::unordered_map<std::string, std::string> SnapshotSummaryBuilder::Build() const {
497+
std::unordered_map<std::string, std::string> builder;
498+
499+
// Copy custom summary properties
500+
builder.insert(properties_.begin(), properties_.end());
501+
502+
metrics_.AddTo(builder);
503+
504+
SetIf(deleted_duplicate_files_ > 0, builder,
505+
SnapshotSummaryFields::kDeletedDuplicatedFiles, deleted_duplicate_files_);
506+
507+
SetIf(trust_partition_metrics_, builder,
508+
SnapshotSummaryFields::kChangedPartitionCountProp, partition_metrics_.size());
509+
510+
// Add partition summaries if enabled
511+
if (trust_partition_metrics_ && max_changed_partitions_for_summaries_ >= 0 &&
512+
partition_metrics_.size() <=
513+
static_cast<size_t>(max_changed_partitions_for_summaries_)) {
514+
SetIf(!partition_metrics_.empty(), builder,
515+
SnapshotSummaryFields::kPartitionSummaryProp, "true");
516+
for (const auto& [key, metrics] : partition_metrics_) {
517+
if (!key.empty()) {
518+
builder[SnapshotSummaryFields::kChangedPartitionPrefix + key] =
519+
PartitionSummary(metrics);
520+
}
521+
}
522+
}
523+
524+
return builder;
525+
}
526+
527+
/// \brief Apply a file addition or removal to the metrics of its partition.
///
/// Does nothing while partition metrics are not trusted. Otherwise the partition
/// path is computed from `spec` and the file's partition values, and the metrics
/// entry for that path (created on first use) is updated.
///
/// \param spec Partition spec used to compute the partition path.
/// \param file The file being added or removed.
/// \param is_addition True to record an addition, false to record a removal.
/// \return The error from computing the partition path, if any.
Status SnapshotSummaryBuilder::UpdatePartitions(const PartitionSpec& spec,
                                                const DataFile& file, bool is_addition) {
  if (!trust_partition_metrics_) {
    return {};
  }

  ICEBERG_ASSIGN_OR_RAISE(std::string path, spec.PartitionPath(file.partition));
  auto& target = partition_metrics_[path];
  if (!is_addition) {
    target.RemovedFile(file);
  } else {
    target.AddedFile(file);
  }
  return {};
}
541+
542+
/// \brief Render the metrics of one partition as a single summary string.
///
/// The metrics are formatted as comma-separated `key=value` pairs, sorted by key
/// so that the same metrics always produce the same string.
///
/// \param metrics Metrics of the partition.
/// \return The formatted summary; empty when no metric was emitted.
std::string SnapshotSummaryBuilder::PartitionSummary(const UpdateMetrics& metrics) const {
  std::unordered_map<std::string, std::string> part_builder;
  metrics.AddTo(part_builder);

  // unordered_map iteration order is unspecified, so joining it directly would
  // make the summary text depend on the hash table layout. Copying into an
  // ordered map gives a stable, key-sorted output.
  const std::map<std::string, std::string> ordered(part_builder.begin(),
                                                   part_builder.end());

  std::string summary;
  const char* separator = "";
  for (const auto& [key, value] : ordered) {
    summary += separator;
    summary += key;
    summary += '=';
    summary += value;
    separator = ",";
  }
  return summary;
}
558+
258559
} // namespace iceberg

0 commit comments

Comments
 (0)