Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -119,6 +120,8 @@ public class PrometheusSinkAMPIT {
@Mock
private AwsCredentialsSupplier awsCredentialsSupplier;
@Mock
private AwsCredentialsSupplier awsQueryCredentialsSupplier;
@Mock
private Counter metricsSuccessCounter;
@Mock
private Counter metricsFailedCounter;
Expand Down Expand Up @@ -161,6 +164,8 @@ void setUp() {

.build();

awsCredentialsSupplier = mock(AwsCredentialsSupplier.class);
awsQueryCredentialsSupplier = mock(AwsCredentialsSupplier.class);
eventHandle = mock(EventHandle.class);
pipelineDescription = mock(PipelineDescription.class);
awsCredentialsProvider = DefaultCredentialsProvider.create();
Expand Down Expand Up @@ -204,6 +209,7 @@ void setUp() {
String remoteWriteUrl = url + "api/v1/remote_write";
queryUrl = url + "api/v1/query";
when(awsCredentialsSupplier.getProvider(any())).thenAnswer(options -> DefaultCredentialsProvider.create());
lenient().when(awsQueryCredentialsSupplier.getProvider(any())).thenAnswer(options -> DefaultCredentialsProvider.create());
thresholdConfig = mock(PrometheusSinkThresholdConfig.class);
when(thresholdConfig.getMaxEvents()).thenReturn(NUM_RECORDS);
when(thresholdConfig.getMaxRequestSizeBytes()).thenReturn(100000L);
Expand Down Expand Up @@ -254,7 +260,7 @@ private void getMetricsFromAMP(final String metricName, final String qs) throws
String encodedQuery = URLEncoder.encode(query, StandardCharsets.UTF_8);
String getUrlQuery = "query=" + query + "&start="+testStartTime+"&end="+endTime+"&step=1s";
String getUrl = queryRangeUrl+"?query=" + encodedQuery + "&start="+testStartTime+"&end="+endTime+"&step=1s";
PrometheusSigV4Signer signer = new PrometheusSigV4Signer(awsCredentialsSupplier, prometheusSinkConfig, baseUrl + queryRangeUrl);
PrometheusSigV4Signer signer = new PrometheusSigV4Signer(awsQueryCredentialsSupplier, prometheusSinkConfig, baseUrl + queryRangeUrl);
final SdkHttpFullRequest signedRequest = signer.signQueryRequest(getUrlQuery);

final RequestHeadersBuilder headersBuilder = RequestHeaders.builder()
Expand Down Expand Up @@ -911,4 +917,25 @@ private Collection<Record<Event>> getExponentialHistogramRecordList(int numberOf
return records;
}

@Test
void testToVerifyLackOfCredentialsResultInFailure() throws Exception {

AwsCredentialsProvider provider = mock(AwsCredentialsProvider.class);
when(awsCredentialsSupplier.getProvider(any())).thenReturn(provider);
lenient().when(thresholdConfig.getFlushInterval()).thenReturn(1L);
when(thresholdConfig.getMaxEvents()).thenReturn(1);
PrometheusSink sink = createObjectUnderTest();
Collection<Record<Event>> records = getHistogramRecordList(NUM_RECORDS);
sink.doOutput(records);

long startTimeSeconds = testStartTime.getEpochSecond();
assertThrows( org.awaitility.core.ConditionTimeoutException.class, () -> await().atMost(Duration.ofSeconds(2))
.untilAsserted(() -> {
metricsInAMP = 0;
getMetricsFromAMP(histogramMetricName, "histogram");
assertThat(metricsInAMP, greaterThanOrEqualTo(1));
}));

verify(metricsSuccessCounter, times(0)).increment(NUM_RECORDS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@ public PrometheusPushResult pushToEndpoint(final byte[] payload) {
final byte[] compressedBufferData = compressionEngine.compress(payload);

final HttpRequest request = buildHttpRequest(compressedBufferData);
if (request == null) {
return new PrometheusPushResult(false, 0);
}
final long startTime = System.currentTimeMillis();

// Execute request and wait for completion
Expand Down Expand Up @@ -170,6 +173,9 @@ private HttpRequest buildHttpRequest(final byte[] payload) {
SdkHttpFullRequest sdkHttpRequest = createSdkHttpRequest(config.getUrl(), payload);
if (signer != null) {
sdkHttpRequest = signer.signRequest(sdkHttpRequest);
if (sdkHttpRequest == null) {
return null;
}
}

final RequestHeadersBuilder headersBuilder = RequestHeaders.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ private static AwsCredentialsOptions convertToCredentialOptions(final AwsConfig
}

SdkHttpFullRequest signQueryRequest(final String query) {
if (credentialsProvider == null || credentialsProvider.resolveCredentials() == null) {
return null;
}
SdkHttpFullRequest unsignedRequest = SdkHttpFullRequest.builder()
.method(SdkHttpMethod.POST)
.uri(endpointUri)
Expand All @@ -93,6 +96,9 @@ SdkHttpFullRequest signQueryRequest(final String query) {
* @return A signed {@link SdkHttpFullRequest} ready for transmission to the AWS OTLP endpoint
*/
SdkHttpFullRequest signRequest(final SdkHttpFullRequest unsignedRequest) {
if (credentialsProvider == null || credentialsProvider.resolveCredentials() == null) {
return null;
}
return signer.sign(unsignedRequest, Aws4SignerParams.builder()
.signingRegion(region)
.signingName(SERVICE_NAME)
Expand Down
Loading
Loading