Skip to content

Commit 050a3b4

Browse files
authored
[metrics] Materialize all metrics tags into top level columns (#6237)
* feat: replace fixed MetricDataPoint fields with dynamic tag HashMap * feat: replace ParquetField enum with constants and dynamic validation * feat: derive sort order and bloom filters from batch schema * feat: union schema accumulation and schema-agnostic ingest validation * feat: dynamic column lookup in split writer * feat: remove ParquetSchema dependency from indexing actors * refactor: deduplicate test batch helpers * lint * address comments, fix linter * appease linter * increment rejected points for invalid types * make sure we forward a None batch to the packager, don't overwrite required fields in otel metrics * appease linter * fix edge cases * appease linter * reset workbench+scheduled commit timeout in all cases we send a batch to packager
1 parent cfb53e9 commit 050a3b4

File tree

23 files changed

+1532
-2185
lines changed

23 files changed

+1532
-2185
lines changed

quickwit/Cargo.lock

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

quickwit/quickwit-indexing/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ sqlx = { workspace = true, features = ["runtime-tokio", "postgres"] }
123123
tempfile = { workspace = true }
124124

125125
quickwit-actors = { workspace = true, features = ["testsuite"] }
126+
quickwit-parquet-engine = { workspace = true, features = ["testsuite"] }
126127
quickwit-cluster = { workspace = true, features = ["testsuite"] }
127128
quickwit-common = { workspace = true, features = ["testsuite"] }
128129
quickwit-config = { workspace = true, features = ["testsuite"] }

quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -637,10 +637,8 @@ impl IndexingPipeline {
637637
.spawn(parquet_uploader);
638638

639639
// ParquetPackager
640-
let parquet_schema = quickwit_parquet_engine::schema::ParquetSchema::new();
641640
let writer_config = quickwit_parquet_engine::storage::ParquetWriterConfig::default();
642641
let split_writer = quickwit_parquet_engine::storage::ParquetSplitWriter::new(
643-
parquet_schema,
644642
writer_config,
645643
self.params.indexing_directory.path(),
646644
);

quickwit/quickwit-indexing/src/actors/parquet_doc_processor.rs

Lines changed: 7 additions & 214 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ use quickwit_common::rate_limited_tracing::rate_limited_warn;
2424
use quickwit_common::runtimes::RuntimeType;
2525
use quickwit_metastore::checkpoint::SourceCheckpointDelta;
2626
use quickwit_parquet_engine::ingest::{IngestError, ParquetIngestProcessor};
27-
use quickwit_parquet_engine::schema::ParquetSchema;
2827
use quickwit_proto::types::{IndexId, SourceId};
2928
use serde::Serialize;
3029
use tokio::runtime::Handle;
@@ -143,8 +142,7 @@ impl ParquetDocProcessor {
143142
source_id: SourceId,
144143
indexer_mailbox: Mailbox<ParquetIndexer>,
145144
) -> Self {
146-
let schema = ParquetSchema::new();
147-
let processor = ParquetIngestProcessor::new(schema);
145+
let processor = ParquetIngestProcessor;
148146
let counters = ParquetDocProcessorCounters::new(index_id.clone(), source_id.clone());
149147

150148
info!(
@@ -306,7 +304,7 @@ impl Handler<RawDocBatch> for ParquetDocProcessor {
306304
// forever.
307305
if !checkpoint_forwarded && !checkpoint_delta.is_empty() {
308306
let empty_batch =
309-
RecordBatch::new_empty(self.processor.schema().arrow_schema().clone());
307+
RecordBatch::new_empty(std::sync::Arc::new(arrow::datatypes::Schema::empty()));
310308
let processed_batch =
311309
ProcessedParquetBatch::new(empty_batch, checkpoint_delta, force_commit);
312310
ctx.send_message(&self.indexer_mailbox, processed_batch)
@@ -399,14 +397,8 @@ mod tests {
399397

400398
#[tokio::test]
401399
async fn test_metrics_doc_processor_valid_arrow_ipc() {
402-
use std::sync::Arc as StdArc;
400+
use quickwit_parquet_engine::test_helpers::create_test_batch_with_tags;
403401

404-
use arrow::array::{
405-
ArrayRef, BinaryViewArray, DictionaryArray, Float64Array, Int32Array, StringArray,
406-
StructArray, UInt8Array, UInt64Array,
407-
};
408-
use arrow::datatypes::{DataType, Field, Int32Type};
409-
use arrow::record_batch::RecordBatch;
410402
let universe = Universe::with_accelerated_time();
411403

412404
let (indexer_mailbox, _indexer_inbox) = universe.create_test_mailbox::<ParquetIndexer>();
@@ -419,103 +411,7 @@ mod tests {
419411
let (metrics_doc_processor_mailbox, metrics_doc_processor_handle) =
420412
universe.spawn_builder().spawn(metrics_doc_processor);
421413

422-
// Create a test batch matching the metrics schema
423-
let schema = ParquetSchema::new();
424-
let num_rows = 3;
425-
426-
// Helper to create dictionary arrays
427-
fn create_dict_array(values: &[&str]) -> ArrayRef {
428-
let keys: Vec<i32> = (0..values.len()).map(|i| i as i32).collect();
429-
let string_array = StringArray::from(values.to_vec());
430-
StdArc::new(
431-
DictionaryArray::<Int32Type>::try_new(
432-
Int32Array::from(keys),
433-
StdArc::new(string_array),
434-
)
435-
.unwrap(),
436-
)
437-
}
438-
439-
fn create_nullable_dict_array(values: &[Option<&str>]) -> ArrayRef {
440-
let keys: Vec<Option<i32>> = values
441-
.iter()
442-
.enumerate()
443-
.map(|(i, v)| v.map(|_| i as i32))
444-
.collect();
445-
let string_values: Vec<&str> = values.iter().filter_map(|v| *v).collect();
446-
let string_array = StringArray::from(string_values);
447-
StdArc::new(
448-
DictionaryArray::<Int32Type>::try_new(
449-
Int32Array::from(keys),
450-
StdArc::new(string_array),
451-
)
452-
.unwrap(),
453-
)
454-
}
455-
456-
let metric_name: ArrayRef = create_dict_array(&vec!["cpu.usage"; num_rows]);
457-
let metric_type: ArrayRef = StdArc::new(UInt8Array::from(vec![0u8; num_rows]));
458-
let metric_unit: ArrayRef = StdArc::new(StringArray::from(vec![Some("bytes"); num_rows]));
459-
let timestamp_secs: ArrayRef = StdArc::new(UInt64Array::from(vec![100u64, 101u64, 102u64]));
460-
let start_timestamp_secs: ArrayRef =
461-
StdArc::new(UInt64Array::from(vec![None::<u64>; num_rows]));
462-
let value: ArrayRef = StdArc::new(Float64Array::from(vec![42.0, 43.0, 44.0]));
463-
let tag_service: ArrayRef = create_nullable_dict_array(&vec![Some("web"); num_rows]);
464-
let tag_env: ArrayRef = create_nullable_dict_array(&vec![Some("prod"); num_rows]);
465-
let tag_datacenter: ArrayRef =
466-
create_nullable_dict_array(&vec![Some("us-east-1"); num_rows]);
467-
let tag_region: ArrayRef = create_nullable_dict_array(&vec![None; num_rows]);
468-
let tag_host: ArrayRef = create_nullable_dict_array(&vec![Some("host-001"); num_rows]);
469-
470-
// Create empty Variant (Struct with metadata and value BinaryView fields)
471-
let metadata_array = StdArc::new(BinaryViewArray::from(vec![b"" as &[u8]; num_rows]));
472-
let value_array = StdArc::new(BinaryViewArray::from(vec![b"" as &[u8]; num_rows]));
473-
let attributes: ArrayRef = StdArc::new(StructArray::from(vec![
474-
(
475-
StdArc::new(Field::new("metadata", DataType::BinaryView, false)),
476-
metadata_array.clone() as ArrayRef,
477-
),
478-
(
479-
StdArc::new(Field::new("value", DataType::BinaryView, false)),
480-
value_array.clone() as ArrayRef,
481-
),
482-
]));
483-
484-
let service_name: ArrayRef = create_dict_array(&vec!["my-service"; num_rows]);
485-
486-
let resource_attributes: ArrayRef = StdArc::new(StructArray::from(vec![
487-
(
488-
StdArc::new(Field::new("metadata", DataType::BinaryView, false)),
489-
metadata_array as ArrayRef,
490-
),
491-
(
492-
StdArc::new(Field::new("value", DataType::BinaryView, false)),
493-
value_array as ArrayRef,
494-
),
495-
]));
496-
497-
let batch = RecordBatch::try_new(
498-
schema.arrow_schema().clone(),
499-
vec![
500-
metric_name,
501-
metric_type,
502-
metric_unit,
503-
timestamp_secs,
504-
start_timestamp_secs,
505-
value,
506-
tag_service,
507-
tag_env,
508-
tag_datacenter,
509-
tag_region,
510-
tag_host,
511-
attributes,
512-
service_name,
513-
resource_attributes,
514-
],
515-
)
516-
.unwrap();
517-
518-
// Serialize to Arrow IPC
414+
let batch = create_test_batch_with_tags(3, &["service"]);
519415
let ipc_bytes = record_batch_to_ipc(&batch).unwrap();
520416

521417
// Create RawDocBatch with the IPC bytes
@@ -624,13 +520,8 @@ mod tests {
624520
async fn test_metrics_doc_processor_with_indexer() {
625521
use std::sync::Arc as StdArc;
626522

627-
use arrow::array::{
628-
ArrayRef, BinaryViewArray, DictionaryArray, Float64Array, Int32Array, StringArray,
629-
StructArray, UInt8Array, UInt64Array,
630-
};
631-
use arrow::datatypes::{DataType, Field, Int32Type};
632-
use arrow::record_batch::RecordBatch;
633523
use quickwit_parquet_engine::storage::{ParquetSplitWriter, ParquetWriterConfig};
524+
use quickwit_parquet_engine::test_helpers::create_test_batch_with_tags;
634525
use quickwit_proto::metastore::MockMetastoreService;
635526
use quickwit_storage::RamStorage;
636527

@@ -657,9 +548,8 @@ mod tests {
657548
let (uploader_mailbox, _uploader_handle) = universe.spawn_builder().spawn(uploader);
658549

659550
// Create ParquetPackager
660-
let parquet_schema = ParquetSchema::new();
661551
let writer_config = ParquetWriterConfig::default();
662-
let split_writer = ParquetSplitWriter::new(parquet_schema, writer_config, temp_dir.path());
552+
let split_writer = ParquetSplitWriter::new(writer_config, temp_dir.path());
663553
let packager = ParquetPackager::new(split_writer, uploader_mailbox);
664554
let (packager_mailbox, packager_handle) = universe.spawn_builder().spawn(packager);
665555

@@ -681,104 +571,7 @@ mod tests {
681571
let (metrics_doc_processor_mailbox, metrics_doc_processor_handle) =
682572
universe.spawn_builder().spawn(metrics_doc_processor);
683573

684-
// Create a test batch
685-
let schema = ParquetSchema::new();
686-
let num_rows = 5;
687-
688-
fn create_dict_array(values: &[&str]) -> ArrayRef {
689-
let keys: Vec<i32> = (0..values.len()).map(|i| i as i32).collect();
690-
let string_array = StringArray::from(values.to_vec());
691-
StdArc::new(
692-
DictionaryArray::<Int32Type>::try_new(
693-
Int32Array::from(keys),
694-
StdArc::new(string_array),
695-
)
696-
.unwrap(),
697-
)
698-
}
699-
700-
fn create_nullable_dict_array(values: &[Option<&str>]) -> ArrayRef {
701-
let keys: Vec<Option<i32>> = values
702-
.iter()
703-
.enumerate()
704-
.map(|(i, v)| v.map(|_| i as i32))
705-
.collect();
706-
let string_values: Vec<&str> = values.iter().filter_map(|v| *v).collect();
707-
let string_array = StringArray::from(string_values);
708-
StdArc::new(
709-
DictionaryArray::<Int32Type>::try_new(
710-
Int32Array::from(keys),
711-
StdArc::new(string_array),
712-
)
713-
.unwrap(),
714-
)
715-
}
716-
717-
let metric_name: ArrayRef = create_dict_array(&vec!["cpu.usage"; num_rows]);
718-
let metric_type: ArrayRef = StdArc::new(UInt8Array::from(vec![0u8; num_rows]));
719-
let metric_unit: ArrayRef = StdArc::new(StringArray::from(vec![Some("bytes"); num_rows]));
720-
let timestamps: Vec<u64> = (0..num_rows).map(|i| 100 + i as u64).collect();
721-
let timestamp_secs: ArrayRef = StdArc::new(UInt64Array::from(timestamps));
722-
let start_timestamp_secs: ArrayRef =
723-
StdArc::new(UInt64Array::from(vec![None::<u64>; num_rows]));
724-
let values: Vec<f64> = (0..num_rows).map(|i| 42.0 + i as f64).collect();
725-
let value: ArrayRef = StdArc::new(Float64Array::from(values));
726-
let tag_service: ArrayRef = create_nullable_dict_array(&vec![Some("web"); num_rows]);
727-
let tag_env: ArrayRef = create_nullable_dict_array(&vec![Some("prod"); num_rows]);
728-
let tag_datacenter: ArrayRef =
729-
create_nullable_dict_array(&vec![Some("us-east-1"); num_rows]);
730-
let tag_region: ArrayRef = create_nullable_dict_array(&vec![None; num_rows]);
731-
let tag_host: ArrayRef = create_nullable_dict_array(&vec![Some("host-001"); num_rows]);
732-
733-
// Create empty Variant (Struct with metadata and value BinaryView fields)
734-
let metadata_array = StdArc::new(BinaryViewArray::from(vec![b"" as &[u8]; num_rows]));
735-
let value_array = StdArc::new(BinaryViewArray::from(vec![b"" as &[u8]; num_rows]));
736-
let attributes: ArrayRef = StdArc::new(StructArray::from(vec![
737-
(
738-
StdArc::new(Field::new("metadata", DataType::BinaryView, false)),
739-
metadata_array.clone() as ArrayRef,
740-
),
741-
(
742-
StdArc::new(Field::new("value", DataType::BinaryView, false)),
743-
value_array.clone() as ArrayRef,
744-
),
745-
]));
746-
747-
let service_name: ArrayRef = create_dict_array(&vec!["my-service"; num_rows]);
748-
749-
let resource_attributes: ArrayRef = StdArc::new(StructArray::from(vec![
750-
(
751-
StdArc::new(Field::new("metadata", DataType::BinaryView, false)),
752-
metadata_array as ArrayRef,
753-
),
754-
(
755-
StdArc::new(Field::new("value", DataType::BinaryView, false)),
756-
value_array as ArrayRef,
757-
),
758-
]));
759-
760-
let batch = RecordBatch::try_new(
761-
schema.arrow_schema().clone(),
762-
vec![
763-
metric_name,
764-
metric_type,
765-
metric_unit,
766-
timestamp_secs,
767-
start_timestamp_secs,
768-
value,
769-
tag_service,
770-
tag_env,
771-
tag_datacenter,
772-
tag_region,
773-
tag_host,
774-
attributes,
775-
service_name,
776-
resource_attributes,
777-
],
778-
)
779-
.unwrap();
780-
781-
// Serialize to Arrow IPC
574+
let batch = create_test_batch_with_tags(5, &["service"]);
782575
let ipc_bytes = record_batch_to_ipc(&batch).unwrap();
783576

784577
// Create RawDocBatch with force_commit to trigger split production

0 commit comments

Comments (0)