
Commit 4d6fd21

enable partitioning for metrics data (#6340)
* partitioning for metrics
* fix checkpointing
* lint
* address comment, fix max num partition handling
* move partitioning to indexer, not doc processor
* lint
* feat: add Parquet merge policy for compaction (Phase 2) (#6351)

  Adds a constant write amplification merge policy for Parquet splits, adapted from the existing ConstWriteAmplificationMergePolicy but using byte size instead of document count as the primary size metric. This is Phase 2 of the Parquet compaction project — the decision layer that determines which splits to merge within each compaction scope.

  Key components:
  - ParquetMergePolicy trait mirroring the MergePolicy interface
  - CompactionScope grouping by (index_uid, sort_fields, window_start)
  - ConstWriteAmplificationParquetMergePolicy with bounded write amp
  - finalize_operations() for cold window compaction
  - 33 tests: unit, proptest (MC-CONSERVE/LEVEL/WA/IDEMPOTENT), simulation

* fix: add partition_id to CompactionScope, rebase on #6340

  Now that ParquetSplitMetadata has partition_id (from Matt's PR #6340), include it in CompactionScope so splits with different partitions are never merged together. Adds 2 new scope tests for partition isolation.

* fix: rustfmt nightly comment wrapping and import ordering

* fix: finalize_operations must respect MC-LEVEL invariant

  finalize_operations() was running single_merge_operation over all young splits without grouping by num_merge_ops level. This could merge level-0 and level-1 splits together, stamping the output with max(levels) + 1 and prematurely maturing lower-level data.

  Fix: group by num_merge_ops in finalize just like operations() does, then apply the lower merge factor within each level independently. Added test_finalize_respects_mc_level_invariant (unit) and proptest_finalize_respects_mc_level (property test) — both caught the bug before the fix.

* fix: include window_duration_secs in CompactionScope

  CompactionScope only used window_start_secs, so splits with the same start but different durations (e.g. after a window config change) would be grouped together. The merge engine requires all inputs to agree on both window_start and window_duration, so merging across durations would fail validation. Added test_different_window_duration which caught the bug before the fix.

* feat: add MP-1/MP-2/MP-3 runtime invariant checks for merge operations

  Add three merge-policy invariants to the shared verification layer (quickwit-dst) with check_invariant! enforcement in ParquetMergeOperation::new():
  - MP-1: all splits share the same num_merge_ops level
  - MP-2: merge op has at least 2 input splits
  - MP-3: all splits share the same compaction scope (sort_fields + window)

  Shared pure functions in quickwit_dst::invariants::merge_policy are the single source of truth, usable by Stateright models and production code. Debug builds panic on violation; all builds emit metrics via the invariant recorder. Tests written first (4 should_panic tests failed before adding checks, pass after). Plus 1 positive test and 3 unit tests for shared functions.

* fix: nightly rustfmt — merge imports, unwrap short line, trailing newline

Co-authored-by: George Talbot <george.talbot@datadoghq.com>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
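The commit message describes two grouping rules for merge candidates: splits are first grouped by CompactionScope (index_uid, sort_fields, partition_id, window start, window duration), and within a scope by their num_merge_ops level (the MC-LEVEL invariant). A minimal sketch of that grouping, using hypothetical simplified struct shapes — the real types live in the Quickwit merge-policy code and carry more fields:

```rust
use std::collections::BTreeMap;

// Hypothetical, simplified shapes: the field names follow the commit
// message, but these are not the real Quickwit structs.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct CompactionScope {
    index_uid: String,
    sort_fields: Vec<String>,
    partition_id: u64,
    window_start_secs: i64,
    window_duration_secs: u64,
}

#[derive(Clone, Debug)]
struct ParquetSplitMetadata {
    split_id: String,
    scope: CompactionScope,
    num_merge_ops: usize,
}

// Group candidate splits by (scope, level). Splits from different
// partitions, different windows, or different num_merge_ops levels
// never land in the same group, so they are never merged together.
fn group_for_merge(
    splits: Vec<ParquetSplitMetadata>,
) -> BTreeMap<(CompactionScope, usize), Vec<ParquetSplitMetadata>> {
    let mut groups: BTreeMap<(CompactionScope, usize), Vec<ParquetSplitMetadata>> =
        BTreeMap::new();
    for split in splits {
        groups
            .entry((split.scope.clone(), split.num_merge_ops))
            .or_default()
            .push(split);
    }
    groups
}
```

Within each group, a merge policy would then apply its write-amplification bound; the sketch only shows the scoping step.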
1 parent 149a8cb commit 4d6fd21

22 files changed

Lines changed: 3027 additions & 239 deletions


quickwit/Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default.

quickwit/quickwit-doc-mapper/Cargo.toml

Lines changed: 2 additions & 0 deletions
```diff
@@ -12,6 +12,7 @@ license.workspace = true

 [dependencies]
 anyhow = { workspace = true }
+arrow = { workspace = true, optional = true }
 base64 = { workspace = true }
 fnv = { workspace = true }
 hex = { workspace = true }
@@ -44,6 +45,7 @@ quickwit-common = { workspace = true, features = ["testsuite"] }
 quickwit-query = { workspace = true }

 [features]
+metrics = ["dep:arrow"]
 testsuite = []

 [[bench]]
```

quickwit/quickwit-doc-mapper/src/lib.rs

Lines changed: 2 additions & 0 deletions
```diff
@@ -43,6 +43,8 @@ pub use doc_mapping::{DocMapping, Mode, ModeType};
 pub use error::{DocParsingError, QueryParserError};
 use quickwit_common::shared_consts::FIELD_PRESENCE_FIELD_NAME;
 use quickwit_proto::types::DocMappingUid;
+#[cfg(feature = "metrics")]
+pub use routing_expression::ArrowRowContext;
 pub use routing_expression::RoutingExpr;

 /// Field name reserved for storing the source document.
```
quickwit/quickwit-doc-mapper/src/routing_expression/metrics.rs

Lines changed: 222 additions & 0 deletions

This file is new; its full contents:

```rust
// Copyright 2021-Present Datadog, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::hash::Hasher;

use arrow::array::{Array, AsArray, DictionaryArray};
use arrow::datatypes::{
    ArrowDictionaryKeyType, DataType, Int8Type, Int16Type, Int32Type, Int64Type, UInt8Type,
    UInt16Type, UInt32Type, UInt64Type,
};
use arrow::record_batch::RecordBatch;
#[cfg(test)]
use serde_json::Value as JsonValue;

#[cfg(test)]
use super::RoutingExpr;
use super::RoutingExprContext;

/// Context for evaluating routing expressions against a single row of an Arrow `RecordBatch`.
///
/// Hashing is deliberately consistent with the JSON-backed `RoutingExprContext`
/// implementation so identical logical values produce the same `partition_id`
/// whether they arrive as JSON or Arrow IPC.
pub struct ArrowRowContext<'a> {
    batch: &'a RecordBatch,
    row_idx: usize,
}

impl<'a> ArrowRowContext<'a> {
    /// Creates an Arrow-backed routing context for one row in a `RecordBatch`.
    pub fn new(batch: &'a RecordBatch, row_idx: usize) -> Self {
        Self { batch, row_idx }
    }

    fn dictionary_utf8_value<K: ArrowDictionaryKeyType>(
        column: &'a dyn Array,
        row_idx: usize,
    ) -> Option<&'a str>
    where
        usize: TryFrom<K::Native>,
    {
        let dict = column.as_any().downcast_ref::<DictionaryArray<K>>()?;
        let values = dict.values().as_string::<i32>();
        let key = usize::try_from(dict.keys().value(row_idx)).ok()?;
        Some(values.value(key))
    }
}

impl<'a> RoutingExprContext for ArrowRowContext<'a> {
    fn hash_attribute<H: Hasher>(&self, attr_name: &[String], hasher: &mut H) {
        // Metrics/sketches have flat schemas — attr_name is always a single column name.
        let col_name = &attr_name[0];
        let col_idx = match self.batch.schema().index_of(col_name) {
            Ok(idx) => idx,
            Err(_) => {
                hasher.write_u8(0u8);
                return;
            }
        };
        let column = self.batch.column(col_idx);
        if column.is_null(self.row_idx) {
            hasher.write_u8(0u8);
            return;
        }
        // Extract the string value. Routing expressions reference string tag columns;
        // non-string columns are treated as absent.
        let string_value = match column.data_type() {
            DataType::Dictionary(key_type, value_type)
                if value_type.as_ref() == &DataType::Utf8 =>
            {
                match key_type.as_ref() {
                    DataType::Int8 => {
                        Self::dictionary_utf8_value::<Int8Type>(column.as_ref(), self.row_idx)
                    }
                    DataType::Int16 => {
                        Self::dictionary_utf8_value::<Int16Type>(column.as_ref(), self.row_idx)
                    }
                    DataType::Int32 => {
                        Self::dictionary_utf8_value::<Int32Type>(column.as_ref(), self.row_idx)
                    }
                    DataType::Int64 => {
                        Self::dictionary_utf8_value::<Int64Type>(column.as_ref(), self.row_idx)
                    }
                    DataType::UInt8 => {
                        Self::dictionary_utf8_value::<UInt8Type>(column.as_ref(), self.row_idx)
                    }
                    DataType::UInt16 => {
                        Self::dictionary_utf8_value::<UInt16Type>(column.as_ref(), self.row_idx)
                    }
                    DataType::UInt32 => {
                        Self::dictionary_utf8_value::<UInt32Type>(column.as_ref(), self.row_idx)
                    }
                    DataType::UInt64 => {
                        Self::dictionary_utf8_value::<UInt64Type>(column.as_ref(), self.row_idx)
                    }
                    _ => None,
                }
            }
            DataType::Utf8 => {
                let arr = column.as_string::<i32>();
                Some(arr.value(self.row_idx))
            }
            _ => None,
        };
        match string_value {
            Some(s) => {
                // Match JSON impl: 1u8 (present) + hash_json_val for String (3u8 + len + bytes).
                hasher.write_u8(1u8);
                hasher.write_u8(3u8);
                hasher.write_u64(s.len() as u64);
                hasher.write(s.as_bytes());
            }
            None => {
                hasher.write_u8(0u8);
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::StringDictionaryBuilder;
    use arrow::datatypes::{Field, Schema as ArrowSchema};

    use super::*;

    #[test]
    fn test_arrow_row_context_hash_matches_json() {
        let routing_expr = RoutingExpr::new("hash_mod((metric_name,host), 100)").unwrap();

        let json_ctx: serde_json::Map<String, JsonValue> = serde_json::from_str(
            r#"{"metric_name": "cpu.usage", "host": "server-01", "env": "prod"}"#,
        )
        .unwrap();
        let json_hash = routing_expr.eval_hash(&json_ctx);

        let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("metric_name", dict_type.clone(), false),
            Field::new("host", dict_type.clone(), true),
            Field::new("env", dict_type, true),
        ]));

        let mut metric_name_builder = StringDictionaryBuilder::<Int32Type>::new();
        metric_name_builder.append_value("cpu.usage");
        let mut host_builder = StringDictionaryBuilder::<Int32Type>::new();
        host_builder.append_value("server-01");
        let mut env_builder = StringDictionaryBuilder::<Int32Type>::new();
        env_builder.append_value("prod");

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(metric_name_builder.finish()),
                Arc::new(host_builder.finish()),
                Arc::new(env_builder.finish()),
            ],
        )
        .unwrap();

        let arrow_ctx = ArrowRowContext::new(&batch, 0);
        let arrow_hash = routing_expr.eval_hash(&arrow_ctx);

        assert_eq!(
            json_hash, arrow_hash,
            "Arrow and JSON contexts must produce identical partition hashes"
        );
    }

    #[test]
    fn test_arrow_row_context_hashes_non_int32_dictionary_keys() {
        let routing_expr = RoutingExpr::new("hash_mod((metric_name,host), 100)").unwrap();

        let json_ctx: serde_json::Map<String, JsonValue> =
            serde_json::from_str(r#"{"metric_name": "cpu.usage", "host": "server-01"}"#).unwrap();
        let json_hash = routing_expr.eval_hash(&json_ctx);

        let int32_dict_type =
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
        let uint8_dict_type =
            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8));
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("metric_name", int32_dict_type, false),
            Field::new("host", uint8_dict_type, true),
        ]));

        let mut metric_name_builder = StringDictionaryBuilder::<Int32Type>::new();
        metric_name_builder.append_value("cpu.usage");
        let mut host_builder = StringDictionaryBuilder::<UInt8Type>::new();
        host_builder.append_value("server-01");

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(metric_name_builder.finish()),
                Arc::new(host_builder.finish()),
            ],
        )
        .unwrap();

        let arrow_ctx = ArrowRowContext::new(&batch, 0);
        let arrow_hash = routing_expr.eval_hash(&arrow_ctx);

        assert_eq!(
            json_hash, arrow_hash,
            "Arrow dictionary keys other than Int32 should hash like JSON strings"
        );
    }
}
```

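The byte framing that keeps Arrow and JSON hashing consistent (absent/null writes 0u8; a present string writes 1u8, the String type tag 3u8, the length as u64, then the raw bytes) can be illustrated in isolation. A sketch using the standard library's DefaultHasher for brevity — the production code feeds a SipHasher through RoutingExprContext, and hash_routing_string here is a hypothetical helper, not a Quickwit API:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::Hasher;

// Hypothetical helper mirroring the framing used by both contexts:
// any source encoding (JSON or Arrow) that yields the same logical
// string feeds the same byte sequence into the hasher.
fn hash_routing_string<H: Hasher>(value: Option<&str>, hasher: &mut H) {
    match value {
        Some(s) => {
            hasher.write_u8(1u8); // attribute present
            hasher.write_u8(3u8); // String type tag
            hasher.write_u64(s.len() as u64);
            hasher.write(s.as_bytes());
        }
        None => hasher.write_u8(0u8), // attribute absent or null
    }
}

fn finish_hash(value: Option<&str>) -> u64 {
    let mut hasher = DefaultHasher::new();
    hash_routing_string(value, &mut hasher);
    hasher.finish()
}
```

Because the framing is identical on both paths, identical tag values always land in the same partition regardless of whether they arrived as JSON documents or Arrow IPC batches.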
quickwit/quickwit-doc-mapper/src/routing_expression/mod.rs

Lines changed: 11 additions & 0 deletions
```diff
@@ -22,6 +22,12 @@ pub(crate) use expression_dsl::parse_field_name;
 use serde_json::Value as JsonValue;
 use siphasher::sip::SipHasher;

+#[cfg(feature = "metrics")]
+mod metrics;
+
+#[cfg(feature = "metrics")]
+pub use metrics::ArrowRowContext;
+
 pub trait RoutingExprContext {
     fn hash_attribute<H: Hasher>(&self, attr_name: &[String], hasher: &mut H);
 }
@@ -134,6 +140,11 @@ impl RoutingExpr {
         })
     }

+    /// Returns `true` if no routing expression is configured.
+    pub fn is_empty(&self) -> bool {
+        self.inner_opt.is_none()
+    }
+
     /// Evaluates the expression applied to the given
     /// context and returns a u64 hash.
     ///
```

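For context on what an expression like `hash_mod((metric_name,host), 100)` computes: the referenced attributes are hashed, and the hash is reduced modulo the partition count. A hypothetical standalone sketch — DefaultHasher stands in for the SipHasher used by RoutingExpr::eval_hash, and partition_for is illustrative, not the real API:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Illustrative only: hash the attribute values in order, then reduce
// modulo the configured partition count to pick a partition_id.
fn partition_for(values: &[&str], num_partitions: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    for value in values {
        value.hash(&mut hasher);
    }
    hasher.finish() % num_partitions
}
```

The result is deterministic for a given set of attribute values, which is what lets the indexer route every row of a metric series to the same partition.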