Skip to content

Commit 32fbda2

Browse files
Add UDF's for sketch support (#6349)
* add UDF's for sketch support
* substrait test
* edge case with empty groups
* address df-review
* match on datadog sketches
* Fix sketch quantile clippy warning
* cleanup dead code in tests
* Use conventional test helper modules
1 parent 697ff0e commit 32fbda2

20 files changed

Lines changed: 1556 additions & 140 deletions

File tree

quickwit/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

quickwit/quickwit-common/src/metrics_specific.rs

Lines changed: 17 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -15,17 +15,22 @@
1515
/// Returns whether the given index ID corresponds to a metrics index.
1616
///
1717
/// Metrics indexes use the Parquet/DataFusion pipeline instead of the Tantivy pipeline.
18-
/// An index is considered a metrics index if it starts with "otel-metrics" or "metrics-".
18+
/// An index is considered a metrics index if it uses one of the BYOC or OSS
19+
/// parquet metrics prefixes.
1920
pub fn is_metrics_index(index_id: &str) -> bool {
20-
index_id.starts_with("otel-metrics") || index_id.starts_with("metrics-")
21+
["datadog-metrics", "metrics-", "otel-metrics"]
22+
.iter()
23+
.any(|prefix| index_id.starts_with(prefix))
2124
}
2225

2326
/// Returns whether the given index ID corresponds to a sketches index.
2427
///
2528
/// Sketches indexes use the Parquet/DataFusion pipeline with sketch-specific
2629
/// processors and writers.
2730
pub fn is_sketches_index(index_id: &str) -> bool {
28-
index_id.starts_with("sketches-")
31+
["datadog-sketches", "sketches-"]
32+
.iter()
33+
.any(|prefix| index_id.starts_with(prefix))
2934
}
3035

3136
/// Returns whether the given index ID uses the Parquet/DataFusion pipeline.
@@ -44,6 +49,10 @@ mod tests {
4449
assert!(is_metrics_index("otel-metrics"));
4550
assert!(is_metrics_index("otel-metrics-custom"));
4651

52+
// BYOC metrics indexes
53+
assert!(is_metrics_index("datadog-metrics"));
54+
assert!(is_metrics_index("datadog-metrics-v2"));
55+
4756
// Generic metrics indexes
4857
assert!(is_metrics_index("metrics-default"));
4958
assert!(is_metrics_index("metrics-"));
@@ -60,13 +69,18 @@ mod tests {
6069

6170
#[test]
6271
fn test_is_sketches_index() {
72+
assert!(is_sketches_index("datadog-sketches"));
73+
assert!(is_sketches_index("datadog-sketches-v2"));
6374
assert!(is_sketches_index("sketches-default"));
75+
assert!(!is_sketches_index("datadog-metrics"));
6476
assert!(!is_sketches_index("otel-metrics"));
6577
assert!(!is_sketches_index("my-index"));
6678
}
6779

6880
#[test]
6981
fn test_is_parquet_pipeline_index() {
82+
assert!(is_parquet_pipeline_index("datadog-metrics"));
83+
assert!(is_parquet_pipeline_index("datadog-sketches"));
7084
assert!(is_parquet_pipeline_index("otel-metrics"));
7185
assert!(is_parquet_pipeline_index("sketches-default"));
7286
assert!(!is_parquet_pipeline_index("otel-logs-v0_7"));

quickwit/quickwit-datafusion/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -43,6 +43,7 @@ object_store = "0.13"
4343
bytesize = { workspace = true }
4444
datafusion = "53"
4545
datafusion-substrait = "53"
46+
parquet = { workspace = true }
4647
prost = { workspace = true }
4748
serde_json = { workspace = true }
4849
tempfile = { workspace = true }

quickwit/quickwit-datafusion/src/sources/metrics/factory.rs

Lines changed: 28 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -24,6 +24,18 @@
2424
//! service VARCHAR,
2525
//! env VARCHAR
2626
//! ) STORED AS metrics LOCATION 'my-metrics';
27+
//!
28+
//! CREATE EXTERNAL TABLE "my-sketches" (
29+
//! metric_name VARCHAR NOT NULL,
30+
//! timestamp_secs BIGINT UNSIGNED NOT NULL,
31+
//! count BIGINT UNSIGNED NOT NULL,
32+
//! sum DOUBLE NOT NULL,
33+
//! min DOUBLE NOT NULL,
34+
//! max DOUBLE NOT NULL,
35+
//! flags INT UNSIGNED NOT NULL,
36+
//! keys ARRAY<SMALLINT> NOT NULL,
37+
//! counts ARRAY<BIGINT UNSIGNED> NOT NULL
38+
//! ) STORED AS sketches LOCATION 'my-sketches';
2739
//! ```
2840
2941
use std::sync::Arc;
@@ -34,22 +46,32 @@ use datafusion::arrow;
3446
use datafusion::catalog::{Session, TableProviderFactory};
3547
use datafusion::error::{DataFusionError, Result as DFResult};
3648
use datafusion::logical_expr::CreateExternalTable;
49+
use quickwit_parquet_engine::split::ParquetSplitKind;
3750

3851
use super::index_resolver::MetricsIndexResolver;
3952
use super::table_provider::MetricsTableProvider;
4053

4154
/// The file type string used in `STORED AS metrics`.
4255
pub const METRICS_FILE_TYPE: &str = "metrics";
56+
/// The file type string used in `STORED AS sketches`.
57+
pub const SKETCHES_FILE_TYPE: &str = "sketches";
4358

4459
/// Creates `MetricsTableProvider` instances from `CREATE EXTERNAL TABLE` DDL.
4560
#[derive(Debug)]
4661
pub struct MetricsTableProviderFactory {
4762
index_resolver: Arc<dyn MetricsIndexResolver>,
63+
split_kind: ParquetSplitKind,
4864
}
4965

5066
impl MetricsTableProviderFactory {
51-
pub fn new(index_resolver: Arc<dyn MetricsIndexResolver>) -> Self {
52-
Self { index_resolver }
67+
pub fn new(
68+
index_resolver: Arc<dyn MetricsIndexResolver>,
69+
split_kind: ParquetSplitKind,
70+
) -> Self {
71+
Self {
72+
index_resolver,
73+
split_kind,
74+
}
5375
}
5476
}
5577

@@ -66,7 +88,10 @@ impl TableProviderFactory for MetricsTableProviderFactory {
6688
cmd.location.clone()
6789
};
6890

69-
let (split_provider, index_uri) = self.index_resolver.resolve(&index_name).await?;
91+
let (split_provider, index_uri) = self
92+
.index_resolver
93+
.resolve(&index_name, self.split_kind)
94+
.await?;
7095

7196
let arrow_schema: SchemaRef = Arc::new(cmd.schema.as_arrow().clone());
7297

quickwit/quickwit-datafusion/src/sources/metrics/index_resolver.rs

Lines changed: 13 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -26,6 +26,7 @@ use async_trait::async_trait;
2626
use datafusion::error::Result as DFResult;
2727
use quickwit_common::uri::Uri;
2828
use quickwit_metastore::{IndexMetadataResponseExt, ListIndexesMetadataResponseExt};
29+
use quickwit_parquet_engine::split::ParquetSplitKind;
2930
use quickwit_proto::metastore::{
3031
IndexMetadataRequest, ListIndexesMetadataRequest, MetastoreService, MetastoreServiceClient,
3132
};
@@ -40,7 +41,11 @@ pub trait MetricsIndexResolver: Send + Sync + std::fmt::Debug {
4041
/// Returns the split provider and storage URI for `index_name`. The
4142
/// `ObjectStore` for that URI is built on demand by the registry the
4243
/// first time DataFusion reads from it.
43-
async fn resolve(&self, index_name: &str) -> DFResult<(Arc<dyn MetricsSplitProvider>, Uri)>;
44+
async fn resolve(
45+
&self,
46+
index_name: &str,
47+
split_kind: ParquetSplitKind,
48+
) -> DFResult<(Arc<dyn MetricsSplitProvider>, Uri)>;
4449

4550
async fn list_index_names(&self) -> DFResult<Vec<String>>;
4651
}
@@ -71,8 +76,12 @@ impl std::fmt::Debug for MetastoreIndexResolver {
7176

7277
#[async_trait]
7378
impl MetricsIndexResolver for MetastoreIndexResolver {
74-
async fn resolve(&self, index_name: &str) -> DFResult<(Arc<dyn MetricsSplitProvider>, Uri)> {
75-
debug!(index_name, "resolving metrics index");
79+
async fn resolve(
80+
&self,
81+
index_name: &str,
82+
split_kind: ParquetSplitKind,
83+
) -> DFResult<(Arc<dyn MetricsSplitProvider>, Uri)> {
84+
debug!(index_name, ?split_kind, "resolving parquet index");
7685

7786
let response = self
7887
.metastore
@@ -93,6 +102,7 @@ impl MetricsIndexResolver for MetastoreIndexResolver {
93102
let split_provider: Arc<dyn MetricsSplitProvider> = Arc::new(MetastoreSplitProvider::new(
94103
self.metastore.clone(),
95104
index_uid,
105+
split_kind,
96106
));
97107

98108
Ok((split_provider, index_uri))

quickwit/quickwit-datafusion/src/sources/metrics/metastore_provider.rs

Lines changed: 39 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -21,9 +21,9 @@ use datafusion::error::Result as DFResult;
2121
use quickwit_metastore::{
2222
ListParquetSplitsQuery, ListParquetSplitsRequestExt, ListParquetSplitsResponseExt,
2323
};
24-
use quickwit_parquet_engine::split::ParquetSplitMetadata;
24+
use quickwit_parquet_engine::split::{ParquetSplitKind, ParquetSplitMetadata};
2525
use quickwit_proto::metastore::{
26-
ListMetricsSplitsRequest, MetastoreService, MetastoreServiceClient,
26+
ListMetricsSplitsRequest, ListSketchSplitsRequest, MetastoreService, MetastoreServiceClient,
2727
};
2828
use quickwit_proto::types::IndexUid;
2929
use tracing::{debug, instrument};
@@ -36,13 +36,19 @@ use super::table_provider::MetricsSplitProvider;
3636
pub struct MetastoreSplitProvider {
3737
metastore: MetastoreServiceClient,
3838
index_uid: IndexUid,
39+
split_kind: ParquetSplitKind,
3940
}
4041

4142
impl MetastoreSplitProvider {
42-
pub fn new(metastore: MetastoreServiceClient, index_uid: IndexUid) -> Self {
43+
pub fn new(
44+
metastore: MetastoreServiceClient,
45+
index_uid: IndexUid,
46+
split_kind: ParquetSplitKind,
47+
) -> Self {
4348
Self {
4449
metastore,
4550
index_uid,
51+
split_kind,
4652
}
4753
}
4854
}
@@ -56,26 +62,45 @@ impl MetricsSplitProvider for MetastoreSplitProvider {
5662
metric_names = ?query.metric_names,
5763
time_range_start = ?query.time_range_start,
5864
time_range_end = ?query.time_range_end,
65+
split_kind = ?self.split_kind,
5966
num_splits,
6067
)
6168
)]
6269
async fn list_splits(&self, query: &MetricsSplitQuery) -> DFResult<Vec<ParquetSplitMetadata>> {
6370
let metastore_query = to_metastore_query(&self.index_uid, query);
6471

65-
let request =
66-
ListMetricsSplitsRequest::try_from_query(self.index_uid.clone(), &metastore_query)
72+
let records = match self.split_kind {
73+
ParquetSplitKind::Metrics => {
74+
let request = ListMetricsSplitsRequest::try_from_query(
75+
self.index_uid.clone(),
76+
&metastore_query,
77+
)
6778
.map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))?;
6879

69-
let response = self
70-
.metastore
71-
.clone()
72-
.list_metrics_splits(request)
73-
.await
74-
.map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))?;
80+
self.metastore
81+
.clone()
82+
.list_metrics_splits(request)
83+
.await
84+
.map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))?
85+
.deserialize_splits()
86+
.map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))?
87+
}
88+
ParquetSplitKind::Sketches => {
89+
let request = ListSketchSplitsRequest::try_from_query(
90+
self.index_uid.clone(),
91+
&metastore_query,
92+
)
93+
.map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))?;
7594

76-
let records = response
77-
.deserialize_splits()
78-
.map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))?;
95+
self.metastore
96+
.clone()
97+
.list_sketch_splits(request)
98+
.await
99+
.map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))?
100+
.deserialize_splits()
101+
.map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))?
102+
}
103+
};
79104

80105
// The metastore guarantees only Published splits are returned because
81106
// `to_metastore_query` sets `split_states = vec![Published]`. No

0 commit comments

Comments
 (0)