Skip to content

Commit 0101dba

Browse files
committed
Merge latest main into metrics migration
2 parents 29dd754 + 516b8d6 commit 0101dba

28 files changed

Lines changed: 2407 additions & 462 deletions

File tree

quickwit/Cargo.lock

Lines changed: 12 additions & 9 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

quickwit/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -228,6 +228,7 @@ rdkafka = { version = "0.39", default-features = false, features = [
228228
"zstd",
229229
] }
230230
regex = "1.12"
231+
regex-automata = "0.4"
231232
regex-syntax = "0.8"
232233
reqwest = { version = "0.12", default-features = false, features = [
233234
"json",
@@ -403,7 +404,7 @@ quickwit-serve = { path = "quickwit-serve" }
403404
quickwit-storage = { path = "quickwit-storage" }
404405
quickwit-telemetry = { path = "quickwit-telemetry" }
405406

406-
tantivy = { git = "https://github.com/quickwit-oss/tantivy/", rev = "4480cf0", default-features = false, features = [
407+
tantivy = { git = "https://github.com/quickwit-oss/tantivy/", rev = "edfb02b", default-features = false, features = [
407408
"lz4-compression",
408409
"mmap",
409410
"quickwit",

quickwit/quickwit-common/src/rate_limited_tracing.rs

Lines changed: 62 additions & 29 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,16 @@ use std::sync::atomic::{AtomicU64, Ordering};
1919

2020
use coarsetime::{Duration, Instant};
2121

22+
/// Outcome of a rate-limit check for a single log site, as returned by `should_log`.
///
/// Callers emit the log for `Yes` and `YesAfterSuppression`, and skip it for `No`.
#[derive(Debug, PartialEq)]
23+
pub enum ShouldLog {
24+
/// Emit the log normally, within the rate limit.
25+
Yes,
26+
/// Emit the log; the payload is the number of similar messages that were suppressed
/// since the last emission.
27+
YesAfterSuppression(u32),
28+
/// Suppressed — do not emit.
29+
No,
30+
}
31+
2232
/// Metadata for a log site. This is stored inside a single AtomicU64 when not in use.
2333
///
2434
/// `call_count` is the number of calls since the last upgrade of generation, it's stored
@@ -54,7 +64,7 @@ pub fn should_log<F: Fn() -> Instant>(
5464
last_reset_atomic: &AtomicU64,
5565
limit: u32,
5666
now: F,
57-
) -> bool {
67+
) -> ShouldLog {
5868
// count_atomic is treated as 2 u32: upper bits count "generation", lower bits count number of
5969
// calls since LAST_RESET. We assume there won't be 2**32 calls to this log in ~60s.
6070
// Generation is free to wrap around.
@@ -73,7 +83,7 @@ pub fn should_log<F: Fn() -> Instant>(
7383
} = logsite_meta_u64.into();
7484

7585
if call_count < limit {
76-
return true;
86+
return ShouldLog::Yes;
7787
}
7888

7989
let current_time = Duration::from_ticks(now().as_ticks());
@@ -83,7 +93,7 @@ pub fn should_log<F: Fn() -> Instant>(
8393

8494
if !should_reset {
8595
// we are over-limit and not far enough in time to reset: don't log
86-
return false;
96+
return ShouldLog::No;
8797
}
8898

8999
let mut update_time = false;
@@ -123,7 +133,20 @@ pub fn should_log<F: Fn() -> Instant>(
123133
// *we* updated generation, so we must update last_reset too
124134
last_reset_atomic.store(current_time.as_ticks(), Ordering::Release);
125135
}
126-
can_log
136+
137+
if !can_log {
138+
return ShouldLog::No;
139+
}
140+
// call_count is the pre-fetch_add value, equal to the number of prior calls in this period.
141+
// Of those, `limit` were allowed through; the rest were suppressed.
142+
// We only report when *we* did the reset (update_time), to avoid double-reporting.
143+
if update_time {
144+
let skipped = call_count.saturating_sub(limit);
145+
if skipped > 0 {
146+
return ShouldLog::YesAfterSuppression(skipped);
147+
}
148+
}
149+
ShouldLog::Yes
127150
}
128151

129152
#[macro_export]
@@ -136,8 +159,15 @@ macro_rules! rate_limited_tracing {
136159
// we can't get time from constant context, so we pre-initialize with zero
137160
static LAST_RESET: AtomicU64 = AtomicU64::new(0);
138161

139-
if $crate::rate_limited_tracing::should_log(&COUNT, &LAST_RESET, $limit, CoarsetimeInstant::now) {
140-
::tracing::$log_fn!($($args)*);
162+
match $crate::rate_limited_tracing::should_log(&COUNT, &LAST_RESET, $limit, CoarsetimeInstant::now) {
163+
$crate::rate_limited_tracing::ShouldLog::No => {}
164+
$crate::rate_limited_tracing::ShouldLog::Yes => {
165+
::tracing::$log_fn!($($args)*);
166+
}
167+
$crate::rate_limited_tracing::ShouldLog::YesAfterSuppression(skipped) => {
168+
::tracing::$log_fn!("suppressed {skipped} similar log messages in the last minute");
169+
::tracing::$log_fn!($($args)*);
170+
}
141171
}
142172
}};
143173
}
@@ -193,7 +223,7 @@ mod tests {
193223

194224
use coarsetime::{Duration, Instant};
195225

196-
use super::should_log;
226+
use super::{ShouldLog, should_log};
197227

198228
// TODO as this is atomic code, we should test it with multiple threads to verify it behaves
199229
// like we'd expect, maybe using something like `loom`?
@@ -207,30 +237,31 @@ mod tests {
207237
let mut simulated_time = Instant::now();
208238
let simulation_step = Duration::from_secs(1);
209239

210-
assert!(should_log(&count, &last_reset, limit as _, || {
211-
simulated_time
212-
}));
240+
assert_eq!(
241+
should_log(&count, &last_reset, limit as _, || simulated_time),
242+
ShouldLog::Yes
243+
);
213244
assert_eq!(count.load(Ordering::Relaxed), 1);
214245
let reset_timestamp = last_reset.load(Ordering::Relaxed);
215246
assert_ne!(reset_timestamp, 0);
216247

217248
simulated_time += simulation_step;
218249

219250
for i in 1..limit {
220-
// we log as many time as expected
221-
assert!(should_log(&count, &last_reset, limit as _, || {
222-
simulated_time
223-
}));
251+
assert_eq!(
252+
should_log(&count, &last_reset, limit as _, || simulated_time),
253+
ShouldLog::Yes
254+
);
224255
assert_eq!(count.load(Ordering::Relaxed), i + 1);
225256
assert_eq!(last_reset.load(Ordering::Relaxed), reset_timestamp);
226257
simulated_time += simulation_step;
227258
}
228259

229260
for i in limit..(limit * 2) {
230-
// we don't log, nor update
231-
assert!(!should_log(&count, &last_reset, limit as _, || {
232-
simulated_time
233-
}));
261+
assert_eq!(
262+
should_log(&count, &last_reset, limit as _, || simulated_time),
263+
ShouldLog::No
264+
);
234265
assert_eq!(count.load(Ordering::Relaxed), i + 1);
235266
assert_eq!(last_reset.load(Ordering::Relaxed), reset_timestamp);
236267
simulated_time += simulation_step;
@@ -239,30 +270,32 @@ mod tests {
239270
// advance enough to reset counter
240271
simulated_time += simulation_step * 60;
241272

242-
assert!(should_log(&count, &last_reset, limit as _, || {
243-
simulated_time
244-
}));
273+
// the first log after a reset reports how many were suppressed
274+
assert_eq!(
275+
should_log(&count, &last_reset, limit as _, || simulated_time),
276+
ShouldLog::YesAfterSuppression(limit as u32)
277+
);
245278
// counter got reset, generation increased
246279
assert_eq!(count.load(Ordering::Relaxed), 1 + (1 << 32));
247280
// last reset changed too
248281
assert_ne!(last_reset.load(Ordering::Relaxed), reset_timestamp);
249282
let reset_timestamp = last_reset.load(Ordering::Relaxed);
250283

251284
for i in 1..limit {
252-
// we log as many time as expected
253-
assert!(should_log(&count, &last_reset, limit as _, || {
254-
simulated_time
255-
}));
285+
assert_eq!(
286+
should_log(&count, &last_reset, limit as _, || simulated_time),
287+
ShouldLog::Yes
288+
);
256289
assert_eq!(count.load(Ordering::Relaxed), i + 1 + (1 << 32));
257290
assert_eq!(last_reset.load(Ordering::Relaxed), reset_timestamp);
258291
simulated_time += simulation_step;
259292
}
260293

261294
for i in limit..(limit * 2) {
262-
// we don't log, nor update
263-
assert!(!should_log(&count, &last_reset, limit as _, || {
264-
simulated_time
265-
}));
295+
assert_eq!(
296+
should_log(&count, &last_reset, limit as _, || simulated_time),
297+
ShouldLog::No
298+
);
266299
assert_eq!(count.load(Ordering::Relaxed), i + 1 + (1 << 32));
267300
assert_eq!(last_reset.load(Ordering::Relaxed), reset_timestamp);
268301
simulated_time += simulation_step;

quickwit/quickwit-datafusion/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -15,7 +15,10 @@ async-trait = { workspace = true }
1515
bytes = { workspace = true }
1616
chrono = { workspace = true }
1717
futures = { workspace = true }
18+
mini-moka = { workspace = true }
1819
prost = { workspace = true }
20+
regex = { workspace = true }
21+
regex-automata = { workspace = true }
1922
serde_json = { workspace = true }
2023
tokio = { workspace = true }
2124
tonic = { workspace = true }

0 commit comments

Comments
 (0)