Commit a6b5b73

g-talbot and claude authored
feat: extract and populate RowKeys from sorted batches (#6292)
* feat: extract and populate RowKeys from sorted batches

  Compute sort-key boundaries (first/last row values) during the Parquet
  write and store them as a RowKeys proto in both the Parquet KV metadata
  and MetricsSplitMetadata. The compactor uses these ranges to determine
  key-range overlap between splits and to select merge boundaries.

  - New row_keys module: extract_row_keys() reads the sort-schema column
    values at rows 0 and N-1, mapping Arrow types to the ColumnValue proto
  - prepare_write() now computes row_keys after sorting and injects
    qh.row_keys (base64) + qh.row_keys_json into the Parquet KV metadata
  - write_to_bytes/write_to_file_with_metadata return the row_keys proto
    bytes alongside the write result
  - split_writer captures and stores row_keys_proto on the metadata

* test: verify RowKeys consistency across all three storage paths

  End-to-end test with 8 rows across 4 series (including null tags, mixed
  metrics, and a wide timestamp range). Writes through the full
  split_writer pipeline and verifies that the RowKeys proto is
  byte-identical in all three places:

  1. MetricsSplitMetadata.row_keys_proto (feeds Postgres)
  2. Parquet file KV metadata (qh.row_keys, base64-encoded)
  3. InsertableMetricsSplit.row_keys (Postgres column)

  Also checks that the min/max column values match the expected sort
  order, all_inclusive_max equals max, the JSON metadata is parseable,
  and the split is not marked expired.

* fix: resolve merge conflicts after rebase, adapting to the
  ParquetSplitMetadata API

  - Keep the DDSketch sketch validation in prepare_write
  - Fix ParquetSplitWriter::new to pass ParquetSplitKind
  - Remove the Postgres path test (InsertableParquetSplit moved to
    quickwit-metastore)
  - Update MetricsSplitMetadata references to ParquetSplitMetadata

* style: fix nightly fmt by collapsing the split_writer binding to one line

* fix: add .unwrap() to ParquetWriter/ParquetSplitWriter::new calls in the
  row_keys tests

  These constructors now return Result after the sorted-series-key merge.

* fix: normalize legacy "timestamp" to "timestamp_secs" in row_keys
  extraction

  The sort-schema parser accepts "timestamp" as a legacy alias, but the
  Arrow batch only contains "timestamp_secs". Without normalization,
  index_of fails and the code encodes None for the timestamp boundary,
  losing time-range information from the RowKeys.

* fix: deduplicate qh.row_keys in the Parquet KV metadata

  build_compaction_key_value_metadata already emits qh.row_keys when
  split_metadata.row_keys_proto is present, and prepare_write then
  appended another entry from freshly computed values, producing
  duplicates. Now strips any pre-existing qh.row_keys and qh.row_keys_json
  before pushing the authoritative values from the sorted batch. Also adds
  a test assertion verifying that exactly one qh.row_keys entry exists.
  (A sketch of this dedup-then-push step follows below.)

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
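To make the last fix concrete, here is a minimal sketch of the dedup-then-push step, assuming the footer entries live in a Vec<KeyValue> (parquet::format::KeyValue) and that the standard base64 engine is used. The helper name upsert_row_keys_metadata is hypothetical, not the actual prepare_write code; only the qh.row_keys / qh.row_keys_json keys come from the commit message.

// Hypothetical sketch of the dedup-then-push step described above.
use base64::Engine as _;
use parquet::format::KeyValue;

fn upsert_row_keys_metadata(
    kv_metadata: &mut Vec<KeyValue>,
    row_keys_proto: &[u8],
    row_keys_json: String,
) {
    // Strip any entries build_compaction_key_value_metadata may already
    // have emitted, so exactly one qh.row_keys entry survives.
    kv_metadata.retain(|kv| kv.key != "qh.row_keys" && kv.key != "qh.row_keys_json");
    // Push the authoritative values computed from the sorted batch.
    kv_metadata.push(KeyValue {
        key: "qh.row_keys".to_string(),
        value: Some(base64::engine::general_purpose::STANDARD.encode(row_keys_proto)),
    });
    kv_metadata.push(KeyValue {
        key: "qh.row_keys_json".to_string(),
        value: Some(row_keys_json),
    });
}

Retain-then-push keeps the entry computed from the sorted batch authoritative regardless of what earlier metadata builders emitted.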
1 parent 68990db commit a6b5b73

5 files changed

Lines changed: 1009 additions & 27 deletions

File tree

quickwit/quickwit-parquet-engine/src/lib.rs

Lines changed: 1 addition & 0 deletions
@@ -23,6 +23,7 @@
 pub mod index;
 pub mod ingest;
 pub mod metrics;
+pub mod row_keys;
 pub mod schema;
 pub mod sort_fields;
 pub mod sorted_series;
quickwit/quickwit-parquet-engine/src/row_keys.rs

Lines changed: 154 additions & 0 deletions
@@ -0,0 +1,154 @@
// Copyright 2021-Present Datadog, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! RowKeys extraction from sorted RecordBatches.
//!
//! RowKeys define the sort-key boundaries of a split file: the values of each
//! sort column at the first and last rows. The compactor uses these to
//! determine key-range overlap between splits and to select merge boundaries.
//!
//! The proto format (`sortschema::RowKeys`) is defined in
//! `event_store_sortschema.proto` and stored as base64-encoded bytes in
//! Parquet key_value_metadata and in the PostgreSQL metastore.

#[cfg(test)]
mod tests;

use anyhow::Result;
use arrow::array::{Array, DictionaryArray, Int64Array, StringArray, UInt64Array};
use arrow::datatypes::{DataType, Int32Type};
use arrow::record_batch::RecordBatch;
use quickwit_proto::sortschema::{ColumnValue, ColumnValues, RowKeys, column_value};

use crate::sort_fields::parse_sort_fields;

/// Extract [`RowKeys`] from a **sorted** [`RecordBatch`].
///
/// Reads the sort schema columns at row 0 (min) and the last row (max),
/// building a `RowKeys` proto with `min_row_values`, `max_row_values`,
/// and `all_inclusive_max_row_values`.
///
/// Returns `None` if the batch is empty.
///
/// # Errors
///
/// Returns an error if the sort fields string cannot be parsed.
pub fn extract_row_keys(
    sort_fields_str: &str,
    sorted_batch: &RecordBatch,
) -> Result<Option<RowKeys>> {
    if sorted_batch.num_rows() == 0 {
        return Ok(None);
    }

    let sort_schema = parse_sort_fields(sort_fields_str)?;
    let batch_schema = sorted_batch.schema();

    let first_row = 0;
    let last_row = sorted_batch.num_rows() - 1;

    let mut min_values = Vec::with_capacity(sort_schema.column.len());
    let mut max_values = Vec::with_capacity(sort_schema.column.len());

    for col_def in &sort_schema.column {
        // Normalize legacy "timestamp" token to the physical column name.
        let col_name = if col_def.name == "timestamp" {
            "timestamp_secs"
        } else {
            &col_def.name
        };
        let batch_idx = match batch_schema.index_of(col_name) {
            Ok(idx) => idx,
            Err(_) => {
                // Column not in batch: encode as empty ColumnValue (None value).
                min_values.push(ColumnValue { value: None });
                max_values.push(ColumnValue { value: None });
                continue;
            }
        };

        let col = sorted_batch.column(batch_idx);
        min_values.push(extract_column_value(col.as_ref(), first_row));
        max_values.push(extract_column_value(col.as_ref(), last_row));
    }

    let min_row = ColumnValues { column: min_values };
    let max_row = ColumnValues {
        column: max_values.clone(),
    };

    Ok(Some(RowKeys {
        min_row_values: Some(min_row),
        max_row_values: Some(max_row.clone()),
        // No multi-valued columns in our schema, so all-inclusive max
        // equals max.
        all_inclusive_max_row_values: Some(max_row),
        expired: false,
    }))
}

/// Serialize a [`RowKeys`] proto to bytes.
pub fn encode_row_keys_proto(row_keys: &RowKeys) -> Vec<u8> {
    prost::Message::encode_to_vec(row_keys)
}

/// Extract a single [`ColumnValue`] from an Arrow column at the given row.
///
/// Maps Arrow types to the proto's oneof:
/// - `Dictionary(Int32, Utf8)` / `Utf8` → `TypeString` (bytes)
/// - `Int64` → `TypeInt`
/// - `UInt64` → `TypeInt` (cast to i64)
/// - `Float64` → `TypeFloat`
///
/// Null values produce a `ColumnValue` with `value: None`.
fn extract_column_value(array: &dyn Array, row: usize) -> ColumnValue {
    if array.is_null(row) {
        return ColumnValue { value: None };
    }

    let value = match array.data_type() {
        DataType::Dictionary(_, _) => extract_dict_string(array, row)
            .map(|s| column_value::Value::TypeString(s.as_bytes().to_vec())),
        DataType::Utf8 => array
            .as_any()
            .downcast_ref::<StringArray>()
            .map(|a| column_value::Value::TypeString(a.value(row).as_bytes().to_vec())),
        DataType::Int64 => array
            .as_any()
            .downcast_ref::<Int64Array>()
            .map(|a| column_value::Value::TypeInt(a.value(row))),
        DataType::UInt64 => array
            .as_any()
            .downcast_ref::<UInt64Array>()
            .map(|a| column_value::Value::TypeInt(a.value(row) as i64)),
        DataType::Float64 => array
            .as_any()
            .downcast_ref::<arrow::array::Float64Array>()
            .map(|a| column_value::Value::TypeFloat(a.value(row))),
        _ => None,
    };

    ColumnValue { value }
}

/// Extract a string from a Dictionary(Int32, Utf8) column.
fn extract_dict_string(array: &dyn Array, row: usize) -> Option<&str> {
    let dict = array
        .as_any()
        .downcast_ref::<DictionaryArray<Int32Type>>()?;
    let key_idx = dict.keys().value(row) as usize;
    let values = dict.values();
    let str_values = values.as_any().downcast_ref::<StringArray>()?;
    Some(str_values.value(key_idx))
}
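For orientation, here is a hedged usage sketch of the new module (not part of the diff): build a small batch already sorted by its sort key, extract the RowKeys, and serialize them. The sort-fields string "metric_name,timestamp" is illustrative, since the exact syntax is whatever parse_sort_fields accepts; note that the legacy "timestamp" token is normalized to "timestamp_secs" as described above.

use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
// Crate name assumed from the file path quickwit/quickwit-parquet-engine.
use quickwit_parquet_engine::row_keys::{encode_row_keys_proto, extract_row_keys};

fn demo() -> anyhow::Result<()> {
    // Two rows, already sorted by (metric_name, timestamp_secs).
    let schema = Arc::new(Schema::new(vec![
        Field::new("metric_name", DataType::Utf8, false),
        Field::new("timestamp_secs", DataType::Int64, false),
    ]));
    let columns: Vec<ArrayRef> = vec![
        Arc::new(StringArray::from(vec!["cpu.user", "mem.free"])),
        Arc::new(Int64Array::from(vec![1_000_i64, 2_000])),
    ];
    let batch = RecordBatch::try_new(schema, columns)?;

    // Illustrative sort-fields string; "timestamp" is the legacy alias
    // that extract_row_keys normalizes to "timestamp_secs".
    let row_keys = extract_row_keys("metric_name,timestamp", &batch)?
        .expect("non-empty batch yields Some(RowKeys)");

    // min = row 0, max = last row; serialize for the KV metadata path.
    let proto_bytes = encode_row_keys_proto(&row_keys);
    assert!(!proto_bytes.is_empty());
    Ok(())
}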

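And the read side, mirroring what the end-to-end test verifies: find qh.row_keys in the Parquet footer's key_value_metadata, base64-decode it, and decode the RowKeys proto. This is a sketch under the same standard-base64 assumption, with a hypothetical read_row_keys helper, not the test's actual code.

use base64::Engine as _;
use parquet::file::metadata::FileMetaData;
use prost::Message;
use quickwit_proto::sortschema::RowKeys;

/// Decode the qh.row_keys entry from a Parquet footer, if present.
fn read_row_keys(file_metadata: &FileMetaData) -> Option<RowKeys> {
    let kv = file_metadata
        .key_value_metadata()?
        .iter()
        .find(|kv| kv.key == "qh.row_keys")?;
    let bytes = base64::engine::general_purpose::STANDARD
        .decode(kv.value.as_deref()?)
        .ok()?;
    RowKeys::decode(bytes.as_slice()).ok()
}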