Skip to content

Commit 5466050

Browse files
authored
Merge pull request #310 from DataDog/vickenty/ske
AGTMETRICS-489 Support Sketch class in PayloadBuilder
2 parents 7ac5c1a + 88cc0a9 commit 5466050

2 files changed

Lines changed: 152 additions & 77 deletions

File tree

dogstatsd-http-core/src/main/java/com/datadoghq/dogstatsd/http/serializer/SketchMetric.java

Lines changed: 42 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,36 @@
77

88
package com.datadoghq.dogstatsd.http.serializer;
99

10+
import com.datadoghq.dogstatsd.Sketch;
11+
import java.nio.BufferOverflowException;
12+
1013
/** Builder for sketch timeseries. */
1114
public class SketchMetric extends Metric<SketchMetric> {
15+
static final long maxBinCount = (1L << 32) - 1;
16+
static final long maxBinBytes = ProtoUtil.varintLen(maxBinCount);
17+
18+
static class BinConsumer implements Sketch.BinConsumer {
19+
int numBins;
20+
ColumnarBuffer r;
21+
DeltaEncoder dk = new DeltaEncoder();
22+
23+
BinConsumer(ColumnarBuffer record) {
24+
r = record;
25+
}
26+
27+
@Override
28+
public void consumeBin(short key, long count) {
29+
while (count > maxBinCount) {
30+
r.putSint64(Column.sketchBinKeys, dk.encode(key));
31+
r.putUint64(Column.sketchBinCnts, maxBinCount);
32+
count -= maxBinCount;
33+
numBins++;
34+
}
35+
r.putSint64(Column.sketchBinKeys, dk.encode(key));
36+
r.putUint64(Column.sketchBinCnts, count);
37+
numBins++;
38+
}
39+
}
1240

1341
SketchMetric(PayloadBuilder pb, int type, String name) {
1442
super(pb, type, name);
@@ -20,44 +48,29 @@ protected SketchMetric self() {
2048
}
2149

2250
/**
23-
* Add a new timeseries point.
51+
* Add a new timeseries point sourced from a {@link Sketch}.
2452
*
2553
* @param timestamp Timestamp of the point in seconds since Unix epoch.
26-
* @param sum Total sum of all observed values.
27-
* @param min Minimum observed value.
28-
* @param max Maximum observed value.
29-
* @param cnt Number of observed values.
30-
* @param binKeys Array of keys for each bin in the sketch.
31-
* @param binCnts Array of number of observations for each bin.
54+
* @param sketch Sketch supplying the summary statistics and bin distribution.
3255
* @return This.
3356
*/
34-
public SketchMetric addPoint(
35-
long timestamp,
36-
double sum,
37-
double min,
38-
double max,
39-
long cnt,
40-
int[] binKeys,
41-
int[] binCnts) {
42-
43-
if (binKeys.length != binCnts.length) {
44-
throw new IllegalArgumentException("binKeys and binCnts must have the same length");
57+
public SketchMetric addPoint(long timestamp, Sketch sketch) {
58+
// Skip doing the work if just the bin data would exceed payload size limit.
59+
if (sketch.count() / maxBinCount * maxBinBytes >= pb.maxPayloadSize) {
60+
throw new BufferOverflowException();
4561
}
4662

4763
pb.timestamps.put(timestamp);
48-
pb.values.put(sum);
49-
pb.values.put(min);
50-
pb.values.put(max);
51-
pb.counts.put(cnt);
64+
pb.values.put(sketch.sum());
65+
pb.values.put(sketch.min());
66+
pb.values.put(sketch.max());
67+
pb.counts.put(sketch.count());
5268

53-
ColumnarBuffer r = pb.currentRecord();
54-
DeltaEncoder dk = new DeltaEncoder();
69+
final ColumnarBuffer r = pb.currentRecord();
70+
final BinConsumer bc = new BinConsumer(r);
5571

56-
r.putUint64(Column.sketchNumBins, binKeys.length);
57-
for (int i = 0; i < binKeys.length; i++) {
58-
r.putSint64(Column.sketchBinKeys, dk.encode(binKeys[i]));
59-
r.putUint64(Column.sketchBinCnts, binCnts[i]);
60-
}
72+
sketch.bins(bc);
73+
r.putUint64(Column.sketchNumBins, bc.numBins);
6174

6275
return this;
6376
}

dogstatsd-http-core/src/test/java/com/datadoghq/dogstatsd/http/serializer/PayloadBuilderTest.java

Lines changed: 110 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import static org.junit.Assert.assertEquals;
1111
import static org.junit.Assert.assertTrue;
1212

13+
import com.datadoghq.dogstatsd.Sketch;
1314
import java.util.ArrayList;
1415
import java.util.Arrays;
1516
import org.junit.Test;
@@ -42,13 +43,20 @@ public void handle(byte[] p) {
4243

4344
b.gauge("defgh").addPoint(100, 0).close();
4445

46+
Sketch sketch1 = new Sketch();
47+
sketch1.build(new long[] {1, 2, 2}, 1.0);
48+
Sketch sketch2 = new Sketch();
49+
sketch2.build(new long[] {2, 2, 3, 3, 3}, 5e-10);
50+
4551
b.sketch("ijk")
4652
.setTags(Arrays.asList(new String[] {"foo", "baz"}))
47-
.addPoint(100, 4.75, 1.25, 1.75, 3, new int[] {1351, 1373}, new int[] {1, 2})
48-
.addPoint(110, 6.5, 2.25, 2.75, 5, new int[] {1389, 1402}, new int[] {2, 3})
53+
.addPoint(100, sketch1)
54+
.addPoint(110, sketch2)
4955
.close();
5056

5157
b.rate("lm").setInterval(10).addPoint(100, 3.14).close();
58+
b.rate("no").addPoint(100, 1).addPoint(110, 1.5).close();
59+
b.rate("pq").addPoint(100, 1L << 25).addPoint(110, 1.5).close();
5260

5361
b.close();
5462

@@ -60,11 +68,11 @@ public void handle(byte[] p) {
6068
new int[] {
6169
// MetricData
6270
(3 << 3) | 2,
63-
188,
71+
243,
6472
1,
6573
// dictNameStr
6674
(1 << 3) | 2,
67-
17,
75+
23,
6876
3,
6977
97,
7078
98,
@@ -82,6 +90,12 @@ public void handle(byte[] p) {
8290
2,
8391
108,
8492
109, // lm
93+
2,
94+
110,
95+
111, // no
96+
2,
97+
112,
98+
113, // pq
8599
// dictTagsStr
86100
(2 << 3) | 2,
87101
12,
@@ -135,98 +149,112 @@ public void handle(byte[] p) {
135149
0,
136150
// types
137151
(10 << 3) | 2,
138-
4,
152+
6,
139153
0x11,
140154
0x03,
141-
0x24,
155+
0x14,
156+
0x32,
157+
0x22,
142158
0x32,
143159
// names
144160
(11 << 3) | 2,
145-
4,
161+
6,
162+
2,
163+
2,
146164
2,
147165
2,
148166
2,
149167
2,
150168
// tags
151169
(12 << 3) | 2,
152-
4,
170+
6,
153171
2,
154172
1,
155173
4,
156174
3,
175+
0,
176+
0,
157177
// resources
158178
(13 << 3) | 2,
159-
4,
179+
6,
160180
2,
161181
1,
162182
0,
163183
0,
184+
0,
185+
0,
164186
// intervals
165187
(14 << 3) | 2,
166-
4,
188+
6,
167189
0,
168190
0,
169191
0,
170192
10,
193+
0,
194+
0,
171195
// numPoints
172196
(15 << 3) | 2,
173-
4,
197+
6,
174198
2,
175199
1,
176200
2,
177201
1,
202+
2,
203+
2,
178204
// timestamps
179205
(16 << 3) | 2,
180206
1,
181-
7,
207+
11,
182208
200,
183209
1,
184210
20,
185211
19,
186212
0,
187213
20,
188214
19,
215+
0,
216+
20,
217+
19,
218+
20,
189219
// valsSint64
190220
(17 << 3) | 2,
191221
1,
222+
19,
223+
2,
192224
4,
225+
10,
193226
2,
194227
4,
195228
6,
196-
10,
197-
// valsFloat32,
198-
// list(pack('<ffffff', 4.75, 1.25, 1.75, 6.5, 2.25, 2.75))
229+
128,
230+
144,
231+
196,
232+
219,
233+
193,
234+
1,
235+
4,
236+
6,
237+
246,
238+
143,
239+
223,
240+
192,
241+
74,
242+
// valsFloat32, list(pack('<ff', 1, 1.5))
199243
(18 << 3) | 2,
200244
1,
201-
24,
202-
0,
203-
0,
204-
152,
205-
64,
245+
8,
206246
0,
207247
0,
208-
160,
248+
128,
209249
63,
210250
0,
211251
0,
212-
224,
252+
192,
213253
63,
214-
0,
215-
0,
216-
208,
217-
64,
218-
0,
219-
0,
220-
16,
221-
64,
222-
0,
223-
0,
224-
48,
225-
64,
226-
// valsFloat64, list(pack('<d', 3.14))
254+
// valsFloat64, list(pack('<ddd', 3.14, 1<<25, 1.5))
227255
(19 << 3) | 2,
228256
1,
229-
8,
257+
24,
230258
31,
231259
133,
232260
235,
@@ -235,46 +263,80 @@ public void handle(byte[] p) {
235263
30,
236264
9,
237265
64,
266+
0,
267+
0,
268+
0,
269+
0,
270+
0,
271+
0,
272+
128,
273+
65,
274+
0,
275+
0,
276+
0,
277+
0,
278+
0,
279+
0,
280+
248,
281+
63,
238282
// sketchNumBins
239283
(20 << 3) | 2,
240284
1,
241285
2,
242286
2,
243-
2,
287+
3,
244288
// sketchBinKeys
245289
(21 << 3) | 2,
246290
1,
247-
6,
248-
142,
249-
21,
250-
44,
251-
218,
291+
7,
292+
244,
293+
20,
294+
90,
295+
206,
252296
21,
253-
26,
297+
52,
298+
0,
254299
// sketchBinCnts
255300
(22 << 3) | 2,
256301
1,
257-
4,
302+
17,
258303
1,
259304
2,
260-
2,
261-
3,
305+
254,
306+
207,
307+
172,
308+
243,
309+
14,
310+
255,
311+
255,
312+
255,
313+
255,
314+
15,
315+
254,
316+
247,
317+
130,
318+
173,
319+
6,
262320
// sourceTypeName
263321
(23 << 3) | 2,
264322
1,
265-
4,
323+
6,
324+
0,
325+
0,
266326
0,
267327
0,
268328
0,
269329
0,
270330
// origins
271331
(24 << 3) | 2,
272332
1,
273-
4,
333+
6,
274334
2,
275335
0,
276336
0,
277337
0,
338+
0,
339+
0,
278340
});
279341
}
280342

0 commit comments

Comments
 (0)