Skip to content

Commit ae29c3d

Browse files
evindjInnocent
andauthored
feat: metrics reporting for scan and commit (apache#589)
Initial commit for addressing apache#533 --------- Co-authored-by: Innocent <idjiofack@jabode.com>
1 parent c21d425 commit ae29c3d

25 files changed

Lines changed: 3573 additions & 0 deletions

src/iceberg/CMakeLists.txt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,13 @@ set(ICEBERG_SOURCES
5959
manifest/v3_metadata.cc
6060
metadata_columns.cc
6161
metrics_config.cc
62+
metrics/commit_report.cc
63+
metrics/counter.cc
64+
metrics/json_serde.cc
65+
metrics/metrics_context.cc
66+
metrics/metrics_reporters.cc
67+
metrics/scan_report.cc
68+
metrics/timer.cc
6269
name_mapping.cc
6370
partition_field.cc
6471
partition_spec.cc
@@ -227,6 +234,7 @@ add_subdirectory(puffin)
227234
add_subdirectory(row)
228235
add_subdirectory(update)
229236
add_subdirectory(util)
237+
add_subdirectory(metrics)
230238

231239
if(ICEBERG_BUILD_BUNDLE)
232240
set(ICEBERG_BUNDLE_SOURCES

src/iceberg/constants.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ namespace iceberg {
3333
constexpr std::string_view kParquetFieldIdKey = "PARQUET:field_id";
3434
constexpr int64_t kInvalidSnapshotId = -1;
3535
constexpr int64_t kInvalidSequenceNumber = -1;
36+
constexpr int64_t kInvalidSchemaId = -1;
3637
/// \brief Stand-in for the current sequence number that will be assigned when the commit
3738
/// is successful. This is replaced when writing a manifest list by the ManifestFile
3839
/// adapter.

src/iceberg/meson.build

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,13 @@ iceberg_sources = files(
8080
'manifest/v2_metadata.cc',
8181
'manifest/v3_metadata.cc',
8282
'metadata_columns.cc',
83+
'metrics/commit_report.cc',
84+
'metrics/counter.cc',
85+
'metrics/json_serde.cc',
86+
'metrics/metrics_context.cc',
87+
'metrics/metrics_reporters.cc',
88+
'metrics/scan_report.cc',
89+
'metrics/timer.cc',
8390
'metrics_config.cc',
8491
'name_mapping.cc',
8592
'partition_field.cc',
@@ -282,6 +289,7 @@ subdir('data')
282289
subdir('deletes')
283290
subdir('expression')
284291
subdir('manifest')
292+
subdir('metrics')
285293
subdir('puffin')
286294
subdir('row')
287295
subdir('update')

src/iceberg/metrics/CMakeLists.txt

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
iceberg_install_all_headers(iceberg/metrics)
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/metrics/commit_report.h"
21+
22+
#include "iceberg/snapshot.h"
23+
#include "iceberg/util/string_util.h"
24+
25+
namespace iceberg {
26+
27+
std::unique_ptr<CommitMetrics> CommitMetrics::Make(MetricsContext& context) {
28+
auto m = std::unique_ptr<CommitMetrics>(new CommitMetrics());
29+
m->total_duration = context.GetTimer("total-duration", TimerUnit::kNanoseconds);
30+
m->attempts = context.GetCounter("attempts");
31+
return m;
32+
}
33+
34+
std::unique_ptr<CommitMetrics> CommitMetrics::Noop() {
35+
return CommitMetrics::Make(*MetricsContext::Noop());
36+
}
37+
38+
CommitMetricsResult CommitMetrics::ToResult() const {
39+
CommitMetricsResult result;
40+
if (total_duration && !total_duration->IsNoop()) {
41+
result.total_duration =
42+
TimerResult{.unit = std::string(total_duration->Unit()),
43+
.count = total_duration->Count(),
44+
.total_duration = total_duration->TotalDuration()};
45+
}
46+
if (attempts && !attempts->IsNoop()) {
47+
result.attempts = CounterResult{.unit = attempts->unit(), .value = attempts->value()};
48+
}
49+
return result;
50+
}
51+
52+
CommitMetricsResult CommitMetricsResult::From(
53+
const CommitMetrics& live_metrics,
54+
const std::unordered_map<std::string, std::string>& snapshot_summary) {
55+
auto result = live_metrics.ToResult();
56+
57+
auto count_field =
58+
[&snapshot_summary](const std::string& key) -> std::optional<CounterResult> {
59+
auto it = snapshot_summary.find(key);
60+
if (it == snapshot_summary.end()) return std::nullopt;
61+
auto parsed = StringUtils::ParseNumber<int64_t>(it->second);
62+
if (!parsed.has_value()) return std::nullopt;
63+
return CounterResult{.unit = CounterUnit::kCount, .value = parsed.value()};
64+
};
65+
auto bytes_field =
66+
[&snapshot_summary](const std::string& key) -> std::optional<CounterResult> {
67+
auto it = snapshot_summary.find(key);
68+
if (it == snapshot_summary.end()) return std::nullopt;
69+
auto parsed = StringUtils::ParseNumber<int64_t>(it->second);
70+
if (!parsed.has_value()) return std::nullopt;
71+
return CounterResult{.unit = CounterUnit::kBytes, .value = parsed.value()};
72+
};
73+
74+
result.added_data_files = count_field(SnapshotSummaryFields::kAddedDataFiles);
75+
result.removed_data_files = count_field(SnapshotSummaryFields::kDeletedDataFiles);
76+
result.total_data_files = count_field(SnapshotSummaryFields::kTotalDataFiles);
77+
result.added_delete_files = count_field(SnapshotSummaryFields::kAddedDeleteFiles);
78+
result.added_equality_delete_files =
79+
count_field(SnapshotSummaryFields::kAddedEqDeleteFiles);
80+
result.added_positional_delete_files =
81+
count_field(SnapshotSummaryFields::kAddedPosDeleteFiles);
82+
result.added_dvs = count_field(SnapshotSummaryFields::kAddedDVs);
83+
result.removed_delete_files = count_field(SnapshotSummaryFields::kRemovedDeleteFiles);
84+
result.removed_positional_delete_files =
85+
count_field(SnapshotSummaryFields::kRemovedPosDeleteFiles);
86+
result.removed_dvs = count_field(SnapshotSummaryFields::kRemovedDVs);
87+
result.removed_equality_delete_files =
88+
count_field(SnapshotSummaryFields::kRemovedEqDeleteFiles);
89+
result.total_delete_files = count_field(SnapshotSummaryFields::kTotalDeleteFiles);
90+
result.added_records = count_field(SnapshotSummaryFields::kAddedRecords);
91+
result.removed_records = count_field(SnapshotSummaryFields::kDeletedRecords);
92+
result.total_records = count_field(SnapshotSummaryFields::kTotalRecords);
93+
result.added_files_size_bytes = bytes_field(SnapshotSummaryFields::kAddedFileSize);
94+
result.removed_files_size_bytes = bytes_field(SnapshotSummaryFields::kRemovedFileSize);
95+
result.total_files_size_bytes = bytes_field(SnapshotSummaryFields::kTotalFileSize);
96+
result.added_positional_deletes = count_field(SnapshotSummaryFields::kAddedPosDeletes);
97+
result.removed_positional_deletes =
98+
count_field(SnapshotSummaryFields::kRemovedPosDeletes);
99+
result.total_positional_deletes = count_field(SnapshotSummaryFields::kTotalPosDeletes);
100+
result.added_equality_deletes = count_field(SnapshotSummaryFields::kAddedEqDeletes);
101+
result.removed_equality_deletes = count_field(SnapshotSummaryFields::kRemovedEqDeletes);
102+
result.total_equality_deletes = count_field(SnapshotSummaryFields::kTotalEqDeletes);
103+
result.kept_manifest_count = count_field(SnapshotSummaryFields::kManifestsKept);
104+
result.created_manifest_count = count_field(SnapshotSummaryFields::kManifestsCreated);
105+
result.replaced_manifest_count = count_field(SnapshotSummaryFields::kManifestsReplaced);
106+
result.processed_manifest_entries_count =
107+
count_field(SnapshotSummaryFields::kEntriesProcessed);
108+
return result;
109+
}
110+
111+
} // namespace iceberg
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include <memory>
23+
#include <optional>
24+
#include <string>
25+
#include <unordered_map>
26+
27+
#include "iceberg/constants.h"
28+
#include "iceberg/iceberg_export.h"
29+
#include "iceberg/metrics/metrics_context.h"
30+
#include "iceberg/metrics/metrics_types.h"
31+
#include "iceberg/metrics/timer.h"
32+
33+
namespace iceberg {
34+
35+
// Forward declaration: CommitMetrics is defined later in this header.
36+
class CommitMetrics;
37+
38+
/// \brief Immutable snapshot of commit metrics for use in CommitReport.
39+
///
40+
/// Populated by CommitMetrics::ToResult() after a commit completes. File and
41+
/// record counts can also be combined from the snapshot summary with From().
42+
struct ICEBERG_EXPORT CommitMetricsResult {
43+
/// \brief Total wall-clock duration of the commit attempt.
44+
std::optional<TimerResult> total_duration;
45+
/// \brief Number of commit attempts (1 on success without retries).
46+
std::optional<CounterResult> attempts;
47+
/// \brief Number of data files added in this commit.
48+
std::optional<CounterResult> added_data_files;
49+
/// \brief Number of data files removed in this commit.
50+
std::optional<CounterResult> removed_data_files;
51+
/// \brief Total live data files after this commit.
52+
std::optional<CounterResult> total_data_files;
53+
/// \brief Number of delete files added in this commit.
54+
std::optional<CounterResult> added_delete_files;
55+
/// \brief Equality delete files added.
56+
std::optional<CounterResult> added_equality_delete_files;
57+
/// \brief Positional delete files added.
58+
std::optional<CounterResult> added_positional_delete_files;
59+
/// \brief Deletion vectors added.
60+
std::optional<CounterResult> added_dvs;
61+
/// \brief Number of delete files removed in this commit.
62+
std::optional<CounterResult> removed_delete_files;
63+
/// \brief Positional delete files removed.
64+
std::optional<CounterResult> removed_positional_delete_files;
65+
/// \brief Deletion vectors removed.
66+
std::optional<CounterResult> removed_dvs;
67+
/// \brief Equality delete files removed.
68+
std::optional<CounterResult> removed_equality_delete_files;
69+
/// \brief Total live delete files after this commit.
70+
std::optional<CounterResult> total_delete_files;
71+
/// \brief Number of records added in this commit.
72+
std::optional<CounterResult> added_records;
73+
/// \brief Number of records removed in this commit.
74+
std::optional<CounterResult> removed_records;
75+
/// \brief Total live records after this commit.
76+
std::optional<CounterResult> total_records;
77+
/// \brief Total byte size of files added.
78+
std::optional<CounterResult> added_files_size_bytes;
79+
/// \brief Total byte size of files removed.
80+
std::optional<CounterResult> removed_files_size_bytes;
81+
/// \brief Total byte size of all live files after this commit.
82+
std::optional<CounterResult> total_files_size_bytes;
83+
/// \brief Positional delete records added.
84+
std::optional<CounterResult> added_positional_deletes;
85+
/// \brief Positional delete records removed.
86+
std::optional<CounterResult> removed_positional_deletes;
87+
/// \brief Total positional delete records after this commit.
88+
std::optional<CounterResult> total_positional_deletes;
89+
/// \brief Equality delete records added.
90+
std::optional<CounterResult> added_equality_deletes;
91+
/// \brief Equality delete records removed.
92+
std::optional<CounterResult> removed_equality_deletes;
93+
/// \brief Total equality delete records after this commit.
94+
std::optional<CounterResult> total_equality_deletes;
95+
/// \brief Manifest files kept unchanged in this commit.
96+
std::optional<CounterResult> kept_manifest_count;
97+
/// \brief Manifest files created in this commit.
98+
std::optional<CounterResult> created_manifest_count;
99+
/// \brief Manifest files replaced in this commit.
100+
std::optional<CounterResult> replaced_manifest_count;
101+
/// \brief Manifest entries processed in this commit.
102+
std::optional<CounterResult> processed_manifest_entries_count;
103+
104+
bool operator==(const CommitMetricsResult&) const = default;
105+
106+
/// \brief Build a CommitMetricsResult from live metrics and a snapshot summary map.
107+
///
108+
/// Combines timer/retry measurements from \p live_metrics with records parsed
109+
/// from \p snapshot_summary. Missing or unparseable summary keys are omitted.
110+
static CommitMetricsResult From(
111+
const CommitMetrics& live_metrics,
112+
const std::unordered_map<std::string, std::string>& snapshot_summary);
113+
};
114+
115+
/// \brief Live commit metrics collected during a table commit operation.
116+
///
117+
/// Tracks the overall commit duration and retry count. File/record counts come
118+
/// from the snapshot summary after the commit succeeds and are stored separately
119+
/// in CommitMetricsResult.
120+
class ICEBERG_EXPORT CommitMetrics {
121+
public:
122+
/// \brief Create a CommitMetrics instance backed by the given MetricsContext.
123+
static std::unique_ptr<CommitMetrics> Make(MetricsContext& context);
124+
125+
/// \brief Create a CommitMetrics instance with all-noop timer and counter.
126+
static std::unique_ptr<CommitMetrics> Noop();
127+
128+
/// \brief Snapshot current timer and counter values into a CommitMetricsResult.
129+
CommitMetricsResult ToResult() const;
130+
131+
/// \brief Timer measuring total wall-clock time of the commit call.
132+
std::shared_ptr<Timer> total_duration;
133+
134+
/// \brief Counter for the number of commit attempts (including retries).
135+
std::shared_ptr<Counter> attempts;
136+
137+
private:
138+
CommitMetrics() = default;
139+
};
140+
141+
/// \brief Report generated after a commit operation.
142+
///
143+
/// Contains metrics about the changes made in a commit.
144+
struct ICEBERG_EXPORT CommitReport {
145+
/// \brief The fully qualified name of the table that was modified.
146+
std::string table_name;
147+
/// \brief The snapshot ID created by this commit.
148+
int64_t snapshot_id = kInvalidSnapshotId;
149+
/// \brief The sequence number assigned to this commit.
150+
int64_t sequence_number = kInvalidSequenceNumber;
151+
/// \brief The operation that was performed (write, delete, etc.).
152+
std::string operation;
153+
/// \brief Metrics collected during the commit operation.
154+
CommitMetricsResult commit_metrics;
155+
/// \brief Additional key-value metadata.
156+
std::unordered_map<std::string, std::string> metadata;
157+
};
158+
159+
} // namespace iceberg

src/iceberg/metrics/counter.cc

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/metrics/counter.h"
21+
22+
#include <memory>
23+
24+
namespace iceberg {
25+
26+
namespace {
27+
28+
class NoopCounter final : public Counter {
29+
public:
30+
using Counter::Increment;
31+
void Increment(int64_t) override {}
32+
int64_t value() const override { return -1; }
33+
CounterUnit unit() const override { return CounterUnit::kUndefined; }
34+
bool IsNoop() const override { return true; }
35+
};
36+
37+
} // namespace
38+
39+
std::shared_ptr<Counter> Counter::Noop() {
40+
static std::shared_ptr<Counter> instance = std::make_shared<NoopCounter>();
41+
return instance;
42+
}
43+
44+
DefaultCounter::DefaultCounter(CounterUnit unit) : unit_(unit) {}
45+
46+
void DefaultCounter::Increment(int64_t amount) {
47+
count_.fetch_add(amount, std::memory_order_relaxed);
48+
}
49+
50+
int64_t DefaultCounter::value() const { return count_.load(std::memory_order_relaxed); }
51+
52+
} // namespace iceberg

0 commit comments

Comments
 (0)