Skip to content

Commit 5b1c080

Browse files
g-talbotclaude
andcommitted
feat(31): compaction metadata types — extend split metadata, postgres model, field lookup
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent a90ca2f commit 5b1c080

File tree

3 files changed

+240
-1
lines changed

3 files changed

+240
-1
lines changed

quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2312,6 +2312,12 @@ impl MetastoreService for PostgresqlMetastore {
23122312
size_bytes: row.13,
23132313
split_metadata_json: row.14,
23142314
update_timestamp: row.15,
2315+
window_start: None,
2316+
window_duration_secs: 0,
2317+
sort_fields: String::new(),
2318+
num_merge_ops: 0,
2319+
row_keys: None,
2320+
zonemap_regexes: String::new(),
23152321
};
23162322

23172323
let state = pg_split.split_state().unwrap_or(MetricsSplitState::Staged);

quickwit/quickwit-parquet-engine/src/split/metadata.rs

Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,38 @@ pub struct MetricsSplitMetadata {
153153

154154
/// When this split was created.
155155
pub created_at: SystemTime,
156+
157+
/// Parquet file path(s) relative to storage root.
158+
pub parquet_files: Vec<String>,
159+
160+
/// Window start as epoch seconds. None for pre-Phase-31 splits (backward compat).
161+
#[serde(default, skip_serializing_if = "Option::is_none")]
162+
pub window_start: Option<i64>,
163+
164+
/// Window duration in seconds. Paired with window_start.
165+
/// 0 for pre-Phase-31 splits.
166+
#[serde(default)]
167+
pub window_duration_secs: u32,
168+
169+
/// Sort schema as Husky-style string (e.g., "metric_name|host|timestamp/V2").
170+
/// Empty string for pre-Phase-31 splits.
171+
#[serde(default)]
172+
pub sort_fields: String,
173+
174+
/// Number of merge operations this split has been through.
175+
/// 0 for newly ingested splits.
176+
#[serde(default)]
177+
pub num_merge_ops: u32,
178+
179+
/// RowKeys (sort-key min/max boundaries) as proto bytes.
180+
/// None for pre-Phase-31 splits or splits without sort schema.
181+
#[serde(default, skip_serializing_if = "Option::is_none")]
182+
pub row_keys_proto: Option<Vec<u8>>,
183+
184+
/// Per-column zonemap regex strings, keyed by column name.
185+
/// Empty for pre-Phase-31 splits.
186+
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
187+
pub zonemap_regexes: HashMap<String, String>,
156188
}
157189

158190
impl MetricsSplitMetadata {
@@ -221,6 +253,13 @@ pub struct MetricsSplitMetadataBuilder {
221253
metric_names: HashSet<String>,
222254
low_cardinality_tags: HashMap<String, HashSet<String>>,
223255
high_cardinality_tag_keys: HashSet<String>,
256+
parquet_files: Vec<String>,
257+
window_start: Option<i64>,
258+
window_duration_secs: u32,
259+
sort_fields: String,
260+
num_merge_ops: u32,
261+
row_keys_proto: Option<Vec<u8>>,
262+
zonemap_regexes: HashMap<String, String>,
224263
}
225264

226265
impl MetricsSplitMetadataBuilder {
@@ -284,7 +323,65 @@ impl MetricsSplitMetadataBuilder {
284323
self
285324
}
286325

326+
pub fn add_parquet_file(mut self, path: impl Into<String>) -> Self {
327+
self.parquet_files.push(path.into());
328+
self
329+
}
330+
331+
pub fn window_start_secs(mut self, epoch_secs: i64) -> Self {
332+
self.window_start = Some(epoch_secs);
333+
self
334+
}
335+
336+
pub fn window_duration_secs(mut self, dur: u32) -> Self {
337+
self.window_duration_secs = dur;
338+
self
339+
}
340+
341+
pub fn sort_fields(mut self, schema: impl Into<String>) -> Self {
342+
self.sort_fields = schema.into();
343+
self
344+
}
345+
346+
pub fn num_merge_ops(mut self, ops: u32) -> Self {
347+
self.num_merge_ops = ops;
348+
self
349+
}
350+
351+
pub fn row_keys_proto(mut self, bytes: Vec<u8>) -> Self {
352+
self.row_keys_proto = Some(bytes);
353+
self
354+
}
355+
356+
pub fn add_zonemap_regex(
357+
mut self,
358+
column: impl Into<String>,
359+
regex: impl Into<String>,
360+
) -> Self {
361+
self.zonemap_regexes.insert(column.into(), regex.into());
362+
self
363+
}
364+
287365
pub fn build(self) -> MetricsSplitMetadata {
366+
// TW-2 (ADR-003): window_duration must evenly divide 3600.
367+
// Enforced at build time so no invalid metadata propagates to storage.
368+
debug_assert!(
369+
self.window_duration_secs == 0 || 3600 % self.window_duration_secs == 0,
370+
"TW-2 violated: window_duration_secs={} does not divide 3600",
371+
self.window_duration_secs
372+
);
373+
374+
// TW-1 (ADR-003, partial): window_start and window_duration_secs are paired.
375+
// If one is set, the other must be too. Pre-Phase-31 splits have both at defaults.
376+
debug_assert!(
377+
(self.window_start.is_none() && self.window_duration_secs == 0)
378+
|| (self.window_start.is_some() && self.window_duration_secs > 0),
379+
"TW-1 violated: window_start and window_duration_secs must be set together \
380+
(window_start={:?}, window_duration_secs={})",
381+
self.window_start,
382+
self.window_duration_secs
383+
);
384+
288385
MetricsSplitMetadata {
289386
split_id: self.split_id.unwrap_or_else(SplitId::generate),
290387
index_uid: self.index_uid.expect("index_uid is required"),
@@ -295,6 +392,13 @@ impl MetricsSplitMetadataBuilder {
295392
low_cardinality_tags: self.low_cardinality_tags,
296393
high_cardinality_tag_keys: self.high_cardinality_tag_keys,
297394
created_at: SystemTime::now(),
395+
parquet_files: self.parquet_files,
396+
window_start: self.window_start,
397+
window_duration_secs: self.window_duration_secs,
398+
sort_fields: self.sort_fields,
399+
num_merge_ops: self.num_merge_ops,
400+
row_keys_proto: self.row_keys_proto,
401+
zonemap_regexes: self.zonemap_regexes,
298402
}
299403
}
300404
}
@@ -401,4 +505,96 @@ mod tests {
401505
);
402506
assert_eq!(format!("{}", MetricsSplitState::Published), "Published");
403507
}
508+
509+
#[test]
510+
fn test_backward_compat_deserialize_pre_phase31_json() {
511+
// Simulate a JSON string from pre-Phase-31 code (no compaction fields).
512+
let pre_phase31_json = r#"{
513+
"split_id": "metrics_abc123",
514+
"index_uid": "test-index:00000000000000000000000000",
515+
"time_range": {"start_secs": 1000, "end_secs": 2000},
516+
"num_rows": 500,
517+
"size_bytes": 1024,
518+
"metric_names": ["cpu.usage"],
519+
"low_cardinality_tags": {},
520+
"high_cardinality_tag_keys": [],
521+
"created_at": {"secs_since_epoch": 1700000000, "nanos_since_epoch": 0},
522+
"parquet_files": ["split1.parquet"]
523+
}"#;
524+
525+
let metadata: MetricsSplitMetadata =
526+
serde_json::from_str(pre_phase31_json).expect("should deserialize pre-Phase-31 JSON");
527+
528+
// New fields should be at their defaults.
529+
assert!(metadata.window_start.is_none());
530+
assert_eq!(metadata.window_duration_secs, 0);
531+
assert_eq!(metadata.sort_fields, "");
532+
assert_eq!(metadata.num_merge_ops, 0);
533+
assert!(metadata.row_keys_proto.is_none());
534+
assert!(metadata.zonemap_regexes.is_empty());
535+
536+
// Existing fields should be intact.
537+
assert_eq!(metadata.split_id.as_str(), "metrics_abc123");
538+
assert_eq!(metadata.index_uid, "test-index:00000000000000000000000000");
539+
assert_eq!(metadata.num_rows, 500);
540+
}
541+
542+
#[test]
543+
fn test_round_trip_with_compaction_fields() {
544+
let metadata = MetricsSplitMetadata::builder()
545+
.split_id(SplitId::new("roundtrip-compaction"))
546+
.index_uid("test-index:00000000000000000000000000")
547+
.time_range(TimeRange::new(1000, 2000))
548+
.num_rows(100)
549+
.size_bytes(500)
550+
.window_start_secs(1700000000)
551+
.window_duration_secs(3600)
552+
.sort_fields("metric_name|host|timestamp/V2")
553+
.num_merge_ops(3)
554+
.row_keys_proto(vec![0x08, 0x01, 0x10, 0x02])
555+
.add_zonemap_regex("metric_name", "cpu\\..*")
556+
.add_zonemap_regex("host", "host-\\d+")
557+
.build();
558+
559+
let json = serde_json::to_string(&metadata).expect("should serialize");
560+
let recovered: MetricsSplitMetadata =
561+
serde_json::from_str(&json).expect("should deserialize");
562+
563+
assert_eq!(recovered.window_start, Some(1700000000));
564+
assert_eq!(recovered.window_duration_secs, 3600);
565+
assert_eq!(recovered.sort_fields, "metric_name|host|timestamp/V2");
566+
assert_eq!(recovered.num_merge_ops, 3);
567+
assert_eq!(recovered.row_keys_proto, Some(vec![0x08, 0x01, 0x10, 0x02]));
568+
assert_eq!(recovered.zonemap_regexes.len(), 2);
569+
assert_eq!(
570+
recovered.zonemap_regexes.get("metric_name").unwrap(),
571+
"cpu\\..*"
572+
);
573+
assert_eq!(recovered.zonemap_regexes.get("host").unwrap(), "host-\\d+");
574+
}
575+
576+
#[test]
577+
fn test_skip_serializing_empty_compaction_fields() {
578+
let metadata = MetricsSplitMetadata::builder()
579+
.split_id(SplitId::new("skip-test"))
580+
.index_uid("test-index:00000000000000000000000000")
581+
.time_range(TimeRange::new(1000, 2000))
582+
.build();
583+
584+
let json = serde_json::to_string(&metadata).expect("should serialize");
585+
586+
// Optional fields with skip_serializing_if should be absent.
587+
assert!(
588+
!json.contains("\"window_start\""),
589+
"window_start should not appear when None"
590+
);
591+
assert!(
592+
!json.contains("\"row_keys_proto\""),
593+
"row_keys_proto should not appear when None"
594+
);
595+
assert!(
596+
!json.contains("\"zonemap_regexes\""),
597+
"zonemap_regexes should not appear when empty"
598+
);
599+
}
404600
}

quickwit/quickwit-parquet-engine/src/split/postgres.rs

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,12 @@ pub struct PgMetricsSplit {
7171
pub size_bytes: i64,
7272
pub split_metadata_json: String,
7373
pub update_timestamp: i64,
74+
pub window_start: Option<i64>,
75+
pub window_duration_secs: i32,
76+
pub sort_fields: String,
77+
pub num_merge_ops: i32,
78+
pub row_keys: Option<Vec<u8>>,
79+
pub zonemap_regexes: String,
7480
}
7581

7682
/// Insertable row for metrics_splits table.
@@ -92,6 +98,12 @@ pub struct InsertableMetricsSplit {
9298
pub num_rows: i64,
9399
pub size_bytes: i64,
94100
pub split_metadata_json: String,
101+
pub window_start: Option<i64>,
102+
pub window_duration_secs: i32,
103+
pub sort_fields: String,
104+
pub num_merge_ops: i32,
105+
pub row_keys: Option<Vec<u8>>,
106+
pub zonemap_regexes: String,
95107
}
96108

97109
impl InsertableMetricsSplit {
@@ -102,6 +114,12 @@ impl InsertableMetricsSplit {
102114
) -> Result<Self, serde_json::Error> {
103115
let split_metadata_json = serde_json::to_string(metadata)?;
104116

117+
let zonemap_regexes_json = if metadata.zonemap_regexes.is_empty() {
118+
"{}".to_string()
119+
} else {
120+
serde_json::to_string(&metadata.zonemap_regexes)?
121+
};
122+
105123
Ok(Self {
106124
split_id: metadata.split_id.as_str().to_string(),
107125
split_state: state.as_str().to_string(),
@@ -118,6 +136,12 @@ impl InsertableMetricsSplit {
118136
num_rows: metadata.num_rows as i64,
119137
size_bytes: metadata.size_bytes as i64,
120138
split_metadata_json,
139+
window_start: metadata.window_start,
140+
window_duration_secs: metadata.window_duration_secs as i32,
141+
sort_fields: metadata.sort_fields.clone(),
142+
num_merge_ops: metadata.num_merge_ops as i32,
143+
row_keys: metadata.row_keys_proto.clone(),
144+
zonemap_regexes: zonemap_regexes_json,
121145
})
122146
}
123147
}
@@ -134,10 +158,17 @@ impl PgMetricsSplit {
134158
// Primary path: deserialize from JSON (authoritative)
135159
let metadata: MetricsSplitMetadata = serde_json::from_str(&self.split_metadata_json)?;
136160

137-
// Overlay database columns (for consistency verification in debug builds)
161+
// SS-5: Verify consistency between JSON blob and SQL columns.
138162
debug_assert_eq!(metadata.split_id.as_str(), self.split_id);
139163
debug_assert_eq!(metadata.time_range.start_secs, self.time_range_start as u64);
140164
debug_assert_eq!(metadata.time_range.end_secs, self.time_range_end as u64);
165+
debug_assert_eq!(metadata.window_start, self.window_start);
166+
debug_assert_eq!(
167+
metadata.window_duration_secs,
168+
self.window_duration_secs as u32
169+
);
170+
debug_assert_eq!(metadata.sort_fields, self.sort_fields);
171+
debug_assert_eq!(metadata.num_merge_ops, self.num_merge_ops as u32);
141172

142173
Ok(metadata)
143174
}
@@ -259,6 +290,12 @@ mod tests {
259290
size_bytes: insertable.size_bytes,
260291
split_metadata_json: insertable.split_metadata_json,
261292
update_timestamp: 1704067200,
293+
window_start: insertable.window_start,
294+
window_duration_secs: insertable.window_duration_secs,
295+
sort_fields: insertable.sort_fields,
296+
num_merge_ops: insertable.num_merge_ops,
297+
row_keys: insertable.row_keys,
298+
zonemap_regexes: insertable.zonemap_regexes,
262299
};
263300

264301
let recovered = pg_row.to_metadata().expect("should deserialize");

0 commit comments

Comments
 (0)