Skip to content

Commit e26a959

Browse files
gaobinlong and dweiss authored
Introduce TopGroupsCollectorManager (#15603)
* Introduce TopGroupsCollectorManager Signed-off-by: Binlong Gao <gbinlong@amazon.com> * Update lucene/grouping/src/java/org/apache/lucene/search/grouping/TopGroupsCollectorManager.java --------- Signed-off-by: Binlong Gao <gbinlong@amazon.com> Co-authored-by: Dawid Weiss <dawid.weiss@gmail.com>
1 parent f2b01e5 commit e26a959

3 files changed

Lines changed: 153 additions & 9 deletions

File tree

lucene/CHANGES.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,8 @@ Improvements
110110

111111
* GITHUB#15453: Avoid unnecessary sorting and instantiations in readMapOfStrings. (Benjamin Lerer)
112112

113+
* GITHUB#15574: Introduce TopGroupsCollectorManager to parallelize search when using TopGroupsCollector. (Binlong Gao)
114+
113115
* GITHUB#15225: Improve package documentation for org.apache.lucene.util. (Syed Mohammad Saad)
114116

115117
Optimizations
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.lucene.search.grouping;
18+
19+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;
import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.Sort;
26+
27+
/** A CollectorManager implementation for TopGroupsCollector. */
28+
public class TopGroupsCollectorManager<T>
29+
implements CollectorManager<TopGroupsCollector<T>, TopGroups<T>> {
30+
31+
private final Supplier<GroupSelector<T>> groupSelectorFactory;
32+
private final Collection<SearchGroup<T>> searchGroups;
33+
private final Sort groupSort;
34+
private final Sort sortWithinGroup;
35+
private final int maxDocsPerGroup;
36+
private final boolean getMaxScores;
37+
private final TopGroups.ScoreMergeMode scoreMergeMode;
38+
private final List<TopGroupsCollector<T>> collectors;
39+
40+
/**
41+
* Creates a new TopGroupsCollectorManager.
42+
*
43+
* @param groupSelectorFactory factory to create group selectors for each collector
44+
* @param searchGroups the search groups from the first pass
45+
* @param groupSort the sort to use for groups
46+
* @param sortWithinGroup the sort to use within each group
47+
* @param maxDocsPerGroup the maximum number of documents per group
48+
* @param getMaxScores whether to compute max scores
49+
*/
50+
public TopGroupsCollectorManager(
51+
Supplier<GroupSelector<T>> groupSelectorFactory,
52+
Collection<SearchGroup<T>> searchGroups,
53+
Sort groupSort,
54+
Sort sortWithinGroup,
55+
int maxDocsPerGroup,
56+
boolean getMaxScores) {
57+
this(
58+
groupSelectorFactory,
59+
searchGroups,
60+
groupSort,
61+
sortWithinGroup,
62+
maxDocsPerGroup,
63+
getMaxScores,
64+
TopGroups.ScoreMergeMode.None);
65+
}
66+
67+
/**
68+
* Creates a new TopGroupsCollectorManager.
69+
*
70+
* @param groupSelectorFactory factory to create group selectors for each collector
71+
* @param searchGroups the search groups from the first pass
72+
* @param groupSort the sort to use for groups
73+
* @param sortWithinGroup the sort to use within each group
74+
* @param maxDocsPerGroup the maximum number of documents per group
75+
* @param getMaxScores whether to compute max scores
76+
* @param scoreMergeMode the mode for merging scores across shards
77+
*/
78+
public TopGroupsCollectorManager(
79+
Supplier<GroupSelector<T>> groupSelectorFactory,
80+
Collection<SearchGroup<T>> searchGroups,
81+
Sort groupSort,
82+
Sort sortWithinGroup,
83+
int maxDocsPerGroup,
84+
boolean getMaxScores,
85+
TopGroups.ScoreMergeMode scoreMergeMode) {
86+
this.groupSelectorFactory = groupSelectorFactory;
87+
this.searchGroups = searchGroups;
88+
this.groupSort = groupSort;
89+
this.sortWithinGroup = sortWithinGroup;
90+
this.maxDocsPerGroup = maxDocsPerGroup;
91+
this.getMaxScores = getMaxScores;
92+
this.scoreMergeMode = scoreMergeMode;
93+
this.collectors = new ArrayList<>();
94+
}
95+
96+
@Override
97+
public TopGroupsCollector<T> newCollector() throws IOException {
98+
TopGroupsCollector<T> collector =
99+
new TopGroupsCollector<>(
100+
groupSelectorFactory.get(),
101+
searchGroups,
102+
groupSort,
103+
sortWithinGroup,
104+
maxDocsPerGroup,
105+
getMaxScores);
106+
collectors.add(collector);
107+
return collector;
108+
}
109+
110+
@Override
111+
public TopGroups<T> reduce(Collection<TopGroupsCollector<T>> collectors) throws IOException {
112+
if (collectors.isEmpty()) {
113+
return null;
114+
}
115+
116+
if (collectors.size() == 1) {
117+
return collectors.iterator().next().getTopGroups(0);
118+
}
119+
120+
// Merge results from multiple collectors
121+
List<TopGroups<T>> shardGroupsList = new ArrayList<>();
122+
for (TopGroupsCollector<T> collector : collectors) {
123+
TopGroups<T> groups = collector.getTopGroups(0);
124+
if (groups != null) {
125+
shardGroupsList.add(groups);
126+
}
127+
}
128+
129+
if (shardGroupsList.isEmpty()) {
130+
return null;
131+
}
132+
133+
@SuppressWarnings({"unchecked", "rawtypes"})
134+
TopGroups<T>[] shardGroups = (TopGroups<T>[]) shardGroupsList.toArray(TopGroups[]::new);
135+
return TopGroups.merge(
136+
shardGroups, groupSort, sortWithinGroup, 0, maxDocsPerGroup, scoreMergeMode);
137+
}
138+
139+
public List<TopGroupsCollector<T>> getCollectors() {
140+
return collectors;
141+
}
142+
}

lucene/grouping/src/test/org/apache/lucene/search/grouping/BaseGroupSelectorTestCase.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -324,21 +324,21 @@ public void testShardedGrouping() throws IOException {
324324
Collection<SearchGroup<T>> mergedGroups = SearchGroup.merge(shardGroups, 0, 5, sort);
325325
assertEquals(singletonGroups, mergedGroups);
326326

327-
TopGroupsCollector<T> singletonSecondPass =
328-
new TopGroupsCollector<>(
329-
getGroupSelector(), singletonGroups, sort, Sort.RELEVANCE, 5, true);
330-
control.getIndexSearcher().search(topLevel, singletonSecondPass);
331-
TopGroups<T> singletonTopGroups = singletonSecondPass.getTopGroups(0);
327+
TopGroupsCollectorManager<T> topGroupsCollectorManager =
328+
new TopGroupsCollectorManager<>(
329+
this::getGroupSelector, singletonGroups, sort, Sort.RELEVANCE, 5, true);
330+
TopGroups<T> singletonTopGroups =
331+
control.getIndexSearcher().search(topLevel, topGroupsCollectorManager);
332332

333333
// TODO why does SearchGroup.merge() take a list but TopGroups.merge() take an array?
334334
@SuppressWarnings("unchecked")
335335
TopGroups<T>[] shardTopGroups = (TopGroups<T>[]) new TopGroups<?>[shards.length];
336336
int j = 0;
337337
for (Shard shard : shards) {
338-
TopGroupsCollector<T> sc =
339-
new TopGroupsCollector<>(getGroupSelector(), mergedGroups, sort, Sort.RELEVANCE, 5, true);
340-
shard.getIndexSearcher().search(topLevel, sc);
341-
shardTopGroups[j] = sc.getTopGroups(0);
338+
TopGroupsCollectorManager<T> scm =
339+
new TopGroupsCollectorManager<>(
340+
this::getGroupSelector, mergedGroups, sort, Sort.RELEVANCE, 5, true);
341+
shardTopGroups[j] = shard.getIndexSearcher().search(topLevel, scm);
342342
j++;
343343
}
344344
TopGroups<T> mergedTopGroups =

0 commit comments

Comments
 (0)