Skip to content

Commit 8dd3f71

Browse files
author
Innocent
committed
feat:metrics reporting integration
1 parent ae29c3d commit 8dd3f71

28 files changed

Lines changed: 1193 additions & 45 deletions

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
# under the License.
1717

1818
build/
19+
build_bundle/
20+
builddir/
1921
cmake-build/
2022
cmake-build-debug/
2123
cmake-build-release/

src/iceberg/catalog/memory/in_memory_catalog.cc

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <algorithm>
2323
#include <iterator>
2424

25+
#include "iceberg/metrics/metrics_reporters.h"
2526
#include "iceberg/table.h"
2627
#include "iceberg/table_identifier.h"
2728
#include "iceberg/table_metadata.h"
@@ -351,7 +352,15 @@ InMemoryCatalog::InMemoryCatalog(
351352
properties_(std::move(properties)),
352353
file_io_(std::move(file_io)),
353354
warehouse_location_(std::move(warehouse_location)),
354-
root_namespace_(std::make_unique<InMemoryNamespace>()) {}
355+
root_namespace_(std::make_unique<InMemoryNamespace>()) {
356+
auto it = properties_.find(std::string(kMetricsReporterImpl));
357+
if (it != properties_.end() && !it->second.empty() &&
358+
it->second != kMetricsReporterTypeNoop) {
359+
if (auto r = MetricsReporters::Load(properties_); r.has_value()) {
360+
reporter_ = std::shared_ptr<MetricsReporter>(std::move(r.value()));
361+
}
362+
}
363+
}
355364

356365
InMemoryCatalog::~InMemoryCatalog() = default;
357366

@@ -428,7 +437,8 @@ Result<std::shared_ptr<Table>> InMemoryCatalog::CreateTable(
428437
ICEBERG_RETURN_UNEXPECTED(
429438
root_namespace_->UpdateTableMetadataLocation(identifier, metadata_file_location));
430439
return Table::Make(identifier, std::move(table_metadata),
431-
std::move(metadata_file_location), file_io_, shared_from_this());
440+
std::move(metadata_file_location), file_io_, shared_from_this(),
441+
reporter_);
432442
}
433443

434444
Result<std::shared_ptr<Table>> InMemoryCatalog::UpdateTable(
@@ -479,7 +489,7 @@ Result<std::shared_ptr<Table>> InMemoryCatalog::UpdateTable(
479489
TableMetadataUtil::DeleteRemovedMetadataFiles(*file_io_, base.get(), *updated);
480490

481491
return Table::Make(identifier, std::move(updated), std::move(new_metadata_location),
482-
file_io_, shared_from_this());
492+
file_io_, shared_from_this(), reporter_);
483493
}
484494

485495
Result<std::shared_ptr<Transaction>> InMemoryCatalog::StageCreateTable(
@@ -500,7 +510,7 @@ Result<std::shared_ptr<Transaction>> InMemoryCatalog::StageCreateTable(
500510
TableMetadata::Make(*schema, *spec, *order, base_location, properties));
501511
ICEBERG_ASSIGN_OR_RAISE(
502512
auto table, StagedTable::Make(identifier, std::move(table_metadata), "", file_io_,
503-
shared_from_this()));
513+
shared_from_this(), reporter_));
504514
return Transaction::Make(std::move(table), TransactionKind::kCreate);
505515
}
506516

@@ -537,7 +547,7 @@ Result<std::shared_ptr<Table>> InMemoryCatalog::LoadTable(
537547
ICEBERG_ASSIGN_OR_RAISE(auto metadata,
538548
TableMetadataUtil::Read(*file_io_, metadata_location));
539549
return Table::Make(identifier, std::move(metadata), std::move(metadata_location),
540-
file_io_, shared_from_this());
550+
file_io_, shared_from_this(), reporter_);
541551
}
542552

543553
Result<std::shared_ptr<Table>> InMemoryCatalog::RegisterTable(
@@ -557,7 +567,7 @@ Result<std::shared_ptr<Table>> InMemoryCatalog::RegisterTable(
557567
return UnknownError("The registry failed.");
558568
}
559569
return Table::Make(identifier, std::move(metadata), metadata_file_location, file_io_,
560-
shared_from_this());
570+
shared_from_this(), reporter_);
561571
}
562572

563573
} // namespace iceberg

src/iceberg/catalog/memory/in_memory_catalog.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,15 @@
1919

2020
#pragma once
2121

22+
#include <memory>
2223
#include <shared_mutex>
2324

2425
#include "iceberg/catalog.h"
2526

2627
namespace iceberg {
2728

29+
class MetricsReporter;
30+
2831
/**
2932
* @brief An in-memory implementation of the Iceberg Catalog interface.
3033
*
@@ -106,6 +109,7 @@ class ICEBERG_EXPORT InMemoryCatalog
106109
std::string warehouse_location_;
107110
std::unique_ptr<class InMemoryNamespace> root_namespace_;
108111
mutable std::shared_mutex mutex_;
112+
std::shared_ptr<MetricsReporter> reporter_;
109113
};
110114

111115
} // namespace iceberg

src/iceberg/catalog/rest/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ set(ICEBERG_REST_SOURCES
3232
resource_paths.cc
3333
rest_catalog.cc
3434
rest_file_io.cc
35+
rest_metrics_reporter.cc
3536
rest_util.cc
3637
types.cc)
3738

src/iceberg/catalog/rest/catalog_properties.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,12 @@ class ICEBERG_REST_EXPORT RestCatalogProperties
5555
inline static Entry<std::string> kNamespaceSeparator{"namespace-separator", "%1F"};
5656
/// \brief The snapshot loading mode (ALL or REFS).
5757
inline static Entry<std::string> kSnapshotLoadingMode{"snapshot-loading-mode", "ALL"};
58+
/// \brief Whether to report metrics to the REST catalog server (default: true).
59+
///
60+
/// When true and the server advertises the ReportMetrics endpoint, RestCatalog
61+
/// automatically POSTs scan and commit reports to the per-table metrics endpoint.
62+
inline static Entry<std::string> kMetricsReportingEnabled{
63+
"rest-metrics-reporting-enabled", "true"};
5864
/// \brief The prefix for HTTP headers.
5965
inline static constexpr std::string_view kHeaderPrefix = "header.";
6066

src/iceberg/catalog/rest/rest_catalog.cc

Lines changed: 40 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,11 @@
3636
#include "iceberg/catalog/rest/json_serde_internal.h"
3737
#include "iceberg/catalog/rest/resource_paths.h"
3838
#include "iceberg/catalog/rest/rest_file_io.h"
39+
#include "iceberg/catalog/rest/rest_metrics_reporter.h"
3940
#include "iceberg/catalog/rest/rest_util.h"
4041
#include "iceberg/catalog/rest/types.h"
4142
#include "iceberg/json_serde_internal.h"
43+
#include "iceberg/metrics/metrics_reporters.h"
4244
#include "iceberg/partition_spec.h"
4345
#include "iceberg/result.h"
4446
#include "iceberg/schema.h"
@@ -48,6 +50,7 @@
4850
#include "iceberg/table_update.h"
4951
#include "iceberg/transaction.h"
5052
#include "iceberg/util/macros.h"
53+
#include "iceberg/util/string_util.h"
5154

5255
namespace iceberg::rest {
5356

@@ -171,7 +174,7 @@ Result<std::shared_ptr<RestCatalog>> RestCatalog::Make(
171174
// Get snapshot loading mode
172175
ICEBERG_ASSIGN_OR_RAISE(auto snapshot_mode, final_config.SnapshotLoadingMode());
173176

174-
auto client = std::make_unique<HttpClient>(final_config.ExtractHeaders());
177+
auto client = std::make_shared<HttpClient>(final_config.ExtractHeaders());
175178
ICEBERG_ASSIGN_OR_RAISE(auto catalog_session,
176179
auth_manager->CatalogSession(*client, final_config.configs()));
177180

@@ -185,7 +188,7 @@ Result<std::shared_ptr<RestCatalog>> RestCatalog::Make(
185188
}
186189

187190
RestCatalog::RestCatalog(RestCatalogProperties config, std::shared_ptr<FileIO> file_io,
188-
std::unique_ptr<HttpClient> client,
191+
std::shared_ptr<HttpClient> client,
189192
std::unique_ptr<ResourcePaths> paths,
190193
std::unordered_set<Endpoint> endpoints,
191194
std::unique_ptr<auth::AuthManager> auth_manager,
@@ -201,10 +204,33 @@ RestCatalog::RestCatalog(RestCatalogProperties config, std::shared_ptr<FileIO> f
201204
catalog_session_(std::move(catalog_session)),
202205
snapshot_mode_(snapshot_mode) {
203206
ICEBERG_DCHECK(catalog_session_ != nullptr, "catalog_session must not be null");
207+
const auto& props = config_.configs();
208+
auto it = props.find(std::string(kMetricsReporterImpl));
209+
if (it != props.end() && !it->second.empty() &&
210+
it->second != kMetricsReporterTypeNoop) {
211+
if (auto r = MetricsReporters::Load(props); r.has_value()) {
212+
reporter_ = std::shared_ptr<MetricsReporter>(std::move(r.value()));
213+
}
214+
}
204215
}
205216

206217
std::string_view RestCatalog::name() const { return name_; }
207218

219+
std::shared_ptr<MetricsReporter> RestCatalog::MakeTableReporter(
220+
const TableIdentifier& identifier) const {
221+
auto enabled = config_.Get(RestCatalogProperties::kMetricsReportingEnabled);
222+
if (StringUtils::ToLower(enabled) == "true" &&
223+
supported_endpoints_.contains(Endpoint::ReportMetrics())) {
224+
auto path = paths_->Metrics(identifier);
225+
if (path.has_value()) {
226+
auto rest_reporter =
227+
std::make_shared<RestMetricsReporter>(client_, *path, catalog_session_);
228+
return MetricsReporters::Combine(reporter_, rest_reporter);
229+
}
230+
}
231+
return reporter_;
232+
}
233+
208234
Result<std::vector<Namespace>> RestCatalog::ListNamespaces(const Namespace& ns) const {
209235
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::ListNamespaces());
210236
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Namespaces());
@@ -367,7 +393,8 @@ Result<std::shared_ptr<Table>> RestCatalog::CreateTable(
367393
CreateTableInternal(identifier, schema, spec, order, location,
368394
properties, /*stage_create=*/false));
369395
return Table::Make(identifier, std::move(result.metadata),
370-
std::move(result.metadata_location), file_io_, shared_from_this());
396+
std::move(result.metadata_location), file_io_, shared_from_this(),
397+
MakeTableReporter(identifier));
371398
}
372399

373400
Result<std::shared_ptr<Table>> RestCatalog::UpdateTable(
@@ -398,7 +425,7 @@ Result<std::shared_ptr<Table>> RestCatalog::UpdateTable(
398425

399426
return Table::Make(identifier, std::move(commit_response.metadata),
400427
std::move(commit_response.metadata_location), file_io_,
401-
shared_from_this());
428+
shared_from_this(), MakeTableReporter(identifier));
402429
}
403430

404431
Result<std::shared_ptr<Transaction>> RestCatalog::StageCreateTable(
@@ -409,10 +436,11 @@ Result<std::shared_ptr<Transaction>> RestCatalog::StageCreateTable(
409436
ICEBERG_ASSIGN_OR_RAISE(auto result,
410437
CreateTableInternal(identifier, schema, spec, order, location,
411438
properties, /*stage_create=*/true));
412-
ICEBERG_ASSIGN_OR_RAISE(auto staged_table,
413-
StagedTable::Make(identifier, std::move(result.metadata),
414-
std::move(result.metadata_location), file_io_,
415-
shared_from_this()));
439+
ICEBERG_ASSIGN_OR_RAISE(
440+
auto staged_table,
441+
StagedTable::Make(identifier, std::move(result.metadata),
442+
std::move(result.metadata_location), file_io_, shared_from_this(),
443+
MakeTableReporter(identifier)));
416444
return Transaction::Make(std::move(staged_table), TransactionKind::kCreate);
417445
}
418446

@@ -480,9 +508,11 @@ Result<std::shared_ptr<Table>> RestCatalog::LoadTable(const TableIdentifier& ide
480508
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(body));
481509
ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json));
482510
/// FIXME: support per-table FileIO creation
511+
/// FIXME: support per-table auth session (currently uses catalog-level
512+
/// catalog_session_)
483513
return Table::Make(identifier, std::move(load_result.metadata),
484514
std::move(load_result.metadata_location), file_io_,
485-
shared_from_this());
515+
shared_from_this(), MakeTableReporter(identifier));
486516
}
487517

488518
Result<std::shared_ptr<Table>> RestCatalog::RegisterTable(
@@ -505,7 +535,7 @@ Result<std::shared_ptr<Table>> RestCatalog::RegisterTable(
505535
ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json));
506536
return Table::Make(identifier, std::move(load_result.metadata),
507537
std::move(load_result.metadata_location), file_io_,
508-
shared_from_this());
538+
shared_from_this(), MakeTableReporter(identifier));
509539
}
510540

511541
} // namespace iceberg::rest

src/iceberg/catalog/rest/rest_catalog.h

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@
3030
#include "iceberg/catalog/rest/type_fwd.h"
3131
#include "iceberg/result.h"
3232

33+
namespace iceberg {
34+
class MetricsReporter;
35+
} // namespace iceberg
36+
3337
/// \file iceberg/catalog/rest/rest_catalog.h
3438
/// RestCatalog implementation for Iceberg REST API.
3539

@@ -101,14 +105,23 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog,
101105

102106
private:
103107
RestCatalog(RestCatalogProperties config, std::shared_ptr<FileIO> file_io,
104-
std::unique_ptr<HttpClient> client, std::unique_ptr<ResourcePaths> paths,
108+
std::shared_ptr<HttpClient> client, std::unique_ptr<ResourcePaths> paths,
105109
std::unordered_set<Endpoint> endpoints,
106110
std::unique_ptr<auth::AuthManager> auth_manager,
107111
std::shared_ptr<auth::AuthSession> catalog_session,
108112
SnapshotMode snapshot_mode);
109113

110114
Result<std::string> LoadTableInternal(const TableIdentifier& identifier) const;
111115

116+
/// \brief Build the per-table metrics reporter.
117+
///
118+
/// When rest-metrics-reporting-enabled is true and the server advertises the
119+
/// ReportMetrics endpoint, returns a CompositeMetricsReporter combining configured
120+
/// reporter with a RestMetricsReporter targeting this table. Otherwise returns the
121+
/// configured reporter.
122+
std::shared_ptr<MetricsReporter> MakeTableReporter(
123+
const TableIdentifier& identifier) const;
124+
112125
Result<LoadTableResult> CreateTableInternal(
113126
const TableIdentifier& identifier, const std::shared_ptr<Schema>& schema,
114127
const std::shared_ptr<PartitionSpec>& spec, const std::shared_ptr<SortOrder>& order,
@@ -117,13 +130,14 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog,
117130

118131
RestCatalogProperties config_;
119132
std::shared_ptr<FileIO> file_io_;
120-
std::unique_ptr<HttpClient> client_;
133+
std::shared_ptr<HttpClient> client_;
121134
std::unique_ptr<ResourcePaths> paths_;
122135
std::string name_;
123136
std::unordered_set<Endpoint> supported_endpoints_;
124137
std::unique_ptr<auth::AuthManager> auth_manager_;
125138
std::shared_ptr<auth::AuthSession> catalog_session_;
126139
SnapshotMode snapshot_mode_;
140+
std::shared_ptr<MetricsReporter> reporter_;
127141
};
128142

129143
} // namespace iceberg::rest
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/catalog/rest/rest_metrics_reporter.h"
21+
22+
#include <cstdio>
23+
#include <utility>
24+
25+
#include <nlohmann/json.hpp>
26+
27+
#include "iceberg/catalog/rest/auth/auth_session.h"
28+
#include "iceberg/catalog/rest/error_handlers.h"
29+
#include "iceberg/catalog/rest/http_client.h"
30+
#include "iceberg/metrics/json_serde_internal.h"
31+
#include "iceberg/metrics/metrics_reporter.h"
32+
33+
namespace iceberg::rest {
34+
35+
namespace {
36+
37+
constexpr std::string_view kReportType = "report-type";
38+
constexpr std::string_view kScanReportType = "scan-report";
39+
constexpr std::string_view kCommitReportType = "commit-report";
40+
41+
} // namespace
42+
43+
RestMetricsReporter::RestMetricsReporter(std::shared_ptr<HttpClient> client,
44+
std::string metrics_endpoint,
45+
std::shared_ptr<auth::AuthSession> session)
46+
: client_(std::move(client)),
47+
metrics_endpoint_(std::move(metrics_endpoint)),
48+
session_(std::move(session)) {}
49+
50+
Status RestMetricsReporter::Report(const MetricsReport& report) {
51+
// Serialize the report variant to JSON.
52+
Result<nlohmann::json> json_result = std::visit(
53+
[](const auto& r) -> Result<nlohmann::json> { return ToJson(r); }, report);
54+
if (!json_result) {
55+
return {};
56+
}
57+
58+
// Inject "report-type" required by the REST spec (not included in core ToJson).
59+
auto& json = json_result.value();
60+
json[kReportType] =
61+
std::holds_alternative<ScanReport>(report) ? kScanReportType : kCommitReportType;
62+
63+
// POST to the metrics endpoint; suppress errors to match Java fire-and-forget behavior.
64+
std::ignore = client_->Post(metrics_endpoint_, json.dump(), /*headers=*/{},
65+
*DefaultErrorHandler::Instance(), *session_);
66+
return {};
67+
}
68+
69+
} // namespace iceberg::rest

0 commit comments

Comments
 (0)