Skip to content

Commit aaa2d08

Browse files
committed
Add IO metrics and track bytes written to WAL
1 parent 7ed1efd commit aaa2d08

12 files changed

Lines changed: 211 additions & 32 deletions

File tree

quickwit/Cargo.lock

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

quickwit/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ metrics-util = "0.20"
176176
mime_guess = "2.0"
177177
mini-moka = "0.10.3"
178178
mockall = "0.14"
179-
mrecordlog = { git = "https://github.com/quickwit-oss/mrecordlog", rev = "306c0a7" }
179+
mrecordlog = { git = "https://github.com/quickwit-oss/mrecordlog", rev = "3b3562ef" }
180180
new_string_template = "1.5"
181181
nom = "8.0"
182182
numfmt = "1.2"
@@ -200,6 +200,7 @@ predicates = "3"
200200
prettyplease = "0.2"
201201
proc-macro2 = "1.0"
202202
prometheus = { version = "0.14", default-features = false, features = ["process"] }
203+
procfs = { version = "0.17", default-features = false }
203204
proptest = "1"
204205
prost = { version = "0.14", default-features = false, features = [
205206
"derive",

quickwit/quickwit-cli/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,9 @@ quickwit-search = { workspace = true }
6666
quickwit-serve = { workspace = true }
6767
quickwit-storage = { workspace = true }
6868

69+
[target.'cfg(target_os = "linux")'.dependencies]
70+
procfs = { workspace = true }
71+
6972
[dev-dependencies]
7073
predicates = { workspace = true }
7174
reqwest = { workspace = true }

quickwit/quickwit-cli/src/jemalloc.rs

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@
1414

1515
use std::time::Duration;
1616

17+
use quickwit_common::metrics::{
18+
MEMORY_ACTIVE_BYTES, MEMORY_ALLOCATED_BYTES, MEMORY_RESIDENT_BYTES,
19+
};
20+
use quickwit_common::rate_limited_warn;
1721
use tikv_jemallocator::Jemalloc;
1822
use tracing::error;
1923

@@ -26,7 +30,7 @@ pub static GLOBAL: quickwit_common::jemalloc_profiled::JemallocProfiled =
2630
#[global_allocator]
2731
pub static GLOBAL: Jemalloc = Jemalloc;
2832

29-
const JEMALLOC_METRICS_POLLING_INTERVAL: Duration = Duration::from_secs(1);
33+
const JEMALLOC_METRICS_POLLING_INTERVAL: Duration = Duration::from_secs(5);
3034

3135
pub async fn jemalloc_metrics_loop() -> tikv_jemalloc_ctl::Result<()> {
3236
// Obtain a MIB for the `epoch`, `stats.active`, `stats.allocated`, and `stats.resident` keys:
@@ -36,22 +40,33 @@ pub async fn jemalloc_metrics_loop() -> tikv_jemalloc_ctl::Result<()> {
3640
let resident_mib = tikv_jemalloc_ctl::stats::resident::mib()?;
3741

3842
let mut poll_interval = tokio::time::interval(JEMALLOC_METRICS_POLLING_INTERVAL);
43+
poll_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
3944

4045
loop {
4146
poll_interval.tick().await;
4247

43-
// Many statistics are cached and only updated when the epoch is advanced:
44-
epoch_mib.advance()?;
45-
46-
// Read statistics using MIB keys:
47-
let active = active_mib.read()?;
48-
quickwit_common::metrics::MEMORY_ACTIVE_BYTES.set(active as f64);
49-
50-
let allocated = allocated_mib.read()?;
51-
quickwit_common::metrics::MEMORY_ALLOCATED_BYTES.set(allocated as f64);
52-
53-
let resident = resident_mib.read()?;
54-
quickwit_common::metrics::MEMORY_RESIDENT_BYTES.set(resident as f64);
48+
if let Err(error) = epoch_mib.advance() {
49+
rate_limited_warn!(limit_per_min = 1, %error, "failed to advance jemalloc epoch");
50+
continue;
51+
}
52+
match active_mib.read() {
53+
Ok(active) => MEMORY_ACTIVE_BYTES.set(active as f64),
54+
Err(error) => {
55+
rate_limited_warn!(limit_per_min = 1, %error, "failed to read jemalloc stats.active");
56+
}
57+
}
58+
match allocated_mib.read() {
59+
Ok(allocated) => MEMORY_ALLOCATED_BYTES.set(allocated as f64),
60+
Err(error) => {
61+
rate_limited_warn!(limit_per_min = 1, %error, "failed to read jemalloc stats.allocated");
62+
}
63+
}
64+
match resident_mib.read() {
65+
Ok(resident) => MEMORY_RESIDENT_BYTES.set(resident as f64),
66+
Err(error) => {
67+
rate_limited_warn!(limit_per_min = 1, %error, "failed to read jemalloc stats.resident");
68+
}
69+
}
5570
}
5671
}
5772

quickwit/quickwit-cli/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ pub mod index;
5050
#[cfg(feature = "jemalloc")]
5151
pub mod jemalloc;
5252
pub mod metrics;
53+
#[cfg(target_os = "linux")]
54+
pub mod proc_io;
5355
pub mod service;
5456
pub mod source;
5557
pub mod split;

quickwit/quickwit-cli/src/main.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ use quickwit_cli::cli::{CliCommand, build_cli};
2121
#[cfg(feature = "jemalloc")]
2222
use quickwit_cli::jemalloc::start_jemalloc_metrics_loop;
2323
use quickwit_cli::metrics::register_build_info_metric;
24+
#[cfg(target_os = "linux")]
25+
use quickwit_cli::proc_io::start_proc_io_metrics_loop;
2426
use quickwit_cli::{busy_detector, install_default_crypto_ring_provider};
2527
use quickwit_common::runtimes::scrape_tokio_runtime_metrics;
2628
use quickwit_serve::{BuildInfo, EnvFilterReloadFn};
@@ -129,6 +131,9 @@ async fn main_impl() -> anyhow::Result<()> {
129131
#[cfg(feature = "jemalloc")]
130132
start_jemalloc_metrics_loop();
131133

134+
#[cfg(target_os = "linux")]
135+
start_proc_io_metrics_loop();
136+
132137
let return_code: i32 = if let Err(command_error) = command.execute(env_filter_reload_fn).await {
133138
error!(error=%command_error, "command failed");
134139
eprintln!(
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
// Copyright 2021-Present Datadog, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::time::Duration;
16+
17+
use procfs::ProcResult;
18+
use procfs::process::Process;
19+
use quickwit_common::metrics::{
20+
IO_READ_BYTES, IO_READ_SYSCALLS, IO_WRITE_BYTES, IO_WRITE_SYSCALLS,
21+
};
22+
use quickwit_common::rate_limited_tracing::rate_limited_warn;
23+
use quickwit_metrics::Counter;
24+
use tracing::error;
25+
26+
const PROC_IO_METRICS_POLLING_INTERVAL: Duration = Duration::from_secs(5);
27+
28+
/// Reads `/proc/self/io` on a fixed interval and publishes the cumulative byte and syscall
29+
/// counters as Prometheus counters.
30+
///
31+
/// `/proc/self/io` exposes monotonic per-process counters maintained by the kernel. Prometheus
32+
/// counters are also monotonic but only expose an `inc_by(delta)` API, so we keep the previously
33+
/// observed value and increment by the difference on each poll.
34+
async fn proc_io_metrics_loop() -> ProcResult<()> {
35+
let process = Process::myself()?;
36+
37+
let mut previous_read_bytes: u64 = 0;
38+
let mut previous_write_bytes: u64 = 0;
39+
let mut previous_read_syscalls: u64 = 0;
40+
let mut previous_write_syscalls: u64 = 0;
41+
42+
let mut poll_interval = tokio::time::interval(PROC_IO_METRICS_POLLING_INTERVAL);
43+
poll_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
44+
45+
loop {
46+
poll_interval.tick().await;
47+
48+
let io = match process.io() {
49+
Ok(io) => io,
50+
Err(error) => {
51+
rate_limited_warn!(
52+
limit_per_min = 1,
53+
%error,
54+
"failed to read /proc/self/io"
55+
);
56+
continue;
57+
}
58+
};
59+
increment_counter(&IO_READ_BYTES, io.read_bytes, &mut previous_read_bytes);
60+
increment_counter(&IO_WRITE_BYTES, io.write_bytes, &mut previous_write_bytes);
61+
increment_counter(&IO_READ_SYSCALLS, io.syscr, &mut previous_read_syscalls);
62+
increment_counter(&IO_WRITE_SYSCALLS, io.syscw, &mut previous_write_syscalls);
63+
}
64+
}
65+
66+
fn increment_counter(counter: &Counter, current: u64, previous: &mut u64) {
67+
debug_assert!(
68+
current >= *previous,
69+
"/proc/self/io counters should be monotonic for a given PID"
70+
);
71+
let diff = current.saturating_sub(*previous);
72+
counter.inc_by(diff);
73+
*previous = current;
74+
}
75+
76+
pub fn start_proc_io_metrics_loop() {
77+
tokio::task::spawn(async {
78+
if let Err(error) = proc_io_metrics_loop().await {
79+
error!(%error, "failed to collect /proc/self/io metrics");
80+
}
81+
});
82+
}

quickwit/quickwit-common/src/metrics.rs

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
use std::sync::LazyLock;
1616

1717
pub use prometheus::{exponential_buckets, linear_buckets};
18-
use quickwit_metrics::{Gauge, LazyGauge, gauge, lazy_gauge};
18+
use quickwit_metrics::{Gauge, LazyCounter, LazyGauge, gauge, lazy_counter, lazy_gauge};
1919

2020
pub fn index_label(index_id: &str) -> &str {
2121
static PER_INDEX_METRICS_ENABLED: LazyLock<bool> =
@@ -106,6 +106,30 @@ fn in_flight_data_gauge(component: &'static str) -> Gauge {
106106
gauge!(parent: IN_FLIGHT_DATA_BYTES, "component" => component)
107107
}
108108

109+
pub static IO_READ_BYTES: LazyCounter = lazy_counter!(
110+
name: "read_bytes_total",
111+
description: "Cumulative bytes read from storage by the process, as reported by `/proc/self/io` `read_bytes`. Reflects block-layer I/O after page cache absorbs reads served from memory.",
112+
subsystem: "io",
113+
);
114+
115+
pub static IO_WRITE_BYTES: LazyCounter = lazy_counter!(
116+
name: "write_bytes_total",
117+
description: "Cumulative bytes written to storage by the process, as reported by `/proc/self/io` `write_bytes`. Reflects block-layer I/O after page cache coalescing.",
118+
subsystem: "io",
119+
);
120+
121+
pub static IO_READ_SYSCALLS: LazyCounter = lazy_counter!(
122+
name: "read_syscalls_total",
123+
description: "Cumulative number of read syscalls issued by the process, as reported by `/proc/self/io` `syscr`.",
124+
subsystem: "io",
125+
);
126+
127+
pub static IO_WRITE_SYSCALLS: LazyCounter = lazy_counter!(
128+
name: "write_syscalls_total",
129+
description: "Cumulative number of write syscalls issued by the process, as reported by `/proc/self/io` `syscw`.",
130+
subsystem: "io",
131+
);
132+
109133
#[cfg(test)]
110134
mod tests {
111135
use super::*;

quickwit/quickwit-indexing/src/actors/packager.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ fn list_split_files(
189189
segment_metas: &[SegmentMeta],
190190
scratch_directory: &TempDirectory,
191191
) -> io::Result<Vec<PathBuf>> {
192-
let mut index_files = vec![scratch_directory.path().join("meta.json")];
192+
let mut split_files = vec![scratch_directory.path().join("meta.json")];
193193

194194
// list the segment files
195195
for segment_meta in segment_metas {
@@ -199,12 +199,12 @@ fn list_split_files(
199199
// If the file is missing, this is fine.
200200
// segment_meta.list_files() may actually returns files that
201201
// may not exist.
202-
index_files.push(filepath);
202+
split_files.push(filepath);
203203
}
204204
}
205205
}
206-
index_files.sort();
207-
Ok(index_files)
206+
split_files.sort();
207+
Ok(split_files)
208208
}
209209

210210
fn build_hotcache<W: io::Write>(split_path: &Path, out: &mut W) -> anyhow::Result<()> {

quickwit/quickwit-ingest/src/ingest_v2/metrics.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,24 @@ pub(super) static WAL_MEMORY_USED_BYTES: LazyGauge = lazy_gauge!(
140140
subsystem: "ingest",
141141
);
142142

143+
static WAL_BYTES_WRITTEN_TOTAL: LazyCounter = lazy_counter!(
144+
name: "wal_bytes_written_total",
145+
description: "Total number of bytes written to the WAL by write operations (create_queue, append_records, truncate_queue, delete_queue), including frame headers and end-of-block padding.",
146+
subsystem: "ingest",
147+
);
148+
149+
pub(crate) static WAL_BYTES_WRITTEN_CREATE_QUEUE: LazyCounter =
150+
lazy_counter!(parent: WAL_BYTES_WRITTEN_TOTAL, "operation" => "create_queue");
151+
152+
pub(crate) static WAL_BYTES_WRITTEN_DELETE_QUEUE: LazyCounter =
153+
lazy_counter!(parent: WAL_BYTES_WRITTEN_TOTAL, "operation" => "delete_queue");
154+
155+
pub(crate) static WAL_BYTES_WRITTEN_APPEND: LazyCounter =
156+
lazy_counter!(parent: WAL_BYTES_WRITTEN_TOTAL, "operation" => "append");
157+
158+
pub(crate) static WAL_BYTES_WRITTEN_TRUNCATE: LazyCounter =
159+
lazy_counter!(parent: WAL_BYTES_WRITTEN_TOTAL, "operation" => "truncate");
160+
143161
pub(super) fn report_wal_usage(wal_usage: ResourceUsage) {
144162
WAL_DISK_USED_BYTES.set(wal_usage.disk_used_bytes as f64);
145163
IN_FLIGHT_WAL.set(wal_usage.memory_allocated_bytes as f64);

0 commit comments

Comments
 (0)