Skip to content

Commit 225116e

Browse files
authored
[opamp-client] Remove CompletableFuture usage (#2810)
1 parent bf2571a commit 225116e

4 files changed

Lines changed: 45 additions & 64 deletions

File tree

opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/http/HttpSender.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,10 @@
99
import java.io.IOException;
1010
import java.io.InputStream;
1111
import java.io.OutputStream;
12-
import java.util.concurrent.CompletableFuture;
1312

1413
public interface HttpSender {
1514

16-
CompletableFuture<Response> send(BodyWriter writer, int contentLength);
15+
Response send(BodyWriter writer, int contentLength) throws IOException;
1716

1817
interface BodyWriter {
1918
void writeTo(OutputStream outputStream) throws IOException;

opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/http/OkHttpSender.java

Lines changed: 23 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,10 @@
88
import io.opentelemetry.api.internal.InstrumentationUtil;
99
import java.io.IOException;
1010
import java.io.InputStream;
11-
import java.util.concurrent.CompletableFuture;
11+
import java.time.Duration;
12+
import java.util.Objects;
13+
import java.util.concurrent.atomic.AtomicReference;
1214
import javax.annotation.Nullable;
13-
import okhttp3.Call;
14-
import okhttp3.Callback;
1515
import okhttp3.MediaType;
1616
import okhttp3.OkHttpClient;
1717
import okhttp3.Request;
@@ -21,6 +21,7 @@
2121
public final class OkHttpSender implements HttpSender {
2222
private final OkHttpClient client;
2323
private final String url;
24+
private static final Duration REQUEST_TIMEOUT = Duration.ofSeconds(30);
2425

2526
public static OkHttpSender create(String url) {
2627
return create(url, new OkHttpClient());
@@ -35,40 +36,38 @@ public static OkHttpSender create(String url, OkHttpClient client) {
3536

3637
private OkHttpSender(String url, OkHttpClient client) {
3738
this.url = url;
38-
this.client = client;
39+
this.client = client.newBuilder().callTimeout(REQUEST_TIMEOUT).build();
3940
}
4041

4142
@Override
42-
public CompletableFuture<Response> send(BodyWriter writer, int contentLength) {
43-
CompletableFuture<Response> future = new CompletableFuture<>();
43+
public Response send(BodyWriter writer, int contentLength) throws IOException {
4444
Request.Builder builder = new Request.Builder().url(url);
4545
builder.addHeader("Content-Type", CONTENT_TYPE);
4646

4747
RequestBody body = new RawRequestBody(writer, contentLength, MEDIA_TYPE);
4848
builder.post(body);
4949

50+
AtomicReference<Response> responseRef = new AtomicReference<>();
51+
AtomicReference<IOException> errorRef = new AtomicReference<>();
5052
// By suppressing instrumentations, we prevent automatic instrumentations for the okhttp request
5153
// that polls the opamp server.
52-
InstrumentationUtil.suppressInstrumentation(() -> doSendRequest(builder.build(), future));
53-
54-
return future;
54+
InstrumentationUtil.suppressInstrumentation(
55+
() -> {
56+
try {
57+
responseRef.set(doSendRequest(builder.build()));
58+
} catch (IOException e) {
59+
errorRef.set(e);
60+
}
61+
});
62+
if (errorRef.get() != null) {
63+
throw errorRef.get();
64+
}
65+
return Objects.requireNonNull(responseRef.get());
5566
}
5667

57-
private void doSendRequest(Request request, CompletableFuture<Response> future) {
58-
client
59-
.newCall(request)
60-
.enqueue(
61-
new Callback() {
62-
@Override
63-
public void onResponse(Call call, okhttp3.Response response) {
64-
future.complete(new OkHttpResponse(response));
65-
}
66-
67-
@Override
68-
public void onFailure(Call call, IOException e) {
69-
future.completeExceptionally(e);
70-
}
71-
});
68+
private Response doSendRequest(Request request) throws IOException {
69+
okhttp3.Response response = client.newCall(request).execute();
70+
return new OkHttpResponse(response);
7271
}
7372

7473
private static class OkHttpResponse implements Response {

opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestService.java

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,10 @@
1919
import java.time.Duration;
2020
import java.util.Objects;
2121
import java.util.Optional;
22-
import java.util.concurrent.CompletableFuture;
23-
import java.util.concurrent.ExecutionException;
2422
import java.util.concurrent.Executors;
2523
import java.util.concurrent.ScheduledExecutorService;
2624
import java.util.concurrent.ScheduledFuture;
2725
import java.util.concurrent.TimeUnit;
28-
import java.util.concurrent.TimeoutException;
2926
import java.util.concurrent.atomic.AtomicBoolean;
3027
import java.util.concurrent.atomic.AtomicReference;
3128
import java.util.function.Supplier;
@@ -149,26 +146,18 @@ private void doSendRequest() {
149146
AgentToServer agentToServer = Objects.requireNonNull(requestSupplier).get().getAgentToServer();
150147

151148
byte[] data = agentToServer.encodeByteString().toByteArray();
152-
CompletableFuture<HttpSender.Response> future =
153-
requestSender.send(outputStream -> outputStream.write(data), data.length);
154-
try (HttpSender.Response response = future.get(30, TimeUnit.SECONDS)) {
149+
try (HttpSender.Response response =
150+
requestSender.send(outputStream -> outputStream.write(data), data.length)) {
155151
if (isSuccessful(response)) {
156152
ServerToAgent serverToAgent = ServerToAgent.ADAPTER.decode(response.bodyInputStream());
157153
getCallback().onConnectionSuccess();
158154
handleHttpSuccess(Response.create(serverToAgent));
159155
} else {
160156
handleHttpError(response);
161157
}
162-
} catch (IOException | InterruptedException | TimeoutException e) {
158+
} catch (IOException e) {
163159
getCallback().onConnectionFailed(e);
164160
connectionStatus.retryAfter(null);
165-
} catch (ExecutionException e) {
166-
if (e.getCause() != null) {
167-
getCallback().onConnectionFailed(e.getCause());
168-
} else {
169-
getCallback().onConnectionFailed(e);
170-
}
171-
connectionStatus.retryAfter(null);
172161
} catch (RuntimeException e) {
173162
getCallback().onRequestFailed(e);
174163
connectionStatus.retryAfter(null);

opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestServiceTest.java

Lines changed: 18 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import static org.assertj.core.api.Assertions.fail;
1111
import static org.mockito.ArgumentMatchers.any;
1212
import static org.mockito.Mockito.clearInvocations;
13-
import static org.mockito.Mockito.doThrow;
1413
import static org.mockito.Mockito.mock;
1514
import static org.mockito.Mockito.never;
1615
import static org.mockito.Mockito.spy;
@@ -26,6 +25,8 @@
2625
import io.opentelemetry.opamp.client.request.service.RequestService;
2726
import java.io.ByteArrayInputStream;
2827
import java.io.EOFException;
28+
import java.io.IOException;
29+
import java.net.SocketTimeoutException;
2930
import java.time.Duration;
3031
import java.util.ArrayList;
3132
import java.util.Arrays;
@@ -34,10 +35,6 @@
3435
import java.util.List;
3536
import java.util.NoSuchElementException;
3637
import java.util.Queue;
37-
import java.util.concurrent.CompletableFuture;
38-
import java.util.concurrent.ExecutionException;
39-
import java.util.concurrent.TimeUnit;
40-
import java.util.concurrent.TimeoutException;
4138
import opamp.proto.AgentToServer;
4239
import opamp.proto.RetryInfo;
4340
import opamp.proto.ServerErrorResponse;
@@ -194,12 +191,9 @@ void verifySendingRequest_whenTheresAParsingError() {
194191
}
195192

196193
@Test
197-
void verifySendingRequest_whenThereIsAnExecutionError()
198-
throws ExecutionException, InterruptedException, TimeoutException {
199-
CompletableFuture<HttpSender.Response> future = mock();
200-
requestSender.enqueueResponseFuture(future);
201-
Exception myException = mock();
202-
doThrow(new ExecutionException(myException)).when(future).get(30, TimeUnit.SECONDS);
194+
void verifySendingRequest_whenThereIsAnIOException() throws IOException {
195+
IOException myException = mock();
196+
requestSender.enqueueException(myException);
203197

204198
httpRequestService.sendRequest();
205199

@@ -208,12 +202,9 @@ void verifySendingRequest_whenThereIsAnExecutionError()
208202
}
209203

210204
@Test
211-
void verifySendingRequest_whenThereIsAnInterruptedException()
212-
throws ExecutionException, InterruptedException, TimeoutException {
213-
CompletableFuture<HttpSender.Response> future = mock();
214-
requestSender.enqueueResponseFuture(future);
215-
InterruptedException myException = mock();
216-
doThrow(myException).when(future).get(30, TimeUnit.SECONDS);
205+
void verifySendingRequest_whenThereIsATimeoutException() throws IOException {
206+
SocketTimeoutException myException = mock();
207+
requestSender.enqueueException(myException);
217208

218209
httpRequestService.sendRequest();
219210

@@ -406,26 +397,29 @@ private static class TestHttpSender implements HttpSender {
406397
private final List<RequestParams> requests = new ArrayList<>();
407398

408399
@SuppressWarnings("JdkObsolete")
409-
private final Queue<CompletableFuture<HttpSender.Response>> responses = new LinkedList<>();
400+
private final Queue<Object> responses = new LinkedList<>();
410401

411402
@Override
412-
public CompletableFuture<HttpSender.Response> send(BodyWriter writer, int contentLength) {
403+
public HttpSender.Response send(BodyWriter writer, int contentLength) throws IOException {
413404
requests.add(new RequestParams(contentLength));
414-
CompletableFuture<HttpSender.Response> response = null;
405+
Object response = null;
415406
try {
416407
response = responses.remove();
417408
} catch (NoSuchElementException e) {
418409
fail("Unwanted triggered request");
419410
}
420-
return response;
411+
if (response instanceof IOException) {
412+
throw (IOException) response;
413+
}
414+
return (HttpSender.Response) response;
421415
}
422416

423417
void enqueueResponse(HttpSender.Response response) {
424-
enqueueResponseFuture(CompletableFuture.completedFuture(response));
418+
responses.add(response);
425419
}
426420

427-
void enqueueResponseFuture(CompletableFuture<HttpSender.Response> future) {
428-
responses.add(future);
421+
void enqueueException(IOException exception) {
422+
responses.add(exception);
429423
}
430424

431425
List<RequestParams> getRequests(int size) {

0 commit comments

Comments
 (0)