Skip to content

Commit 89a9d9d

Browse files
kkondakasimonelbaz
authored andcommitted
Prometheus Sink: Fix setting DLQ pipeline, add NOISY marker for logs (opensearch-project#6388)
Signed-off-by: Simon ELBAZ <elbazsimon9@gmail.com>
1 parent 7ab257c commit 89a9d9d

6 files changed

Lines changed: 17 additions & 22 deletions

File tree

data-prepper-plugins/prometheus-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/prometheus/PrometheusHttpSender.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
package org.opensearch.dataprepper.plugins.sink.prometheus;
1212

13+
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;
1314
import org.opensearch.dataprepper.plugins.sink.prometheus.configuration.PrometheusSinkConfiguration;
1415
import com.linecorp.armeria.client.WebClient;
1516
import com.linecorp.armeria.client.retry.Backoff;
@@ -145,12 +146,12 @@ public PrometheusPushResult pushToEndpoint(final byte[] payload) {
145146
return new PrometheusPushResult(handleResponse(statusCode, responseBytes), statusCode);
146147
})
147148
.exceptionally(throwable -> {
148-
LOG.error("Request failed", throwable);
149+
LOG.error(NOISY, "Request failed", throwable);
149150
return new PrometheusPushResult(false, 0);
150151
})
151152
.join(); // Wait for completion
152153
} catch (Exception e) {
153-
LOG.error("Failed to execute request", e);
154+
LOG.error(NOISY, "Failed to execute request", e);
154155
result = new PrometheusPushResult(false, 0);
155156
}
156157
return result;
@@ -210,7 +211,7 @@ private boolean handleResponse(final int statusCode, final byte[] responseBytes)
210211
? new String(responseBytes, StandardCharsets.UTF_8)
211212
: "<no body>";
212213

213-
LOG.error("Non-successful Prometheus server response. Status: {}, Response: {}",
214+
LOG.error(NOISY, "Non-successful Prometheus server response. Status: {}, Response: {}",
214215
statusCode, responseBody);
215216
return false;
216217
}

data-prepper-plugins/prometheus-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/prometheus/PrometheusSink.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@ public PrometheusSink(final PluginSetting pluginSetting,
6666
prometheusSinkConfiguration,
6767
sinkMetrics,
6868
httpSender,
69-
getFailurePipeline(),
7069
pipelineDescription);
7170
}
7271

@@ -87,6 +86,7 @@ public boolean isReady() {
8786
@Override
8887
public void doInitialize() {
8988
sinkInitialized = Boolean.TRUE;
89+
prometheusSinkService.setDlqPipeline(getFailurePipeline());
9090
}
9191

9292
@VisibleForTesting

data-prepper-plugins/prometheus-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/prometheus/service/PrometheusSinkBufferWriterEntry.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
*/
1010
package org.opensearch.dataprepper.plugins.sink.prometheus.service;
1111

12+
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;
1213
import com.google.common.annotations.VisibleForTesting;
1314
import software.amazon.awssdk.utils.Pair;
1415
import org.slf4j.Logger;
@@ -53,7 +54,7 @@ public boolean add(final PrometheusSinkBufferEntry bufferEntry) {
5354

5455
entries.put(time, bufferEntry);
5556
if (entries.size() > maxEntries) {
56-
LOG.warn("Number of entries exceeded maxEntries");
57+
LOG.warn(NOISY, "Number of entries exceeded maxEntries");
5758
return false;
5859
}
5960
return true;

data-prepper-plugins/prometheus-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/prometheus/service/PrometheusSinkService.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@
1010

1111
package org.opensearch.dataprepper.plugins.sink.prometheus.service;
1212

13-
import com.google.common.annotations.VisibleForTesting;
14-
1513
import org.opensearch.dataprepper.common.sink.DefaultSinkOutputStrategy;
1614
import org.opensearch.dataprepper.common.sink.SinkMetrics;
1715
import org.opensearch.dataprepper.common.sink.SinkBufferEntry;
@@ -22,16 +20,13 @@
2220
import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline;
2321
import org.opensearch.dataprepper.plugins.sink.prometheus.configuration.PrometheusSinkConfiguration;
2422
import org.opensearch.dataprepper.plugins.sink.prometheus.PrometheusHttpSender;
25-
import org.slf4j.Logger;
26-
import org.slf4j.LoggerFactory;
2723

2824
import java.util.ArrayList;
2925
import java.util.Collection;
3026
import java.util.List;
3127

3228
public class PrometheusSinkService extends DefaultSinkOutputStrategy {
3329
static final String PLUGIN_NAME = "prometheus";
34-
private static final Logger LOG = LoggerFactory.getLogger(PrometheusSinkService.class);
3530
public static final String PROMETHEUS_SINK_RECORDS_SUCCESS_COUNTER = "prometheusSinkRecordsNumberOfSuccessful";
3631

3732
public static final String PROMETHEUS_SINK_RECORDS_FAILED_COUNTER = "prometheusSinkRecordsNumberOfFailed";
@@ -46,7 +41,6 @@ public class PrometheusSinkService extends DefaultSinkOutputStrategy {
4641
public PrometheusSinkService(final PrometheusSinkConfiguration prometheusSinkConfiguration,
4742
final SinkMetrics sinkMetrics,
4843
final PrometheusHttpSender httpSender,
49-
final HeadlessPipeline dlqPipeline,
5044
final PipelineDescription pipelineDescription) {
5145
super(new ReentrantLockStrategy(),
5246
new PrometheusSinkBuffer(prometheusSinkConfiguration.getThresholdConfig().getMaxEvents(),
@@ -57,7 +51,6 @@ public PrometheusSinkService(final PrometheusSinkConfiguration prometheusSinkCon
5751
sinkMetrics);
5852
sanitizeNames = prometheusSinkConfiguration.getSanitizeNames();
5953
this.dropIfNoDLQConfigured = false;
60-
this.dlqPipeline = dlqPipeline;
6154
this.dlqRecords = new ArrayList<>();
6255
this.httpSender = httpSender;
6356
this.pipelineDescription = pipelineDescription;
@@ -73,7 +66,6 @@ public SinkBufferEntry getSinkBufferEntry(final Event event) throws Exception {
7366
return new PrometheusSinkBufferEntry(event, sanitizeNames);
7467
}
7568

76-
@VisibleForTesting
7769
public void setDlqPipeline(HeadlessPipeline pipeline) {
7870
this.dlqPipeline = pipeline;
7971
}

data-prepper-plugins/prometheus-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/prometheus/service/PrometheusTimeSeries.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010

1111
package org.opensearch.dataprepper.plugins.sink.prometheus.service;
1212

13+
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;
14+
1315
import com.arpnetworking.metrics.prometheus.Types.Label;
1416
import com.arpnetworking.metrics.prometheus.Types.Sample;
1517
import com.arpnetworking.metrics.prometheus.Types.TimeSeries;
@@ -161,7 +163,7 @@ private long processResourceAndScopeAttributes(Metric metric) {
161163
size += processAttributes(scopeAttributes, SCOPE_PREFIX);
162164
}
163165
} catch (Exception e) {
164-
LOG.warn("Failed to get resource/scope attributes", e);
166+
LOG.warn(NOISY, "Failed to get resource/scope attributes", e);
165167
}
166168
return size;
167169
}

data-prepper-plugins/prometheus-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/prometheus/service/PrometheusSinkServiceTest.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -109,19 +109,18 @@ void setup() throws IOException {
109109

110110
}
111111

112-
PrometheusSinkService createObjectUnderTest(final PrometheusSinkConfiguration prometheusSinkConfig, final HeadlessPipeline dlqPipeline) {
112+
PrometheusSinkService createObjectUnderTest(final PrometheusSinkConfiguration prometheusSinkConfig) {
113113
return new PrometheusSinkService(
114114
prometheusSinkConfig,
115115
sinkMetrics,
116116
httpSender,
117-
dlqPipeline,
118117
pipelineDescription);
119118
}
120119

121120
@Test
122121
void prometheusSinkServiceTestSuccessfulOutput() throws NoSuchFieldException, IllegalAccessException {
123122
when(httpSender.pushToEndpoint(any())).thenReturn(new PrometheusPushResult(true, 0));
124-
final PrometheusSinkService objectUnderTest = createObjectUnderTest(prometheusSinkConfiguration, null);
123+
final PrometheusSinkService objectUnderTest = createObjectUnderTest(prometheusSinkConfiguration);
125124
JacksonGauge gauge1 = createGaugeMetric("gauge1", null);
126125
JacksonGauge gauge2 = createGaugeMetric("gauge2", null);
127126
Collection<Record<Event>> records = List.of(new Record<>(gauge1), new Record<>(gauge2));
@@ -136,7 +135,7 @@ void prometheusSinkServiceTestSuccessfulOutputWithWindow1() throws NoSuchFieldEx
136135
String newYaml = SINK_YAML.replace("out_of_order_window: 0", "out_of_order_window: 1");
137136
this.prometheusSinkConfiguration = objectMapper.readValue(newYaml,PrometheusSinkConfiguration.class);
138137
when(httpSender.pushToEndpoint(any())).thenReturn(new PrometheusPushResult(true, 0));
139-
final PrometheusSinkService objectUnderTest = createObjectUnderTest(prometheusSinkConfiguration, null);
138+
final PrometheusSinkService objectUnderTest = createObjectUnderTest(prometheusSinkConfiguration);
140139
Instant t = Instant.now();
141140
JacksonGauge gauge1 = createGaugeMetric("gauge1", t);
142141
JacksonGauge gauge2 = createGaugeMetric("gauge1", t.plusMillis(100));
@@ -163,7 +162,7 @@ void prometheusSinkServiceTestFailedOutput() throws NoSuchFieldException, Illega
163162
}
164163
return null;
165164
}).when(dlqPipeline).sendEvents(any(Collection.class));
166-
final PrometheusSinkService objectUnderTest = createObjectUnderTest(prometheusSinkConfiguration, null);
165+
final PrometheusSinkService objectUnderTest = createObjectUnderTest(prometheusSinkConfiguration);
167166
objectUnderTest.setDlqPipeline(dlqPipeline);
168167
JacksonGauge gauge1 = createGaugeMetric("gauge1", null);
169168
JacksonGauge gauge2 = createGaugeMetric("gauge2", null);
@@ -177,7 +176,7 @@ void prometheusSinkServiceTestFailedOutput() throws NoSuchFieldException, Illega
177176
@Test
178177
void prometheusSinkServiceTestFailedOutputWithNoDLQ() throws NoSuchFieldException, IllegalAccessException {
179178
when(httpSender.pushToEndpoint(any())).thenReturn(new PrometheusPushResult(false, 410));
180-
final PrometheusSinkService objectUnderTest = createObjectUnderTest(prometheusSinkConfiguration, null);
179+
final PrometheusSinkService objectUnderTest = createObjectUnderTest(prometheusSinkConfiguration);
181180
JacksonGauge gauge1 = createGaugeMetric("gauge1", null);
182181
JacksonGauge gauge2 = createGaugeMetric("gauge2", null);
183182
Collection<Record<Event>> records = List.of(new Record<>(gauge1), new Record<>(gauge2));
@@ -190,7 +189,7 @@ void prometheusSinkServiceTestFailedOutputWithNoDLQ() throws NoSuchFieldExceptio
190189
@Test
191190
void prometheusSinkServiceTestWithExceptionInHttpSender() throws NoSuchFieldException, IllegalAccessException {
192191
when(httpSender.pushToEndpoint(any())).thenThrow(new RuntimeException("exception"));
193-
final PrometheusSinkService objectUnderTest = createObjectUnderTest(prometheusSinkConfiguration, null);
192+
final PrometheusSinkService objectUnderTest = createObjectUnderTest(prometheusSinkConfiguration);
194193
JacksonGauge gauge1 = createGaugeMetric("gauge1", null);
195194
JacksonGauge gauge2 = createGaugeMetric("gauge2", null);
196195
Collection<Record<Event>> records = List.of(new Record<>(gauge1), new Record<>(gauge2));
@@ -204,7 +203,7 @@ void prometheusSinkServiceTestWithExceptionInHttpSender() throws NoSuchFieldExce
204203

205204
@Test
206205
void prometheus_sink_service_test_output_with_zero_record() throws NoSuchFieldException, IllegalAccessException {
207-
final PrometheusSinkService objectUnderTest = createObjectUnderTest(prometheusSinkConfiguration, null);
206+
final PrometheusSinkService objectUnderTest = createObjectUnderTest(prometheusSinkConfiguration);
208207
Collection<Record<Event>> records = List.of();
209208
objectUnderTest.output(records);
210209
}

0 commit comments

Comments
 (0)