Skip to content

Commit 4a1dc31

Browse files
Merge branch 'main' into bianchi/sorted-series
2 parents 9283ba5 + 6702d41 commit 4a1dc31

8 files changed

Lines changed: 424 additions & 72 deletions

File tree

quickwit/quickwit-config/src/storage_config.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,10 @@ pub struct S3StorageConfig {
331331
pub disable_multipart_upload: bool,
332332
#[serde(default)]
333333
pub disable_checksums: bool,
334+
#[serde(default)]
335+
pub disable_stalled_stream_protection_upload: bool,
336+
#[serde(default)]
337+
pub disable_stalled_stream_protection_download: bool,
334338
}
335339

336340
impl S3StorageConfig {
@@ -399,6 +403,16 @@ impl fmt::Debug for S3StorageConfig {
399403
"disable_multi_object_delete",
400404
&self.disable_multi_object_delete,
401405
)
406+
.field("disable_multipart_upload", &self.disable_multipart_upload)
407+
.field("disable_checksums", &self.disable_checksums)
408+
.field(
409+
"disable_stalled_stream_protection_upload",
410+
&self.disable_stalled_stream_protection_upload,
411+
)
412+
.field(
413+
"disable_stalled_stream_protection_download",
414+
&self.disable_stalled_stream_protection_download,
415+
)
402416
.finish()
403417
}
404418
}
@@ -634,6 +648,8 @@ mod tests {
634648
disable_multi_object_delete_requests: true
635649
disable_multipart_upload: true
636650
disable_checksums: true
651+
disable_stalled_stream_protection_upload: true
652+
disable_stalled_stream_protection_download: true
637653
"#;
638654
let s3_storage_config: S3StorageConfig =
639655
serde_yaml::from_str(s3_storage_config_yaml).unwrap();
@@ -645,6 +661,8 @@ mod tests {
645661
disable_multi_object_delete: true,
646662
disable_multipart_upload: true,
647663
disable_checksums: true,
664+
disable_stalled_stream_protection_upload: true,
665+
disable_stalled_stream_protection_download: true,
648666
..Default::default()
649667
};
650668
assert_eq!(s3_storage_config, expected_s3_config);

quickwit/quickwit-doc-mapper/src/query_builder.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -394,13 +394,16 @@ impl<'a, 'b: 'a> QueryAstVisitor<'a> for ExtractPrefixTermRanges<'b> {
394394
}
395395

396396
fn visit_regex(&mut self, regex_query: &'a RegexQuery) -> Result<(), Self::Err> {
397-
let (field, path, regex) = match regex_query.to_field_and_regex(self.schema) {
397+
let resolved = match regex_query.to_resolved(self.schema, Some(self.tokenizer_manager)) {
398398
Ok(res) => res,
399399
/* the query will be nullified when casting to a tantivy ast */
400400
Err(InvalidQuery::FieldDoesNotExist { .. }) => return Ok(()),
401401
Err(e) => return Err(e),
402402
};
403-
self.add_automaton(field, Automaton::Regex(path, regex));
403+
self.add_automaton(
404+
resolved.field,
405+
Automaton::Regex(resolved.json_path, resolved.regex),
406+
);
404407
Ok(())
405408
}
406409
}

quickwit/quickwit-opentelemetry/src/otlp/arrow_metrics.rs

Lines changed: 61 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ use quickwit_proto::types::DocUid;
3737

3838
use super::otel_metrics::{MetricDataPoint, MetricType};
3939

40+
const COMPUTED_TIMESERIES_ID_FIELD: &str = "timeseries_id";
41+
4042
/// Builder for creating Arrow RecordBatch from MetricDataPoints.
4143
///
4244
/// Accumulates data points and discovers the schema dynamically at `finish()`
@@ -70,7 +72,12 @@ impl ArrowMetricsBatchBuilder {
7072
let mut tag_keys: BTreeSet<&str> = BTreeSet::new();
7173
for dp in &self.data_points {
7274
for key in dp.tags.keys() {
73-
tag_keys.insert(key.as_str());
75+
// Exclude any user-provided tag named "timeseries_id" to prevent collision with the
76+
// computed column. TODO: a user-provided "timeseries_id" tag is silently dropped;
77+
// decide whether to rename it or surface a warning instead.
78+
if key != COMPUTED_TIMESERIES_ID_FIELD {
79+
tag_keys.insert(key.as_str());
80+
}
7481
}
7582
}
7683
let sorted_tag_keys: Vec<&str> = tag_keys.into_iter().collect();
@@ -86,9 +93,11 @@ impl ArrowMetricsBatchBuilder {
8693
fields.push(Field::new("metric_type", DataType::UInt8, false));
8794
fields.push(Field::new("timestamp_secs", DataType::UInt64, false));
8895
fields.push(Field::new("value", DataType::Float64, false));
89-
// TODO: customer could submit a timeseries_id tag, and I don't think we want to explicitly
90-
// reserve it.
91-
fields.push(Field::new("timeseries_id", DataType::Int64, false));
96+
fields.push(Field::new(
97+
COMPUTED_TIMESERIES_ID_FIELD,
98+
DataType::Int64,
99+
false,
100+
));
92101

93102
for &tag_key in &sorted_tag_keys {
94103
fields.push(Field::new(
@@ -132,11 +141,9 @@ impl ArrowMetricsBatchBuilder {
132141
// TODO: can we not have to compute the timeseries for every point? especially with
133142
// compaction, there may be many points with the same tags, in the same
134143
// batch.
135-
timeseries_id_builder.append_value(compute_timeseries_id(
136-
&dp.metric_name,
137-
dp.metric_type as u8,
138-
&dp.tags,
139-
));
144+
let timeseries_id =
145+
compute_metric_timeseries_id(&dp.metric_name, dp.metric_type, &dp.tags);
146+
timeseries_id_builder.append_value(timeseries_id);
140147

141148
// Only touch builders for tags this data point has.
142149
for (tag_key, tag_val) in &dp.tags {
@@ -185,6 +192,22 @@ impl ArrowMetricsBatchBuilder {
185192
}
186193
}
187194

195+
// Computes the timeseries_id for a metric data point. Any user-provided "timeseries_id"
196+
// tag is excluded from the hash (a filtered copy is used; the input map is not mutated).
197+
fn compute_metric_timeseries_id(
198+
metric_name: &str,
199+
metric_type: MetricType,
200+
tags: &HashMap<String, String>,
201+
) -> i64 {
202+
if !tags.contains_key(COMPUTED_TIMESERIES_ID_FIELD) {
203+
return compute_timeseries_id(metric_name, metric_type as u8, tags);
204+
}
205+
206+
let mut filtered_tags = tags.clone();
207+
filtered_tags.remove(COMPUTED_TIMESERIES_ID_FIELD);
208+
compute_timeseries_id(metric_name, metric_type as u8, &filtered_tags)
209+
}
210+
188211
/// Error type for Arrow IPC operations.
189212
#[derive(Debug, thiserror::Error)]
190213
pub enum ArrowIpcError {
@@ -411,6 +434,35 @@ mod tests {
411434
assert_eq!(batch.num_columns(), 14);
412435
}
413436

437+
#[test]
438+
fn test_arrow_batch_filters_user_timeseries_id_tag() {
439+
let mut dp = make_test_data_point();
440+
dp.tags
441+
.insert("timeseries_id".to_string(), "user-provided".to_string());
442+
443+
let mut builder = ArrowMetricsBatchBuilder::with_capacity(1);
444+
builder.append(dp);
445+
446+
let batch = builder.finish();
447+
let schema = batch.schema();
448+
assert_eq!(
449+
schema
450+
.fields()
451+
.iter()
452+
.filter(|field| field.name().as_str() == "timeseries_id")
453+
.count(),
454+
1
455+
);
456+
assert_eq!(schema.index_of("timeseries_id").unwrap(), 4);
457+
assert!(schema.index_of("service").is_ok());
458+
459+
quickwit_parquet_engine::sorted_series::append_sorted_series_column(
460+
quickwit_parquet_engine::table_config::ProductType::Metrics.default_sort_fields(),
461+
&batch,
462+
)
463+
.unwrap();
464+
}
465+
414466
#[test]
415467
fn test_arrow_batch_builder_multiple_rows() {
416468
let mut builder = ArrowMetricsBatchBuilder::with_capacity(100);

quickwit/quickwit-parquet-engine/src/ingest/arrow_sketches.rs

Lines changed: 83 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,21 @@ use std::collections::{BTreeSet, HashMap};
2121
use std::sync::Arc;
2222

2323
use arrow::array::{
24-
ArrayRef, Float64Builder, Int16Builder, ListBuilder, RecordBatch, StringDictionaryBuilder,
25-
UInt32Builder, UInt64Builder,
24+
ArrayRef, Float64Builder, Int16Builder, Int64Builder, ListBuilder, RecordBatch,
25+
StringDictionaryBuilder, UInt32Builder, UInt64Builder,
2626
};
2727
use arrow::datatypes::{DataType, Field, Int32Type, Schema as ArrowSchema};
2828

29+
use crate::timeseries_id::compute_timeseries_id;
30+
31+
/// `metric_type` discriminant fed into [`compute_timeseries_id`] for sketch
32+
/// data points. Mirrors `MetricType::Summary as u8` in
33+
/// `quickwit-opentelemetry::otlp::otel_metrics` — chosen because DDSketches
34+
/// are distribution summaries and because it does not collide with the
35+
/// discriminants used on the metrics path (`Gauge = 0`, `Sum = 1`).
36+
const SKETCH_METRIC_TYPE_DISCRIMINANT: u8 = 4;
37+
const COMPUTED_TIMESERIES_ID_FIELD: &str = "timeseries_id";
38+
2939
/// A single DDSketch data point with tags.
3040
#[derive(Debug, Clone)]
3141
pub struct SketchDataPoint {
@@ -75,14 +85,21 @@ impl ArrowSketchBatchBuilder {
7585
let mut tag_keys: BTreeSet<&str> = BTreeSet::new();
7686
for dp in &self.data_points {
7787
for key in dp.tags.keys() {
78-
tag_keys.insert(key.as_str());
88+
// Exclude any user-provided tag named "timeseries_id" to prevent collision with the
89+
// computed column. TODO: a user-provided "timeseries_id" tag is silently dropped;
90+
// decide whether to rename it or surface a warning instead.
91+
if key != COMPUTED_TIMESERIES_ID_FIELD {
92+
tag_keys.insert(key.as_str());
93+
}
7994
}
8095
}
8196
let sorted_tag_keys: Vec<String> = tag_keys.into_iter().map(str::to_owned).collect();
8297

83-
// Build the Arrow schema dynamically
98+
// Build the Arrow schema dynamically.
99+
// 10 fixed columns: metric_name, timestamp_secs, count, sum, min, max,
100+
// flags, keys, counts, timeseries_id.
84101
let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
85-
let mut fields = Vec::with_capacity(9 + sorted_tag_keys.len());
102+
let mut fields = Vec::with_capacity(10 + sorted_tag_keys.len());
86103
fields.push(Field::new("metric_name", dict_type.clone(), false));
87104
fields.push(Field::new("timestamp_secs", DataType::UInt64, false));
88105
fields.push(Field::new("count", DataType::UInt64, false));
@@ -100,6 +117,11 @@ impl ArrowSketchBatchBuilder {
100117
DataType::List(Arc::new(Field::new("item", DataType::UInt64, false))),
101118
false,
102119
));
120+
fields.push(Field::new(
121+
COMPUTED_TIMESERIES_ID_FIELD,
122+
DataType::Int64,
123+
false,
124+
));
103125

104126
for tag_key in &sorted_tag_keys {
105127
fields.push(Field::new(tag_key, dict_type.clone(), true));
@@ -126,6 +148,7 @@ impl ArrowSketchBatchBuilder {
126148
DataType::UInt64,
127149
false,
128150
));
151+
let mut timeseries_id_builder = Int64Builder::with_capacity(num_rows);
129152

130153
let mut tag_builders: Vec<StringDictionaryBuilder<Int32Type>> = sorted_tag_keys
131154
.iter()
@@ -153,6 +176,9 @@ impl ArrowSketchBatchBuilder {
153176
}
154177
counts_builder.append(true);
155178

179+
let timeseries_id = compute_sketch_timeseries_id(&dp.metric_name, &dp.tags);
180+
timeseries_id_builder.append_value(timeseries_id);
181+
156182
for (tag_idx, tag_key) in sorted_tag_keys.iter().enumerate() {
157183
match dp.tags.get(tag_key) {
158184
Some(tag_val) => tag_builders[tag_idx].append_value(tag_val),
@@ -161,7 +187,7 @@ impl ArrowSketchBatchBuilder {
161187
}
162188
}
163189

164-
let mut arrays: Vec<ArrayRef> = Vec::with_capacity(9 + sorted_tag_keys.len());
190+
let mut arrays: Vec<ArrayRef> = Vec::with_capacity(10 + sorted_tag_keys.len());
165191
arrays.push(Arc::new(metric_name_builder.finish()));
166192
arrays.push(Arc::new(timestamp_secs_builder.finish()));
167193
arrays.push(Arc::new(count_builder.finish()));
@@ -171,6 +197,7 @@ impl ArrowSketchBatchBuilder {
171197
arrays.push(Arc::new(flags_builder.finish()));
172198
arrays.push(Arc::new(keys_builder.finish()));
173199
arrays.push(Arc::new(counts_builder.finish()));
200+
arrays.push(Arc::new(timeseries_id_builder.finish()));
174201

175202
for tag_builder in &mut tag_builders {
176203
arrays.push(Arc::new(tag_builder.finish()));
@@ -193,6 +220,17 @@ impl ArrowSketchBatchBuilder {
193220
}
194221
}
195222

223+
// Computes the timeseries_id for a sketch data point. Any user-provided "timeseries_id" tag is excluded from the hash (a filtered copy is used; the input map is not mutated).
224+
fn compute_sketch_timeseries_id(metric_name: &str, tags: &HashMap<String, String>) -> i64 {
225+
if !tags.contains_key(COMPUTED_TIMESERIES_ID_FIELD) {
226+
return compute_timeseries_id(metric_name, SKETCH_METRIC_TYPE_DISCRIMINANT, tags);
227+
}
228+
229+
let mut filtered_tags = tags.clone();
230+
filtered_tags.remove(COMPUTED_TIMESERIES_ID_FIELD);
231+
compute_timeseries_id(metric_name, SKETCH_METRIC_TYPE_DISCRIMINANT, &filtered_tags)
232+
}
233+
196234
#[cfg(test)]
197235
mod tests {
198236
use std::collections::HashMap;
@@ -230,8 +268,8 @@ mod tests {
230268

231269
let batch = builder.finish();
232270
assert_eq!(batch.num_rows(), 1);
233-
// 9 fixed columns + 3 tag columns (env, host, service)
234-
assert_eq!(batch.num_columns(), 12);
271+
// 10 fixed columns + 3 tag columns (env, host, service)
272+
assert_eq!(batch.num_columns(), 13);
235273

236274
// Verify schema field names
237275
let schema = batch.schema();
@@ -248,13 +286,43 @@ mod tests {
248286
"flags",
249287
"keys",
250288
"counts",
289+
"timeseries_id",
251290
"env",
252291
"host",
253292
"service",
254293
]
255294
);
256295
}
257296

297+
#[test]
298+
fn test_sketch_batch_filters_user_timeseries_id_tag() {
299+
let mut dp = make_test_sketch_point();
300+
dp.tags
301+
.insert("timeseries_id".to_string(), "user-provided".to_string());
302+
303+
let mut builder = ArrowSketchBatchBuilder::with_capacity(1);
304+
builder.append(dp);
305+
306+
let batch = builder.finish();
307+
let schema = batch.schema();
308+
assert_eq!(
309+
schema
310+
.fields()
311+
.iter()
312+
.filter(|field| field.name().as_str() == "timeseries_id")
313+
.count(),
314+
1
315+
);
316+
assert_eq!(schema.index_of("timeseries_id").unwrap(), 9);
317+
assert!(schema.index_of("service").is_ok());
318+
319+
crate::sorted_series::append_sorted_series_column(
320+
crate::table_config::ProductType::Sketches.default_sort_fields(),
321+
&batch,
322+
)
323+
.unwrap();
324+
}
325+
258326
#[test]
259327
fn test_sketch_batch_builder_multiple_rows() {
260328
let mut builder = ArrowSketchBatchBuilder::with_capacity(100);
@@ -281,8 +349,8 @@ mod tests {
281349

282350
let batch = builder.finish();
283351
assert_eq!(batch.num_rows(), 100);
284-
// 9 fixed + 2 tags
285-
assert_eq!(batch.num_columns(), 11);
352+
// 10 fixed + 2 tags
353+
assert_eq!(batch.num_columns(), 12);
286354
}
287355

288356
#[test]
@@ -325,14 +393,14 @@ mod tests {
325393

326394
let batch = builder.finish();
327395
assert_eq!(batch.num_rows(), 2);
328-
// 9 fixed + 3 tags (env, host, region)
329-
assert_eq!(batch.num_columns(), 12);
396+
// 10 fixed + 3 tags (env, host, region)
397+
assert_eq!(batch.num_columns(), 13);
330398

331399
let schema = batch.schema();
332400
let tag_names: Vec<&str> = schema
333401
.fields()
334402
.iter()
335-
.skip(9)
403+
.skip(10)
336404
.map(|f| f.name().as_str())
337405
.collect();
338406
assert_eq!(tag_names, vec!["env", "host", "region"]);
@@ -345,8 +413,8 @@ mod tests {
345413

346414
let batch = builder.finish();
347415
assert_eq!(batch.num_rows(), 0);
348-
// 9 fixed columns, no tags
349-
assert_eq!(batch.num_columns(), 9);
416+
// 10 fixed columns, no tags
417+
assert_eq!(batch.num_columns(), 10);
350418
}
351419

352420
#[test]

quickwit/quickwit-query/src/query_ast/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ pub use field_presence::FieldPresenceQuery;
3939
pub use full_text_query::{FullTextMode, FullTextParams, FullTextQuery};
4040
pub use phrase_prefix_query::PhrasePrefixQuery;
4141
pub use range_query::RangeQuery;
42-
pub use regex_query::{AutomatonQuery, JsonPathPrefix, RegexQuery};
42+
pub use regex_query::{AutomatonQuery, JsonPathPrefix, RegexQuery, ResolvedRegex};
4343
use tantivy_query_ast::TantivyQueryAst;
4444
pub use term_query::TermQuery;
4545
pub use term_set_query::TermSetQuery;

0 commit comments

Comments
 (0)