Skip to content

Commit 9a9e2eb

Browse files
authored
Optimize list fields (#6439)
1 parent 964cba5 commit 9a9e2eb

16 files changed

Lines changed: 1433 additions & 1117 deletions

File tree

quickwit/quickwit-indexing/src/actors/packager.rs

Lines changed: 32 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,7 @@ use quickwit_common::temp_dir::TempDirectory;
2727
use quickwit_directories::write_hotcache;
2828
use quickwit_doc_mapper::NamedField;
2929
use quickwit_doc_mapper::tag_pruning::append_to_tag_set;
30-
use quickwit_proto::search::{
31-
ListFieldType, ListFields, ListFieldsEntryResponse, serialize_split_fields,
32-
};
30+
use quickwit_proto::search::{ListFieldsEntry, ListFieldsMetadata, ListFieldsType};
3331
use tantivy::index::FieldMetadata;
3432
use tantivy::schema::{FieldType, Type};
3533
use tantivy::{InvertedIndexReader, ReloadPolicy, SegmentMeta};
@@ -314,7 +312,7 @@ fn create_packaged_split(
314312
build_hotcache(split.split_scratch_directory.path(), &mut hotcache_bytes)?;
315313
ctx.record_progress();
316314

317-
let serialized_split_fields = serialize_field_metadata(&fields_metadata);
315+
let serialized_split_fields = serialize_fields_metadata(&fields_metadata);
318316

319317
let packaged_split = PackagedSplit {
320318
serialized_split_fields,
@@ -327,37 +325,19 @@ fn create_packaged_split(
327325
Ok(packaged_split)
328326
}
329327

330-
/// Serializes the Split fields.
331-
///
332-
/// `fields_metadata` has to be sorted.
333-
fn serialize_field_metadata(fields_metadata: &[FieldMetadata]) -> Vec<u8> {
334-
let fields = fields_metadata
328+
/// Serializes the fields metadata of a split, sorting the entries by (name, type) first.
329+
fn serialize_fields_metadata(fields_metadata: &[FieldMetadata]) -> Vec<u8> {
330+
let entries = fields_metadata
335331
.iter()
336-
.map(field_metadata_to_list_field_serialized)
332+
.map(field_metadata_to_list_fields_entry)
333+
.sorted_unstable_by(|left, right| left.cmp_by_name_and_type(right))
337334
.collect::<Vec<_>>();
338335

339-
serialize_split_fields(ListFields { fields })
340-
}
341-
342-
fn tantivy_type_to_list_field_type(typ: Type) -> ListFieldType {
343-
match typ {
344-
Type::Str => ListFieldType::Str,
345-
Type::U64 => ListFieldType::U64,
346-
Type::I64 => ListFieldType::I64,
347-
Type::F64 => ListFieldType::F64,
348-
Type::Bool => ListFieldType::Bool,
349-
Type::Date => ListFieldType::Date,
350-
Type::Facet => ListFieldType::Facet,
351-
Type::Bytes => ListFieldType::Bytes,
352-
Type::Json => ListFieldType::Json,
353-
Type::IpAddr => ListFieldType::IpAddr,
354-
}
336+
ListFieldsMetadata { entries }.serialize()
355337
}
356338

357-
fn field_metadata_to_list_field_serialized(
358-
field_metadata: &FieldMetadata,
359-
) -> ListFieldsEntryResponse {
360-
ListFieldsEntryResponse {
339+
fn field_metadata_to_list_fields_entry(field_metadata: &FieldMetadata) -> ListFieldsEntry {
340+
ListFieldsEntry {
361341
field_name: field_metadata.field_name.to_string(),
362342
field_type: tantivy_type_to_list_field_type(field_metadata.typ) as i32,
363343
searchable: field_metadata.is_indexed(),
@@ -368,6 +348,21 @@ fn field_metadata_to_list_field_serialized(
368348
}
369349
}
370350

351+
fn tantivy_type_to_list_field_type(typ: Type) -> ListFieldsType {
352+
match typ {
353+
Type::Bool => ListFieldsType::Bool,
354+
Type::Bytes => ListFieldsType::Bytes,
355+
Type::Date => ListFieldsType::Date,
356+
Type::F64 => ListFieldsType::F64,
357+
Type::Facet => ListFieldsType::Facet,
358+
Type::I64 => ListFieldsType::I64,
359+
Type::IpAddr => ListFieldsType::IpAddr,
360+
Type::Json => ListFieldsType::Json,
361+
Type::Str => ListFieldsType::Str,
362+
Type::U64 => ListFieldsType::U64,
363+
}
364+
}
365+
371366
/// Reads u64 from stored term data.
372367
fn u64_from_term_data(data: &[u8]) -> anyhow::Result<u64> {
373368
let u64_bytes: [u8; 8] = data[0..8]
@@ -382,7 +377,7 @@ mod tests {
382377

383378
use quickwit_actors::{ObservationType, Universe};
384379
use quickwit_metastore::checkpoint::IndexCheckpointDelta;
385-
use quickwit_proto::search::{ListFieldsEntryResponse, deserialize_split_fields};
380+
use quickwit_proto::search::{ListFieldsEntry, ListFieldsMetadata};
386381
use quickwit_proto::types::{DocMappingUid, IndexUid, NodeId};
387382
use tantivy::directory::MmapDirectory;
388383
use tantivy::schema::{FAST, NumericOptions, STRING, Schema, TEXT, Type};
@@ -424,24 +419,24 @@ mod tests {
424419
},
425420
];
426421

427-
let out = serialize_field_metadata(&fields_metadata);
422+
let out = serialize_fields_metadata(&fields_metadata);
428423

429-
let deserialized: Vec<ListFieldsEntryResponse> =
430-
deserialize_split_fields(&mut &out[..]).unwrap().fields;
424+
let deserialized: Vec<ListFieldsEntry> =
425+
ListFieldsMetadata::deserialize(&out[..]).unwrap().entries;
431426

432427
assert_eq!(fields_metadata.len(), deserialized.len());
433428
assert_eq!(deserialized[0].field_name, "test");
434-
assert_eq!(deserialized[0].field_type, ListFieldType::Str as i32);
429+
assert_eq!(deserialized[0].field_type, ListFieldsType::Str as i32);
435430
assert!(deserialized[0].searchable);
436431
assert!(deserialized[0].aggregatable);
437432

438433
assert_eq!(deserialized[1].field_name, "test2");
439-
assert_eq!(deserialized[1].field_type, ListFieldType::Str as i32);
434+
assert_eq!(deserialized[1].field_type, ListFieldsType::Str as i32);
440435
assert!(deserialized[1].searchable);
441436
assert!(!deserialized[1].aggregatable);
442437

443438
assert_eq!(deserialized[2].field_name, "test3");
444-
assert_eq!(deserialized[2].field_type, ListFieldType::U64 as i32);
439+
assert_eq!(deserialized[2].field_type, ListFieldsType::U64 as i32);
445440
assert!(deserialized[2].searchable);
446441
assert!(deserialized[2].aggregatable);
447442
}

quickwit/quickwit-proto/protos/quickwit/search.proto

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ message ListFieldsRequest {
117117
repeated string index_id_patterns = 1;
118118
// Optionally limits the query to a list of fields.
119119
// Wildcard expressions are supported.
120-
repeated string fields = 2;
120+
repeated string field_patterns = 2;
121121

122122
// Time filter, expressed in seconds since epoch.
123123
// That filter is to be interpreted as the semi-open interval:
@@ -144,16 +144,23 @@ message LeafListFieldsRequest {
144144

145145
// Optionally limits the query to a list of fields.
146146
// Wildcard expressions are supported.
147-
repeated string fields = 4;
147+
repeated string field_patterns = 4;
148148
}
149149

150+
/// Message returned by leaf and root list fields requests.
150151
message ListFieldsResponse {
151-
repeated ListFieldsEntryResponse fields = 1;
152+
repeated ListFieldsEntry entries = 1;
152153
}
153154

154-
message ListFieldsEntryResponse {
155+
/// Message containing the fields metadata for a split, sorted by (name, type) and stored zstd-compressed in the split. Currently a duplicate of ListFieldsResponse, but kept
156+
/// distinct so they can evolve independently.
157+
message ListFieldsMetadata {
158+
repeated ListFieldsEntry entries = 1;
159+
}
160+
161+
message ListFieldsEntry {
155162
string field_name = 1;
156-
ListFieldType field_type = 2;
163+
ListFieldsType field_type = 2;
157164
// The ids of the indexes in which the field exists.
158165
repeated string index_ids = 3;
159166
// True means the field is searchable (indexed) in at least some indices.
@@ -168,7 +175,7 @@ message ListFieldsEntryResponse {
168175
repeated string non_aggregatable_index_ids = 7;
169176
}
170177

171-
enum ListFieldType {
178+
enum ListFieldsType {
172179
STR = 0;
173180
U64 = 1;
174181
I64 = 2;
@@ -180,9 +187,7 @@ enum ListFieldType {
180187
IP_ADDR = 8;
181188
JSON = 9;
182189
}
183-
message ListFields {
184-
repeated ListFieldsEntryResponse fields = 1;
185-
}
190+
186191
// -- Search -------------------
187192

188193
message SearchRequest {

quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs

Lines changed: 16 additions & 13 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

quickwit/quickwit-proto/src/search/mod.rs

Lines changed: 86 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -229,48 +229,97 @@ impl PartialHit {
229229
}
230230
}
231231

232-
/// Serializes the Split fields.
233-
///
234-
/// `fields_metadata` has to be sorted.
235-
pub fn serialize_split_fields(list_fields: ListFields) -> Vec<u8> {
236-
let payload = list_fields.encode_to_vec();
237-
let compression_level = 3;
238-
let payload_compressed = zstd::stream::encode_all(&mut &payload[..], compression_level)
239-
.expect("zstd encoding failed");
240-
let mut out = Vec::new();
241-
// Write Header -- Format Version 2
242-
let format_version = 2u8;
243-
out.push(format_version);
244-
// Write Payload
245-
out.extend_from_slice(&payload_compressed);
246-
out
247-
}
232+
/// On-disk format version for serialized [`ListFieldsMetadata`]. Bumped whenever the wire format
233+
/// produced by [`ListFieldsMetadata::serialize`] changes in a way readers can't tolerate.
234+
const FIELDS_METADATA_FORMAT_VERSION: u8 = 2;
235+
236+
/// Zstd compression level used when writing fields metadata.
237+
const FIELDS_METADATA_COMPRESSION_LEVEL: i32 = 3;
238+
239+
impl ListFieldsMetadata {
240+
/// Serializes the entries: one version byte followed by the zstd-compressed protobuf
241+
/// encoding of `Self`.
242+
pub fn serialize(&self) -> Vec<u8> {
243+
let payload = self.encode_to_vec();
244+
let mut out = vec![FIELDS_METADATA_FORMAT_VERSION];
245+
zstd::stream::copy_encode(&payload[..], &mut out, FIELDS_METADATA_COMPRESSION_LEVEL)
246+
.expect("zstd encoding into `Vec<u8>` should not fail");
247+
out
248+
}
249+
250+
/// Reads the format produced by [`Self::serialize`].
251+
pub fn deserialize<R: Read>(mut reader: R) -> io::Result<Self> {
252+
let mut version_byte = [0u8; 1];
253+
reader.read_exact(&mut version_byte)?;
248254

249-
/// Reads a fixed number of bytes into an array and returns the array.
250-
fn read_exact_array<const N: usize>(reader: &mut impl Read) -> io::Result<[u8; N]> {
251-
let mut buffer = [0u8; N];
252-
reader.read_exact(&mut buffer)?;
253-
Ok(buffer)
255+
if version_byte[0] != FIELDS_METADATA_FORMAT_VERSION {
256+
return Err(io::Error::new(
257+
io::ErrorKind::InvalidData,
258+
format!(
259+
"unsupported split fields format version: {}",
260+
version_byte[0]
261+
),
262+
));
263+
}
264+
let mut zstd_decoder = zstd::stream::read::Decoder::new(reader)?;
265+
let mut decompressed = Vec::new();
266+
zstd_decoder.read_to_end(&mut decompressed)?;
267+
268+
Self::decode(&decompressed[..])
269+
.map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))
270+
}
254271
}
255272

256-
/// Reads the Split fields from a zstd compressed stream of bytes
257-
pub fn deserialize_split_fields<R: Read>(mut reader: R) -> io::Result<ListFields> {
258-
let format_version = read_exact_array::<1>(&mut reader)?[0];
259-
if format_version != 2 {
260-
return Err(io::Error::new(
261-
io::ErrorKind::InvalidData,
262-
format!("Unsupported split field format version: {format_version}"),
263-
));
273+
impl ListFieldsEntry {
274+
pub fn cmp_by_name_and_type(&self, other: &Self) -> Ordering {
275+
self.field_name
276+
.cmp(&other.field_name)
277+
.then_with(|| self.field_type.cmp(&other.field_type))
264278
}
265-
let reader = zstd::Decoder::new(reader)?;
266-
read_split_fields_from_zstd(reader)
267279
}
268280

269-
/// Reads the Split fields from a stream of bytes
270-
#[allow(clippy::unbuffered_bytes)]
271-
fn read_split_fields_from_zstd<R: Read>(reader: R) -> io::Result<ListFields> {
272-
let all_bytes: Vec<_> = reader.bytes().collect::<io::Result<_>>()?;
273-
let serialized_list_fields: ListFields = prost::Message::decode(&all_bytes[..])?;
281+
#[cfg(test)]
282+
mod tests {
283+
use super::*;
284+
285+
fn entry(field_name: &str) -> ListFieldsEntry {
286+
ListFieldsEntry {
287+
field_name: field_name.to_string(),
288+
field_type: ListFieldsType::Str as i32,
289+
searchable: true,
290+
aggregatable: true,
291+
index_ids: vec!["index-1".to_string()],
292+
non_searchable_index_ids: Vec::new(),
293+
non_aggregatable_index_ids: Vec::new(),
294+
}
295+
}
296+
297+
#[test]
298+
fn list_fields_entries_roundtrip() {
299+
let entries = ListFieldsMetadata {
300+
entries: vec![entry("a"), entry("b"), entry("c")],
301+
};
302+
let serialized = entries.serialize();
303+
let deserialized = ListFieldsMetadata::deserialize(&serialized[..]).unwrap();
304+
assert_eq!(deserialized, entries);
305+
}
274306

275-
Ok(serialized_list_fields)
307+
#[test]
308+
fn list_fields_entries_empty_roundtrip() {
309+
let entries = ListFieldsMetadata {
310+
entries: Vec::new(),
311+
};
312+
let serialized = entries.serialize();
313+
// Just the version byte plus an (essentially empty) zstd frame.
314+
assert_eq!(serialized[0], FIELDS_METADATA_FORMAT_VERSION);
315+
let deserialized = ListFieldsMetadata::deserialize(&serialized[..]).unwrap();
316+
assert_eq!(deserialized, entries);
317+
}
318+
319+
#[test]
320+
fn list_fields_entries_rejects_unknown_version() {
321+
let serialized = [0xFFu8];
322+
let error = ListFieldsMetadata::deserialize(&serialized[..]).unwrap_err();
323+
assert_eq!(error.kind(), io::ErrorKind::InvalidData);
324+
}
276325
}

0 commit comments

Comments
 (0)