Skip to content

Commit 713b2d2

Browse files
Add Prometheus scrape/pull source to prometheus plugin
Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com>
1 parent 8f4589f commit 713b2d2

18 files changed

Lines changed: 3273 additions & 2 deletions

data-prepper-plugins/prometheus-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/prometheus/PrometheusRemoteWriteSourceIT.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -336,8 +336,6 @@ void testEmptyWriteRequestReturns204() throws Exception {
336336
assertThat(response.status(), equalTo(HttpStatus.NO_CONTENT));
337337
}
338338

339-
// Helper methods
340-
341339
private AggregatedHttpResponse sendWriteRequest(final Remote.WriteRequest request) throws IOException {
342340
final byte[] compressed = Snappy.compress(request.toByteArray());
343341
final HttpRequest httpRequest = HttpRequest.of(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,298 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.source.prometheus;
12+
13+
import com.linecorp.armeria.common.HttpResponse;
14+
import com.linecorp.armeria.common.HttpStatus;
15+
import com.linecorp.armeria.common.MediaType;
16+
import com.linecorp.armeria.server.Server;
17+
import io.micrometer.core.instrument.Counter;
18+
import io.micrometer.core.instrument.Timer;
19+
import org.junit.jupiter.api.AfterEach;
20+
import org.junit.jupiter.api.BeforeEach;
21+
import org.junit.jupiter.api.Test;
22+
import org.opensearch.dataprepper.metrics.PluginMetrics;
23+
import org.opensearch.dataprepper.model.buffer.Buffer;
24+
import org.opensearch.dataprepper.model.event.Event;
25+
import org.opensearch.dataprepper.model.metric.Gauge;
26+
import org.opensearch.dataprepper.model.metric.Histogram;
27+
import org.opensearch.dataprepper.model.metric.Sum;
28+
import org.opensearch.dataprepper.model.metric.Summary;
29+
import org.opensearch.dataprepper.model.record.Record;
30+
import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer;
31+
32+
import java.time.Duration;
33+
import java.util.ArrayList;
34+
import java.util.Collection;
35+
import java.util.List;
36+
37+
import static org.hamcrest.CoreMatchers.equalTo;
38+
import static org.hamcrest.CoreMatchers.instanceOf;
39+
import static org.hamcrest.MatcherAssert.assertThat;
40+
import static org.hamcrest.Matchers.empty;
41+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
42+
import static org.hamcrest.Matchers.hasSize;
43+
import static org.hamcrest.Matchers.is;
44+
import static org.mockito.ArgumentMatchers.anyString;
45+
import static org.mockito.Mockito.doAnswer;
46+
import static org.mockito.Mockito.mock;
47+
import static org.mockito.Mockito.when;
48+
49+
/**
50+
* Integration test for the Prometheus scrape source.
51+
* Starts real Armeria HTTP servers that serve Prometheus text exposition format,
52+
* creates a real PrometheusScrapeService that scrapes them, and verifies
53+
* parsed metric events appear in a real BlockingBuffer.
54+
*/
55+
class PrometheusScrapeSourceIT {
56+
57+
private static final int BUFFER_SIZE = 4096;
58+
private static final int BATCH_SIZE = 256;
59+
private static final int BUFFER_WRITE_TIMEOUT_MS = 5000;
60+
61+
private final List<Server> servers = new ArrayList<>();
62+
private Buffer<Record<Event>> buffer;
63+
private PrometheusScrapeService scrapeService;
64+
65+
@BeforeEach
66+
void setUp() {
67+
buffer = new BlockingBuffer<>(BUFFER_SIZE, BATCH_SIZE, "test-pipeline");
68+
}
69+
70+
@AfterEach
71+
void tearDown() {
72+
if (scrapeService != null) {
73+
scrapeService.stop();
74+
}
75+
for (final Server server : servers) {
76+
server.stop().join();
77+
}
78+
}
79+
80+
@Test
81+
void testScrapeGaugeMetric() throws Exception {
82+
final String metricsBody = "# TYPE cpu_temp gauge\n"
83+
+ "cpu_temp{host=\"server1\"} 72.5 1706869800000\n";
84+
85+
final Server server = startMetricsServer(metricsBody);
86+
startScrapeService(List.of(targetUrl(server)));
87+
88+
final List<Event> events = drainBuffer();
89+
assertThat(events, hasSize(greaterThanOrEqualTo(1)));
90+
91+
final Event event = events.get(0);
92+
assertThat(event, instanceOf(Gauge.class));
93+
assertThat(event.get("kind", String.class), equalTo("GAUGE"));
94+
assertThat(event.get("name", String.class), equalTo("cpu_temp"));
95+
assertThat(event.get("value", Double.class), equalTo(72.5));
96+
}
97+
98+
@Test
99+
void testScrapeCounterMetric() throws Exception {
100+
final String metricsBody = "# TYPE http_requests counter\n"
101+
+ "http_requests_total{method=\"GET\"} 42 1706869800000\n";
102+
103+
final Server server = startMetricsServer(metricsBody);
104+
startScrapeService(List.of(targetUrl(server)));
105+
106+
final List<Event> events = drainBuffer();
107+
assertThat(events, hasSize(greaterThanOrEqualTo(1)));
108+
109+
final Event event = events.get(0);
110+
assertThat(event, instanceOf(Sum.class));
111+
assertThat(event.get("kind", String.class), equalTo("SUM"));
112+
assertThat(event.get("name", String.class), equalTo("http_requests"));
113+
assertThat(event.get("isMonotonic", Boolean.class), equalTo(true));
114+
}
115+
116+
@Test
117+
void testScrapeHistogramMetric() throws Exception {
118+
final String metricsBody = "# TYPE req_duration histogram\n"
119+
+ "req_duration_bucket{method=\"GET\",le=\"0.1\"} 5 1706869800000\n"
120+
+ "req_duration_bucket{method=\"GET\",le=\"0.5\"} 10 1706869800000\n"
121+
+ "req_duration_bucket{method=\"GET\",le=\"1.0\"} 15 1706869800000\n"
122+
+ "req_duration_bucket{method=\"GET\",le=\"+Inf\"} 20 1706869800000\n"
123+
+ "req_duration_count{method=\"GET\"} 20 1706869800000\n"
124+
+ "req_duration_sum{method=\"GET\"} 5.5 1706869800000\n";
125+
126+
final Server server = startMetricsServer(metricsBody);
127+
startScrapeService(List.of(targetUrl(server)));
128+
129+
final List<Event> events = drainBuffer();
130+
assertThat(events, hasSize(greaterThanOrEqualTo(1)));
131+
132+
final Event event = events.get(0);
133+
assertThat(event, instanceOf(Histogram.class));
134+
assertThat(event.get("kind", String.class), equalTo("HISTOGRAM"));
135+
assertThat(event.get("name", String.class), equalTo("req_duration"));
136+
assertThat(event.get("count", Long.class), equalTo(20L));
137+
138+
final List<Long> bucketCounts = event.get("bucketCountsList", List.class);
139+
assertThat(bucketCounts, hasSize(4));
140+
assertThat(bucketCounts.get(0), equalTo(5L));
141+
assertThat(bucketCounts.get(1), equalTo(5L));
142+
assertThat(bucketCounts.get(2), equalTo(5L));
143+
assertThat(bucketCounts.get(3), equalTo(5L));
144+
145+
final List<Double> bounds = event.get("explicitBounds", List.class);
146+
assertThat(bounds, hasSize(3));
147+
}
148+
149+
@Test
150+
void testScrapeSummaryMetric() throws Exception {
151+
final String metricsBody = "# TYPE rpc_latency summary\n"
152+
+ "rpc_latency{service=\"api\",quantile=\"0.5\"} 0.2 1706869800000\n"
153+
+ "rpc_latency{service=\"api\",quantile=\"0.99\"} 0.8 1706869800000\n"
154+
+ "rpc_latency_count{service=\"api\"} 1000 1706869800000\n"
155+
+ "rpc_latency_sum{service=\"api\"} 300.5 1706869800000\n";
156+
157+
final Server server = startMetricsServer(metricsBody);
158+
startScrapeService(List.of(targetUrl(server)));
159+
160+
final List<Event> events = drainBuffer();
161+
assertThat(events, hasSize(greaterThanOrEqualTo(1)));
162+
163+
final Event event = events.get(0);
164+
assertThat(event, instanceOf(Summary.class));
165+
assertThat(event.get("kind", String.class), equalTo("SUMMARY"));
166+
assertThat(event.get("name", String.class), equalTo("rpc_latency"));
167+
assertThat(event.get("count", Long.class), equalTo(1000L));
168+
169+
final List<?> quantiles = event.get("quantiles", List.class);
170+
assertThat(quantiles, hasSize(2));
171+
}
172+
173+
@Test
174+
void testScrapeMultipleTargets() throws Exception {
175+
final String metricsBody1 = "# TYPE cpu_temp gauge\n"
176+
+ "cpu_temp{host=\"server1\"} 72.5 1706869800000\n";
177+
final String metricsBody2 = "# TYPE memory_usage gauge\n"
178+
+ "memory_usage{host=\"server2\"} 4096 1706869800000\n";
179+
180+
final Server server1 = startMetricsServer(metricsBody1);
181+
final Server server2 = startMetricsServer(metricsBody2);
182+
startScrapeService(List.of(targetUrl(server1), targetUrl(server2)));
183+
184+
final List<Event> events = drainBuffer(2);
185+
assertThat(events, hasSize(greaterThanOrEqualTo(2)));
186+
187+
final long cpuTempCount = events.stream()
188+
.filter(e -> "cpu_temp".equals(e.get("name", String.class)))
189+
.count();
190+
final long memoryUsageCount = events.stream()
191+
.filter(e -> "memory_usage".equals(e.get("name", String.class)))
192+
.count();
193+
194+
assertThat(cpuTempCount, is(greaterThanOrEqualTo(1L)));
195+
assertThat(memoryUsageCount, is(greaterThanOrEqualTo(1L)));
196+
}
197+
198+
@Test
199+
void testScrapeNon2xxResponse() throws Exception {
200+
final Server server = Server.builder()
201+
.http(0)
202+
.service("/metrics", (ctx, req) ->
203+
HttpResponse.of(HttpStatus.INTERNAL_SERVER_ERROR))
204+
.build();
205+
server.start().join();
206+
servers.add(server);
207+
208+
startScrapeService(List.of(targetUrl(server)));
209+
210+
Thread.sleep(3000);
211+
212+
final List<Event> events = drainBufferImmediate();
213+
assertThat(events, is(empty()));
214+
}
215+
216+
private Server startMetricsServer(final String metricsBody) {
217+
final Server server = Server.builder()
218+
.http(0)
219+
.service("/metrics", (ctx, req) ->
220+
HttpResponse.of(HttpStatus.OK, MediaType.PLAIN_TEXT_UTF_8, metricsBody))
221+
.build();
222+
server.start().join();
223+
servers.add(server);
224+
return server;
225+
}
226+
227+
private String targetUrl(final Server server) {
228+
return "http://127.0.0.1:" + server.activeLocalPort() + "/metrics";
229+
}
230+
231+
private void startScrapeService(final List<String> targetUrls) {
232+
final PrometheusScrapeConfig config = mock(PrometheusScrapeConfig.class);
233+
234+
final List<ScrapeTargetConfig> targets = new ArrayList<>();
235+
for (final String url : targetUrls) {
236+
final ScrapeTargetConfig target = mock(ScrapeTargetConfig.class);
237+
when(target.getUrl()).thenReturn(url);
238+
targets.add(target);
239+
}
240+
241+
when(config.getTargets()).thenReturn(targets);
242+
when(config.getScrapeInterval()).thenReturn(Duration.ofSeconds(1));
243+
when(config.getScrapeTimeout()).thenReturn(Duration.ofSeconds(5));
244+
when(config.isFlattenLabels()).thenReturn(false);
245+
when(config.isInsecure()).thenReturn(false);
246+
when(config.getAuthentication()).thenReturn(null);
247+
when(config.getSslCertificateFile()).thenReturn(null);
248+
249+
final PluginMetrics pluginMetrics = mock(PluginMetrics.class);
250+
when(pluginMetrics.counter(anyString())).thenReturn(mock(Counter.class));
251+
final Timer timer = mock(Timer.class);
252+
doAnswer(invocation -> {
253+
final Runnable runnable = invocation.getArgument(0);
254+
runnable.run();
255+
return null;
256+
}).when(timer).record(org.mockito.ArgumentMatchers.<Runnable>any());
257+
when(pluginMetrics.timer(anyString())).thenReturn(timer);
258+
259+
scrapeService = new PrometheusScrapeService(config, buffer, BUFFER_WRITE_TIMEOUT_MS, pluginMetrics);
260+
scrapeService.start();
261+
}
262+
263+
private List<Event> drainBuffer() {
264+
return drainBuffer(1);
265+
}
266+
267+
private List<Event> drainBuffer(final int minExpected) {
268+
final List<Event> events = new ArrayList<>();
269+
final long deadline = System.currentTimeMillis() + 10_000;
270+
while (System.currentTimeMillis() < deadline) {
271+
final Collection<Record<Event>> records = buffer.read(100).getKey();
272+
for (final Record<Event> record : records) {
273+
events.add(record.getData());
274+
}
275+
if (events.size() >= minExpected) {
276+
return events;
277+
}
278+
if (records.isEmpty()) {
279+
try {
280+
Thread.sleep(100);
281+
} catch (final InterruptedException e) {
282+
Thread.currentThread().interrupt();
283+
break;
284+
}
285+
}
286+
}
287+
return events;
288+
}
289+
290+
private List<Event> drainBufferImmediate() {
291+
final List<Event> events = new ArrayList<>();
292+
final Collection<Record<Event>> records = buffer.read(100).getKey();
293+
for (final Record<Event> record : records) {
294+
events.add(record.getData());
295+
}
296+
return events;
297+
}
298+
}

data-prepper-plugins/prometheus-source/src/main/java/org/opensearch/dataprepper/plugins/source/prometheus/PrometheusRemoteWriteSource.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ public class PrometheusRemoteWriteSource extends BaseHttpSource<Record<Event>> {
4646
private static final String SOURCE_NAME = "Prometheus Remote Write";
4747

4848
private final PrometheusRemoteWriteSourceConfig sourceConfig;
49+
private final PluginMetrics pluginMetrics;
50+
private PrometheusScrapeService scrapeService;
4951

5052
@DataPrepperPluginConstructor
5153
public PrometheusRemoteWriteSource(final PrometheusRemoteWriteSourceConfig sourceConfig,
@@ -55,6 +57,29 @@ public PrometheusRemoteWriteSource(final PrometheusRemoteWriteSourceConfig sourc
5557
super(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription, SOURCE_NAME,
5658
LoggerFactory.getLogger(PrometheusRemoteWriteSource.class));
5759
this.sourceConfig = sourceConfig;
60+
this.pluginMetrics = pluginMetrics;
61+
}
62+
63+
@Override
64+
public void start(final Buffer<Record<Event>> buffer) {
65+
super.start(buffer);
66+
if (sourceConfig.getScrapeConfig() != null && scrapeService == null) {
67+
scrapeService = new PrometheusScrapeService(
68+
sourceConfig.getScrapeConfig(), buffer,
69+
sourceConfig.getBufferTimeoutInMillis(), pluginMetrics);
70+
scrapeService.start();
71+
}
72+
}
73+
74+
@Override
75+
public void stop() {
76+
try {
77+
if (scrapeService != null) {
78+
scrapeService.stop();
79+
}
80+
} finally {
81+
super.stop();
82+
}
5883
}
5984

6085
@Override

data-prepper-plugins/prometheus-source/src/main/java/org/opensearch/dataprepper/plugins/source/prometheus/PrometheusRemoteWriteSourceConfig.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
package org.opensearch.dataprepper.plugins.source.prometheus;
1212

1313
import com.fasterxml.jackson.annotation.JsonProperty;
14+
import jakarta.validation.Valid;
1415
import org.opensearch.dataprepper.http.BaseHttpServerConfig;
1516

1617
/**
@@ -25,6 +26,10 @@ public class PrometheusRemoteWriteSourceConfig extends BaseHttpServerConfig {
2526
@JsonProperty("flatten_labels")
2627
private boolean flattenLabels = false;
2728

29+
@Valid
30+
@JsonProperty("scrape")
31+
private PrometheusScrapeConfig scrapeConfig;
32+
2833
@Override
2934
public int getDefaultPort() {
3035
return DEFAULT_PORT;
@@ -43,4 +48,8 @@ public String getDefaultPath() {
4348
public boolean isFlattenLabels() {
4449
return flattenLabels;
4550
}
51+
52+
public PrometheusScrapeConfig getScrapeConfig() {
53+
return scrapeConfig;
54+
}
4655
}

0 commit comments

Comments
 (0)