Skip to content

Commit 12fc7c0

Browse files
committed
Track bytes written by WAL
1 parent 1f23c3c commit 12fc7c0

11 files changed

Lines changed: 101 additions & 40 deletions

File tree

quickwit/Cargo.lock

Lines changed: 3 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

quickwit/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ md5 = "0.8"
164164
mime_guess = "2.0"
165165
mini-moka = "0.10.3"
166166
mockall = "0.14"
167-
mrecordlog = { git = "https://github.com/quickwit-oss/mrecordlog", rev = "306c0a7" }
167+
mrecordlog = { git = "https://github.com/quickwit-oss/mrecordlog", rev = "3b3562efea468a3a72a244b16d3c555db0ddc95b" }
168168
new_string_template = "1.5"
169169
nom = "8.0"
170170
numfmt = "1.2"

quickwit/quickwit-cli/src/jemalloc.rs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,9 @@
1515
use std::time::Duration;
1616

1717
use quickwit_common::metrics::MEMORY_METRICS;
18+
use quickwit_common::rate_limited_warn;
1819
use tikv_jemallocator::Jemalloc;
19-
use tracing::{error, warn};
20+
use tracing::error;
2021

2122
#[cfg(feature = "jemalloc-profiled")]
2223
#[global_allocator]
@@ -39,28 +40,32 @@ pub async fn jemalloc_metrics_loop() -> tikv_jemalloc_ctl::Result<()> {
3940
let resident_mib = tikv_jemalloc_ctl::stats::resident::mib()?;
4041

4142
let mut poll_interval = tokio::time::interval(JEMALLOC_METRICS_POLLING_INTERVAL);
43+
poll_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
4244

4345
loop {
4446
poll_interval.tick().await;
4547

46-
// Many statistics are cached and only updated when the epoch is advanced. A transient
47-
// failure here would not be expected, but we log and retry on the next tick rather than
48-
// exiting the loop and leaving the metrics frozen forever.
4948
if let Err(error) = epoch_mib.advance() {
50-
warn!(%error, "failed to advance jemalloc epoch");
49+
rate_limited_warn!(limit_per_min = 1, %error, "failed to advance jemalloc epoch");
5150
continue;
5251
}
5352
match active_mib.read() {
5453
Ok(active) => memory_metrics.active_bytes.set(active as i64),
55-
Err(error) => warn!(%error, "failed to read jemalloc stats.active"),
54+
Err(error) => {
55+
rate_limited_warn!(limit_per_min = 1, %error, "failed to read jemalloc stats.active");
56+
}
5657
}
5758
match allocated_mib.read() {
5859
Ok(allocated) => memory_metrics.allocated_bytes.set(allocated as i64),
59-
Err(error) => warn!(%error, "failed to read jemalloc stats.allocated"),
60+
Err(error) => {
61+
rate_limited_warn!(limit_per_min = 1, %error, "failed to read jemalloc stats.allocated");
62+
}
6063
}
6164
match resident_mib.read() {
6265
Ok(resident) => memory_metrics.resident_bytes.set(resident as i64),
63-
Err(error) => warn!(%error, "failed to read jemalloc stats.resident"),
66+
Err(error) => {
67+
rate_limited_warn!(limit_per_min = 1, %error, "failed to read jemalloc stats.resident");
68+
}
6469
}
6570
}
6671
}

quickwit/quickwit-cli/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ pub mod jemalloc;
5252
pub mod logger;
5353
pub mod metrics;
5454
#[cfg(target_os = "linux")]
55-
pub mod procfs;
55+
pub mod proc_io;
5656
pub mod service;
5757
pub mod source;
5858
pub mod split;

quickwit/quickwit-cli/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use quickwit_cli::cli::{CliCommand, build_cli};
2424
use quickwit_cli::jemalloc::start_jemalloc_metrics_loop;
2525
use quickwit_cli::logger::setup_logging_and_tracing;
2626
#[cfg(target_os = "linux")]
27-
use quickwit_cli::procfs::start_proc_io_metrics_loop;
27+
use quickwit_cli::proc_io::start_proc_io_metrics_loop;
2828
use quickwit_cli::{busy_detector, install_default_crypto_ring_provider};
2929
use quickwit_common::runtimes::scrape_tokio_runtime_metrics;
3030
use quickwit_serve::BuildInfo;
Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ use std::time::Duration;
1717
use procfs::ProcResult;
1818
use procfs::process::Process;
1919
use quickwit_common::metrics::{IO_METRICS, IntCounter};
20-
use tracing::{error, warn};
20+
use quickwit_common::rate_limited_tracing::rate_limited_warn;
21+
use tracing::error;
2122

2223
const PROC_IO_METRICS_POLLING_INTERVAL: Duration = Duration::from_secs(5);
2324

@@ -36,42 +37,50 @@ async fn proc_io_metrics_loop() -> ProcResult<()> {
3637
let mut previous_write_syscalls: u64 = 0;
3738

3839
let mut poll_interval = tokio::time::interval(PROC_IO_METRICS_POLLING_INTERVAL);
40+
poll_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
3941

4042
loop {
4143
poll_interval.tick().await;
4244

4345
let io = match process.io() {
4446
Ok(io) => io,
4547
Err(error) => {
46-
warn!(%error, "failed to read /proc/self/io");
48+
rate_limited_warn!(
49+
limit_per_min = 1,
50+
%error,
51+
"failed to read /proc/self/io"
52+
);
4753
continue;
4854
}
4955
};
50-
51-
increment_counter_by_delta(
56+
increment_counter(
5257
&IO_METRICS.read_bytes_total,
5358
io.read_bytes,
5459
&mut previous_read_bytes,
5560
);
56-
increment_counter_by_delta(
61+
increment_counter(
5762
&IO_METRICS.write_bytes_total,
5863
io.write_bytes,
5964
&mut previous_write_bytes,
6065
);
61-
increment_counter_by_delta(
66+
increment_counter(
6267
&IO_METRICS.read_syscalls_total,
6368
io.syscr,
6469
&mut previous_read_syscalls,
6570
);
66-
increment_counter_by_delta(
71+
increment_counter(
6772
&IO_METRICS.write_syscalls_total,
6873
io.syscw,
6974
&mut previous_write_syscalls,
7075
);
7176
}
7277
}
7378

74-
fn increment_counter_by_delta(counter: &IntCounter, current: u64, previous: &mut u64) {
79+
fn increment_counter(counter: &IntCounter, current: u64, previous: &mut u64) {
80+
debug_assert!(
81+
current >= *previous,
82+
"/proc/self/io counters should be monotonic for a given PID"
83+
);
7584
if current >= *previous {
7685
counter.inc_by(current - *previous);
7786
}

quickwit/quickwit-common/src/metrics.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -467,15 +467,15 @@ impl Default for IoMetrics {
467467
"read_bytes_total",
468468
"Cumulative bytes read from storage by the process, as reported by \
469469
`/proc/self/io` `read_bytes`. Reflects block-layer I/O after page cache absorbs \
470-
reads served from RAM",
470+
reads served from memory",
471471
"io",
472472
&[],
473473
),
474474
write_bytes_total: new_counter(
475475
"write_bytes_total",
476476
"Cumulative bytes written to storage by the process, as reported by \
477477
`/proc/self/io` `write_bytes`. Reflects block-layer I/O after page cache \
478-
coalescing — what EBS `VolumeWriteOps` ultimately bills",
478+
coalescing",
479479
"io",
480480
&[],
481481
),

quickwit/quickwit-indexing/src/actors/packager.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ fn list_split_files(
189189
segment_metas: &[SegmentMeta],
190190
scratch_directory: &TempDirectory,
191191
) -> io::Result<Vec<PathBuf>> {
192-
let mut index_files = vec![scratch_directory.path().join("meta.json")];
192+
let mut split_files = vec![scratch_directory.path().join("meta.json")];
193193

194194
// list the segment files
195195
for segment_meta in segment_metas {
@@ -199,12 +199,12 @@ fn list_split_files(
199199
// If the file is missing, this is fine.
200200
// segment_meta.list_files() may actually return files that
201201
// may not exist.
202-
index_files.push(filepath);
202+
split_files.push(filepath);
203203
}
204204
}
205205
}
206-
index_files.sort();
207-
Ok(index_files)
206+
split_files.sort();
207+
Ok(split_files)
208208
}
209209

210210
fn build_hotcache<W: io::Write>(split_path: &Path, out: &mut W) -> anyhow::Result<()> {

quickwit/quickwit-ingest/src/ingest_v2/metrics.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ impl Default for IngestResultMetrics {
7272
}
7373
}
7474

75-
pub(super) struct IngestV2Metrics {
75+
pub(crate) struct IngestV2Metrics {
7676
pub reset_shards_operations_total: IntCounterVec<1>,
7777
pub open_shards: IntGauge,
7878
pub closed_shards: IntGauge,
@@ -83,6 +83,7 @@ pub(super) struct IngestV2Metrics {
8383
pub wal_lock_hold_duration_secs: HistogramVec<2>,
8484
pub wal_disk_used_bytes: IntGauge,
8585
pub wal_memory_used_bytes: IntGauge,
86+
pub wal_bytes_written_total: IntCounterVec<1>,
8687
pub ingest_results: IngestResultMetrics,
8788
pub ingest_attempts: IntCounterVec<1>,
8889
}
@@ -164,6 +165,15 @@ impl Default for IngestV2Metrics {
164165
"ingest",
165166
&[],
166167
),
168+
wal_bytes_written_total: new_counter_vec(
169+
"wal_bytes_written_total",
170+
"Total number of bytes written to the WAL by write operations (create_queue, \
171+
append_records, truncate_queue, delete_queue), including frame headers and \
172+
end-of-block padding.",
173+
"ingest",
174+
&[],
175+
["operation"],
176+
),
167177
}
168178
}
169179
}
@@ -181,5 +191,5 @@ pub(super) fn report_wal_usage(wal_usage: ResourceUsage) {
181191
.set(wal_usage.memory_used_bytes as i64);
182192
}
183193

184-
pub(super) static INGEST_V2_METRICS: LazyLock<IngestV2Metrics> =
194+
pub(crate) static INGEST_V2_METRICS: LazyLock<IngestV2Metrics> =
185195
LazyLock::new(IngestV2Metrics::default);

quickwit/quickwit-ingest/src/ingest_v2/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ mod fetch;
1919
mod helpers;
2020
mod idle;
2121
mod ingester;
22-
mod metrics;
22+
pub(crate) mod metrics;
2323
mod models;
2424
mod mrecord;
2525
mod mrecordlog_utils;

0 commit comments

Comments
 (0)