Skip to content

Commit 5442b5a

Browse files
g-talbotmattmkimclaude
authored
[phase-31 2/4] Compaction metadata types (#6258)
* feat: replace fixed MetricDataPoint fields with dynamic tag HashMap * feat: replace ParquetField enum with constants and dynamic validation * feat: derive sort order and bloom filters from batch schema * feat: union schema accumulation and schema-agnostic ingest validation * feat: dynamic column lookup in split writer * feat: remove ParquetSchema dependency from indexing actors * refactor: deduplicate test batch helpers * lint * feat(31): sort schema foundation — proto, parser, display, validation, window, TableConfig Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: rustdoc link errors — use backticks for private items * feat(31): compaction metadata types — extend split metadata, postgres model, field lookup Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * Update quickwit/quickwit-parquet-engine/src/table_config.rs Co-authored-by: Matthew Kim <matthew.kim@datadoghq.com> * Update quickwit/quickwit-parquet-engine/src/table_config.rs Co-authored-by: Matthew Kim <matthew.kim@datadoghq.com> * style: rustfmt long match arm in default_sort_fields Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: make parquet_file field backward-compatible in MetricsSplitMetadata Pre-existing splits were serialized before the parquet_file field was added, so their JSON doesn't contain it. Adding #[serde(default)] makes deserialization fall back to empty string for old splits. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Matthew Kim <matthew.kim@datadoghq.com> Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent ba9c817 commit 5442b5a

3 files changed

Lines changed: 351 additions & 1 deletion

File tree

quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2312,6 +2312,12 @@ impl MetastoreService for PostgresqlMetastore {
23122312
size_bytes: row.13,
23132313
split_metadata_json: row.14,
23142314
update_timestamp: row.15,
2315+
window_start: None,
2316+
window_duration_secs: 0,
2317+
sort_fields: String::new(),
2318+
num_merge_ops: 0,
2319+
row_keys: None,
2320+
zonemap_regexes: String::new(),
23152321
};
23162322

23172323
let state = pg_split.split_state().unwrap_or(MetricsSplitState::Staged);

quickwit/quickwit-parquet-engine/src/split/metadata.rs

Lines changed: 307 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
//! Metrics split metadata definitions.
1616
1717
use std::collections::{HashMap, HashSet};
18+
use std::ops::Range;
1819
use std::time::SystemTime;
1920

2021
use serde::{Deserialize, Serialize};
@@ -120,7 +121,12 @@ impl std::fmt::Display for MetricsSplitState {
120121
}
121122

122123
/// Metadata for a metrics split.
124+
///
125+
/// The `window` field stores the time window as `[start, start + duration)`.
126+
/// For JSON serialization, it is decomposed into `window_start` and
127+
/// `window_duration_secs` for backward compatibility with pre-Phase-31 code.
123128
#[derive(Debug, Clone, Serialize, Deserialize)]
129+
#[serde(from = "MetricsSplitMetadataSerde", into = "MetricsSplitMetadataSerde")]
124130
pub struct MetricsSplitMetadata {
125131
/// Unique split identifier.
126132
pub split_id: SplitId,
@@ -153,6 +159,119 @@ pub struct MetricsSplitMetadata {
153159

154160
/// When this split was created.
155161
pub created_at: SystemTime,
162+
163+
/// Parquet file path relative to storage root.
164+
pub parquet_file: String,
165+
166+
/// Time window as `[start, start + duration)` in epoch seconds.
167+
/// None for pre-Phase-31 splits (backward compat).
168+
pub window: Option<Range<i64>>,
169+
170+
/// Sort schema as Husky-style string (e.g., "metric_name|host|timestamp/V2").
171+
/// Empty string for pre-Phase-31 splits.
172+
pub sort_fields: String,
173+
174+
/// Number of merge operations this split has been through.
175+
/// 0 for newly ingested splits.
176+
pub num_merge_ops: u32,
177+
178+
/// RowKeys (sort-key min/max boundaries) as proto bytes.
179+
/// None for pre-Phase-31 splits or splits without sort schema.
180+
pub row_keys_proto: Option<Vec<u8>>,
181+
182+
/// Per-column zonemap regex strings, keyed by column name.
183+
/// Empty for pre-Phase-31 splits.
184+
pub zonemap_regexes: HashMap<String, String>,
185+
}
186+
187+
/// Serde helper struct that uses `window_start` / `window_duration_secs` field
188+
/// names for JSON backward compatibility while the in-memory representation uses
189+
/// `Option<Range<i64>>`.
190+
#[derive(Serialize, Deserialize)]
191+
struct MetricsSplitMetadataSerde {
192+
split_id: SplitId,
193+
index_uid: String,
194+
time_range: TimeRange,
195+
num_rows: u64,
196+
size_bytes: u64,
197+
metric_names: HashSet<String>,
198+
low_cardinality_tags: HashMap<String, HashSet<String>>,
199+
high_cardinality_tag_keys: HashSet<String>,
200+
created_at: SystemTime,
201+
202+
#[serde(default)]
203+
parquet_file: String,
204+
205+
#[serde(default, skip_serializing_if = "Option::is_none")]
206+
window_start: Option<i64>,
207+
208+
#[serde(default)]
209+
window_duration_secs: u32,
210+
211+
#[serde(default)]
212+
sort_fields: String,
213+
214+
#[serde(default)]
215+
num_merge_ops: u32,
216+
217+
#[serde(default, skip_serializing_if = "Option::is_none")]
218+
row_keys_proto: Option<Vec<u8>>,
219+
220+
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
221+
zonemap_regexes: HashMap<String, String>,
222+
}
223+
224+
impl From<MetricsSplitMetadataSerde> for MetricsSplitMetadata {
225+
fn from(s: MetricsSplitMetadataSerde) -> Self {
226+
let window = match (s.window_start, s.window_duration_secs) {
227+
(Some(start), dur) if dur > 0 => Some(start..start + dur as i64),
228+
_ => None,
229+
};
230+
Self {
231+
split_id: s.split_id,
232+
index_uid: s.index_uid,
233+
time_range: s.time_range,
234+
num_rows: s.num_rows,
235+
size_bytes: s.size_bytes,
236+
metric_names: s.metric_names,
237+
low_cardinality_tags: s.low_cardinality_tags,
238+
high_cardinality_tag_keys: s.high_cardinality_tag_keys,
239+
created_at: s.created_at,
240+
parquet_file: s.parquet_file,
241+
window,
242+
sort_fields: s.sort_fields,
243+
num_merge_ops: s.num_merge_ops,
244+
row_keys_proto: s.row_keys_proto,
245+
zonemap_regexes: s.zonemap_regexes,
246+
}
247+
}
248+
}
249+
250+
impl From<MetricsSplitMetadata> for MetricsSplitMetadataSerde {
251+
fn from(m: MetricsSplitMetadata) -> Self {
252+
let (window_start, window_duration_secs) = match &m.window {
253+
Some(w) => (Some(w.start), (w.end - w.start) as u32),
254+
None => (None, 0),
255+
};
256+
Self {
257+
split_id: m.split_id,
258+
index_uid: m.index_uid,
259+
time_range: m.time_range,
260+
num_rows: m.num_rows,
261+
size_bytes: m.size_bytes,
262+
metric_names: m.metric_names,
263+
low_cardinality_tags: m.low_cardinality_tags,
264+
high_cardinality_tag_keys: m.high_cardinality_tag_keys,
265+
created_at: m.created_at,
266+
parquet_file: m.parquet_file,
267+
window_start,
268+
window_duration_secs,
269+
sort_fields: m.sort_fields,
270+
num_merge_ops: m.num_merge_ops,
271+
row_keys_proto: m.row_keys_proto,
272+
zonemap_regexes: m.zonemap_regexes,
273+
}
274+
}
156275
}
157276

158277
impl MetricsSplitMetadata {
@@ -167,6 +286,19 @@ impl MetricsSplitMetadata {
167286
/// Tags with >= CARDINALITY_THRESHOLD unique values use Parquet bloom filters.
168287
pub const CARDINALITY_THRESHOLD: usize = 1000;
169288

289+
/// Returns the window start in epoch seconds, or `None` for pre-Phase-31 splits.
290+
pub fn window_start(&self) -> Option<i64> {
291+
self.window.as_ref().map(|w| w.start)
292+
}
293+
294+
/// Returns the window duration in seconds, or 0 for pre-Phase-31 splits.
295+
pub fn window_duration_secs(&self) -> u32 {
296+
match &self.window {
297+
Some(w) => (w.end - w.start) as u32,
298+
None => 0,
299+
}
300+
}
301+
170302
/// Create a new MetricsSplitMetadata builder.
171303
pub fn builder() -> MetricsSplitMetadataBuilder {
172304
MetricsSplitMetadataBuilder::default()
@@ -221,8 +353,19 @@ pub struct MetricsSplitMetadataBuilder {
221353
metric_names: HashSet<String>,
222354
low_cardinality_tags: HashMap<String, HashSet<String>>,
223355
high_cardinality_tag_keys: HashSet<String>,
356+
parquet_file: String,
357+
window_start: Option<i64>,
358+
window_duration_secs: u32,
359+
sort_fields: String,
360+
num_merge_ops: u32,
361+
row_keys_proto: Option<Vec<u8>>,
362+
zonemap_regexes: HashMap<String, String>,
224363
}
225364

365+
// The builder still accepts window_start and window_duration_secs separately
366+
// to remain compatible with callers that compute them independently (e.g.,
367+
// split_writer). The `build()` method fuses them into `Option<Range<i64>>`.
368+
226369
impl MetricsSplitMetadataBuilder {
227370
pub fn split_id(mut self, id: SplitId) -> Self {
228371
self.split_id = Some(id);
@@ -284,7 +427,71 @@ impl MetricsSplitMetadataBuilder {
284427
self
285428
}
286429

430+
pub fn parquet_file(mut self, path: impl Into<String>) -> Self {
431+
self.parquet_file = path.into();
432+
self
433+
}
434+
435+
pub fn window_start_secs(mut self, epoch_secs: i64) -> Self {
436+
self.window_start = Some(epoch_secs);
437+
self
438+
}
439+
440+
pub fn window_duration_secs(mut self, dur: u32) -> Self {
441+
self.window_duration_secs = dur;
442+
self
443+
}
444+
445+
pub fn sort_fields(mut self, schema: impl Into<String>) -> Self {
446+
self.sort_fields = schema.into();
447+
self
448+
}
449+
450+
pub fn num_merge_ops(mut self, ops: u32) -> Self {
451+
self.num_merge_ops = ops;
452+
self
453+
}
454+
455+
pub fn row_keys_proto(mut self, bytes: Vec<u8>) -> Self {
456+
self.row_keys_proto = Some(bytes);
457+
self
458+
}
459+
460+
pub fn add_zonemap_regex(
461+
mut self,
462+
column: impl Into<String>,
463+
regex: impl Into<String>,
464+
) -> Self {
465+
self.zonemap_regexes.insert(column.into(), regex.into());
466+
self
467+
}
468+
287469
pub fn build(self) -> MetricsSplitMetadata {
470+
// TW-2 (ADR-003): window_duration must evenly divide 3600.
471+
// Enforced at build time so no invalid metadata propagates to storage.
472+
debug_assert!(
473+
self.window_duration_secs == 0 || 3600 % self.window_duration_secs == 0,
474+
"TW-2 violated: window_duration_secs={} does not divide 3600",
475+
self.window_duration_secs
476+
);
477+
478+
// TW-1 (ADR-003, partial): window_start and window_duration_secs are paired.
479+
// If one is set, the other must be too. Pre-Phase-31 splits have both at defaults.
480+
debug_assert!(
481+
(self.window_start.is_none() && self.window_duration_secs == 0)
482+
|| (self.window_start.is_some() && self.window_duration_secs > 0),
483+
"TW-1 violated: window_start and window_duration_secs must be set together \
484+
(window_start={:?}, window_duration_secs={})",
485+
self.window_start,
486+
self.window_duration_secs
487+
);
488+
489+
// Fuse the two builder fields into a single Range.
490+
let window = match (self.window_start, self.window_duration_secs) {
491+
(Some(start), dur) if dur > 0 => Some(start..start + dur as i64),
492+
_ => None,
493+
};
494+
288495
MetricsSplitMetadata {
289496
split_id: self.split_id.unwrap_or_else(SplitId::generate),
290497
index_uid: self.index_uid.expect("index_uid is required"),
@@ -295,6 +502,12 @@ impl MetricsSplitMetadataBuilder {
295502
low_cardinality_tags: self.low_cardinality_tags,
296503
high_cardinality_tag_keys: self.high_cardinality_tag_keys,
297504
created_at: SystemTime::now(),
505+
parquet_file: self.parquet_file,
506+
window,
507+
sort_fields: self.sort_fields,
508+
num_merge_ops: self.num_merge_ops,
509+
row_keys_proto: self.row_keys_proto,
510+
zonemap_regexes: self.zonemap_regexes,
298511
}
299512
}
300513
}
@@ -401,4 +614,98 @@ mod tests {
401614
);
402615
assert_eq!(format!("{}", MetricsSplitState::Published), "Published");
403616
}
617+
618+
#[test]
619+
fn test_backward_compat_deserialize_pre_phase31_json() {
620+
// Simulate a JSON string from pre-Phase-31 code (no compaction fields).
621+
let pre_phase31_json = r#"{
622+
"split_id": "metrics_abc123",
623+
"index_uid": "test-index:00000000000000000000000000",
624+
"time_range": {"start_secs": 1000, "end_secs": 2000},
625+
"num_rows": 500,
626+
"size_bytes": 1024,
627+
"metric_names": ["cpu.usage"],
628+
"low_cardinality_tags": {},
629+
"high_cardinality_tag_keys": [],
630+
"created_at": {"secs_since_epoch": 1700000000, "nanos_since_epoch": 0},
631+
"parquet_file": "split1.parquet"
632+
}"#;
633+
634+
let metadata: MetricsSplitMetadata =
635+
serde_json::from_str(pre_phase31_json).expect("should deserialize pre-Phase-31 JSON");
636+
637+
// New fields should be at their defaults.
638+
assert!(metadata.window.is_none());
639+
assert!(metadata.window_start().is_none());
640+
assert_eq!(metadata.window_duration_secs(), 0);
641+
assert_eq!(metadata.sort_fields, "");
642+
assert_eq!(metadata.num_merge_ops, 0);
643+
assert!(metadata.row_keys_proto.is_none());
644+
assert!(metadata.zonemap_regexes.is_empty());
645+
646+
// Existing fields should be intact.
647+
assert_eq!(metadata.split_id.as_str(), "metrics_abc123");
648+
assert_eq!(metadata.index_uid, "test-index:00000000000000000000000000");
649+
assert_eq!(metadata.num_rows, 500);
650+
}
651+
652+
#[test]
653+
fn test_round_trip_with_compaction_fields() {
654+
let metadata = MetricsSplitMetadata::builder()
655+
.split_id(SplitId::new("roundtrip-compaction"))
656+
.index_uid("test-index:00000000000000000000000000")
657+
.time_range(TimeRange::new(1000, 2000))
658+
.num_rows(100)
659+
.size_bytes(500)
660+
.window_start_secs(1700000000)
661+
.window_duration_secs(3600)
662+
.sort_fields("metric_name|host|timestamp/V2")
663+
.num_merge_ops(3)
664+
.row_keys_proto(vec![0x08, 0x01, 0x10, 0x02])
665+
.add_zonemap_regex("metric_name", "cpu\\..*")
666+
.add_zonemap_regex("host", "host-\\d+")
667+
.build();
668+
669+
let json = serde_json::to_string(&metadata).expect("should serialize");
670+
let recovered: MetricsSplitMetadata =
671+
serde_json::from_str(&json).expect("should deserialize");
672+
673+
assert_eq!(recovered.window, Some(1700000000..1700003600));
674+
assert_eq!(recovered.window_start(), Some(1700000000));
675+
assert_eq!(recovered.window_duration_secs(), 3600);
676+
assert_eq!(recovered.sort_fields, "metric_name|host|timestamp/V2");
677+
assert_eq!(recovered.num_merge_ops, 3);
678+
assert_eq!(recovered.row_keys_proto, Some(vec![0x08, 0x01, 0x10, 0x02]));
679+
assert_eq!(recovered.zonemap_regexes.len(), 2);
680+
assert_eq!(
681+
recovered.zonemap_regexes.get("metric_name").unwrap(),
682+
"cpu\\..*"
683+
);
684+
assert_eq!(recovered.zonemap_regexes.get("host").unwrap(), "host-\\d+");
685+
}
686+
687+
#[test]
688+
fn test_skip_serializing_empty_compaction_fields() {
689+
let metadata = MetricsSplitMetadata::builder()
690+
.split_id(SplitId::new("skip-test"))
691+
.index_uid("test-index:00000000000000000000000000")
692+
.time_range(TimeRange::new(1000, 2000))
693+
.build();
694+
695+
let json = serde_json::to_string(&metadata).expect("should serialize");
696+
697+
// Optional fields with skip_serializing_if should be absent.
698+
assert!(
699+
!json.contains("\"window_start\""),
700+
"window_start should not appear when None"
701+
);
702+
assert!(
703+
!json.contains("\"row_keys_proto\""),
704+
"row_keys_proto should not appear when None"
705+
);
706+
assert!(
707+
!json.contains("\"zonemap_regexes\""),
708+
"zonemap_regexes should not appear when empty"
709+
);
710+
}
404711
}

0 commit comments

Comments
 (0)