Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 32 additions & 37 deletions quickwit/quickwit-indexing/src/actors/packager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ use quickwit_common::temp_dir::TempDirectory;
use quickwit_directories::write_hotcache;
use quickwit_doc_mapper::NamedField;
use quickwit_doc_mapper::tag_pruning::append_to_tag_set;
use quickwit_proto::search::{
ListFieldType, ListFields, ListFieldsEntryResponse, serialize_split_fields,
};
use quickwit_proto::search::{ListFieldsEntry, ListFieldsMetadata, ListFieldsType};
use tantivy::index::FieldMetadata;
use tantivy::schema::{FieldType, Type};
use tantivy::{InvertedIndexReader, ReloadPolicy, SegmentMeta};
Expand Down Expand Up @@ -314,7 +312,7 @@ fn create_packaged_split(
build_hotcache(split.split_scratch_directory.path(), &mut hotcache_bytes)?;
ctx.record_progress();

let serialized_split_fields = serialize_field_metadata(&fields_metadata);
let serialized_split_fields = serialize_fields_metadata(&fields_metadata);

let packaged_split = PackagedSplit {
serialized_split_fields,
Expand All @@ -327,37 +325,19 @@ fn create_packaged_split(
Ok(packaged_split)
}

/// Serializes the Split fields.
///
/// `fields_metadata` has to be sorted.
fn serialize_field_metadata(fields_metadata: &[FieldMetadata]) -> Vec<u8> {
let fields = fields_metadata
/// Serializes the fields metadata of a split, with the entries sorted by (name, type).
fn serialize_fields_metadata(fields_metadata: &[FieldMetadata]) -> Vec<u8> {
let entries = fields_metadata
.iter()
.map(field_metadata_to_list_field_serialized)
.map(field_metadata_to_list_fields_entry)
.sorted_unstable_by(|left, right| left.cmp_by_name_and_type(right))
Comment thread
guilload marked this conversation as resolved.
.collect::<Vec<_>>();

serialize_split_fields(ListFields { fields })
}

fn tantivy_type_to_list_field_type(typ: Type) -> ListFieldType {
match typ {
Type::Str => ListFieldType::Str,
Type::U64 => ListFieldType::U64,
Type::I64 => ListFieldType::I64,
Type::F64 => ListFieldType::F64,
Type::Bool => ListFieldType::Bool,
Type::Date => ListFieldType::Date,
Type::Facet => ListFieldType::Facet,
Type::Bytes => ListFieldType::Bytes,
Type::Json => ListFieldType::Json,
Type::IpAddr => ListFieldType::IpAddr,
}
ListFieldsMetadata { entries }.serialize()
}

fn field_metadata_to_list_field_serialized(
field_metadata: &FieldMetadata,
) -> ListFieldsEntryResponse {
ListFieldsEntryResponse {
fn field_metadata_to_list_fields_entry(field_metadata: &FieldMetadata) -> ListFieldsEntry {
ListFieldsEntry {
field_name: field_metadata.field_name.to_string(),
field_type: tantivy_type_to_list_field_type(field_metadata.typ) as i32,
searchable: field_metadata.is_indexed(),
Expand All @@ -368,6 +348,21 @@ fn field_metadata_to_list_field_serialized(
}
}

/// Converts a tantivy value [`Type`] into the matching [`ListFieldsType`] variant.
///
/// There is no catch-all arm: every `Type` variant is mapped explicitly to the
/// `ListFieldsType` variant of the same name.
fn tantivy_type_to_list_field_type(typ: Type) -> ListFieldsType {
    match typ {
        // Numeric types.
        Type::I64 => ListFieldsType::I64,
        Type::U64 => ListFieldsType::U64,
        Type::F64 => ListFieldsType::F64,
        // Text-like and structured types.
        Type::Str => ListFieldsType::Str,
        Type::Facet => ListFieldsType::Facet,
        Type::Json => ListFieldsType::Json,
        // Remaining scalar types.
        Type::Bool => ListFieldsType::Bool,
        Type::Date => ListFieldsType::Date,
        Type::IpAddr => ListFieldsType::IpAddr,
        Type::Bytes => ListFieldsType::Bytes,
    }
}

/// Reads u64 from stored term data.
fn u64_from_term_data(data: &[u8]) -> anyhow::Result<u64> {
let u64_bytes: [u8; 8] = data[0..8]
Expand All @@ -382,7 +377,7 @@ mod tests {

use quickwit_actors::{ObservationType, Universe};
use quickwit_metastore::checkpoint::IndexCheckpointDelta;
use quickwit_proto::search::{ListFieldsEntryResponse, deserialize_split_fields};
use quickwit_proto::search::{ListFieldsEntry, ListFieldsMetadata};
use quickwit_proto::types::{DocMappingUid, IndexUid, NodeId};
use tantivy::directory::MmapDirectory;
use tantivy::schema::{FAST, NumericOptions, STRING, Schema, TEXT, Type};
Expand Down Expand Up @@ -424,24 +419,24 @@ mod tests {
},
];

let out = serialize_field_metadata(&fields_metadata);
let out = serialize_fields_metadata(&fields_metadata);

let deserialized: Vec<ListFieldsEntryResponse> =
deserialize_split_fields(&mut &out[..]).unwrap().fields;
let deserialized: Vec<ListFieldsEntry> =
ListFieldsMetadata::deserialize(&out[..]).unwrap().entries;

assert_eq!(fields_metadata.len(), deserialized.len());
assert_eq!(deserialized[0].field_name, "test");
assert_eq!(deserialized[0].field_type, ListFieldType::Str as i32);
assert_eq!(deserialized[0].field_type, ListFieldsType::Str as i32);
assert!(deserialized[0].searchable);
assert!(deserialized[0].aggregatable);

assert_eq!(deserialized[1].field_name, "test2");
assert_eq!(deserialized[1].field_type, ListFieldType::Str as i32);
assert_eq!(deserialized[1].field_type, ListFieldsType::Str as i32);
assert!(deserialized[1].searchable);
assert!(!deserialized[1].aggregatable);

assert_eq!(deserialized[2].field_name, "test3");
assert_eq!(deserialized[2].field_type, ListFieldType::U64 as i32);
assert_eq!(deserialized[2].field_type, ListFieldsType::U64 as i32);
assert!(deserialized[2].searchable);
assert!(deserialized[2].aggregatable);
}
Expand Down
23 changes: 14 additions & 9 deletions quickwit/quickwit-proto/protos/quickwit/search.proto
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ message ListFieldsRequest {
repeated string index_id_patterns = 1;
// Optionally restricts the query to the fields matching these patterns.
// Wildcard expressions are supported.
repeated string fields = 2;
repeated string field_patterns = 2;
Comment thread
guilload marked this conversation as resolved.

// Time filter, expressed in seconds since epoch.
// That filter is to be interpreted as the semi-open interval:
Expand All @@ -144,16 +144,23 @@ message LeafListFieldsRequest {

// Optionally restricts the query to the fields matching these patterns.
// Wildcard expressions are supported.
repeated string fields = 4;
repeated string field_patterns = 4;
}

/// Message returned by leaf and root list fields requests.
message ListFieldsResponse {
repeated ListFieldsEntryResponse fields = 1;
repeated ListFieldsEntry entries = 1;
}

message ListFieldsEntryResponse {
/// Message containing the fields metadata for a split, sorted by (name, type) and stored
/// zstd-compressed in the split. Currently a duplicate of ListFieldsResponse, but kept
/// distinct so they can evolve independently.
message ListFieldsMetadata {
repeated ListFieldsEntry entries = 1;
}

message ListFieldsEntry {
string field_name = 1;
ListFieldType field_type = 2;
ListFieldsType field_type = 2;
// The ids of the indices in which the field exists.
repeated string index_ids = 3;
// True means the field is searchable (indexed) in at least some indices.
Expand All @@ -168,7 +175,7 @@ message ListFieldsEntryResponse {
repeated string non_aggregatable_index_ids = 7;
}

enum ListFieldType {
enum ListFieldsType {
STR = 0;
U64 = 1;
I64 = 2;
Expand All @@ -180,9 +187,7 @@ enum ListFieldType {
IP_ADDR = 8;
JSON = 9;
}
message ListFields {
repeated ListFieldsEntryResponse fields = 1;
}

// -- Search -------------------

message SearchRequest {
Expand Down
29 changes: 16 additions & 13 deletions quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

123 changes: 86 additions & 37 deletions quickwit/quickwit-proto/src/search/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,48 +229,97 @@ impl PartialHit {
}
}

/// Serializes the split fields.
///
/// The output is a single format-version byte (`2`) followed by the zstd-compressed
/// (level 3) encoding of `list_fields` produced by `encode_to_vec`.
///
/// The entries in `list_fields` are expected to be sorted by the caller; this function
/// neither sorts nor checks them.
///
/// # Panics
///
/// Panics if zstd compression fails.
pub fn serialize_split_fields(list_fields: ListFields) -> Vec<u8> {
    const FORMAT_VERSION: u8 = 2;
    const COMPRESSION_LEVEL: i32 = 3;

    let encoded = list_fields.encode_to_vec();
    let compressed = zstd::stream::encode_all(&mut &encoded[..], COMPRESSION_LEVEL)
        .expect("zstd encoding failed");

    // Header (format version) first, then the compressed payload.
    let mut serialized = vec![FORMAT_VERSION];
    serialized.extend_from_slice(&compressed);
    serialized
}
/// On-disk format version for serialized [`ListFieldsMetadata`]. Bumped whenever the wire format
/// produced by [`ListFieldsMetadata::serialize`] changes in a way readers can't tolerate.
const FIELDS_METADATA_FORMAT_VERSION: u8 = 2;

/// Zstd compression level used when writing fields metadata.
const FIELDS_METADATA_COMPRESSION_LEVEL: i32 = 3;

impl ListFieldsMetadata {
/// Serializes the entries: one version byte followed by the zstd-compressed protobuf
/// encoding of `Self`.
pub fn serialize(&self) -> Vec<u8> {
let payload = self.encode_to_vec();
Comment thread
guilload marked this conversation as resolved.
let mut out = vec![FIELDS_METADATA_FORMAT_VERSION];
zstd::stream::copy_encode(&payload[..], &mut out, FIELDS_METADATA_COMPRESSION_LEVEL)
.expect("zstd encoding into `Vec<u8>` should not fail");
out
}

/// Reads the format produced by [`Self::serialize`].
pub fn deserialize<R: Read>(mut reader: R) -> io::Result<Self> {
let mut version_byte = [0u8; 1];
reader.read_exact(&mut version_byte)?;

/// Reads a fixed number of bytes into an array and returns the array.
fn read_exact_array<const N: usize>(reader: &mut impl Read) -> io::Result<[u8; N]> {
let mut buffer = [0u8; N];
reader.read_exact(&mut buffer)?;
Ok(buffer)
if version_byte[0] != FIELDS_METADATA_FORMAT_VERSION {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!(
"unsupported split fields format version: {}",
version_byte[0]
),
));
}
let mut zstd_decoder = zstd::stream::read::Decoder::new(reader)?;
let mut decompressed = Vec::new();
zstd_decoder.read_to_end(&mut decompressed)?;

Self::decode(&decompressed[..])
.map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))
}
}

/// Reads the Split fields from a zstd compressed stream of bytes
pub fn deserialize_split_fields<R: Read>(mut reader: R) -> io::Result<ListFields> {
let format_version = read_exact_array::<1>(&mut reader)?[0];
if format_version != 2 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("Unsupported split field format version: {format_version}"),
));
impl ListFieldsEntry {
pub fn cmp_by_name_and_type(&self, other: &Self) -> Ordering {
self.field_name
.cmp(&other.field_name)
.then_with(|| self.field_type.cmp(&other.field_type))
}
let reader = zstd::Decoder::new(reader)?;
read_split_fields_from_zstd(reader)
}

/// Reads the Split fields from a stream of bytes
#[allow(clippy::unbuffered_bytes)]
fn read_split_fields_from_zstd<R: Read>(reader: R) -> io::Result<ListFields> {
let all_bytes: Vec<_> = reader.bytes().collect::<io::Result<_>>()?;
let serialized_list_fields: ListFields = prost::Message::decode(&all_bytes[..])?;
#[cfg(test)]
mod tests {
use super::*;

/// Builds a searchable and aggregatable `Str` entry named `field_name` that lists
/// `index-1` as its only index id.
fn entry(field_name: &str) -> ListFieldsEntry {
    let index_ids = vec![String::from("index-1")];
    ListFieldsEntry {
        field_name: field_name.to_owned(),
        field_type: ListFieldsType::Str as i32,
        index_ids,
        searchable: true,
        aggregatable: true,
        non_searchable_index_ids: vec![],
        non_aggregatable_index_ids: vec![],
    }
}

/// A non-empty metadata value must survive a serialize/deserialize round trip unchanged.
#[test]
fn list_fields_entries_roundtrip() {
    let metadata = ListFieldsMetadata {
        entries: ["a", "b", "c"].iter().map(|name| entry(name)).collect(),
    };
    let bytes = metadata.serialize();
    let decoded = ListFieldsMetadata::deserialize(bytes.as_slice()).unwrap();
    assert_eq!(decoded, metadata);
}

Ok(serialized_list_fields)
/// An empty metadata value must still start with the version byte and round-trip unchanged.
#[test]
fn list_fields_entries_empty_roundtrip() {
    let metadata = ListFieldsMetadata { entries: vec![] };
    let bytes = metadata.serialize();

    // The first byte is the format version; the remainder is the zstd frame.
    assert_eq!(bytes[0], FIELDS_METADATA_FORMAT_VERSION);

    let decoded = ListFieldsMetadata::deserialize(bytes.as_slice()).unwrap();
    assert_eq!(decoded, metadata);
}

/// A payload whose version byte is not the supported one must be rejected as invalid data.
#[test]
fn list_fields_entries_rejects_unknown_version() {
    let unknown_version: &[u8] = &[0xFF];
    let result = ListFieldsMetadata::deserialize(unknown_version);
    assert_eq!(result.unwrap_err().kind(), io::ErrorKind::InvalidData);
}
}
Loading
Loading