Skip to content

Commit 064bcaa

Browse files
committed
Fix bin count when bins are being split
1 parent 0643be4 commit 064bcaa

2 files changed

Lines changed: 30 additions & 20 deletions

File tree

dogstatsd-http-core/src/main/java/com/datadoghq/dogstatsd/http/serializer/SketchMetric.java

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,31 @@
1212

1313
/** Builder for sketch timeseries. */
1414
public class SketchMetric extends Metric<SketchMetric> {
15+
static final long maxBinCount = (1L << 32) - 1;
16+
static final long maxBinBytes = ProtoUtil.varintLen(maxBinCount);
17+
18+
static class BinConsumer implements Sketch.BinConsumer {
19+
int numBins;
20+
ColumnarBuffer r;
21+
DeltaEncoder dk = new DeltaEncoder();
22+
23+
BinConsumer(ColumnarBuffer record) {
24+
r = record;
25+
}
26+
27+
@Override
28+
public void consumeBin(short key, long count) {
29+
while (count > maxBinCount) {
30+
r.putSint64(Column.sketchBinKeys, dk.encode(key));
31+
r.putUint64(Column.sketchBinCnts, maxBinCount);
32+
count -= maxBinCount;
33+
numBins++;
34+
}
35+
r.putSint64(Column.sketchBinKeys, dk.encode(key));
36+
r.putUint64(Column.sketchBinCnts, count);
37+
numBins++;
38+
}
39+
}
1540

1641
SketchMetric(PayloadBuilder pb, int type, String name) {
1742
super(pb, type, name);
@@ -30,9 +55,6 @@ protected SketchMetric self() {
3055
* @return This.
3156
*/
3257
public SketchMetric addPoint(long timestamp, Sketch sketch) {
33-
final long maxBinCount = (1L << 32) - 1;
34-
final long maxBinBytes = ProtoUtil.varintLen(maxBinCount);
35-
3658
// Skip doing the work if just the bin data would exceed payload size limit.
3759
if (sketch.count() / maxBinCount * maxBinBytes >= pb.maxPayloadSize) {
3860
throw new BufferOverflowException();
@@ -45,22 +67,10 @@ public SketchMetric addPoint(long timestamp, Sketch sketch) {
4567
pb.counts.put(sketch.count());
4668

4769
final ColumnarBuffer r = pb.currentRecord();
48-
final DeltaEncoder dk = new DeltaEncoder();
49-
50-
r.putUint64(Column.sketchNumBins, sketch.size());
51-
sketch.bins(
52-
new Sketch.BinConsumer() {
53-
@Override
54-
public void consumeBin(short key, long count) {
55-
while (count > maxBinCount) {
56-
r.putSint64(Column.sketchBinKeys, dk.encode(key));
57-
r.putUint64(Column.sketchBinCnts, maxBinCount);
58-
count -= maxBinCount;
59-
}
60-
r.putSint64(Column.sketchBinKeys, dk.encode(key));
61-
r.putUint64(Column.sketchBinCnts, count);
62-
}
63-
});
70+
final BinConsumer bc = new BinConsumer(r);
71+
72+
sketch.bins(bc);
73+
r.putUint64(Column.sketchNumBins, bc.numBins);
6474

6575
return this;
6676
}

dogstatsd-http-core/src/test/java/com/datadoghq/dogstatsd/http/serializer/PayloadBuilderTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,7 @@ public void handle(byte[] p) {
284284
1,
285285
2,
286286
2,
287-
2,
287+
3,
288288
// sketchBinKeys
289289
(21 << 3) | 2,
290290
1,

0 commit comments

Comments
 (0)