Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/aggregation/agg_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,7 @@ fn build_nodes(
}
AggregationVariants::TopHits(top_hits_req) => {
let mut top_hits = top_hits_req.clone();
top_hits.validate_and_resolve_field_names(reader.fast_fields().columnar())?;
top_hits.validate_and_resolve_field_names(reader.fast_fields())?;
let accessors: Vec<(Column<u64>, ColumnType)> = top_hits
.field_names()
.iter()
Expand Down
124 changes: 102 additions & 22 deletions src/aggregation/metric/top_hits.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::HashMap;
use std::net::Ipv6Addr;

use columnar::{Column, ColumnType, ColumnarReader, DynamicColumn};
use columnar::{Column, ColumnType, DynamicColumn};
use common::json_path_writer::JSON_PATH_SEGMENT_SEP_STR;
use common::DateTime;
use regex::Regex;
Expand All @@ -18,8 +18,9 @@ use crate::aggregation::segment_agg_result::SegmentAggregationCollector;
use crate::aggregation::{AggregationError, BucketId};
use crate::collector::sort_key::ReverseComparator;
use crate::collector::TopNComputer;
use crate::fastfield::FastFieldReaders;
use crate::schema::OwnedValue;
use crate::{DocAddress, DocId, SegmentOrdinal};
use crate::{DocAddress, DocId, SegmentOrdinal, TantivyError};

/// Contains all information required by the TopHitsSegmentCollector to perform the
/// top_hits aggregation on a segment.
Expand Down Expand Up @@ -115,13 +116,17 @@ impl TopHitsAggReqData {
/// ```
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct TopHitsAggregationReq {
sort: Vec<KeyOrder>,
size: usize,
from: Option<usize>,

/// The sort criteria to determine the top hits.
pub sort: Vec<KeyOrder>,
/// The number of top hits to return.
pub size: usize,
/// The number of top hits to skip.
pub from: Option<usize>,

/// The list of fast fields to retrieve for each document.
#[serde(rename = "docvalue_fields")]
#[serde(default)]
doc_value_fields: Vec<String>,
pub doc_value_fields: Vec<String>,

// Not supported
_source: Option<serde_json::Value>,
Expand All @@ -132,9 +137,12 @@ pub struct TopHitsAggregationReq {
version: Option<serde_json::Value>,
}

/// A field and its associated sort order.
#[derive(Debug, Clone, PartialEq, Default)]
struct KeyOrder {
pub struct KeyOrder {
/// The field name.
field: String,
/// The sort order.
order: Order,
}

Expand Down Expand Up @@ -193,7 +201,7 @@ impl TopHitsAggregationReq {
/// Validate and resolve field retrieval parameters
pub fn validate_and_resolve_field_names(
&mut self,
reader: &ColumnarReader,
reader: &FastFieldReaders,
) -> crate::Result<()> {
if self._source.is_some() {
use_doc_value_fields_err("_source")?;
Expand All @@ -218,27 +226,33 @@ impl TopHitsAggregationReq {
.doc_value_fields
.iter()
.map(|field| {
if !field.contains('*')
&& reader
.iter_columns()?
.any(|(name, _)| name.as_str() == field)
{
if !field.contains('*') {
reader.resolve_field(field)?.ok_or_else(|| {
Copy link
Copy Markdown
Collaborator

@fulmicoton fulmicoton Jan 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect dynamic_column_handles is better (as it is public). I am not sure though

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dynamic_column_handles does a bit more work and internally calls resolve_field:
https://docs.rs/tantivy/latest/src/tantivy/fastfield/readers.rs.html#238-251

Copy link
Copy Markdown
Collaborator

@fulmicoton fulmicoton Jan 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, but can it is not public :-/

Changing to pub(crate) is not so bad but unnecessary here.

TantivyError::SchemaError(format!(
"Field '{field}' in docvalue_fields does not exist"
))
})?;

return Ok(vec![field.to_owned()]);
}

let pattern = globbed_string_to_regex(field)?;
let fields = reader
.columnar()
.iter_columns()?
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you mean to use it in Quickwit? I suspect it might not work if you have a large number of columns.

.map(|(name, _)| {
// normalize path from internal fast field repr
name.replace(JSON_PATH_SEGMENT_SEP_STR, ".")
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems wrong too

})
.filter(|name| pattern.is_match(name))
.collect::<Vec<_>>();
assert!(
!fields.is_empty(),
"No fields matched the glob '{field}' in docvalue_fields"
);

if fields.is_empty() {
return Err(TantivyError::SchemaError(format!(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thx

"No fields matched the glob '{field}' in docvalue_fields"
)));
}

Ok(fields)
})
.collect::<crate::Result<Vec<_>>>()?
Expand Down Expand Up @@ -663,6 +677,7 @@ mod tests {
use crate::collector::ComparableDoc;
use crate::query::AllQuery;
use crate::schema::OwnedValue;
use crate::TantivyError;

fn invert_order(cmp_feature: DocValueAndOrder) -> DocValueAndOrder {
let DocValueAndOrder { value, order } = cmp_feature;
Expand Down Expand Up @@ -863,12 +878,12 @@ mod tests {
fn test_aggregation_top_hits(merge_segments: bool) -> crate::Result<()> {
let docs = vec![
vec![
r#"{ "date": "2015-01-02T00:00:00Z", "text": "bbb", "text2": "bbb", "mixed": { "dyn_arr": [1, "2"] } }"#,
r#"{ "date": "2017-06-15T00:00:00Z", "text": "ccc", "text2": "ddd", "mixed": { "dyn_arr": [3, "4"] } }"#,
r#"{ "date": "2015-01-02T00:00:00Z", "text": "bbb", "text2": "bbb", "mixed": { "dyn_arr": [1, "2"], "dyn_str": "foo" } }"#,
r#"{ "date": "2017-06-15T00:00:00Z", "text": "ccc", "text2": "ddd", "mixed": { "dyn_arr": [3, "4"], "dyn_str": "bar" } }"#,
],
vec![
r#"{ "text": "aaa", "text2": "bbb", "date": "2018-01-02T00:00:00Z", "mixed": { "dyn_arr": ["9", 8] } }"#,
r#"{ "text": "aaa", "text2": "bbb", "date": "2016-01-02T00:00:00Z", "mixed": { "dyn_arr": ["7", 6] } }"#,
r#"{ "text": "aaa", "text2": "bbb", "date": "2018-01-02T00:00:00Z", "mixed": { "dyn_arr": ["9", 8], "dyn_str": "baz" } }"#,
r#"{ "text": "aaa", "text2": "bbb", "date": "2016-01-02T00:00:00Z", "mixed": { "dyn_arr": ["7", 6], "dyn_str": "bor" } }"#,
],
];

Expand All @@ -886,6 +901,7 @@ mod tests {
"date",
"tex*",
"mixed.*",
"mixed.dyn_str"
],
}
}
Expand All @@ -912,6 +928,7 @@ mod tests {
"text": [ "ccc" ],
"text2": [ "ddd" ],
"mixed.dyn_arr": [ 3, "4" ],
"mixed.dyn_str": [ "bar" ],
}
},
{
Expand All @@ -921,6 +938,7 @@ mod tests {
"text": [ "aaa" ],
"text2": [ "bbb" ],
"mixed.dyn_arr": [ 6, "7" ],
"mixed.dyn_str": [ "bor" ],
}
}
]
Expand All @@ -939,4 +957,66 @@ mod tests {
fn test_aggregation_top_hits_multi_segment() -> crate::Result<()> {
test_aggregation_top_hits(false)
}

#[test]
fn test_aggregation_top_hits_unknown_field() -> crate::Result<()> {
let docs = vec![
vec![
r#"{ "date": "2015-01-02T00:00:00Z", "text": "bbb", "text2": "bbb", "mixed": { "dyn_arr": [1, "2"] } }"#,
r#"{ "date": "2017-06-15T00:00:00Z", "text": "ccc", "text2": "ddd", "mixed": { "dyn_arr": [3, "4"] } }"#,
],
vec![
r#"{ "text": "aaa", "text2": "bbb", "date": "2018-01-02T00:00:00Z", "mixed": { "dyn_arr": ["9", 8] } }"#,
r#"{ "text": "aaa", "text2": "bbb", "date": "2016-01-02T00:00:00Z", "mixed": { "dyn_arr": ["7", 6] } }"#,
],
];

let index = get_test_index_from_docs(false, &docs)?;

let d: Aggregations = serde_json::from_value(json!({
"top_hits_req": {
"top_hits": {
"size": 2,
"sort": [
{ "date": "desc" }
],
"from": 1,
"docvalue_fields": [
"invalid_basic_field",
],
}
}
}))?;

let collector = AggregationCollector::from_aggs(d, Default::default());
let reader = index.reader()?;
let searcher = reader.searcher();

let agg_res = searcher.search(&AllQuery, &collector).unwrap_err();
assert!(matches!(agg_res, TantivyError::SchemaError(_)));

let d: Aggregations = serde_json::from_value(json!({
"top_hits_req": {
"top_hits": {
"size": 2,
"sort": [
{ "date": "desc" }
],
"from": 1,
"docvalue_fields": [
"invalid_glob_fie*",
],
}
}
}))?;

let collector = AggregationCollector::from_aggs(d, Default::default());
let reader = index.reader()?;
let searcher = reader.searcher();

let agg_res = searcher.search(&AllQuery, &collector).unwrap_err();
assert!(matches!(agg_res, TantivyError::SchemaError(_)));

Ok(())
}
}
2 changes: 1 addition & 1 deletion src/fastfield/readers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl FastFieldReaders {
Ok(FastFieldReaders { columnar, schema })
}

fn resolve_field(&self, column_name: &str) -> crate::Result<Option<String>> {
pub(crate) fn resolve_field(&self, column_name: &str) -> crate::Result<Option<String>> {
let default_field_opt: Option<Field> = if cfg!(feature = "quickwit") {
self.schema.get_field("_dynamic").ok()
} else {
Expand Down
Loading