Skip to content

Commit 6e55c44

Browse files
committed
ct/l1: add worker_probe
Used at the `compaction_worker` level (i.e across all shards), as well as within the `compaction_source` class to capture statistics about removed data.
1 parent bc5d542 commit 6e55c44

9 files changed

Lines changed: 161 additions & 5 deletions

File tree

src/v/cloud_topics/level_one/compaction/BUILD

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ redpanda_cc_library(
5555
":filter",
5656
":logger",
5757
":meta",
58+
":worker_probe",
5859
"//src/v/bytes",
5960
"//src/v/bytes:iostream",
6061
"//src/v/cloud_topics:log_reader_config",
@@ -95,6 +96,7 @@ redpanda_cc_library(
9596
":meta",
9697
":scheduler_probe",
9798
":source_and_sink",
99+
":worker_probe",
98100
"//src/v/cloud_topics/level_one/common:abstract_io",
99101
"//src/v/cloud_topics/level_one/common:file_io",
100102
"//src/v/cloud_topics/level_one/metastore",
@@ -289,3 +291,20 @@ redpanda_cc_library(
289291
"@seastar",
290292
],
291293
)
294+
295+
redpanda_cc_library(
296+
name = "worker_probe",
297+
srcs = [
298+
"worker_probe.cc",
299+
],
300+
hdrs = [
301+
"worker_probe.h",
302+
],
303+
deps = [
304+
"//src/v/compaction:types",
305+
"//src/v/config",
306+
"//src/v/metrics",
307+
"//src/v/utils:log_hist",
308+
"@seastar",
309+
],
310+
)

src/v/cloud_topics/level_one/compaction/source.cc

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,8 @@ compaction_source::compaction_source(
138138
metastore* metastore,
139139
io* io,
140140
ss::abort_source& as,
141-
compaction_job_state& state)
141+
compaction_job_state& state,
142+
compaction_worker_probe& probe)
142143
: _ntp(std::move(ntp))
143144
, _tp(tp)
144145
, _dirty_range_intervals(dirty_range_intervals)
@@ -158,7 +159,8 @@ compaction_source::compaction_source(
158159
, _metastore(metastore)
159160
, _io(io)
160161
, _as(as)
161-
, _state(state) {}
162+
, _state(state)
163+
, _probe(probe) {}
162164

163165
ss::future<> compaction_source::initialize() { co_return; }
164166

@@ -295,6 +297,8 @@ ss::future<ss::stop_iteration> compaction_source::deduplication_iteration(
295297
last_offset,
296298
stats);
297299
}
300+
301+
_probe.add_stats(stats);
298302
}
299303

300304
co_return ss::stop_iteration::no;

src/v/cloud_topics/level_one/compaction/source.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include "cloud_topics/level_one/common/abstract_io.h"
1414
#include "cloud_topics/level_one/compaction/filter.h"
1515
#include "cloud_topics/level_one/compaction/meta.h"
16+
#include "cloud_topics/level_one/compaction/worker_probe.h"
1617
#include "cloud_topics/level_one/metastore/extent_metadata_reader.h"
1718
#include "cloud_topics/level_one/metastore/metastore.h"
1819
#include "cloud_topics/level_one/metastore/offset_interval_set.h"
@@ -36,7 +37,8 @@ class compaction_source : public compaction::sliding_window_reducer::source {
3637
metastore*,
3738
io*,
3839
ss::abort_source&,
39-
compaction_job_state&);
40+
compaction_job_state&,
41+
compaction_worker_probe&);
4042
ss::future<> initialize() final;
4143
ss::future<ss::stop_iteration> map_building_iteration() final;
4244
ss::future<ss::stop_iteration>
@@ -92,6 +94,7 @@ class compaction_source : public compaction::sliding_window_reducer::source {
9294

9395
ss::abort_source& _as;
9496
compaction_job_state& _state;
97+
compaction_worker_probe& _probe;
9598

9699
// Dirty ranges returned by the `metastore` that were indexed during
97100
// `map_deduplication_iteration`.

src/v/cloud_topics/level_one/compaction/tests/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ redpanda_cc_gtest(
3939
"//src/v/cloud_topics/level_one/compaction:filter",
4040
"//src/v/cloud_topics/level_one/compaction:meta",
4141
"//src/v/cloud_topics/level_one/compaction:source_and_sink",
42+
"//src/v/cloud_topics/level_one/compaction:worker_probe",
4243
"//src/v/cloud_topics/level_one/frontend_reader/tests:l1_reader_fixture",
4344
"//src/v/cloud_topics/level_one/metastore",
4445
"//src/v/cloud_topics/level_one/metastore:simple",

src/v/cloud_topics/level_one/compaction/tests/reducer_test.cc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include "cloud_topics/level_one/compaction/sink.h"
2121
#include "cloud_topics/level_one/compaction/source.h"
2222
#include "cloud_topics/level_one/compaction/tests/in_memory_sink.h"
23+
#include "cloud_topics/level_one/compaction/worker_probe.h"
2324
#include "cloud_topics/level_one/frontend_reader/tests/l1_reader_fixture.h"
2425
#include "cloud_topics/level_one/metastore/metastore.h"
2526
#include "cloud_topics/level_one/metastore/simple_metastore.h"
@@ -104,6 +105,7 @@ ss::future<> do_compact(
104105
auto state = l1::compaction_job_state::running;
105106
auto map = compaction::simple_key_offset_map();
106107
auto dirty_range_intervals = offsets_response.dirty_ranges.to_vec();
108+
l1::compaction_worker_probe probe;
107109
auto src = std::make_unique<l1::compaction_source>(
108110
ntp,
109111
tidp,
@@ -115,7 +117,8 @@ ss::future<> do_compact(
115117
metastore,
116118
io,
117119
as,
118-
state);
120+
state,
121+
probe);
119122
auto sink = std::make_unique<l1::compaction_sink>(
120123
tidp,
121124
dirty_range_intervals,

src/v/cloud_topics/level_one/compaction/worker.cc

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ compaction_worker::compaction_worker(
4545
, _metadata_cache(metadata_cache) {}
4646

4747
ss::future<> compaction_worker::start() {
48+
_probe.setup_metrics();
4849
start_work_loop();
4950
co_return;
5051
}
@@ -207,7 +208,8 @@ ss::future<> compaction_worker::compact_log(log_compaction_meta* log) {
207208
_metastore,
208209
_io,
209210
_as,
210-
_job_state);
211+
_job_state,
212+
_probe);
211213
auto sink = std::make_unique<compaction_sink>(
212214
tidp,
213215
dirty_range_intervals,
@@ -219,6 +221,9 @@ ss::future<> compaction_worker::compact_log(log_compaction_meta* log) {
219221
auto reducer = compaction::sliding_window_reducer(
220222
std::move(src), std::move(sink));
221223

224+
// Start measuring time-to-compact here.
225+
auto m = _probe.auto_compaction_measurement();
226+
222227
auto compact_fut = co_await ss::coroutine::as_future(
223228
std::move(reducer).run());
224229

@@ -232,6 +237,9 @@ ss::future<> compaction_worker::compact_log(log_compaction_meta* log) {
232237
"Caught exception {} while compacting CTP {}.",
233238
eptr,
234239
tidp);
240+
241+
// Don't let failed compaction runs contribute to the histogram.
242+
m->cancel();
235243
} else {
236244
vlog(compaction_log.info, "Finished compacting CTP {}", tidp);
237245
}

src/v/cloud_topics/level_one/compaction/worker.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
#include "cloud_topics/level_one/compaction/committer.h"
1515
#include "cloud_topics/level_one/compaction/meta.h"
1616
#include "cloud_topics/level_one/compaction/source.h"
17+
#include "cloud_topics/level_one/compaction/worker_probe.h"
1718
#include "cloud_topics/level_one/metastore/metastore.h"
1819
#include "cluster/metadata_cache.h"
1920
#include "compaction/key_offset_map.h"
@@ -189,6 +190,8 @@ class compaction_worker {
189190
compaction_committer* _committer;
190191

191192
cluster::metadata_cache* _metadata_cache;
193+
194+
compaction_worker_probe _probe;
192195
};
193196

194197
} // namespace cloud_topics::l1
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright 2025 Redpanda Data, Inc.
3+
*
4+
* Licensed as a Redpanda Enterprise file under the Redpanda Community
5+
* License (the "License"); you may not use this file except in compliance with
6+
* the License. You may obtain a copy of the License at
7+
*
8+
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
9+
*/
10+
11+
#include "cloud_topics/level_one/compaction/worker_probe.h"
12+
13+
#include "config/configuration.h"
14+
#include "metrics/metrics.h"
15+
#include "metrics/prometheus_sanitize.h"
16+
17+
#include <seastar/core/metrics.hh>
18+
19+
namespace cloud_topics::l1 {
20+
21+
void compaction_worker_probe::setup_metrics() {
22+
namespace sm = ss::metrics;
23+
24+
if (config::shard_local_cfg().disable_metrics()) {
25+
return;
26+
}
27+
28+
_metrics.add_group(
29+
prometheus_sanitize::metrics_name("cloud_topics:compaction_worker"),
30+
{
31+
sm::make_gauge(
32+
"batches_removed",
33+
[this] { return _batches_removed; },
34+
sm::description(
35+
"Number of batches removed across all cloud topic partitions on "
36+
"this shard")),
37+
sm::make_gauge(
38+
"records_removed",
39+
[this] { return _records_removed; },
40+
sm::description(
41+
"Number of records removed across all cloud topic partitions on "
42+
"this shard")),
43+
sm::make_gauge(
44+
"tombstones_removed",
45+
[this] { return _tombstones_removed; },
46+
sm::description(
47+
"Number of tombstone records removed across all cloud topic "
48+
"partitions on this shard")),
49+
sm::make_histogram(
50+
"compaction_duration_seconds",
51+
[this] { return _compaction_runs.internal_histogram_logform(); },
52+
sm::description(
53+
"The duration of a compaction run for cloud topic partitions on "
54+
"this shard")),
55+
});
56+
}
57+
58+
} // namespace cloud_topics::l1
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright 2025 Redpanda Data, Inc.
3+
*
4+
* Licensed as a Redpanda Enterprise file under the Redpanda Community
5+
* License (the "License"); you may not use this file except in compliance with
6+
* the License. You may obtain a copy of the License at
7+
*
8+
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
9+
*/
10+
11+
#pragma once
12+
13+
#include "compaction/types.h"
14+
#include "metrics/metrics.h"
15+
#include "utils/log_hist.h"
16+
17+
#include <seastar/core/metrics_registration.hh>
18+
19+
#include <cstdint>
20+
#include <memory>
21+
22+
namespace cloud_topics::l1 {
23+
24+
class compaction_worker_probe {
25+
public:
26+
using hist_t = log_hist_internal;
27+
compaction_worker_probe() = default;
28+
29+
compaction_worker_probe(const compaction_worker_probe&) = delete;
30+
compaction_worker_probe& operator=(const compaction_worker_probe&) = delete;
31+
compaction_worker_probe(compaction_worker_probe&&) = delete;
32+
compaction_worker_probe& operator=(compaction_worker_probe&&) = delete;
33+
~compaction_worker_probe() = default;
34+
35+
void setup_metrics();
36+
37+
std::unique_ptr<hist_t::measurement> auto_compaction_measurement() {
38+
return _compaction_runs.auto_measure();
39+
}
40+
41+
void add_stats(const compaction::stats& stats) {
42+
_batches_removed += stats.batches_discarded;
43+
_records_removed += stats.records_discarded;
44+
_tombstones_removed += stats.expired_tombstones_discarded;
45+
}
46+
47+
private:
48+
hist_t _compaction_runs;
49+
50+
uint64_t _batches_removed{0};
51+
uint64_t _records_removed{0};
52+
uint64_t _tombstones_removed{0};
53+
54+
metrics::internal_metric_groups _metrics;
55+
};
56+
57+
} // namespace cloud_topics::l1

0 commit comments

Comments
 (0)