Skip to content

Commit f327b15

Browse files
Add Prometheus scrape/pull source to prometheus plugin
Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com>
1 parent 1ac64aa commit f327b15

17 files changed

Lines changed: 3275 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,300 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.source.prometheus;
12+
13+
import com.linecorp.armeria.common.HttpResponse;
14+
import com.linecorp.armeria.common.HttpStatus;
15+
import com.linecorp.armeria.common.MediaType;
16+
import com.linecorp.armeria.server.Server;
17+
import io.micrometer.core.instrument.Counter;
18+
import io.micrometer.core.instrument.Timer;
19+
import org.junit.jupiter.api.AfterEach;
20+
import org.junit.jupiter.api.BeforeEach;
21+
import org.junit.jupiter.api.Test;
22+
import org.opensearch.dataprepper.metrics.PluginMetrics;
23+
import org.opensearch.dataprepper.model.buffer.Buffer;
24+
import org.opensearch.dataprepper.model.event.Event;
25+
import org.opensearch.dataprepper.model.metric.Gauge;
26+
import org.opensearch.dataprepper.model.metric.Histogram;
27+
import org.opensearch.dataprepper.model.metric.Sum;
28+
import org.opensearch.dataprepper.model.metric.Summary;
29+
import org.opensearch.dataprepper.model.record.Record;
30+
import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer;
31+
32+
import java.time.Duration;
33+
import java.util.ArrayList;
34+
import java.util.Collection;
35+
import java.util.List;
36+
37+
import static org.hamcrest.CoreMatchers.equalTo;
38+
import static org.hamcrest.CoreMatchers.instanceOf;
39+
import static org.hamcrest.MatcherAssert.assertThat;
40+
import static org.hamcrest.Matchers.empty;
41+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
42+
import static org.hamcrest.Matchers.hasSize;
43+
import static org.hamcrest.Matchers.is;
44+
import static org.mockito.ArgumentMatchers.anyString;
45+
import static org.mockito.Mockito.doAnswer;
46+
import static org.mockito.Mockito.mock;
47+
import static org.mockito.Mockito.when;
48+
49+
/**
50+
* Integration test for the Prometheus scrape source.
51+
* Starts real Armeria HTTP servers that serve Prometheus text exposition format,
52+
* creates a real PrometheusScrapeService that scrapes them, and verifies
53+
* parsed metric events appear in a real BlockingBuffer.
54+
*/
55+
class PrometheusScrapeSourceIT {
56+
57+
private static final int BUFFER_SIZE = 4096;
58+
private static final int BATCH_SIZE = 256;
59+
private static final int BUFFER_WRITE_TIMEOUT_MS = 5000;
60+
61+
private final List<Server> servers = new ArrayList<>();
62+
private Buffer<Record<Event>> buffer;
63+
private PrometheusScrapeService scrapeService;
64+
65+
@BeforeEach
66+
void setUp() {
67+
buffer = new BlockingBuffer<>(BUFFER_SIZE, BATCH_SIZE, "test-pipeline");
68+
}
69+
70+
@AfterEach
71+
void tearDown() {
72+
if (scrapeService != null) {
73+
scrapeService.stop();
74+
}
75+
for (final Server server : servers) {
76+
server.stop().join();
77+
}
78+
}
79+
80+
@Test
81+
void testScrapeGaugeMetric() throws Exception {
82+
final String metricsBody = "# TYPE cpu_temp gauge\n"
83+
+ "cpu_temp{host=\"server1\"} 72.5 1706869800000\n";
84+
85+
final Server server = startMetricsServer(metricsBody);
86+
startScrapeService(List.of(targetUrl(server)));
87+
88+
final List<Event> events = drainBuffer();
89+
assertThat(events, hasSize(greaterThanOrEqualTo(1)));
90+
91+
final Event event = events.get(0);
92+
assertThat(event, instanceOf(Gauge.class));
93+
assertThat(event.get("kind", String.class), equalTo("GAUGE"));
94+
assertThat(event.get("name", String.class), equalTo("cpu_temp"));
95+
assertThat(event.get("value", Double.class), equalTo(72.5));
96+
}
97+
98+
@Test
99+
void testScrapeCounterMetric() throws Exception {
100+
final String metricsBody = "# TYPE http_requests counter\n"
101+
+ "http_requests_total{method=\"GET\"} 42 1706869800000\n";
102+
103+
final Server server = startMetricsServer(metricsBody);
104+
startScrapeService(List.of(targetUrl(server)));
105+
106+
final List<Event> events = drainBuffer();
107+
assertThat(events, hasSize(greaterThanOrEqualTo(1)));
108+
109+
final Event event = events.get(0);
110+
assertThat(event, instanceOf(Sum.class));
111+
assertThat(event.get("kind", String.class), equalTo("SUM"));
112+
assertThat(event.get("name", String.class), equalTo("http_requests"));
113+
assertThat(event.get("isMonotonic", Boolean.class), equalTo(true));
114+
}
115+
116+
@Test
117+
void testScrapeHistogramMetric() throws Exception {
118+
final String metricsBody = "# TYPE req_duration histogram\n"
119+
+ "req_duration_bucket{method=\"GET\",le=\"0.1\"} 5 1706869800000\n"
120+
+ "req_duration_bucket{method=\"GET\",le=\"0.5\"} 10 1706869800000\n"
121+
+ "req_duration_bucket{method=\"GET\",le=\"1.0\"} 15 1706869800000\n"
122+
+ "req_duration_bucket{method=\"GET\",le=\"+Inf\"} 20 1706869800000\n"
123+
+ "req_duration_count{method=\"GET\"} 20 1706869800000\n"
124+
+ "req_duration_sum{method=\"GET\"} 5.5 1706869800000\n";
125+
126+
final Server server = startMetricsServer(metricsBody);
127+
startScrapeService(List.of(targetUrl(server)));
128+
129+
final List<Event> events = drainBuffer();
130+
assertThat(events, hasSize(greaterThanOrEqualTo(1)));
131+
132+
final Event event = events.get(0);
133+
assertThat(event, instanceOf(Histogram.class));
134+
assertThat(event.get("kind", String.class), equalTo("HISTOGRAM"));
135+
assertThat(event.get("name", String.class), equalTo("req_duration"));
136+
assertThat(event.get("count", Long.class), equalTo(20L));
137+
138+
final List<Long> bucketCounts = event.get("bucketCountsList", List.class);
139+
assertThat(bucketCounts, hasSize(4));
140+
assertThat(bucketCounts.get(0), equalTo(5L));
141+
assertThat(bucketCounts.get(1), equalTo(5L));
142+
assertThat(bucketCounts.get(2), equalTo(5L));
143+
assertThat(bucketCounts.get(3), equalTo(5L));
144+
145+
final List<Double> bounds = event.get("explicitBounds", List.class);
146+
assertThat(bounds, hasSize(3));
147+
}
148+
149+
@Test
150+
void testScrapeSummaryMetric() throws Exception {
151+
final String metricsBody = "# TYPE rpc_latency summary\n"
152+
+ "rpc_latency{service=\"api\",quantile=\"0.5\"} 0.2 1706869800000\n"
153+
+ "rpc_latency{service=\"api\",quantile=\"0.99\"} 0.8 1706869800000\n"
154+
+ "rpc_latency_count{service=\"api\"} 1000 1706869800000\n"
155+
+ "rpc_latency_sum{service=\"api\"} 300.5 1706869800000\n";
156+
157+
final Server server = startMetricsServer(metricsBody);
158+
startScrapeService(List.of(targetUrl(server)));
159+
160+
final List<Event> events = drainBuffer();
161+
assertThat(events, hasSize(greaterThanOrEqualTo(1)));
162+
163+
final Event event = events.get(0);
164+
assertThat(event, instanceOf(Summary.class));
165+
assertThat(event.get("kind", String.class), equalTo("SUMMARY"));
166+
assertThat(event.get("name", String.class), equalTo("rpc_latency"));
167+
assertThat(event.get("count", Long.class), equalTo(1000L));
168+
169+
final List<?> quantiles = event.get("quantiles", List.class);
170+
assertThat(quantiles, hasSize(2));
171+
}
172+
173+
@Test
174+
void testScrapeMultipleTargets() throws Exception {
175+
final String metricsBody1 = "# TYPE cpu_temp gauge\n"
176+
+ "cpu_temp{host=\"server1\"} 72.5 1706869800000\n";
177+
final String metricsBody2 = "# TYPE memory_usage gauge\n"
178+
+ "memory_usage{host=\"server2\"} 4096 1706869800000\n";
179+
180+
final Server server1 = startMetricsServer(metricsBody1);
181+
final Server server2 = startMetricsServer(metricsBody2);
182+
startScrapeService(List.of(targetUrl(server1), targetUrl(server2)));
183+
184+
final List<Event> events = drainBuffer(2);
185+
assertThat(events, hasSize(greaterThanOrEqualTo(2)));
186+
187+
final long cpuTempCount = events.stream()
188+
.filter(e -> "cpu_temp".equals(e.get("name", String.class)))
189+
.count();
190+
final long memoryUsageCount = events.stream()
191+
.filter(e -> "memory_usage".equals(e.get("name", String.class)))
192+
.count();
193+
194+
assertThat(cpuTempCount, is(greaterThanOrEqualTo(1L)));
195+
assertThat(memoryUsageCount, is(greaterThanOrEqualTo(1L)));
196+
}
197+
198+
@Test
199+
void testScrapeNon2xxResponse() throws Exception {
200+
final Server server = Server.builder()
201+
.http(0)
202+
.service("/metrics", (ctx, req) ->
203+
HttpResponse.of(HttpStatus.INTERNAL_SERVER_ERROR))
204+
.build();
205+
server.start().join();
206+
servers.add(server);
207+
208+
startScrapeService(List.of(targetUrl(server)));
209+
210+
Thread.sleep(3000);
211+
212+
final List<Event> events = drainBufferImmediate();
213+
assertThat(events, is(empty()));
214+
}
215+
216+
// -- helper methods --
217+
218+
private Server startMetricsServer(final String metricsBody) {
219+
final Server server = Server.builder()
220+
.http(0)
221+
.service("/metrics", (ctx, req) ->
222+
HttpResponse.of(HttpStatus.OK, MediaType.PLAIN_TEXT_UTF_8, metricsBody))
223+
.build();
224+
server.start().join();
225+
servers.add(server);
226+
return server;
227+
}
228+
229+
private String targetUrl(final Server server) {
230+
return "http://127.0.0.1:" + server.activeLocalPort() + "/metrics";
231+
}
232+
233+
private void startScrapeService(final List<String> targetUrls) {
234+
final PrometheusScrapeConfig config = mock(PrometheusScrapeConfig.class);
235+
236+
final List<ScrapeTargetConfig> targets = new ArrayList<>();
237+
for (final String url : targetUrls) {
238+
final ScrapeTargetConfig target = mock(ScrapeTargetConfig.class);
239+
when(target.getUrl()).thenReturn(url);
240+
targets.add(target);
241+
}
242+
243+
when(config.getTargets()).thenReturn(targets);
244+
when(config.getScrapeInterval()).thenReturn(Duration.ofSeconds(1));
245+
when(config.getScrapeTimeout()).thenReturn(Duration.ofSeconds(5));
246+
when(config.isFlattenLabels()).thenReturn(false);
247+
when(config.isInsecure()).thenReturn(false);
248+
when(config.getAuthentication()).thenReturn(null);
249+
when(config.getSslCertificateFile()).thenReturn(null);
250+
251+
final PluginMetrics pluginMetrics = mock(PluginMetrics.class);
252+
when(pluginMetrics.counter(anyString())).thenReturn(mock(Counter.class));
253+
final Timer timer = mock(Timer.class);
254+
doAnswer(invocation -> {
255+
final Runnable runnable = invocation.getArgument(0);
256+
runnable.run();
257+
return null;
258+
}).when(timer).record(org.mockito.ArgumentMatchers.<Runnable>any());
259+
when(pluginMetrics.timer(anyString())).thenReturn(timer);
260+
261+
scrapeService = new PrometheusScrapeService(config, buffer, BUFFER_WRITE_TIMEOUT_MS, pluginMetrics);
262+
scrapeService.start();
263+
}
264+
265+
private List<Event> drainBuffer() {
266+
return drainBuffer(1);
267+
}
268+
269+
private List<Event> drainBuffer(final int minExpected) {
270+
final List<Event> events = new ArrayList<>();
271+
final long deadline = System.currentTimeMillis() + 10_000;
272+
while (System.currentTimeMillis() < deadline) {
273+
final Collection<Record<Event>> records = buffer.read(100).getKey();
274+
for (final Record<Event> record : records) {
275+
events.add(record.getData());
276+
}
277+
if (events.size() >= minExpected) {
278+
return events;
279+
}
280+
if (records.isEmpty()) {
281+
try {
282+
Thread.sleep(100);
283+
} catch (final InterruptedException e) {
284+
Thread.currentThread().interrupt();
285+
break;
286+
}
287+
}
288+
}
289+
return events;
290+
}
291+
292+
private List<Event> drainBufferImmediate() {
293+
final List<Event> events = new ArrayList<>();
294+
final Collection<Record<Event>> records = buffer.read(100).getKey();
295+
for (final Record<Event> record : records) {
296+
events.add(record.getData());
297+
}
298+
return events;
299+
}
300+
}

data-prepper-plugins/prometheus-source/src/main/java/org/opensearch/dataprepper/plugins/source/prometheus/PrometheusRemoteWriteSource.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ public class PrometheusRemoteWriteSource extends BaseHttpSource<Record<Event>> {
4646
private static final String SOURCE_NAME = "Prometheus Remote Write";
4747

4848
private final PrometheusRemoteWriteSourceConfig sourceConfig;
49+
private final PluginMetrics pluginMetrics;
50+
private PrometheusScrapeService scrapeService;
4951

5052
@DataPrepperPluginConstructor
5153
public PrometheusRemoteWriteSource(final PrometheusRemoteWriteSourceConfig sourceConfig,
@@ -55,6 +57,29 @@ public PrometheusRemoteWriteSource(final PrometheusRemoteWriteSourceConfig sourc
5557
super(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription, SOURCE_NAME,
5658
LoggerFactory.getLogger(PrometheusRemoteWriteSource.class));
5759
this.sourceConfig = sourceConfig;
60+
this.pluginMetrics = pluginMetrics;
61+
}
62+
63+
@Override
64+
public void start(final Buffer<Record<Event>> buffer) {
65+
super.start(buffer);
66+
if (sourceConfig.getScrapeConfig() != null && scrapeService == null) {
67+
scrapeService = new PrometheusScrapeService(
68+
sourceConfig.getScrapeConfig(), buffer,
69+
sourceConfig.getBufferTimeoutInMillis(), pluginMetrics);
70+
scrapeService.start();
71+
}
72+
}
73+
74+
@Override
75+
public void stop() {
76+
try {
77+
if (scrapeService != null) {
78+
scrapeService.stop();
79+
}
80+
} finally {
81+
super.stop();
82+
}
5883
}
5984

6085
@Override

data-prepper-plugins/prometheus-source/src/main/java/org/opensearch/dataprepper/plugins/source/prometheus/PrometheusRemoteWriteSourceConfig.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
package org.opensearch.dataprepper.plugins.source.prometheus;
1212

1313
import com.fasterxml.jackson.annotation.JsonProperty;
14+
import jakarta.validation.Valid;
1415
import org.opensearch.dataprepper.http.BaseHttpServerConfig;
1516

1617
/**
@@ -25,6 +26,10 @@ public class PrometheusRemoteWriteSourceConfig extends BaseHttpServerConfig {
2526
@JsonProperty("flatten_labels")
2627
private boolean flattenLabels = false;
2728

29+
@Valid
30+
@JsonProperty("scrape")
31+
private PrometheusScrapeConfig scrapeConfig;
32+
2833
@Override
2934
public int getDefaultPort() {
3035
return DEFAULT_PORT;
@@ -43,4 +48,8 @@ public String getDefaultPath() {
4348
public boolean isFlattenLabels() {
4449
return flattenLabels;
4550
}
51+
52+
public PrometheusScrapeConfig getScrapeConfig() {
53+
return scrapeConfig;
54+
}
4655
}

0 commit comments

Comments
 (0)