Skip to content

Commit 9383993

Browse files
fulmicoton and guilload
authored
Added metrics tracking the memory usage in the index writer. (#4724)
* Added metrics tracking the memory usage in the index writer.
* Fix fmt
---------
Co-authored-by: Adrien Guillo <adrien@quickwit.io>
1 parent fd497bd commit 9383993

10 files changed

Lines changed: 96 additions & 116 deletions

File tree

quickwit/quickwit-common/src/metrics.rs

Lines changed: 35 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -173,11 +173,19 @@ pub struct GaugeGuard {
173173
delta: i64,
174174
}
175175

176+
impl std::fmt::Debug for GaugeGuard {
177+
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
178+
self.delta.fmt(f)
179+
}
180+
}
181+
176182
impl GaugeGuard {
177-
pub fn from_gauge(gauge: &'static IntGauge, delta: i64) -> Self {
178-
gauge.add(delta);
183+
pub fn from_gauge(gauge: &'static IntGauge) -> Self {
184+
Self { gauge, delta: 0i64 }
185+
}
179186

180-
Self { gauge, delta }
187+
pub fn get(&self) -> i64 {
188+
self.delta
181189
}
182190

183191
pub fn add(&mut self, delta: i64) {
@@ -245,46 +253,34 @@ impl Default for MemoryMetrics {
245253
#[derive(Clone)]
246254
pub struct InFlightDataGauges {
247255
pub doc_processor_mailbox: IntGauge,
256+
pub index_writer: IntGauge,
248257
pub indexer_mailbox: IntGauge,
249258
pub ingest_router: IntGauge,
250259
pub rest_server: IntGauge,
251260
pub sources: InFlightDataSourceGauges,
252261
}
253262

254-
const IN_FLIGHT_DATA_GAUGES_HELP: &str = "Amount of data in-flight in various buffers in bytes.";
255-
256263
impl Default for InFlightDataGauges {
257264
fn default() -> Self {
265+
let in_flight_gauge_vec = new_gauge_vec(
266+
"in_flight_data_bytes",
267+
"Amount of data in-flight in various buffers in bytes.",
268+
"memory",
269+
&[],
270+
["component"],
271+
);
258272
Self {
259-
doc_processor_mailbox: new_gauge(
260-
"in_flight_data_bytes",
261-
IN_FLIGHT_DATA_GAUGES_HELP,
262-
"memory",
263-
&[("component", "doc_processor_mailbox")],
264-
),
265-
indexer_mailbox: new_gauge(
266-
"in_flight_data_bytes",
267-
IN_FLIGHT_DATA_GAUGES_HELP,
268-
"memory",
269-
&[("component", "indexer_mailbox")],
270-
),
271-
ingest_router: new_gauge(
272-
"in_flight_data_bytes",
273-
IN_FLIGHT_DATA_GAUGES_HELP,
274-
"memory",
275-
&[("component", "ingest_router")],
276-
),
277-
rest_server: new_gauge(
278-
"in_flight_data_bytes",
279-
IN_FLIGHT_DATA_GAUGES_HELP,
280-
"memory",
281-
&[("component", "rest_server")],
282-
),
283-
sources: InFlightDataSourceGauges::default(),
273+
doc_processor_mailbox: in_flight_gauge_vec.with_label_values(["doc_processor_mailbox"]),
274+
index_writer: in_flight_gauge_vec.with_label_values(["index_writer"]),
275+
indexer_mailbox: in_flight_gauge_vec.with_label_values(["indexer_mailbox"]),
276+
ingest_router: in_flight_gauge_vec.with_label_values(["ingest_router"]),
277+
rest_server: in_flight_gauge_vec.with_label_values(["rest_server"]),
278+
sources: InFlightDataSourceGauges::new(&in_flight_gauge_vec),
284279
}
285280
}
286281
}
287282

283+
/// TODO make those lazy.
288284
#[derive(Clone)]
289285
pub struct InFlightDataSourceGauges {
290286
pub file: IntGauge,
@@ -296,51 +292,16 @@ pub struct InFlightDataSourceGauges {
296292
pub other: IntGauge,
297293
}
298294

299-
impl Default for InFlightDataSourceGauges {
300-
fn default() -> Self {
295+
impl InFlightDataSourceGauges {
296+
pub fn new(in_flight_gauge_vec: &IntGaugeVec<1>) -> Self {
301297
Self {
302-
file: new_gauge(
303-
"in_flight_data_bytes",
304-
IN_FLIGHT_DATA_GAUGES_HELP,
305-
"memory",
306-
&[("component", "file_source")],
307-
),
308-
ingest: new_gauge(
309-
"in_flight_data_bytes",
310-
IN_FLIGHT_DATA_GAUGES_HELP,
311-
"memory",
312-
&[("component", "ingest_source")],
313-
),
314-
kafka: new_gauge(
315-
"in_flight_data_bytes",
316-
IN_FLIGHT_DATA_GAUGES_HELP,
317-
"memory",
318-
&[("component", "kafka_source")],
319-
),
320-
kinesis: new_gauge(
321-
"in_flight_data_bytes",
322-
IN_FLIGHT_DATA_GAUGES_HELP,
323-
"memory",
324-
&[("component", "kinesis_source")],
325-
),
326-
pubsub: new_gauge(
327-
"in_flight_data_bytes",
328-
IN_FLIGHT_DATA_GAUGES_HELP,
329-
"memory",
330-
&[("component", "pubsub_source")],
331-
),
332-
pulsar: new_gauge(
333-
"in_flight_data_bytes",
334-
IN_FLIGHT_DATA_GAUGES_HELP,
335-
"memory",
336-
&[("component", "pulsar")],
337-
),
338-
other: new_gauge(
339-
"in_flight_data_bytes",
340-
IN_FLIGHT_DATA_GAUGES_HELP,
341-
"memory",
342-
&[("component", "other")],
343-
),
298+
file: in_flight_gauge_vec.with_label_values(["file_source"]),
299+
ingest: in_flight_gauge_vec.with_label_values(["ingest_source"]),
300+
kafka: in_flight_gauge_vec.with_label_values(["kafka_source"]),
301+
kinesis: in_flight_gauge_vec.with_label_values(["kinesis_source"]),
302+
pubsub: in_flight_gauge_vec.with_label_values(["pubsub_source"]),
303+
pulsar: in_flight_gauge_vec.with_label_values(["pulsar_source"]),
304+
other: in_flight_gauge_vec.with_label_values(["other"]),
344305
}
345306
}
346307
}

quickwit/quickwit-control-plane/src/control_plane.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ use quickwit_proto::metastore::{
5555
};
5656
use quickwit_proto::types::{IndexUid, NodeId, ShardId, SourceUid};
5757
use serde::Serialize;
58-
use tracing::{error, info};
58+
use tracing::{debug, error, info};
5959

6060
use crate::debouncer::Debouncer;
6161
use crate::indexing_scheduler::{IndexingScheduler, IndexingSchedulerState};
@@ -513,11 +513,12 @@ impl Handler<DeleteIndexRequest> for ControlPlane {
513513
ctx: &ActorContext<Self>,
514514
) -> Result<Self::Reply, ActorExitStatus> {
515515
let index_uid: IndexUid = request.index_uid().clone();
516-
info!(index=%index_uid, "delete index");
516+
debug!(%index_uid, "deleting index");
517517

518518
if let Err(metastore_error) = self.metastore.delete_index(request).await {
519519
return convert_metastore_error(metastore_error);
520520
};
521+
info!(%index_uid, "deleted index");
521522

522523
let ingester_needing_resync: BTreeSet<NodeId> = self
523524
.model
@@ -558,16 +559,18 @@ impl Handler<AddSourceRequest> for ControlPlane {
558559
return Ok(Err(ControlPlaneError::from(error)));
559560
}
560561
};
561-
info!(index=%index_uid, source_config=?source_config, "add source");
562+
let source_id = source_config.source_id.clone();
563+
debug!(%index_uid, source_id, "adding source");
562564

563565
if let Err(error) = self.metastore.add_source(request).await {
564566
return Ok(Err(ControlPlaneError::from(error)));
565567
};
566-
567568
self.model
568569
.add_source(&index_uid, source_config)
569570
.context("failed to add source")?;
570571

572+
info!(%index_uid, source_id, "added source");
573+
571574
// TODO: Refine the event. Notify index will have the effect to reload the entire state from
572575
// the metastore. We should update the state of the control plane.
573576
let _rebuild_plan_waiter = self.rebuild_plan_debounced(ctx);
@@ -591,12 +594,13 @@ impl Handler<ToggleSourceRequest> for ControlPlane {
591594
let index_uid: IndexUid = request.index_uid().clone();
592595
let source_id = request.source_id.clone();
593596
let enable = request.enable;
594-
595-
info!(index=%index_uid, source_id=%source_id, enable=enable, "toggle source");
597+
debug!(%index_uid, source_id, enable, "toggling source");
596598

597599
if let Err(error) = self.metastore.toggle_source(request).await {
598600
return Ok(Err(ControlPlaneError::from(error)));
599601
};
602+
info!(%index_uid, source_id, enabled=enable, "toggled source");
603+
600604
let mutation_occured = self.model.toggle_source(&index_uid, &source_id, enable)?;
601605

602606
if mutation_occured {

quickwit/quickwit-indexing/src/actors/indexer.rs

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use quickwit_actors::{
3333
Actor, ActorContext, ActorExitStatus, Command, Handler, Mailbox, QueueCapacity,
3434
};
3535
use quickwit_common::io::IoControls;
36+
use quickwit_common::metrics::GaugeGuard;
3637
use quickwit_common::runtimes::RuntimeType;
3738
use quickwit_common::temp_dir::TempDirectory;
3839
use quickwit_config::IndexingSettings;
@@ -151,10 +152,10 @@ impl IndexerState {
151152
other_split_opt: &'a mut Option<IndexedSplitBuilder>,
152153
counter: &'a mut IndexerCounters,
153154
ctx: &ActorContext<Indexer>,
154-
) -> anyhow::Result<&'a mut IndexedSplitBuilder> {
155+
) -> anyhow::Result<(&'a mut IndexedSplitBuilder, bool)> {
155156
let num_splits = splits.len();
156157
match splits.entry(partition_id) {
157-
Entry::Occupied(indexed_split) => Ok(indexed_split.into_mut()),
158+
Entry::Occupied(indexed_split) => Ok((indexed_split.into_mut(), false)),
158159
Entry::Vacant(vacant_entry) => {
159160
if num_splits as u32 >= self.max_num_partitions.get() {
160161
// In order to avoid exceeding max_num_partitions, we map the document to the
@@ -172,11 +173,11 @@ impl IndexerState {
172173
)?;
173174
*other_split_opt = Some(new_other_split);
174175
}
175-
Ok(other_split_opt.as_mut().unwrap())
176+
Ok((other_split_opt.as_mut().unwrap(), true))
176177
} else {
177178
let indexed_split =
178179
self.create_indexed_split_builder(partition_id, last_delete_opstamp, ctx)?;
179-
Ok(vacant_entry.insert(indexed_split))
180+
Ok((vacant_entry.insert(indexed_split), true))
180181
}
181182
}
182183
}
@@ -236,7 +237,11 @@ impl IndexerState {
236237
publish_lock,
237238
publish_token_opt,
238239
last_delete_opstamp,
239-
memory_usage: ByteSize(0),
240+
memory_usage: GaugeGuard::from_gauge(
241+
&quickwit_common::metrics::MEMORY_METRICS
242+
.in_flight_data
243+
.index_writer,
244+
),
240245
};
241246
Ok(workbench)
242247
}
@@ -294,7 +299,7 @@ impl IndexerState {
294299
.source_delta
295300
.extend(batch.checkpoint_delta)
296301
.context("batch delta does not follow indexer checkpoint")?;
297-
let mut memory_usage_delta: u64 = 0;
302+
let mut memory_usage_delta: i64 = 0;
298303
counters.num_doc_batches_in_workbench += 1;
299304
for doc in batch.docs {
300305
let ProcessedDoc {
@@ -304,7 +309,7 @@ impl IndexerState {
304309
num_bytes,
305310
} = doc;
306311
counters.num_docs_in_workbench += 1;
307-
let indexed_split: &mut IndexedSplitBuilder = self.get_or_create_indexed_split(
312+
let (indexed_split, split_created) = self.get_or_create_indexed_split(
308313
partition,
309314
*last_delete_opstamp,
310315
indexed_splits,
@@ -313,6 +318,11 @@ impl IndexerState {
313318
ctx,
314319
)?;
315320
let mem_usage_before = indexed_split.index_writer.mem_usage() as u64;
321+
if split_created {
322+
// The split was just created. We need to account for the initial index writer's
323+
// memory usage.
324+
memory_usage_delta += mem_usage_before as i64;
325+
}
316326
indexed_split.split_attrs.uncompressed_docs_size_in_bytes += num_bytes as u64;
317327
indexed_split.split_attrs.num_docs += 1;
318328
if let Some(timestamp) = timestamp_opt {
@@ -324,10 +334,10 @@ impl IndexerState {
324334
.add_document(doc)
325335
.context("failed to add document")?;
326336
let mem_usage_after = indexed_split.index_writer.mem_usage() as u64;
327-
memory_usage_delta += mem_usage_after - mem_usage_before;
337+
memory_usage_delta += mem_usage_after as i64 - mem_usage_before as i64;
328338
ctx.record_progress();
329339
}
330-
*memory_usage = ByteSize(memory_usage.as_u64() + memory_usage_delta);
340+
memory_usage.add(memory_usage_delta);
331341
Ok(())
332342
}
333343
}
@@ -353,7 +363,7 @@ struct IndexingWorkbench {
353363
// We use this value to set the `delete_opstamp` of the workbench splits.
354364
last_delete_opstamp: u64,
355365
// Number of bytes declared as used by tantivy.
356-
memory_usage: ByteSize,
366+
memory_usage: GaugeGuard,
357367
}
358368

359369
pub struct Indexer {
@@ -570,9 +580,9 @@ impl Indexer {
570580

571581
fn memory_usage(&self) -> ByteSize {
572582
if let Some(workbench) = &self.indexing_workbench_opt {
573-
workbench.memory_usage
583+
ByteSize(workbench.memory_usage.get() as u64)
574584
} else {
575-
ByteSize(0)
585+
ByteSize(0u64)
576586
}
577587
}
578588

@@ -591,7 +601,8 @@ impl Indexer {
591601
ctx,
592602
)
593603
.await?;
594-
if self.memory_usage() >= self.indexer_state.indexing_settings.resources.heap_size {
604+
let memory_usage = self.memory_usage();
605+
if memory_usage >= self.indexer_state.indexing_settings.resources.heap_size {
595606
self.send_to_serializer(CommitTrigger::MemoryLimit, ctx)
596607
.await?;
597608
}
@@ -623,6 +634,7 @@ impl Indexer {
623634
publish_token_opt,
624635
batch_parent_span,
625636
indexing_permit,
637+
memory_usage,
626638
..
627639
}) = self.indexing_workbench_opt.take()
628640
else {
@@ -674,6 +686,7 @@ impl Indexer {
674686
publish_token_opt,
675687
commit_trigger,
676688
batch_parent_span,
689+
memory_usage,
677690
},
678691
)
679692
.await?;
@@ -883,7 +896,7 @@ mod tests {
883896
let body_field = schema.get_field("body").unwrap();
884897
let indexing_directory = TempDirectory::for_test();
885898
let mut indexing_settings = IndexingSettings::for_test();
886-
indexing_settings.resources.heap_size = ByteSize::mb(5);
899+
indexing_settings.resources.heap_size = ByteSize::mb(16);
887900
let (index_serializer_mailbox, index_serializer_inbox) = universe.create_test_mailbox();
888901
let mut metastore = MetastoreServiceClient::mock();
889902
metastore.expect_publish_splits().never();
@@ -1214,7 +1227,8 @@ mod tests {
12141227
let body_field = schema.get_field("body").unwrap();
12151228

12161229
let indexing_directory = TempDirectory::for_test();
1217-
let indexing_settings = IndexingSettings::for_test();
1230+
let mut indexing_settings = IndexingSettings::for_test();
1231+
indexing_settings.resources.heap_size = ByteSize::mb(100);
12181232
let (index_serializer_mailbox, index_serializer_inbox) = universe.create_test_mailbox();
12191233
let mut metastore = MetastoreServiceClient::mock();
12201234
metastore.expect_publish_splits().never();
@@ -1310,7 +1324,8 @@ mod tests {
13101324
Arc::new(serde_json::from_str::<DefaultDocMapper>(DOCMAPPER_SIMPLE_JSON).unwrap());
13111325
let body_field = doc_mapper.schema().get_field("body").unwrap();
13121326
let indexing_directory = TempDirectory::for_test();
1313-
let indexing_settings = IndexingSettings::for_test();
1327+
let mut indexing_settings = IndexingSettings::for_test();
1328+
indexing_settings.resources.heap_size = ByteSize::gb(5);
13141329
let mut metastore = MetastoreServiceClient::mock();
13151330
metastore
13161331
.expect_last_delete_opstamp()

quickwit/quickwit-indexing/src/models/indexed_split.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use std::fmt;
2121
use std::path::Path;
2222

2323
use quickwit_common::io::IoControls;
24+
use quickwit_common::metrics::GaugeGuard;
2425
use quickwit_common::temp_dir::TempDirectory;
2526
use quickwit_metastore::checkpoint::IndexCheckpointDelta;
2627
use quickwit_proto::indexing::IndexingPipelineId;
@@ -183,6 +184,7 @@ pub struct IndexedSplitBatchBuilder {
183184
pub publish_token_opt: Option<PublishToken>,
184185
pub commit_trigger: CommitTrigger,
185186
pub batch_parent_span: Span,
187+
pub memory_usage: GaugeGuard,
186188
}
187189

188190
/// Sends notifications to the Publisher that the last batch of splits was empty.

0 commit comments

Comments
 (0)