Skip to content

Commit d8e4f0a

Browse files
authored
Merge pull request #211 from DataDog/vickenty/tsm
Implement sending metrics with a timestamp
2 parents d5676f6 + fc63a60 commit d8e4f0a

6 files changed

Lines changed: 254 additions & 21 deletions

File tree

src/main/java/com/timgroup/statsd/Message.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
package com.timgroup.statsd;
22

33
import java.util.Arrays;
4+
import java.util.EnumSet;
45
import java.util.Objects;
6+
import java.util.Set;
57

68
public abstract class Message implements Comparable<Message> {
9+
710
final String aspect;
811
final Message.Type type;
912
final String[] tags;
@@ -34,6 +37,8 @@ public String toString() {
3437
}
3538
}
3639

40+
protected static final Set<Type> AGGREGATE_SET = EnumSet.of(Type.COUNT, Type.GAUGE, Type.SET);
41+
3742
protected Message(Message.Type type) {
3843
this("", type, null);
3944
}
@@ -92,13 +97,11 @@ public String[] getTags() {
9297

9398
/**
9499
* Return whether a message can be aggregated.
95-
* Not sure if this makes sense.
96100
*
97101
* @return boolean on whether or not this message type may be aggregated.
98102
*/
99103
public boolean canAggregate() {
100-
// return (this.type == m.type);
101-
return false;
104+
return AGGREGATE_SET.contains(type);
102105
}
103106

104107
public void setDone(boolean done) {

src/main/java/com/timgroup/statsd/NoOpStatsDClient.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ public final class NoOpStatsDClient implements StatsDClient {
2020

2121
@Override public void count(String aspect, double delta, double sampleRate, String... tags) { }
2222

23+
@Override public void countWithTimestamp(String aspect, long delta, long timestamp, String... tags) { }
24+
25+
@Override public void countWithTimestamp(String aspect, double delta, long timestamp, String... tags) { }
26+
2327
@Override public void incrementCounter(String aspect, String... tags) { }
2428

2529
@Override public void incrementCounter(String aspect, double sampleRate, String... tags) { }
@@ -52,6 +56,10 @@ public final class NoOpStatsDClient implements StatsDClient {
5256

5357
@Override public void gauge(String aspect, long value, double sampleRate, String... tags) { }
5458

59+
@Override public void gaugeWithTimestamp(String aspect, double value, long timestamp, String... tags) { }
60+
61+
@Override public void gaugeWithTimestamp(String aspect, long value, long timestamp, String... tags) { }
62+
5563
@Override public void recordExecutionTime(String aspect, long timeInMs, String... tags) { }
5664

5765
@Override public void recordExecutionTime(String aspect, long timeInMs, double sampleRate, String... tags) { }

src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java

Lines changed: 82 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.concurrent.ThreadLocalRandom;
2121
import java.util.concurrent.TimeUnit;
2222

23+
2324
/**
2425
* A simple StatsD client implementation facilitating metrics recording.
2526
*
@@ -42,6 +43,11 @@
4243
* IO operations being carried out in a separate thread. Furthermore, these methods are guaranteed
4344
* not to throw an exception which may disrupt application execution.
4445
*
46+
* <p>Some methods allow recording a value for a specific point in time by taking an extra
47+
* timestamp parameter. Such values are exempt from aggregation and the value should indicate the
48+
* final metric value at the given time. Please refer to Datadog documentation for the range of
49+
* accepted timestamp values.
50+
*
4551
* <p>As part of a clean system shutdown, the {@link #stop()} method should be invoked
4652
* on any StatsD clients.</p>
4753
*
@@ -57,6 +63,8 @@ public class NonBlockingStatsDClient implements StatsDClient {
5763
private static final String ENTITY_ID_TAG_NAME = "dd.internal.entity_id" ;
5864
public static final String ORIGIN_DETECTION_ENABLED_ENV_VAR = "DD_ORIGIN_DETECTION_ENABLED";
5965

66+
private static final long MIN_TIMESTAMP = 1;
67+
6068
enum Literal {
6169
SERVICE,
6270
ENV,
@@ -487,10 +495,12 @@ ClientChannel createByteChannel(Callable<SocketAddress> addressLookup, int timeo
487495

488496
abstract class StatsDMessage<T extends Number> extends NumericMessage<T> {
489497
final double sampleRate; // NaN for none
498+
final long timestamp; // zero for none
490499

491-
protected StatsDMessage(String aspect, Message.Type type, T value, double sampleRate, String[] tags) {
500+
protected StatsDMessage(String aspect, Message.Type type, T value, double sampleRate, long timestamp, String[] tags) {
492501
super(aspect, type, value, tags);
493502
this.sampleRate = sampleRate;
503+
this.timestamp = timestamp;
494504
}
495505

496506
@Override
@@ -501,6 +511,9 @@ public final void writeTo(StringBuilder builder, String containerID) {
501511
if (!Double.isNaN(sampleRate)) {
502512
builder.append('|').append('@').append(format(SAMPLE_RATE_FORMATTER, sampleRate));
503513
}
514+
if (timestamp != 0) {
515+
builder.append("|T").append(timestamp);
516+
}
504517
tagString(this.tags, builder);
505518
if (containerID != null && !containerID.isEmpty()) {
506519
builder.append("|c:").append(containerID);
@@ -509,6 +522,12 @@ public final void writeTo(StringBuilder builder, String containerID) {
509522
builder.append('\n');
510523
}
511524

525+
@Override
526+
public boolean canAggregate() {
527+
// Timestamped values can not be aggregated.
528+
return super.canAggregate() && this.timestamp == 0;
529+
}
530+
512531
protected abstract void writeValue(StringBuilder builder);
513532
}
514533

@@ -528,8 +547,8 @@ private boolean send(final Message message) {
528547
return success;
529548
}
530549

531-
// send double with sample rate
532-
private void send(String aspect, final double value, Message.Type type, double sampleRate, String[] tags) {
550+
// send double with sample rate and timestamp
551+
private void send(String aspect, final double value, Message.Type type, double sampleRate, long timestamp, String[] tags) {
533552
if (statsDProcessor.getAggregator().getFlushInterval() != 0 && !Double.isNaN(sampleRate)) {
534553
switch (type) {
535554
case COUNT:
@@ -542,21 +561,25 @@ private void send(String aspect, final double value, Message.Type type, double s
542561

543562
if (Double.isNaN(sampleRate) || !isInvalidSample(sampleRate)) {
544563

545-
sendMetric(new StatsDMessage<Double>(aspect, type, Double.valueOf(value), sampleRate, tags) {
564+
sendMetric(new StatsDMessage<Double>(aspect, type, Double.valueOf(value), sampleRate, timestamp, tags) {
546565
@Override protected void writeValue(StringBuilder builder) {
547566
builder.append(format(NUMBER_FORMATTER, this.value));
548567
}
549568
});
550569
}
551570
}
552571

572+
private void send(String aspect, final double value, Message.Type type, double sampleRate, String[] tags) {
573+
send(aspect, value, type, sampleRate, 0, tags);
574+
}
575+
553576
// send double without sample rate
554577
private void send(String aspect, final double value, Message.Type type, String[] tags) {
555-
send(aspect, value, type, Double.NaN, tags);
578+
send(aspect, value, type, Double.NaN, 0, tags);
556579
}
557580

558581
// send long with sample rate
559-
private void send(String aspect, final long value, Message.Type type, double sampleRate, String[] tags) {
582+
private void send(String aspect, final long value, Message.Type type, double sampleRate, long timestamp, String[] tags) {
560583
if (statsDProcessor.getAggregator().getFlushInterval() != 0 && !Double.isNaN(sampleRate)) {
561584
switch (type) {
562585
case COUNT:
@@ -569,17 +592,36 @@ private void send(String aspect, final long value, Message.Type type, double sam
569592

570593
if (Double.isNaN(sampleRate) || !isInvalidSample(sampleRate)) {
571594

572-
sendMetric(new StatsDMessage<Long>(aspect, type, value, sampleRate, tags) {
595+
sendMetric(new StatsDMessage<Long>(aspect, type, value, sampleRate, timestamp, tags) {
573596
@Override protected void writeValue(StringBuilder builder) {
574597
builder.append(this.value);
575598
}
576599
});
577600
}
578601
}
579602

603+
private void send(String aspect, final long value, Message.Type type, double sampleRate, String[] tags) {
604+
send(aspect, value, type, sampleRate, 0, tags);
605+
}
606+
580607
// send long without sample rate
581608
private void send(String aspect, final long value, Message.Type type, String[] tags) {
582-
send(aspect, value, type, Double.NaN, tags);
609+
send(aspect, value, type, Double.NaN, 0, tags);
610+
}
611+
612+
private void sendWithTimestamp(String aspect, final double value, Message.Type type, long timestamp, String[] tags) {
613+
if (timestamp < MIN_TIMESTAMP) {
614+
timestamp = MIN_TIMESTAMP;
615+
}
616+
send(aspect, value, type, Double.NaN, timestamp, tags);
617+
}
618+
619+
private void sendWithTimestamp(String aspect, final long value, Message.Type type, long timestamp, String[] tags) {
620+
if (timestamp < MIN_TIMESTAMP) {
621+
timestamp = MIN_TIMESTAMP;
622+
}
623+
624+
send(aspect, value, type, Double.NaN, timestamp, tags);
583625
}
584626

585627
/**
@@ -632,6 +674,22 @@ public void count(final String aspect, final double delta, final double sampleRa
632674
send(aspect, delta, Message.Type.COUNT, sampleRate, tags);
633675
}
634676

677+
/**
678+
* {@inheritDoc}
679+
*/
680+
@Override
681+
public void countWithTimestamp(final String aspect, final long value, final long timestamp, final String...tags) {
682+
sendWithTimestamp(aspect, value, Message.Type.COUNT, timestamp, tags);
683+
}
684+
685+
/**
686+
* {@inheritDoc}
687+
*/
688+
@Override
689+
public void countWithTimestamp(final String aspect, final double value, final long timestamp, final String...tags) {
690+
sendWithTimestamp(aspect, value, Message.Type.COUNT, timestamp, tags);
691+
}
692+
635693
/**
636694
* Increments the specified counter by one.
637695
*
@@ -776,7 +834,6 @@ public void gauge(final String aspect, final double value, final double sampleRa
776834
recordGaugeValue(aspect, value, sampleRate, tags);
777835
}
778836

779-
780837
/**
781838
* Convenience method equivalent to {@link #recordGaugeValue(String, long, String[])}.
782839
*/
@@ -793,6 +850,22 @@ public void gauge(final String aspect, final long value, final double sampleRate
793850
recordGaugeValue(aspect, value, sampleRate, tags);
794851
}
795852

853+
/**
854+
* {@inheritDoc}
855+
*/
856+
@Override
857+
public void gaugeWithTimestamp(final String aspect, final double value, final long timestamp, final String... tags) {
858+
sendWithTimestamp(aspect, value, Message.Type.GAUGE, timestamp, tags);
859+
}
860+
861+
/**
862+
* {@inheritDoc}
863+
*/
864+
@Override
865+
public void gaugeWithTimestamp(final String aspect, final long value, final long timestamp, final String... tags) {
866+
sendWithTimestamp(aspect, value, Message.Type.GAUGE, timestamp, tags);
867+
}
868+
796869
/**
797870
* Records an execution time in milliseconds for the specified named operation.
798871
*

src/main/java/com/timgroup/statsd/StatsDAggregator.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
11
package com.timgroup.statsd;
22

33
import java.util.ArrayList;
4-
import java.util.EnumSet;
54
import java.util.HashMap;
65
import java.util.Iterator;
76
import java.util.Map;
8-
import java.util.Set;
97
import java.util.Timer;
108
import java.util.TimerTask;
119

@@ -15,8 +13,6 @@ public class StatsDAggregator {
1513
public static int DEFAULT_SHARDS = 4; // 4 partitions to reduce contention.
1614

1715
protected final String AGGREGATOR_THREAD_NAME = "statsd-aggregator-thread";
18-
protected static final Set<Message.Type> AGGREGATE_SET = EnumSet.of(Message.Type.COUNT, Message.Type.GAUGE,
19-
Message.Type.SET);
2016
protected final ArrayList<Map<Message, Message>> aggregateMetrics;
2117

2218
protected final int shardGranularity;
@@ -81,10 +77,6 @@ public void stop() {
8177
}
8278
}
8379

84-
public boolean isTypeAggregate(Message.Type type) {
85-
return AGGREGATE_SET.contains(type);
86-
}
87-
8880
/**
8981
* Aggregate a message if possible.
9082
*
@@ -93,7 +85,7 @@ public boolean isTypeAggregate(Message.Type type) {
9385
*
9486
* */
9587
public boolean aggregateMessage(Message message) {
96-
if (flushInterval == 0 || !isTypeAggregate(message.getType()) || message.getDone()) {
88+
if (flushInterval == 0 || !message.canAggregate() || message.getDone()) {
9789
return false;
9890
}
9991

src/main/java/com/timgroup/statsd/StatsDClient.java

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,48 @@ public interface StatsDClient extends Closeable {
101101
*/
102102
void count(String aspect, double delta, double sampleRate, String... tags);
103103

104+
/**
105+
* Set the counter metric at the given time to the specified value.
106+
*
107+
* <p>Values with an explicit timestamp are never aggregated and
108+
* will be recorded as the metric value at the given time.</p>
109+
*
110+
* <p>This method is a DataDog extension, and may not work with other servers.</p>
111+
*
112+
* <p>This method is non-blocking and is guaranteed not to throw an exception.</p>
113+
*
114+
* @param aspect
115+
* the name of the counter to adjust
116+
* @param value
117+
* the amount to adjust the counter by
118+
* @param timestamp
119+
* timestamp of the value, as seconds from the epoch of 1970-01-01T00:00:00Z
120+
* @param tags
121+
* array of tags to be added to the data
122+
*/
123+
void countWithTimestamp(String aspect, long value, long timestamp, String... tags);
124+
125+
/**
126+
* Set the counter metric at the given time to the specified value.
127+
*
128+
* <p>Values with an explicit timestamp are never aggregated and
129+
* will be recorded as the metric value at the given time.</p>
130+
*
131+
* <p>This method is a DataDog extension, and may not work with other servers.</p>
132+
*
133+
* <p>This method is non-blocking and is guaranteed not to throw an exception.</p>
134+
*
135+
* @param aspect
136+
* the name of the counter to adjust
137+
* @param value
138+
* the amount to adjust the counter by
139+
* @param timestamp
140+
* timestamp of the value, as seconds from the epoch of 1970-01-01T00:00:00Z
141+
* @param tags
142+
* array of tags to be added to the data
143+
*/
144+
void countWithTimestamp(String aspect, double value, long timestamp, String... tags);
145+
104146
/**
105147
* Increments the specified counter by one.
106148
*
@@ -323,8 +365,43 @@ public interface StatsDClient extends Closeable {
323365
* @param tags
324366
* array of tags to be added to the data
325367
*/
368+
326369
void gauge(String aspect, long value, double sampleRate, String... tags);
327370

371+
/**
372+
* Set the gauge metric at the given time to the specified value.
373+
*
374+
* <p>Values with an explicit timestamp are never aggregated and
375+
* will be recorded as the metric value at the given time.</p>
376+
*
377+
* @param aspect
378+
* the name of the gauge
379+
* @param value
380+
* the new reading of the gauge
381+
* @param timestamp
382+
* timestamp of the value, as seconds from the epoch of 1970-01-01T00:00:00Z
383+
* @param tags
384+
* array of tags to be added to the data
385+
*/
386+
void gaugeWithTimestamp(String aspect, double value, long timestamp, String... tags);
387+
388+
/**
389+
* Set the gauge metric at the given time to the specified value.
390+
*
391+
* <p>Values with an explicit timestamp are never aggregated and
392+
* will be recorded as the metric value at the given time.</p>
393+
*
394+
* @param aspect
395+
* the name of the gauge
396+
* @param value
397+
* the new reading of the gauge
398+
* @param timestamp
399+
* timestamp of the value, as seconds from the epoch of 1970-01-01T00:00:00Z
400+
* @param tags
401+
* array of tags to be added to the data
402+
*/
403+
void gaugeWithTimestamp(String aspect, long value, long timestamp, String... tags);
404+
328405
/**
329406
* Records an execution time in milliseconds for the specified named operation.
330407
*

0 commit comments

Comments
 (0)