Skip to content

Commit 47a885d

Browse files
author
Innocent
committed
only focus on metrics definition and core components
1 parent d729831 commit 47a885d

30 files changed

Lines changed: 1647 additions & 277 deletions

src/iceberg/CMakeLists.txt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,13 @@ set(ICEBERG_SOURCES
5858
manifest/v3_metadata.cc
5959
metadata_columns.cc
6060
metrics_config.cc
61+
metrics/commit_report.cc
62+
metrics/counter.cc
63+
metrics/json_serde.cc
64+
metrics/metrics_context.cc
6165
metrics/metrics_reporters.cc
66+
metrics/scan_report.cc
67+
metrics/timer.cc
6268
name_mapping.cc
6369
partition_field.cc
6470
partition_spec.cc

src/iceberg/catalog/memory/in_memory_catalog.cc

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
#include <algorithm>
2323
#include <iterator>
2424

25-
#include "iceberg/metrics/metrics_reporters.h"
2625
#include "iceberg/table.h"
2726
#include "iceberg/table_identifier.h"
2827
#include "iceberg/table_metadata.h"
@@ -352,12 +351,7 @@ InMemoryCatalog::InMemoryCatalog(
352351
properties_(std::move(properties)),
353352
file_io_(std::move(file_io)),
354353
warehouse_location_(std::move(warehouse_location)),
355-
root_namespace_(std::make_unique<InMemoryNamespace>()) {
356-
auto reporter_result = MetricsReporters::Load(properties_);
357-
if (reporter_result.has_value()) {
358-
reporter_ = std::move(reporter_result.value());
359-
}
360-
}
354+
root_namespace_(std::make_unique<InMemoryNamespace>()) {}
361355

362356
InMemoryCatalog::~InMemoryCatalog() = default;
363357

@@ -434,8 +428,7 @@ Result<std::shared_ptr<Table>> InMemoryCatalog::CreateTable(
434428
ICEBERG_RETURN_UNEXPECTED(
435429
root_namespace_->UpdateTableMetadataLocation(identifier, metadata_file_location));
436430
return Table::Make(identifier, std::move(table_metadata),
437-
std::move(metadata_file_location), file_io_, shared_from_this(),
438-
reporter_);
431+
std::move(metadata_file_location), file_io_, shared_from_this());
439432
}
440433

441434
Result<std::shared_ptr<Table>> InMemoryCatalog::UpdateTable(
@@ -486,7 +479,7 @@ Result<std::shared_ptr<Table>> InMemoryCatalog::UpdateTable(
486479
TableMetadataUtil::DeleteRemovedMetadataFiles(*file_io_, base.get(), *updated);
487480

488481
return Table::Make(identifier, std::move(updated), std::move(new_metadata_location),
489-
file_io_, shared_from_this(), reporter_);
482+
file_io_, shared_from_this());
490483
}
491484

492485
Result<std::shared_ptr<Transaction>> InMemoryCatalog::StageCreateTable(
@@ -507,7 +500,7 @@ Result<std::shared_ptr<Transaction>> InMemoryCatalog::StageCreateTable(
507500
TableMetadata::Make(*schema, *spec, *order, base_location, properties));
508501
ICEBERG_ASSIGN_OR_RAISE(
509502
auto table, StagedTable::Make(identifier, std::move(table_metadata), "", file_io_,
510-
shared_from_this(), reporter_));
503+
shared_from_this()));
511504
return Transaction::Make(std::move(table), TransactionKind::kCreate);
512505
}
513506

@@ -544,7 +537,7 @@ Result<std::shared_ptr<Table>> InMemoryCatalog::LoadTable(
544537
ICEBERG_ASSIGN_OR_RAISE(auto metadata,
545538
TableMetadataUtil::Read(*file_io_, metadata_location));
546539
return Table::Make(identifier, std::move(metadata), std::move(metadata_location),
547-
file_io_, shared_from_this(), reporter_);
540+
file_io_, shared_from_this());
548541
}
549542

550543
Result<std::shared_ptr<Table>> InMemoryCatalog::RegisterTable(
@@ -564,7 +557,7 @@ Result<std::shared_ptr<Table>> InMemoryCatalog::RegisterTable(
564557
return UnknownError("The registry failed.");
565558
}
566559
return Table::Make(identifier, std::move(metadata), metadata_file_location, file_io_,
567-
shared_from_this(), reporter_);
560+
shared_from_this());
568561
}
569562

570563
} // namespace iceberg

src/iceberg/catalog/memory/in_memory_catalog.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
#include <shared_mutex>
2323

2424
#include "iceberg/catalog.h"
25-
#include "iceberg/metrics/metrics_reporter.h"
2625

2726
namespace iceberg {
2827

@@ -106,7 +105,6 @@ class ICEBERG_EXPORT InMemoryCatalog
106105
std::shared_ptr<FileIO> file_io_;
107106
std::string warehouse_location_;
108107
std::unique_ptr<class InMemoryNamespace> root_namespace_;
109-
std::shared_ptr<MetricsReporter> reporter_;
110108
mutable std::shared_mutex mutex_;
111109
};
112110

src/iceberg/catalog/rest/rest_catalog.cc

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
#include "iceberg/catalog/rest/rest_util.h"
3838
#include "iceberg/catalog/rest/types.h"
3939
#include "iceberg/json_serde_internal.h"
40-
#include "iceberg/metrics/metrics_reporters.h"
4140
#include "iceberg/partition_spec.h"
4241
#include "iceberg/result.h"
4342
#include "iceberg/schema.h"
@@ -169,17 +168,10 @@ Result<std::shared_ptr<RestCatalog>> RestCatalog::Make(
169168
ICEBERG_ASSIGN_OR_RAISE(auto catalog_session,
170169
auth_manager->CatalogSession(*client, final_config.configs()));
171170

172-
// Load metrics reporter from catalog properties
173-
std::shared_ptr<MetricsReporter> reporter;
174-
auto reporter_result = MetricsReporters::Load(final_config.configs());
175-
if (reporter_result.has_value()) {
176-
reporter = std::move(reporter_result.value());
177-
}
178-
179171
return std::shared_ptr<RestCatalog>(
180172
new RestCatalog(std::move(final_config), std::move(file_io), std::move(client),
181173
std::move(paths), std::move(endpoints), std::move(auth_manager),
182-
std::move(catalog_session), std::move(reporter), snapshot_mode));
174+
std::move(catalog_session), snapshot_mode));
183175
}
184176

185177
RestCatalog::RestCatalog(RestCatalogProperties config, std::shared_ptr<FileIO> file_io,
@@ -188,7 +180,6 @@ RestCatalog::RestCatalog(RestCatalogProperties config, std::shared_ptr<FileIO> f
188180
std::unordered_set<Endpoint> endpoints,
189181
std::unique_ptr<auth::AuthManager> auth_manager,
190182
std::shared_ptr<auth::AuthSession> catalog_session,
191-
std::shared_ptr<MetricsReporter> reporter,
192183
SnapshotMode snapshot_mode)
193184
: config_(std::move(config)),
194185
file_io_(std::move(file_io)),
@@ -198,7 +189,6 @@ RestCatalog::RestCatalog(RestCatalogProperties config, std::shared_ptr<FileIO> f
198189
supported_endpoints_(std::move(endpoints)),
199190
auth_manager_(std::move(auth_manager)),
200191
catalog_session_(std::move(catalog_session)),
201-
reporter_(std::move(reporter)),
202192
snapshot_mode_(snapshot_mode) {
203193
ICEBERG_DCHECK(catalog_session_ != nullptr, "catalog_session must not be null");
204194
}
@@ -365,8 +355,7 @@ Result<std::shared_ptr<Table>> RestCatalog::CreateTable(
365355
CreateTableInternal(identifier, schema, spec, order, location,
366356
properties, /*stage_create=*/false));
367357
return Table::Make(identifier, std::move(result.metadata),
368-
std::move(result.metadata_location), file_io_, shared_from_this(),
369-
reporter_);
358+
std::move(result.metadata_location), file_io_, shared_from_this());
370359
}
371360

372361
Result<std::shared_ptr<Table>> RestCatalog::UpdateTable(
@@ -397,7 +386,7 @@ Result<std::shared_ptr<Table>> RestCatalog::UpdateTable(
397386

398387
return Table::Make(identifier, std::move(commit_response.metadata),
399388
std::move(commit_response.metadata_location), file_io_,
400-
shared_from_this(), reporter_);
389+
shared_from_this());
401390
}
402391

403392
Result<std::shared_ptr<Transaction>> RestCatalog::StageCreateTable(
@@ -411,7 +400,7 @@ Result<std::shared_ptr<Transaction>> RestCatalog::StageCreateTable(
411400
ICEBERG_ASSIGN_OR_RAISE(auto staged_table,
412401
StagedTable::Make(identifier, std::move(result.metadata),
413402
std::move(result.metadata_location), file_io_,
414-
shared_from_this(), reporter_));
403+
shared_from_this()));
415404
return Transaction::Make(std::move(staged_table), TransactionKind::kCreate);
416405
}
417406

@@ -480,7 +469,7 @@ Result<std::shared_ptr<Table>> RestCatalog::LoadTable(const TableIdentifier& ide
480469
ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json));
481470
return Table::Make(identifier, std::move(load_result.metadata),
482471
std::move(load_result.metadata_location), file_io_,
483-
shared_from_this(), reporter_);
472+
shared_from_this());
484473
}
485474

486475
Result<std::shared_ptr<Table>> RestCatalog::RegisterTable(
@@ -503,7 +492,7 @@ Result<std::shared_ptr<Table>> RestCatalog::RegisterTable(
503492
ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json));
504493
return Table::Make(identifier, std::move(load_result.metadata),
505494
std::move(load_result.metadata_location), file_io_,
506-
shared_from_this(), reporter_);
495+
shared_from_this());
507496
}
508497

509498
} // namespace iceberg::rest

src/iceberg/catalog/rest/rest_catalog.h

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
#include "iceberg/catalog/rest/endpoint.h"
2929
#include "iceberg/catalog/rest/iceberg_rest_export.h"
3030
#include "iceberg/catalog/rest/type_fwd.h"
31-
#include "iceberg/metrics/metrics_reporter.h"
3231
#include "iceberg/result.h"
3332

3433
/// \file iceberg/catalog/rest/rest_catalog.h
@@ -111,7 +110,7 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog,
111110
std::unordered_set<Endpoint> endpoints,
112111
std::unique_ptr<auth::AuthManager> auth_manager,
113112
std::shared_ptr<auth::AuthSession> catalog_session,
114-
std::shared_ptr<MetricsReporter> reporter, SnapshotMode snapshot_mode);
113+
SnapshotMode snapshot_mode);
115114

116115
Result<std::string> LoadTableInternal(const TableIdentifier& identifier) const;
117116

@@ -129,7 +128,6 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog,
129128
std::unordered_set<Endpoint> supported_endpoints_;
130129
std::unique_ptr<auth::AuthManager> auth_manager_;
131130
std::shared_ptr<auth::AuthSession> catalog_session_;
132-
std::shared_ptr<MetricsReporter> reporter_;
133131
SnapshotMode snapshot_mode_;
134132
};
135133

src/iceberg/meson.build

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,13 @@ iceberg_sources = files(
7575
'manifest/v2_metadata.cc',
7676
'manifest/v3_metadata.cc',
7777
'metadata_columns.cc',
78+
'metrics/commit_report.cc',
79+
'metrics/counter.cc',
80+
'metrics/json_serde.cc',
81+
'metrics/metrics_context.cc',
7882
'metrics/metrics_reporters.cc',
83+
'metrics/scan_report.cc',
84+
'metrics/timer.cc',
7985
'metrics_config.cc',
8086
'name_mapping.cc',
8187
'partition_field.cc',
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/metrics/commit_report.h"
21+
22+
namespace iceberg {
23+
24+
CommitMetrics CommitMetrics::Of(MetricsContext& context) {
25+
CommitMetrics m;
26+
m.total_duration = context.GetTimer("totalDuration");
27+
m.attempts = context.GetCounter("attempts");
28+
return m;
29+
}
30+
31+
CommitMetrics CommitMetrics::Noop() { return CommitMetrics::Of(MetricsContext::Null()); }
32+
33+
void CommitMetrics::PopulateResult(CommitMetricsResult& result) const {
34+
result.total_duration = total_duration ? TimerResult{total_duration->Count(),
35+
total_duration->TotalDuration()}
36+
: TimerResult{};
37+
result.attempts = attempts ? attempts->Value() : 0;
38+
}
39+
40+
} // namespace iceberg

src/iceberg/metrics/commit_report.h

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,29 @@
2020
#pragma once
2121

2222
#include <cstdint>
23+
#include <memory>
2324
#include <string>
2425
#include <unordered_map>
2526

2627
#include "iceberg/constants.h"
2728
#include "iceberg/iceberg_export.h"
29+
#include "iceberg/metrics/counter.h"
30+
#include "iceberg/metrics/metrics_context.h"
31+
#include "iceberg/metrics/scan_report.h" // TimerResult
32+
#include "iceberg/metrics/timer.h"
2833

2934
namespace iceberg {
3035

31-
/// \brief Metrics collected during a table commit (snapshot creation).
36+
/// \brief Immutable snapshot of commit metrics for use in CommitReport.
3237
///
33-
struct ICEBERG_EXPORT CommitMetrics {
38+
/// Populated by CommitMetrics::ToResult() (for total_duration and attempts) and
39+
/// by parsing snapshot summary fields (for file/record counts).
40+
/// Mirrors org.apache.iceberg.metrics.CommitMetricsResult.
41+
struct ICEBERG_EXPORT CommitMetricsResult {
42+
/// \brief Total wall-clock duration of the commit attempt (count + nanoseconds).
43+
TimerResult total_duration;
44+
/// \brief Number of commit attempts (1 on success without retries).
45+
int64_t attempts = 0;
3446
/// \brief Number of data files added in this commit.
3547
int64_t added_data_files = 0;
3648
/// \brief Number of data files removed in this commit.
@@ -89,6 +101,34 @@ struct ICEBERG_EXPORT CommitMetrics {
89101
int64_t processed_manifest_entries_count = 0;
90102
};
91103

104+
/// \brief Live commit metrics collected during a table commit operation.
105+
///
106+
/// Tracks the overall commit duration and retry count. File/record counts come
107+
/// from the snapshot summary after the commit succeeds and are stored separately
108+
/// in CommitMetricsResult.
109+
///
110+
/// Mirrors org.apache.iceberg.metrics.CommitMetrics.
111+
class ICEBERG_EXPORT CommitMetrics {
112+
public:
113+
/// \brief Create a CommitMetrics instance backed by the given MetricsContext.
114+
static CommitMetrics Of(MetricsContext& context);
115+
116+
/// \brief Create a CommitMetrics instance with all-noop timer and counter.
117+
static CommitMetrics Noop();
118+
119+
/// \brief Snapshot timer and counter values into the corresponding fields of result.
120+
///
121+
/// Only total_duration and attempts are written; the caller is responsible for
122+
/// populating the remaining snapshot-summary fields.
123+
void PopulateResult(CommitMetricsResult& result) const;
124+
125+
/// \brief Timer measuring total wall-clock time of the commit call.
126+
std::shared_ptr<Timer> total_duration;
127+
128+
/// \brief Counter for the number of commit attempts (including retries).
129+
std::shared_ptr<Counter> attempts;
130+
};
131+
92132
/// \brief Report generated after a commit operation.
93133
///
94134
/// Contains metrics about the changes made in a commit, including
@@ -103,7 +143,7 @@ struct ICEBERG_EXPORT CommitReport {
103143
/// \brief The operation that was performed (append, overwrite, delete, etc.).
104144
std::string operation;
105145
/// \brief Metrics collected during the commit operation.
106-
CommitMetrics commit_metrics;
146+
CommitMetricsResult commit_metrics;
107147
/// \brief Additional key-value metadata.
108148
std::unordered_map<std::string, std::string> metadata;
109149
};

src/iceberg/metrics/counter.cc

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/metrics/counter.h"
21+
22+
namespace iceberg {
23+
24+
namespace {
25+
26+
class NoopCounter final : public Counter {
27+
public:
28+
void Increment() override {}
29+
void Increment(int64_t) override {}
30+
int64_t Value() const override { return 0; }
31+
bool IsNoop() const override { return true; }
32+
};
33+
34+
} // namespace
35+
36+
Counter& Counter::Noop() {
37+
static NoopCounter instance;
38+
return instance;
39+
}
40+
41+
DefaultCounter::DefaultCounter(CounterUnit unit) : unit_(unit) {}
42+
43+
void DefaultCounter::Increment() { count_.fetch_add(1, std::memory_order_relaxed); }
44+
45+
void DefaultCounter::Increment(int64_t amount) {
46+
count_.fetch_add(amount, std::memory_order_relaxed);
47+
}
48+
49+
int64_t DefaultCounter::Value() const { return count_.load(std::memory_order_relaxed); }
50+
51+
} // namespace iceberg

0 commit comments

Comments
 (0)