Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ set(ICEBERG_SOURCES
update/snapshot_update.cc
update/update_location.cc
update/update_partition_spec.cc
update/update_partition_statistics.cc
update/update_properties.cc
update/update_schema.cc
update/update_snapshot_reference.cc
Expand Down
33 changes: 33 additions & 0 deletions src/iceberg/json_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ constexpr std::string_view kActionRemoveProperties = "remove-properties";
constexpr std::string_view kActionSetLocation = "set-location";
constexpr std::string_view kActionSetStatistics = "set-statistics";
constexpr std::string_view kActionRemoveStatistics = "remove-statistics";
// Action identifiers for the partition-statistics table updates. ToJson stores them
// under kAction and TableUpdateFromJson compares the parsed action against them.
constexpr std::string_view kActionSetPartitionStatistics = "set-partition-statistics";
constexpr std::string_view kActionRemovePartitionStatistics =
"remove-partition-statistics";

// TableUpdate field constants
constexpr std::string_view kUUID = "uuid";
Expand Down Expand Up @@ -1439,6 +1442,24 @@ nlohmann::json ToJson(const TableUpdate& update) {
json[kSnapshotId] = u.snapshot_id();
break;
}
case TableUpdate::Kind::kSetPartitionStatistics: {
const auto& u =
internal::checked_cast<const table::SetPartitionStatistics&>(update);
json[kAction] = kActionSetPartitionStatistics;
if (u.partition_statistics_file()) {
json[kPartitionStatistics] = ToJson(*u.partition_statistics_file());
} else {
json[kPartitionStatistics] = nlohmann::json::value_t::null;
}
break;
}
case TableUpdate::Kind::kRemovePartitionStatistics: {
const auto& u =
internal::checked_cast<const table::RemovePartitionStatistics&>(update);
json[kAction] = kActionRemovePartitionStatistics;
json[kSnapshotId] = u.snapshot_id();
break;
}
}
return json;
}
Expand Down Expand Up @@ -1628,6 +1649,18 @@ Result<std::unique_ptr<TableUpdate>> TableUpdateFromJson(const nlohmann::json& j
ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, GetJsonValue<int64_t>(json, kSnapshotId));
return std::make_unique<table::RemoveStatistics>(snapshot_id);
}
if (action == kActionSetPartitionStatistics) {
ICEBERG_ASSIGN_OR_RAISE(auto partition_statistics_json,
GetJsonValue<nlohmann::json>(json, kPartitionStatistics));
ICEBERG_ASSIGN_OR_RAISE(auto partition_statistics_file,
PartitionStatisticsFileFromJson(partition_statistics_json));
return std::make_unique<table::SetPartitionStatistics>(
std::move(partition_statistics_file));
}
if (action == kActionRemovePartitionStatistics) {
ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, GetJsonValue<int64_t>(json, kSnapshotId));
return std::make_unique<table::RemovePartitionStatistics>(snapshot_id);
}

return JsonParseError("Unknown table update action: {}", action);
}
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ iceberg_sources = files(
'update/snapshot_update.cc',
'update/update_location.cc',
'update/update_partition_spec.cc',
'update/update_partition_statistics.cc',
'update/update_properties.cc',
'update/update_schema.cc',
'update/update_snapshot_reference.cc',
Expand Down
8 changes: 8 additions & 0 deletions src/iceberg/table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "iceberg/transaction.h"
#include "iceberg/update/expire_snapshots.h"
#include "iceberg/update/update_partition_spec.h"
#include "iceberg/update/update_partition_statistics.h"
#include "iceberg/update/update_properties.h"
#include "iceberg/update/update_schema.h"
#include "iceberg/update/update_statistics.h"
Expand Down Expand Up @@ -214,6 +215,13 @@ Result<std::shared_ptr<UpdateStatistics>> Table::NewUpdateStatistics() {
return transaction->NewUpdateStatistics();
}

// Creates an auto-committing Transaction of kind kUpdate on this table and returns
// the UpdatePartitionStatistics obtained from it.
// NOTE(review): ICEBERG_ASSIGN_OR_RAISE presumably returns early with the error when
// Transaction::Make fails — confirm against the macro definition.
Result<std::shared_ptr<UpdatePartitionStatistics>> Table::NewUpdatePartitionStatistics() {
  ICEBERG_ASSIGN_OR_RAISE(auto txn,
                          Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate,
                                            /*auto_commit=*/true));
  return txn->NewUpdatePartitionStatistics();
}

Result<std::shared_ptr<StagedTable>> StagedTable::Make(
TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
std::string metadata_location, std::shared_ptr<FileIO> io,
Expand Down
5 changes: 5 additions & 0 deletions src/iceberg/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,11 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this<Table> {
/// changes.
virtual Result<std::shared_ptr<UpdateStatistics>> NewUpdateStatistics();

/// \brief Create a new UpdatePartitionStatistics to update partition statistics and
/// commit the changes.
virtual Result<std::shared_ptr<UpdatePartitionStatistics>>
NewUpdatePartitionStatistics();

/// \brief Create a new UpdateLocation to update the table location and commit the
/// changes.
virtual Result<std::shared_ptr<UpdateLocation>> NewUpdateLocation();
Expand Down
45 changes: 43 additions & 2 deletions src/iceberg/table_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,9 @@ class TableMetadataBuilder::Impl {
Status RemovePartitionSpecs(const std::vector<int32_t>& spec_ids);
Status SetStatistics(std::shared_ptr<StatisticsFile> statistics_file);
Status RemoveStatistics(int64_t snapshot_id);
Status SetPartitionStatistics(
std::shared_ptr<PartitionStatisticsFile> partition_statistics_file);
Status RemovePartitionStatistics(int64_t snapshot_id);

Result<std::unique_ptr<TableMetadata>> Build();

Expand Down Expand Up @@ -1208,6 +1211,41 @@ Status TableMetadataBuilder::Impl::RemoveStatistics(int64_t snapshot_id) {
return {};
}

// Registers `partition_statistics_file` in the metadata being built and records a
// table::SetPartitionStatistics change for it.
//
// If an entry with the same snapshot_id already exists in
// metadata_.partition_statistics it is replaced in place; otherwise the file is
// appended. A change is recorded in both cases.
//
// NOTE(review): ICEBERG_PRECHECK presumably returns an error Status carrying the
// given message when the condition is false — confirm against the macro definition.
Status TableMetadataBuilder::Impl::SetPartitionStatistics(
    std::shared_ptr<PartitionStatisticsFile> partition_statistics_file) {
  ICEBERG_PRECHECK(partition_statistics_file != nullptr,
                   "Cannot set null partition statistics file");

  // Look for an existing (non-null) entry that belongs to the same snapshot.
  auto it = std::ranges::find_if(
      metadata_.partition_statistics,
      [snapshot_id = partition_statistics_file->snapshot_id](const auto& stat) {
        return stat && stat->snapshot_id == snapshot_id;
      });

  if (it != metadata_.partition_statistics.end()) {
    *it = partition_statistics_file;
  } else {
    metadata_.partition_statistics.push_back(partition_statistics_file);
  }

  // The local handle is not used past this point, so it is moved into the change.
  changes_.push_back(std::make_unique<table::SetPartitionStatistics>(
      std::move(partition_statistics_file)));
  return {};
}

// Drops every non-null entry of metadata_.partition_statistics whose snapshot_id
// equals `snapshot_id`. A table::RemovePartitionStatistics change is recorded only
// when at least one entry was actually erased.
Status TableMetadataBuilder::Impl::RemovePartitionStatistics(int64_t snapshot_id) {
  const auto belongs_to_snapshot = [snapshot_id](const auto& stat) {
    return stat && stat->snapshot_id == snapshot_id;
  };

  if (std::erase_if(metadata_.partition_statistics, belongs_to_snapshot) == 0) {
    // Nothing matched, so there is no change to record.
    return {};
  }

  changes_.push_back(std::make_unique<table::RemovePartitionStatistics>(snapshot_id));
  return {};
}

std::unordered_set<int64_t> TableMetadataBuilder::Impl::IntermediateSnapshotIdSet(
int64_t current_snapshot_id) const {
std::unordered_set<int64_t> added_snapshot_ids;
Expand Down Expand Up @@ -1636,12 +1674,15 @@ TableMetadataBuilder& TableMetadataBuilder::RemoveStatistics(int64_t snapshot_id

// Forwards `partition_statistics_file` to the builder implementation and returns
// this builder so calls can be chained.
// NOTE(review): ICEBERG_BUILDER_RETURN_IF_ERROR presumably records a failing Status
// on the builder and returns early — confirm against the macro definition.
TableMetadataBuilder& TableMetadataBuilder::SetPartitionStatistics(
    const std::shared_ptr<PartitionStatisticsFile>& partition_statistics_file) {
  ICEBERG_BUILDER_RETURN_IF_ERROR(
      impl_->SetPartitionStatistics(partition_statistics_file));
  return *this;
}

// Forwards `snapshot_id` to the builder implementation and returns this builder so
// calls can be chained.
// NOTE(review): ICEBERG_BUILDER_RETURN_IF_ERROR presumably records a failing Status
// on the builder and returns early — confirm against the macro definition.
TableMetadataBuilder& TableMetadataBuilder::RemovePartitionStatistics(
    int64_t snapshot_id) {
  ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->RemovePartitionStatistics(snapshot_id));
  return *this;
}

TableMetadataBuilder& TableMetadataBuilder::SetProperties(
Expand Down
56 changes: 56 additions & 0 deletions src/iceberg/table_update.cc
Original file line number Diff line number Diff line change
Expand Up @@ -500,4 +500,60 @@ std::unique_ptr<TableUpdate> RemoveStatistics::Clone() const {
return std::make_unique<RemoveStatistics>(snapshot_id_);
}

// SetPartitionStatistics

// Returns the snapshot_id stored in the held partition statistics file.
// NOTE(review): partition_statistics_file_ is dereferenced without a null check,
// while Equals() tolerates a null file — confirm this update is never constructed
// with a null pointer before snapshot_id() is called.
int64_t SetPartitionStatistics::snapshot_id() const {
return partition_statistics_file_->snapshot_id;
}

// Applies this update by passing the held partition statistics file to
// TableMetadataBuilder::SetPartitionStatistics. The builder's return value is ignored.
void SetPartitionStatistics::ApplyTo(TableMetadataBuilder& builder) const {
builder.SetPartitionStatistics(partition_statistics_file_);
}

// Intentionally empty: `context` is left untouched.
void SetPartitionStatistics::GenerateRequirements(TableUpdateContext& context) const {
// SetPartitionStatistics doesn't generate any requirements
}

// Two SetPartitionStatistics updates are equal when both hold no file, or when both
// hold a file and the pointed-to PartitionStatisticsFile objects compare equal.
// An update of any other kind is never equal.
bool SetPartitionStatistics::Equals(const TableUpdate& other) const {
  if (other.kind() != Kind::kSetPartitionStatistics) {
    return false;
  }

  // The kind check above makes the downcast valid.
  const auto& rhs = internal::checked_cast<const SetPartitionStatistics&>(other);
  const auto& mine = partition_statistics_file_;
  const auto& theirs = rhs.partition_statistics_file_;

  if (mine == nullptr || theirs == nullptr) {
    // With a missing file on either side, only "both missing" counts as equal.
    return mine == nullptr && theirs == nullptr;
  }
  return *mine == *theirs;
}

// Returns a new SetPartitionStatistics built from the same shared_ptr, so the clone
// shares the PartitionStatisticsFile object with this update rather than copying it.
std::unique_ptr<TableUpdate> SetPartitionStatistics::Clone() const {
return std::make_unique<SetPartitionStatistics>(partition_statistics_file_);
}

// RemovePartitionStatistics

// Applies this update by passing the stored snapshot id to
// TableMetadataBuilder::RemovePartitionStatistics. The builder's return value is ignored.
void RemovePartitionStatistics::ApplyTo(TableMetadataBuilder& builder) const {
builder.RemovePartitionStatistics(snapshot_id_);
}

// Intentionally empty: `context` is left untouched.
void RemovePartitionStatistics::GenerateRequirements(TableUpdateContext& context) const {
// RemovePartitionStatistics doesn't generate any requirements
}

bool RemovePartitionStatistics::Equals(const TableUpdate& other) const {
if (other.kind() != Kind::kRemovePartitionStatistics) {
return false;
}
const auto& other_remove =
internal::checked_cast<const RemovePartitionStatistics&>(other);
return snapshot_id_ == other_remove.snapshot_id_;
}

// Returns a new RemovePartitionStatistics carrying the same snapshot id.
std::unique_ptr<TableUpdate> RemovePartitionStatistics::Clone() const {
return std::make_unique<RemovePartitionStatistics>(snapshot_id_);
}

} // namespace iceberg::table
50 changes: 50 additions & 0 deletions src/iceberg/table_update.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ class ICEBERG_EXPORT TableUpdate {
kSetLocation,
kSetStatistics,
kRemoveStatistics,
kSetPartitionStatistics,
kRemovePartitionStatistics,
};

virtual ~TableUpdate();
Expand Down Expand Up @@ -558,6 +560,54 @@ class ICEBERG_EXPORT RemoveStatistics : public TableUpdate {
int64_t snapshot_id_;
};

/// \brief Represents setting partition statistics for a snapshot
///
/// Holds a shared pointer to the PartitionStatisticsFile to set; the target snapshot
/// is the one identified by that file's snapshot_id.
class ICEBERG_EXPORT SetPartitionStatistics : public TableUpdate {
public:
/// \brief Construct the update from the partition statistics file to set
/// \param partition_statistics_file The file to set; the pointer is moved into this
/// update
explicit SetPartitionStatistics(
std::shared_ptr<PartitionStatisticsFile> partition_statistics_file)
: partition_statistics_file_(std::move(partition_statistics_file)) {}

/// \brief The snapshot id read from the held partition statistics file
int64_t snapshot_id() const;

/// \brief The partition statistics file carried by this update
const std::shared_ptr<PartitionStatisticsFile>& partition_statistics_file() const {
return partition_statistics_file_;
}

/// \brief Apply this update to the given metadata builder
void ApplyTo(TableMetadataBuilder& builder) const override;

/// \brief Generate update requirements into the given context
void GenerateRequirements(TableUpdateContext& context) const override;

/// \brief Always Kind::kSetPartitionStatistics
Kind kind() const override { return Kind::kSetPartitionStatistics; }

/// \brief Compare this update with another TableUpdate
bool Equals(const TableUpdate& other) const override;

/// \brief Create a copy of this update
std::unique_ptr<TableUpdate> Clone() const override;

private:
// File to set; this class does not validate the pointer on construction.
std::shared_ptr<PartitionStatisticsFile> partition_statistics_file_;
};

/// \brief Represents removing partition statistics for a snapshot
///
/// Carries only the id of the snapshot whose partition statistics should be removed.
class ICEBERG_EXPORT RemovePartitionStatistics : public TableUpdate {
public:
/// \brief Construct the update for the given snapshot
/// \param snapshot_id Id of the snapshot whose partition statistics are removed
explicit RemovePartitionStatistics(int64_t snapshot_id) : snapshot_id_(snapshot_id) {}

/// \brief The snapshot id this update targets
int64_t snapshot_id() const { return snapshot_id_; }

/// \brief Apply this update to the given metadata builder
void ApplyTo(TableMetadataBuilder& builder) const override;

/// \brief Generate update requirements into the given context
void GenerateRequirements(TableUpdateContext& context) const override;

/// \brief Always Kind::kRemovePartitionStatistics
Kind kind() const override { return Kind::kRemovePartitionStatistics; }

/// \brief Compare this update with another TableUpdate
bool Equals(const TableUpdate& other) const override;

/// \brief Create a copy of this update
std::unique_ptr<TableUpdate> Clone() const override;

private:
// Id of the snapshot whose partition statistics are to be removed.
int64_t snapshot_id_;
};

} // namespace table

} // namespace iceberg
3 changes: 2 additions & 1 deletion src/iceberg/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,8 @@ if(ICEBERG_BUILD_BUNDLE)
update_properties_test.cc
update_schema_test.cc
update_sort_order_test.cc
update_statistics_test.cc)
update_statistics_test.cc
update_partition_statistics_test.cc)
Comment thread
HeartLinked marked this conversation as resolved.
Outdated

add_iceberg_test(data_writer_test USE_BUNDLE SOURCES data_writer_test.cc)

Expand Down
37 changes: 37 additions & 0 deletions src/iceberg/test/json_internal_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,43 @@ TEST(JsonInternalTest, TableUpdateRemoveStatistics) {
update);
}

// Round-trip check for the set-partition-statistics update: serializing a
// SetPartitionStatistics must yield the expected JSON, and parsing that JSON must
// yield an update equal to the original.
TEST(JsonInternalTest, TableUpdateSetPartitionStatistics) {
// Build the partition statistics file carried by the update.
auto partition_stats_file = std::make_shared<PartitionStatisticsFile>();
partition_stats_file->snapshot_id = 123456789;
partition_stats_file->path =
"s3://bucket/warehouse/table/metadata/partition-stats-123456789.parquet";
partition_stats_file->file_size_in_bytes = 2048;

table::SetPartitionStatistics update(partition_stats_file);
// Expected wire format; the values mirror the fields assigned above.
nlohmann::json expected = R"({
"action": "set-partition-statistics",
"partition-statistics": {
"snapshot-id": 123456789,
"statistics-path": "s3://bucket/warehouse/table/metadata/partition-stats-123456789.parquet",
"file-size-in-bytes": 2048
}
})"_json;

// Serialization direction.
EXPECT_EQ(ToJson(update), expected);
// Parsing direction: the result must be OK before it is downcast and compared.
auto parsed = TableUpdateFromJson(expected);
ASSERT_THAT(parsed, IsOk());
EXPECT_EQ(*internal::checked_cast<table::SetPartitionStatistics*>(parsed.value().get()),
update);
}

// Round-trip check for the remove-partition-statistics update: serializing a
// RemovePartitionStatistics must yield the expected JSON, and parsing that JSON must
// yield an update equal to the original.
TEST(JsonInternalTest, TableUpdateRemovePartitionStatistics) {
table::RemovePartitionStatistics update(123456789);
nlohmann::json expected =
R"({"action":"remove-partition-statistics","snapshot-id":123456789})"_json;

// Serialization direction.
EXPECT_EQ(ToJson(update), expected);
// Parsing direction: the result must be OK before it is downcast and compared.
auto parsed = TableUpdateFromJson(expected);
ASSERT_THAT(parsed, IsOk());
EXPECT_EQ(
*internal::checked_cast<table::RemovePartitionStatistics*>(parsed.value().get()),
update);
}

TEST(JsonInternalTest, TableUpdateUnknownAction) {
nlohmann::json json = R"({"action":"unknown-action"})"_json;
auto result = TableUpdateFromJson(json);
Expand Down
Loading
Loading