Skip to content
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions lucene/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,8 @@ Improvements
* GITHUB#15989: DocValuesRangeIterator always tries to use skipper-based block iteration
as its approximation. (Alan Woodward)

* GITHUB#15565: Introduce AllGroupHeadsCollectorManager to parallelize search when using AllGroupHeadsCollector. (Binlong Gao)
Comment thread
gaobinlong marked this conversation as resolved.

Optimizations
---------------------
* GITHUB#15861: Optimise PhraseScorer by short circuiting non competitive documents in TOP_SCORES mode. (Prithvi S)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public abstract class AllGroupHeadsCollector<T> extends SimpleCollector {

private final GroupSelector<T> groupSelector;
protected final Sort sort;
protected final boolean fillSortValues;
Comment thread
javanna marked this conversation as resolved.
Outdated

protected final int[] reversed;
protected final int compIDXEnd;
Expand All @@ -62,15 +63,29 @@ public abstract class AllGroupHeadsCollector<T> extends SimpleCollector {
* @param <T> the group value type
*/
public static <T> AllGroupHeadsCollector<T> newCollector(GroupSelector<T> selector, Sort sort) {
return newCollector(selector, sort, false);
}

/**
* Create a new AllGroupHeadsCollector based on the type of within-group Sort required
*
* @param selector a GroupSelector to define the groups
* @param sort the within-group sort to use to choose the group head document
* @param fillSortValues whether to store sort values for merging across collectors
* @param <T> the group value type
*/
public static <T> AllGroupHeadsCollector<T> newCollector(
GroupSelector<T> selector, Sort sort, boolean fillSortValues) {
if (sort.equals(Sort.RELEVANCE)) {
return new ScoringGroupHeadsCollector<>(selector, sort);
return new ScoringGroupHeadsCollector<>(selector, sort, fillSortValues);
}
return new SortingGroupHeadsCollector<>(selector, sort);
return new SortingGroupHeadsCollector<>(selector, sort, fillSortValues);
}

private AllGroupHeadsCollector(GroupSelector<T> selector, Sort sort) {
private AllGroupHeadsCollector(GroupSelector<T> selector, Sort sort, boolean fillSortValues) {
this.groupSelector = selector;
this.sort = sort;
this.fillSortValues = fillSortValues;
this.reversed = new int[sort.getSort().length];
final SortField[] sortFields = sort.getSort();
for (int i = 0; i < sortFields.length; i++) {
Expand Down Expand Up @@ -232,40 +247,58 @@ protected void setNextReader(LeafReaderContext ctx) throws IOException {
* @throws IOException If I/O related errors occur
*/
protected abstract void updateDocHead(int doc) throws IOException;

/**
* Returns the sort values for this group head.
*
* @return the sort values, or null if not stored
*/
protected abstract Object[] getSortValues();
}

/** General implementation using a {@link FieldComparator} to select the group head */
private static class SortingGroupHeadsCollector<T> extends AllGroupHeadsCollector<T> {

protected SortingGroupHeadsCollector(GroupSelector<T> selector, Sort sort) {
super(selector, sort);
protected SortingGroupHeadsCollector(
GroupSelector<T> selector, Sort sort, boolean fillSortValues) {
super(selector, sort, fillSortValues);
}

@Override
protected GroupHead<T> newGroupHead(int doc, T value, LeafReaderContext ctx, Scorable scorer)
throws IOException {
return new SortingGroupHead<>(sort, value, doc, ctx, scorer);
return new SortingGroupHead<>(sort, value, doc, ctx, scorer, fillSortValues);
}
}

private static class SortingGroupHead<T> extends GroupHead<T> {

final FieldComparator[] comparators;
final LeafFieldComparator[] leafComparators;
final Object[] sortValues;

protected SortingGroupHead(
Sort sort, T groupValue, int doc, LeafReaderContext context, Scorable scorer)
Sort sort,
T groupValue,
int doc,
LeafReaderContext context,
Scorable scorer,
boolean fillSortValues)
throws IOException {
super(groupValue, doc, context.docBase);
final SortField[] sortFields = sort.getSort();
comparators = new FieldComparator[sortFields.length];
leafComparators = new LeafFieldComparator[sortFields.length];
sortValues = fillSortValues ? new Object[sortFields.length] : null;
for (int i = 0; i < sortFields.length; i++) {
comparators[i] = sortFields[i].getComparator(1, Pruning.NONE);
leafComparators[i] = comparators[i].getLeafComparator(context);
leafComparators[i].setScorer(scorer);
leafComparators[i].copy(0, doc);
leafComparators[i].setBottom(0);
if (fillSortValues) {
sortValues[i] = comparators[i].value(0);
}
}
}

Expand All @@ -291,38 +324,50 @@ public int compare(int compIDX, int doc) throws IOException {

@Override
public void updateDocHead(int doc) throws IOException {
for (LeafFieldComparator comparator : leafComparators) {
comparator.copy(0, doc);
comparator.setBottom(0);
for (int i = 0; i < leafComparators.length; i++) {
leafComparators[i].copy(0, doc);
leafComparators[i].setBottom(0);
if (sortValues != null) {
sortValues[i] = comparators[i].value(0);
}
}
this.doc = doc + docBase;
}

@Override
protected Object[] getSortValues() {
return sortValues;
}
}

/** Specialized implementation for sorting by score */
private static class ScoringGroupHeadsCollector<T> extends AllGroupHeadsCollector<T> {

protected ScoringGroupHeadsCollector(GroupSelector<T> selector, Sort sort) {
super(selector, sort);
protected ScoringGroupHeadsCollector(
GroupSelector<T> selector, Sort sort, boolean fillSortValues) {
super(selector, sort, fillSortValues);
}

@Override
protected GroupHead<T> newGroupHead(
int doc, T value, LeafReaderContext context, Scorable scorer) throws IOException {
return new ScoringGroupHead<>(scorer, value, doc, context.docBase);
return new ScoringGroupHead<>(scorer, value, doc, context.docBase, fillSortValues);
}
}

private static class ScoringGroupHead<T> extends GroupHead<T> {

private Scorable scorer;
private float topScore;
private final Object[] sortValues;

protected ScoringGroupHead(Scorable scorer, T groupValue, int doc, int docBase)
protected ScoringGroupHead(
Scorable scorer, T groupValue, int doc, int docBase, boolean fillSortValues)
throws IOException {
super(groupValue, doc, docBase);
this.scorer = scorer;
this.topScore = scorer.score();
this.sortValues = fillSortValues ? new Object[] {topScore} : null;
}

@Override
Expand All @@ -344,6 +389,14 @@ protected int compare(int compIDX, int doc) throws IOException {
@Override
protected void updateDocHead(int doc) throws IOException {
this.doc = doc + docBase;
if (sortValues != null) {
sortValues[0] = topScore;
}
}

@Override
protected Object[] getSortValues() {
return sortValues;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.search.grouping;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.FieldComparator;
import org.apache.lucene.search.Pruning;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.FixedBitSet;

/**
* A {@link CollectorManager} implementation for {@link AllGroupHeadsCollector} that collects the
* most relevant document (group head) for each group across multiple segments and merges the
* per-segment results into a single {@link GroupHeadsResult}.
*
* <p>Example usage:
*
* <pre class="prettyprint">
* IndexSearcher searcher = ...; // your IndexSearcher
* AllGroupHeadsCollectorManager&lt;BytesRef&gt; manager =
* new AllGroupHeadsCollectorManager&lt;&gt;(
* () -&gt; new TermGroupSelector("category"), Sort.RELEVANCE);
* GroupHeadsResult result = searcher.search(new MatchAllDocsQuery(), manager);
* Bits groupHeadsBits = result.retrieveGroupHeads(searcher.getIndexReader().maxDoc());
* </pre>
*
* @param <T> the type of the group value
* @lucene.experimental
*/
public class AllGroupHeadsCollectorManager<T>
implements CollectorManager<
AllGroupHeadsCollector<T>, AllGroupHeadsCollectorManager.GroupHeadsResult> {

/** Holds the merged group heads and provides access as an {@code int[]} or {@link Bits}. */
public static class GroupHeadsResult {
private final int[] groupHeads;

private GroupHeadsResult(int[] groupHeads) {
this.groupHeads = groupHeads;
}

/** Returns the group head document IDs as an array. */
public int[] retrieveGroupHeads() {
return groupHeads;
}

/**
* Returns the group head document IDs as a {@link Bits} set of size {@code maxDoc}, suitable
* for use as a filter.
*
* @param maxDoc The maxDoc of the top level {@link IndexReader}.
*/
public Bits retrieveGroupHeads(int maxDoc) {
FixedBitSet result = new FixedBitSet(maxDoc);
for (int docId : groupHeads) {
result.set(docId);
}
return result;
}
}

private static final class GroupHeadWithValues {
int doc;
final Object[] sortValues;

GroupHeadWithValues(int doc, Object[] sortValues) {
this.doc = doc;
this.sortValues = sortValues;
}
}

private final Supplier<GroupSelector<T>> groupSelectorFactory;
private final Sort sortWithinGroup;

/**
* Creates a new AllGroupHeadsCollectorManager.
*
* @param groupSelectorFactory factory to create group selectors for each collector
* @param sortWithinGroup the sort to use within each group to determine the group head
*/
public AllGroupHeadsCollectorManager(
Supplier<GroupSelector<T>> groupSelectorFactory, Sort sortWithinGroup) {
this.groupSelectorFactory = groupSelectorFactory;
this.sortWithinGroup = sortWithinGroup;
}

@Override
public AllGroupHeadsCollector<T> newCollector() throws IOException {
return AllGroupHeadsCollector.newCollector(groupSelectorFactory.get(), sortWithinGroup, true);
}

@Override
public GroupHeadsResult reduce(Collection<AllGroupHeadsCollector<T>> collectors) {
Map<Object, GroupHeadWithValues> mergedHeads = new HashMap<>();
Comment thread
gaobinlong marked this conversation as resolved.
Outdated
SortField[] sortFields = sortWithinGroup.getSort();

for (AllGroupHeadsCollector<T> collector : collectors) {
mergeCollectorHeads(collector, mergedHeads, sortFields);
}

return new GroupHeadsResult(mergedHeads.values().stream().mapToInt(h -> h.doc).toArray());
}

private void mergeCollectorHeads(
AllGroupHeadsCollector<T> collector,
Map<Object, GroupHeadWithValues> mergedHeads,
SortField[] sortFields) {
Collection<? extends AllGroupHeadsCollector.GroupHead<T>> heads =
collector.getCollectedGroupHeads();
for (AllGroupHeadsCollector.GroupHead<T> head : heads) {
Object[] sortValues = head.getSortValues();
GroupHeadWithValues existing = mergedHeads.get(head.groupValue);
if (existing == null) {
mergedHeads.put(head.groupValue, new GroupHeadWithValues(head.doc, sortValues));
} else if (sortValues != null && existing.sortValues != null) {
int cmp = compareValues(sortValues, existing.sortValues, sortFields);
if (cmp < 0 || (cmp == 0 && head.doc < existing.doc)) {
mergedHeads.put(head.groupValue, new GroupHeadWithValues(head.doc, sortValues));
}
}
}
}

@SuppressWarnings({"unchecked", "rawtypes"})
private int compareValues(Object[] values1, Object[] values2, SortField[] sortFields) {
for (int i = 0; i < sortFields.length; i++) {
FieldComparator comparator = sortFields[i].getComparator(1, Pruning.NONE);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need a fresh new comparator per sort field, per group head? Couldn't we reuse the same instance?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SortingGroupHead have comparator which can be reused but ScoringGroupHead not, at a second thought, I reverted this change back to the original version, that comparing values directly, please help to check.

int cmp = comparator.compareValues(values1[i], values2[i]);
if (cmp != 0) {
return sortFields[i].getReverse() ? -cmp : cmp;
}
}
return 0;
}
}
Loading
Loading