3636#include " iceberg/catalog/rest/json_serde_internal.h"
3737#include " iceberg/catalog/rest/resource_paths.h"
3838#include " iceberg/catalog/rest/rest_file_io.h"
39+ #include " iceberg/catalog/rest/rest_metrics_reporter.h"
3940#include " iceberg/catalog/rest/rest_util.h"
4041#include " iceberg/catalog/rest/types.h"
4142#include " iceberg/json_serde_internal.h"
43+ #include " iceberg/metrics/metrics_reporters.h"
4244#include " iceberg/partition_spec.h"
4345#include " iceberg/result.h"
4446#include " iceberg/schema.h"
4850#include " iceberg/table_update.h"
4951#include " iceberg/transaction.h"
5052#include " iceberg/util/macros.h"
53+ #include " iceberg/util/string_util.h"
5154
5255namespace iceberg ::rest {
5356
@@ -171,7 +174,7 @@ Result<std::shared_ptr<RestCatalog>> RestCatalog::Make(
171174 // Get snapshot loading mode
172175 ICEBERG_ASSIGN_OR_RAISE (auto snapshot_mode, final_config.SnapshotLoadingMode ());
173176
174- auto client = std::make_unique <HttpClient>(final_config.ExtractHeaders ());
177+ auto client = std::make_shared <HttpClient>(final_config.ExtractHeaders ());
175178 ICEBERG_ASSIGN_OR_RAISE (auto catalog_session,
176179 auth_manager->CatalogSession (*client, final_config.configs ()));
177180
@@ -185,7 +188,7 @@ Result<std::shared_ptr<RestCatalog>> RestCatalog::Make(
185188}
186189
187190RestCatalog::RestCatalog (RestCatalogProperties config, std::shared_ptr<FileIO> file_io,
188- std::unique_ptr <HttpClient> client,
191+ std::shared_ptr <HttpClient> client,
189192 std::unique_ptr<ResourcePaths> paths,
190193 std::unordered_set<Endpoint> endpoints,
191194 std::unique_ptr<auth::AuthManager> auth_manager,
@@ -201,10 +204,33 @@ RestCatalog::RestCatalog(RestCatalogProperties config, std::shared_ptr<FileIO> f
201204 catalog_session_(std::move(catalog_session)),
202205 snapshot_mode_(snapshot_mode) {
203206 ICEBERG_DCHECK (catalog_session_ != nullptr , " catalog_session must not be null" );
207+ const auto & props = config_.configs ();
208+ auto it = props.find (std::string (kMetricsReporterImpl ));
209+ if (it != props.end () && !it->second .empty () &&
210+ it->second != kMetricsReporterTypeNoop ) {
211+ if (auto r = MetricsReporters::Load (props); r.has_value ()) {
212+ reporter_ = std::shared_ptr<MetricsReporter>(std::move (r.value ()));
213+ }
214+ }
204215}
205216
206217std::string_view RestCatalog::name () const { return name_; }
207218
219+ std::shared_ptr<MetricsReporter> RestCatalog::MakeTableReporter (
220+ const TableIdentifier& identifier) const {
221+ auto enabled = config_.Get (RestCatalogProperties::kMetricsReportingEnabled );
222+ if (StringUtils::ToLower (enabled) == " true" &&
223+ supported_endpoints_.contains (Endpoint::ReportMetrics ())) {
224+ auto path = paths_->Metrics (identifier);
225+ if (path.has_value ()) {
226+ auto rest_reporter =
227+ std::make_shared<RestMetricsReporter>(client_, *path, catalog_session_);
228+ return MetricsReporters::Combine (reporter_, rest_reporter);
229+ }
230+ }
231+ return reporter_;
232+ }
233+
208234Result<std::vector<Namespace>> RestCatalog::ListNamespaces (const Namespace& ns) const {
209235 ICEBERG_ENDPOINT_CHECK (supported_endpoints_, Endpoint::ListNamespaces ());
210236 ICEBERG_ASSIGN_OR_RAISE (auto path, paths_->Namespaces ());
@@ -367,7 +393,8 @@ Result<std::shared_ptr<Table>> RestCatalog::CreateTable(
367393 CreateTableInternal (identifier, schema, spec, order, location,
368394 properties, /* stage_create=*/ false ));
369395 return Table::Make (identifier, std::move (result.metadata ),
370- std::move (result.metadata_location ), file_io_, shared_from_this ());
396+ std::move (result.metadata_location ), file_io_, shared_from_this (),
397+ MakeTableReporter (identifier));
371398}
372399
373400Result<std::shared_ptr<Table>> RestCatalog::UpdateTable (
@@ -398,7 +425,7 @@ Result<std::shared_ptr<Table>> RestCatalog::UpdateTable(
398425
399426 return Table::Make (identifier, std::move (commit_response.metadata ),
400427 std::move (commit_response.metadata_location ), file_io_,
401- shared_from_this ());
428+ shared_from_this (), MakeTableReporter (identifier) );
402429}
403430
404431Result<std::shared_ptr<Transaction>> RestCatalog::StageCreateTable (
@@ -409,10 +436,11 @@ Result<std::shared_ptr<Transaction>> RestCatalog::StageCreateTable(
409436 ICEBERG_ASSIGN_OR_RAISE (auto result,
410437 CreateTableInternal (identifier, schema, spec, order, location,
411438 properties, /* stage_create=*/ true ));
412- ICEBERG_ASSIGN_OR_RAISE (auto staged_table,
413- StagedTable::Make (identifier, std::move (result.metadata ),
414- std::move (result.metadata_location ), file_io_,
415- shared_from_this ()));
439+ ICEBERG_ASSIGN_OR_RAISE (
440+ auto staged_table,
441+ StagedTable::Make (identifier, std::move (result.metadata ),
442+ std::move (result.metadata_location ), file_io_, shared_from_this (),
443+ MakeTableReporter (identifier)));
416444 return Transaction::Make (std::move (staged_table), TransactionKind::kCreate );
417445}
418446
@@ -480,9 +508,11 @@ Result<std::shared_ptr<Table>> RestCatalog::LoadTable(const TableIdentifier& ide
480508 ICEBERG_ASSIGN_OR_RAISE (auto json, FromJsonString (body));
481509 ICEBERG_ASSIGN_OR_RAISE (auto load_result, LoadTableResultFromJson (json));
482510 // / FIXME: support per-table FileIO creation
511+ // / FIXME: support per-table auth session (currently uses catalog-level
512+ // / catalog_session_)
483513 return Table::Make (identifier, std::move (load_result.metadata ),
484514 std::move (load_result.metadata_location ), file_io_,
485- shared_from_this ());
515+ shared_from_this (), MakeTableReporter (identifier) );
486516}
487517
488518Result<std::shared_ptr<Table>> RestCatalog::RegisterTable (
@@ -505,7 +535,7 @@ Result<std::shared_ptr<Table>> RestCatalog::RegisterTable(
505535 ICEBERG_ASSIGN_OR_RAISE (auto load_result, LoadTableResultFromJson (json));
506536 return Table::Make (identifier, std::move (load_result.metadata ),
507537 std::move (load_result.metadata_location ), file_io_,
508- shared_from_this ());
538+ shared_from_this (), MakeTableReporter (identifier) );
509539}
510540
511541} // namespace iceberg::rest
0 commit comments