Skip to content

Commit b87c145

Browse files
author
Innocent
committed
feat: metrics reporting for scan and commit
1 parent 18e055c commit b87c145

25 files changed

Lines changed: 3024 additions & 0 deletions

src/iceberg/CMakeLists.txt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,13 @@ set(ICEBERG_SOURCES
5959
manifest/v3_metadata.cc
6060
metadata_columns.cc
6161
metrics_config.cc
62+
metrics/commit_report.cc
63+
metrics/counter.cc
64+
metrics/json_serde.cc
65+
metrics/metrics_context.cc
66+
metrics/metrics_reporters.cc
67+
metrics/scan_report.cc
68+
metrics/timer.cc
6269
name_mapping.cc
6370
partition_field.cc
6471
partition_spec.cc
@@ -224,6 +231,7 @@ add_subdirectory(puffin)
224231
add_subdirectory(row)
225232
add_subdirectory(update)
226233
add_subdirectory(util)
234+
add_subdirectory(metrics)
227235

228236
if(ICEBERG_BUILD_BUNDLE)
229237
set(ICEBERG_BUNDLE_SOURCES

src/iceberg/constants.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ namespace iceberg {
3333
constexpr std::string_view kParquetFieldIdKey = "PARQUET:field_id";
3434
constexpr int64_t kInvalidSnapshotId = -1;
3535
constexpr int64_t kInvalidSequenceNumber = -1;
36+
constexpr int64_t kInvalidSchemaId = -1;
3637
/// \brief Stand-in for the current sequence number that will be assigned when the commit
3738
/// is successful. This is replaced when writing a manifest list by the ManifestFile
3839
/// adapter.

src/iceberg/meson.build

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,13 @@ iceberg_sources = files(
8080
'manifest/v2_metadata.cc',
8181
'manifest/v3_metadata.cc',
8282
'metadata_columns.cc',
83+
'metrics/commit_report.cc',
84+
'metrics/counter.cc',
85+
'metrics/json_serde.cc',
86+
'metrics/metrics_context.cc',
87+
'metrics/metrics_reporters.cc',
88+
'metrics/scan_report.cc',
89+
'metrics/timer.cc',
8390
'metrics_config.cc',
8491
'name_mapping.cc',
8592
'partition_field.cc',
@@ -279,6 +286,7 @@ subdir('data')
279286
subdir('deletes')
280287
subdir('expression')
281288
subdir('manifest')
289+
subdir('metrics')
282290
subdir('puffin')
283291
subdir('row')
284292
subdir('update')

src/iceberg/metrics/CMakeLists.txt

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
iceberg_install_all_headers(iceberg/metrics)
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/metrics/commit_report.h"
21+
22+
#include "iceberg/snapshot.h"
23+
#include "iceberg/util/string_util.h"
24+
25+
namespace iceberg {
26+
27+
std::unique_ptr<CommitMetrics> CommitMetrics::Of(MetricsContext& context) {
28+
auto m = std::unique_ptr<CommitMetrics>(new CommitMetrics());
29+
m->total_duration = context.GetTimer("total-duration");
30+
m->attempts = context.GetCounter("attempts");
31+
return m;
32+
}
33+
34+
std::unique_ptr<CommitMetrics> CommitMetrics::Noop() {
35+
return CommitMetrics::Of(*MetricsContext::Null());
36+
}
37+
38+
void CommitMetrics::PopulateResult(CommitMetricsResult& result) const {
39+
result.total_duration =
40+
total_duration ? TimerResult{.unit = std::string(total_duration->Unit()),
41+
.count = total_duration->Count(),
42+
.total_duration = total_duration->TotalDuration()}
43+
: TimerResult{};
44+
result.attempts =
45+
attempts ? CounterResult{.unit = attempts->unit(), .value = attempts->value()}
46+
: CounterResult{};
47+
}
48+
49+
CommitMetricsResult CommitMetricsResult::From(
50+
const CommitMetrics& live_metrics,
51+
const std::unordered_map<std::string, std::string>& snapshot_summary) {
52+
CommitMetricsResult result;
53+
live_metrics.PopulateResult(result);
54+
55+
// Helpers: parse a summary key and wrap as a typed CounterResult.
56+
auto count_field = [&snapshot_summary](const std::string& key) -> CounterResult {
57+
auto it = snapshot_summary.find(key);
58+
if (it == snapshot_summary.end()) return {};
59+
auto parsed = StringUtils::ParseNumber<int64_t>(it->second);
60+
return {.unit = CounterUnit::kCount,
61+
.value = parsed.has_value() ? parsed.value() : 0};
62+
};
63+
auto bytes_field = [&snapshot_summary](const std::string& key) -> CounterResult {
64+
auto it = snapshot_summary.find(key);
65+
if (it == snapshot_summary.end()) return {.unit = CounterUnit::kBytes};
66+
auto parsed = StringUtils::ParseNumber<int64_t>(it->second);
67+
return {.unit = CounterUnit::kBytes,
68+
.value = parsed.has_value() ? parsed.value() : 0};
69+
};
70+
71+
result.added_data_files = count_field(SnapshotSummaryFields::kAddedDataFiles);
72+
result.removed_data_files = count_field(SnapshotSummaryFields::kDeletedDataFiles);
73+
result.total_data_files = count_field(SnapshotSummaryFields::kTotalDataFiles);
74+
result.added_delete_files = count_field(SnapshotSummaryFields::kAddedDeleteFiles);
75+
result.added_equality_delete_files =
76+
count_field(SnapshotSummaryFields::kAddedEqDeleteFiles);
77+
result.added_positional_delete_files =
78+
count_field(SnapshotSummaryFields::kAddedPosDeleteFiles);
79+
result.added_dvs = count_field(SnapshotSummaryFields::kAddedDVs);
80+
result.removed_positional_delete_files =
81+
count_field(SnapshotSummaryFields::kRemovedPosDeleteFiles);
82+
result.removed_dvs = count_field(SnapshotSummaryFields::kRemovedDVs);
83+
result.removed_equality_delete_files =
84+
count_field(SnapshotSummaryFields::kRemovedEqDeleteFiles);
85+
result.removed_delete_files = count_field(SnapshotSummaryFields::kRemovedDeleteFiles);
86+
result.total_delete_files = count_field(SnapshotSummaryFields::kTotalDeleteFiles);
87+
result.added_records = count_field(SnapshotSummaryFields::kAddedRecords);
88+
result.removed_records = count_field(SnapshotSummaryFields::kDeletedRecords);
89+
result.total_records = count_field(SnapshotSummaryFields::kTotalRecords);
90+
result.added_files_size_bytes = bytes_field(SnapshotSummaryFields::kAddedFileSize);
91+
result.removed_files_size_bytes = bytes_field(SnapshotSummaryFields::kRemovedFileSize);
92+
result.total_files_size_bytes = bytes_field(SnapshotSummaryFields::kTotalFileSize);
93+
result.added_positional_deletes = count_field(SnapshotSummaryFields::kAddedPosDeletes);
94+
result.removed_positional_deletes =
95+
count_field(SnapshotSummaryFields::kRemovedPosDeletes);
96+
result.total_positional_deletes = count_field(SnapshotSummaryFields::kTotalPosDeletes);
97+
result.added_equality_deletes = count_field(SnapshotSummaryFields::kAddedEqDeletes);
98+
result.removed_equality_deletes = count_field(SnapshotSummaryFields::kRemovedEqDeletes);
99+
result.total_equality_deletes = count_field(SnapshotSummaryFields::kTotalEqDeletes);
100+
result.kept_manifest_count = count_field(SnapshotSummaryFields::kManifestsKept);
101+
result.created_manifest_count = count_field(SnapshotSummaryFields::kManifestsCreated);
102+
result.replaced_manifest_count = count_field(SnapshotSummaryFields::kManifestsReplaced);
103+
result.processed_manifest_entries_count =
104+
count_field(SnapshotSummaryFields::kEntriesProcessed);
105+
return result;
106+
}
107+
108+
} // namespace iceberg
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include <memory>
23+
#include <string>
24+
#include <unordered_map>
25+
26+
#include "iceberg/constants.h"
27+
#include "iceberg/iceberg_export.h"
28+
#include "iceberg/metrics/metrics_context.h"
29+
#include "iceberg/metrics/metrics_types.h"
30+
#include "iceberg/metrics/timer.h"
31+
32+
namespace iceberg {
33+
34+
// Forward declaration: CommitMetricsResult is defined later in this header.
35+
struct CommitMetricsResult;
36+
37+
/// \brief Live commit metrics collected during a table commit operation.
38+
///
39+
/// Tracks the overall commit duration and retry count. File/record counts come
40+
/// from the snapshot summary after the commit succeeds and are stored separately
41+
/// in CommitMetricsResult.
42+
class ICEBERG_EXPORT CommitMetrics {
43+
public:
44+
/// \brief Create a CommitMetrics instance backed by the given MetricsContext.
45+
static std::unique_ptr<CommitMetrics> Of(MetricsContext& context);
46+
47+
/// \brief Create a CommitMetrics instance with all-noop timer and counter.
48+
static std::unique_ptr<CommitMetrics> Noop();
49+
50+
/// \brief Snapshot timer and counter values into the corresponding fields of result.
51+
///
52+
/// Only total_duration and attempts are written; the caller is responsible for
53+
/// populating the remaining snapshot-summary fields.
54+
void PopulateResult(CommitMetricsResult& result) const;
55+
56+
/// \brief Timer measuring total wall-clock time of the commit call.
57+
std::shared_ptr<Timer> total_duration;
58+
59+
/// \brief Counter for the number of commit attempts (including retries).
60+
std::shared_ptr<Counter> attempts;
61+
62+
private:
63+
CommitMetrics() = default;
64+
};
65+
66+
/// \brief Immutable snapshot of commit metrics for use in CommitReport.
67+
struct ICEBERG_EXPORT CommitMetricsResult {
68+
/// \brief Total wall-clock duration of the commit attempt.
69+
TimerResult total_duration;
70+
/// \brief Number of commit attempts (1 on success without retries).
71+
CounterResult attempts;
72+
/// \brief Number of data files added in this commit.
73+
CounterResult added_data_files;
74+
/// \brief Number of data files removed in this commit.
75+
CounterResult removed_data_files;
76+
/// \brief Total live data files after this commit.
77+
CounterResult total_data_files;
78+
/// \brief Number of delete files added in this commit.
79+
CounterResult added_delete_files;
80+
/// \brief Equality delete files added.
81+
CounterResult added_equality_delete_files;
82+
/// \brief Positional delete files added.
83+
CounterResult added_positional_delete_files;
84+
/// \brief Deletion vectors added.
85+
CounterResult added_dvs;
86+
/// \brief Positional delete files removed.
87+
CounterResult removed_positional_delete_files;
88+
/// \brief Deletion vectors removed.
89+
CounterResult removed_dvs;
90+
/// \brief Equality delete files removed.
91+
CounterResult removed_equality_delete_files;
92+
/// \brief Number of delete files removed in this commit.
93+
CounterResult removed_delete_files;
94+
/// \brief Total live delete files after this commit.
95+
CounterResult total_delete_files;
96+
/// \brief Number of records added in this commit.
97+
CounterResult added_records;
98+
/// \brief Number of records removed in this commit.
99+
CounterResult removed_records;
100+
/// \brief Total live records after this commit.
101+
CounterResult total_records;
102+
/// \brief Total byte size of files added.
103+
CounterResult added_files_size_bytes;
104+
/// \brief Total byte size of files removed.
105+
CounterResult removed_files_size_bytes;
106+
/// \brief Total byte size of all live files after this commit.
107+
CounterResult total_files_size_bytes;
108+
/// \brief Positional delete records added.
109+
CounterResult added_positional_deletes;
110+
/// \brief Positional delete records removed.
111+
CounterResult removed_positional_deletes;
112+
/// \brief Total positional delete records after this commit.
113+
CounterResult total_positional_deletes;
114+
/// \brief Equality delete records added.
115+
CounterResult added_equality_deletes;
116+
/// \brief Equality delete records removed.
117+
CounterResult removed_equality_deletes;
118+
/// \brief Total equality delete records after this commit.
119+
CounterResult total_equality_deletes;
120+
/// \brief Manifest files kept unchanged in this commit.
121+
CounterResult kept_manifest_count;
122+
/// \brief Manifest files created in this commit.
123+
CounterResult created_manifest_count;
124+
/// \brief Manifest files replaced in this commit.
125+
CounterResult replaced_manifest_count;
126+
/// \brief Manifest entries processed in this commit.
127+
CounterResult processed_manifest_entries_count;
128+
129+
bool operator==(const CommitMetricsResult&) const = default;
130+
131+
/// \brief Build a CommitMetricsResult from live metrics and a snapshot summary map.
132+
///
133+
/// Combines timer/retry measurements from \p live_metrics with records parsed
134+
/// from \p snapshot_summary. Missing or unparseable summary keys default to 0.
135+
static CommitMetricsResult From(
136+
const CommitMetrics& live_metrics,
137+
const std::unordered_map<std::string, std::string>& snapshot_summary);
138+
};
139+
140+
/// \brief Report generated after a commit operation.
141+
///
142+
/// Contains metrics about the changes made in a commit.
143+
struct ICEBERG_EXPORT CommitReport {
144+
/// \brief The fully qualified name of the table that was modified.
145+
std::string table_name;
146+
/// \brief The snapshot ID created by this commit.
147+
int64_t snapshot_id = kInvalidSnapshotId;
148+
/// \brief The sequence number assigned to this commit.
149+
int64_t sequence_number = kInvalidSequenceNumber;
150+
/// \brief The operation that was performed (write, delete, etc.).
151+
std::string operation;
152+
/// \brief Metrics collected during the commit operation.
153+
CommitMetricsResult commit_metrics;
154+
/// \brief Additional key-value metadata.
155+
std::unordered_map<std::string, std::string> metadata;
156+
};
157+
158+
} // namespace iceberg

src/iceberg/metrics/counter.cc

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/metrics/counter.h"
21+
22+
#include <memory>
23+
24+
namespace iceberg {
25+
26+
namespace {
27+
28+
class NoopCounter final : public Counter {
29+
public:
30+
using Counter::Increment;
31+
void Increment(int64_t) override {}
32+
int64_t value() const override { return 0; }
33+
bool IsNoop() const override { return true; }
34+
};
35+
36+
} // namespace
37+
38+
std::shared_ptr<Counter> Counter::Noop() {
39+
static std::shared_ptr<Counter> instance = std::make_shared<NoopCounter>();
40+
return instance;
41+
}
42+
43+
DefaultCounter::DefaultCounter(CounterUnit unit) : unit_(unit) {}
44+
45+
void DefaultCounter::Increment(int64_t amount) {
46+
count_.fetch_add(amount, std::memory_order_relaxed);
47+
}
48+
49+
int64_t DefaultCounter::value() const { return count_.load(std::memory_order_relaxed); }
50+
51+
} // namespace iceberg

0 commit comments

Comments
 (0)