Skip to content

Commit 05c35ff

Browse files
committed
perf: vectorize topN native engine
1 parent 8f3df74 commit 05c35ff

18 files changed

Lines changed: 2281 additions & 2 deletions

benchmarks/src/test/java/org/apache/druid/benchmark/query/TopNBenchmark.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import com.fasterxml.jackson.core.JsonProcessingException;
2323
import com.fasterxml.jackson.databind.ObjectMapper;
24+
import com.google.common.collect.ImmutableMap;
2425
import org.apache.druid.collections.StupidPool;
2526
import org.apache.druid.jackson.DefaultObjectMapper;
2627
import org.apache.druid.java.util.common.FileUtils;
@@ -113,6 +114,9 @@ public class TopNBenchmark
113114
@Param({"all", "hour"})
114115
private String queryGranularity;
115116

117+
@Param({"false", "force"})
118+
private String vectorize;
119+
116120
private static final Logger log = new Logger(TopNBenchmark.class);
117121
private static final int RNG_SEED = 9999;
118122
private static final IndexMergerV9 INDEX_MERGER_V9;
@@ -161,6 +165,7 @@ private void setupQueries()
161165
.dimension("dimSequential")
162166
.metric("sumFloatNormal")
163167
.intervals(intervalSpec)
168+
.context(ImmutableMap.of("vectorize", vectorize))
164169
.aggregators(queryAggs);
165170

166171
basicQueries.put("A", queryBuilderA);
@@ -177,6 +182,7 @@ private void setupQueries()
177182
.dimension("dimUniform")
178183
.metric(new DimensionTopNMetricSpec(null, StringComparators.NUMERIC))
179184
.intervals(intervalSpec)
185+
.context(ImmutableMap.of("vectorize", vectorize))
180186
.aggregators(queryAggs);
181187

182188
basicQueries.put("numericSort", queryBuilderA);
@@ -193,6 +199,7 @@ private void setupQueries()
193199
.dimension("dimUniform")
194200
.metric(new DimensionTopNMetricSpec(null, StringComparators.ALPHANUMERIC))
195201
.intervals(intervalSpec)
202+
.context(ImmutableMap.of("vectorize", vectorize))
196203
.aggregators(queryAggs);
197204

198205
basicQueries.put("alphanumericSort", queryBuilderA);
Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.druid.query.groupby.epinephelinae;
21+
22+
import it.unimi.dsi.fastutil.Hash;
23+
import it.unimi.dsi.fastutil.objects.Object2IntMap;
24+
import it.unimi.dsi.fastutil.objects.Object2IntOpenCustomHashMap;
25+
import org.apache.datasketches.memory.Memory;
26+
import org.apache.druid.java.util.common.ISE;
27+
import org.apache.druid.java.util.common.parsers.CloseableIterator;
28+
import org.apache.druid.query.aggregation.AggregatorAdapters;
29+
import org.apache.druid.query.groupby.epinephelinae.collection.MemoryPointer;
30+
31+
import java.nio.ByteBuffer;
32+
import java.nio.ByteOrder;
33+
import java.util.Arrays;
34+
import java.util.Iterator;
35+
36+
/**
 * On-heap {@link VectorGrouper} that grows aggregator state on demand, up to maximum limit of 2GB.
 *
 * Vectorized analogue of {@link org.apache.druid.query.topn.BaseTopNAlgorithm}'s
 * {@code runWithCardinalityUnknown} path: used when dimension cardinality is unknown (numeric columns,
 * non-dict-encoded string virtual columns) or when a dict-encoded string column's cardinality exceeds
 * the processing buffer. Memory footprint is on-heap and grows with the distinct-key count — matching
 * the non-vectorized path's memory profile for the same queries.
 *
 * Keys are fixed-width byte arrays of length {@code keySize}; each distinct key is mapped to a fixed
 * byte offset into {@code aggStateBuffer} where its aggregator state lives. Offsets never move once
 * assigned, even across buffer growth (the buffer is copied, not compacted), so values cached in
 * {@link #keyToOffset} stay valid for the grouper's lifetime.
 *
 * Not thread-safe: scratch arrays ({@code keyScratch}, {@code vAggregationPositions},
 * {@code vAggregationRows}) are reused across calls.
 */
public class HeapVectorGrouper implements VectorGrouper
{
  // Content-based hashing/equality for byte[] map keys; byte[]'s default identity
  // semantics would make every probe miss.
  private static final Hash.Strategy<byte[]> BYTE_ARRAY_HASH_STRATEGY = new Hash.Strategy<byte[]>()
  {
    @Override
    public int hashCode(byte[] o)
    {
      return Arrays.hashCode(o);
    }

    @Override
    public boolean equals(byte[] a, byte[] b)
    {
      return Arrays.equals(a, b);
    }
  };

  // Initial on-heap state buffer size in bytes; grown by doubling in growBuffer() as
  // distinct keys accumulate.
  private static final int MIN_INITIAL_STATE_BUFFER_SIZE = 4096;

  private final AggregatorAdapters aggregators;
  // Fixed width, in bytes, of every grouping key.
  private final int keySize;
  // Bytes of aggregator state per distinct key (sum over all aggregators).
  private final int aggStateSize;
  // Maps each distinct key (defensive copy of the scratch buffer) to the byte offset of
  // its aggregator state in aggStateBuffer. defaultReturnValue(-1) marks "absent".
  private final Object2IntOpenCustomHashMap<byte[]> keyToOffset;

  private boolean initialized;
  // On-heap aggregator state; only the [0, aggStateEnd) prefix is live.
  private ByteBuffer aggStateBuffer;
  // Offset one past the last initialized state slot; next new key is placed here.
  private int aggStateEnd;

  // Per-batch scratch, sized to maxVectorSize in initVectorized(); reused every call.
  private int[] vAggregationPositions;
  private int[] vAggregationRows;
  private byte[] keyScratch;

  /**
   * @param aggregators aggregators to compute; their total {@code spaceNeeded()} fixes the
   *                    per-key state size
   * @param keySize     fixed width, in bytes, of each grouping key
   */
  public HeapVectorGrouper(final AggregatorAdapters aggregators, final int keySize)
  {
    this.aggregators = aggregators;
    this.keySize = keySize;
    this.aggStateSize = aggregators.spaceNeeded();
    this.keyToOffset = new Object2IntOpenCustomHashMap<>(BYTE_ARRAY_HASH_STRATEGY);
    this.keyToOffset.defaultReturnValue(-1);
  }

  /**
   * Allocates the state buffer and per-batch scratch arrays. Idempotent for a repeated call with
   * the same {@code maxVectorSize}; throws {@link ISE} if called again with a different size, since
   * the scratch arrays are already fixed to the first size.
   */
  @Override
  public void initVectorized(final int maxVectorSize)
  {
    if (initialized) {
      if (vAggregationPositions.length != maxVectorSize) {
        throw new ISE(
            "initVectorized called with different maxVectorSize (existing=%d, new=%d)",
            vAggregationPositions.length,
            maxVectorSize
        );
      }
      return;
    }
    this.aggStateBuffer = ByteBuffer.allocate(MIN_INITIAL_STATE_BUFFER_SIZE);
    this.vAggregationPositions = new int[maxVectorSize];
    this.vAggregationRows = new int[maxVectorSize];
    this.keyScratch = new byte[keySize];
    this.aggStateEnd = 0;
    this.initialized = true;
  }

  /**
   * Contract: keys for rows [startRow, endRow) must be packed contiguously at {@code keySpace[0 ..
   * numRows * keySize)}; {@code startRow}/{@code endRow} are source-vector indices used to look up aggregator
   * input values.
   *
   * Always returns {@link AggregateResult#ok()}: unlike buffer-backed groupers there is no spill
   * signal, because state grows on-heap (until growBuffer()'s 2 GB ceiling, which throws).
   */
  @Override
  public AggregateResult aggregateVector(final Memory keySpace, final int startRow, final int endRow)
  {
    final int numRows = endRow - startRow;

    for (int i = 0; i < numRows; i++) {
      // Note the index asymmetry per the contract above: keys are read at 0-based slot i,
      // while aggregation rows below use the source indices startRow..endRow.
      keySpace.getByteArray((long) i * keySize, keyScratch, 0, keySize);
      int offset = keyToOffset.getInt(keyScratch);
      if (offset == -1) {
        // First time this key is seen: carve out and initialize a new state slot.
        if ((long) aggStateEnd + aggStateSize > aggStateBuffer.capacity()) {
          growBuffer((long) aggStateEnd + aggStateSize);
        }
        offset = aggStateEnd;
        // Copy the scratch key before storing it: keyScratch is overwritten on the next
        // iteration, and the map key must stay stable.
        final byte[] keyCopy = Arrays.copyOf(keyScratch, keySize);
        keyToOffset.put(keyCopy, offset);
        aggregators.init(aggStateBuffer, offset);
        aggStateEnd += aggStateSize;
      }
      vAggregationPositions[i] = offset;
    }

    // Single vectorized pass: row i of the batch aggregates into state offset
    // vAggregationPositions[i], reading inputs at source row startRow + i.
    aggregators.aggregateVector(
        aggStateBuffer,
        numRows,
        vAggregationPositions,
        Groupers.writeAggregationRows(vAggregationRows, startRow, endRow)
    );

    return AggregateResult.ok();
  }

  /**
   * Doubles the state buffer until it can hold {@code neededCapacity} bytes, copies live state
   * across, and notifies aggregators of the move. Offsets of existing slots are unchanged.
   *
   * @throws ISE if the required capacity exceeds {@link Integer#MAX_VALUE} (ByteBuffer's limit)
   */
  private void growBuffer(final long neededCapacity)
  {
    if (neededCapacity > Integer.MAX_VALUE) {
      throw new ISE("Aggregator state exceeds 2 GB; cardinality too high for HeapVectorGrouper");
    }
    // Double until large enough, clamping at Integer.MAX_VALUE; the long intermediate
    // avoids int overflow while doubling near the ceiling.
    int newCapacity = aggStateBuffer.capacity();
    while (newCapacity < neededCapacity) {
      final long doubled = (long) newCapacity * 2;
      newCapacity = doubled > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) doubled;
    }

    // Bulk-copy only the live prefix [0, aggStateEnd) via the relative put; positions/limits
    // are irrelevant afterwards because all other access to these buffers is absolute.
    final ByteBuffer oldBuffer = aggStateBuffer;
    final ByteBuffer newBuffer = ByteBuffer.allocate(newCapacity);
    oldBuffer.position(0);
    oldBuffer.limit(aggStateEnd);
    newBuffer.put(oldBuffer);

    // Tell each aggregator its state moved, slot by slot (same offset in the new buffer).
    // NOTE(review): assumes AggregatorAdapters.relocate tolerates the bytes having already
    // been bulk-copied above and only fixes up external references keyed on (buffer, position)
    // — confirm against AggregatorAdapters' relocate contract.
    for (int pos = 0; pos < aggStateEnd; pos += aggStateSize) {
      aggregators.relocate(pos, pos, oldBuffer, newBuffer);
    }

    this.aggStateBuffer = newBuffer;
  }

  /**
   * Discards all grouped state so the grouper can be reused: clears the key map and rewinds the
   * state cursor. The (possibly grown) buffer and scratch arrays are kept for reuse.
   */
  @Override
  public void reset()
  {
    aggregators.reset();
    keyToOffset.clear();
    aggStateEnd = 0;
  }

  @Override
  public void close()
  {
    // NOTE(review): delegates to reset() only — confirm AggregatorAdapters requires no
    // explicit close here; the on-heap buffer itself needs no release.
    reset();
  }

  /**
   * Iterates all (key, aggregated values) pairs in the key map's (arbitrary) iteration order.
   *
   * The returned entry is REUSED across next() calls — callers must consume or copy each entry
   * before advancing. Not safe to use concurrently with further aggregateVector()/reset() calls.
   */
  @Override
  public CloseableIterator<Grouper.Entry<MemoryPointer>> iterator()
  {
    final Iterator<Object2IntMap.Entry<byte[]>> mapIter =
        keyToOffset.object2IntEntrySet().fastIterator();

    return new CloseableIterator<>()
    {
      // Single entry object reused for every next() call to avoid per-row allocation.
      final ReusableEntry<MemoryPointer> reusableEntry =
          new ReusableEntry<>(new MemoryPointer(), new Object[aggregators.size()]);

      @Override
      public boolean hasNext()
      {
        return mapIter.hasNext();
      }

      @Override
      public Grouper.Entry<MemoryPointer> next()
      {
        final Object2IntMap.Entry<byte[]> mapEntry = mapIter.next();
        // Re-point the reusable MemoryPointer at this key's bytes (zero-copy wrap).
        reusableEntry.getKey().set(Memory.wrap(mapEntry.getKey(), ByteOrder.nativeOrder()), 0);

        // Materialize each aggregator's current value from this key's state slot.
        final int position = mapEntry.getIntValue();
        for (int i = 0; i < aggregators.size(); i++) {
          reusableEntry.getValues()[i] = aggregators.get(aggStateBuffer, position, i);
        }
        return reusableEntry;
      }

      @Override
      public void close()
      {
        // Nothing to close.
      }
    };
  }
}

processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.druid.java.util.common.granularity.Granularities;
2727
import org.apache.druid.java.util.common.guava.Sequence;
2828
import org.apache.druid.java.util.common.guava.Sequences;
29+
import org.apache.druid.java.util.common.io.Closer;
2930
import org.apache.druid.query.ColumnSelectorPlus;
3031
import org.apache.druid.query.CursorGranularizer;
3132
import org.apache.druid.query.QueryContexts;
@@ -35,6 +36,7 @@
3536
import org.apache.druid.query.extraction.ExtractionFn;
3637
import org.apache.druid.query.topn.types.TopNColumnAggregatesProcessor;
3738
import org.apache.druid.query.topn.types.TopNColumnAggregatesProcessorFactory;
39+
import org.apache.druid.query.topn.vector.VectorTopNEngine;
3840
import org.apache.druid.segment.ColumnSelectorFactory;
3941
import org.apache.druid.segment.Cursor;
4042
import org.apache.druid.segment.CursorBuildSpec;
@@ -96,13 +98,34 @@ public Sequence<Result<TopNResultValue>> query(
9698
if (cursorHolder.isPreAggregated()) {
9799
query = query.withAggregatorSpecs(Preconditions.checkNotNull(cursorHolder.getAggregatorsForPreAggregated()));
98100
}
101+
102+
final TimeBoundaryInspector timeBoundaryInspector = segment.as(TimeBoundaryInspector.class);
103+
104+
final boolean canVectorize = cursorHolder.canVectorize()
105+
&& VectorTopNEngine.canVectorize(query, cursorFactory);
106+
final boolean shouldVectorize = query.context().getVectorize().shouldVectorize(canVectorize);
107+
108+
if (shouldVectorize) {
109+
final ResourceHolder<ByteBuffer> bufHolder = bufferPool.take();
110+
try {
111+
final Closer resourceCloser = Closer.create();
112+
resourceCloser.register(bufHolder);
113+
resourceCloser.register(cursorHolder);
114+
return Sequences.filter(
115+
VectorTopNEngine.process(query, timeBoundaryInspector, cursorHolder, bufHolder.get()),
116+
Predicates.notNull()
117+
).withBaggage(resourceCloser);
118+
}
119+
catch (Throwable t) {
120+
throw CloseableUtils.closeAndWrapInCatch(t, bufHolder);
121+
}
122+
}
123+
99124
final Cursor cursor = cursorHolder.asCursor();
100125
if (cursor == null) {
101126
return Sequences.withBaggage(Sequences.empty(), cursorHolder);
102127
}
103128

104-
final TimeBoundaryInspector timeBoundaryInspector = segment.as(TimeBoundaryInspector.class);
105-
106129
final ColumnSelectorFactory factory = cursor.getColumnSelectorFactory();
107130

108131
final ColumnSelectorPlus<TopNColumnAggregatesProcessor<?>> selectorPlus =

0 commit comments

Comments
 (0)