diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index b2aa2755b..be925a857 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -148,7 +148,7 @@ swagger-models = { module = "io.swagger.core.v3:swagger-models", version.ref = " swagger-annotations = { module = "io.swagger.core.v3:swagger-annotations", version.ref = "swagger" } s3client-minio = { module = "io.minio:minio", version = "8.5.17" } -s3client-aws = { module = "software.amazon.awssdk:s3", version = "2.29.52" } +s3client-aws = { module = "software.amazon.awssdk:s3", version = "2.42.35" } camunda7-engine = { module = "org.camunda.bpm:camunda-engine", version.ref = "camunda7" } camunda7-rest-jakarta = { module = "org.camunda.bpm:camunda-engine-rest-jakarta", version.ref = "camunda7" } diff --git a/s3/s3-client-aws/build.gradle b/s3/s3-client-aws/build.gradle new file mode 100644 index 000000000..5f99473b6 --- /dev/null +++ b/s3/s3-client-aws/build.gradle @@ -0,0 +1,16 @@ +dependencies { + annotationProcessor project(":config:config-annotation-processor") + + api project(":telemetry:telemetry-common") + api project(":http:http-client-common") + api project(":config:config-common") + api project(":common") + + api(libs.s3client.aws) { + exclude group: "software.amazon.awssdk", module: "apache-client" + exclude group: "software.amazon.awssdk", module: "netty-nio-client" + } + + testImplementation project(":internal:test-logging") + testImplementation libs.testcontainers.core +} diff --git a/s3/s3-client-aws/src/main/java/io/koraframework/s3/client/aws/AwsS3ClientFactory.java b/s3/s3-client-aws/src/main/java/io/koraframework/s3/client/aws/AwsS3ClientFactory.java new file mode 100644 index 000000000..047183eff --- /dev/null +++ b/s3/s3-client-aws/src/main/java/io/koraframework/s3/client/aws/AwsS3ClientFactory.java @@ -0,0 +1,8 @@ +package io.koraframework.s3.client.aws; + +import software.amazon.awssdk.services.s3.S3Client; + +public interface AwsS3ClientFactory { + + S3Client create(AwsS3Config config); +} diff --git a/s3/s3-client-aws/src/main/java/io/koraframework/s3/client/aws/AwsS3ClientModule.java b/s3/s3-client-aws/src/main/java/io/koraframework/s3/client/aws/AwsS3ClientModule.java new file mode 100644 index 000000000..2b568d2b9 --- /dev/null +++ b/s3/s3-client-aws/src/main/java/io/koraframework/s3/client/aws/AwsS3ClientModule.java @@ -0,0 +1,92 @@ +package io.koraframework.s3.client.aws; + +import io.koraframework.application.graph.All; +import io.koraframework.common.DefaultComponent; +import io.koraframework.common.Tag; +import io.koraframework.config.common.Config; +import io.koraframework.config.common.extractor.ConfigValueExtractor; +import io.koraframework.http.client.common.HttpClient; +import io.koraframework.s3.client.aws.telemetry.AwsS3ClientTelemetryFactory; +import io.koraframework.s3.client.aws.telemetry.DefaultAwsS3ClientTelemetryFactory; +import io.micrometer.core.instrument.MeterRegistry; +import io.opentelemetry.api.trace.Tracer; +import org.jspecify.annotations.Nullable; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.awscore.AwsClient; +import software.amazon.awssdk.core.checksums.RequestChecksumCalculation; +import software.amazon.awssdk.core.checksums.ResponseChecksumValidation; +import software.amazon.awssdk.core.interceptor.ExecutionInterceptor; +import software.amazon.awssdk.http.SdkHttpClient; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3Configuration; + +import java.net.URI; + +public interface AwsS3ClientModule { + + default AwsS3Config awsS3Config(Config config, ConfigValueExtractor extractor) { + var value = config.get("s3client.aws"); + return extractor.extract(value); + } + + @Tag(AwsClient.class) + @DefaultComponent + default HttpClient awsS3httpClient(HttpClient client) { + return client; + } + + @DefaultComponent + default KoraAwsSdkHttpClient awsS3koraSdkHttpClient(@Tag(AwsClient.class) HttpClient client, + AwsS3Config clientConfig) { + return new KoraAwsSdkHttpClient(client, clientConfig); + } + + @DefaultComponent + default S3Configuration awsS3Configuration(AwsS3Config config) { + return S3Configuration.builder() + .chunkedEncodingEnabled(config.chunkedEncodingEnabled()) + .pathStyleAccessEnabled(config.addressStyle() == AwsS3Config.AddressStyle.PATH) + .build(); + } + + @DefaultComponent + default AwsCredentialsProvider awsS3credentialsProvider(AwsS3Config config) { + return () -> AwsBasicCredentials.create(config.credentials().accessKey(), config.credentials().secretKey()); + } + + default AwsS3ClientTelemetryFactory awsS3ClientTelemetryFactory(@Nullable Tracer tracer, + @Nullable MeterRegistry meterRegistry) { + return new DefaultAwsS3ClientTelemetryFactory(tracer, meterRegistry); + } + + default AwsS3ClientFactory awsS3ClientFactory(SdkHttpClient httpClient, + AwsCredentialsProvider credentialsProvider, + S3Configuration s3Configuration, + AwsS3ClientTelemetryFactory telemetryFactory, + All interceptors) { + return (config) -> { + var configuration = s3Configuration.toBuilder() + .chunkedEncodingEnabled(config.chunkedEncodingEnabled()) + .pathStyleAccessEnabled(config.addressStyle() == AwsS3Config.AddressStyle.PATH) + .build(); + + return S3Client.builder() + .credentialsProvider(credentialsProvider) + .httpClient(httpClient) + .endpointOverride(URI.create(config.url())) + .serviceConfiguration(configuration) + .region(Region.of(config.region())) + .requestChecksumCalculation(RequestChecksumCalculation.fromValue(config.checksumCalculationRequest().name())) + .responseChecksumValidation(ResponseChecksumValidation.fromValue(config.checksumValidationResponse().name())) + .overrideConfiguration(b -> b.addExecutionInterceptor(new AwsS3ClientTelemetryInterceptor(telemetryFactory.get(config.telemetry())))) + .overrideConfiguration(b -> interceptors.forEach(b::addExecutionInterceptor)) + .build(); + }; + } + + default S3Client awsS3Client(AwsS3Config config, AwsS3ClientFactory factory) { + return factory.create(config); + } +} diff --git a/s3/s3-client-aws/src/main/java/io/koraframework/s3/client/aws/AwsS3ClientTelemetryInterceptor.java b/s3/s3-client-aws/src/main/java/io/koraframework/s3/client/aws/AwsS3ClientTelemetryInterceptor.java new file mode 100644 index 000000000..3f5b6118e --- /dev/null +++ b/s3/s3-client-aws/src/main/java/io/koraframework/s3/client/aws/AwsS3ClientTelemetryInterceptor.java @@ -0,0 +1,296 @@ +package io.koraframework.s3.client.aws; + +import io.koraframework.s3.client.aws.telemetry.AwsS3ClientObservation; +import io.koraframework.s3.client.aws.telemetry.AwsS3ClientTelemetry; +import org.jspecify.annotations.Nullable; +import software.amazon.awssdk.core.SdkRequest; +import software.amazon.awssdk.core.SdkResponse; +import software.amazon.awssdk.core.interceptor.Context; +import software.amazon.awssdk.core.interceptor.ExecutionAttribute; +import software.amazon.awssdk.core.interceptor.ExecutionAttributes; +import software.amazon.awssdk.core.interceptor.ExecutionInterceptor; +import software.amazon.awssdk.services.s3.model.*; + +public final class AwsS3ClientTelemetryInterceptor implements ExecutionInterceptor { + + private static final ExecutionAttribute OBSERVATION = new ExecutionAttribute<>("s3-aws-telemetry-observation"); + + private final AwsS3ClientTelemetry telemetry; + + public AwsS3ClientTelemetryInterceptor(AwsS3ClientTelemetry telemetry) { + this.telemetry = telemetry; + } + + @Override + public void beforeExecution(Context.BeforeExecution execContext, ExecutionAttributes executionAttributes) { + var bucket = getBucket(execContext.request()); + var operation = getOperation(execContext.request()); + var observation = telemetry.observe(operation, bucket); + executionAttributes.putAttribute(OBSERVATION, observation); + } + + @Override + public void afterExecution(Context.AfterExecution execContext, ExecutionAttributes executionAttributes) { + var observation = executionAttributes.getAttribute(OBSERVATION); + if (observation != null) { + execContext.httpResponse().firstMatchingHeader("x-amz-request-id").ifPresent(observation::observeAwsRequestId); + execContext.httpResponse().firstMatchingHeader("x-amz-extended-request-id").ifPresent(observation::observeAwsRequestId); + + var key = extractKey(execContext.request()); + if (key != null) { + observation.observeKey(key); + } + + var uploadId = extractUploadId(execContext.request(), execContext.response()); + if (uploadId != null) { + observation.observeUploadId(uploadId); + } + + observation.end(); + } + } + + @Override + public void onExecutionFailure(Context.FailedExecution execContext, ExecutionAttributes executionAttributes) { + var observation = executionAttributes.getAttribute(OBSERVATION); + if (observation != null) { + if (execContext.httpResponse().isPresent()) { + var httpResponse = execContext.httpResponse().get(); + httpResponse.firstMatchingHeader("x-amz-request-id").ifPresent(observation::observeAwsRequestId); + httpResponse.firstMatchingHeader("x-amz-extended-request-id").ifPresent(observation::observeAwsRequestId); + } + + if (execContext.response().isPresent()) { + var response = execContext.response().get(); + var uploadId = extractUploadId(execContext.request(), response); + if (uploadId != null) { + observation.observeUploadId(uploadId); + } + } else { + var uploadId = extractUploadId(execContext.request(), null); + if (uploadId != null) { + observation.observeUploadId(uploadId); + } + } + + var key = extractKey(execContext.request()); + if (key != null) { + observation.observeKey(key); + } + + observation.observeError(execContext.exception()); + observation.end(); + } + } + + private static String getBucket(SdkRequest request) { + return switch (request) { + // Операции с объектами + case GetObjectRequest req -> req.bucket(); + case GetObjectAttributesRequest req -> req.bucket(); + case PutObjectRequest req -> req.bucket(); + case DeleteObjectRequest req -> req.bucket(); + case HeadObjectRequest req -> req.bucket(); + case CopyObjectRequest req -> req.destinationBucket(); + case UploadPartRequest req -> req.bucket(); + case UploadPartCopyRequest req -> req.destinationBucket(); + case AbortMultipartUploadRequest req -> req.bucket(); + case CompleteMultipartUploadRequest req -> req.bucket(); + case CreateMultipartUploadRequest req -> req.bucket(); + + // Операции с бакетами + case ListObjectsRequest req -> req.bucket(); + case ListObjectsV2Request req -> req.bucket(); + case ListObjectVersionsRequest req -> req.bucket(); + case ListMultipartUploadsRequest req -> req.bucket(); + + case HeadBucketRequest req -> req.bucket(); + case GetBucketLocationRequest req -> req.bucket(); + case GetBucketAclRequest req -> req.bucket(); + case GetBucketCorsRequest req -> req.bucket(); + case GetBucketLifecycleConfigurationRequest req -> req.bucket(); + case GetBucketPolicyRequest req -> req.bucket(); + case GetBucketReplicationRequest req -> req.bucket(); + case GetBucketTaggingRequest req -> req.bucket(); + case GetBucketVersioningRequest req -> req.bucket(); + case GetBucketEncryptionRequest req -> req.bucket(); + case GetBucketLoggingRequest req -> req.bucket(); + case GetBucketNotificationConfigurationRequest req -> req.bucket(); + case GetPublicAccessBlockRequest req -> req.bucket(); + case GetBucketOwnershipControlsRequest req -> req.bucket(); + + case PutBucketAclRequest req -> req.bucket(); + case PutBucketCorsRequest req -> req.bucket(); + case PutBucketLifecycleConfigurationRequest req -> req.bucket(); + case PutBucketPolicyRequest req -> req.bucket(); + case PutBucketReplicationRequest req -> req.bucket(); + case PutBucketTaggingRequest req -> req.bucket(); + case PutBucketVersioningRequest req -> req.bucket(); + case PutBucketEncryptionRequest req -> req.bucket(); + case PutBucketLoggingRequest req -> req.bucket(); + case PutBucketNotificationConfigurationRequest req -> req.bucket(); + case PutPublicAccessBlockRequest req -> req.bucket(); + case PutBucketOwnershipControlsRequest req -> req.bucket(); + + case DeleteBucketCorsRequest req -> req.bucket(); + case DeleteBucketPolicyRequest req -> req.bucket(); + case DeleteBucketReplicationRequest req -> req.bucket(); + case DeleteBucketTaggingRequest req -> req.bucket(); + case DeletePublicAccessBlockRequest req -> req.bucket(); + case DeleteBucketOwnershipControlsRequest req -> req.bucket(); + + // Удаление бакета + case DeleteBucketRequest req -> req.bucket(); + + // Создание бакета + case CreateBucketRequest req -> req.bucket(); + + // Others + case GetObjectLegalHoldRequest req -> req.bucket(); + case PutObjectLegalHoldRequest req -> req.bucket(); + case GetObjectRetentionRequest req -> req.bucket(); + case PutObjectRetentionRequest req -> req.bucket(); + case GetObjectTaggingRequest req -> req.bucket(); + case PutObjectTaggingRequest req -> req.bucket(); + case DeleteObjectTaggingRequest req -> req.bucket(); + case RestoreObjectRequest req -> req.bucket(); + case SelectObjectContentRequest req -> req.bucket(); + + default -> request.getValueForField("Bucket", String.class).orElse("unknown"); + }; + } + + private static String getOperation(SdkRequest request) { + return switch (request) { + // CRUD объектов + case GetObjectRequest req -> "GetObject"; + case GetObjectAttributesRequest req -> "GetObjectAttributes"; + case PutObjectRequest req -> "PutObject"; + case HeadObjectRequest req -> "HeadObject"; + case DeleteObjectRequest req -> "DeleteObject"; + case DeleteObjectsRequest req -> "DeleteObjects"; + case CopyObjectRequest req -> "CopyObject"; + + // Multipart Upload + case CreateMultipartUploadRequest req -> "CreateMultipartUpload"; + case UploadPartRequest req -> "UploadPart"; + case UploadPartCopyRequest req -> "UploadPartCopy"; + case CompleteMultipartUploadRequest req -> "CompleteMultipartUpload"; + case AbortMultipartUploadRequest req -> "AbortMultipartUpload"; + + // List операции + case ListObjectsRequest req -> "ListObjects"; + case ListObjectsV2Request req -> "ListObjectsV2"; + case ListObjectVersionsRequest req -> "ListObjectVersions"; + case ListMultipartUploadsRequest req -> "ListMultipartUploads"; + + // Бакеты + case CreateBucketRequest req -> "CreateBucket"; + case DeleteBucketRequest req -> "DeleteBucket"; + case HeadBucketRequest req -> "HeadBucket"; + + // Bucket Configuration + case GetBucketLocationRequest req -> "GetBucketLocation"; + case GetBucketAclRequest req -> "GetBucketAcl"; + case GetBucketCorsRequest req -> "GetBucketCors"; + case GetBucketLifecycleConfigurationRequest req -> "GetLifecycleConfiguration"; + case PutBucketLifecycleConfigurationRequest req -> "PutLifecycleConfiguration"; + case GetBucketPolicyRequest req -> "GetBucketPolicy"; + case PutBucketPolicyRequest req -> "PutBucketPolicy"; + case GetBucketReplicationRequest req -> "GetReplicationConfiguration"; + case PutBucketReplicationRequest req -> "PutReplicationConfiguration"; + case GetBucketTaggingRequest req -> "GetBucketTagging"; + case PutBucketTaggingRequest req -> "PutBucketTagging"; + case DeleteBucketTaggingRequest req -> "DeleteBucketTagging"; + case GetBucketVersioningRequest req -> "GetBucketVersioning"; + case PutBucketVersioningRequest req -> "PutBucketVersioning"; + case GetBucketEncryptionRequest req -> "GetBucketEncryption"; + case PutBucketEncryptionRequest req -> "PutBucketEncryption"; + case GetBucketLoggingRequest req -> "GetBucketLogging"; + case PutBucketLoggingRequest req -> "PutBucketLogging"; + case GetPublicAccessBlockRequest req -> "GetPublicAccessBlock"; + case PutPublicAccessBlockRequest req -> "PutPublicAccessBlock"; + case DeletePublicAccessBlockRequest req -> "DeletePublicAccessBlock"; + case GetBucketOwnershipControlsRequest req -> "GetBucketOwnershipControls"; + case PutBucketOwnershipControlsRequest req -> "PutBucketOwnershipControls"; + case DeleteBucketOwnershipControlsRequest req -> "DeleteBucketOwnershipControls"; + + // Object Tagging + case GetObjectTaggingRequest req -> "GetObjectTagging"; + case PutObjectTaggingRequest req -> "PutObjectTagging"; + case DeleteObjectTaggingRequest req -> "DeleteObjectTagging"; + + // Object Retention & Legal Hold + case GetObjectRetentionRequest req -> "GetObjectRetention"; + case PutObjectRetentionRequest req -> "PutObjectRetention"; + case GetObjectLegalHoldRequest req -> "GetObjectLegalHold"; + case PutObjectLegalHoldRequest req -> "PutObjectLegalHold"; + + // Дополнительные операции + case RestoreObjectRequest req -> "RestoreObject"; + case SelectObjectContentRequest req -> "SelectObjectContent"; + case WriteGetObjectResponseRequest req -> "WriteGetObjectResponse"; // для S3 Object Lambda + + default -> "unknown"; + }; + } + + @Nullable + private static String extractKey(SdkRequest request) { + return switch (request) { + case GetObjectRequest req -> req.key(); + case GetObjectAttributesRequest req -> req.key(); + case PutObjectRequest req -> req.key(); + case HeadObjectRequest req -> req.key(); + case DeleteObjectRequest req -> req.key(); + case AbortMultipartUploadRequest req -> req.key(); + case CompleteMultipartUploadRequest req -> req.key(); + case CreateMultipartUploadRequest req -> req.key(); + case UploadPartRequest req -> req.key(); + case RestoreObjectRequest req -> req.key(); + case SelectObjectContentRequest req -> req.key(); + case GetObjectTaggingRequest req -> req.key(); + case PutObjectTaggingRequest req -> req.key(); + case DeleteObjectTaggingRequest req -> req.key(); + case GetObjectRetentionRequest req -> req.key(); + case PutObjectRetentionRequest req -> req.key(); + case GetObjectLegalHoldRequest req -> req.key(); + case PutObjectLegalHoldRequest req -> req.key(); + + case CopyObjectRequest req -> extractKeyFromCopySource(req.copySource()); + case UploadPartCopyRequest req -> extractKeyFromCopySource(req.copySource()); + + case WriteGetObjectResponseRequest req -> req.requestToken(); // не key, но полезный идентификатор + + default -> null; + }; + } + + private static String extractKeyFromCopySource(String copySource) { + if (copySource == null || copySource.isEmpty()) { + return null; + } + + String source = copySource.startsWith("/") ? copySource.substring(1) : copySource; + int slashIndex = source.indexOf('/'); + return slashIndex >= 0 ? source.substring(slashIndex + 1) : null; + } + + private static String extractUploadId(SdkRequest request, SdkResponse response) { + if (request != null) { + return switch (request) { + case UploadPartRequest req -> req.uploadId(); + case UploadPartCopyRequest req -> req.uploadId(); + case CompleteMultipartUploadRequest req -> req.uploadId(); + case AbortMultipartUploadRequest req -> req.uploadId(); + default -> null; + }; + } + + if (response instanceof CreateMultipartUploadResponse createResp) { + return createResp.uploadId(); + } + + return null; + } +} diff --git a/s3/s3-client-aws/src/main/java/io/koraframework/s3/client/aws/AwsS3Config.java b/s3/s3-client-aws/src/main/java/io/koraframework/s3/client/aws/AwsS3Config.java new file mode 100644 index 000000000..cf42c1c6a --- /dev/null +++ b/s3/s3-client-aws/src/main/java/io/koraframework/s3/client/aws/AwsS3Config.java @@ -0,0 +1,57 @@ +package io.koraframework.s3.client.aws; + +import io.koraframework.config.common.annotation.ConfigValueExtractor; +import io.koraframework.s3.client.aws.telemetry.AwsS3ClientTelemetryConfig; + +import java.time.Duration; + +public interface AwsS3Config { + + enum AddressStyle { + PATH, + VIRTUAL_HOSTED + } + + String url(); + + default String region() { + return "aws-global"; + } + + default AddressStyle addressStyle() { + return AddressStyle.PATH; + } + + default Duration requestTimeout() { + return Duration.ofSeconds(45); + } + + default ChecksumCalculation checksumCalculationRequest() { + return ChecksumCalculation.WHEN_REQUIRED; + } + + default ChecksumCalculation checksumValidationResponse() { + return ChecksumCalculation.WHEN_REQUIRED; + } + + default boolean chunkedEncodingEnabled() { + return true; + } + + S3Credentials credentials(); + + AwsS3ClientTelemetryConfig telemetry(); + + @ConfigValueExtractor + interface S3Credentials { + + String accessKey(); + + String secretKey(); + } + + enum ChecksumCalculation { + WHEN_SUPPORTED, + WHEN_REQUIRED + } +} diff --git a/s3/s3-client-aws/src/main/java/io/koraframework/s3/client/aws/KoraAwsSdkHttpClient.java b/s3/s3-client-aws/src/main/java/io/koraframework/s3/client/aws/KoraAwsSdkHttpClient.java new file mode 100644 index 000000000..6da302104 --- /dev/null +++ b/s3/s3-client-aws/src/main/java/io/koraframework/s3/client/aws/KoraAwsSdkHttpClient.java @@ -0,0 +1,137 @@ +package io.koraframework.s3.client.aws; + +import io.koraframework.http.client.common.HttpClient; +import io.koraframework.http.client.common.request.HttpClientRequest; +import io.koraframework.http.client.common.request.HttpClientRequestBuilder; +import io.koraframework.http.client.common.response.HttpClientResponse; +import io.koraframework.http.common.body.HttpBodyInput; +import io.koraframework.http.common.body.HttpBodyOutput; +import software.amazon.awssdk.http.*; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public final class KoraAwsSdkHttpClient implements SdkHttpClient { + + private final HttpClient httpClient; + private final AwsS3Config clientConfig; + + public KoraAwsSdkHttpClient(HttpClient httpClient, AwsS3Config clientConfig) { + this.httpClient = httpClient; + this.clientConfig = clientConfig; + } + + @Override + public String clientName() { + return "aws-kora"; + } + + @Override + public ExecutableHttpRequest prepareRequest(HttpExecuteRequest httpExecuteRequest) { + return new ExecutableHttpRequest() { + + @Override + public HttpExecuteResponse call() { + final HttpClientRequest request = asKoraRequest(httpExecuteRequest); + final HttpClientResponse response = httpClient.execute(request); + return asAwsResponse(response); + } + + @Override + public void abort() { + // do nothing + } + }; + } + + private HttpClientRequest asKoraRequest(HttpExecuteRequest httpExecuteRequest) { + final SdkHttpRequest sdkHttpRequest = httpExecuteRequest.httpRequest(); + final HttpClientRequestBuilder builder = getBaseBuilder(sdkHttpRequest.getUri(), sdkHttpRequest.method().name(), sdkHttpRequest.rawQueryParameters(), sdkHttpRequest.headers()); + + httpExecuteRequest.contentStreamProvider().ifPresent(provider -> { + String contentType = sdkHttpRequest.firstMatchingHeader("Content-Type").orElse("application/octet-stream"); + String contentLength = sdkHttpRequest.firstMatchingHeader("Content-Length").orElse(null); + if (contentLength == null) { + builder.body(HttpBodyOutput.of(contentType, provider.newStream())); + } else { + builder.body(HttpBodyOutput.of(contentType, Long.parseLong(contentLength), provider.newStream())); + } + }); + + return builder + .requestTimeout(clientConfig.requestTimeout()) + .build(); + } + + private static HttpClientRequestBuilder getBaseBuilder(URI sdkUri, + String method, + Map> rawQueryParameters, + Map> headers) { + try { + final URI uri = new URI(sdkUri.getScheme(), + sdkUri.getAuthority(), + sdkUri.getPath(), + null, // Ignore the query part of the input url + sdkUri.getFragment()); + + final HttpClientRequestBuilder builder = HttpClientRequest.of(method, uri.toString()); + rawQueryParameters.forEach((k, v) -> { + if (v == null || v.isEmpty() || v.get(0) == null) { + builder.queryParam(k); + } else { + builder.queryParam(k, v); + } + }); + + headers.forEach((k, v) -> { + if (!"host".equalsIgnoreCase(k) && !"expect".equalsIgnoreCase(k)) { + builder.header(k, v); + } + }); + + return builder; + } catch (URISyntaxException e) { + throw new IllegalArgumentException(e); + } + } + + private static HttpExecuteResponse asAwsResponse(HttpClientResponse koraHttpResponse) { + final SdkHttpFullResponse.Builder sdkResponseBuilder = SdkHttpResponse.builder(); + final Map> responseHeaders = new HashMap<>(); + koraHttpResponse.headers().forEach(e -> responseHeaders.put(e.getKey(), e.getValue())); + sdkResponseBuilder.headers(responseHeaders); + sdkResponseBuilder.statusCode(koraHttpResponse.code()); + sdkResponseBuilder.statusText(String.valueOf(koraHttpResponse.code())); + + AbortableInputStream bodyStream = asSdkResponseStream(koraHttpResponse); + sdkResponseBuilder.content(bodyStream); + + final SdkHttpFullResponse sdkHttpResponse = sdkResponseBuilder.build(); + return HttpExecuteResponse.builder() + .response(sdkHttpResponse) + .responseBody(bodyStream) + .build(); + } + + private static AbortableInputStream asSdkResponseStream(HttpClientResponse koraHttpResponse) { + final HttpBodyInput body = koraHttpResponse.body(); + final InputStream bodyIS = body.asInputStream(); + return AbortableInputStream.create(bodyIS, () -> { + try { + bodyIS.close(); + } catch (IOException e) { + // ignore + } + }); + } + + @Override + public void close() { + // do nothing + } +} diff --git a/s3/s3-client-aws/src/main/java/io/koraframework/s3/client/aws/telemetry/AwsS3ClientObservation.java b/s3/s3-client-aws/src/main/java/io/koraframework/s3/client/aws/telemetry/AwsS3ClientObservation.java new file mode 100644 index 000000000..db8e525be --- /dev/null +++ b/s3/s3-client-aws/src/main/java/io/koraframework/s3/client/aws/telemetry/AwsS3ClientObservation.java @@ -0,0 +1,15 @@ +package io.koraframework.s3.client.aws.telemetry; + +import io.koraframework.common.telemetry.Observation; +import org.jspecify.annotations.Nullable; + +public interface AwsS3ClientObservation extends Observation { + + void observeKey(String key); + + void observeUploadId(String uploadId); + + void observeAwsRequestId(@Nullable String amxRequestId); + + void observeAwsExtendedId(@Nullable String amxRequestId); +} diff --git a/s3/s3-client-aws/src/main/java/io/koraframework/s3/client/aws/telemetry/AwsS3ClientTelemetry.java b/s3/s3-client-aws/src/main/java/io/koraframework/s3/client/aws/telemetry/AwsS3ClientTelemetry.java new file mode 100644 index 000000000..b346f8e28 --- /dev/null +++ b/s3/s3-client-aws/src/main/java/io/koraframework/s3/client/aws/telemetry/AwsS3ClientTelemetry.java @@ -0,0 +1,6 @@ +package io.koraframework.s3.client.aws.telemetry; + +public interface AwsS3ClientTelemetry { + + AwsS3ClientObservation observe(String operation, String bucket); +} diff --git a/s3/s3-client-aws/src/main/java/io/koraframework/s3/client/aws/telemetry/AwsS3ClientTelemetryConfig.java b/s3/s3-client-aws/src/main/java/io/koraframework/s3/client/aws/telemetry/AwsS3ClientTelemetryConfig.java new file mode 100644 index 000000000..9f93fb326 --- /dev/null +++ b/s3/s3-client-aws/src/main/java/io/koraframework/s3/client/aws/telemetry/AwsS3ClientTelemetryConfig.java @@ -0,0 +1,32 @@ +package io.koraframework.s3.client.aws.telemetry; + +import io.koraframework.config.common.annotation.ConfigValueExtractor; +import io.koraframework.telemetry.common.TelemetryConfig; + +@ConfigValueExtractor +public interface AwsS3ClientTelemetryConfig extends TelemetryConfig { + + @Override + S3ClientLogConfig logging(); + + @Override + S3ClientMetricsConfig metrics(); + + @Override + S3ClientTracingConfig tracing(); + + @ConfigValueExtractor + interface S3ClientLogConfig extends TelemetryConfig.LogConfig { + + } + + @ConfigValueExtractor + interface S3ClientTracingConfig extends TelemetryConfig.TracingConfig { + + } + + @ConfigValueExtractor + interface S3ClientMetricsConfig extends TelemetryConfig.MetricsConfig { + + } +} diff --git a/s3/s3-client-aws/src/main/java/io/koraframework/s3/client/aws/telemetry/AwsS3ClientTelemetryFactory.java b/s3/s3-client-aws/src/main/java/io/koraframework/s3/client/aws/telemetry/AwsS3ClientTelemetryFactory.java new file mode 100644 index 000000000..461a0ff8c --- /dev/null +++ b/s3/s3-client-aws/src/main/java/io/koraframework/s3/client/aws/telemetry/AwsS3ClientTelemetryFactory.java @@ -0,0 +1,6 @@ +package io.koraframework.s3.client.aws.telemetry; + +public interface AwsS3ClientTelemetryFactory { + + AwsS3ClientTelemetry get(AwsS3ClientTelemetryConfig config); +} diff --git a/s3/s3-client-aws/src/main/java/io/koraframework/s3/client/aws/telemetry/DefaultAwsS3ClientObservation.java b/s3/s3-client-aws/src/main/java/io/koraframework/s3/client/aws/telemetry/DefaultAwsS3ClientObservation.java new file mode 100644 index 000000000..1b388f8df --- /dev/null +++ b/s3/s3-client-aws/src/main/java/io/koraframework/s3/client/aws/telemetry/DefaultAwsS3ClientObservation.java @@ -0,0 +1,114 @@ +package io.koraframework.s3.client.aws.telemetry; + +import io.micrometer.core.instrument.Meter; +import io.micrometer.core.instrument.Tag; +import io.micrometer.core.instrument.Tags; +import io.micrometer.core.instrument.Timer; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.semconv.ErrorAttributes; +import io.opentelemetry.semconv.incubating.AwsIncubatingAttributes; +import io.opentelemetry.semconv.incubating.RpcIncubatingAttributes; +import org.jspecify.annotations.Nullable; +import org.slf4j.Logger; + +import java.util.concurrent.TimeUnit; + +public class DefaultAwsS3ClientObservation implements AwsS3ClientObservation { + + private final long start = System.nanoTime(); + + protected final String bucket; + protected final String operation; + protected final AwsS3ClientTelemetryConfig config; + protected final Span span; + protected final Logger logger; + protected final Meter.MeterProvider duration; + + @Nullable + protected String awsKey; + @Nullable + protected String uploadId; + @Nullable + protected String requestId; + @Nullable + protected String extendedRequestId; + @Nullable + private Throwable error; + + public DefaultAwsS3ClientObservation(String bucket, + String operation, + AwsS3ClientTelemetryConfig config, + Span span, Logger logger, + Meter.MeterProvider duration) { + this.bucket = bucket; + this.operation = operation; + this.config = config; + this.span = span; + this.logger = logger; + this.duration = duration; + } + + @Override + public void observeKey(String key) { + this.awsKey = key; + this.span.setAttribute(AwsIncubatingAttributes.AWS_S3_KEY, key); + } + + @Override + public void observeUploadId(String uploadId) { + this.uploadId = uploadId; + this.span.setAttribute(AwsIncubatingAttributes.AWS_S3_UPLOAD_ID, uploadId); + } + + @Override + public void observeAwsRequestId(@Nullable String amxRequestId) { + this.requestId = amxRequestId; + this.span.setAttribute(AwsIncubatingAttributes.AWS_REQUEST_ID, amxRequestId); + } + + @Override + public void observeAwsExtendedId(@Nullable String amxRequestId) { + this.extendedRequestId = amxRequestId; + this.span.setAttribute(AwsIncubatingAttributes.AWS_EXTENDED_REQUEST_ID, amxRequestId); + } + + @Override + public Span span() { + return this.span; + } + + @Override + public void observeError(Throwable e) { + this.span.recordException(e); + this.span.setStatus(StatusCode.ERROR); + this.error = e; + } + + @Override + public void end() { + var errorValue = (error == null) ? "" : this.error.getClass().getCanonicalName(); + + if (error == null) { + this.span.setStatus(StatusCode.OK); + this.logger.debug("AwsS3Client completed operation '{}' on bucket: {}", operation, bucket); + } else { + this.span.setStatus(StatusCode.ERROR, errorValue); + this.span.recordException(error); + this.logger.warn("AwsS3Client failed operation '{}' on bucket '{}' due to: {}", operation, bucket, error.getMessage()); + } + + if (config.metrics().enabled()) { + var took = System.nanoTime() - this.start; + var meter = this.duration.withTags(Tags.of( + Tag.of(RpcIncubatingAttributes.RPC_METHOD.getKey(), operation), + Tag.of(AwsIncubatingAttributes.AWS_S3_BUCKET.getKey(), bucket), + Tag.of(ErrorAttributes.ERROR_TYPE.getKey(), errorValue) + )); + + meter.record(took, TimeUnit.NANOSECONDS); + } + + this.span.end(); + } +} diff --git a/s3/s3-client-aws/src/main/java/io/koraframework/s3/client/aws/telemetry/DefaultAwsS3ClientTelemetry.java b/s3/s3-client-aws/src/main/java/io/koraframework/s3/client/aws/telemetry/DefaultAwsS3ClientTelemetry.java new file mode 100644 index 000000000..9d232d768 --- /dev/null +++ b/s3/s3-client-aws/src/main/java/io/koraframework/s3/client/aws/telemetry/DefaultAwsS3ClientTelemetry.java @@ -0,0 +1,77 @@ +package io.koraframework.s3.client.aws.telemetry; + +import io.micrometer.core.instrument.*; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanBuilder; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.semconv.incubating.AwsIncubatingAttributes; +import io.opentelemetry.semconv.incubating.RpcIncubatingAttributes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.helpers.NOPLogger; + +import java.util.ArrayList; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +public class DefaultAwsS3ClientTelemetry implements AwsS3ClientTelemetry { + + protected final ConcurrentMap durationCache = new ConcurrentHashMap<>(); + + protected final AwsS3ClientTelemetryConfig config; + protected final Tracer tracer; + protected final MeterRegistry meterRegistry; + protected final Logger logger; + protected final Meter.MeterProvider requestDurationMeter; + + public DefaultAwsS3ClientTelemetry(AwsS3ClientTelemetryConfig config, + Tracer tracer, + MeterRegistry meterRegistry) { + this.config = config; + this.tracer = tracer; + this.meterRegistry = meterRegistry; + + this.requestDurationMeter = tags -> + durationCache.computeIfAbsent(Tags.of(tags), _ -> createMetricClientDuration() + .tags((Iterable) tags) + .register(this.meterRegistry)); + + var logger = LoggerFactory.getLogger(AwsS3ClientTelemetry.class); + this.logger = this.config.logging().enabled() && logger.isWarnEnabled() + ? logger + : NOPLogger.NOP_LOGGER; + } + + @Override + public AwsS3ClientObservation observe(String operation, String bucket) { + var span = this.config.tracing().enabled() + ? createSpan(operation, bucket).startSpan() + : Span.getInvalid(); + + logger.debug("S3AwsClient starting S3 operation '{}' on bucket: {}", operation, bucket); + return new DefaultAwsS3ClientObservation(bucket, operation, config, span, logger, requestDurationMeter); + } + + protected Timer.Builder createMetricClientDuration() { + var staticTags = new ArrayList(1 + this.config.metrics().tags().size()); + staticTags.add(Tag.of(RpcIncubatingAttributes.RPC_SYSTEM.getKey(), "s3-aws")); + for (var e : this.config.metrics().tags().entrySet()) { + staticTags.add(Tag.of(e.getKey(), e.getValue())); + } + + return Timer.builder("rpc.client.duration") + .serviceLevelObjectives(this.config.metrics().slo()) + .tags(staticTags); + } + + protected SpanBuilder createSpan(String operation, String bucket) { + var span = this.tracer.spanBuilder("S3." + operation) + .setAttribute(RpcIncubatingAttributes.RPC_SYSTEM, "s3") + .setAttribute(RpcIncubatingAttributes.RPC_METHOD, operation) + .setAttribute(AwsIncubatingAttributes.AWS_S3_BUCKET, bucket); + for (var entry : this.config.tracing().attributes().entrySet()) { + span.setAttribute(entry.getKey(), entry.getValue()); + } + return span; + } +} diff --git a/s3/s3-client-aws/src/main/java/io/koraframework/s3/client/aws/telemetry/DefaultAwsS3ClientTelemetryFactory.java b/s3/s3-client-aws/src/main/java/io/koraframework/s3/client/aws/telemetry/DefaultAwsS3ClientTelemetryFactory.java new file mode 100644 index 000000000..f617327af --- /dev/null +++ b/s3/s3-client-aws/src/main/java/io/koraframework/s3/client/aws/telemetry/DefaultAwsS3ClientTelemetryFactory.java @@ -0,0 +1,36 @@ +package io.koraframework.s3.client.aws.telemetry; + +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.composite.CompositeMeterRegistry; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.api.trace.TracerProvider; +import org.jspecify.annotations.Nullable; + +public final class DefaultAwsS3ClientTelemetryFactory implements AwsS3ClientTelemetryFactory { + + private static final Tracer NOOP_TRACER = TracerProvider.noop().get("s3-client-aws"); + private static final MeterRegistry NOOP_METER_REGISTRY = new CompositeMeterRegistry(); + + @Nullable + private final Tracer tracer; + @Nullable + private final MeterRegistry meterRegistry; + + public DefaultAwsS3ClientTelemetryFactory(@Nullable Tracer tracer, @Nullable MeterRegistry meterRegistry) { + this.tracer = tracer; + this.meterRegistry = meterRegistry; + } + + @Override + public AwsS3ClientTelemetry get(AwsS3ClientTelemetryConfig config) { + var tracerEnabled = this.tracer != null && config.tracing().enabled(); + var metricEnabled = this.meterRegistry != null && config.metrics().enabled(); + if (!tracerEnabled && !metricEnabled && !config.logging().enabled()) { + return NoopAwsS3ClientTelemetry.INSTANCE; + } + + var tracer = tracerEnabled ? this.tracer : NOOP_TRACER; + var meterRegistry = metricEnabled ? this.meterRegistry : NOOP_METER_REGISTRY; + return new DefaultAwsS3ClientTelemetry(config, tracer, meterRegistry); + } +} diff --git a/s3/s3-client-aws/src/main/java/io/koraframework/s3/client/aws/telemetry/NoopAwsS3ClientObservation.java b/s3/s3-client-aws/src/main/java/io/koraframework/s3/client/aws/telemetry/NoopAwsS3ClientObservation.java new file mode 100644 index 000000000..a6c04cbdc --- /dev/null +++ b/s3/s3-client-aws/src/main/java/io/koraframework/s3/client/aws/telemetry/NoopAwsS3ClientObservation.java @@ -0,0 +1,46 @@ +package io.koraframework.s3.client.aws.telemetry; + +import io.opentelemetry.api.trace.Span; + +public final class NoopAwsS3ClientObservation implements AwsS3ClientObservation { + + public static final AwsS3ClientObservation INSTANCE = new NoopAwsS3ClientObservation(); + + private NoopAwsS3ClientObservation() {} + + @Override + public void observeKey(String key) { + + } + + @Override + public void observeUploadId(String uploadId) { + + } + + @Override + public void observeAwsRequestId(String amxRequestId) { + + } + + @Override + public void observeAwsExtendedId(String amxRequestId) { + + } + + @Override + public Span span() { + return Span.getInvalid(); + } + + @Override + public void end() { + + } + + @Override + public void observeError(Throwable e) { + + } + +} diff --git a/s3/s3-client-aws/src/main/java/io/koraframework/s3/client/aws/telemetry/NoopAwsS3ClientTelemetry.java b/s3/s3-client-aws/src/main/java/io/koraframework/s3/client/aws/telemetry/NoopAwsS3ClientTelemetry.java new file mode 100644 index 000000000..94a9c3176 --- /dev/null +++ b/s3/s3-client-aws/src/main/java/io/koraframework/s3/client/aws/telemetry/NoopAwsS3ClientTelemetry.java @@ -0,0 +1,13 @@ +package io.koraframework.s3.client.aws.telemetry; + +public final class NoopAwsS3ClientTelemetry implements AwsS3ClientTelemetry { + + public static final NoopAwsS3ClientTelemetry INSTANCE = new NoopAwsS3ClientTelemetry(); + + private NoopAwsS3ClientTelemetry() {} + + @Override + public AwsS3ClientObservation observe(String operation, String bucket) { + return NoopAwsS3ClientObservation.INSTANCE; + } +} diff --git a/settings.gradle b/settings.gradle index 73b2d12e2..95065f9b7 100644 --- a/settings.gradle +++ b/settings.gradle @@ -99,6 +99,7 @@ include( 'experimental:s3-client-annotation-processor', 'experimental:s3-client-symbol-processor', 'experimental:s3-client', + 's3:s3-client-aws', 'experimental:camunda-engine-bpmn', 'experimental:camunda-rest-undertow', 'experimental:camunda-zeebe-worker',