Skip to content

Commit b9566a6

Browse files
g-talbot and claude committed
feat(31): compaction metadata types — extend split metadata, postgres model, field lookup
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 4d42fd9 commit b9566a6

File tree

3 files changed

+349
-1
lines changed

3 files changed

+349
-1
lines changed

quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2312,6 +2312,12 @@ impl MetastoreService for PostgresqlMetastore {
23122312
size_bytes: row.13,
23132313
split_metadata_json: row.14,
23142314
update_timestamp: row.15,
2315+
window_start: None,
2316+
window_duration_secs: 0,
2317+
sort_fields: String::new(),
2318+
num_merge_ops: 0,
2319+
row_keys: None,
2320+
zonemap_regexes: String::new(),
23152321
};
23162322

23172323
let state = pg_split.split_state().unwrap_or(MetricsSplitState::Staged);

quickwit/quickwit-parquet-engine/src/split/metadata.rs

Lines changed: 305 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
//! Metrics split metadata definitions.
1616
1717
use std::collections::{HashMap, HashSet};
18+
use std::ops::Range;
1819
use std::time::SystemTime;
1920

2021
use serde::{Deserialize, Serialize};
@@ -120,7 +121,12 @@ impl std::fmt::Display for MetricsSplitState {
120121
}
121122

122123
/// Metadata for a metrics split.
124+
///
125+
/// The `window` field stores the time window as `[start, start + duration)`.
126+
/// For JSON serialization, it is decomposed into `window_start` and
127+
/// `window_duration_secs` for backward compatibility with pre-Phase-31 code.
123128
#[derive(Debug, Clone, Serialize, Deserialize)]
129+
#[serde(from = "MetricsSplitMetadataSerde", into = "MetricsSplitMetadataSerde")]
124130
pub struct MetricsSplitMetadata {
125131
/// Unique split identifier.
126132
pub split_id: SplitId,
@@ -153,6 +159,117 @@ pub struct MetricsSplitMetadata {
153159

154160
/// When this split was created.
155161
pub created_at: SystemTime,
162+
163+
/// Parquet file path(s) relative to storage root.
164+
pub parquet_files: Vec<String>,
165+
166+
/// Time window as `[start, start + duration)` in epoch seconds.
167+
/// None for pre-Phase-31 splits (backward compat).
168+
pub window: Option<Range<i64>>,
169+
170+
/// Sort schema as Husky-style string (e.g., "metric_name|host|timestamp/V2").
171+
/// Empty string for pre-Phase-31 splits.
172+
pub sort_fields: String,
173+
174+
/// Number of merge operations this split has been through.
175+
/// 0 for newly ingested splits.
176+
pub num_merge_ops: u32,
177+
178+
/// RowKeys (sort-key min/max boundaries) as proto bytes.
179+
/// None for pre-Phase-31 splits or splits without sort schema.
180+
pub row_keys_proto: Option<Vec<u8>>,
181+
182+
/// Per-column zonemap regex strings, keyed by column name.
183+
/// Empty for pre-Phase-31 splits.
184+
pub zonemap_regexes: HashMap<String, String>,
185+
}
186+
187+
/// Serde helper struct that uses `window_start` / `window_duration_secs` field
188+
/// names for JSON backward compatibility while the in-memory representation uses
189+
/// `Option<Range<i64>>`.
190+
#[derive(Serialize, Deserialize)]
191+
struct MetricsSplitMetadataSerde {
192+
split_id: SplitId,
193+
index_uid: String,
194+
time_range: TimeRange,
195+
num_rows: u64,
196+
size_bytes: u64,
197+
metric_names: HashSet<String>,
198+
low_cardinality_tags: HashMap<String, HashSet<String>>,
199+
high_cardinality_tag_keys: HashSet<String>,
200+
created_at: SystemTime,
201+
parquet_files: Vec<String>,
202+
203+
#[serde(default, skip_serializing_if = "Option::is_none")]
204+
window_start: Option<i64>,
205+
206+
#[serde(default)]
207+
window_duration_secs: u32,
208+
209+
#[serde(default)]
210+
sort_fields: String,
211+
212+
#[serde(default)]
213+
num_merge_ops: u32,
214+
215+
#[serde(default, skip_serializing_if = "Option::is_none")]
216+
row_keys_proto: Option<Vec<u8>>,
217+
218+
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
219+
zonemap_regexes: HashMap<String, String>,
220+
}
221+
222+
impl From<MetricsSplitMetadataSerde> for MetricsSplitMetadata {
    /// Rebuilds the in-memory metadata from its JSON wire form.
    ///
    /// `window_start` and `window_duration_secs` are fused into
    /// `Some(start..start + duration)`. The window is `None` when the start is
    /// absent, when the duration is 0, or when `start + duration` does not fit
    /// in an `i64`. The last case can only come from corrupt input; it must not
    /// panic (debug builds) or wrap into an inverted range (release builds)
    /// while deserializing.
    fn from(s: MetricsSplitMetadataSerde) -> Self {
        let window = match (s.window_start, s.window_duration_secs) {
            (Some(start), dur) if dur > 0 => start
                .checked_add(i64::from(dur))
                .map(|end| start..end),
            _ => None,
        };
        Self {
            split_id: s.split_id,
            index_uid: s.index_uid,
            time_range: s.time_range,
            num_rows: s.num_rows,
            size_bytes: s.size_bytes,
            metric_names: s.metric_names,
            low_cardinality_tags: s.low_cardinality_tags,
            high_cardinality_tag_keys: s.high_cardinality_tag_keys,
            created_at: s.created_at,
            parquet_files: s.parquet_files,
            window,
            sort_fields: s.sort_fields,
            num_merge_ops: s.num_merge_ops,
            row_keys_proto: s.row_keys_proto,
            zonemap_regexes: s.zonemap_regexes,
        }
    }
}
247+
248+
impl From<MetricsSplitMetadata> for MetricsSplitMetadataSerde {
    /// Flattens the in-memory metadata into its JSON wire form.
    ///
    /// The optional `window` range is decomposed into `window_start` and
    /// `window_duration_secs`. `window` is a public field, so the range may be
    /// arbitrary: the length is computed with a saturating subtraction and then
    /// clamped into `u32` (0 for an empty or inverted range, `u32::MAX` for an
    /// oversized one) instead of being truncated by an `as` cast.
    fn from(m: MetricsSplitMetadata) -> Self {
        let (window_start, window_duration_secs) = match &m.window {
            Some(w) => {
                let span = w.end.saturating_sub(w.start).max(0);
                (Some(w.start), u32::try_from(span).unwrap_or(u32::MAX))
            }
            None => (None, 0),
        };
        Self {
            split_id: m.split_id,
            index_uid: m.index_uid,
            time_range: m.time_range,
            num_rows: m.num_rows,
            size_bytes: m.size_bytes,
            metric_names: m.metric_names,
            low_cardinality_tags: m.low_cardinality_tags,
            high_cardinality_tag_keys: m.high_cardinality_tag_keys,
            created_at: m.created_at,
            parquet_files: m.parquet_files,
            window_start,
            window_duration_secs,
            sort_fields: m.sort_fields,
            num_merge_ops: m.num_merge_ops,
            row_keys_proto: m.row_keys_proto,
            zonemap_regexes: m.zonemap_regexes,
        }
    }
}
157274

158275
impl MetricsSplitMetadata {
@@ -167,6 +284,19 @@ impl MetricsSplitMetadata {
167284
/// Tags with >= CARDINALITY_THRESHOLD unique values use Parquet bloom filters.
168285
pub const CARDINALITY_THRESHOLD: usize = 1000;
169286

287+
/// Returns the window start in epoch seconds, or `None` for pre-Phase-31 splits.
288+
pub fn window_start(&self) -> Option<i64> {
289+
self.window.as_ref().map(|w| w.start)
290+
}
291+
292+
/// Returns the window duration in seconds, or 0 for pre-Phase-31 splits.
293+
pub fn window_duration_secs(&self) -> u32 {
294+
match &self.window {
295+
Some(w) => (w.end - w.start) as u32,
296+
None => 0,
297+
}
298+
}
299+
170300
/// Create a new MetricsSplitMetadata builder.
171301
pub fn builder() -> MetricsSplitMetadataBuilder {
172302
MetricsSplitMetadataBuilder::default()
@@ -221,8 +351,19 @@ pub struct MetricsSplitMetadataBuilder {
221351
metric_names: HashSet<String>,
222352
low_cardinality_tags: HashMap<String, HashSet<String>>,
223353
high_cardinality_tag_keys: HashSet<String>,
354+
parquet_files: Vec<String>,
355+
window_start: Option<i64>,
356+
window_duration_secs: u32,
357+
sort_fields: String,
358+
num_merge_ops: u32,
359+
row_keys_proto: Option<Vec<u8>>,
360+
zonemap_regexes: HashMap<String, String>,
224361
}
225362

363+
// The builder still accepts window_start and window_duration_secs separately
364+
// to remain compatible with callers that compute them independently (e.g.,
365+
// split_writer). The `build()` method fuses them into `Option<Range<i64>>`.
366+
226367
impl MetricsSplitMetadataBuilder {
227368
pub fn split_id(mut self, id: SplitId) -> Self {
228369
self.split_id = Some(id);
@@ -284,7 +425,71 @@ impl MetricsSplitMetadataBuilder {
284425
self
285426
}
286427

428+
/// Appends one Parquet file path to the list collected by the builder.
pub fn add_parquet_file(mut self, path: impl Into<String>) -> Self {
    let file: String = path.into();
    self.parquet_files.push(file);
    self
}
432+
433+
pub fn window_start_secs(mut self, epoch_secs: i64) -> Self {
434+
self.window_start = Some(epoch_secs);
435+
self
436+
}
437+
438+
pub fn window_duration_secs(mut self, dur: u32) -> Self {
439+
self.window_duration_secs = dur;
440+
self
441+
}
442+
443+
pub fn sort_fields(mut self, schema: impl Into<String>) -> Self {
444+
self.sort_fields = schema.into();
445+
self
446+
}
447+
448+
pub fn num_merge_ops(mut self, ops: u32) -> Self {
449+
self.num_merge_ops = ops;
450+
self
451+
}
452+
453+
pub fn row_keys_proto(mut self, bytes: Vec<u8>) -> Self {
454+
self.row_keys_proto = Some(bytes);
455+
self
456+
}
457+
458+
pub fn add_zonemap_regex(
459+
mut self,
460+
column: impl Into<String>,
461+
regex: impl Into<String>,
462+
) -> Self {
463+
self.zonemap_regexes.insert(column.into(), regex.into());
464+
self
465+
}
466+
287467
pub fn build(self) -> MetricsSplitMetadata {
468+
// TW-2 (ADR-003): window_duration must evenly divide 3600.
469+
// Enforced at build time so no invalid metadata propagates to storage.
470+
debug_assert!(
471+
self.window_duration_secs == 0 || 3600 % self.window_duration_secs == 0,
472+
"TW-2 violated: window_duration_secs={} does not divide 3600",
473+
self.window_duration_secs
474+
);
475+
476+
// TW-1 (ADR-003, partial): window_start and window_duration_secs are paired.
477+
// If one is set, the other must be too. Pre-Phase-31 splits have both at defaults.
478+
debug_assert!(
479+
(self.window_start.is_none() && self.window_duration_secs == 0)
480+
|| (self.window_start.is_some() && self.window_duration_secs > 0),
481+
"TW-1 violated: window_start and window_duration_secs must be set together \
482+
(window_start={:?}, window_duration_secs={})",
483+
self.window_start,
484+
self.window_duration_secs
485+
);
486+
487+
// Fuse the two builder fields into a single Range.
488+
let window = match (self.window_start, self.window_duration_secs) {
489+
(Some(start), dur) if dur > 0 => Some(start..start + dur as i64),
490+
_ => None,
491+
};
492+
288493
MetricsSplitMetadata {
289494
split_id: self.split_id.unwrap_or_else(SplitId::generate),
290495
index_uid: self.index_uid.expect("index_uid is required"),
@@ -295,6 +500,12 @@ impl MetricsSplitMetadataBuilder {
295500
low_cardinality_tags: self.low_cardinality_tags,
296501
high_cardinality_tag_keys: self.high_cardinality_tag_keys,
297502
created_at: SystemTime::now(),
503+
parquet_files: self.parquet_files,
504+
window,
505+
sort_fields: self.sort_fields,
506+
num_merge_ops: self.num_merge_ops,
507+
row_keys_proto: self.row_keys_proto,
508+
zonemap_regexes: self.zonemap_regexes,
298509
}
299510
}
300511
}
@@ -401,4 +612,98 @@ mod tests {
401612
);
402613
assert_eq!(format!("{}", MetricsSplitState::Published), "Published");
403614
}
615+
616+
#[test]
fn test_backward_compat_deserialize_pre_phase31_json() {
    // JSON document without any of the compaction fields (window, sort
    // schema, merge counter, row keys, zonemap regexes).
    let pre_phase31_json = r#"{
        "split_id": "metrics_abc123",
        "index_uid": "test-index:00000000000000000000000000",
        "time_range": {"start_secs": 1000, "end_secs": 2000},
        "num_rows": 500,
        "size_bytes": 1024,
        "metric_names": ["cpu.usage"],
        "low_cardinality_tags": {},
        "high_cardinality_tag_keys": [],
        "created_at": {"secs_since_epoch": 1700000000, "nanos_since_epoch": 0},
        "parquet_files": ["split1.parquet"]
    }"#;

    let parsed = serde_json::from_str::<MetricsSplitMetadata>(pre_phase31_json)
        .expect("should deserialize pre-Phase-31 JSON");

    // Fields that were already present must survive unchanged.
    assert_eq!(parsed.split_id.as_str(), "metrics_abc123");
    assert_eq!(parsed.index_uid, "test-index:00000000000000000000000000");
    assert_eq!(parsed.num_rows, 500);

    // Every compaction field falls back to its default.
    assert_eq!(parsed.window, None);
    assert_eq!(parsed.window_start(), None);
    assert_eq!(parsed.window_duration_secs(), 0);
    assert!(parsed.sort_fields.is_empty());
    assert_eq!(parsed.num_merge_ops, 0);
    assert_eq!(parsed.row_keys_proto, None);
    assert_eq!(parsed.zonemap_regexes.len(), 0);
}
649+
650+
#[test]
fn test_round_trip_with_compaction_fields() {
    // Build metadata with every compaction field populated.
    let builder = MetricsSplitMetadata::builder()
        .split_id(SplitId::new("roundtrip-compaction"))
        .index_uid("test-index:00000000000000000000000000")
        .time_range(TimeRange::new(1000, 2000))
        .num_rows(100)
        .size_bytes(500)
        .window_start_secs(1700000000)
        .window_duration_secs(3600)
        .sort_fields("metric_name|host|timestamp/V2")
        .num_merge_ops(3)
        .row_keys_proto(vec![0x08, 0x01, 0x10, 0x02])
        .add_zonemap_regex("metric_name", "cpu\\..*")
        .add_zonemap_regex("host", "host-\\d+");
    let original = builder.build();

    // Serialize to JSON and read it back.
    let encoded = serde_json::to_string(&original).expect("should serialize");
    let decoded: MetricsSplitMetadata =
        serde_json::from_str(&encoded).expect("should deserialize");

    // The window must come back as the fused range and through both accessors.
    assert_eq!(decoded.window, Some(1700000000..1700003600));
    assert_eq!(decoded.window_start(), Some(1700000000));
    assert_eq!(decoded.window_duration_secs(), 3600);

    assert_eq!(decoded.sort_fields, "metric_name|host|timestamp/V2");
    assert_eq!(decoded.num_merge_ops, 3);
    assert_eq!(decoded.row_keys_proto, Some(vec![0x08, 0x01, 0x10, 0x02]));

    let regexes = &decoded.zonemap_regexes;
    assert_eq!(regexes.len(), 2);
    assert_eq!(
        regexes.get("metric_name").map(String::as_str),
        Some("cpu\\..*")
    );
    assert_eq!(regexes.get("host").map(String::as_str), Some("host-\\d+"));
}
684+
685+
#[test]
fn test_skip_serializing_empty_compaction_fields() {
    // Metadata built without any compaction field set.
    let bare = MetricsSplitMetadata::builder()
        .split_id(SplitId::new("skip-test"))
        .index_uid("test-index:00000000000000000000000000")
        .time_range(TimeRange::new(1000, 2000))
        .build();

    let json = serde_json::to_string(&bare).expect("should serialize");

    // Keys guarded by `skip_serializing_if` must be absent from the output.
    let absent_keys = [
        (
            "\"window_start\"",
            "window_start should not appear when None",
        ),
        (
            "\"row_keys_proto\"",
            "row_keys_proto should not appear when None",
        ),
        (
            "\"zonemap_regexes\"",
            "zonemap_regexes should not appear when empty",
        ),
    ];
    for (key, reason) in absent_keys {
        assert!(!json.contains(key), "{}", reason);
    }
}
404709
}

0 commit comments

Comments
 (0)