Skip to content

Commit c639cf6

Browse files
g-talbotclaude
andcommitted
fix: nightly rustfmt import ordering
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 6eda0e5 commit c639cf6

3 files changed

Lines changed: 68 additions & 47 deletions

File tree

quickwit/quickwit-parquet-engine/src/merge/tests.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,9 @@ fn test_merge_per_output_metadata_from_actual_rows() {
323323
num_outputs: 1,
324324
writer_config: ParquetWriterConfig::default(),
325325
};
326-
let outputs = merge_sorted_parquet_files(&[input1.clone(), input2.clone()], &output_dir, &config).unwrap();
326+
let outputs =
327+
merge_sorted_parquet_files(&[input1.clone(), input2.clone()], &output_dir, &config)
328+
.unwrap();
327329
assert_eq!(outputs.len(), 1);
328330
let output = &outputs[0];
329331
assert!(output.metric_names.contains("cpu"));

quickwit/quickwit-parquet-engine/src/merge/writer.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@ use crate::row_keys;
4040
use crate::sort_fields::parse_sort_fields;
4141
use crate::sorted_series::SORTED_SERIES_COLUMN;
4242
use crate::split::TAG_SERVICE;
43-
use crate::storage::split_writer::{extract_metric_names, extract_service_names, extract_time_range};
43+
use crate::storage::split_writer::{
44+
extract_metric_names, extract_service_names, extract_time_range,
45+
};
4446
use crate::storage::{
4547
PARQUET_META_NUM_MERGE_OPS, PARQUET_META_ROW_KEYS, PARQUET_META_ROW_KEYS_JSON,
4648
PARQUET_META_SORT_FIELDS, PARQUET_META_WINDOW_DURATION, PARQUET_META_WINDOW_START,
@@ -119,8 +121,8 @@ pub fn write_merge_outputs(
119121
// Extract per-output logical metadata from the actual rows.
120122
let metric_names = extract_metric_names(&sorted_batch)
121123
.context("extracting metric names from merge output")?;
122-
let time_range = extract_time_range(&sorted_batch)
123-
.context("extracting time range from merge output")?;
124+
let time_range =
125+
extract_time_range(&sorted_batch).context("extracting time range from merge output")?;
124126
let service_names = extract_service_names(&sorted_batch)
125127
.context("extracting service names from merge output")?;
126128

quickwit/quickwit-parquet-engine/src/storage/split_writer.rs

Lines changed: 60 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -194,69 +194,86 @@ pub(crate) fn extract_time_range(batch: &RecordBatch) -> Result<TimeRange, Parqu
194194
}
195195

196196
/// Extracts distinct metric names from a RecordBatch.
197-
pub(crate) fn extract_metric_names(batch: &RecordBatch) -> Result<HashSet<String>, ParquetWriteError> {
197+
pub(crate) fn extract_metric_names(
198+
batch: &RecordBatch,
199+
) -> Result<HashSet<String>, ParquetWriteError> {
198200
let metric_idx = batch
199201
.schema()
200202
.index_of("metric_name")
201203
.map_err(|_| ParquetWriteError::SchemaValidation("missing metric_name column".into()))?;
202-
let metric_col = batch.column(metric_idx);
203-
let mut names = HashSet::new();
204-
205-
// The column is Dictionary(Int32, Utf8)
206-
if let Some(dict_array) = metric_col
207-
.as_any()
208-
.downcast_ref::<arrow::array::DictionaryArray<arrow::datatypes::Int32Type>>()
209-
{
210-
let values = dict_array.values();
211-
if let Some(string_values) = values.as_any().downcast_ref::<arrow::array::StringArray>() {
212-
// Get all dictionary values that are actually used
213-
for i in 0..dict_array.len() {
214-
if !dict_array.is_null(i)
215-
&& let Ok(key) = dict_array.keys().value(i).try_into()
216-
{
217-
let key: usize = key;
218-
if key < string_values.len() && !string_values.is_null(key) {
219-
names.insert(string_values.value(key).to_string());
220-
}
221-
}
222-
}
223-
}
224-
}
225-
226-
Ok(names)
204+
extract_distinct_strings(batch.column(metric_idx))
227205
}
228206

229207
/// Extracts distinct service names from a RecordBatch.
230-
pub(crate) fn extract_service_names(batch: &RecordBatch) -> Result<HashSet<String>, ParquetWriteError> {
208+
pub(crate) fn extract_service_names(
209+
batch: &RecordBatch,
210+
) -> Result<HashSet<String>, ParquetWriteError> {
231211
let service_idx = match batch.schema().index_of("service").ok() {
232212
Some(idx) => idx,
233213
None => return Ok(HashSet::new()),
234214
};
235-
let service_col = batch.column(service_idx);
236-
let mut names = HashSet::new();
215+
extract_distinct_strings(batch.column(service_idx))
216+
}
237217

238-
// The column is Dictionary(Int32, Utf8)
239-
if let Some(dict_array) = service_col
218+
/// Extracts distinct non-null string values from a column.
219+
///
220+
/// Handles both Dictionary(Int32, Utf8) encoding (common at ingest) and
221+
/// plain Utf8/LargeUtf8 (possible after optimize_output_batch in the merge
222+
/// path when cardinality is too high for dictionary encoding).
223+
fn extract_distinct_strings(
224+
col: &dyn arrow::array::Array,
225+
) -> Result<HashSet<String>, ParquetWriteError> {
226+
let mut values = HashSet::new();
227+
228+
// Try Dictionary(Int32, Utf8) first — the common case at ingest.
229+
if let Some(dict_array) = col
240230
.as_any()
241231
.downcast_ref::<arrow::array::DictionaryArray<arrow::datatypes::Int32Type>>()
232+
&& let Some(string_values) = dict_array
233+
.values()
234+
.as_any()
235+
.downcast_ref::<arrow::array::StringArray>()
242236
{
243-
let values = dict_array.values();
244-
if let Some(string_values) = values.as_any().downcast_ref::<arrow::array::StringArray>() {
245-
// Get all dictionary values that are actually used
246-
for i in 0..dict_array.len() {
247-
if !dict_array.is_null(i)
248-
&& let Ok(key) = dict_array.keys().value(i).try_into()
249-
{
250-
let key: usize = key;
251-
if key < string_values.len() && !string_values.is_null(key) {
252-
names.insert(string_values.value(key).to_string());
253-
}
237+
for i in 0..dict_array.len() {
238+
if !dict_array.is_null(i)
239+
&& let Ok(key) = dict_array.keys().value(i).try_into()
240+
{
241+
let key: usize = key;
242+
if key < string_values.len() && !string_values.is_null(key) {
243+
values.insert(string_values.value(key).to_string());
254244
}
255245
}
256246
}
247+
return Ok(values);
248+
}
249+
250+
// Fall back to plain Utf8 (after optimize_output_batch strips dictionary
251+
// encoding for high-cardinality columns).
252+
if let Some(string_array) = col.as_any().downcast_ref::<arrow::array::StringArray>() {
253+
for i in 0..string_array.len() {
254+
if !string_array.is_null(i) {
255+
values.insert(string_array.value(i).to_string());
256+
}
257+
}
258+
return Ok(values);
259+
}
260+
261+
// LargeUtf8 variant.
262+
if let Some(string_array) = col
263+
.as_any()
264+
.downcast_ref::<arrow::array::LargeStringArray>()
265+
{
266+
for i in 0..string_array.len() {
267+
if !string_array.is_null(i) {
268+
values.insert(string_array.value(i).to_string());
269+
}
270+
}
271+
return Ok(values);
257272
}
258273

259-
Ok(names)
274+
// Unrecognized column type — return empty rather than error, since the
275+
// column may legitimately be a type we don't extract strings from.
276+
Ok(values)
260277
}
261278

262279
#[cfg(test)]

0 commit comments

Comments
 (0)