Skip to content

Commit 9a4e302

Browse files
authored
Merge pull request #29989 from vbotbuildovich/backport-pr-29970-v26.1.x-827
[v26.1.x] k/fetch: Add fetch_response_dropped_bytes metric
2 parents e74f9aa + 5ed16da commit 9a4e302

2 files changed

Lines changed: 26 additions & 1 deletion

File tree

src/v/kafka/server/handlers/fetch.cc

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -460,7 +460,13 @@ static void fill_fetch_responses(
460460
resp_units = std::move(res.memory_units);
461461
resp.records = batch_reader(std::move(res).release_data());
462462
} else {
463-
// TODO: add probe to measure how much of read data is discarded
463+
if (res.has_data()) {
464+
// Data was read from cloud storage but cannot fit in the
465+
// response budget. This is pure read amplification: S3 bytes
466+
// were downloaded, materialized, and now dropped.
467+
octx.rctx.probe().add_fetch_response_dropped_bytes(
468+
res.data_size_bytes());
469+
}
464470
resp.records = batch_reader();
465471
}
466472

src/v/kafka/server/kafka_probe.h

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,20 @@ class kafka_probe {
113113
"in the future or in the past"))},
114114
{},
115115
{sm::shard_label});
116+
117+
_metrics.add_group(
118+
"kafka",
119+
{sm::make_counter(
120+
"fetch_response_dropped_bytes",
121+
[this] { return _fetch_response_dropped_bytes; },
122+
sm::description(
123+
"Total bytes read from storage (including cloud storage) that "
124+
"were discarded in fill_fetch_responses because the response "
125+
"budget was already consumed by other partitions. Non-zero "
126+
"values indicate read amplification: data was fetched from S3 "
127+
"but not delivered to the consumer."))},
128+
{},
129+
{sm::shard_label});
116130
}
117131

118132
void setup_public_metrics() {
@@ -153,6 +167,10 @@ class kafka_probe {
153167
_fetch_latency.record(micros.count());
154168
}
155169

170+
// Accumulates bytes that were read from storage but discarded before
// delivery to the consumer; feeds the "fetch_response_dropped_bytes"
// counter this probe exports (non-zero values indicate read
// amplification, e.g. data fetched from cloud storage then dropped
// because the response budget was exhausted).
void add_fetch_response_dropped_bytes(uint64_t bytes) {
171+
_fetch_response_dropped_bytes += bytes;
172+
}
173+
156174
void record_batch(uint64_t size, model::compression compression) {
157175
_batch_size.record(size);
158176
if (
@@ -167,6 +185,7 @@ class kafka_probe {
167185
friend prod_consume_fixture;
168186

169187
uint32_t _produce_bad_create_time = 0;
188+
uint64_t _fetch_response_dropped_bytes = 0;
170189

171190
hist_t _produce_latency;
172191
hist_t _fetch_latency;

0 commit comments

Comments (0)