Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ void setUp() {
lenient().when(thresholdConfig.getFlushInterval()).thenReturn(60L);
prometheusSinkConfig = mock(PrometheusSinkConfiguration.class);
when(prometheusSinkConfig.getMaxRetries()).thenReturn(5);
when(prometheusSinkConfig.getOutOfOrderWindow()).thenReturn(Duration.ofSeconds(0));
when(prometheusSinkConfig.getOutOfOrderTimeWindow()).thenReturn(Duration.ofSeconds(0));
when(prometheusSinkConfig.getSanitizeNames()).thenReturn(false);
when(prometheusSinkConfig.getUrl()).thenReturn(remoteWriteUrl);
when(prometheusSinkConfig.getContentType()).thenReturn("application/x-protobuf");
Expand Down Expand Up @@ -299,7 +299,7 @@ private void getMetricsFromAMP(final String metricName, final String qs) throws
@ValueSource(ints = {0, 2, 5})
void TestSumMetrics(final int window) throws Exception {
lenient().when(thresholdConfig.getFlushInterval()).thenReturn(6L);
when(prometheusSinkConfig.getOutOfOrderWindow()).thenReturn(Duration.ofSeconds(window));
when(prometheusSinkConfig.getOutOfOrderTimeWindow()).thenReturn(Duration.ofSeconds(window));
PrometheusSink sink = createObjectUnderTest();
long startTimeSeconds = testStartTime.getEpochSecond();
Instant time = Instant.now();
Expand Down Expand Up @@ -407,7 +407,7 @@ void TestSumMetricsFailuresWithDLQ() throws Exception {
@ValueSource(ints = {0, 2, 5})
void TestSumMetricsFailuresWithoutDLQ(final int window) throws Exception {
lenient().when(thresholdConfig.getFlushInterval()).thenReturn(6L);
when(prometheusSinkConfig.getOutOfOrderWindow()).thenReturn(Duration.ofSeconds(window));
when(prometheusSinkConfig.getOutOfOrderTimeWindow()).thenReturn(Duration.ofSeconds(window));
when(thresholdConfig.getMaxEvents()).thenReturn(1);
PrometheusSink sink = createObjectUnderTest();

Expand Down Expand Up @@ -488,7 +488,7 @@ private Collection<Record<Event>> getSumRecordList(int numberOfRecords, final St
@ValueSource(ints = {0, 2, 5})
void TestGaugeMetrics(final int window) throws Exception {
lenient().when(thresholdConfig.getFlushInterval()).thenReturn(6L);
when(prometheusSinkConfig.getOutOfOrderWindow()).thenReturn(Duration.ofSeconds(window));
when(prometheusSinkConfig.getOutOfOrderTimeWindow()).thenReturn(Duration.ofSeconds(window));

PrometheusSink sink = createObjectUnderTest();
Collection<Record<Event>> records = getGaugeRecordList(NUM_RECORDS);
Expand Down Expand Up @@ -532,7 +532,7 @@ void TestGaugeMetrics(final int window) throws Exception {
@ValueSource(ints = {0, 2, 5})
void TestGaugeMetricsWithMaxRequestSizeLimitAndFlushTimeout(final int window) throws Exception {
lenient().when(thresholdConfig.getFlushInterval()).thenReturn(6L);
when(prometheusSinkConfig.getOutOfOrderWindow()).thenReturn(Duration.ofSeconds(window));
when(prometheusSinkConfig.getOutOfOrderTimeWindow()).thenReturn(Duration.ofSeconds(window));

when(thresholdConfig.getMaxRequestSizeBytes()).thenReturn(220L);
lenient().when(thresholdConfig.getFlushInterval()).thenReturn(20L);
Expand Down Expand Up @@ -604,7 +604,7 @@ private Collection<Record<Event>> getGaugeRecordList(int numberOfRecords) {
@ValueSource(ints = {0, 2, 5})
void TestSummaryMetrics(final int window) throws Exception {
lenient().when(thresholdConfig.getFlushInterval()).thenReturn(6L);
when(prometheusSinkConfig.getOutOfOrderWindow()).thenReturn(Duration.ofSeconds(window));
when(prometheusSinkConfig.getOutOfOrderTimeWindow()).thenReturn(Duration.ofSeconds(window));

PrometheusSink sink = createObjectUnderTest();
Collection<Record<Event>> records = getSummaryRecordList(NUM_RECORDS);
Expand Down Expand Up @@ -716,7 +716,7 @@ private Collection<Record<Event>> getSummaryRecordList(int numberOfRecords) {
@ValueSource(ints = {0, 2, 5})
void TestHistogramMetrics(final int window) throws Exception {
lenient().when(thresholdConfig.getFlushInterval()).thenReturn(6L);
when(prometheusSinkConfig.getOutOfOrderWindow()).thenReturn(Duration.ofSeconds(window));
when(prometheusSinkConfig.getOutOfOrderTimeWindow()).thenReturn(Duration.ofSeconds(window));

PrometheusSink sink = createObjectUnderTest();
Collection<Record<Event>> records = getHistogramRecordList(NUM_RECORDS);
Expand Down Expand Up @@ -783,7 +783,7 @@ void TestHistogramMetrics(final int window) throws Exception {
@ValueSource(ints = {0, 2, 5})
void TestExponentialHistogramMetrics(final int window) throws Exception {
lenient().when(thresholdConfig.getFlushInterval()).thenReturn(6L);
when(prometheusSinkConfig.getOutOfOrderWindow()).thenReturn(Duration.ofSeconds(window));
when(prometheusSinkConfig.getOutOfOrderTimeWindow()).thenReturn(Duration.ofSeconds(window));

PrometheusSink sink = createObjectUnderTest();
Collection<Record<Event>> records = getExponentialHistogramRecordList(NUM_RECORDS);
Expand Down Expand Up @@ -853,7 +853,7 @@ void TestExponentialHistogramMetrics(final int window) throws Exception {
@ValueSource(ints = {0, 2, 5})
public void TestMultipleMetrics(final int window) throws Exception {
lenient().when(thresholdConfig.getFlushInterval()).thenReturn(6L);
when(prometheusSinkConfig.getOutOfOrderWindow()).thenReturn(Duration.ofSeconds(window));
when(prometheusSinkConfig.getOutOfOrderTimeWindow()).thenReturn(Duration.ofSeconds(window));
when(thresholdConfig.getMaxEvents()).thenReturn(1);
long startTimeSeconds = testStartTime.getEpochSecond();
PrometheusSink sink = createObjectUnderTest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class PrometheusSinkConfiguration {
private static final Duration DEFAULT_REQUEST_TIMEOUT = Duration.ofSeconds(60);
private static final Duration DEFAULT_CONNECTION_TIMEOUT = Duration.ofSeconds(60);
private static final Duration DEFAULT_IDLE_TIMEOUT = Duration.ofSeconds(60);
private static final Duration DEFAULT_OUT_OF_ORDER_WINDOW = Duration.ofSeconds(5);
private static final Duration DEFAULT_OUT_OF_ORDER_TIME_WINDOW = Duration.ofSeconds(10);

@JsonProperty("aws")
@NotNull
Expand All @@ -45,8 +45,8 @@ public class PrometheusSinkConfiguration {
@JsonProperty("url")
private String url;

@JsonProperty("out_of_order_window")
private Duration outOfOrderWindow = DEFAULT_OUT_OF_ORDER_WINDOW;
@JsonProperty("out_of_order_time_window")
private Duration outOfOrderTimeWindow = DEFAULT_OUT_OF_ORDER_TIME_WINDOW;

@JsonProperty("max_retries")
private int maxRetries = DEFAULT_MAX_RETRIES;
Expand Down Expand Up @@ -112,8 +112,8 @@ public String getContentType() {
return contentType;
}

public Duration getOutOfOrderWindow() {
return outOfOrderWindow;
public Duration getOutOfOrderTimeWindow() {
return outOfOrderTimeWindow;
}

public String getRemoteWriteVersion() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class PrometheusSinkBufferWriter implements SinkBufferWriter {
public PrometheusSinkBufferWriter(final PrometheusSinkConfiguration sinkConfig, final SinkMetrics sinkMetrics) {
this.buffer = new HashMap<>();
this.sinkMetrics = sinkMetrics;
this.outOfOrderWindowMillis = sinkConfig.getOutOfOrderWindow().toMillis();
this.outOfOrderWindowMillis = sinkConfig.getOutOfOrderTimeWindow().toMillis();
this.maxEvents = sinkConfig.getThresholdConfig().getMaxEvents();
this.maxRequestSize = sinkConfig.getThresholdConfig().getMaxRequestSizeBytes();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ void setUp() throws Exception {
when(sinkThresholdConfig.getMaxEvents()).thenReturn(3);
when(sinkThresholdConfig.getMaxRequestSizeBytes()).thenReturn(1000L);
when(sinkConfig.getThresholdConfig()).thenReturn(sinkThresholdConfig);
when(sinkConfig.getOutOfOrderWindow()).thenReturn(Duration.ofSeconds(0));
when(sinkConfig.getOutOfOrderTimeWindow()).thenReturn(Duration.ofSeconds(0));
sinkFlushContext = mock(PrometheusSinkFlushContext.class);
gauge1 = createGaugeMetric("gauge1", Instant.now(), 1.0d);
prometheusSinkBufferEntry = new PrometheusSinkBufferEntry(gauge1, true);
Expand Down Expand Up @@ -126,7 +126,7 @@ public void testPrometheusSinkBufferWriterWithOutOfOrderEntries() throws Excepti
public void testGetBufferWithMultipleMetrics() throws Exception {
when(sinkThresholdConfig.getMaxEvents()).thenReturn(5);
when(sinkThresholdConfig.getMaxRequestSizeBytes()).thenReturn(100000L);
when(sinkConfig.getOutOfOrderWindow()).thenReturn(Duration.ofSeconds(3));
when(sinkConfig.getOutOfOrderTimeWindow()).thenReturn(Duration.ofSeconds(3));
prometheusSinkBufferWriter = createObjectUnderTest();
Instant t1 = Instant.now();
Instant t2 = t1.minusSeconds(3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class PrometheusSinkServiceTest {
" max_events: 2\n" +
" flush_interval: 10\n"+
" connection_timeout: 10\n"+
" out_of_order_window: 0\n" +
" out_of_order_time_window: 0\n" +
" idle_timeout: 10\n"+
" aws:\n" +
" region: \"us-east-2\"\n" +
Expand Down
Loading