Skip to content

Commit 63b2c43

Browse files
authored
PrometheusTimeSeries performance fixes (#6316)
* PrometheusTimeSeries performance fixes Signed-off-by: Kondaka <krishkdk@amazon.com> * Addressed review comments Signed-off-by: Kondaka <krishkdk@amazon.com> * Fixed checkStyle error Signed-off-by: Kondaka <krishkdk@amazon.com> --------- Signed-off-by: Kondaka <krishkdk@amazon.com>
1 parent fc0b563 commit 63b2c43

4 files changed

Lines changed: 182 additions & 154 deletions

File tree

data-prepper-plugins/prometheus-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/prometheus/PrometheusSinkAMPIT.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
import static org.hamcrest.CoreMatchers.equalTo;
7777
import static org.hamcrest.CoreMatchers.hasItem;
7878
import static org.hamcrest.MatcherAssert.assertThat;
79+
import static org.junit.jupiter.api.Assertions.assertThrows;
7980

8081
import java.net.URLEncoder;
8182
import java.nio.charset.StandardCharsets;
@@ -119,6 +120,8 @@ public class PrometheusSinkAMPIT {
119120
@Mock
120121
private AwsCredentialsSupplier awsCredentialsSupplier;
121122
@Mock
123+
private AwsCredentialsSupplier awsQueryCredentialsSupplier;
124+
@Mock
122125
private Counter metricsSuccessCounter;
123126
@Mock
124127
private Counter metricsFailedCounter;
@@ -161,6 +164,8 @@ void setUp() {
161164

162165
.build();
163166

167+
awsCredentialsSupplier = mock(AwsCredentialsSupplier.class);
168+
awsQueryCredentialsSupplier = mock(AwsCredentialsSupplier.class);
164169
eventHandle = mock(EventHandle.class);
165170
pipelineDescription = mock(PipelineDescription.class);
166171
awsCredentialsProvider = DefaultCredentialsProvider.create();
@@ -204,6 +209,7 @@ void setUp() {
204209
String remoteWriteUrl = url + "api/v1/remote_write";
205210
queryUrl = url + "api/v1/query";
206211
when(awsCredentialsSupplier.getProvider(any())).thenAnswer(options -> DefaultCredentialsProvider.create());
212+
lenient().when(awsQueryCredentialsSupplier.getProvider(any())).thenAnswer(options -> DefaultCredentialsProvider.create());
207213
thresholdConfig = mock(PrometheusSinkThresholdConfig.class);
208214
when(thresholdConfig.getMaxEvents()).thenReturn(NUM_RECORDS);
209215
when(thresholdConfig.getMaxRequestSizeBytes()).thenReturn(100000L);
@@ -254,7 +260,7 @@ private void getMetricsFromAMP(final String metricName, final String qs) throws
254260
String encodedQuery = URLEncoder.encode(query, StandardCharsets.UTF_8);
255261
String getUrlQuery = "query=" + query + "&start="+testStartTime+"&end="+endTime+"&step=1s";
256262
String getUrl = queryRangeUrl+"?query=" + encodedQuery + "&start="+testStartTime+"&end="+endTime+"&step=1s";
257-
PrometheusSigV4Signer signer = new PrometheusSigV4Signer(awsCredentialsSupplier, prometheusSinkConfig, baseUrl + queryRangeUrl);
263+
PrometheusSigV4Signer signer = new PrometheusSigV4Signer(awsQueryCredentialsSupplier, prometheusSinkConfig, baseUrl + queryRangeUrl);
258264
final SdkHttpFullRequest signedRequest = signer.signQueryRequest(getUrlQuery);
259265

260266
final RequestHeadersBuilder headersBuilder = RequestHeaders.builder()
@@ -911,4 +917,25 @@ private Collection<Record<Event>> getExponentialHistogramRecordList(int numberOf
911917
return records;
912918
}
913919

920+
@Test
921+
void testToVerifyLackOfCredentialsResultInFailure() throws Exception {
922+
923+
AwsCredentialsProvider provider = mock(AwsCredentialsProvider.class);
924+
when(awsCredentialsSupplier.getProvider(any())).thenReturn(provider);
925+
lenient().when(thresholdConfig.getFlushInterval()).thenReturn(1L);
926+
when(thresholdConfig.getMaxEvents()).thenReturn(1);
927+
PrometheusSink sink = createObjectUnderTest();
928+
Collection<Record<Event>> records = getHistogramRecordList(NUM_RECORDS);
929+
sink.doOutput(records);
930+
931+
long startTimeSeconds = testStartTime.getEpochSecond();
932+
assertThrows( org.awaitility.core.ConditionTimeoutException.class, () -> await().atMost(Duration.ofSeconds(2))
933+
.untilAsserted(() -> {
934+
metricsInAMP = 0;
935+
getMetricsFromAMP(histogramMetricName, "histogram");
936+
assertThat(metricsInAMP, greaterThanOrEqualTo(1));
937+
}));
938+
939+
verify(metricsSuccessCounter, times(0)).increment(NUM_RECORDS);
940+
}
914941
}

data-prepper-plugins/prometheus-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/prometheus/PrometheusHttpSender.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,9 @@ public PrometheusPushResult pushToEndpoint(final byte[] payload) {
127127
final byte[] compressedBufferData = compressionEngine.compress(payload);
128128

129129
final HttpRequest request = buildHttpRequest(compressedBufferData);
130+
if (request == null) {
131+
return new PrometheusPushResult(false, 0);
132+
}
130133
final long startTime = System.currentTimeMillis();
131134

132135
// Execute request and wait for completion
@@ -170,6 +173,9 @@ private HttpRequest buildHttpRequest(final byte[] payload) {
170173
SdkHttpFullRequest sdkHttpRequest = createSdkHttpRequest(config.getUrl(), payload);
171174
if (signer != null) {
172175
sdkHttpRequest = signer.signRequest(sdkHttpRequest);
176+
if (sdkHttpRequest == null) {
177+
return null;
178+
}
173179
}
174180

175181
final RequestHeadersBuilder headersBuilder = RequestHeaders.builder()

data-prepper-plugins/prometheus-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/prometheus/PrometheusSigV4Signer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,9 @@ private static AwsCredentialsOptions convertToCredentialOptions(final AwsConfig
7272
}
7373

7474
SdkHttpFullRequest signQueryRequest(final String query) {
75+
if (credentialsProvider == null || credentialsProvider.resolveCredentials() == null) {
76+
return null;
77+
}
7578
SdkHttpFullRequest unsignedRequest = SdkHttpFullRequest.builder()
7679
.method(SdkHttpMethod.POST)
7780
.uri(endpointUri)
@@ -93,6 +96,9 @@ SdkHttpFullRequest signQueryRequest(final String query) {
9396
* @return A signed {@link SdkHttpFullRequest} ready for transmission to the AWS OTLP endpoint
9497
*/
9598
SdkHttpFullRequest signRequest(final SdkHttpFullRequest unsignedRequest) {
99+
if (credentialsProvider == null || credentialsProvider.resolveCredentials() == null) {
100+
return null;
101+
}
96102
return signer.sign(unsignedRequest, Aws4SignerParams.builder()
97103
.signingRegion(region)
98104
.signingName(SERVICE_NAME)

0 commit comments

Comments
 (0)