From ab6c8578c0fdd96c75877f3dcfd6d05f80325934 Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Sat, 6 Jun 2026 17:37:45 +0530 Subject: [PATCH 1/6] Add S3 archive storage mapping --- .../apache/paimon/s3/S3ArchiveOperations.java | 71 +++++++++++++++++++ 1 file changed, 71 insertions(+) create mode 100644 paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3ArchiveOperations.java diff --git a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3ArchiveOperations.java b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3ArchiveOperations.java new file mode 100644 index 000000000000..15e4b47e67bf --- /dev/null +++ b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3ArchiveOperations.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.s3; + +import org.apache.paimon.fs.StorageType; + +import software.amazon.awssdk.services.s3.model.StorageClass; + +import java.time.Duration; + +import static org.apache.paimon.utils.Preconditions.checkArgument; +import static org.apache.paimon.utils.Preconditions.checkNotNull; + +/** S3 archive operation helpers. */ +class S3ArchiveOperations { + + private static final long SECONDS_PER_DAY = 24 * 60 * 60; + + private S3ArchiveOperations() {} + + static StorageClass archiveStorageClass(StorageType type) { + switch (checkNotNull(type, "Storage type must not be null.")) { + case ARCHIVE: + return StorageClass.GLACIER; + case COLD_ARCHIVE: + return StorageClass.DEEP_ARCHIVE; + default: + throw new IllegalArgumentException( + "Unsupported S3 archive storage type: " + type + ". Use unarchive."); + } + } + + static StorageClass unarchiveStorageClass(StorageType type) { + if (checkNotNull(type, "Storage type must not be null.") == StorageType.STANDARD) { + return StorageClass.STANDARD; + } + throw new IllegalArgumentException("Unsupported S3 unarchive storage type: " + type); + } + + static int restoreDays(Duration duration) { + checkNotNull(duration, "Restore duration must not be null."); + checkArgument( + !duration.isZero() && !duration.isNegative(), + "Restore duration must be greater than zero."); + + long seconds = duration.getSeconds(); + long days = seconds / SECONDS_PER_DAY; + if (seconds % SECONDS_PER_DAY != 0 || duration.getNano() > 0) { + days++; + } + + checkArgument(days <= Integer.MAX_VALUE, "Restore duration is too large: %s", duration); + return (int) days; + } +} From 46bf7f71c540945553cfae96c8e86c80aeb55855 Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Sat, 6 Jun 2026 17:39:50 +0530 Subject: [PATCH 2/6] Implement S3 archive operations --- .../apache/paimon/s3/S3ArchiveOperations.java | 104 ++++++++++++++++++ .../java/org/apache/paimon/s3/S3FileIO.java | 34 ++++++ 2 files changed, 138 insertions(+) diff --git a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3ArchiveOperations.java b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3ArchiveOperations.java index 15e4b47e67bf..f156d216eb47 100644 --- a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3ArchiveOperations.java +++ b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3ArchiveOperations.java @@ -20,8 +20,25 @@ import org.apache.paimon.fs.StorageType; +import org.apache.hadoop.fs.s3a.S3AFileSystem; + +import software.amazon.awssdk.core.exception.SdkException; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.CopyObjectRequest; +import software.amazon.awssdk.services.s3.model.GlacierJobParameters; +import software.amazon.awssdk.services.s3.model.MetadataDirective; +import software.amazon.awssdk.services.s3.model.ObjectAlreadyInActiveTierErrorException; +import software.amazon.awssdk.services.s3.model.ObjectNotInActiveTierErrorException; +import software.amazon.awssdk.services.s3.model.RestoreObjectRequest; +import software.amazon.awssdk.services.s3.model.RestoreRequest; +import software.amazon.awssdk.services.s3.model.S3Exception; import software.amazon.awssdk.services.s3.model.StorageClass; +import software.amazon.awssdk.services.s3.model.TaggingDirective; +import software.amazon.awssdk.services.s3.model.Tier; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.time.Duration; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -32,6 +49,8 @@ class S3ArchiveOperations { private static final long SECONDS_PER_DAY = 24 * 60 * 60; + private static final String RESTORE_ALREADY_IN_PROGRESS = "RestoreAlreadyInProgress"; + private S3ArchiveOperations() {} static StorageClass archiveStorageClass(StorageType type) { @@ -68,4 +87,89 @@ static int restoreDays(Duration duration) { checkArgument(days <= Integer.MAX_VALUE, "Restore duration is too large: %s", duration); return (int) days; } + + static void changeStorageClass( + S3AFileSystem fileSystem, + org.apache.hadoop.fs.Path path, + StorageClass storageClass, + String operation) + throws IOException { + String key = fileSystem.pathToKey(path); + CopyObjectRequest request = + fileSystem + .getRequestFactory() + .newCopyObjectRequestBuilder(key, key, fileSystem.getObjectMetadata(path)) + .storageClass(storageClass) + .metadataDirective(MetadataDirective.COPY) + .taggingDirective(TaggingDirective.COPY) + .build(); + + try { + s3Client(fileSystem).copyObject(request); + } catch (ObjectNotInActiveTierErrorException e) { + throw new IOException( + "S3 object " + + path + + " is not in active tier. Restore it before unarchiving.", + e); + } catch (S3Exception e) { + throw new IOException( + "Failed to " + operation + " S3 object " + path + " to " + storageClass + ".", + e); + } catch (SdkException e) { + throw new IOException( + "Failed to " + operation + " S3 object " + path + " to " + storageClass + ".", + e); + } + } + + static void restoreArchive(S3AFileSystem fileSystem, org.apache.hadoop.fs.Path path, int days) + throws IOException { + RestoreObjectRequest request = restoreObjectRequest(fileSystem.getBucket(), fileSystem.pathToKey(path), days); + + try { + s3Client(fileSystem).restoreObject(request); + } catch (ObjectAlreadyInActiveTierErrorException e) { + throw new IOException("S3 object " + path + " is already in active tier.", e); + } catch (S3Exception e) { + if (isRestoreAlreadyInProgress(e)) { + return; + } + throw new IOException("Failed to restore archived S3 object " + path + ".", e); + } catch (SdkException e) { + throw new IOException("Failed to restore archived S3 object " + path + ".", e); + } + } + + static RestoreObjectRequest restoreObjectRequest(String bucket, String key, int days) { + return RestoreObjectRequest.builder() + .bucket(bucket) + .key(key) + .restoreRequest( + RestoreRequest.builder() + .days(days) + .glacierJobParameters( + GlacierJobParameters.builder().tier(Tier.STANDARD).build()) + .build()) + .build(); + } + + static boolean isRestoreAlreadyInProgress(S3Exception exception) { + return exception.awsErrorDetails() != null + && RESTORE_ALREADY_IN_PROGRESS.equals(exception.awsErrorDetails().errorCode()); + } + + private static S3Client s3Client(S3AFileSystem fileSystem) throws IOException { + try { + Method method = S3AFileSystem.class.getDeclaredMethod("getS3Client"); + method.setAccessible(true); + return (S3Client) method.invoke(fileSystem); + } catch (NoSuchMethodException e) { + throw new IOException("S3AFileSystem does not expose an S3 client.", e); + } catch (IllegalAccessException e) { + throw new IOException("Failed to access S3 client from S3AFileSystem.", e); + } catch (InvocationTargetException e) { + throw new IOException("Failed to get S3 client from S3AFileSystem.", e); + } + } } diff --git a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java index 827251837342..103a14fb0a53 100644 --- a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java +++ b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java @@ -21,6 +21,7 @@ import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.StorageType; import org.apache.paimon.fs.TwoPhaseOutputStream; import org.apache.paimon.options.Options; @@ -33,8 +34,10 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.net.URI; +import java.time.Duration; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; /** S3 {@link FileIO}. */ @@ -85,6 +88,37 @@ public TwoPhaseOutputStream newTwoPhaseOutputStream(Path path, boolean overwrite new S3MultiPartUpload(fs, fs.getConf()), hadoopPath, path); } + @Override + public Optional archive(Path path, StorageType type) throws IOException { + org.apache.hadoop.fs.Path hadoopPath = path(path); + S3ArchiveOperations.changeStorageClass( + (S3AFileSystem) getFileSystem(hadoopPath), + hadoopPath, + S3ArchiveOperations.archiveStorageClass(type), + "archive"); + return Optional.empty(); + } + + @Override + public void restoreArchive(Path path, Duration duration) throws IOException { + org.apache.hadoop.fs.Path hadoopPath = path(path); + S3ArchiveOperations.restoreArchive( + (S3AFileSystem) getFileSystem(hadoopPath), + hadoopPath, + S3ArchiveOperations.restoreDays(duration)); + } + + @Override + public Optional unarchive(Path path, StorageType type) throws IOException { + org.apache.hadoop.fs.Path hadoopPath = path(path); + S3ArchiveOperations.changeStorageClass( + (S3AFileSystem) getFileSystem(hadoopPath), + hadoopPath, + S3ArchiveOperations.unarchiveStorageClass(type), + "unarchive"); + return Optional.empty(); + } + // add additional config entries from the IO config to the Hadoop config private Options loadHadoopConfigFromContext(CatalogContext context) { Options hadoopConfig = new Options(); From 528de86231b50c1d71fedfb8ed40af530ac77490 Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Sat, 6 Jun 2026 17:41:35 +0530 Subject: [PATCH 3/6] Test S3 archive operations --- .../paimon/s3/S3ArchiveOperationsTest.java | 106 ++++++++++++++++++ 1 file changed, 106 insertions(+) create mode 100644 paimon-filesystems/paimon-s3-impl/src/test/java/org/apache/paimon/s3/S3ArchiveOperationsTest.java diff --git a/paimon-filesystems/paimon-s3-impl/src/test/java/org/apache/paimon/s3/S3ArchiveOperationsTest.java b/paimon-filesystems/paimon-s3-impl/src/test/java/org/apache/paimon/s3/S3ArchiveOperationsTest.java new file mode 100644 index 000000000000..bab53f27cd26 --- /dev/null +++ b/paimon-filesystems/paimon-s3-impl/src/test/java/org/apache/paimon/s3/S3ArchiveOperationsTest.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.s3; + +import org.apache.paimon.fs.StorageType; + +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.awscore.exception.AwsErrorDetails; +import software.amazon.awssdk.services.s3.model.RestoreObjectRequest; +import software.amazon.awssdk.services.s3.model.S3Exception; +import software.amazon.awssdk.services.s3.model.StorageClass; +import software.amazon.awssdk.services.s3.model.Tier; + +import java.time.Duration; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link S3ArchiveOperations}. */ +class S3ArchiveOperationsTest { + + @Test + void testArchiveStorageClassMapping() { + assertThat(S3ArchiveOperations.archiveStorageClass(StorageType.ARCHIVE)) + .isEqualTo(StorageClass.GLACIER); + assertThat(S3ArchiveOperations.archiveStorageClass(StorageType.COLD_ARCHIVE)) + .isEqualTo(StorageClass.DEEP_ARCHIVE); + + assertThatThrownBy(() -> S3ArchiveOperations.archiveStorageClass(StorageType.STANDARD)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Use unarchive"); + } + + @Test + void testUnarchiveStorageClassMapping() { + assertThat(S3ArchiveOperations.unarchiveStorageClass(StorageType.STANDARD)) + .isEqualTo(StorageClass.STANDARD); + + assertThatThrownBy(() -> S3ArchiveOperations.unarchiveStorageClass(StorageType.ARCHIVE)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Unsupported S3 unarchive storage type"); + assertThatThrownBy( + () -> S3ArchiveOperations.unarchiveStorageClass(StorageType.COLD_ARCHIVE)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Unsupported S3 unarchive storage type"); + } + + @Test + void testRestoreDays() { + assertThat(S3ArchiveOperations.restoreDays(Duration.ofNanos(1))).isEqualTo(1); + assertThat(S3ArchiveOperations.restoreDays(Duration.ofHours(24))).isEqualTo(1); + assertThat(S3ArchiveOperations.restoreDays(Duration.ofHours(25))).isEqualTo(2); + assertThat(S3ArchiveOperations.restoreDays(Duration.ofDays(7))).isEqualTo(7); + + assertThatThrownBy(() -> S3ArchiveOperations.restoreDays(Duration.ZERO)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("greater than zero"); + assertThatThrownBy(() -> S3ArchiveOperations.restoreDays(Duration.ofSeconds(-1))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("greater than zero"); + } + + @Test + void testRestoreObjectRequest() { + RestoreObjectRequest request = + S3ArchiveOperations.restoreObjectRequest("bucket", "partition/data.orc", 7); + + assertThat(request.bucket()).isEqualTo("bucket"); + assertThat(request.key()).isEqualTo("partition/data.orc"); + assertThat(request.restoreRequest().days()).isEqualTo(7); + assertThat(request.restoreRequest().glacierJobParameters().tier()).isEqualTo(Tier.STANDARD); + } + + @Test + void testRestoreAlreadyInProgress() { + assertThat( + S3ArchiveOperations.isRestoreAlreadyInProgress( + s3Exception("RestoreAlreadyInProgress"))) + .isTrue(); + assertThat(S3ArchiveOperations.isRestoreAlreadyInProgress(s3Exception("OtherError"))) + .isFalse(); + } + + private static S3Exception s3Exception(String errorCode) { + return (S3Exception) + S3Exception.builder() + .awsErrorDetails(AwsErrorDetails.builder().errorCode(errorCode).build()) + .build(); + } +} From c676992ae3c257d220990693e819f12dfedad33b Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Sat, 6 Jun 2026 17:44:46 +0530 Subject: [PATCH 4/6] Validate S3 archive requests --- .../apache/paimon/s3/S3ArchiveOperations.java | 37 +++++++++++++------ .../paimon/s3/S3ArchiveOperationsTest.java | 32 ++++++++++++++++ 2 files changed, 57 insertions(+), 12 deletions(-) diff --git a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3ArchiveOperations.java b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3ArchiveOperations.java index f156d216eb47..f68e1b1c6ca3 100644 --- a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3ArchiveOperations.java +++ b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3ArchiveOperations.java @@ -21,11 +21,11 @@ import org.apache.paimon.fs.StorageType; import org.apache.hadoop.fs.s3a.S3AFileSystem; - import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.CopyObjectRequest; import software.amazon.awssdk.services.s3.model.GlacierJobParameters; +import software.amazon.awssdk.services.s3.model.HeadObjectResponse; import software.amazon.awssdk.services.s3.model.MetadataDirective; import software.amazon.awssdk.services.s3.model.ObjectAlreadyInActiveTierErrorException; import software.amazon.awssdk.services.s3.model.ObjectNotInActiveTierErrorException; @@ -96,21 +96,17 @@ static void changeStorageClass( throws IOException { String key = fileSystem.pathToKey(path); CopyObjectRequest request = - fileSystem - .getRequestFactory() - .newCopyObjectRequestBuilder(key, key, fileSystem.getObjectMetadata(path)) - .storageClass(storageClass) - .metadataDirective(MetadataDirective.COPY) - .taggingDirective(TaggingDirective.COPY) - .build(); + copyObjectRequest( + fileSystem.getRequestFactory()::newCopyObjectRequestBuilder, + key, + fileSystem.getObjectMetadata(path), + storageClass); try { s3Client(fileSystem).copyObject(request); } catch (ObjectNotInActiveTierErrorException e) { throw new IOException( - "S3 object " - + path - + " is not in active tier. Restore it before unarchiving.", + "S3 object " + path + " is not in active tier. Restore it before unarchiving.", e); } catch (S3Exception e) { throw new IOException( @@ -123,9 +119,22 @@ static void changeStorageClass( } } + static CopyObjectRequest copyObjectRequest( + CopyObjectRequestBuilderFactory factory, + String key, + HeadObjectResponse metadata, + StorageClass storageClass) { + return factory.create(key, key, metadata) + .storageClass(storageClass) + .metadataDirective(MetadataDirective.COPY) + .taggingDirective(TaggingDirective.COPY) + .build(); + } + static void restoreArchive(S3AFileSystem fileSystem, org.apache.hadoop.fs.Path path, int days) throws IOException { - RestoreObjectRequest request = restoreObjectRequest(fileSystem.getBucket(), fileSystem.pathToKey(path), days); + RestoreObjectRequest request = + restoreObjectRequest(fileSystem.getBucket(), fileSystem.pathToKey(path), days); try { s3Client(fileSystem).restoreObject(request); @@ -172,4 +181,8 @@ private static S3Client s3Client(S3AFileSystem fileSystem) throws IOException { throw new IOException("Failed to get S3 client from S3AFileSystem.", e); } } + + interface CopyObjectRequestBuilderFactory { + CopyObjectRequest.Builder create(String sourceKey, String destinationKey, HeadObjectResponse metadata); + } } diff --git a/paimon-filesystems/paimon-s3-impl/src/test/java/org/apache/paimon/s3/S3ArchiveOperationsTest.java b/paimon-filesystems/paimon-s3-impl/src/test/java/org/apache/paimon/s3/S3ArchiveOperationsTest.java index bab53f27cd26..46f8ff9785bf 100644 --- a/paimon-filesystems/paimon-s3-impl/src/test/java/org/apache/paimon/s3/S3ArchiveOperationsTest.java +++ b/paimon-filesystems/paimon-s3-impl/src/test/java/org/apache/paimon/s3/S3ArchiveOperationsTest.java @@ -22,12 +22,17 @@ import org.junit.jupiter.api.Test; import software.amazon.awssdk.awscore.exception.AwsErrorDetails; +import software.amazon.awssdk.services.s3.model.CopyObjectRequest; +import software.amazon.awssdk.services.s3.model.HeadObjectResponse; +import software.amazon.awssdk.services.s3.model.MetadataDirective; import software.amazon.awssdk.services.s3.model.RestoreObjectRequest; import software.amazon.awssdk.services.s3.model.S3Exception; import software.amazon.awssdk.services.s3.model.StorageClass; +import software.amazon.awssdk.services.s3.model.TaggingDirective; import software.amazon.awssdk.services.s3.model.Tier; import java.time.Duration; +import java.util.concurrent.atomic.AtomicReference; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -87,6 +92,33 @@ void testRestoreObjectRequest() { assertThat(request.restoreRequest().glacierJobParameters().tier()).isEqualTo(Tier.STANDARD); } + @Test + void testCopyObjectRequest() { + AtomicReference source = new AtomicReference<>(); + AtomicReference destination = new AtomicReference<>(); + + CopyObjectRequest request = + S3ArchiveOperations.copyObjectRequest( + (sourceKey, destinationKey, metadata) -> { + source.set(sourceKey); + destination.set(destinationKey); + return CopyObjectRequest.builder() + .sourceKey(sourceKey) + .destinationKey(destinationKey); + }, + "partition/data.orc", + HeadObjectResponse.builder().build(), + StorageClass.GLACIER); + + assertThat(source).hasValue("partition/data.orc"); + assertThat(destination).hasValue("partition/data.orc"); + assertThat(request.sourceKey()).isEqualTo("partition/data.orc"); + assertThat(request.destinationKey()).isEqualTo("partition/data.orc"); + assertThat(request.storageClass()).isEqualTo(StorageClass.GLACIER); + assertThat(request.metadataDirective()).isEqualTo(MetadataDirective.COPY); + assertThat(request.taggingDirective()).isEqualTo(TaggingDirective.COPY); + } + @Test void testRestoreAlreadyInProgress() { assertThat( From 6d3a8d1a5ac3f722de1a3e285c1fa78ec4918baf Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Sat, 6 Jun 2026 17:46:50 +0530 Subject: [PATCH 5/6] Apply S3 archive formatting --- .../main/java/org/apache/paimon/s3/S3ArchiveOperations.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3ArchiveOperations.java b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3ArchiveOperations.java index f68e1b1c6ca3..d50d27e81456 100644 --- a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3ArchiveOperations.java +++ b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3ArchiveOperations.java @@ -183,6 +183,7 @@ private static S3Client s3Client(S3AFileSystem fileSystem) throws IOException { } interface CopyObjectRequestBuilderFactory { - CopyObjectRequest.Builder create(String sourceKey, String destinationKey, HeadObjectResponse metadata); + CopyObjectRequest.Builder create( + String sourceKey, String destinationKey, HeadObjectResponse metadata); } } From 6f64bfc9d6f0df579c3d2a90ba570c33b8e18bc7 Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Tue, 9 Jun 2026 23:59:48 +0530 Subject: [PATCH 6/6] Support multipart S3 archive copy --- .../apache/paimon/s3/S3ArchiveOperations.java | 296 +++++++++++++++++- .../paimon/s3/S3ArchiveOperationsTest.java | 259 +++++++++++++++ 2 files changed, 545 insertions(+), 10 deletions(-) diff --git a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3ArchiveOperations.java b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3ArchiveOperations.java index d50d27e81456..d3ff7795cc73 100644 --- a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3ArchiveOperations.java +++ b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3ArchiveOperations.java @@ -21,9 +21,19 @@ import org.apache.paimon.fs.StorageType; import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.api.RequestFactory; +import org.apache.hadoop.fs.s3a.impl.PutObjectOptions; import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest; +import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; +import software.amazon.awssdk.services.s3.model.CompletedPart; import software.amazon.awssdk.services.s3.model.CopyObjectRequest; +import software.amazon.awssdk.services.s3.model.CopyPartResult; +import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest; +import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse; +import software.amazon.awssdk.services.s3.model.GetObjectTaggingRequest; +import software.amazon.awssdk.services.s3.model.GetObjectTaggingResponse; import software.amazon.awssdk.services.s3.model.GlacierJobParameters; import software.amazon.awssdk.services.s3.model.HeadObjectResponse; import software.amazon.awssdk.services.s3.model.MetadataDirective; @@ -33,13 +43,18 @@ import software.amazon.awssdk.services.s3.model.RestoreRequest; import software.amazon.awssdk.services.s3.model.S3Exception; import software.amazon.awssdk.services.s3.model.StorageClass; +import software.amazon.awssdk.services.s3.model.Tagging; import software.amazon.awssdk.services.s3.model.TaggingDirective; import software.amazon.awssdk.services.s3.model.Tier; +import software.amazon.awssdk.services.s3.model.UploadPartCopyRequest; +import software.amazon.awssdk.services.s3.model.UploadPartCopyResponse; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.time.Duration; +import java.util.ArrayList; +import java.util.List; import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.apache.paimon.utils.Preconditions.checkNotNull; @@ -51,6 +66,16 @@ class S3ArchiveOperations { private static final String RESTORE_ALREADY_IN_PROGRESS = "RestoreAlreadyInProgress"; + static final long MAX_SINGLE_COPY_SIZE = 5_000_000_000L; + + static final long DEFAULT_MULTIPART_COPY_PART_SIZE = 128L * 1024 * 1024; + + private static final long MIN_MULTIPART_COPY_PART_SIZE = 5L * 1024 * 1024; + + private static final long MAX_MULTIPART_COPY_PART_SIZE = 5L * 1024 * 1024 * 1024; + + private static final int MAX_MULTIPART_COPY_PARTS = 10_000; + private S3ArchiveOperations() {} static StorageClass archiveStorageClass(StorageType type) { @@ -95,26 +120,51 @@ static void changeStorageClass( String operation) throws IOException { String key = fileSystem.pathToKey(path); - CopyObjectRequest request = - copyObjectRequest( - fileSystem.getRequestFactory()::newCopyObjectRequestBuilder, - key, - fileSystem.getObjectMetadata(path), - storageClass); + changeStorageClass( + s3Client(fileSystem), + requestBuilderFactory(fileSystem), + fileSystem.getBucket(), + key, + fileSystem.getObjectMetadata(path), + storageClass, + operation, + DEFAULT_MULTIPART_COPY_PART_SIZE); + } + static void changeStorageClass( + S3Client client, + ArchiveRequestBuilderFactory factory, + String bucket, + String key, + HeadObjectResponse metadata, + StorageClass storageClass, + String operation, + long multipartCopyPartSize) + throws IOException { try { - s3Client(fileSystem).copyObject(request); + if (useSingleCopy(metadata)) { + client.copyObject(copyObjectRequest(factory, key, metadata, storageClass)); + } else { + copyObjectWithMultipartUpload( + client, + factory, + bucket, + key, + metadata, + storageClass, + multipartCopyPartSize); + } } catch (ObjectNotInActiveTierErrorException e) { throw new IOException( - "S3 object " + path + " is not in active tier. Restore it before unarchiving.", + "S3 object " + key + " is not in active tier. Restore it before unarchiving.", e); } catch (S3Exception e) { throw new IOException( - "Failed to " + operation + " S3 object " + path + " to " + storageClass + ".", + "Failed to " + operation + " S3 object " + key + " to " + storageClass + ".", e); } catch (SdkException e) { throw new IOException( - "Failed to " + operation + " S3 object " + path + " to " + storageClass + ".", + "Failed to " + operation + " S3 object " + key + " to " + storageClass + ".", e); } } @@ -131,6 +181,171 @@ static CopyObjectRequest copyObjectRequest( .build(); } + private static boolean useSingleCopy(HeadObjectResponse metadata) { + Long contentLength = + checkNotNull( + metadata.contentLength(), "S3 object content length must not be null."); + return contentLength <= MAX_SINGLE_COPY_SIZE; + } + + private static void copyObjectWithMultipartUpload( + S3Client client, + ArchiveRequestBuilderFactory factory, + String bucket, + String key, + HeadObjectResponse metadata, + StorageClass storageClass, + long partSize) + throws IOException { + String uploadId = null; + try { + CreateMultipartUploadResponse createResponse = + client.createMultipartUpload( + createMultipartUploadRequest( + factory, client, bucket, key, metadata, storageClass)); + uploadId = + checkNotNull( + createResponse.uploadId(), "S3 multipart upload ID must not be null."); + + List completedParts = new ArrayList<>(); + for (CopyPartRange range : copyPartRanges(metadata.contentLength(), partSize)) { + UploadPartCopyResponse partResponse = + client.uploadPartCopy( + uploadPartCopyRequest( + bucket, key, uploadId, range, metadata.eTag())); + CopyPartResult copyPartResult = + checkNotNull( + partResponse.copyPartResult(), + "S3 copy part result must not be null."); + completedParts.add( + CompletedPart.builder() + .partNumber(range.partNumber) + .eTag(copyPartResult.eTag()) + .build()); + } + + client.completeMultipartUpload( + factory.newCompleteMultipartUploadRequestBuilder(key, uploadId, completedParts) + .build()); + } catch (IOException | RuntimeException e) { + if (uploadId != null) { + try { + client.abortMultipartUpload( + factory.newAbortMultipartUploadRequestBuilder(key, uploadId).build()); + } catch (RuntimeException abortException) { + e.addSuppressed(abortException); + } + } + throw e; + } + } + + @SuppressWarnings("deprecation") + static CreateMultipartUploadRequest createMultipartUploadRequest( + ArchiveRequestBuilderFactory factory, + S3Client client, + String bucket, + String key, + HeadObjectResponse metadata, + StorageClass storageClass) + throws IOException { + CreateMultipartUploadRequest.Builder builder = + factory.newMultipartUploadRequestBuilder(key) + .storageClass(storageClass) + .metadata(metadata.metadata()); + if (metadata.cacheControl() != null) { + builder.cacheControl(metadata.cacheControl()); + } + if (metadata.contentDisposition() != null) { + builder.contentDisposition(metadata.contentDisposition()); + } + if (metadata.contentEncoding() != null) { + builder.contentEncoding(metadata.contentEncoding()); + } + if (metadata.contentLanguage() != null) { + builder.contentLanguage(metadata.contentLanguage()); + } + if (metadata.contentType() != null) { + builder.contentType(metadata.contentType()); + } + if (metadata.expires() != null) { + builder.expires(metadata.expires()); + } + if (metadata.websiteRedirectLocation() != null) { + builder.websiteRedirectLocation(metadata.websiteRedirectLocation()); + } + if (metadata.objectLockMode() != null) { + builder.objectLockMode(metadata.objectLockMode()); + } + if (metadata.objectLockRetainUntilDate() != null) { + builder.objectLockRetainUntilDate(metadata.objectLockRetainUntilDate()); + } + if (metadata.objectLockLegalHoldStatus() != null) { + builder.objectLockLegalHoldStatus(metadata.objectLockLegalHoldStatus()); + } + + Tagging tagging = objectTagging(client, bucket, key); + if (tagging.hasTagSet() && !tagging.tagSet().isEmpty()) { + builder.tagging(tagging); + } + return builder.build(); + } + + private static Tagging objectTagging(S3Client client, String bucket, String key) { + GetObjectTaggingResponse response = + client.getObjectTagging( + GetObjectTaggingRequest.builder().bucket(bucket).key(key).build()); + return Tagging.builder().tagSet(response.tagSet()).build(); + } + + static UploadPartCopyRequest uploadPartCopyRequest( + String bucket, String key, String uploadId, CopyPartRange range, String eTag) { + UploadPartCopyRequest.Builder builder = + UploadPartCopyRequest.builder() + .sourceBucket(bucket) + .sourceKey(key) + .destinationBucket(bucket) + .destinationKey(key) + .uploadId(uploadId) + .partNumber(range.partNumber) + .copySourceRange(range.header()); + if (eTag != null) { + builder.copySourceIfMatch(eTag); + } + return builder.build(); + } + + static List copyPartRanges(long objectSize, long configuredPartSize) { + checkArgument(objectSize > 0, "Object size must be greater than zero."); + long partSize = multipartCopyPartSize(objectSize, configuredPartSize); + List ranges = new ArrayList<>(); + long start = 0; + int partNumber = 1; + while (start < objectSize) { + long end = Math.min(start + partSize, objectSize) - 1; + ranges.add(new CopyPartRange(partNumber, start, end)); + start = end + 1; + partNumber++; + } + return ranges; + } + + private static long multipartCopyPartSize(long objectSize, long configuredPartSize) { + checkArgument( + configuredPartSize > 0, "Multipart copy part size must be greater than zero."); + long minimumPartSize = + (objectSize + MAX_MULTIPART_COPY_PARTS - 1) / MAX_MULTIPART_COPY_PARTS; + long partSize = + Math.max( + Math.max(configuredPartSize, MIN_MULTIPART_COPY_PART_SIZE), + minimumPartSize); + checkArgument( + partSize <= MAX_MULTIPART_COPY_PART_SIZE, + "S3 object is too large for multipart copy: %s bytes.", + objectSize); + return partSize; + } + static void restoreArchive(S3AFileSystem fileSystem, org.apache.hadoop.fs.Path path, int days) throws IOException { RestoreObjectRequest request = @@ -168,6 +383,38 @@ static boolean isRestoreAlreadyInProgress(S3Exception exception) { && RESTORE_ALREADY_IN_PROGRESS.equals(exception.awsErrorDetails().errorCode()); } + private static ArchiveRequestBuilderFactory requestBuilderFactory(S3AFileSystem fileSystem) { + RequestFactory requestFactory = fileSystem.getRequestFactory(); + return new ArchiveRequestBuilderFactory() { + @Override + public CopyObjectRequest.Builder create( + String sourceKey, String destinationKey, HeadObjectResponse metadata) { + return requestFactory.newCopyObjectRequestBuilder( + sourceKey, destinationKey, metadata); + } + + @Override + public CreateMultipartUploadRequest.Builder newMultipartUploadRequestBuilder(String key) + throws IOException { + return requestFactory.newMultipartUploadRequestBuilder( + key, PutObjectOptions.keepingDirs()); + } + + @Override + public CompleteMultipartUploadRequest.Builder newCompleteMultipartUploadRequestBuilder( + String key, String uploadId, List parts) { + return requestFactory.newCompleteMultipartUploadRequestBuilder( + key, uploadId, parts, PutObjectOptions.keepingDirs()); + } + + @Override + public AbortMultipartUploadRequest.Builder newAbortMultipartUploadRequestBuilder( + String key, String uploadId) { + return requestFactory.newAbortMultipartUploadRequestBuilder(key, uploadId); + } + }; + } + private static S3Client s3Client(S3AFileSystem fileSystem) throws IOException { try { Method method = S3AFileSystem.class.getDeclaredMethod("getS3Client"); @@ -186,4 +433,33 @@ interface CopyObjectRequestBuilderFactory { CopyObjectRequest.Builder create( String sourceKey, String destinationKey, HeadObjectResponse metadata); } + + interface ArchiveRequestBuilderFactory extends CopyObjectRequestBuilderFactory { + + CreateMultipartUploadRequest.Builder newMultipartUploadRequestBuilder(String key) + throws IOException; + + CompleteMultipartUploadRequest.Builder newCompleteMultipartUploadRequestBuilder( + String key, String uploadId, List parts); + + AbortMultipartUploadRequest.Builder newAbortMultipartUploadRequestBuilder( + String key, String uploadId); + } + + static class CopyPartRange { + + private final int partNumber; + private final long start; + private final long end; + + private CopyPartRange(int partNumber, long start, long end) { + this.partNumber = partNumber; + this.start = start; + this.end = end; + } + + String header() { + return "bytes=" + start + "-" + end; + } + } } diff --git a/paimon-filesystems/paimon-s3-impl/src/test/java/org/apache/paimon/s3/S3ArchiveOperationsTest.java b/paimon-filesystems/paimon-s3-impl/src/test/java/org/apache/paimon/s3/S3ArchiveOperationsTest.java index 46f8ff9785bf..df7b759b7385 100644 --- a/paimon-filesystems/paimon-s3-impl/src/test/java/org/apache/paimon/s3/S3ArchiveOperationsTest.java +++ b/paimon-filesystems/paimon-s3-impl/src/test/java/org/apache/paimon/s3/S3ArchiveOperationsTest.java @@ -22,16 +22,36 @@ import org.junit.jupiter.api.Test; import software.amazon.awssdk.awscore.exception.AwsErrorDetails; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest; +import software.amazon.awssdk.services.s3.model.AbortMultipartUploadResponse; +import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; +import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse; +import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload; +import software.amazon.awssdk.services.s3.model.CompletedPart; import software.amazon.awssdk.services.s3.model.CopyObjectRequest; +import software.amazon.awssdk.services.s3.model.CopyObjectResponse; +import software.amazon.awssdk.services.s3.model.CopyPartResult; +import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest; +import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse; +import software.amazon.awssdk.services.s3.model.GetObjectTaggingRequest; +import software.amazon.awssdk.services.s3.model.GetObjectTaggingResponse; import software.amazon.awssdk.services.s3.model.HeadObjectResponse; import software.amazon.awssdk.services.s3.model.MetadataDirective; import software.amazon.awssdk.services.s3.model.RestoreObjectRequest; import software.amazon.awssdk.services.s3.model.S3Exception; import software.amazon.awssdk.services.s3.model.StorageClass; +import software.amazon.awssdk.services.s3.model.Tag; import software.amazon.awssdk.services.s3.model.TaggingDirective; import software.amazon.awssdk.services.s3.model.Tier; +import software.amazon.awssdk.services.s3.model.UploadPartCopyRequest; +import software.amazon.awssdk.services.s3.model.UploadPartCopyResponse; +import java.io.IOException; import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.concurrent.atomic.AtomicReference; import static org.assertj.core.api.Assertions.assertThat; @@ -119,6 +139,135 @@ void testCopyObjectRequest() { assertThat(request.taggingDirective()).isEqualTo(TaggingDirective.COPY); } + @Test + void testChangeStorageClassUsesSingleCopyAtMaxSingleCopySize() throws IOException { + RecordingS3Client client = new RecordingS3Client(); + TestArchiveRequestBuilderFactory factory = new TestArchiveRequestBuilderFactory("bucket"); + + S3ArchiveOperations.changeStorageClass( + client, + factory, + "bucket", + "partition/data.orc", + HeadObjectResponse.builder() + .contentLength(S3ArchiveOperations.MAX_SINGLE_COPY_SIZE) + .build(), + StorageClass.GLACIER, + "archive", + S3ArchiveOperations.DEFAULT_MULTIPART_COPY_PART_SIZE); + + assertThat(client.copyObjectRequests).hasSize(1); + assertThat(client.createMultipartUploadRequests).isEmpty(); + assertThat(client.uploadPartCopyRequests).isEmpty(); + assertThat(client.getObjectTaggingRequests).isEmpty(); + assertThat(client.copyObjectRequests.get(0).storageClass()).isEqualTo(StorageClass.GLACIER); + } + + @Test + void testChangeStorageClassUsesMultipartCopyForLargeObject() throws IOException { + RecordingS3Client client = new RecordingS3Client(); + client.tags.add(Tag.builder().key("project").value("paimon").build()); + TestArchiveRequestBuilderFactory factory = new TestArchiveRequestBuilderFactory("bucket"); + long objectSize = S3ArchiveOperations.MAX_SINGLE_COPY_SIZE + 1; + + S3ArchiveOperations.changeStorageClass( + client, + factory, + "bucket", + "partition/data.orc", + HeadObjectResponse.builder() + .contentLength(objectSize) + .eTag("\"etag\"") + .contentType("application/octet-stream") + .cacheControl("max-age=60") + .metadata(Collections.singletonMap("owner", "paimon")) + .build(), + StorageClass.DEEP_ARCHIVE, + "archive", + S3ArchiveOperations.DEFAULT_MULTIPART_COPY_PART_SIZE); + + assertThat(client.copyObjectRequests).isEmpty(); + assertThat(client.getObjectTaggingRequests).hasSize(1); + assertThat(client.createMultipartUploadRequests).hasSize(1); + assertThat(client.uploadPartCopyRequests).isNotEmpty(); + assertThat(client.completeMultipartUploadRequests).hasSize(1); + assertThat(client.abortMultipartUploadRequests).isEmpty(); + + CreateMultipartUploadRequest createRequest = client.createMultipartUploadRequests.get(0); + assertThat(createRequest.bucket()).isEqualTo("bucket"); + assertThat(createRequest.key()).isEqualTo("partition/data.orc"); + assertThat(createRequest.storageClass()).isEqualTo(StorageClass.DEEP_ARCHIVE); + assertThat(createRequest.contentType()).isEqualTo("application/octet-stream"); + assertThat(createRequest.cacheControl()).isEqualTo("max-age=60"); + assertThat(createRequest.metadata()).containsEntry("owner", "paimon"); + assertThat(createRequest.tagging()).contains("project=paimon"); + + UploadPartCopyRequest firstPart = client.uploadPartCopyRequests.get(0); + assertThat(firstPart.sourceBucket()).isEqualTo("bucket"); + assertThat(firstPart.sourceKey()).isEqualTo("partition/data.orc"); + assertThat(firstPart.destinationBucket()).isEqualTo("bucket"); + assertThat(firstPart.destinationKey()).isEqualTo("partition/data.orc"); + assertThat(firstPart.copySourceIfMatch()).isEqualTo("\"etag\""); + assertThat(firstPart.copySourceRange()) + .isEqualTo("bytes=0-" + (S3ArchiveOperations.DEFAULT_MULTIPART_COPY_PART_SIZE - 1)); + + UploadPartCopyRequest lastPart = + client.uploadPartCopyRequests.get(client.uploadPartCopyRequests.size() - 1); + assertThat(lastPart.copySourceRange()).endsWith("-" + (objectSize - 1)); + + List completedParts = + client.completeMultipartUploadRequests.get(0).multipartUpload().parts(); + assertThat(completedParts).hasSize(client.uploadPartCopyRequests.size()); + assertThat(completedParts.get(0).partNumber()).isEqualTo(1); + assertThat(completedParts.get(0).eTag()).isEqualTo("etag-1"); + } + + @Test + void testCopyPartRanges() { + long partSize = 5L * 1024 * 1024; + List ranges = + S3ArchiveOperations.copyPartRanges(partSize * 2 + 1, partSize); + + assertThat(ranges) + .extracting(S3ArchiveOperations.CopyPartRange::header) + .containsExactly( + "bytes=0-" + (partSize - 1), + "bytes=" + partSize + "-" + (partSize * 2 - 1), + "bytes=" + (partSize * 2) + "-" + (partSize * 2)); + } + + @Test + void testMultipartCopyAbortsOnPartFailure() { + RecordingS3Client client = new RecordingS3Client(); + client.failPartNumber = 2; + TestArchiveRequestBuilderFactory factory = new TestArchiveRequestBuilderFactory("bucket"); + long partSize = S3ArchiveOperations.DEFAULT_MULTIPART_COPY_PART_SIZE; + + assertThatThrownBy( + () -> + S3ArchiveOperations.changeStorageClass( + client, + factory, + "bucket", + "partition/data.orc", + HeadObjectResponse.builder() + .contentLength( + S3ArchiveOperations.MAX_SINGLE_COPY_SIZE + + partSize) + .build(), + StorageClass.GLACIER, + "archive", + partSize)) + .isInstanceOf(IOException.class) + .hasMessageContaining("Failed to archive"); + + assertThat(client.createMultipartUploadRequests).hasSize(1); + assertThat(client.uploadPartCopyRequests).hasSize(2); + assertThat(client.completeMultipartUploadRequests).isEmpty(); + assertThat(client.abortMultipartUploadRequests).hasSize(1); + assertThat(client.abortMultipartUploadRequests.get(0).uploadId()).isEqualTo("upload-id"); + } + @Test void testRestoreAlreadyInProgress() { assertThat( @@ -129,6 +278,116 @@ void testRestoreAlreadyInProgress() { .isFalse(); } + private static class TestArchiveRequestBuilderFactory + implements S3ArchiveOperations.ArchiveRequestBuilderFactory { + + private final String bucket; + + private TestArchiveRequestBuilderFactory(String bucket) { + this.bucket = bucket; + } + + @Override + public CopyObjectRequest.Builder create( + String sourceKey, String destinationKey, HeadObjectResponse metadata) { + return CopyObjectRequest.builder() + .sourceBucket(bucket) + .sourceKey(sourceKey) + .destinationBucket(bucket) + .destinationKey(destinationKey); + } + + @Override + public CreateMultipartUploadRequest.Builder newMultipartUploadRequestBuilder(String key) { + return CreateMultipartUploadRequest.builder().bucket(bucket).key(key); + } + + @Override + public CompleteMultipartUploadRequest.Builder newCompleteMultipartUploadRequestBuilder( + String key, String uploadId, List parts) { + return CompleteMultipartUploadRequest.builder() + .bucket(bucket) + .key(key) + .uploadId(uploadId) + .multipartUpload(CompletedMultipartUpload.builder().parts(parts).build()); + } + + @Override + public AbortMultipartUploadRequest.Builder newAbortMultipartUploadRequestBuilder( + String key, String uploadId) { + return AbortMultipartUploadRequest.builder().bucket(bucket).key(key).uploadId(uploadId); + } + } + + private static class RecordingS3Client implements S3Client { + + private final List copyObjectRequests = new ArrayList<>(); + private final List getObjectTaggingRequests = new ArrayList<>(); + private final List createMultipartUploadRequests = + new ArrayList<>(); + private final List uploadPartCopyRequests = new ArrayList<>(); + private final List completeMultipartUploadRequests = + new ArrayList<>(); + private final List abortMultipartUploadRequests = + new ArrayList<>(); + private final List tags = new ArrayList<>(); + + private int failPartNumber = -1; + + @Override + public String serviceName() { + return S3Client.SERVICE_NAME; + } + + @Override + public void close() {} + + @Override + public CopyObjectResponse copyObject(CopyObjectRequest copyObjectRequest) { + copyObjectRequests.add(copyObjectRequest); + return CopyObjectResponse.builder().build(); + } + + @Override + public GetObjectTaggingResponse getObjectTagging(GetObjectTaggingRequest request) { + getObjectTaggingRequests.add(request); + return GetObjectTaggingResponse.builder().tagSet(tags).build(); + } + + @Override + public CreateMultipartUploadResponse createMultipartUpload( + CreateMultipartUploadRequest request) { + createMultipartUploadRequests.add(request); + return CreateMultipartUploadResponse.builder().uploadId("upload-id").build(); + } + + @Override + public UploadPartCopyResponse uploadPartCopy(UploadPartCopyRequest request) { + uploadPartCopyRequests.add(request); + if (request.partNumber() == failPartNumber) { + throw S3Exception.builder().message("copy failed").build(); + } + return UploadPartCopyResponse.builder() + .copyPartResult( + CopyPartResult.builder().eTag("etag-" + request.partNumber()).build()) + .build(); + } + + @Override + public CompleteMultipartUploadResponse completeMultipartUpload( + CompleteMultipartUploadRequest request) { + completeMultipartUploadRequests.add(request); + return CompleteMultipartUploadResponse.builder().build(); + } + + @Override + public AbortMultipartUploadResponse abortMultipartUpload( + AbortMultipartUploadRequest request) { + abortMultipartUploadRequests.add(request); + return AbortMultipartUploadResponse.builder().build(); + } + } + private static S3Exception s3Exception(String errorCode) { return (S3Exception) S3Exception.builder()