Skip to content

Commit 10d109e

Browse files
authored
compute timeseries_id for sketches (#6348)
* compute timeseries_id for sketches * address comment * linter
1 parent 2c691c7 commit 10d109e

2 files changed

Lines changed: 144 additions & 24 deletions

File tree

quickwit/quickwit-opentelemetry/src/otlp/arrow_metrics.rs

Lines changed: 61 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ use quickwit_proto::types::DocUid;
3737

3838
use super::otel_metrics::{MetricDataPoint, MetricType};
3939

40+
const COMPUTED_TIMESERIES_ID_FIELD: &str = "timeseries_id";
41+
4042
/// Builder for creating Arrow RecordBatch from MetricDataPoints.
4143
///
4244
/// Accumulates data points and discovers the schema dynamically at `finish()`
@@ -70,7 +72,12 @@ impl ArrowMetricsBatchBuilder {
7072
let mut tag_keys: BTreeSet<&str> = BTreeSet::new();
7173
for dp in &self.data_points {
7274
for key in dp.tags.keys() {
73-
tag_keys.insert(key.as_str());
75+
// Exclude any user-provided tag named "timeseries_id" to prevent a collision with the
76+
// computed column. TODO: consider warning or renaming instead of silently
77+
// dropping a user-provided "timeseries_id" tag.
78+
if key != COMPUTED_TIMESERIES_ID_FIELD {
79+
tag_keys.insert(key.as_str());
80+
}
7481
}
7582
}
7683
let sorted_tag_keys: Vec<&str> = tag_keys.into_iter().collect();
@@ -86,9 +93,11 @@ impl ArrowMetricsBatchBuilder {
8693
fields.push(Field::new("metric_type", DataType::UInt8, false));
8794
fields.push(Field::new("timestamp_secs", DataType::UInt64, false));
8895
fields.push(Field::new("value", DataType::Float64, false));
89-
// TODO: customer could submit a timeseries_id tag, and I don't think we want to explicitly
90-
// reserve it.
91-
fields.push(Field::new("timeseries_id", DataType::Int64, false));
96+
fields.push(Field::new(
97+
COMPUTED_TIMESERIES_ID_FIELD,
98+
DataType::Int64,
99+
false,
100+
));
92101

93102
for &tag_key in &sorted_tag_keys {
94103
fields.push(Field::new(
@@ -132,11 +141,9 @@ impl ArrowMetricsBatchBuilder {
132141
// TODO: can we not have to compute the timeseries for every point? especially with
133142
// compaction, there may be many points with the same tags, in the same
134143
// batch.
135-
timeseries_id_builder.append_value(compute_timeseries_id(
136-
&dp.metric_name,
137-
dp.metric_type as u8,
138-
&dp.tags,
139-
));
144+
let timeseries_id =
145+
compute_metric_timeseries_id(&dp.metric_name, dp.metric_type, &dp.tags);
146+
timeseries_id_builder.append_value(timeseries_id);
140147

141148
// Only touch builders for tags this data point has.
142149
for (tag_key, tag_val) in &dp.tags {
@@ -185,6 +192,22 @@ impl ArrowMetricsBatchBuilder {
185192
}
186193
}
187194

195+
// Computes timeseries_id for a metric data point. If the user provided a timeseries_id tag, we will
196+
// remove it.
197+
fn compute_metric_timeseries_id(
198+
metric_name: &str,
199+
metric_type: MetricType,
200+
tags: &HashMap<String, String>,
201+
) -> i64 {
202+
if !tags.contains_key(COMPUTED_TIMESERIES_ID_FIELD) {
203+
return compute_timeseries_id(metric_name, metric_type as u8, tags);
204+
}
205+
206+
let mut filtered_tags = tags.clone();
207+
filtered_tags.remove(COMPUTED_TIMESERIES_ID_FIELD);
208+
compute_timeseries_id(metric_name, metric_type as u8, &filtered_tags)
209+
}
210+
188211
/// Error type for Arrow IPC operations.
189212
#[derive(Debug, thiserror::Error)]
190213
pub enum ArrowIpcError {
@@ -411,6 +434,35 @@ mod tests {
411434
assert_eq!(batch.num_columns(), 14);
412435
}
413436

437+
#[test]
fn test_arrow_batch_filters_user_timeseries_id_tag() {
    // A data point carrying the reserved tag name must not produce a second
    // "timeseries_id" column in the schema.
    let mut data_point = make_test_data_point();
    data_point
        .tags
        .insert("timeseries_id".to_string(), "user-provided".to_string());

    let mut builder = ArrowMetricsBatchBuilder::with_capacity(1);
    builder.append(data_point);
    let batch = builder.finish();

    let schema = batch.schema();
    let timeseries_id_columns = schema
        .fields()
        .iter()
        .filter(|field| field.name().as_str() == "timeseries_id")
        .count();
    assert_eq!(timeseries_id_columns, 1);
    // The computed column keeps its fixed position among the non-tag fields.
    assert_eq!(schema.index_of("timeseries_id").unwrap(), 4);
    // Other user tags survive the filtering.
    assert!(schema.index_of("service").is_ok());

    // The batch must remain compatible with the downstream sort machinery.
    quickwit_parquet_engine::sorted_series::append_sorted_series_column(
        quickwit_parquet_engine::table_config::ProductType::Metrics.default_sort_fields(),
        &batch,
    )
    .unwrap();
}
465+
414466
#[test]
415467
fn test_arrow_batch_builder_multiple_rows() {
416468
let mut builder = ArrowMetricsBatchBuilder::with_capacity(100);

quickwit/quickwit-parquet-engine/src/ingest/arrow_sketches.rs

Lines changed: 83 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,21 @@ use std::collections::{BTreeSet, HashMap};
2121
use std::sync::Arc;
2222

2323
use arrow::array::{
24-
ArrayRef, Float64Builder, Int16Builder, ListBuilder, RecordBatch, StringDictionaryBuilder,
25-
UInt32Builder, UInt64Builder,
24+
ArrayRef, Float64Builder, Int16Builder, Int64Builder, ListBuilder, RecordBatch,
25+
StringDictionaryBuilder, UInt32Builder, UInt64Builder,
2626
};
2727
use arrow::datatypes::{DataType, Field, Int32Type, Schema as ArrowSchema};
2828

29+
use crate::timeseries_id::compute_timeseries_id;
30+
31+
/// `metric_type` discriminant fed into [`compute_timeseries_id`] for sketch
32+
/// data points. Mirrors `MetricType::Summary as u8` in
33+
/// `quickwit-opentelemetry::otlp::otel_metrics` — chosen because DDSketches
34+
/// are distribution summaries and because it does not collide with the
35+
/// discriminants used on the metrics path (`Gauge = 0`, `Sum = 1`).
36+
const SKETCH_METRIC_TYPE_DISCRIMINANT: u8 = 4;
37+
const COMPUTED_TIMESERIES_ID_FIELD: &str = "timeseries_id";
38+
2939
/// A single DDSketch data point with tags.
3040
#[derive(Debug, Clone)]
3141
pub struct SketchDataPoint {
@@ -75,14 +85,21 @@ impl ArrowSketchBatchBuilder {
7585
let mut tag_keys: BTreeSet<&str> = BTreeSet::new();
7686
for dp in &self.data_points {
7787
for key in dp.tags.keys() {
78-
tag_keys.insert(key.as_str());
88+
// Exclude any user-provided tag named "timeseries_id" to prevent a collision with the
89+
// computed column. TODO: consider warning or renaming instead of silently
90+
// dropping a user-provided "timeseries_id" tag.
91+
if key != COMPUTED_TIMESERIES_ID_FIELD {
92+
tag_keys.insert(key.as_str());
93+
}
7994
}
8095
}
8196
let sorted_tag_keys: Vec<String> = tag_keys.into_iter().map(str::to_owned).collect();
8297

83-
// Build the Arrow schema dynamically
98+
// Build the Arrow schema dynamically.
99+
// 10 fixed columns: metric_name, timestamp_secs, count, sum, min, max,
100+
// flags, keys, counts, timeseries_id.
84101
let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
85-
let mut fields = Vec::with_capacity(9 + sorted_tag_keys.len());
102+
let mut fields = Vec::with_capacity(10 + sorted_tag_keys.len());
86103
fields.push(Field::new("metric_name", dict_type.clone(), false));
87104
fields.push(Field::new("timestamp_secs", DataType::UInt64, false));
88105
fields.push(Field::new("count", DataType::UInt64, false));
@@ -100,6 +117,11 @@ impl ArrowSketchBatchBuilder {
100117
DataType::List(Arc::new(Field::new("item", DataType::UInt64, false))),
101118
false,
102119
));
120+
fields.push(Field::new(
121+
COMPUTED_TIMESERIES_ID_FIELD,
122+
DataType::Int64,
123+
false,
124+
));
103125

104126
for tag_key in &sorted_tag_keys {
105127
fields.push(Field::new(tag_key, dict_type.clone(), true));
@@ -126,6 +148,7 @@ impl ArrowSketchBatchBuilder {
126148
DataType::UInt64,
127149
false,
128150
));
151+
let mut timeseries_id_builder = Int64Builder::with_capacity(num_rows);
129152

130153
let mut tag_builders: Vec<StringDictionaryBuilder<Int32Type>> = sorted_tag_keys
131154
.iter()
@@ -153,6 +176,9 @@ impl ArrowSketchBatchBuilder {
153176
}
154177
counts_builder.append(true);
155178

179+
let timeseries_id = compute_sketch_timeseries_id(&dp.metric_name, &dp.tags);
180+
timeseries_id_builder.append_value(timeseries_id);
181+
156182
for (tag_idx, tag_key) in sorted_tag_keys.iter().enumerate() {
157183
match dp.tags.get(tag_key) {
158184
Some(tag_val) => tag_builders[tag_idx].append_value(tag_val),
@@ -161,7 +187,7 @@ impl ArrowSketchBatchBuilder {
161187
}
162188
}
163189

164-
let mut arrays: Vec<ArrayRef> = Vec::with_capacity(9 + sorted_tag_keys.len());
190+
let mut arrays: Vec<ArrayRef> = Vec::with_capacity(10 + sorted_tag_keys.len());
165191
arrays.push(Arc::new(metric_name_builder.finish()));
166192
arrays.push(Arc::new(timestamp_secs_builder.finish()));
167193
arrays.push(Arc::new(count_builder.finish()));
@@ -171,6 +197,7 @@ impl ArrowSketchBatchBuilder {
171197
arrays.push(Arc::new(flags_builder.finish()));
172198
arrays.push(Arc::new(keys_builder.finish()));
173199
arrays.push(Arc::new(counts_builder.finish()));
200+
arrays.push(Arc::new(timeseries_id_builder.finish()));
174201

175202
for tag_builder in &mut tag_builders {
176203
arrays.push(Arc::new(tag_builder.finish()));
@@ -193,6 +220,17 @@ impl ArrowSketchBatchBuilder {
193220
}
194221
}
195222

223+
// Computes timeseries_id for a sketch. If the user provided a timeseries_id tag, we will remove it.
224+
fn compute_sketch_timeseries_id(metric_name: &str, tags: &HashMap<String, String>) -> i64 {
225+
if !tags.contains_key(COMPUTED_TIMESERIES_ID_FIELD) {
226+
return compute_timeseries_id(metric_name, SKETCH_METRIC_TYPE_DISCRIMINANT, tags);
227+
}
228+
229+
let mut filtered_tags = tags.clone();
230+
filtered_tags.remove(COMPUTED_TIMESERIES_ID_FIELD);
231+
compute_timeseries_id(metric_name, SKETCH_METRIC_TYPE_DISCRIMINANT, &filtered_tags)
232+
}
233+
196234
#[cfg(test)]
197235
mod tests {
198236
use std::collections::HashMap;
@@ -230,8 +268,8 @@ mod tests {
230268

231269
let batch = builder.finish();
232270
assert_eq!(batch.num_rows(), 1);
233-
// 9 fixed columns + 3 tag columns (env, host, service)
234-
assert_eq!(batch.num_columns(), 12);
271+
// 10 fixed columns + 3 tag columns (env, host, service)
272+
assert_eq!(batch.num_columns(), 13);
235273

236274
// Verify schema field names
237275
let schema = batch.schema();
@@ -248,13 +286,43 @@ mod tests {
248286
"flags",
249287
"keys",
250288
"counts",
289+
"timeseries_id",
251290
"env",
252291
"host",
253292
"service",
254293
]
255294
);
256295
}
257296

297+
#[test]
fn test_sketch_batch_filters_user_timeseries_id_tag() {
    // A sketch point carrying the reserved tag name must not produce a second
    // "timeseries_id" column in the schema.
    let mut sketch_point = make_test_sketch_point();
    sketch_point
        .tags
        .insert("timeseries_id".to_string(), "user-provided".to_string());

    let mut builder = ArrowSketchBatchBuilder::with_capacity(1);
    builder.append(sketch_point);
    let batch = builder.finish();

    let schema = batch.schema();
    let timeseries_id_columns = schema
        .fields()
        .iter()
        .filter(|field| field.name().as_str() == "timeseries_id")
        .count();
    assert_eq!(timeseries_id_columns, 1);
    // The computed column keeps its fixed position among the non-tag fields.
    assert_eq!(schema.index_of("timeseries_id").unwrap(), 9);
    // Other user tags survive the filtering.
    assert!(schema.index_of("service").is_ok());

    // The batch must remain compatible with the downstream sort machinery.
    crate::sorted_series::append_sorted_series_column(
        crate::table_config::ProductType::Sketches.default_sort_fields(),
        &batch,
    )
    .unwrap();
}
325+
258326
#[test]
259327
fn test_sketch_batch_builder_multiple_rows() {
260328
let mut builder = ArrowSketchBatchBuilder::with_capacity(100);
@@ -281,8 +349,8 @@ mod tests {
281349

282350
let batch = builder.finish();
283351
assert_eq!(batch.num_rows(), 100);
284-
// 9 fixed + 2 tags
285-
assert_eq!(batch.num_columns(), 11);
352+
// 10 fixed + 2 tags
353+
assert_eq!(batch.num_columns(), 12);
286354
}
287355

288356
#[test]
@@ -325,14 +393,14 @@ mod tests {
325393

326394
let batch = builder.finish();
327395
assert_eq!(batch.num_rows(), 2);
328-
// 9 fixed + 3 tags (env, host, region)
329-
assert_eq!(batch.num_columns(), 12);
396+
// 10 fixed + 3 tags (env, host, region)
397+
assert_eq!(batch.num_columns(), 13);
330398

331399
let schema = batch.schema();
332400
let tag_names: Vec<&str> = schema
333401
.fields()
334402
.iter()
335-
.skip(9)
403+
.skip(10)
336404
.map(|f| f.name().as_str())
337405
.collect();
338406
assert_eq!(tag_names, vec!["env", "host", "region"]);
@@ -345,8 +413,8 @@ mod tests {
345413

346414
let batch = builder.finish();
347415
assert_eq!(batch.num_rows(), 0);
348-
// 9 fixed columns, no tags
349-
assert_eq!(batch.num_columns(), 9);
416+
// 10 fixed columns, no tags
417+
assert_eq!(batch.num_columns(), 10);
350418
}
351419

352420
#[test]

0 commit comments

Comments
 (0)