Skip to content

Commit 8295d50

Browse files
authored
feat: impl metrics config (#488)
1 parent 3c5b00a commit 8295d50

File tree

8 files changed

+558
-26
lines changed

8 files changed

+558
-26
lines changed

src/iceberg/metrics_config.cc

Lines changed: 197 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,201 @@
1919

2020
#include "iceberg/metrics_config.h"
2121

22+
#include <charconv>
2223
#include <string>
2324
#include <unordered_map>
2425

2526
#include "iceberg/result.h"
2627
#include "iceberg/schema.h"
28+
#include "iceberg/sort_order.h"
29+
#include "iceberg/table.h"
2730
#include "iceberg/table_properties.h"
31+
#include "iceberg/util/checked_cast.h"
32+
#include "iceberg/util/type_util.h"
2833

2934
namespace iceberg {
3035

36+
namespace {
37+
38+
constexpr std::string_view kNoneName = "none";
39+
constexpr std::string_view kCountsName = "counts";
40+
constexpr std::string_view kFullName = "full";
41+
constexpr std::string_view kTruncatePrefix = "truncate(";
42+
constexpr int32_t kDefaultTruncateLength = 16;
43+
constexpr MetricsMode kDefaultMetricsMode = {.kind = MetricsMode::Kind::kTruncate,
44+
.length = kDefaultTruncateLength};
45+
46+
MetricsMode SortedColumnDefaultMode(MetricsMode default_mode) {
47+
if (default_mode.kind == MetricsMode::Kind::kNone ||
48+
default_mode.kind == MetricsMode::Kind::kCounts) {
49+
return kDefaultMetricsMode;
50+
} else {
51+
return default_mode;
52+
}
53+
}
54+
55+
int32_t MaxInferredColumns(const TableProperties& properties) {
56+
int32_t max_inferred_columns =
57+
properties.Get(TableProperties::kMetricsMaxInferredColumnDefaults);
58+
if (max_inferred_columns < 0) {
59+
// fallback to default
60+
return TableProperties::kMetricsMaxInferredColumnDefaults.value();
61+
}
62+
return max_inferred_columns;
63+
}
64+
65+
Result<MetricsMode> ParseMode(std::string_view mode, MetricsMode fallback) {
66+
return MetricsMode::FromString(mode).value_or(fallback);
67+
}
68+
69+
} // namespace
70+
71+
MetricsMode MetricsMode::None() { return {.kind = Kind::kNone}; }
72+
73+
MetricsMode MetricsMode::Counts() { return {.kind = Kind::kCounts}; }
74+
75+
MetricsMode MetricsMode::Full() { return {.kind = Kind::kFull}; }
76+
77+
Result<MetricsMode> MetricsMode::FromString(std::string_view mode) {
78+
if (StringUtils::EqualsIgnoreCase(mode, kNoneName)) {
79+
return MetricsMode::None();
80+
} else if (StringUtils::EqualsIgnoreCase(mode, kCountsName)) {
81+
return MetricsMode::Counts();
82+
} else if (StringUtils::EqualsIgnoreCase(mode, kFullName)) {
83+
return MetricsMode::Full();
84+
}
85+
86+
if (StringUtils::StartsWithIgnoreCase(mode, kTruncatePrefix) && mode.ends_with(")")) {
87+
int32_t length;
88+
auto [ptr, ec] = std::from_chars(mode.data() + 9 /* "truncate(" length */,
89+
mode.data() + mode.size() - 1, length);
90+
if (ec != std::errc{}) {
91+
return InvalidArgument("Invalid truncate mode: {}", mode);
92+
}
93+
if (length == kDefaultTruncateLength) {
94+
return kDefaultMetricsMode;
95+
}
96+
ICEBERG_PRECHECK(length > 0, "Truncate length should be positive.");
97+
return MetricsMode{.kind = Kind::kTruncate, .length = length};
98+
}
99+
return InvalidArgument("Invalid metrics mode: {}", mode);
100+
}
101+
102+
MetricsConfig::MetricsConfig(ColumnModeMap column_modes, MetricsMode default_mode)
103+
: column_modes_(std::move(column_modes)), default_mode_(default_mode) {}
104+
105+
const std::shared_ptr<MetricsConfig>& MetricsConfig::Default() {
106+
static const std::shared_ptr<MetricsConfig> kDefaultConfig(
107+
new MetricsConfig(/*column_modes=*/{}, kDefaultMetricsMode));
108+
return kDefaultConfig;
109+
}
110+
111+
Result<std::shared_ptr<MetricsConfig>> MetricsConfig::Make(const Table& table) {
112+
ICEBERG_ASSIGN_OR_RAISE(auto schema, table.schema());
113+
auto sort_order = table.sort_order();
114+
return MakeInternal(table.properties(), *schema,
115+
*sort_order.value_or(SortOrder::Unsorted()));
116+
}
117+
118+
Result<std::shared_ptr<MetricsConfig>> MetricsConfig::MakeInternal(
119+
const TableProperties& props, const Schema& schema, const SortOrder& order) {
120+
ColumnModeMap column_modes;
121+
122+
MetricsMode default_mode = kDefaultMetricsMode;
123+
if (props.configs().contains(TableProperties::kDefaultWriteMetricsMode.key())) {
124+
std::string configured_metrics_mode =
125+
props.Get(TableProperties::kDefaultWriteMetricsMode);
126+
ICEBERG_ASSIGN_OR_RAISE(default_mode,
127+
ParseMode(configured_metrics_mode, kDefaultMetricsMode));
128+
} else {
129+
int32_t max_inferred_columns = MaxInferredColumns(props);
130+
GetProjectedIdsVisitor visitor(/*include_struct_ids=*/true);
131+
ICEBERG_RETURN_UNEXPECTED(visitor.Visit(schema));
132+
auto projected_columns = static_cast<int32_t>(visitor.Finish().size());
133+
if (max_inferred_columns < projected_columns) {
134+
ICEBERG_ASSIGN_OR_RAISE(auto limit_field_ids,
135+
LimitFieldIds(schema, max_inferred_columns));
136+
for (auto id : limit_field_ids) {
137+
ICEBERG_ASSIGN_OR_RAISE(auto column_name, schema.FindColumnNameById(id));
138+
ICEBERG_CHECK(column_name.has_value(), "Field id {} not found in schema", id);
139+
column_modes[std::string(column_name.value())] = kDefaultMetricsMode;
140+
}
141+
// All other columns don't use metrics
142+
default_mode = MetricsMode::None();
143+
}
144+
}
145+
146+
// First set sorted column with sorted column default (can be overridden by user)
147+
auto sorted_col_default_mode = SortedColumnDefaultMode(default_mode);
148+
auto sorted_columns = SortOrder::OrderPreservingSortedColumns(schema, order);
149+
for (const auto& sorted_column : sorted_columns) {
150+
column_modes[std::string(sorted_column)] = sorted_col_default_mode;
151+
}
152+
153+
// Handle user overrides of defaults
154+
for (const auto& prop : props.configs()) {
155+
if (prop.first.starts_with(TableProperties::kMetricModeColumnConfPrefix)) {
156+
std::string column_alias =
157+
prop.first.substr(TableProperties::kMetricModeColumnConfPrefix.size());
158+
ICEBERG_ASSIGN_OR_RAISE(auto mode, ParseMode(prop.second, default_mode));
159+
column_modes[std::move(column_alias)] = mode;
160+
}
161+
}
162+
163+
return std::shared_ptr<MetricsConfig>(
164+
new MetricsConfig(std::move(column_modes), default_mode));
165+
}
166+
167+
Result<std::unordered_set<int32_t>> MetricsConfig::LimitFieldIds(const Schema& schema,
168+
int32_t limit) {
169+
class Visitor {
170+
public:
171+
explicit Visitor(int32_t limit) : limit_(limit) {}
172+
173+
Status Visit(const Type& type) {
174+
if (type.is_nested()) {
175+
return VisitNested(internal::checked_cast<const NestedType&>(type));
176+
} else {
177+
return VisitPrimitive(internal::checked_cast<const PrimitiveType&>(type));
178+
}
179+
}
180+
181+
Status VisitNested(const NestedType& type) {
182+
for (const auto& field : type.fields()) {
183+
if (!ShouldContinue()) {
184+
break;
185+
}
186+
// TODO(zhuo.wang): variant type should also be handled here
187+
if (field.type()->is_primitive()) {
188+
ids_.insert(field.field_id());
189+
}
190+
}
191+
192+
for (const auto& field : type.fields()) {
193+
if (ShouldContinue()) {
194+
ICEBERG_RETURN_UNEXPECTED(Visit(*field.type()));
195+
}
196+
}
197+
return {};
198+
}
199+
200+
Status VisitPrimitive(const PrimitiveType& type) { return {}; }
201+
202+
std::unordered_set<int32_t> Finish() const { return ids_; }
203+
204+
private:
205+
bool ShouldContinue() { return ids_.size() < limit_; }
206+
207+
private:
208+
std::unordered_set<int32_t> ids_;
209+
int32_t limit_;
210+
};
211+
212+
Visitor visitor(limit);
213+
ICEBERG_RETURN_UNEXPECTED(visitor.Visit(schema));
214+
return visitor.Finish();
215+
}
216+
31217
Status MetricsConfig::VerifyReferencedColumns(
32218
const std::unordered_map<std::string, std::string>& updates, const Schema& schema) {
33219
for (const auto& [key, value] : updates) {
@@ -37,14 +223,19 @@ Status MetricsConfig::VerifyReferencedColumns(
37223
auto field_name =
38224
std::string_view(key).substr(TableProperties::kMetricModeColumnConfPrefix.size());
39225
ICEBERG_ASSIGN_OR_RAISE(auto field, schema.FindFieldByName(field_name));
40-
if (!field.has_value()) {
41-
return ValidationFailed(
42-
"Invalid metrics config, could not find column {} from table prop {} in "
43-
"schema {}",
44-
field_name, key, schema.ToString());
45-
}
226+
ICEBERG_CHECK(field.has_value(),
227+
"Invalid metrics config, could not find column {} from table prop {} "
228+
"in schema {}",
229+
field_name, key, schema.ToString());
46230
}
47231
return {};
48232
}
49233

234+
MetricsMode MetricsConfig::ColumnMode(std::string_view column_name) const {
235+
if (auto it = column_modes_.find(column_name); it != column_modes_.end()) {
236+
return it->second;
237+
}
238+
return default_mode_;
239+
}
240+
50241
} // namespace iceberg

src/iceberg/metrics_config.h

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,24 +22,83 @@
2222
/// \file iceberg/metrics_config.h
2323
/// \brief Metrics configuration for Iceberg tables
2424

25+
#include <memory>
2526
#include <string>
27+
#include <string_view>
2628
#include <unordered_map>
29+
#include <unordered_set>
30+
#include <variant>
2731

2832
#include "iceberg/iceberg_export.h"
2933
#include "iceberg/result.h"
3034
#include "iceberg/type_fwd.h"
35+
#include "iceberg/util/string_util.h"
3136

3237
namespace iceberg {
3338

34-
/// \brief Configuration utilities for table metrics
39+
struct ICEBERG_EXPORT MetricsMode {
40+
public:
41+
enum class Kind : uint8_t {
42+
kNone,
43+
kCounts,
44+
kTruncate,
45+
kFull,
46+
};
47+
48+
static Result<MetricsMode> FromString(std::string_view mode);
49+
static MetricsMode None();
50+
static MetricsMode Counts();
51+
static MetricsMode Full();
52+
53+
Kind kind;
54+
std::variant<std::monostate, int32_t> length;
55+
};
56+
57+
/// \brief Configuration for collecting column metrics for an Iceberg table.
3558
class ICEBERG_EXPORT MetricsConfig {
3659
public:
60+
/// \brief Get the default metrics config.
61+
static const std::shared_ptr<MetricsConfig>& Default();
62+
63+
/// \brief Creates a metrics config from a table.
64+
static Result<std::shared_ptr<MetricsConfig>> Make(const Table& table);
65+
66+
/// \brief Get `limit` num of primitive field ids from schema
67+
static Result<std::unordered_set<int32_t>> LimitFieldIds(const Schema& schema,
68+
int32_t limit);
69+
3770
/// \brief Verify that all referenced columns are valid
3871
/// \param updates The updates to verify
3972
/// \param schema The schema to verify against
4073
/// \return OK if all referenced columns are valid
4174
static Status VerifyReferencedColumns(
4275
const std::unordered_map<std::string, std::string>& updates, const Schema& schema);
76+
77+
/// \brief Get the metrics mode for a specific column
78+
/// \param column_name The full name of the column
79+
/// \return The metrics mode for the column
80+
MetricsMode ColumnMode(std::string_view column_name) const;
81+
82+
private:
83+
using ColumnModeMap =
84+
std::unordered_map<std::string, MetricsMode, StringHash, StringEqual>;
85+
86+
MetricsConfig(ColumnModeMap column_modes, MetricsMode default_mode);
87+
88+
/// \brief Generate a MetricsConfig for all columns based on overrides, schema, and sort
89+
/// order.
90+
///
91+
/// \param props will be read for metrics overrides (write.metadata.metrics.column.*)
92+
/// and default(write.metadata.metrics.default)
93+
/// \param schema table schema
94+
/// \param order sort order columns, will be promoted to truncate(16)
95+
/// \return metrics configuration
96+
static Result<std::shared_ptr<MetricsConfig>> MakeInternal(const TableProperties& props,
97+
const Schema& schema,
98+
const SortOrder& order);
99+
100+
ColumnModeMap column_modes_;
101+
MetricsMode default_mode_;
43102
};
44103

45104
} // namespace iceberg

src/iceberg/sort_order.cc

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,4 +132,18 @@ Result<std::unique_ptr<SortOrder>> SortOrder::Make(int32_t sort_id,
132132
return std::unique_ptr<SortOrder>(new SortOrder(sort_id, std::move(fields)));
133133
}
134134

135+
std::unordered_set<std::string_view> SortOrder::OrderPreservingSortedColumns(
136+
const Schema& schema, const SortOrder& order) {
137+
return order.fields() | std::views::filter([&schema](const SortField& field) {
138+
return field.transform()->PreservesOrder();
139+
}) |
140+
std::views::transform([&schema](const SortField& field) {
141+
return schema.FindColumnNameById(field.source_id())
142+
.value_or(std::nullopt)
143+
.value_or("");
144+
}) |
145+
std::views::filter([](std::string_view name) { return !name.empty(); }) |
146+
std::ranges::to<std::unordered_set<std::string_view>>();
147+
}
148+
135149
} // namespace iceberg

src/iceberg/sort_order.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <cstdint>
2323
#include <memory>
2424
#include <span>
25+
#include <unordered_set>
2526
#include <vector>
2627

2728
#include "iceberg/iceberg_export.h"
@@ -91,6 +92,9 @@ class ICEBERG_EXPORT SortOrder : public util::Formattable {
9192
static Result<std::unique_ptr<SortOrder>> Make(int32_t sort_id,
9293
std::vector<SortField> fields);
9394

95+
static std::unordered_set<std::string_view> OrderPreservingSortedColumns(
96+
const Schema& schema, const SortOrder& order);
97+
9498
private:
9599
/// \brief Constructs a SortOrder instance.
96100
/// \param order_id The sort order id.

0 commit comments

Comments
 (0)