Skip to content

Commit 4990f26

Browse files
Add Prometheus scrape/pull source to prometheus plugin [Adding in the same Prometheus Remote write source] (#6743)
Add Prometheus scrape/pull source to prometheus plugin Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com>
1 parent 6144e46 commit 4990f26

21 files changed

Lines changed: 3379 additions & 2 deletions

data-prepper-plugins/prometheus-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/prometheus/PrometheusRemoteWriteSourceIT.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -336,8 +336,6 @@ void testEmptyWriteRequestReturns204() throws Exception {
336336
assertThat(response.status(), equalTo(HttpStatus.NO_CONTENT));
337337
}
338338

339-
// Helper methods
340-
341339
private AggregatedHttpResponse sendWriteRequest(final Remote.WriteRequest request) throws IOException {
342340
final byte[] compressed = Snappy.compress(request.toByteArray());
343341
final HttpRequest httpRequest = HttpRequest.of(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,298 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.source.prometheus;
12+
13+
import com.linecorp.armeria.common.HttpResponse;
14+
import com.linecorp.armeria.common.HttpStatus;
15+
import com.linecorp.armeria.common.MediaType;
16+
import com.linecorp.armeria.server.Server;
17+
import io.micrometer.core.instrument.Counter;
18+
import io.micrometer.core.instrument.Timer;
19+
import org.junit.jupiter.api.AfterEach;
20+
import org.junit.jupiter.api.BeforeEach;
21+
import org.junit.jupiter.api.Test;
22+
import org.opensearch.dataprepper.metrics.PluginMetrics;
23+
import org.opensearch.dataprepper.model.buffer.Buffer;
24+
import org.opensearch.dataprepper.model.event.Event;
25+
import org.opensearch.dataprepper.model.metric.Gauge;
26+
import org.opensearch.dataprepper.model.metric.Histogram;
27+
import org.opensearch.dataprepper.model.metric.Sum;
28+
import org.opensearch.dataprepper.model.metric.Summary;
29+
import org.opensearch.dataprepper.model.record.Record;
30+
import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer;
31+
32+
import java.time.Duration;
33+
import java.util.ArrayList;
34+
import java.util.Collection;
35+
import java.util.List;
36+
37+
import static org.hamcrest.CoreMatchers.equalTo;
38+
import static org.hamcrest.CoreMatchers.instanceOf;
39+
import static org.hamcrest.MatcherAssert.assertThat;
40+
import static org.hamcrest.Matchers.empty;
41+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
42+
import static org.hamcrest.Matchers.hasSize;
43+
import static org.hamcrest.Matchers.is;
44+
import static org.mockito.ArgumentMatchers.anyString;
45+
import static org.mockito.Mockito.doAnswer;
46+
import static org.mockito.Mockito.mock;
47+
import static org.mockito.Mockito.when;
48+
49+
/**
50+
* Integration test for the Prometheus scrape source.
51+
* Starts real Armeria HTTP servers that serve Prometheus text exposition format,
52+
* creates a real PrometheusScrapeService that scrapes them, and verifies
53+
* parsed metric events appear in a real BlockingBuffer.
54+
*/
55+
class PrometheusScrapeSourceIT {
56+
57+
private static final int BUFFER_SIZE = 4096;
58+
private static final int BATCH_SIZE = 256;
59+
private static final int BUFFER_WRITE_TIMEOUT_MS = 5000;
60+
61+
private final List<Server> servers = new ArrayList<>();
62+
private Buffer<Record<Event>> buffer;
63+
private PrometheusScrapeService scrapeService;
64+
65+
@BeforeEach
66+
void setUp() {
67+
buffer = new BlockingBuffer<>(BUFFER_SIZE, BATCH_SIZE, "test-pipeline");
68+
}
69+
70+
@AfterEach
71+
void tearDown() {
72+
if (scrapeService != null) {
73+
scrapeService.stop();
74+
}
75+
for (final Server server : servers) {
76+
server.stop().join();
77+
}
78+
}
79+
80+
@Test
81+
void testScrapeGaugeMetric() throws Exception {
82+
final String metricsBody = "# TYPE cpu_temp gauge\n"
83+
+ "cpu_temp{host=\"server1\"} 72.5 1706869800000\n";
84+
85+
final Server server = startMetricsServer(metricsBody);
86+
startScrapeService(List.of(targetUrl(server)));
87+
88+
final List<Event> events = drainBuffer();
89+
assertThat(events, hasSize(greaterThanOrEqualTo(1)));
90+
91+
final Event event = events.get(0);
92+
assertThat(event, instanceOf(Gauge.class));
93+
assertThat(event.get("kind", String.class), equalTo("GAUGE"));
94+
assertThat(event.get("name", String.class), equalTo("cpu_temp"));
95+
assertThat(event.get("value", Double.class), equalTo(72.5));
96+
}
97+
98+
@Test
99+
void testScrapeCounterMetric() throws Exception {
100+
final String metricsBody = "# TYPE http_requests counter\n"
101+
+ "http_requests_total{method=\"GET\"} 42 1706869800000\n";
102+
103+
final Server server = startMetricsServer(metricsBody);
104+
startScrapeService(List.of(targetUrl(server)));
105+
106+
final List<Event> events = drainBuffer();
107+
assertThat(events, hasSize(greaterThanOrEqualTo(1)));
108+
109+
final Event event = events.get(0);
110+
assertThat(event, instanceOf(Sum.class));
111+
assertThat(event.get("kind", String.class), equalTo("SUM"));
112+
assertThat(event.get("name", String.class), equalTo("http_requests"));
113+
assertThat(event.get("isMonotonic", Boolean.class), equalTo(true));
114+
}
115+
116+
@Test
117+
void testScrapeHistogramMetric() throws Exception {
118+
final String metricsBody = "# TYPE req_duration histogram\n"
119+
+ "req_duration_bucket{method=\"GET\",le=\"0.1\"} 5 1706869800000\n"
120+
+ "req_duration_bucket{method=\"GET\",le=\"0.5\"} 10 1706869800000\n"
121+
+ "req_duration_bucket{method=\"GET\",le=\"1.0\"} 15 1706869800000\n"
122+
+ "req_duration_bucket{method=\"GET\",le=\"+Inf\"} 20 1706869800000\n"
123+
+ "req_duration_count{method=\"GET\"} 20 1706869800000\n"
124+
+ "req_duration_sum{method=\"GET\"} 5.5 1706869800000\n";
125+
126+
final Server server = startMetricsServer(metricsBody);
127+
startScrapeService(List.of(targetUrl(server)));
128+
129+
final List<Event> events = drainBuffer();
130+
assertThat(events, hasSize(greaterThanOrEqualTo(1)));
131+
132+
final Event event = events.get(0);
133+
assertThat(event, instanceOf(Histogram.class));
134+
assertThat(event.get("kind", String.class), equalTo("HISTOGRAM"));
135+
assertThat(event.get("name", String.class), equalTo("req_duration"));
136+
assertThat(event.get("count", Long.class), equalTo(20L));
137+
138+
final List<Long> bucketCounts = event.get("bucketCountsList", List.class);
139+
assertThat(bucketCounts, hasSize(4));
140+
assertThat(bucketCounts.get(0), equalTo(5L));
141+
assertThat(bucketCounts.get(1), equalTo(5L));
142+
assertThat(bucketCounts.get(2), equalTo(5L));
143+
assertThat(bucketCounts.get(3), equalTo(5L));
144+
145+
final List<Double> bounds = event.get("explicitBounds", List.class);
146+
assertThat(bounds, hasSize(3));
147+
}
148+
149+
@Test
150+
void testScrapeSummaryMetric() throws Exception {
151+
final String metricsBody = "# TYPE rpc_latency summary\n"
152+
+ "rpc_latency{service=\"api\",quantile=\"0.5\"} 0.2 1706869800000\n"
153+
+ "rpc_latency{service=\"api\",quantile=\"0.99\"} 0.8 1706869800000\n"
154+
+ "rpc_latency_count{service=\"api\"} 1000 1706869800000\n"
155+
+ "rpc_latency_sum{service=\"api\"} 300.5 1706869800000\n";
156+
157+
final Server server = startMetricsServer(metricsBody);
158+
startScrapeService(List.of(targetUrl(server)));
159+
160+
final List<Event> events = drainBuffer();
161+
assertThat(events, hasSize(greaterThanOrEqualTo(1)));
162+
163+
final Event event = events.get(0);
164+
assertThat(event, instanceOf(Summary.class));
165+
assertThat(event.get("kind", String.class), equalTo("SUMMARY"));
166+
assertThat(event.get("name", String.class), equalTo("rpc_latency"));
167+
assertThat(event.get("count", Long.class), equalTo(1000L));
168+
169+
final List<?> quantiles = event.get("quantiles", List.class);
170+
assertThat(quantiles, hasSize(2));
171+
}
172+
173+
@Test
174+
void testScrapeMultipleTargets() throws Exception {
175+
final String metricsBody1 = "# TYPE cpu_temp gauge\n"
176+
+ "cpu_temp{host=\"server1\"} 72.5 1706869800000\n";
177+
final String metricsBody2 = "# TYPE memory_usage gauge\n"
178+
+ "memory_usage{host=\"server2\"} 4096 1706869800000\n";
179+
180+
final Server server1 = startMetricsServer(metricsBody1);
181+
final Server server2 = startMetricsServer(metricsBody2);
182+
startScrapeService(List.of(targetUrl(server1), targetUrl(server2)));
183+
184+
final List<Event> events = drainBuffer(2);
185+
assertThat(events, hasSize(greaterThanOrEqualTo(2)));
186+
187+
final long cpuTempCount = events.stream()
188+
.filter(e -> "cpu_temp".equals(e.get("name", String.class)))
189+
.count();
190+
final long memoryUsageCount = events.stream()
191+
.filter(e -> "memory_usage".equals(e.get("name", String.class)))
192+
.count();
193+
194+
assertThat(cpuTempCount, is(greaterThanOrEqualTo(1L)));
195+
assertThat(memoryUsageCount, is(greaterThanOrEqualTo(1L)));
196+
}
197+
198+
@Test
199+
void testScrapeNon2xxResponse() throws Exception {
200+
final Server server = Server.builder()
201+
.http(0)
202+
.service("/metrics", (ctx, req) ->
203+
HttpResponse.of(HttpStatus.INTERNAL_SERVER_ERROR))
204+
.build();
205+
server.start().join();
206+
servers.add(server);
207+
208+
startScrapeService(List.of(targetUrl(server)));
209+
210+
Thread.sleep(3000);
211+
212+
final List<Event> events = drainBufferImmediate();
213+
assertThat(events, is(empty()));
214+
}
215+
216+
private Server startMetricsServer(final String metricsBody) {
217+
final Server server = Server.builder()
218+
.http(0)
219+
.service("/metrics", (ctx, req) ->
220+
HttpResponse.of(HttpStatus.OK, MediaType.PLAIN_TEXT_UTF_8, metricsBody))
221+
.build();
222+
server.start().join();
223+
servers.add(server);
224+
return server;
225+
}
226+
227+
private String targetUrl(final Server server) {
228+
return "http://127.0.0.1:" + server.activeLocalPort() + "/metrics";
229+
}
230+
231+
private void startScrapeService(final List<String> targetUrls) {
232+
final PrometheusScrapeConfig config = mock(PrometheusScrapeConfig.class);
233+
234+
final List<ScrapeTargetConfig> targets = new ArrayList<>();
235+
for (final String url : targetUrls) {
236+
final ScrapeTargetConfig target = mock(ScrapeTargetConfig.class);
237+
when(target.getUrl()).thenReturn(url);
238+
targets.add(target);
239+
}
240+
241+
when(config.getTargets()).thenReturn(targets);
242+
when(config.getScrapeInterval()).thenReturn(Duration.ofSeconds(1));
243+
when(config.getScrapeTimeout()).thenReturn(Duration.ofSeconds(5));
244+
when(config.isFlattenLabels()).thenReturn(false);
245+
when(config.isInsecure()).thenReturn(false);
246+
when(config.getAuthentication()).thenReturn(null);
247+
when(config.getSslCertificateFile()).thenReturn(null);
248+
249+
final PluginMetrics pluginMetrics = mock(PluginMetrics.class);
250+
when(pluginMetrics.counter(anyString())).thenReturn(mock(Counter.class));
251+
final Timer timer = mock(Timer.class);
252+
doAnswer(invocation -> {
253+
final Runnable runnable = invocation.getArgument(0);
254+
runnable.run();
255+
return null;
256+
}).when(timer).record(org.mockito.ArgumentMatchers.<Runnable>any());
257+
when(pluginMetrics.timer(anyString())).thenReturn(timer);
258+
259+
scrapeService = new PrometheusScrapeService(config, buffer, BUFFER_WRITE_TIMEOUT_MS, pluginMetrics);
260+
scrapeService.start();
261+
}
262+
263+
private List<Event> drainBuffer() {
264+
return drainBuffer(1);
265+
}
266+
267+
private List<Event> drainBuffer(final int minExpected) {
268+
final List<Event> events = new ArrayList<>();
269+
final long deadline = System.currentTimeMillis() + 10_000;
270+
while (System.currentTimeMillis() < deadline) {
271+
final Collection<Record<Event>> records = buffer.read(100).getKey();
272+
for (final Record<Event> record : records) {
273+
events.add(record.getData());
274+
}
275+
if (events.size() >= minExpected) {
276+
return events;
277+
}
278+
if (records.isEmpty()) {
279+
try {
280+
Thread.sleep(100);
281+
} catch (final InterruptedException e) {
282+
Thread.currentThread().interrupt();
283+
break;
284+
}
285+
}
286+
}
287+
return events;
288+
}
289+
290+
private List<Event> drainBufferImmediate() {
291+
final List<Event> events = new ArrayList<>();
292+
final Collection<Record<Event>> records = buffer.read(100).getKey();
293+
for (final Record<Event> record : records) {
294+
events.add(record.getData());
295+
}
296+
return events;
297+
}
298+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
package org.opensearch.dataprepper.plugins.source.prometheus;
11+
12+
import com.linecorp.armeria.common.HttpHeaderNames;
13+
import com.linecorp.armeria.common.HttpHeadersBuilder;
14+
15+
import java.nio.charset.StandardCharsets;
16+
import java.util.Base64;
17+
18+
final class BasicAuthenticator implements ScrapeRequestAuthenticator {
19+
20+
private final String encodedCredentials;
21+
22+
BasicAuthenticator(final String username, final String password) {
23+
final String credentials = username + ":" + password;
24+
this.encodedCredentials = Base64.getEncoder()
25+
.encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
26+
}
27+
28+
@Override
29+
public void applyAuth(final HttpHeadersBuilder builder) {
30+
builder.add(HttpHeaderNames.AUTHORIZATION, "Basic " + encodedCredentials);
31+
}
32+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
package org.opensearch.dataprepper.plugins.source.prometheus;
11+
12+
import com.linecorp.armeria.common.HttpHeaderNames;
13+
import com.linecorp.armeria.common.HttpHeadersBuilder;
14+
15+
final class BearerTokenAuthenticator implements ScrapeRequestAuthenticator {
16+
17+
private final String token;
18+
19+
BearerTokenAuthenticator(final String token) {
20+
this.token = token;
21+
}
22+
23+
@Override
24+
public void applyAuth(final HttpHeadersBuilder builder) {
25+
builder.add(HttpHeaderNames.AUTHORIZATION, "Bearer " + token);
26+
}
27+
}

0 commit comments

Comments
 (0)