Skip to content

Commit 49c7b37

Browse files
committed
perf: vectorize topN native engine
1 parent 8f3df74 commit 49c7b37

13 files changed

Lines changed: 1528 additions & 2 deletions
Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.druid.query.groupby.epinephelinae;
21+
22+
import it.unimi.dsi.fastutil.Hash;
23+
import it.unimi.dsi.fastutil.objects.Object2IntMap;
24+
import it.unimi.dsi.fastutil.objects.Object2IntOpenCustomHashMap;
25+
import org.apache.datasketches.memory.Memory;
26+
import org.apache.druid.java.util.common.parsers.CloseableIterator;
27+
import org.apache.druid.query.aggregation.AggregatorAdapters;
28+
import org.apache.druid.query.groupby.epinephelinae.collection.MemoryPointer;
29+
30+
import java.nio.ByteBuffer;
31+
import java.nio.ByteOrder;
32+
import java.util.Arrays;
33+
import java.util.Iterator;
34+
35+
/**
36+
* On-heap {@link VectorGrouper} backed by a Java hash map and a growable on-heap {@link ByteBuffer} for
37+
* aggregator state. Unlike {@link HashVectorGrouper}, this grouper never fails with "hash table full" — it
38+
* grows its state buffer on demand — which makes it suitable for callers that cannot tolerate partial
39+
* aggregation (e.g. topN, where truncating groups mid-aggregation would produce wrong results).
40+
*
41+
* Vectorized reads and writes (selector vector reads, batched aggregator writes) are preserved; only the
42+
* hash table itself is heap-backed.
43+
*/
44+
public class HeapVectorGrouper implements VectorGrouper
45+
{
46+
private static final Hash.Strategy<byte[]> BYTE_ARRAY_HASH_STRATEGY = new Hash.Strategy<byte[]>()
47+
{
48+
@Override
49+
public int hashCode(byte[] o)
50+
{
51+
return Arrays.hashCode(o);
52+
}
53+
54+
@Override
55+
public boolean equals(byte[] a, byte[] b)
56+
{
57+
return Arrays.equals(a, b);
58+
}
59+
};
60+
61+
private static final int MIN_INITIAL_STATE_BUFFER_SIZE = 4096;
62+
63+
private final AggregatorAdapters aggregators;
64+
private final int keySize;
65+
private final int aggStateSize;
66+
private final Object2IntOpenCustomHashMap<byte[]> keyToOffset;
67+
68+
private boolean initialized;
69+
private ByteBuffer aggStateBuffer;
70+
private int aggStateEnd;
71+
72+
private int[] vAggregationPositions;
73+
private int[] vAggregationRows;
74+
private byte[] keyScratch;
75+
76+
public HeapVectorGrouper(final AggregatorAdapters aggregators, final int keySize)
77+
{
78+
this.aggregators = aggregators;
79+
this.keySize = keySize;
80+
this.aggStateSize = aggregators.spaceNeeded();
81+
this.keyToOffset = new Object2IntOpenCustomHashMap<>(BYTE_ARRAY_HASH_STRATEGY);
82+
this.keyToOffset.defaultReturnValue(-1);
83+
}
84+
85+
@Override
86+
public void initVectorized(final int maxVectorSize)
87+
{
88+
if (initialized) {
89+
return;
90+
}
91+
this.aggStateBuffer = ByteBuffer.allocate(MIN_INITIAL_STATE_BUFFER_SIZE);
92+
this.vAggregationPositions = new int[maxVectorSize];
93+
this.vAggregationRows = new int[maxVectorSize];
94+
this.keyScratch = new byte[keySize];
95+
this.aggStateEnd = 0;
96+
this.initialized = true;
97+
}
98+
99+
@Override
100+
public AggregateResult aggregateVector(final Memory keySpace, final int startRow, final int endRow)
101+
{
102+
final int numRows = endRow - startRow;
103+
104+
for (int i = 0; i < numRows; i++) {
105+
keySpace.getByteArray((long) i * keySize, keyScratch, 0, keySize);
106+
int offset = keyToOffset.getInt(keyScratch);
107+
if (offset == -1) {
108+
// Grow only when a new group actually needs state; avoids worst-case pre-allocation.
109+
if ((long) aggStateEnd + aggStateSize > aggStateBuffer.capacity()) {
110+
growBuffer((long) aggStateEnd + aggStateSize);
111+
}
112+
offset = aggStateEnd;
113+
final byte[] keyCopy = Arrays.copyOf(keyScratch, keySize);
114+
keyToOffset.put(keyCopy, offset);
115+
aggregators.init(aggStateBuffer, offset);
116+
aggStateEnd += aggStateSize;
117+
}
118+
vAggregationPositions[i] = offset;
119+
}
120+
121+
aggregators.aggregateVector(
122+
aggStateBuffer,
123+
numRows,
124+
vAggregationPositions,
125+
Groupers.writeAggregationRows(vAggregationRows, startRow, endRow)
126+
);
127+
128+
return AggregateResult.ok();
129+
}
130+
131+
private void growBuffer(final long neededCapacity)
132+
{
133+
int newCapacity = aggStateBuffer.capacity();
134+
while (newCapacity < neededCapacity) {
135+
final long doubled = (long) newCapacity * 2;
136+
if (doubled > Integer.MAX_VALUE) {
137+
newCapacity = Integer.MAX_VALUE;
138+
break;
139+
}
140+
newCapacity = (int) doubled;
141+
}
142+
143+
final ByteBuffer oldBuffer = aggStateBuffer;
144+
final ByteBuffer newBuffer = ByteBuffer.allocate(newCapacity);
145+
146+
// Copy existing aggregator state bytes. Positions remain valid in the new buffer since we copy to
147+
// the same offsets.
148+
oldBuffer.position(0);
149+
oldBuffer.limit(aggStateEnd);
150+
newBuffer.put(oldBuffer);
151+
152+
// Inform aggregators that their state bytes moved to a different buffer. Positions are unchanged.
153+
for (int pos = 0; pos < aggStateEnd; pos += aggStateSize) {
154+
aggregators.relocate(pos, pos, oldBuffer, newBuffer);
155+
}
156+
157+
this.aggStateBuffer = newBuffer;
158+
}
159+
160+
@Override
161+
public void reset()
162+
{
163+
aggregators.reset();
164+
keyToOffset.clear();
165+
aggStateEnd = 0;
166+
}
167+
168+
@Override
169+
public void close()
170+
{
171+
aggregators.reset();
172+
keyToOffset.clear();
173+
aggStateBuffer = null;
174+
vAggregationPositions = null;
175+
vAggregationRows = null;
176+
keyScratch = null;
177+
}
178+
179+
@Override
180+
public CloseableIterator<Grouper.Entry<MemoryPointer>> iterator()
181+
{
182+
final Iterator<Object2IntMap.Entry<byte[]>> mapIter =
183+
keyToOffset.object2IntEntrySet().fastIterator();
184+
185+
return new CloseableIterator<>()
186+
{
187+
final MemoryPointer reusableKey = new MemoryPointer();
188+
final ReusableEntry<MemoryPointer> reusableEntry =
189+
new ReusableEntry<>(reusableKey, new Object[aggregators.size()]);
190+
final byte[] keyBytesHolder = new byte[keySize];
191+
final Memory keyMemory = Memory.wrap(keyBytesHolder, 0, keySize, ByteOrder.nativeOrder());
192+
193+
@Override
194+
public boolean hasNext()
195+
{
196+
return mapIter.hasNext();
197+
}
198+
199+
@Override
200+
public Grouper.Entry<MemoryPointer> next()
201+
{
202+
final Object2IntMap.Entry<byte[]> mapEntry = mapIter.next();
203+
System.arraycopy(mapEntry.getKey(), 0, keyBytesHolder, 0, keySize);
204+
reusableKey.set(keyMemory, 0);
205+
206+
final int position = mapEntry.getIntValue();
207+
for (int i = 0; i < aggregators.size(); i++) {
208+
reusableEntry.getValues()[i] = aggregators.get(aggStateBuffer, position, i);
209+
}
210+
return reusableEntry;
211+
}
212+
213+
@Override
214+
public void close()
215+
{
216+
// Nothing to close.
217+
}
218+
};
219+
}
220+
}

processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.druid.java.util.common.granularity.Granularities;
2727
import org.apache.druid.java.util.common.guava.Sequence;
2828
import org.apache.druid.java.util.common.guava.Sequences;
29+
import org.apache.druid.java.util.common.io.Closer;
2930
import org.apache.druid.query.ColumnSelectorPlus;
3031
import org.apache.druid.query.CursorGranularizer;
3132
import org.apache.druid.query.QueryContexts;
@@ -35,6 +36,7 @@
3536
import org.apache.druid.query.extraction.ExtractionFn;
3637
import org.apache.druid.query.topn.types.TopNColumnAggregatesProcessor;
3738
import org.apache.druid.query.topn.types.TopNColumnAggregatesProcessorFactory;
39+
import org.apache.druid.query.topn.vector.VectorTopNEngine;
3840
import org.apache.druid.segment.ColumnSelectorFactory;
3941
import org.apache.druid.segment.Cursor;
4042
import org.apache.druid.segment.CursorBuildSpec;
@@ -96,13 +98,34 @@ public Sequence<Result<TopNResultValue>> query(
9698
if (cursorHolder.isPreAggregated()) {
9799
query = query.withAggregatorSpecs(Preconditions.checkNotNull(cursorHolder.getAggregatorsForPreAggregated()));
98100
}
101+
102+
final TimeBoundaryInspector timeBoundaryInspector = segment.as(TimeBoundaryInspector.class);
103+
104+
final boolean canVectorize = cursorHolder.canVectorize()
105+
&& VectorTopNEngine.canVectorize(query, cursorFactory);
106+
final boolean shouldVectorize = query.context().getVectorize().shouldVectorize(canVectorize);
107+
108+
if (shouldVectorize) {
109+
final ResourceHolder<ByteBuffer> bufHolder = bufferPool.take();
110+
try {
111+
final Closer resourceCloser = Closer.create();
112+
resourceCloser.register(bufHolder);
113+
resourceCloser.register(cursorHolder);
114+
return Sequences.filter(
115+
VectorTopNEngine.process(query, timeBoundaryInspector, cursorHolder, bufHolder.get()),
116+
Predicates.notNull()
117+
).withBaggage(resourceCloser);
118+
}
119+
catch (Throwable t) {
120+
throw CloseableUtils.closeAndWrapInCatch(t, bufHolder);
121+
}
122+
}
123+
99124
final Cursor cursor = cursorHolder.asCursor();
100125
if (cursor == null) {
101126
return Sequences.withBaggage(Sequences.empty(), cursorHolder);
102127
}
103128

104-
final TimeBoundaryInspector timeBoundaryInspector = segment.as(TimeBoundaryInspector.class);
105-
106129
final ColumnSelectorFactory factory = cursor.getColumnSelectorFactory();
107130

108131
final ColumnSelectorPlus<TopNColumnAggregatesProcessor<?>> selectorPlus =
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.druid.query.topn.vector;
21+
22+
import it.unimi.dsi.fastutil.objects.Object2IntMap;
23+
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
24+
import org.apache.datasketches.memory.WritableMemory;
25+
import org.apache.druid.query.groupby.epinephelinae.DictionaryBuildingUtils;
26+
import org.apache.druid.query.groupby.epinephelinae.collection.MemoryPointer;
27+
import org.apache.druid.segment.DimensionHandlerUtils;
28+
import org.apache.druid.segment.vector.VectorObjectSelector;
29+
30+
import javax.annotation.Nullable;
31+
import java.util.ArrayList;
32+
import java.util.List;
33+
34+
/**
35+
* {@link TopNVectorColumnSelector} for single-valued STRING columns that are not natively dictionary-encoded,
36+
* such as expression virtual columns. Builds a local int dictionary on-the-fly and encodes keys as 4-byte
37+
* dictionary IDs, matching the key format of {@link SingleValueStringTopNVectorColumnSelector}.
38+
*/
39+
public class DictionaryBuildingSingleValueStringTopNVectorColumnSelector implements TopNVectorColumnSelector
40+
{
41+
private final VectorObjectSelector selector;
42+
private final List<String> dictionary = new ArrayList<>();
43+
private final Object2IntMap<String> reverseDictionary = new Object2IntOpenHashMap<>();
44+
45+
DictionaryBuildingSingleValueStringTopNVectorColumnSelector(final VectorObjectSelector selector)
46+
{
47+
this.selector = selector;
48+
reverseDictionary.defaultReturnValue(-1);
49+
}
50+
51+
@Override
52+
public int getGroupingKeySize()
53+
{
54+
return Integer.BYTES;
55+
}
56+
57+
@Override
58+
public int writeKeys(
59+
final WritableMemory keySpace,
60+
final int keySize,
61+
final int keyOffset,
62+
final int startRow,
63+
final int endRow
64+
)
65+
{
66+
final Object[] vector = selector.getObjectVector();
67+
int stateFootprintIncrease = 0;
68+
69+
for (int i = startRow, j = keyOffset; i < endRow; i++, j += keySize) {
70+
final String value = DimensionHandlerUtils.convertObjectToString(vector[i]);
71+
int dictId = reverseDictionary.getInt(value);
72+
if (dictId < 0) {
73+
dictId = dictionary.size();
74+
dictionary.add(value);
75+
reverseDictionary.put(value, dictId);
76+
stateFootprintIncrease +=
77+
DictionaryBuildingUtils.estimateEntryFootprint(value == null ? 0 : value.length() * Character.BYTES);
78+
}
79+
keySpace.putInt(j, dictId);
80+
}
81+
82+
return stateFootprintIncrease;
83+
}
84+
85+
@Override
86+
@Nullable
87+
public Object getDimensionValue(final MemoryPointer keyMemory, final int keyOffset)
88+
{
89+
return dictionary.get(keyMemory.memory().getInt(keyMemory.position() + keyOffset));
90+
}
91+
92+
@Override
93+
public void reset()
94+
{
95+
dictionary.clear();
96+
reverseDictionary.clear();
97+
}
98+
}

0 commit comments

Comments
 (0)