Skip to content

Commit ba956d6

Browse files
committed
Encode sketches
1 parent d5ba75f commit ba956d6

2 files changed

Lines changed: 52 additions & 66 deletions

File tree

dogstatsd-http-core/src/main/java/com/datadoghq/dogstatsd/http/serializer/SketchMetric.java

Lines changed: 31 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@
77

88
package com.datadoghq.dogstatsd.http.serializer;
99

10+
import com.datadoghq.dogstatsd.Sketch;
11+
import java.nio.BufferOverflowException;
12+
1013
/** Builder for sketch timeseries. */
1114
public class SketchMetric extends Metric<SketchMetric> {
1215

@@ -20,44 +23,44 @@ protected SketchMetric self() {
2023
}
2124

2225
/**
23-
* Add a new timeseries point.
26+
* Add a new timeseries point sourced from a {@link Sketch}.
2427
*
2528
* @param timestamp Timestamp of the point in seconds since Unix epoch.
26-
* @param sum Total sum of all observed values.
27-
* @param min Minimum observed value.
28-
* @param max Maximum observed value.
29-
* @param cnt Number of observed values.
30-
* @param binKeys Array of keys for each bin in the sketch.
31-
* @param binCnts Array of number of observations for each bin.
29+
* @param sketch Sketch supplying the summary statistics and bin distribution.
3230
* @return This.
3331
*/
34-
public SketchMetric addPoint(
35-
long timestamp,
36-
double sum,
37-
double min,
38-
double max,
39-
long cnt,
40-
int[] binKeys,
41-
int[] binCnts) {
32+
public SketchMetric addPoint(long timestamp, Sketch sketch) {
33+
final long maxBinCount = (1L << 32) - 1;
34+
final long maxBinBytes = ProtoUtil.varintLen(maxBinCount);
4235

43-
if (binKeys.length != binCnts.length) {
44-
throw new IllegalArgumentException("binKeys and binCnts must have the same length");
36+
// Skip doing the work if just the bin data would exceed payload size limit.
37+
if (sketch.count() / maxBinCount * maxBinBytes >= pb.maxPayloadSize) {
38+
throw new BufferOverflowException();
4539
}
4640

4741
pb.timestamps.put(timestamp);
48-
pb.values.put(sum);
49-
pb.values.put(min);
50-
pb.values.put(max);
51-
pb.counts.put(cnt);
42+
pb.values.put(sketch.sum());
43+
pb.values.put(sketch.min());
44+
pb.values.put(sketch.max());
45+
pb.counts.put(sketch.count());
5246

53-
ColumnarBuffer r = pb.currentRecord();
54-
DeltaEncoder dk = new DeltaEncoder();
47+
final ColumnarBuffer r = pb.currentRecord();
48+
final DeltaEncoder dk = new DeltaEncoder();
5549

56-
r.putUint64(Column.sketchNumBins, binKeys.length);
57-
for (int i = 0; i < binKeys.length; i++) {
58-
r.putSint64(Column.sketchBinKeys, dk.encode(binKeys[i]));
59-
r.putUint64(Column.sketchBinCnts, binCnts[i]);
60-
}
50+
r.putUint64(Column.sketchNumBins, sketch.size());
51+
sketch.bins(
52+
new Sketch.BinConsumer() {
53+
@Override
54+
public void consumeBin(short key, long count) {
55+
while (count > maxBinCount) {
56+
r.putSint64(Column.sketchBinKeys, dk.encode(key));
57+
r.putUint64(Column.sketchBinCnts, maxBinCount);
58+
count -= maxBinCount;
59+
}
60+
r.putSint64(Column.sketchBinKeys, dk.encode(key));
61+
r.putUint64(Column.sketchBinCnts, count);
62+
}
63+
});
6164

6265
return this;
6366
}

dogstatsd-http-core/src/test/java/com/datadoghq/dogstatsd/http/serializer/PayloadBuilderTest.java

Lines changed: 21 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import static org.junit.Assert.assertEquals;
1111
import static org.junit.Assert.assertTrue;
1212

13+
import com.datadoghq.dogstatsd.Sketch;
1314
import java.util.ArrayList;
1415
import java.util.Arrays;
1516
import org.junit.Test;
@@ -42,10 +43,15 @@ public void handle(byte[] p) {
4243

4344
b.gauge("defgh").addPoint(100, 0).close();
4445

46+
Sketch sketch1 = new Sketch();
47+
sketch1.build(new long[] {1, 2, 2}, 1.0);
48+
Sketch sketch2 = new Sketch();
49+
sketch2.build(new long[] {2, 2, 3, 3, 3}, 1.0);
50+
4551
b.sketch("ijk")
4652
.setTags(Arrays.asList(new String[] {"foo", "baz"}))
47-
.addPoint(100, 4.75, 1.25, 1.75, 3, new int[] {1351, 1373}, new int[] {1, 2})
48-
.addPoint(110, 6.5, 2.25, 2.75, 5, new int[] {1389, 1402}, new int[] {2, 3})
53+
.addPoint(100, sketch1)
54+
.addPoint(110, sketch2)
4955
.close();
5056

5157
b.rate("lm").setInterval(10).addPoint(100, 3.14).close();
@@ -60,7 +66,7 @@ public void handle(byte[] p) {
6066
new int[] {
6167
// MetricData
6268
(3 << 3) | 2,
63-
188,
69+
167,
6470
1,
6571
// dictNameStr
6672
(1 << 3) | 2,
@@ -138,7 +144,7 @@ public void handle(byte[] p) {
138144
4,
139145
0x11,
140146
0x03,
141-
0x24,
147+
0x14,
142148
0x32,
143149
// names
144150
(11 << 3) | 2,
@@ -189,40 +195,17 @@ public void handle(byte[] p) {
189195
// valsSint64
190196
(17 << 3) | 2,
191197
1,
198+
10,
199+
2,
192200
4,
201+
10,
193202
2,
194203
4,
195204
6,
205+
26,
206+
4,
207+
6,
196208
10,
197-
// valsFloat32,
198-
// list(pack('<ffffff', 4.75, 1.25, 1.75, 6.5, 2.25, 2.75))
199-
(18 << 3) | 2,
200-
1,
201-
24,
202-
0,
203-
0,
204-
152,
205-
64,
206-
0,
207-
0,
208-
160,
209-
63,
210-
0,
211-
0,
212-
224,
213-
63,
214-
0,
215-
0,
216-
208,
217-
64,
218-
0,
219-
0,
220-
16,
221-
64,
222-
0,
223-
0,
224-
48,
225-
64,
226209
// valsFloat64, list(pack('<d', 3.14))
227210
(19 << 3) | 2,
228211
1,
@@ -245,12 +228,12 @@ public void handle(byte[] p) {
245228
(21 << 3) | 2,
246229
1,
247230
6,
248-
142,
249-
21,
250-
44,
251-
218,
231+
244,
232+
20,
233+
90,
234+
206,
252235
21,
253-
26,
236+
52,
254237
// sketchBinCnts
255238
(22 << 3) | 2,
256239
1,

0 commit comments

Comments
 (0)