Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,2 +1,7 @@
Comparing source compatibility of opentelemetry-sdk-common-1.61.0-SNAPSHOT.jar against opentelemetry-sdk-common-1.60.1.jar
No changes.
*** MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.sdk.common.export.GrpcSenderConfig (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW METHOD: PUBLIC(+) long getMaxResponseBodySize()
*** MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.sdk.common.export.HttpSenderConfig (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW METHOD: PUBLIC(+) long getMaxResponseBodySize()
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
Comparing source compatibility of opentelemetry-sdk-extension-jaeger-remote-sampler-1.61.0-SNAPSHOT.jar against opentelemetry-sdk-extension-jaeger-remote-sampler-1.60.1.jar
No changes.
*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.sdk.extension.trace.jaeger.sampler.JaegerRemoteSamplerBuilder (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.extension.trace.jaeger.sampler.JaegerRemoteSamplerBuilder setMaxSamplingStrategyResponseBodySize(long)
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,10 @@ public GrpcExporter build() {
isPlainHttp ? null : tlsConfigHelper.getSslContext(),
isPlainHttp ? null : tlsConfigHelper.getTrustManager(),
executorService,
grpcChannel));
grpcChannel,
// 4mb to align with spec guidance - even though we don't do anything with the
Comment thread
jack-berg marked this conversation as resolved.
// response today, we will so better to have future-looking memory profile
4 * 1024L * 1024L));
LOGGER.log(Level.FINE, "Using GrpcSender: " + grpcSender.getClass().getName());

return new GrpcExporter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public static ImmutableGrpcSenderConfig create(
@Nullable SSLContext sslContext,
@Nullable X509TrustManager trustManager,
@Nullable ExecutorService executorService,
@Nullable Object managedChannel) {
@Nullable Object managedChannel,
long maxResponseBodySize) {
return new AutoValue_ImmutableGrpcSenderConfig(
endpoint,
fullMethodName,
Expand All @@ -49,6 +50,10 @@ public static ImmutableGrpcSenderConfig create(
sslContext,
trustManager,
executorService,
managedChannel);
managedChannel,
maxResponseBodySize);
}

@Override
public abstract long getMaxResponseBodySize();
}
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,10 @@ public HttpExporter build() {
retryPolicy,
isPlainHttp ? null : tlsConfigHelper.getSslContext(),
isPlainHttp ? null : tlsConfigHelper.getTrustManager(),
executorService));
executorService,
// 4mb to align with spec guidance - even though we don't do anything with the
// response today, we will so better to have future-looking memory profile
4 * 1024L * 1024L));
LOGGER.log(Level.FINE, "Using HttpSender: " + httpSender.getClass().getName());

return new HttpExporter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ static HttpSenderConfig create(
@Nullable RetryPolicy retryPolicy,
@Nullable SSLContext sslContext,
@Nullable X509TrustManager trustManager,
@Nullable ExecutorService executorService) {
@Nullable ExecutorService executorService,
long maxResponseBodySize) {
return new AutoValue_ImmutableHttpSenderConfig(
endpoint,
contentType,
Expand All @@ -47,6 +48,10 @@ static HttpSenderConfig create(
retryPolicy,
sslContext,
trustManager,
executorService);
executorService,
maxResponseBodySize);
}

@Override
public abstract long getMaxResponseBodySize();
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ public void setUp() {
null,
null,
null,
null),
null,
Long.MAX_VALUE),
InternalTelemetryVersion.LATEST,
ComponentId.generateLazy(StandardComponentId.ExporterType.OTLP_GRPC_SPAN_EXPORTER),
MeterProvider::noop,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,29 @@

package io.opentelemetry.exporter.sender.jdk.internal;

import static java.util.stream.Collectors.joining;

import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.export.Compressor;
import io.opentelemetry.sdk.common.export.HttpResponse;
import io.opentelemetry.sdk.common.export.HttpSender;
import io.opentelemetry.sdk.common.export.MessageWriter;
import io.opentelemetry.sdk.common.export.ProxyOptions;
import io.opentelemetry.sdk.common.export.RetryPolicy;
import io.opentelemetry.sdk.common.internal.DaemonThreadFactory;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -72,6 +71,7 @@ public final class JdkHttpSender implements HttpSender {
private final Supplier<Map<String, List<String>>> headerSupplier;
@Nullable private final RetryPolicy retryPolicy;
private final Predicate<IOException> retryExceptionPredicate;
private final long maxResponseBodySize;

// Visible for testing
JdkHttpSender(
Expand All @@ -82,7 +82,8 @@ public final class JdkHttpSender implements HttpSender {
Duration timeout,
Supplier<Map<String, List<String>>> headerSupplier,
@Nullable RetryPolicy retryPolicy,
@Nullable ExecutorService executorService) {
@Nullable ExecutorService executorService,
long maxResponseBodySize) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

JaegerRemoteSamplerBuilder.setMaxSamplingStrategyResponseBodySize validates bytes > 0. But the sender constructors (JdkHttpSender, OkHttpHttpSender, OkHttpGrpcSender) accept any long without
validation. A caller can bypass the guard by constructing a sender directly with 0 or -1. Consider adding the validation at the sender level too, or document the expected invariant.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All the sender constructors are internal, and have a bunch of other unvalidated parameters which could be equally corrupted if a user goes around the guards. There's a conversation here discussing where to add additional null checks beyond the guarantees from nullaway. I think we should adopt a policy of adding additional null checks at the API boundaries, but trusting nullaway once we're within the walled garden of internal code. The same would apply to parameter validation, I think.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well... setting directly to a low value like -1 or 0 will effectively bloc data export...
see:

  if (bodyBytes.length > maxResponseBodySize) {
    throw new ResponseBodyTooLargeException(
        "HTTP response body exceeded limit of " + maxResponseBodySize + " bytes");
  }

All bodies will be bigger that that.

this.client = client;
this.endpoint = endpoint;
this.contentType = contentType;
Expand All @@ -101,6 +102,7 @@ public final class JdkHttpSender implements HttpSender {
this.executorService = executorService;
this.managedExecutor = false;
}
this.maxResponseBodySize = maxResponseBodySize;
}

JdkHttpSender(
Expand All @@ -113,7 +115,8 @@ public final class JdkHttpSender implements HttpSender {
@Nullable RetryPolicy retryPolicy,
@Nullable ProxyOptions proxyOptions,
@Nullable SSLContext sslContext,
@Nullable ExecutorService executorService) {
@Nullable ExecutorService executorService,
long maxResponseBodySize) {
this(
configureClient(sslContext, connectTimeout, proxyOptions),
endpoint,
Expand All @@ -122,7 +125,8 @@ public final class JdkHttpSender implements HttpSender {
timeout,
headerSupplier,
retryPolicy,
executorService);
executorService,
maxResponseBodySize);
}

private static ExecutorService newExecutor() {
Expand Down Expand Up @@ -151,10 +155,8 @@ private static HttpClient configureClient(

@Override
public void send(
MessageWriter messageWriter,
Consumer<io.opentelemetry.sdk.common.export.HttpResponse> onResponse,
Consumer<Throwable> onError) {
CompletableFuture<HttpResponse<byte[]>> unused =
MessageWriter messageWriter, Consumer<HttpResponse> onResponse, Consumer<Throwable> onError) {
CompletableFuture<HttpResponse> unused =
CompletableFuture.supplyAsync(
() -> {
try {
Expand All @@ -170,12 +172,12 @@ public void send(
onError.accept(throwable);
return;
}
onResponse.accept(toHttpResponse(httpResponse));
onResponse.accept(httpResponse);
});
}

// Visible for testing
HttpResponse<byte[]> sendInternal(MessageWriter requestBodyWriter) throws IOException {
HttpResponse sendInternal(MessageWriter requestBodyWriter) throws IOException {
long startTimeNanos = System.nanoTime();
HttpRequest.Builder requestBuilder = HttpRequest.newBuilder().uri(endpoint).timeout(timeout);
Map<String, List<String>> headers = headerSupplier.get();
Expand Down Expand Up @@ -207,7 +209,7 @@ HttpResponse<byte[]> sendInternal(MessageWriter requestBodyWriter) throws IOExce

long attempt = 0;
long nextBackoffNanos = retryPolicy.getInitialBackoff().toNanos();
HttpResponse<byte[]> httpResponse = null;
HttpResponse httpResponse = null;
IOException exception = null;
do {
if (attempt > 0) {
Expand All @@ -234,7 +236,7 @@ HttpResponse<byte[]> sendInternal(MessageWriter requestBodyWriter) throws IOExce
requestBuilder.timeout(timeout.minusNanos(System.nanoTime() - startTimeNanos));
try {
httpResponse = sendRequest(requestBuilder, byteBufferPool);
boolean retryable = retryableStatusCodes.contains(httpResponse.statusCode());
boolean retryable = retryableStatusCodes.contains(httpResponse.getStatusCode());
if (logger.isLoggable(Level.FINER)) {
logger.log(
Level.FINER,
Expand Down Expand Up @@ -273,21 +275,16 @@ HttpResponse<byte[]> sendInternal(MessageWriter requestBodyWriter) throws IOExce
throw exception;
}

private static String responseStringRepresentation(HttpResponse<?> response) {
StringJoiner joiner = new StringJoiner(",", "HttpResponse{", "}");
joiner.add("code=" + response.statusCode());
joiner.add(
"headers="
Comment thread
jack-berg marked this conversation as resolved.
+ response.headers().map().entrySet().stream()
.map(entry -> entry.getKey() + "=" + String.join(",", entry.getValue()))
.collect(joining(",", "[", "]")));
return joiner.toString();
private static String responseStringRepresentation(HttpResponse response) {
return "HttpResponse{code=" + response.getStatusCode() + "}";
}

private HttpResponse<byte[]> sendRequest(
private HttpResponse sendRequest(
HttpRequest.Builder requestBuilder, ByteBufferPool byteBufferPool) throws IOException {
try {
return client.send(requestBuilder.build(), HttpResponse.BodyHandlers.ofByteArray());
java.net.http.HttpResponse<InputStream> response =
client.send(requestBuilder.build(), BodyHandlers.ofInputStream());
return toHttpResponse(response);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
Expand Down Expand Up @@ -319,22 +316,30 @@ private byte[] buf() {
}
}

private static io.opentelemetry.sdk.common.export.HttpResponse toHttpResponse(
HttpResponse<byte[]> response) {
return new io.opentelemetry.sdk.common.export.HttpResponse() {
private HttpResponse toHttpResponse(java.net.http.HttpResponse<InputStream> response) {
int statusCode = response.statusCode();
byte[] bodyBytes;
try (InputStream is = response.body()) {
bodyBytes = is.readNBytes((int) Math.min(maxResponseBodySize, Integer.MAX_VALUE));
} catch (IOException e) {
bodyBytes = new byte[0];
logger.log(Level.WARNING, "Failed to read response body", e);
}
byte[] body = bodyBytes;
return new HttpResponse() {
@Override
public int getStatusCode() {
return response.statusCode();
return statusCode;
}

@Override
public String getStatusMessage() {
return String.valueOf(response.statusCode());
return String.valueOf(statusCode);
}

@Override
public byte[] getResponseBody() {
return response.body();
return body;
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public HttpSender createSender(HttpSenderConfig httpSenderConfig) {
httpSenderConfig.getRetryPolicy(),
httpSenderConfig.getProxyOptions(),
httpSenderConfig.getSslContext(),
httpSenderConfig.getExecutorService());
httpSenderConfig.getExecutorService(),
httpSenderConfig.getMaxResponseBodySize());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ void setup() throws IOException, InterruptedException {
Duration.ofSeconds(10),
Collections::emptyMap,
RetryPolicy.builder().setMaxAttempts(2).setInitialBackoff(Duration.ofMillis(1)).build(),
null);
null,
Long.MAX_VALUE);
Comment thread
jack-berg marked this conversation as resolved.
}

@Test
Expand Down Expand Up @@ -121,7 +122,8 @@ void sendInternal_RetryableConnectException() throws IOException, InterruptedExc
Duration.ofSeconds(10),
Collections::emptyMap,
RetryPolicy.builder().setMaxAttempts(2).setInitialBackoff(Duration.ofMillis(1)).build(),
null);
null,
Long.MAX_VALUE);

assertThatThrownBy(() -> sender.sendInternal(new NoOpRequestBodyWriter()))
.satisfies(
Expand Down Expand Up @@ -177,7 +179,8 @@ void connectTimeout() {
null,
null,
null,
null);
null,
Long.MAX_VALUE);

assertThat(sender)
.extracting("client", as(InstanceOfAssertFactories.type(HttpClient.class)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public final class OkHttpGrpcSender implements GrpcSender {
private final HttpUrl url;
@Nullable private final Compressor compressor;
private final Supplier<Map<String, List<String>>> headersSupplier;
private final long maxResponseBodySize;

/** Creates a new {@link OkHttpGrpcSender}. */
@SuppressWarnings("TooManyParameters")
Expand All @@ -95,7 +96,8 @@ public OkHttpGrpcSender(
@Nullable RetryPolicy retryPolicy,
@Nullable SSLContext sslContext,
@Nullable X509TrustManager trustManager,
@Nullable ExecutorService executorService) {
@Nullable ExecutorService executorService,
long maxResponseBodySize) {
int callTimeoutMillis = (int) Math.min(timeout.toMillis(), Integer.MAX_VALUE);
int connectTimeoutMillis = (int) Math.min(connectTimeout.toMillis(), Integer.MAX_VALUE);

Expand Down Expand Up @@ -133,6 +135,7 @@ public OkHttpGrpcSender(
this.compressor = compressor;
this.headersSupplier = headersSupplier;
this.url = HttpUrl.get(endpoint);
this.maxResponseBodySize = maxResponseBodySize;
}

@Override
Expand Down Expand Up @@ -169,7 +172,15 @@ public void onResponse(Call call, Response response) {
// Must consume body before accessing trailers
byte[] bodyBytes = null;
try {
bodyBytes = getResponseMessageBytes(body.bytes());
Buffer buffer = new Buffer();
while (buffer.size() < maxResponseBodySize) {
long n =
body.source().read(buffer, maxResponseBodySize - buffer.size());
if (n == -1L) {
break;
}
}
bodyBytes = getResponseMessageBytes(buffer.readByteArray());
} catch (IOException e) {
bodyBytes = new byte[0];
logger.log(Level.FINE, "Failed to read response body", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public GrpcSender createSender(GrpcSenderConfig grpcSenderConfig) {
grpcSenderConfig.getRetryPolicy(),
grpcSenderConfig.getSslContext(),
grpcSenderConfig.getTrustManager(),
grpcSenderConfig.getExecutorService());
grpcSenderConfig.getExecutorService(),
grpcSenderConfig.getMaxResponseBodySize());
}
}
Loading
Loading