Commit de983e4

GWphua and abhishekrb19 authored
[Groupby Query Metrics] Add merge buffer tracking (#18731)
* Add byte buffer tracking for underlying hash tables
* Byte buffer tracking for underlying offset handlers
* Fix tests
* Fix quidem tests
* Documentation
* bytesUsed naming
* Add max metrics
* Add missing calculation in BufferHashGrouper
* Checkstyle
* Checkstyle
* GroupByStatsProvider javadocs
* Fix GroupByStatsProviderTest comments
* Fix doc order for GroupByStatsProvider metrics
* Fix test for GroupByStatsMonitorTest
* Update server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java
  Co-authored-by: Abhishek Radhakrishnan <abhishek.rb19@gmail.com>
* Revert stylistic changes in BufferHashGrouper
* Rename mergeBufferUsage to mergeBufferUsedBytes
* Order of maxAcquisitionTimeNs
* Track the open addressing hash table
* Remove max metrics, push them in another PR...
* Remove max metrics in GroupByStatsProviderTest
* LimitedBufferHashGrouper to use parent method to report maxTableBufferUsedBytes
* Standardised merge buffer names
* Tests for buffer hash grouper
  Tests for buffer hash grouper
* Address multiplication cast
* Javadocs for getMergeBufferUsedBytes
* Remix comments in test for peak calculations
* Clean up after merging conflicts
* Standardize maxMergeBufferUsedBytes
* Test duplicate buffer adds
* Test
* Add javadocs for update

---------

Co-authored-by: Abhishek Radhakrishnan <abhishek.rb19@gmail.com>
1 parent 9bb5fed commit de983e4

15 files changed

Lines changed: 303 additions & 49 deletions

docs/operations/metrics.md

Lines changed: 6 additions & 0 deletions
@@ -90,6 +90,8 @@ Most metric values reset each emission period, as specified in `druid.monitoring
 |`mergeBuffer/queries`|Number of groupBy queries that acquired a batch of buffers from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.|
 |`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge buffer for groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
 |`mergeBuffer/maxAcquisitionTimeNs`|Maximum time in nanoseconds to acquire merge buffer for any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
+|`mergeBuffer/bytesUsed`|Number of bytes used by merge buffers to process groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
+|`mergeBuffer/maxBytesUsed`|Maximum number of bytes used by merge buffers for any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
 |`groupBy/spilledQueries`|Number of groupBy queries that have spilled onto the disk.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
 |`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
 |`groupBy/maxSpilledBytes`|Maximum number of bytes spilled to disk by any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
@@ -117,6 +119,8 @@ Most metric values reset each emission period, as specified in `druid.monitoring
 |`mergeBuffer/queries`|Number of groupBy queries that acquired a batch of buffers from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.|
 |`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge buffer for groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
 |`mergeBuffer/maxAcquisitionTimeNs`|Maximum time in nanoseconds to acquire merge buffer for any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
+|`mergeBuffer/bytesUsed`|Number of bytes used by merge buffers to process groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
+|`mergeBuffer/maxBytesUsed`|Maximum number of bytes used by merge buffers for any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
 |`groupBy/spilledQueries`|Number of groupBy queries that have spilled onto the disk.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
 |`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
 |`groupBy/maxSpilledBytes`|Maximum number of bytes spilled to disk by any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
@@ -147,6 +151,8 @@ to represent the task ID are deprecated and will be removed in a future release.
 |`mergeBuffer/queries`|Number of groupBy queries that acquired a batch of buffers from the merge buffer pool. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Depends on the number of groupBy queries needing merge buffers.|
 |`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge buffer for groupBy queries. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
 |`mergeBuffer/maxAcquisitionTimeNs`|Maximum time in nanoseconds to acquire merge buffer for any single groupBy query within the emission period. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
+|`mergeBuffer/bytesUsed`|Number of bytes used by merge buffers to process groupBy queries. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
+|`mergeBuffer/maxBytesUsed`|Maximum number of bytes used by merge buffers for any single groupBy query within the emission period. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
 |`groupBy/spilledQueries`|Number of groupBy queries that have spilled onto the disk. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
 |`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy queries. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
 |`groupBy/maxSpilledBytes`|Maximum number of bytes spilled to disk by any single groupBy query within the emission period. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|

processing/src/main/java/org/apache/druid/query/groupby/GroupByStatsProvider.java

Lines changed: 53 additions & 17 deletions
@@ -27,7 +27,8 @@
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
- * Metrics collector for groupBy queries like spilled bytes, merge buffer acquistion time, dictionary size.
+ * Collects groupBy query metrics (spilled bytes, merge buffer usage, dictionary size) per-query, then
+ * aggregates them when queries complete. Stats are retrieved and reset periodically via {@link #getStatsSince()}.
  */
 @LazySingleton
 public class GroupByStatsProvider
@@ -60,14 +61,18 @@ public synchronized void closeQuery(QueryResourceId resourceId)
 
   public synchronized AggregateStats getStatsSince()
   {
-    return aggregateStatsContainer.reset();
+    AggregateStats aggregateStats = new AggregateStats(aggregateStatsContainer);
+    aggregateStatsContainer.reset();
+    return aggregateStats;
   }
 
   public static class AggregateStats
   {
     private long mergeBufferQueries = 0;
     private long mergeBufferAcquisitionTimeNs = 0;
     private long maxMergeBufferAcquisitionTimeNs = 0;
+    private long totalMergeBufferUsedBytes = 0;
+    private long maxMergeBufferUsedBytes = 0;
     private long spilledQueries = 0;
     private long spilledBytes = 0;
     private long maxSpilledBytes = 0;
@@ -78,10 +83,28 @@ public AggregateStats()
     {
     }
 
+    public AggregateStats(AggregateStats aggregateStats)
+    {
+      this(
+          aggregateStats.mergeBufferQueries,
+          aggregateStats.mergeBufferAcquisitionTimeNs,
+          aggregateStats.maxMergeBufferAcquisitionTimeNs,
+          aggregateStats.totalMergeBufferUsedBytes,
+          aggregateStats.maxMergeBufferUsedBytes,
+          aggregateStats.spilledQueries,
+          aggregateStats.spilledBytes,
+          aggregateStats.maxSpilledBytes,
+          aggregateStats.mergeDictionarySize,
+          aggregateStats.maxMergeDictionarySize
+      );
+    }
+
     public AggregateStats(
         long mergeBufferQueries,
         long mergeBufferAcquisitionTimeNs,
         long maxMergeBufferAcquisitionTimeNs,
+        long totalMergeBufferUsedBytes,
+        long maxMergeBufferUsedBytes,
         long spilledQueries,
         long spilledBytes,
         long maxSpilledBytes,
@@ -92,6 +115,8 @@ public AggregateStats(
       this.mergeBufferQueries = mergeBufferQueries;
       this.mergeBufferAcquisitionTimeNs = mergeBufferAcquisitionTimeNs;
       this.maxMergeBufferAcquisitionTimeNs = maxMergeBufferAcquisitionTimeNs;
+      this.totalMergeBufferUsedBytes = totalMergeBufferUsedBytes;
+      this.maxMergeBufferUsedBytes = maxMergeBufferUsedBytes;
       this.spilledQueries = spilledQueries;
       this.spilledBytes = spilledBytes;
       this.maxSpilledBytes = maxSpilledBytes;
@@ -114,6 +139,16 @@ public long getMaxMergeBufferAcquisitionTimeNs()
       return maxMergeBufferAcquisitionTimeNs;
     }
 
+    public long getTotalMergeBufferUsedBytes()
+    {
+      return totalMergeBufferUsedBytes;
+    }
+
+    public long getMaxMergeBufferUsedBytes()
+    {
+      return maxMergeBufferUsedBytes;
+    }
+
     public long getSpilledQueries()
     {
       return spilledQueries;
@@ -148,6 +183,8 @@ public void addQueryStats(PerQueryStats perQueryStats)
             maxMergeBufferAcquisitionTimeNs,
             perQueryStats.getMergeBufferAcquisitionTimeNs()
         );
+        totalMergeBufferUsedBytes += perQueryStats.getMaxMergeBufferUsedBytes();
+        maxMergeBufferUsedBytes = Math.max(maxMergeBufferUsedBytes, perQueryStats.getMaxMergeBufferUsedBytes());
       }
 
       if (perQueryStats.getSpilledBytes() > 0) {
@@ -160,36 +197,25 @@ public void addQueryStats(PerQueryStats perQueryStats)
       maxMergeDictionarySize = Math.max(maxMergeDictionarySize, perQueryStats.getMergeDictionarySize());
     }
 
-    public AggregateStats reset()
+    public void reset()
     {
-      AggregateStats aggregateStats =
-          new AggregateStats(
-              mergeBufferQueries,
-              mergeBufferAcquisitionTimeNs,
-              maxMergeBufferAcquisitionTimeNs,
-              spilledQueries,
-              spilledBytes,
-              maxSpilledBytes,
-              mergeDictionarySize,
-              maxMergeDictionarySize
-          );
-
       this.mergeBufferQueries = 0;
       this.mergeBufferAcquisitionTimeNs = 0;
       this.maxMergeBufferAcquisitionTimeNs = 0;
+      this.totalMergeBufferUsedBytes = 0;
+      this.maxMergeBufferUsedBytes = 0;
       this.spilledQueries = 0;
       this.spilledBytes = 0;
       this.maxSpilledBytes = 0;
       this.mergeDictionarySize = 0;
       this.maxMergeDictionarySize = 0;
-
-      return aggregateStats;
     }
   }
 
   public static class PerQueryStats
   {
     private final AtomicLong mergeBufferAcquisitionTimeNs = new AtomicLong(0);
+    private final AtomicLong maxMergeBufferUsedBytes = new AtomicLong(0);
     private final AtomicLong spilledBytes = new AtomicLong(0);
     private final AtomicLong mergeDictionarySize = new AtomicLong(0);
 
@@ -198,6 +224,11 @@ public void mergeBufferAcquisitionTime(long delay)
       mergeBufferAcquisitionTimeNs.addAndGet(delay);
     }
 
+    public void maxMergeBufferUsedBytes(long bytes)
+    {
+      maxMergeBufferUsedBytes.addAndGet(bytes);
+    }
+
     public void spilledBytes(long bytes)
     {
       spilledBytes.addAndGet(bytes);
@@ -213,6 +244,11 @@ public long getMergeBufferAcquisitionTimeNs()
       return mergeBufferAcquisitionTimeNs.get();
     }
 
+    public long getMaxMergeBufferUsedBytes()
+    {
+      return maxMergeBufferUsedBytes.get();
+    }
+
     public long getSpilledBytes()
     {
       return spilledBytes.get();
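The aggregation and snapshot-then-reset flow in this file can be sketched in isolation. The following is a minimal illustration, not Druid's class: `MergeBufferStats`, `addQuery`, and `snapshotAndReset` are hypothetical names, and only three of the fields are modeled. It assumes each finished query reports a single peak byte count, which is added to a running total and compared against a running maximum, and that a copy is taken before the counters are zeroed for the next emission period.

```java
// Hypothetical, reduced model of the aggregate counters; not Druid's GroupByStatsProvider.
final class MergeBufferStats
{
  private long queries = 0;
  private long totalUsedBytes = 0;
  private long maxUsedBytes = 0;

  MergeBufferStats()
  {
  }

  // Copy constructor, used to take a snapshot before the live counters are zeroed.
  MergeBufferStats(MergeBufferStats other)
  {
    this.queries = other.queries;
    this.totalUsedBytes = other.totalUsedBytes;
    this.maxUsedBytes = other.maxUsedBytes;
  }

  // Fold one finished query's peak usage into the running total and the running maximum.
  synchronized void addQuery(long peakUsedBytes)
  {
    queries++;
    totalUsedBytes += peakUsedBytes;
    maxUsedBytes = Math.max(maxUsedBytes, peakUsedBytes);
  }

  // Return what was accumulated since the previous call, then start a fresh period.
  synchronized MergeBufferStats snapshotAndReset()
  {
    MergeBufferStats snapshot = new MergeBufferStats(this);
    queries = 0;
    totalUsedBytes = 0;
    maxUsedBytes = 0;
    return snapshot;
  }

  long getQueries()
  {
    return queries;
  }

  long getTotalUsedBytes()
  {
    return totalUsedBytes;
  }

  long getMaxUsedBytes()
  {
    return maxUsedBytes;
  }
}

final class MergeBufferStatsDemo
{
  public static void main(String[] args)
  {
    MergeBufferStats stats = new MergeBufferStats();
    stats.addQuery(100);
    stats.addQuery(300);
    stats.addQuery(50);

    MergeBufferStats period = stats.snapshotAndReset();
    System.out.println(period.getTotalUsedBytes()); // 450
    System.out.println(period.getMaxUsedBytes());   // 300
    System.out.println(stats.getQueries());         // 0, the live counters were reset
  }
}
```

Keeping `reset()` free of a return value and copying separately, as the diff does, means the copy logic lives in one constructor rather than being repeated wherever a snapshot is needed.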

processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java

Lines changed: 12 additions & 0 deletions
@@ -173,6 +173,18 @@ public void close()
     aggregators.reset();
   }
 
+  /**
+   * Retrieves the size of the merge buffers used for this groupby query. This value is retrieved when
+   * {@link SpillingGrouper#close()} is called.
+   * <p>
+   * This method returns the highest memory value used, which is especially helpful for reporting the
+   * highest number of bytes used throughout the entire query lifecycle.
+   */
+  public long getMaxMergeBufferUsedBytes()
+  {
+    return hashTable.getMaxMergeBufferUsedBytes();
+  }
+
   /**
    * Populate a {@link ReusableEntry} with values from a particular bucket.
    */

processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouper.java

Lines changed: 12 additions & 2 deletions
@@ -26,7 +26,6 @@
 import org.apache.druid.query.aggregation.AggregatorAdapters;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 
-import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
 import java.util.AbstractList;
 import java.util.Collections;
@@ -50,7 +49,6 @@ public class BufferHashGrouper<KeyType> extends AbstractBufferHashGrouper<KeyTyp
   // to get a comparator that uses the ordering defined by the OrderByColumnSpec of a query.
   private final boolean useDefaultSorting;
 
-  @Nullable
   private ByteBufferIntList offsetList;
 
   public BufferHashGrouper(
@@ -154,6 +152,18 @@ public void reset()
     aggregators.reset();
   }
 
+  @Override
+  public long getMaxMergeBufferUsedBytes()
+  {
+    if (!initialized) {
+      return 0L;
+    }
+
+    long hashTableUsage = hashTable.getMaxMergeBufferUsedBytes();
+    long offSetListUsage = offsetList.getMaxMergeBufferUsedBytes();
+    return hashTableUsage + offSetListUsage;
+  }
+
   @Override
   public CloseableIterator<Entry<KeyType>> iterator(boolean sorted)
   {
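One general property of adding two independently tracked peaks: the sum is an upper bound on the combined peak, and equals it only when both components peak at the same moment. Whether the two ever diverge in `BufferHashGrouper` depends on how the hash table and offset list grow relative to each other, which this diff does not show. The sketch below uses hypothetical names (`PeakCounter`, `SumOfPeaksDemo`) and an artificial sequence purely to illustrate the arithmetic.

```java
// Hypothetical helper: tracks current and peak bytes for one buffer-backed structure.
final class PeakCounter
{
  private long current = 0;
  private long peak = 0;

  void grow(long bytes)
  {
    current += bytes;
    peak = Math.max(peak, current);
  }

  // Clearing the structure drops current usage but keeps the recorded peak.
  void clear()
  {
    current = 0;
  }

  long getCurrent()
  {
    return current;
  }

  long getPeak()
  {
    return peak;
  }
}

final class SumOfPeaksDemo
{
  // Same shape as the override in the diff: report 0 until initialized, else add the component peaks.
  static long reportedPeak(boolean initialized, PeakCounter table, PeakCounter offsets)
  {
    if (!initialized) {
      return 0L;
    }
    return table.getPeak() + offsets.getPeak();
  }

  public static void main(String[] args)
  {
    PeakCounter table = new PeakCounter();
    PeakCounter offsets = new PeakCounter();
    long simultaneousPeak = 0;

    table.grow(100);
    simultaneousPeak = Math.max(simultaneousPeak, table.getCurrent() + offsets.getCurrent());
    table.clear();
    offsets.grow(40);
    simultaneousPeak = Math.max(simultaneousPeak, table.getCurrent() + offsets.getCurrent());

    System.out.println(reportedPeak(true, table, offsets)); // 140
    System.out.println(simultaneousPeak);                   // 100
  }
}
```

When both structures grow in lockstep, the two numbers coincide and the simpler sum-of-peaks form loses nothing.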

processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferHashTable.java

Lines changed: 28 additions & 0 deletions
@@ -26,6 +26,14 @@
 
 import java.nio.ByteBuffer;
 
+/**
+ * A fixed-width, open-addressing hash table that lives inside a caller-provided byte buffer.
+ * <p>
+ * The table uses a contiguous slice of the input {@link ByteBuffer} as its backing store. Each bucket holds
+ * at most one entry, and occupies {@code bucketSizeWithHash} number of bytes. Collisions are resolved by continuously
+ * probing the next bucket to find an empty bucket to slot the new entry. The current table view {@code tableBuffer}
+ * is maintained as a {@link ByteBuffer} slice that moves and grows within the arena as the table expands.
+ */
 public class ByteBufferHashTable
 {
   public static int calculateTableArenaSizeWithPerBucketAdditionalSize(
@@ -79,6 +87,9 @@ public static int calculateTableArenaSizeWithFixedAdditionalSize(
   @Nullable
   protected BucketUpdateHandler bucketUpdateHandler;
 
+  // Tracks maximum bytes used for the entire lifecycle of this hash table.
+  protected long maxMergeBufferUsedBytes;
+
   public ByteBufferHashTable(
       float maxLoadFactor,
       int initialBuckets,
@@ -97,6 +108,7 @@ public ByteBufferHashTable(
     this.maxSizeForTesting = maxSizeForTesting;
     this.tableArenaSize = buffer.capacity();
     this.bucketUpdateHandler = bucketUpdateHandler;
+    this.maxMergeBufferUsedBytes = 0;
   }
 
   public void reset()
@@ -139,6 +151,7 @@ public void reset()
     bufferDup.position(tableStart);
     bufferDup.limit(tableStart + maxBuckets * bucketSizeWithHash);
     tableBuffer = bufferDup.slice();
+    updateMaxMergeBufferUsedBytes();
 
     // Clear used bits of new table
     for (int i = 0; i < maxBuckets; i++) {
@@ -245,6 +258,7 @@ protected void initializeNewBucketKey(
     tableBuffer.putInt(Groupers.getUsedFlag(keyHash));
     tableBuffer.put(keyBuffer);
     size++;
+    updateMaxMergeBufferUsedBytes();
 
     if (bucketUpdateHandler != null) {
       bucketUpdateHandler.handleNewBucket(offset);
@@ -427,6 +441,20 @@ public int getGrowthCount()
     return growthCount;
   }
 
+  /**
+   * To maintain an accurate tracking of the maximum bytes used per query, this function is to be called immediately
+   * whenever either of {@link #size} or {@link #bucketSizeWithHash} is changed.
+   */
+  protected void updateMaxMergeBufferUsedBytes()
+  {
+    maxMergeBufferUsedBytes = Math.max(maxMergeBufferUsedBytes, (long) size * bucketSizeWithHash);
+  }
+
+  public long getMaxMergeBufferUsedBytes()
+  {
+    return maxMergeBufferUsedBytes;
+  }
+
   public interface BucketUpdateHandler
   {
     void handleNewBucket(int bucketOffset);
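The javadoc added to this class describes fixed-width buckets, linear probing, and a peak-bytes counter updated whenever the entry count changes. A much smaller table with the same three ideas might look like the sketch below. Everything in it is an assumption made for illustration: the class name `TinyBufferHashTable`, the bucket layout (4-byte used flag, 4-byte int key, 8-byte long counter), and the absence of resizing. It is not Druid's layout or API.

```java
import java.nio.ByteBuffer;

// Hypothetical, simplified table: int keys, long counters, no resizing. Not Druid's ByteBufferHashTable.
final class TinyBufferHashTable
{
  private static final int USED_FLAG = 0x80000000;
  // Fixed bucket width: used flag, key, value.
  private static final int BUCKET_SIZE = Integer.BYTES + Integer.BYTES + Long.BYTES;

  private final ByteBuffer table;
  private final int numBuckets;
  private int size = 0;
  private long maxUsedBytes = 0;

  TinyBufferHashTable(ByteBuffer buffer, int numBuckets)
  {
    if (numBuckets <= 0 || buffer.capacity() < (long) numBuckets * BUCKET_SIZE) {
      throw new IllegalArgumentException("buffer too small for " + numBuckets + " buckets");
    }
    this.table = buffer.duplicate();
    this.numBuckets = numBuckets;
    for (int i = 0; i < numBuckets; i++) {
      table.putInt(i * BUCKET_SIZE, 0); // clear the used flag of every bucket
    }
  }

  // Adds delta to the counter stored for key. Returns false if the table is full and key is absent.
  boolean increment(int key, long delta)
  {
    final int start = Math.floorMod(Integer.hashCode(key), numBuckets);
    for (int probe = 0; probe < numBuckets; probe++) {
      final int offset = ((start + probe) % numBuckets) * BUCKET_SIZE;
      if ((table.getInt(offset) & USED_FLAG) == 0) {
        // Empty bucket: claim it for this key.
        table.putInt(offset, USED_FLAG);
        table.putInt(offset + Integer.BYTES, key);
        table.putLong(offset + 2 * Integer.BYTES, delta);
        size++;
        // Cast before multiplying so the product is computed as a long.
        maxUsedBytes = Math.max(maxUsedBytes, (long) size * BUCKET_SIZE);
        return true;
      }
      if (table.getInt(offset + Integer.BYTES) == key) {
        final int valueOffset = offset + 2 * Integer.BYTES;
        table.putLong(valueOffset, table.getLong(valueOffset) + delta);
        return true;
      }
      // Occupied by a different key: fall through and probe the next bucket.
    }
    return false;
  }

  // Returns the counter for key, or 0 if the key was never inserted.
  long get(int key)
  {
    final int start = Math.floorMod(Integer.hashCode(key), numBuckets);
    for (int probe = 0; probe < numBuckets; probe++) {
      final int offset = ((start + probe) % numBuckets) * BUCKET_SIZE;
      if ((table.getInt(offset) & USED_FLAG) == 0) {
        return 0;
      }
      if (table.getInt(offset + Integer.BYTES) == key) {
        return table.getLong(offset + 2 * Integer.BYTES);
      }
    }
    return 0;
  }

  int getSize()
  {
    return size;
  }

  long getMaxUsedBytes()
  {
    return maxUsedBytes;
  }
}
```

With four buckets, keys 1 and 5 hash to the same slot, so the second insert lands in the next bucket; after both inserts the tracked peak is two buckets' worth of bytes (32 in this layout).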

processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferIntList.java

Lines changed: 9 additions & 0 deletions
@@ -30,6 +30,8 @@ public class ByteBufferIntList
   private final int maxElements;
   private int numElements;
 
+  private int maxMergeBufferUsedBytes;
+
   public ByteBufferIntList(
       ByteBuffer buffer,
       int maxElements
@@ -38,6 +40,7 @@ public ByteBufferIntList(
     this.buffer = buffer;
     this.maxElements = maxElements;
     this.numElements = 0;
+    this.maxMergeBufferUsedBytes = 0;
 
     if (buffer.capacity() < (maxElements * Integer.BYTES)) {
       throw new IAE(
@@ -55,6 +58,7 @@ public void add(int val)
     }
     buffer.putInt(numElements * Integer.BYTES, val);
     numElements++;
+    maxMergeBufferUsedBytes = Math.max(maxMergeBufferUsedBytes, numElements * Integer.BYTES);
   }
 
   public void set(int index, int val)
@@ -71,4 +75,9 @@ public void reset()
   {
     numElements = 0;
   }
+
+  public int getMaxMergeBufferUsedBytes()
+  {
+    return maxMergeBufferUsedBytes;
+  }
 }

processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferMinMaxOffsetHeap.java

Lines changed: 10 additions & 3 deletions
@@ -44,6 +44,7 @@ public class ByteBufferMinMaxOffsetHeap
 
   private int heapSize;
   private int maxHeapSize;
+  private int maxMergeBufferUsedBytes;
 
   public ByteBufferMinMaxOffsetHeap(
       ByteBuffer buf,
@@ -55,6 +56,7 @@ public ByteBufferMinMaxOffsetHeap(
     this.buf = buf;
     this.limit = limit;
     this.heapSize = 0;
+    this.maxMergeBufferUsedBytes = 0;
     this.minComparator = minComparator;
     this.maxComparator = Ordering.from(minComparator).reverse();
     this.heapIndexUpdater = heapIndexUpdater;
@@ -71,9 +73,9 @@ public int addOffset(int offset)
     int pos = heapSize;
     buf.putInt(pos * Integer.BYTES, offset);
     heapSize++;
-    if (heapSize > maxHeapSize) {
-      maxHeapSize = heapSize;
-    }
+
+    maxHeapSize = Math.max(maxHeapSize, heapSize);
+    maxMergeBufferUsedBytes = Math.max(maxMergeBufferUsedBytes, maxHeapSize * Integer.BYTES);
 
     if (heapIndexUpdater != null) {
       heapIndexUpdater.updateHeapIndexForOffset(offset, pos);
@@ -226,6 +228,11 @@ public int getHeapSize()
     return heapSize;
   }
 
+  public int getMaxMergeBufferUsedBytes()
+  {
+    return maxMergeBufferUsedBytes;
+  }
+
   private void bubbleUp(int pos)
   {
     if (isEvenLevel(pos)) {
