Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 19 additions & 73 deletions src/iceberg/snapshot.cc
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
Expand Down Expand Up @@ -32,23 +32,6 @@

namespace iceberg {

namespace {

/// \brief Store `value` under `property` in the summary map, but only when
/// `condition` is true.
///
/// String-like values (const char*, std::string, or anything convertible to
/// std::string_view) are assigned as-is; every other value is rendered with
/// std::to_string before being stored. An existing entry for `property` is
/// overwritten.
template <typename T>
void SetIf(bool condition, std::unordered_map<std::string, std::string>& builder,
           const std::string& property, T value) {
  if (!condition) {
    return;
  }

  constexpr bool kIsStringLike = std::is_same_v<T, const char*> ||
                                 std::is_same_v<T, std::string> ||
                                 std::is_convertible_v<T, std::string_view>;
  if constexpr (kIsStringLike) {
    builder[property] = value;
  } else {
    builder[property] = std::to_string(value);
  }
}

}  // namespace

bool SnapshotRef::Branch::Equals(const SnapshotRef::Branch& other) const {
return min_snapshots_to_keep == other.min_snapshots_to_keep &&
Expand Down Expand Up @@ -300,45 +283,6 @@
trust_size_and_delete_counts_ = true;
}

/// \brief Write every counter of this metrics object that is greater than zero
/// into `builder`, keyed by the matching SnapshotSummaryFields constant.
///
/// File, DV and record counters are always considered. The size and per-row
/// delete counters are considered only when trust_size_and_delete_counts_ is
/// set.
void SnapshotSummaryBuilder::UpdateMetrics::AddTo(
    std::unordered_map<std::string, std::string>& builder) const {
  // Shared rule for all counters: emit only strictly positive values.
  const auto put_if_positive = [&builder](const std::string& field, auto count) {
    SetIf(count > 0, builder, field, count);
  };

  put_if_positive(SnapshotSummaryFields::kAddedDataFiles, added_files_);
  put_if_positive(SnapshotSummaryFields::kDeletedDataFiles, removed_files_);
  put_if_positive(SnapshotSummaryFields::kAddedEqDeleteFiles, added_eq_delete_files_);
  put_if_positive(SnapshotSummaryFields::kRemovedEqDeleteFiles,
                  removed_eq_delete_files_);
  put_if_positive(SnapshotSummaryFields::kAddedPosDeleteFiles, added_pos_delete_files_);
  put_if_positive(SnapshotSummaryFields::kRemovedPosDeleteFiles,
                  removed_pos_delete_files_);
  put_if_positive(SnapshotSummaryFields::kAddedDeleteFiles, added_delete_files_);
  put_if_positive(SnapshotSummaryFields::kRemovedDeleteFiles, removed_delete_files_);
  put_if_positive(SnapshotSummaryFields::kAddedDVs, added_dvs_);
  put_if_positive(SnapshotSummaryFields::kRemovedDVs, removed_dvs_);
  put_if_positive(SnapshotSummaryFields::kAddedRecords, added_records_);
  put_if_positive(SnapshotSummaryFields::kDeletedRecords, deleted_records_);

  if (!trust_size_and_delete_counts_) {
    return;
  }

  put_if_positive(SnapshotSummaryFields::kAddedFileSize, added_size_);
  put_if_positive(SnapshotSummaryFields::kRemovedFileSize, removed_size_);
  put_if_positive(SnapshotSummaryFields::kAddedPosDeletes, added_pos_deletes_);
  put_if_positive(SnapshotSummaryFields::kRemovedPosDeletes, removed_pos_deletes_);
  put_if_positive(SnapshotSummaryFields::kAddedEqDeletes, added_eq_deletes_);
  put_if_positive(SnapshotSummaryFields::kRemovedEqDeletes, removed_eq_deletes_);
}

void SnapshotSummaryBuilder::UpdateMetrics::AddedFile(const DataFile& file) {
added_size_ += file.file_size_in_bytes;

Expand Down Expand Up @@ -499,20 +443,24 @@
// Copy custom summary properties
builder.insert(properties_.begin(), properties_.end());

metrics_.AddTo(builder);
auto sink = [&builder](const std::string& key, const std::string& value) {
builder[key] = value;
};

metrics_.AddTo(sink);

SetIf(deleted_duplicate_files_ > 0, builder,
SnapshotSummaryFields::kDeletedDuplicatedFiles, deleted_duplicate_files_);
SetIf(deleted_duplicate_files_ > 0, sink, SnapshotSummaryFields::kDeletedDuplicatedFiles,
deleted_duplicate_files_);

SetIf(trust_partition_metrics_, builder,
SnapshotSummaryFields::kChangedPartitionCountProp, partition_metrics_.size());
SetIf(trust_partition_metrics_, sink, SnapshotSummaryFields::kChangedPartitionCountProp,
partition_metrics_.size());

// Add partition summaries if enabled
if (trust_partition_metrics_ && max_changed_partitions_for_summaries_ >= 0 &&
partition_metrics_.size() <=
static_cast<size_t>(max_changed_partitions_for_summaries_)) {
SetIf(!partition_metrics_.empty(), builder,
SnapshotSummaryFields::kPartitionSummaryProp, "true");
SetIf(!partition_metrics_.empty(), sink, SnapshotSummaryFields::kPartitionSummaryProp,
"true");
for (const auto& [key, metrics] : partition_metrics_) {
if (!key.empty()) {
builder[SnapshotSummaryFields::kChangedPartitionPrefix + key] =
Expand Down Expand Up @@ -540,20 +488,18 @@
}

/// \brief Render the non-zero counters of `metrics` as a comma-separated list
/// of `key=value` pairs.
///
/// The pairs are appended in the order in which UpdateMetrics::AddTo invokes
/// the sink, without going through an intermediate map or string stream.
///
/// \param metrics The per-partition metrics to format.
/// \return The formatted summary; empty when AddTo emits nothing.
std::string SnapshotSummaryBuilder::PartitionSummary(const UpdateMetrics& metrics) const {
  // Format as comma-separated key=value pairs
  std::string result;
  bool first = true;
  metrics.AddTo([&result, &first](const std::string& key, const std::string& value) {
    // Separator goes before every pair except the first one.
    if (!first) {
      result += ",";
    }
    result += key;
    result += "=";
    result += value;
    first = false;
  });
  return result;
}

} // namespace iceberg
51 changes: 50 additions & 1 deletion src/iceberg/snapshot.h
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
Expand Down Expand Up @@ -263,7 +263,44 @@
class UpdateMetrics {
public:
void Clear();
void AddTo(std::unordered_map<std::string, std::string>& builder) const;
/// \brief Report every counter that is greater than zero to `sink`.
///
/// `sink` is invoked through SetIf as `sink(key, value)` with the
/// SnapshotSummaryFields key and the stringified counter. File, DV and record
/// counters are always considered; size and per-row delete counters only when
/// trust_size_and_delete_counts_ is set.
template <typename Sink>
void AddTo(Sink&& sink) const {
  // Shared rule for all counters: emit only strictly positive values.
  const auto emit_if_positive = [&sink](const std::string& field, auto count) {
    SetIf(count > 0, sink, field, count);
  };

  emit_if_positive(SnapshotSummaryFields::kAddedDataFiles, added_files_);
  emit_if_positive(SnapshotSummaryFields::kDeletedDataFiles, removed_files_);
  emit_if_positive(SnapshotSummaryFields::kAddedEqDeleteFiles, added_eq_delete_files_);
  emit_if_positive(SnapshotSummaryFields::kRemovedEqDeleteFiles,
                   removed_eq_delete_files_);
  emit_if_positive(SnapshotSummaryFields::kAddedPosDeleteFiles,
                   added_pos_delete_files_);
  emit_if_positive(SnapshotSummaryFields::kRemovedPosDeleteFiles,
                   removed_pos_delete_files_);
  emit_if_positive(SnapshotSummaryFields::kAddedDeleteFiles, added_delete_files_);
  emit_if_positive(SnapshotSummaryFields::kRemovedDeleteFiles, removed_delete_files_);
  emit_if_positive(SnapshotSummaryFields::kAddedDVs, added_dvs_);
  emit_if_positive(SnapshotSummaryFields::kRemovedDVs, removed_dvs_);
  emit_if_positive(SnapshotSummaryFields::kAddedRecords, added_records_);
  emit_if_positive(SnapshotSummaryFields::kDeletedRecords, deleted_records_);

  if (!trust_size_and_delete_counts_) {
    return;
  }

  emit_if_positive(SnapshotSummaryFields::kAddedFileSize, added_size_);
  emit_if_positive(SnapshotSummaryFields::kRemovedFileSize, removed_size_);
  emit_if_positive(SnapshotSummaryFields::kAddedPosDeletes, added_pos_deletes_);
  emit_if_positive(SnapshotSummaryFields::kRemovedPosDeletes, removed_pos_deletes_);
  emit_if_positive(SnapshotSummaryFields::kAddedEqDeletes, added_eq_deletes_);
  emit_if_positive(SnapshotSummaryFields::kRemovedEqDeletes, removed_eq_deletes_);
}
void AddedFile(const DataFile& file);
void RemovedFile(const DataFile& file);
void AddedManifest(const ManifestFile& manifest);
Expand Down Expand Up @@ -349,6 +386,18 @@
std::unordered_map<std::string, std::string> Build() const;

private:
/// \brief Pass `property` and a string form of `value` to `sink`, but only
/// when `condition` is true.
///
/// String-like values (const char*, std::string, or anything convertible to
/// std::string_view) are copied into a std::string; every other value is
/// rendered with std::to_string. `sink` is called as `sink(property, text)`.
template <typename Sink, typename T>
static void SetIf(bool condition, Sink&& sink, const std::string& property, T value) {
  if (!condition) {
    return;
  }

  constexpr bool kIsStringLike = std::is_same_v<T, const char*> ||
                                 std::is_same_v<T, std::string> ||
                                 std::is_convertible_v<T, std::string_view>;
  if constexpr (kIsStringLike) {
    sink(property, std::string(value));
  } else {
    sink(property, std::to_string(value));
  }
}

Status UpdatePartitions(const PartitionSpec& spec, const DataFile& file,
bool is_addition);
std::string PartitionSummary(const UpdateMetrics& metrics) const;
Expand Down
3 changes: 3 additions & 0 deletions src/iceberg/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ add_iceberg_test(util_test

add_iceberg_test(roaring_test SOURCES roaring_test.cc)

# Stand-alone benchmark executable built from snapshot_summary_benchmark.cc.
# It is declared with add_executable rather than add_iceberg_test and links
# privately against the iceberg_static target.
add_executable(snapshot_summary_benchmark snapshot_summary_benchmark.cc)
target_link_libraries(snapshot_summary_benchmark PRIVATE iceberg_static)

if(ICEBERG_BUILD_BUNDLE)
add_iceberg_test(avro_test
USE_BUNDLE
Expand Down
52 changes: 52 additions & 0 deletions src/iceberg/test/snapshot_summary_benchmark.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@

#include <chrono>
#include <iostream>
#include <vector>
#include <string>

#define private public
#include "iceberg/snapshot.h"
#undef private
#include "iceberg/manifest/manifest_entry.h"

namespace iceberg {

/// \brief Time repeated calls to SnapshotSummaryBuilder::PartitionSummary on a
/// fixed, hand-populated UpdateMetrics and print the total and per-call time
/// to standard output.
void RunBenchmark() {
  SnapshotSummaryBuilder builder;
  SnapshotSummaryBuilder::UpdateMetrics metrics;

  // Setup a complex UpdateMetrics manually. Direct field access (and the call
  // to PartitionSummary below) only works because iceberg/snapshot.h is
  // included under `#define private public` in this file.
  metrics.added_files_ = 10;
  metrics.removed_files_ = 5;
  metrics.added_records_ = 1000;
  metrics.deleted_records_ = 500;
  metrics.added_size_ = 1234567;
  metrics.removed_size_ = 765432;
  metrics.added_pos_delete_files_ = 2;
  metrics.added_eq_delete_files_ = 1;
  metrics.added_pos_deletes_ = 100;
  metrics.added_eq_deletes_ = 50;
  metrics.trust_size_and_delete_counts_ = true;

  constexpr int kIterations = 100000;

  // Accumulating into a volatile keeps the optimizer from discarding the
  // PartitionSummary calls as dead code.
  volatile size_t total_len = 0;

  // steady_clock is monotonic, so the measured interval cannot be distorted by
  // wall-clock adjustments.
  const auto start = std::chrono::steady_clock::now();
  for (int i = 0; i < kIterations; ++i) {
    const std::string summary = builder.PartitionSummary(metrics);
    // Explicit load + store: compound assignment on a volatile is deprecated
    // in C++20.
    total_len = total_len + summary.length();
  }
  const auto end = std::chrono::steady_clock::now();

  const std::chrono::duration<double> elapsed = end - start;

  // '\n' instead of std::endl: no need to flush after each line.
  std::cout << "Time for " << kIterations << " iterations: " << elapsed.count()
            << " seconds" << '\n';
  std::cout << "Average time: " << (elapsed.count() / kIterations) * 1e6
            << " microseconds" << '\n';
}

}  // namespace iceberg

Check warning on line 47 in src/iceberg/test/snapshot_summary_benchmark.cc

View workflow job for this annotation

GitHub Actions / cpp-linter

src/iceberg/test/snapshot_summary_benchmark.cc:47:1 [google-readability-namespace-comments]

namespace 'iceberg' not terminated with a closing comment

// Benchmark entry point: performs a single RunBenchmark() pass and exits with
// status 0.
int main() {
iceberg::RunBenchmark();
return 0;
}
Loading