diff --git a/opamp-client/build.gradle.kts b/opamp-client/build.gradle.kts index a1f0c698e..6045e6add 100644 --- a/opamp-client/build.gradle.kts +++ b/opamp-client/build.gradle.kts @@ -12,8 +12,10 @@ description = "Client implementation of the OpAMP spec." otelJava.moduleName.set("io.opentelemetry.contrib.opamp.client") dependencies { + implementation("com.squareup.okhttp3:okhttp") annotationProcessor("com.google.auto.value:auto-value") compileOnly("com.google.auto.value:auto-value-annotations") + testImplementation("org.mockito:mockito-inline") } val opampReleaseInfo = tasks.register("opampLastReleaseInfo") { diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/http/HttpErrorException.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/http/HttpErrorException.java new file mode 100644 index 000000000..c1104118c --- /dev/null +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/http/HttpErrorException.java @@ -0,0 +1,27 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.opamp.client.internal.connectivity.http; + +public class HttpErrorException extends Exception { + private final int errorCode; + + private static final long serialVersionUID = 1L; + + public int getErrorCode() { + return errorCode; + } + + /** + * Constructs an HTTP error related exception. + * + * @param errorCode The HTTP error code. + * @param message The HTTP error message associated with the code. + */ + public HttpErrorException(int errorCode, String message) { + super(message); + this.errorCode = errorCode; + } +} diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/http/HttpSender.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/http/HttpSender.java new file mode 100644 index 000000000..704d172ac --- /dev/null +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/http/HttpSender.java @@ -0,0 +1,31 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.opamp.client.internal.connectivity.http; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.concurrent.CompletableFuture; + +public interface HttpSender { + + CompletableFuture send(BodyWriter writer, int contentLength); + + interface BodyWriter { + void writeTo(OutputStream outputStream) throws IOException; + } + + interface Response extends Closeable { + int statusCode(); + + String statusMessage(); + + InputStream bodyInputStream(); + + String getHeader(String name); + } +} diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/http/OkHttpSender.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/http/OkHttpSender.java new file mode 100644 index 000000000..3e8f9a174 --- /dev/null +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/http/OkHttpSender.java @@ -0,0 +1,135 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.opamp.client.internal.connectivity.http; + +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.CompletableFuture; +import okhttp3.Call; +import okhttp3.Callback; +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.RequestBody; +import okio.BufferedSink; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +public final class OkHttpSender implements HttpSender { + private final OkHttpClient client; + private final String url; + + public static OkHttpSender create(String url) { + return create(url, new OkHttpClient()); + } + + public static OkHttpSender create(String url, OkHttpClient client) { + return new OkHttpSender(url, client); + } + + private static final String CONTENT_TYPE = "application/x-protobuf"; + private static final MediaType MEDIA_TYPE = MediaType.parse(CONTENT_TYPE); + + private OkHttpSender(String url, OkHttpClient client) { + this.url = url; + this.client = client; + } + + @Override + public CompletableFuture send(BodyWriter writer, int contentLength) { + CompletableFuture future = new CompletableFuture<>(); + okhttp3.Request.Builder builder = new okhttp3.Request.Builder().url(url); + builder.addHeader("Content-Type", CONTENT_TYPE); + + RequestBody body = new RawRequestBody(writer, contentLength, MEDIA_TYPE); + builder.post(body); + + client + .newCall(builder.build()) + .enqueue( + new Callback() { + @Override + public void onResponse(@NotNull Call call, @NotNull okhttp3.Response response) { + if (response.isSuccessful() && response.body() != null) { + future.complete(new OkHttpResponse(response)); + } else { + future.completeExceptionally( + new HttpErrorException(response.code(), response.message())); + } + } + + @Override + public void onFailure(@NotNull Call call, @NotNull IOException e) { + future.completeExceptionally(e); + } + }); + + return future; + } + + private static class OkHttpResponse implements Response { + private final okhttp3.Response response; + + private OkHttpResponse(okhttp3.Response response) { + if (response.body() == null) { + throw new IllegalStateException("null response body not expected"); + } + this.response = response; + } + + @Override + public int statusCode() { + return response.code(); + } + + @Override + public String statusMessage() { + return response.message(); + } + + @Override + public InputStream bodyInputStream() { + return response.body().byteStream(); + } + + @Override + public String getHeader(String name) { + return response.headers().get(name); + } + + @Override + public void close() { + response.close(); + } + } + + private static class RawRequestBody extends RequestBody { + private final BodyWriter writer; + private final int contentLength; + private final MediaType contentType; + + private RawRequestBody(BodyWriter writer, int contentLength, MediaType contentType) { + this.writer = writer; + this.contentLength = contentLength; + this.contentType = contentType; + } + + @Nullable + @Override + public MediaType contentType() { + return contentType; + } + + @Override + public long contentLength() { + return contentLength; + } + + @Override + public void writeTo(@NotNull BufferedSink bufferedSink) throws IOException { + writer.writeTo(bufferedSink.outputStream()); + } + } +} diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/http/RetryAfterParser.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/http/RetryAfterParser.java new file mode 100644 index 000000000..6cc2d1e3b --- /dev/null +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/http/RetryAfterParser.java @@ -0,0 +1,49 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.opamp.client.internal.connectivity.http; + +import io.opentelemetry.opamp.client.internal.tools.SystemTime; +import java.time.Duration; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Locale; +import java.util.Optional; +import java.util.regex.Pattern; + +public final class RetryAfterParser { + private final SystemTime systemTime; + private static final Pattern SECONDS_PATTERN = Pattern.compile("\\d+"); + private static final Pattern DATE_PATTERN = + Pattern.compile( + "[A-Za-z]{3}, [0-3][0-9] [A-Za-z]{3} [0-9]{4} [0-2][0-9]:[0-5][0-9]:[0-5][0-9] GMT"); + private static final DateTimeFormatter DATE_FORMAT = + DateTimeFormatter.ofPattern("EEE, dd MMM yyyy HH:mm:ss z", Locale.US); + + public static RetryAfterParser getInstance() { + return new RetryAfterParser(SystemTime.getInstance()); + } + + RetryAfterParser(SystemTime systemTime) { + this.systemTime = systemTime; + } + + public Optional tryParse(String value) { + Duration duration = null; + if (SECONDS_PATTERN.matcher(value).matches()) { + duration = Duration.ofSeconds(Long.parseLong(value)); + } else if (DATE_PATTERN.matcher(value).matches()) { + long difference = toMilliseconds(value) - systemTime.getCurrentTimeMillis(); + if (difference > 0) { + duration = Duration.ofMillis(difference); + } + } + return Optional.ofNullable(duration); + } + + private static long toMilliseconds(String value) { + return ZonedDateTime.parse(value, DATE_FORMAT).toInstant().toEpochMilli(); + } +} diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/Request.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/Request.java new file mode 100644 index 000000000..d6a3c7aed --- /dev/null +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/Request.java @@ -0,0 +1,19 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.opamp.client.internal.request; + +import com.google.auto.value.AutoValue; +import opamp.proto.AgentToServer; + +/** Wrapper class for "AgentToServer" request body. */ +@AutoValue +public abstract class Request { + public abstract AgentToServer getAgentToServer(); + + public static Request create(AgentToServer agentToServer) { + return new AutoValue_Request(agentToServer); + } +} diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/delay/AcceptsDelaySuggestion.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/delay/AcceptsDelaySuggestion.java new file mode 100644 index 000000000..f53b00d45 --- /dev/null +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/delay/AcceptsDelaySuggestion.java @@ -0,0 +1,18 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.opamp.client.internal.request.delay; + +import java.time.Duration; + +/** + * A {@link PeriodicDelay} implementation that wants to accept delay time suggestions, as explained + * here, + * must implement this interface. + */ +public interface AcceptsDelaySuggestion { + void suggestDelay(Duration delay); +} diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/delay/FixedPeriodicDelay.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/delay/FixedPeriodicDelay.java new file mode 100644 index 000000000..2b54180e5 --- /dev/null +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/delay/FixedPeriodicDelay.java @@ -0,0 +1,24 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.opamp.client.internal.request.delay; + +import java.time.Duration; + +final class FixedPeriodicDelay implements PeriodicDelay { + private final Duration duration; + + public FixedPeriodicDelay(Duration duration) { + this.duration = duration; + } + + @Override + public Duration getNextDelay() { + return duration; + } + + @Override + public void reset() {} +} diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/delay/PeriodicDelay.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/delay/PeriodicDelay.java new file mode 100644 index 000000000..67aa93491 --- /dev/null +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/delay/PeriodicDelay.java @@ -0,0 +1,18 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.opamp.client.internal.request.delay; + +import java.time.Duration; + +public interface PeriodicDelay { + static PeriodicDelay ofFixedDuration(Duration duration) { + return new FixedPeriodicDelay(duration); + } + + Duration getNextDelay(); + + void reset(); +} diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestService.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestService.java new file mode 100644 index 000000000..32d5a60f1 --- /dev/null +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestService.java @@ -0,0 +1,273 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.opamp.client.internal.request.service; + +import io.opentelemetry.opamp.client.internal.connectivity.http.HttpErrorException; +import io.opentelemetry.opamp.client.internal.connectivity.http.HttpSender; +import io.opentelemetry.opamp.client.internal.connectivity.http.RetryAfterParser; +import io.opentelemetry.opamp.client.internal.request.Request; +import io.opentelemetry.opamp.client.internal.request.delay.AcceptsDelaySuggestion; +import io.opentelemetry.opamp.client.internal.request.delay.PeriodicDelay; +import io.opentelemetry.opamp.client.internal.response.OpampServerResponseError; +import io.opentelemetry.opamp.client.internal.response.Response; +import java.io.IOException; +import java.time.Duration; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import opamp.proto.AgentToServer; +import opamp.proto.ServerErrorResponse; +import opamp.proto.ServerErrorResponseType; +import opamp.proto.ServerToAgent; + +public final class HttpRequestService implements RequestService { + private final HttpSender requestSender; + // must be a single threaded executor, the code in this class relies on requests being processed + // serially + private final ScheduledExecutorService executorService; + private final AtomicBoolean isRunning = new AtomicBoolean(false); + private final AtomicBoolean hasStopped = new AtomicBoolean(false); + private final ConnectionStatus connectionStatus; + private final AtomicReference> scheduledTask = new AtomicReference<>(); + private final RetryAfterParser retryAfterParser; + @Nullable private Callback callback; + @Nullable private Supplier requestSupplier; + public static final PeriodicDelay DEFAULT_DELAY_BETWEEN_REQUESTS = + PeriodicDelay.ofFixedDuration(Duration.ofSeconds(30)); + + /** + * Creates an {@link HttpRequestService}. + * + * @param requestSender The HTTP sender implementation. + */ + public static HttpRequestService create(HttpSender requestSender) { + return create(requestSender, DEFAULT_DELAY_BETWEEN_REQUESTS, DEFAULT_DELAY_BETWEEN_REQUESTS); + } + + /** + * Creates an {@link HttpRequestService}. + * + * @param requestSender The HTTP sender implementation. + * @param periodicRequestDelay The time to wait between requests in general. + * @param periodicRetryDelay The time to wait between retries. + */ + public static HttpRequestService create( + HttpSender requestSender, + PeriodicDelay periodicRequestDelay, + PeriodicDelay periodicRetryDelay) { + return new HttpRequestService( + requestSender, + Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory()), + periodicRequestDelay, + periodicRetryDelay, + RetryAfterParser.getInstance()); + } + + HttpRequestService( + HttpSender requestSender, + ScheduledExecutorService executorService, + PeriodicDelay periodicRequestDelay, + PeriodicDelay periodicRetryDelay, + RetryAfterParser retryAfterParser) { + this.requestSender = requestSender; + this.executorService = executorService; + this.retryAfterParser = retryAfterParser; + this.connectionStatus = new ConnectionStatus(periodicRequestDelay, periodicRetryDelay); + } + + @Override + public void start(Callback callback, Supplier requestSupplier) { + if (hasStopped.get()) { + throw new IllegalStateException("HttpRequestService cannot start after it has been stopped."); + } + if (isRunning.compareAndSet(false, true)) { + this.callback = callback; + this.requestSupplier = requestSupplier; + scheduleNextExecution(); + } else { + throw new IllegalStateException("HttpRequestService is already running"); + } + } + + @Override + public void stop() { + if (isRunning.compareAndSet(true, false)) { + hasStopped.set(true); + executorService.shutdown(); + } + } + + @Override + public void sendRequest() { + if (!isRunning.get()) { + throw new IllegalStateException("HttpRequestService is not running"); + } + + executorService.execute( + () -> { + // cancel the already scheduled task, a new one is created after current request is + // processed + ScheduledFuture scheduledFuture = scheduledTask.get(); + if (scheduledFuture != null) { + scheduledFuture.cancel(false); + } + sendAndScheduleNext(); + }); + } + + private void sendAndScheduleNext() { + doSendRequest(); + scheduleNextExecution(); + } + + private void scheduleNextExecution() { + scheduledTask.set( + executorService.schedule( + this::sendAndScheduleNext, + connectionStatus.getNextDelay().toNanos(), + TimeUnit.NANOSECONDS)); + } + + private void doSendRequest() { + AgentToServer agentToServer = Objects.requireNonNull(requestSupplier).get().getAgentToServer(); + + byte[] data = agentToServer.encodeByteString().toByteArray(); + CompletableFuture future = + requestSender.send(outputStream -> outputStream.write(data), data.length); + try (HttpSender.Response response = future.get(30, TimeUnit.SECONDS)) { + getCallback().onConnectionSuccess(); + if (isSuccessful(response)) { + handleHttpSuccess( + Response.create(ServerToAgent.ADAPTER.decode(response.bodyInputStream()))); + } else { + handleHttpError(response); + } + } catch (IOException | InterruptedException | TimeoutException e) { + getCallback().onConnectionFailed(e); + } catch (ExecutionException e) { + if (e.getCause() != null) { + getCallback().onConnectionFailed(e.getCause()); + } else { + getCallback().onConnectionFailed(e); + } + } + } + + private void handleHttpError(HttpSender.Response response) { + int errorCode = response.statusCode(); + getCallback().onRequestFailed(new HttpErrorException(errorCode, response.statusMessage())); + + if (errorCode == 503 || errorCode == 429) { + String retryAfterHeader = response.getHeader("Retry-After"); + Duration retryAfter = null; + if (retryAfterHeader != null) { + Optional duration = retryAfterParser.tryParse(retryAfterHeader); + if (duration.isPresent()) { + retryAfter = duration.get(); + } + } + connectionStatus.retryAfter(retryAfter); + } + } + + private static boolean isSuccessful(HttpSender.Response response) { + return response.statusCode() >= 200 && response.statusCode() < 300; + } + + private void handleHttpSuccess(Response response) { + connectionStatus.success(); + ServerToAgent serverToAgent = response.getServerToAgent(); + + if (serverToAgent.error_response != null) { + handleErrorResponse(serverToAgent.error_response); + } else { + getCallback().onRequestSuccess(response); + } + } + + private void handleErrorResponse(ServerErrorResponse errorResponse) { + if (errorResponse.type.equals(ServerErrorResponseType.ServerErrorResponseType_Unavailable)) { + Duration retryAfter = null; + if (errorResponse.retry_info != null) { + retryAfter = Duration.ofNanos(errorResponse.retry_info.retry_after_nanoseconds); + } + connectionStatus.retryAfter(retryAfter); + } + getCallback().onRequestFailed(new OpampServerResponseError(errorResponse.error_message)); + } + + private Callback getCallback() { + return Objects.requireNonNull(callback); + } + + // this class is only used from a single threaded ScheduledExecutorService, hence no + // synchronization is needed + private static class ConnectionStatus { + private final PeriodicDelay periodicRequestDelay; + private final PeriodicDelay periodicRetryDelay; + + private boolean retrying; + private PeriodicDelay currentDelay; + + ConnectionStatus(PeriodicDelay periodicRequestDelay, PeriodicDelay periodicRetryDelay) { + this.periodicRequestDelay = periodicRequestDelay; + this.periodicRetryDelay = periodicRetryDelay; + currentDelay = periodicRequestDelay; + } + + void success() { + // after successful request transition from retry to regular delay + if (retrying) { + retrying = false; + periodicRequestDelay.reset(); + currentDelay = periodicRequestDelay; + } + } + + void retryAfter(@Nullable Duration retryAfter) { + // after failed request transition from regular to retry delay + if (!retrying) { + retrying = true; + periodicRetryDelay.reset(); + currentDelay = periodicRetryDelay; + if (retryAfter != null && periodicRetryDelay instanceof AcceptsDelaySuggestion) { + ((AcceptsDelaySuggestion) periodicRetryDelay).suggestDelay(retryAfter); + } + } + } + + Duration getNextDelay() { + return currentDelay.getNextDelay(); + } + } + + private static class DaemonThreadFactory implements ThreadFactory { + private final ThreadFactory delegate = Executors.defaultThreadFactory(); + + @Override + public Thread newThread(@Nonnull Runnable r) { + Thread t = delegate.newThread(r); + try { + t.setDaemon(true); + } catch (SecurityException e) { + // Well, we tried. + } + return t; + } + } +} diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/RequestService.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/RequestService.java new file mode 100644 index 000000000..ee47e4249 --- /dev/null +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/RequestService.java @@ -0,0 +1,76 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.opamp.client.internal.request.service; + +import io.opentelemetry.opamp.client.internal.OpampClient; +import io.opentelemetry.opamp.client.internal.request.Request; +import io.opentelemetry.opamp.client.internal.response.Response; +import java.util.function.Supplier; + +/** + * Handles the network connectivity in general, its implementation can choose what protocol to use + * (HTTP or WebSocket) and should provide the necessary configurations options depending on the + * case. There are 2 implementations ready to use, {@link HttpRequestService}, for using HTTP, and + * {@link WebSocketRequestService} for using WebSocket. The {@link OpampClient} must not be aware of + * the specific implementation it uses as it can expect the same behavior from either. + */ +public interface RequestService { + + /** + * Starts the service. The actions done in this method depend on the implementation. For HTTP this + * is where the periodic poll task should get started, whereas for WebSocket this is where the + * connectivity is started. + * + * @param callback This is the only way that the service can communicate back to the {@link + * OpampClient} implementation. + * @param requestSupplier This supplier must be queried every time a new request is about to be + * sent. + */ + void start(Callback callback, Supplier requestSupplier); + + /** Triggers a new request send. */ + void sendRequest(); + + /** + * Clears the service for good. No further calls to {@link #sendRequest()} can be made after this + * method is called. + */ + void stop(); + + /** Allows the service to talk back to the {@link OpampClient} implementation. */ + interface Callback { + /** + * For WebSocket implementations, this is called when the connection is established. For HTTP + * implementations, this is called on every HTTP request that ends successfully. + */ + void onConnectionSuccess(); + + /** + * For WebSocket implementations, this is called when the connection cannot be made or is lost. + * For HTTP implementations, this is called on every HTTP request that cannot get a response. + * + * @param throwable The detailed error. + */ + void onConnectionFailed(Throwable throwable); + + /** + * For WebSocket implementations, this is called every time there's a new message from the + * server. For HTTP implementations, this is called when a successful HTTP request is finished + * with a valid server to agent response body. + * + * @param response The server to agent message. + */ + void onRequestSuccess(Response response); + + /** + * For both HTTP and WebSocket implementations, this is called when an attempt at sending a + * message fails. + * + * @param throwable The detailed error. + */ + void onRequestFailed(Throwable throwable); + } +} diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/response/OpampServerResponseError.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/response/OpampServerResponseError.java new file mode 100644 index 000000000..08ad6a4d2 --- /dev/null +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/response/OpampServerResponseError.java @@ -0,0 +1,19 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.opamp.client.internal.response; + +public class OpampServerResponseError extends Exception { + private static final long serialVersionUID = 1L; + + /** + * Constructs an OpAMP error related exception. + * + * @param message The OpAMP error message. + */ + public OpampServerResponseError(String message) { + super(message); + } +} diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/response/Response.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/response/Response.java new file mode 100644 index 000000000..c9b6bc19e --- /dev/null +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/response/Response.java @@ -0,0 +1,18 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.opamp.client.internal.response; + +import com.google.auto.value.AutoValue; +import opamp.proto.ServerToAgent; + +@AutoValue +public abstract class Response { + public abstract ServerToAgent getServerToAgent(); + + public static Response create(ServerToAgent serverToAgent) { + return new AutoValue_Response(serverToAgent); + } +} diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/tools/SystemTime.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/tools/SystemTime.java new file mode 100644 index 000000000..1d7f9e61e --- /dev/null +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/tools/SystemTime.java @@ -0,0 +1,21 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.opamp.client.internal.tools; + +/** Utility to be able to mock the current system time for testing purposes. */ +public final class SystemTime { + private static final SystemTime INSTANCE = new SystemTime(); + + public static SystemTime getInstance() { + return INSTANCE; + } + + private SystemTime() {} + + public long getCurrentTimeMillis() { + return System.currentTimeMillis(); + } +} diff --git a/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/connectivity/http/RetryAfterParserTest.java b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/connectivity/http/RetryAfterParserTest.java new file mode 100644 index 000000000..ece1ff0a7 --- /dev/null +++ b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/connectivity/http/RetryAfterParserTest.java @@ -0,0 +1,34 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.opamp.client.internal.connectivity.http; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import io.opentelemetry.opamp.client.internal.tools.SystemTime; +import java.time.Duration; +import org.junit.jupiter.api.Test; + +class RetryAfterParserTest { + + @Test + void verifyParsing() { + SystemTime systemTime = mock(); + long currentTimeMillis = 1577836800000L; // Wed, 01 Jan 2020 00:00:00 GMT + when(systemTime.getCurrentTimeMillis()).thenReturn(currentTimeMillis); + + RetryAfterParser parser = new RetryAfterParser(systemTime); + + assertThat(parser.tryParse("123")).get().isEqualTo(Duration.ofSeconds(123)); + assertThat(parser.tryParse("Wed, 01 Jan 2020 01:00:00 GMT")) + .get() + .isEqualTo(Duration.ofHours(1)); + + // Check when provided time is older than the current one + assertThat(parser.tryParse("Tue, 31 Dec 2019 23:00:00 GMT")).isNotPresent(); + } +} diff --git a/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestServiceTest.java b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestServiceTest.java new file mode 100644 index 000000000..63439e8c6 --- /dev/null +++ b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestServiceTest.java @@ -0,0 +1,492 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.opamp.client.internal.request.service; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import io.opentelemetry.opamp.client.internal.connectivity.http.HttpErrorException; +import io.opentelemetry.opamp.client.internal.connectivity.http.HttpSender; +import io.opentelemetry.opamp.client.internal.connectivity.http.RetryAfterParser; +import io.opentelemetry.opamp.client.internal.request.Request; +import io.opentelemetry.opamp.client.internal.request.delay.AcceptsDelaySuggestion; +import io.opentelemetry.opamp.client.internal.request.delay.PeriodicDelay; +import io.opentelemetry.opamp.client.internal.response.Response; +import java.io.ByteArrayInputStream; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import opamp.proto.AgentToServer; +import opamp.proto.RetryInfo; +import opamp.proto.ServerErrorResponse; +import opamp.proto.ServerErrorResponseType; +import opamp.proto.ServerToAgent; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@SuppressWarnings("unchecked") +@ExtendWith(MockitoExtension.class) +class HttpRequestServiceTest { + + private static final Duration REGULAR_DELAY = Duration.ofSeconds(1); + private static final Duration RETRY_DELAY = Duration.ofSeconds(5); + + @Mock private RequestService.Callback callback; + private final List scheduledTasks = new ArrayList<>(); + private ScheduledExecutorService executorService; + private TestHttpSender requestSender; + private PeriodicDelay periodicRequestDelay; + private PeriodicDelayWithSuggestion periodicRetryDelay; + private int requestSize = -1; + private HttpRequestService httpRequestService; + + @BeforeEach + void setUp() { + requestSender = new TestHttpSender(); + periodicRequestDelay = createPeriodicDelay(REGULAR_DELAY); + periodicRetryDelay = createPeriodicDelayWithSuggestionSupport(RETRY_DELAY); + executorService = createTestScheduleExecutorService(); + httpRequestService = + new HttpRequestService( + requestSender, + executorService, + periodicRequestDelay, + periodicRetryDelay, + RetryAfterParser.getInstance()); + httpRequestService.start(callback, this::createRequestSupplier); + } + + @AfterEach + void tearDown() { + httpRequestService.stop(); + scheduledTasks.clear(); + verify(executorService).shutdown(); + } + + @Test + void verifyStart_scheduledFirstTask() { + assertThat(scheduledTasks).hasSize(1); + ScheduledTask firstTask = scheduledTasks.get(0); + assertThat(firstTask.delay).isEqualTo(REGULAR_DELAY); + + // Verify initial task creates next one + scheduledTasks.clear(); + requestSender.enqueueResponse(createSuccessfulResponse(new ServerToAgent.Builder().build())); + firstTask.run(); + + assertThat(scheduledTasks).hasSize(1); + + // Check on-demand requests don't create subsequent tasks + requestSender.enqueueResponse(createSuccessfulResponse(new ServerToAgent.Builder().build())); + httpRequestService.sendRequest(); + + assertThat(scheduledTasks).hasSize(1); + } + + @Test + void verifySendingRequest_happyPath() { + ServerToAgent serverToAgent = new ServerToAgent.Builder().build(); + HttpSender.Response httpResponse = createSuccessfulResponse(serverToAgent); + requestSender.enqueueResponse(httpResponse); + + httpRequestService.sendRequest(); + + verifySingleRequestSent(); + verifyRequestSuccessCallback(serverToAgent); + verify(callback).onConnectionSuccess(); + } + + @Test + void verifyWhenSendingOnDemandRequest_andDelayChanges() { + // Initial state + assertThat(assertAndGetSingleCurrentTask().delay).isEqualTo(REGULAR_DELAY); + + // Trigger delay strategy change + requestSender.enqueueResponse(createFailedResponse(503)); + httpRequestService.sendRequest(); + + // Expected state + assertThat(assertAndGetSingleCurrentTask().delay).isEqualTo(RETRY_DELAY); + } + + @Test + void verifySendingRequest_whenTheresAParsingError() { + HttpSender.Response httpResponse = createSuccessfulResponse(new byte[] {1, 2, 3}); + requestSender.enqueueResponse(httpResponse); + + httpRequestService.sendRequest(); + + verifySingleRequestSent(); + verify(callback).onConnectionFailed(any()); + } + + @Test + void verifySendingRequest_whenThereIsAnExecutionError() + throws ExecutionException, InterruptedException, TimeoutException { + CompletableFuture future = mock(); + requestSender.enqueueResponseFuture(future); + Exception myException = mock(); + doThrow(new ExecutionException(myException)).when(future).get(30, TimeUnit.SECONDS); + + httpRequestService.sendRequest(); + + verifySingleRequestSent(); + verify(callback).onConnectionFailed(myException); + } + + @Test + void verifySendingRequest_whenThereIsAnInterruptedException() + throws ExecutionException, InterruptedException, TimeoutException { + CompletableFuture future = mock(); + requestSender.enqueueResponseFuture(future); + InterruptedException myException = mock(); + doThrow(myException).when(future).get(30, TimeUnit.SECONDS); + + httpRequestService.sendRequest(); + + verifySingleRequestSent(); + verify(callback).onConnectionFailed(myException); + } + + @Test + void verifySendingRequest_whenThereIsAGenericHttpError() { + requestSender.enqueueResponse(createFailedResponse(500)); + + httpRequestService.sendRequest(); + + verifySingleRequestSent(); + verifyRequestFailedCallback(500); + } + + @Test + void verifySendingRequest_whenThereIsATooManyRequestsError() { + verifyRetryDelayOnError(createFailedResponse(429), RETRY_DELAY); + } + + @Test + void verifySendingRequest_whenThereIsATooManyRequestsError_withSuggestedDelay() { + HttpSender.Response response = createFailedResponse(429); + when(response.getHeader("Retry-After")).thenReturn("5"); + + verifyRetryDelayOnError(response, Duration.ofSeconds(5)); + } + + @Test + void verifySendingRequest_whenServerProvidesRetryInfo() { + long nanosecondsToWaitForRetry = 1000; + ServerErrorResponse errorResponse = + new ServerErrorResponse.Builder() + .type(ServerErrorResponseType.ServerErrorResponseType_Unavailable) + .retry_info( + new RetryInfo.Builder().retry_after_nanoseconds(nanosecondsToWaitForRetry).build()) + .build(); + ServerToAgent serverToAgent = new ServerToAgent.Builder().error_response(errorResponse).build(); + HttpSender.Response response = createSuccessfulResponse(serverToAgent); + + verifyRetryDelayOnError(response, Duration.ofNanos(nanosecondsToWaitForRetry)); + } + + @Test + void verifySendingRequest_whenServerIsUnavailable() { + ServerErrorResponse errorResponse = + new ServerErrorResponse.Builder() + .type(ServerErrorResponseType.ServerErrorResponseType_Unavailable) + .build(); + ServerToAgent serverToAgent = new ServerToAgent.Builder().error_response(errorResponse).build(); + HttpSender.Response response = createSuccessfulResponse(serverToAgent); + + verifyRetryDelayOnError(response, RETRY_DELAY); + } + + @Test + void verifySendingRequest_whenThereIsAServiceUnavailableError() { + verifyRetryDelayOnError(createFailedResponse(503), RETRY_DELAY); + } + + @Test + void verifySendingRequest_whenThereIsAServiceUnavailableError_withSuggestedDelay() { + HttpSender.Response response = createFailedResponse(503); + when(response.getHeader("Retry-After")).thenReturn("2"); + + verifyRetryDelayOnError(response, Duration.ofSeconds(2)); + } + + @Test + void verifySendingRequest_duringRegularMode() { + requestSender.enqueueResponse(createSuccessfulResponse(new ServerToAgent.Builder().build())); + + httpRequestService.sendRequest(); + + verifySingleRequestSent(); + } + + private void verifyRetryDelayOnError( + HttpSender.Response errorResponse, Duration expectedRetryDelay) { + requestSender.enqueueResponse(errorResponse); + ScheduledTask previousTask = assertAndGetSingleCurrentTask(); + + previousTask.run(); + + verifySingleRequestSent(); + verify(periodicRetryDelay).reset(); + verify(callback).onRequestFailed(any()); + ScheduledTask retryTask = assertAndGetSingleCurrentTask(); + assertThat(retryTask.delay).isEqualTo(expectedRetryDelay); + + // Retry with another error + clearInvocations(callback); + scheduledTasks.clear(); + requestSender.enqueueResponse(createFailedResponse(500)); + retryTask.run(); + + verifySingleRequestSent(); + verify(callback).onRequestFailed(any()); + ScheduledTask retryTask2 = assertAndGetSingleCurrentTask(); + assertThat(retryTask2.delay).isEqualTo(expectedRetryDelay); + + // Retry with a success + clearInvocations(callback); + scheduledTasks.clear(); + ServerToAgent serverToAgent = new ServerToAgent.Builder().build(); + requestSender.enqueueResponse(createSuccessfulResponse(serverToAgent)); + retryTask2.run(); + + verify(periodicRequestDelay).reset(); + verifySingleRequestSent(); + verifyRequestSuccessCallback(serverToAgent); + assertThat(assertAndGetSingleCurrentTask().delay).isEqualTo(REGULAR_DELAY); + } + + private Request createRequestSupplier() { + AgentToServer agentToServer = new AgentToServer.Builder().sequence_num(10).build(); + requestSize = agentToServer.encodeByteString().size(); + return Request.create(agentToServer); + } + + private ScheduledTask assertAndGetSingleCurrentTask() { + assertThat(scheduledTasks).hasSize(1); + return scheduledTasks.get(0); + } + + private void verifySingleRequestSent() { + List requests = requestSender.getRequests(1); + assertThat(requests.get(0).contentLength).isEqualTo(requestSize); + } + + private void verifyRequestSuccessCallback(ServerToAgent serverToAgent) { + verify(callback).onRequestSuccess(Response.create(serverToAgent)); + } + + private void verifyRequestFailedCallback(int errorCode) { + ArgumentCaptor captor = ArgumentCaptor.forClass(HttpErrorException.class); + verify(callback).onRequestFailed(captor.capture()); + assertThat(captor.getValue().getErrorCode()).isEqualTo(errorCode); + assertThat(captor.getValue().getMessage()).isEqualTo("Error message"); + } + + private ScheduledExecutorService createTestScheduleExecutorService() { + ScheduledExecutorService service = mock(); + + lenient() + .doAnswer( + invocation -> { + Runnable runnable = invocation.getArgument(0); + runnable.run(); + return null; + }) + .when(service) + .execute(any()); + + when(service.schedule(any(Runnable.class), anyLong(), any(TimeUnit.class))) + .thenAnswer( + invocation -> { + ScheduledTask task = + new ScheduledTask(invocation.getArgument(0), invocation.getArgument(1)); + + scheduledTasks.add(task); + + return task; + }); + + return service; + } + + private static HttpSender.Response createSuccessfulResponse(ServerToAgent serverToAgent) { + return createSuccessfulResponse(serverToAgent.encodeByteString().toByteArray()); + } + + private static HttpSender.Response createSuccessfulResponse(byte[] serverToAgent) { + HttpSender.Response response = mock(); + ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(serverToAgent); + when(response.statusCode()).thenReturn(200); + when(response.bodyInputStream()).thenReturn(byteArrayInputStream); + return response; + } + + private static HttpSender.Response createFailedResponse(int statusCode) { + HttpSender.Response response = mock(); + when(response.statusCode()).thenReturn(statusCode); + when(response.statusMessage()).thenReturn("Error message"); + return response; + } + + private static PeriodicDelay createPeriodicDelay(Duration delay) { + PeriodicDelay mock = mock(); + when(mock.getNextDelay()).thenReturn(delay); + return mock; + } + + private static PeriodicDelayWithSuggestion createPeriodicDelayWithSuggestionSupport( + Duration delay) { + return spy(new PeriodicDelayWithSuggestion(delay)); + } + + private static class PeriodicDelayWithSuggestion + implements PeriodicDelay, AcceptsDelaySuggestion { + private final Duration initialDelay; + private Duration currentDelay; + + private PeriodicDelayWithSuggestion(Duration initialDelay) { + this.initialDelay = initialDelay; + currentDelay = initialDelay; + } + + @Override + public void suggestDelay(Duration delay) { + currentDelay = delay; + } + + @Override + public Duration getNextDelay() { + return currentDelay; + } + + @Override + public void reset() { + currentDelay = initialDelay; + } + } + + private static class TestHttpSender implements HttpSender { + private final List requests = new ArrayList<>(); + + @SuppressWarnings("JdkObsolete") + private final Queue> responses = new LinkedList<>(); + + @Override + public CompletableFuture send(BodyWriter writer, int contentLength) { + requests.add(new RequestParams(contentLength)); + CompletableFuture response = null; + try { + response = responses.remove(); + } catch (NoSuchElementException e) { + fail("Unwanted triggered request"); + } + return response; + } + + public void enqueueResponse(HttpSender.Response response) { + enqueueResponseFuture(CompletableFuture.completedFuture(response)); + } + + public void enqueueResponseFuture(CompletableFuture future) { + responses.add(future); + } + + public List getRequests(int size) { + assertThat(requests).hasSize(size); + List immutableRequests = + Collections.unmodifiableList(new ArrayList<>(requests)); + requests.clear(); + return immutableRequests; + } + + private static class RequestParams { + public final int contentLength; + + private RequestParams(int contentLength) { + this.contentLength = contentLength; + } + } + } + + private class ScheduledTask implements ScheduledFuture { + private final Runnable runnable; + private final Duration delay; + + public void run() { + get(); + } + + private ScheduledTask(Runnable runnable, long timeNanos) { + this.runnable = runnable; + this.delay = Duration.ofNanos(timeNanos); + } + + @Override + public long getDelay(@NotNull TimeUnit unit) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return scheduledTasks.remove(this); + } + + @Override + public boolean isCancelled() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isDone() { + throw new UnsupportedOperationException(); + } + + @Override + public Object get() { + scheduledTasks.remove(this); + runnable.run(); + return null; + } + + @Override + public Object get(long timeout, @NotNull TimeUnit unit) { + throw new UnsupportedOperationException(); + } + + @Override + public int compareTo(@NotNull Delayed o) { + throw new UnsupportedOperationException(); + } + } +}