Skip to content

Commit 6f1cdfd

Browse files
authored
feat: add update partition stats (#538)
1 parent c426aff commit 6f1cdfd

18 files changed

+607
-2
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -92,6 +92,7 @@ set(ICEBERG_SOURCES
9292
update/snapshot_update.cc
9393
update/update_location.cc
9494
update/update_partition_spec.cc
95+
update/update_partition_statistics.cc
9596
update/update_properties.cc
9697
update/update_schema.cc
9798
update/update_snapshot_reference.cc

src/iceberg/json_internal.cc

Lines changed: 33 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -192,6 +192,9 @@ constexpr std::string_view kActionRemoveProperties = "remove-properties";
192192
constexpr std::string_view kActionSetLocation = "set-location";
193193
constexpr std::string_view kActionSetStatistics = "set-statistics";
194194
constexpr std::string_view kActionRemoveStatistics = "remove-statistics";
195+
constexpr std::string_view kActionSetPartitionStatistics = "set-partition-statistics";
196+
constexpr std::string_view kActionRemovePartitionStatistics =
197+
"remove-partition-statistics";
195198

196199
// TableUpdate field constants
197200
constexpr std::string_view kUUID = "uuid";
@@ -1439,6 +1442,24 @@ nlohmann::json ToJson(const TableUpdate& update) {
14391442
json[kSnapshotId] = u.snapshot_id();
14401443
break;
14411444
}
1445+
case TableUpdate::Kind::kSetPartitionStatistics: {
1446+
const auto& u =
1447+
internal::checked_cast<const table::SetPartitionStatistics&>(update);
1448+
json[kAction] = kActionSetPartitionStatistics;
1449+
if (u.partition_statistics_file()) {
1450+
json[kPartitionStatistics] = ToJson(*u.partition_statistics_file());
1451+
} else {
1452+
json[kPartitionStatistics] = nlohmann::json::value_t::null;
1453+
}
1454+
break;
1455+
}
1456+
case TableUpdate::Kind::kRemovePartitionStatistics: {
1457+
const auto& u =
1458+
internal::checked_cast<const table::RemovePartitionStatistics&>(update);
1459+
json[kAction] = kActionRemovePartitionStatistics;
1460+
json[kSnapshotId] = u.snapshot_id();
1461+
break;
1462+
}
14421463
}
14431464
return json;
14441465
}
@@ -1628,6 +1649,18 @@ Result<std::unique_ptr<TableUpdate>> TableUpdateFromJson(const nlohmann::json& j
16281649
ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, GetJsonValue<int64_t>(json, kSnapshotId));
16291650
return std::make_unique<table::RemoveStatistics>(snapshot_id);
16301651
}
1652+
if (action == kActionSetPartitionStatistics) {
1653+
ICEBERG_ASSIGN_OR_RAISE(auto partition_statistics_json,
1654+
GetJsonValue<nlohmann::json>(json, kPartitionStatistics));
1655+
ICEBERG_ASSIGN_OR_RAISE(auto partition_statistics_file,
1656+
PartitionStatisticsFileFromJson(partition_statistics_json));
1657+
return std::make_unique<table::SetPartitionStatistics>(
1658+
std::move(partition_statistics_file));
1659+
}
1660+
if (action == kActionRemovePartitionStatistics) {
1661+
ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, GetJsonValue<int64_t>(json, kSnapshotId));
1662+
return std::make_unique<table::RemovePartitionStatistics>(snapshot_id);
1663+
}
16311664

16321665
return JsonParseError("Unknown table update action: {}", action);
16331666
}

src/iceberg/meson.build

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -110,6 +110,7 @@ iceberg_sources = files(
110110
'update/snapshot_update.cc',
111111
'update/update_location.cc',
112112
'update/update_partition_spec.cc',
113+
'update/update_partition_statistics.cc',
113114
'update/update_properties.cc',
114115
'update/update_schema.cc',
115116
'update/update_snapshot_reference.cc',

src/iceberg/table.cc

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -33,6 +33,7 @@
3333
#include "iceberg/transaction.h"
3434
#include "iceberg/update/expire_snapshots.h"
3535
#include "iceberg/update/update_partition_spec.h"
36+
#include "iceberg/update/update_partition_statistics.h"
3637
#include "iceberg/update/update_properties.h"
3738
#include "iceberg/update/update_schema.h"
3839
#include "iceberg/update/update_statistics.h"
@@ -214,6 +215,13 @@ Result<std::shared_ptr<UpdateStatistics>> Table::NewUpdateStatistics() {
214215
return transaction->NewUpdateStatistics();
215216
}
216217

218+
/// \brief Start a partition-statistics update backed by its own transaction.
///
/// Creates a Transaction of kind kUpdate over this table with auto-commit enabled and
/// returns the UpdatePartitionStatistics it hands out. A failure while creating the
/// transaction is surfaced through the returned Result (via ICEBERG_ASSIGN_OR_RAISE).
Result<std::shared_ptr<UpdatePartitionStatistics>> Table::NewUpdatePartitionStatistics() {
  constexpr bool kAutoCommit = true;
  ICEBERG_ASSIGN_OR_RAISE(auto txn,
                          Transaction::Make(shared_from_this(),
                                            Transaction::Kind::kUpdate, kAutoCommit));
  return txn->NewUpdatePartitionStatistics();
}
224+
217225
Result<std::shared_ptr<StagedTable>> StagedTable::Make(
218226
TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
219227
std::string metadata_location, std::shared_ptr<FileIO> io,

src/iceberg/table.h

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -156,6 +156,11 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this<Table> {
156156
/// changes.
157157
virtual Result<std::shared_ptr<UpdateStatistics>> NewUpdateStatistics();
158158

159+
/// \brief Create a new UpdatePartitionStatistics to update partition statistics and
160+
/// commit the changes.
161+
virtual Result<std::shared_ptr<UpdatePartitionStatistics>>
162+
NewUpdatePartitionStatistics();
163+
159164
/// \brief Create a new UpdateLocation to update the table location and commit the
160165
/// changes.
161166
virtual Result<std::shared_ptr<UpdateLocation>> NewUpdateLocation();

src/iceberg/table_metadata.cc

Lines changed: 43 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -623,6 +623,9 @@ class TableMetadataBuilder::Impl {
623623
Status RemovePartitionSpecs(const std::vector<int32_t>& spec_ids);
624624
Status SetStatistics(std::shared_ptr<StatisticsFile> statistics_file);
625625
Status RemoveStatistics(int64_t snapshot_id);
626+
Status SetPartitionStatistics(
627+
std::shared_ptr<PartitionStatisticsFile> partition_statistics_file);
628+
Status RemovePartitionStatistics(int64_t snapshot_id);
626629

627630
Result<std::unique_ptr<TableMetadata>> Build();
628631

@@ -1208,6 +1211,41 @@ Status TableMetadataBuilder::Impl::RemoveStatistics(int64_t snapshot_id) {
12081211
return {};
12091212
}
12101213

1214+
/// \brief Install a partition statistics file, replacing any entry for the same snapshot.
///
/// A null file is rejected by the precondition check. Otherwise the first non-null
/// entry whose snapshot_id matches is overwritten in place; when there is no such
/// entry the file is appended. A SetPartitionStatistics change is recorded either way.
Status TableMetadataBuilder::Impl::SetPartitionStatistics(
    std::shared_ptr<PartitionStatisticsFile> partition_statistics_file) {
  ICEBERG_PRECHECK(partition_statistics_file != nullptr,
                   "Cannot set null partition statistics file");

  const auto target_snapshot_id = partition_statistics_file->snapshot_id;
  auto& stats_files = metadata_.partition_statistics;

  // Overwrite the first entry that belongs to the same snapshot, if there is one.
  bool replaced = false;
  for (auto& existing : stats_files) {
    if (existing && existing->snapshot_id == target_snapshot_id) {
      existing = partition_statistics_file;
      replaced = true;
      break;
    }
  }
  if (!replaced) {
    stats_files.push_back(partition_statistics_file);
  }

  // The pointer is no longer needed locally, so hand it over to the change record.
  changes_.push_back(std::make_unique<table::SetPartitionStatistics>(
      std::move(partition_statistics_file)));
  return {};
}
1237+
1238+
/// \brief Drop every partition statistics entry that belongs to `snapshot_id`.
///
/// Null entries are never matched. A RemovePartitionStatistics change is recorded only
/// when at least one entry was actually erased; removing nothing is still a success.
Status TableMetadataBuilder::Impl::RemovePartitionStatistics(int64_t snapshot_id) {
  const auto belongs_to_snapshot = [snapshot_id](const auto& file) {
    return file && file->snapshot_id == snapshot_id;
  };

  if (std::erase_if(metadata_.partition_statistics, belongs_to_snapshot) == 0) {
    // Nothing matched: leave the change list untouched.
    return {};
  }

  changes_.push_back(std::make_unique<table::RemovePartitionStatistics>(snapshot_id));
  return {};
}
1248+
12111249
std::unordered_set<int64_t> TableMetadataBuilder::Impl::IntermediateSnapshotIdSet(
12121250
int64_t current_snapshot_id) const {
12131251
std::unordered_set<int64_t> added_snapshot_ids;
@@ -1636,12 +1674,15 @@ TableMetadataBuilder& TableMetadataBuilder::RemoveStatistics(int64_t snapshot_id
16361674

16371675
TableMetadataBuilder& TableMetadataBuilder::SetPartitionStatistics(
16381676
const std::shared_ptr<PartitionStatisticsFile>& partition_statistics_file) {
1639-
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
1677+
ICEBERG_BUILDER_RETURN_IF_ERROR(
1678+
impl_->SetPartitionStatistics(partition_statistics_file));
1679+
return *this;
16401680
}
16411681

16421682
TableMetadataBuilder& TableMetadataBuilder::RemovePartitionStatistics(
16431683
int64_t snapshot_id) {
1644-
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
1684+
ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->RemovePartitionStatistics(snapshot_id));
1685+
return *this;
16451686
}
16461687

16471688
TableMetadataBuilder& TableMetadataBuilder::SetProperties(

src/iceberg/table_update.cc

Lines changed: 56 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -500,4 +500,60 @@ std::unique_ptr<TableUpdate> RemoveStatistics::Clone() const {
500500
return std::make_unique<RemoveStatistics>(snapshot_id_);
501501
}
502502

503+
// SetPartitionStatistics
504+
505+
int64_t SetPartitionStatistics::snapshot_id() const {
506+
return partition_statistics_file_->snapshot_id;
507+
}
508+
509+
/// \brief Apply this update by handing the wrapped file to the metadata builder.
void SetPartitionStatistics::ApplyTo(TableMetadataBuilder& builder) const {
  const auto& file = partition_statistics_file_;
  builder.SetPartitionStatistics(file);
}
512+
513+
void SetPartitionStatistics::GenerateRequirements(TableUpdateContext& context) const {
514+
// SetPartitionStatistics doesn't generate any requirements
515+
}
516+
517+
bool SetPartitionStatistics::Equals(const TableUpdate& other) const {
518+
if (other.kind() != Kind::kSetPartitionStatistics) {
519+
return false;
520+
}
521+
const auto& other_set = internal::checked_cast<const SetPartitionStatistics&>(other);
522+
if (!partition_statistics_file_ != !other_set.partition_statistics_file_) {
523+
return false;
524+
}
525+
if (partition_statistics_file_ &&
526+
!(*partition_statistics_file_ == *other_set.partition_statistics_file_)) {
527+
return false;
528+
}
529+
return true;
530+
}
531+
532+
std::unique_ptr<TableUpdate> SetPartitionStatistics::Clone() const {
533+
return std::make_unique<SetPartitionStatistics>(partition_statistics_file_);
534+
}
535+
536+
// RemovePartitionStatistics
537+
538+
/// \brief Apply this update by asking the builder to remove the snapshot's statistics.
void RemovePartitionStatistics::ApplyTo(TableMetadataBuilder& builder) const {
  const int64_t target_snapshot_id = snapshot_id_;
  builder.RemovePartitionStatistics(target_snapshot_id);
}
541+
542+
void RemovePartitionStatistics::GenerateRequirements(TableUpdateContext& context) const {
543+
// RemovePartitionStatistics doesn't generate any requirements
544+
}
545+
546+
bool RemovePartitionStatistics::Equals(const TableUpdate& other) const {
547+
if (other.kind() != Kind::kRemovePartitionStatistics) {
548+
return false;
549+
}
550+
const auto& other_remove =
551+
internal::checked_cast<const RemovePartitionStatistics&>(other);
552+
return snapshot_id_ == other_remove.snapshot_id_;
553+
}
554+
555+
std::unique_ptr<TableUpdate> RemovePartitionStatistics::Clone() const {
556+
return std::make_unique<RemovePartitionStatistics>(snapshot_id_);
557+
}
558+
503559
} // namespace iceberg::table

src/iceberg/table_update.h

Lines changed: 50 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -61,6 +61,8 @@ class ICEBERG_EXPORT TableUpdate {
6161
kSetLocation,
6262
kSetStatistics,
6363
kRemoveStatistics,
64+
kSetPartitionStatistics,
65+
kRemovePartitionStatistics,
6466
};
6567

6668
virtual ~TableUpdate();
@@ -558,6 +560,54 @@ class ICEBERG_EXPORT RemoveStatistics : public TableUpdate {
558560
int64_t snapshot_id_;
559561
};
560562

563+
/// \brief Represents setting partition statistics for a snapshot
564+
class ICEBERG_EXPORT SetPartitionStatistics : public TableUpdate {
565+
public:
566+
explicit SetPartitionStatistics(
567+
std::shared_ptr<PartitionStatisticsFile> partition_statistics_file)
568+
: partition_statistics_file_(std::move(partition_statistics_file)) {}
569+
570+
int64_t snapshot_id() const;
571+
572+
const std::shared_ptr<PartitionStatisticsFile>& partition_statistics_file() const {
573+
return partition_statistics_file_;
574+
}
575+
576+
void ApplyTo(TableMetadataBuilder& builder) const override;
577+
578+
void GenerateRequirements(TableUpdateContext& context) const override;
579+
580+
Kind kind() const override { return Kind::kSetPartitionStatistics; }
581+
582+
bool Equals(const TableUpdate& other) const override;
583+
584+
std::unique_ptr<TableUpdate> Clone() const override;
585+
586+
private:
587+
std::shared_ptr<PartitionStatisticsFile> partition_statistics_file_;
588+
};
589+
590+
/// \brief Represents removing partition statistics for a snapshot
591+
class ICEBERG_EXPORT RemovePartitionStatistics : public TableUpdate {
592+
public:
593+
explicit RemovePartitionStatistics(int64_t snapshot_id) : snapshot_id_(snapshot_id) {}
594+
595+
int64_t snapshot_id() const { return snapshot_id_; }
596+
597+
void ApplyTo(TableMetadataBuilder& builder) const override;
598+
599+
void GenerateRequirements(TableUpdateContext& context) const override;
600+
601+
Kind kind() const override { return Kind::kRemovePartitionStatistics; }
602+
603+
bool Equals(const TableUpdate& other) const override;
604+
605+
std::unique_ptr<TableUpdate> Clone() const override;
606+
607+
private:
608+
int64_t snapshot_id_;
609+
};
610+
561611
} // namespace table
562612

563613
} // namespace iceberg

src/iceberg/test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -182,6 +182,7 @@ if(ICEBERG_BUILD_BUNDLE)
182182
transaction_test.cc
183183
update_location_test.cc
184184
update_partition_spec_test.cc
185+
update_partition_statistics_test.cc
185186
update_properties_test.cc
186187
update_schema_test.cc
187188
update_sort_order_test.cc

src/iceberg/test/json_internal_test.cc

Lines changed: 37 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -613,6 +613,43 @@ TEST(JsonInternalTest, TableUpdateRemoveStatistics) {
613613
update);
614614
}
615615

616+
// Round-trips a set-partition-statistics update: serialization must produce the
// expected JSON, and parsing that JSON must yield an update equal to the original.
TEST(JsonInternalTest, TableUpdateSetPartitionStatistics) {
  auto stats_file = std::make_shared<PartitionStatisticsFile>();
  stats_file->snapshot_id = 123456789;
  stats_file->path =
      "s3://bucket/warehouse/table/metadata/partition-stats-123456789.parquet";
  stats_file->file_size_in_bytes = 2048;
  table::SetPartitionStatistics update(stats_file);

  nlohmann::json expected = R"({
    "action": "set-partition-statistics",
    "partition-statistics": {
      "snapshot-id": 123456789,
      "statistics-path": "s3://bucket/warehouse/table/metadata/partition-stats-123456789.parquet",
      "file-size-in-bytes": 2048
    }
  })"_json;

  // Serialize.
  EXPECT_EQ(ToJson(update), expected);

  // Deserialize and compare against the original update.
  auto round_trip = TableUpdateFromJson(expected);
  ASSERT_THAT(round_trip, IsOk());
  auto* decoded =
      internal::checked_cast<table::SetPartitionStatistics*>(round_trip.value().get());
  EXPECT_EQ(*decoded, update);
}
639+
640+
// Round-trips a remove-partition-statistics update: serialization must produce the
// expected JSON, and parsing that JSON must yield an update equal to the original.
TEST(JsonInternalTest, TableUpdateRemovePartitionStatistics) {
  table::RemovePartitionStatistics update(123456789);
  nlohmann::json expected =
      R"({"action":"remove-partition-statistics","snapshot-id":123456789})"_json;

  // Serialize.
  EXPECT_EQ(ToJson(update), expected);

  // Deserialize and compare against the original update.
  auto round_trip = TableUpdateFromJson(expected);
  ASSERT_THAT(round_trip, IsOk());
  auto* decoded = internal::checked_cast<table::RemovePartitionStatistics*>(
      round_trip.value().get());
  EXPECT_EQ(*decoded, update);
}
652+
616653
TEST(JsonInternalTest, TableUpdateUnknownAction) {
617654
nlohmann::json json = R"({"action":"unknown-action"})"_json;
618655
auto result = TableUpdateFromJson(json);

0 commit comments

Comments
 (0)