Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.CompletableFuture;

public interface HttpSender {

CompletableFuture<Response> send(BodyWriter writer, int contentLength);
Response send(BodyWriter writer, int contentLength) throws IOException;
Comment thread
breedx-splk marked this conversation as resolved.

interface BodyWriter {
void writeTo(OutputStream outputStream) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
import io.opentelemetry.api.internal.InstrumentationUtil;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.CompletableFuture;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
Expand All @@ -21,6 +21,7 @@
public final class OkHttpSender implements HttpSender {
private final OkHttpClient client;
private final String url;
private static final Duration REQUEST_TIMEOUT = Duration.ofSeconds(30);

public static OkHttpSender create(String url) {
return create(url, new OkHttpClient());
Expand All @@ -35,40 +36,38 @@ public static OkHttpSender create(String url, OkHttpClient client) {

private OkHttpSender(String url, OkHttpClient client) {
this.url = url;
this.client = client;
this.client = client.newBuilder().callTimeout(REQUEST_TIMEOUT).build();
}

@Override
public CompletableFuture<Response> send(BodyWriter writer, int contentLength) {
CompletableFuture<Response> future = new CompletableFuture<>();
public Response send(BodyWriter writer, int contentLength) throws IOException {
Request.Builder builder = new Request.Builder().url(url);
builder.addHeader("Content-Type", CONTENT_TYPE);

RequestBody body = new RawRequestBody(writer, contentLength, MEDIA_TYPE);
builder.post(body);

AtomicReference<Response> responseRef = new AtomicReference<>();
AtomicReference<IOException> errorRef = new AtomicReference<>();
// By suppressing instrumentations, we prevent automatic instrumentations for the okhttp request
// that polls the opamp server.
InstrumentationUtil.suppressInstrumentation(() -> doSendRequest(builder.build(), future));

return future;
InstrumentationUtil.suppressInstrumentation(
() -> {
try {
responseRef.set(doSendRequest(builder.build()));
} catch (IOException e) {
errorRef.set(e);
}
});
if (errorRef.get() != null) {
throw errorRef.get();
}
return Objects.requireNonNull(responseRef.get());
}

private void doSendRequest(Request request, CompletableFuture<Response> future) {
client
.newCall(request)
.enqueue(
new Callback() {
@Override
public void onResponse(Call call, okhttp3.Response response) {
future.complete(new OkHttpResponse(response));
}

@Override
public void onFailure(Call call, IOException e) {
future.completeExceptionally(e);
}
});
private Response doSendRequest(Request request) throws IOException {
okhttp3.Response response = client.newCall(request).execute();
return new OkHttpResponse(response);
}

private static class OkHttpResponse implements Response {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,10 @@
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
Expand Down Expand Up @@ -149,26 +146,18 @@ private void doSendRequest() {
AgentToServer agentToServer = Objects.requireNonNull(requestSupplier).get().getAgentToServer();

byte[] data = agentToServer.encodeByteString().toByteArray();
CompletableFuture<HttpSender.Response> future =
requestSender.send(outputStream -> outputStream.write(data), data.length);
try (HttpSender.Response response = future.get(30, TimeUnit.SECONDS)) {
try (HttpSender.Response response =
requestSender.send(outputStream -> outputStream.write(data), data.length)) {
if (isSuccessful(response)) {
ServerToAgent serverToAgent = ServerToAgent.ADAPTER.decode(response.bodyInputStream());
getCallback().onConnectionSuccess();
handleHttpSuccess(Response.create(serverToAgent));
} else {
handleHttpError(response);
}
} catch (IOException | InterruptedException | TimeoutException e) {
} catch (IOException e) {
getCallback().onConnectionFailed(e);
connectionStatus.retryAfter(null);
} catch (ExecutionException e) {
if (e.getCause() != null) {
getCallback().onConnectionFailed(e.getCause());
} else {
getCallback().onConnectionFailed(e);
}
connectionStatus.retryAfter(null);
} catch (RuntimeException e) {
getCallback().onRequestFailed(e);
connectionStatus.retryAfter(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import static org.assertj.core.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
Expand All @@ -26,6 +25,8 @@
import io.opentelemetry.opamp.client.request.service.RequestService;
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -34,10 +35,6 @@
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import opamp.proto.AgentToServer;
import opamp.proto.RetryInfo;
import opamp.proto.ServerErrorResponse;
Expand Down Expand Up @@ -194,12 +191,9 @@ void verifySendingRequest_whenTheresAParsingError() {
}

@Test
void verifySendingRequest_whenThereIsAnExecutionError()
throws ExecutionException, InterruptedException, TimeoutException {
CompletableFuture<HttpSender.Response> future = mock();
requestSender.enqueueResponseFuture(future);
Exception myException = mock();
doThrow(new ExecutionException(myException)).when(future).get(30, TimeUnit.SECONDS);
void verifySendingRequest_whenThereIsAnIOException() throws IOException {
IOException myException = mock();
requestSender.enqueueException(myException);

httpRequestService.sendRequest();

Expand All @@ -208,12 +202,9 @@ void verifySendingRequest_whenThereIsAnExecutionError()
}

@Test
void verifySendingRequest_whenThereIsAnInterruptedException()
throws ExecutionException, InterruptedException, TimeoutException {
CompletableFuture<HttpSender.Response> future = mock();
requestSender.enqueueResponseFuture(future);
InterruptedException myException = mock();
doThrow(myException).when(future).get(30, TimeUnit.SECONDS);
void verifySendingRequest_whenThereIsATimeoutException() throws IOException {
SocketTimeoutException myException = mock();
requestSender.enqueueException(myException);

httpRequestService.sendRequest();

Expand Down Expand Up @@ -406,26 +397,29 @@ private static class TestHttpSender implements HttpSender {
private final List<RequestParams> requests = new ArrayList<>();

@SuppressWarnings("JdkObsolete")
private final Queue<CompletableFuture<HttpSender.Response>> responses = new LinkedList<>();
private final Queue<Object> responses = new LinkedList<>();

@Override
public CompletableFuture<HttpSender.Response> send(BodyWriter writer, int contentLength) {
public HttpSender.Response send(BodyWriter writer, int contentLength) throws IOException {
requests.add(new RequestParams(contentLength));
CompletableFuture<HttpSender.Response> response = null;
Object response = null;
try {
response = responses.remove();
} catch (NoSuchElementException e) {
fail("Unwanted triggered request");
}
return response;
if (response instanceof IOException) {
throw (IOException) response;
}
return (HttpSender.Response) response;
}
Comment thread
breedx-splk marked this conversation as resolved.

void enqueueResponse(HttpSender.Response response) {
enqueueResponseFuture(CompletableFuture.completedFuture(response));
responses.add(response);
}

void enqueueResponseFuture(CompletableFuture<HttpSender.Response> future) {
responses.add(future);
void enqueueException(IOException exception) {
responses.add(exception);
}

List<RequestParams> getRequests(int size) {
Expand Down
Loading