diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneLifecycleConfiguration.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneLifecycleConfiguration.java index ab4b881e65dc..a325f3820d07 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneLifecycleConfiguration.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneLifecycleConfiguration.java @@ -62,6 +62,21 @@ public Integer getDays() { } } + /** + * A class that encapsulates OzoneLCAbortIncompleteMultipartUpload. + */ + public static class OzoneLCAbortIncompleteMultipartUpload { + private final Integer daysAfterInitiation; + + public OzoneLCAbortIncompleteMultipartUpload(Integer daysAfterInitiation) { + this.daysAfterInitiation = daysAfterInitiation; + } + + public Integer getDaysAfterInitiation() { + return daysAfterInitiation; + } + } + /** * A class that encapsulates {@link org.apache.hadoop.ozone.om.helpers.OmLifecycleRuleAndOperator}. 
*/ @@ -120,14 +135,17 @@ public static class OzoneLCRule { private final String prefix; private final String status; private final OzoneLCExpiration expiration; + private final OzoneLCAbortIncompleteMultipartUpload abortIncompleteMultipartUpload; private final OzoneLCFilter filter; public OzoneLCRule(String id, String prefix, String status, - OzoneLCExpiration expiration, OzoneLCFilter filter) { + OzoneLCExpiration expiration, OzoneLCAbortIncompleteMultipartUpload abortIncompleteMultipartUpload, + OzoneLCFilter filter) { this.id = id; this.prefix = prefix; this.status = status; this.expiration = expiration; + this.abortIncompleteMultipartUpload = abortIncompleteMultipartUpload; this.filter = filter; } @@ -147,6 +165,10 @@ public OzoneLCExpiration getExpiration() { return expiration; } + public OzoneLCAbortIncompleteMultipartUpload getAbortIncompleteMultipartUpload() { + return abortIncompleteMultipartUpload; + } + public OzoneLCFilter getFilter() { return filter; } @@ -181,6 +203,12 @@ public static OzoneLifecycleConfiguration fromOmLifecycleConfiguration( r.getExpiration().getDays(), r.getExpiration().getDate()); } + OzoneLifecycleConfiguration.OzoneLCAbortIncompleteMultipartUpload a = null; + if (r.getAbortIncompleteMultipartUpload() != null) { + a = new OzoneLifecycleConfiguration.OzoneLCAbortIncompleteMultipartUpload( + r.getAbortIncompleteMultipartUpload().getDaysAfterInitiation()); + } + OzoneLifecycleConfiguration.OzoneLCFilter f = null; if (r.getFilter() != null) { LifecycleAndOperator andOperator = null; @@ -193,7 +221,7 @@ public static OzoneLifecycleConfiguration fromOmLifecycleConfiguration( } rules.add(new OzoneLifecycleConfiguration.OzoneLCRule(r.getId(), - r.getPrefix(), (r.isEnabled() ? "Enabled" : "Disabled"), e, f)); + r.getPrefix(), (r.isEnabled() ? 
"Enabled" : "Disabled"), e, a, f)); } return new OzoneLifecycleConfiguration(lifecycleConfiguration.getVolume(), diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmLCAbortIncompleteMultipartUpload.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmLCAbortIncompleteMultipartUpload.java new file mode 100644 index 000000000000..a9c06c28f7c5 --- /dev/null +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmLCAbortIncompleteMultipartUpload.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.helpers; + +import jakarta.annotation.Nullable; +import java.time.Instant; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AbortIncompleteMultipartUpload; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.LifecycleAction; + +/** + * A class that encapsulates lifecycle rule AbortIncompleteMultipartUpload action. 
+ * This class implements OmLCAction and represents the AbortIncompleteMultipartUpload + * action type in lifecycle configuration. + */ +public final class OmLCAbortIncompleteMultipartUpload implements OmLCAction { + private final Integer daysAfterInitiation; + private long daysInMilli; + + private OmLCAbortIncompleteMultipartUpload() { + throw new UnsupportedOperationException("Default constructor is not supported. Use Builder."); + } + + private OmLCAbortIncompleteMultipartUpload(Builder builder) { + this.daysAfterInitiation = builder.daysAfterInitiation; + } + + @Nullable + public Integer getDaysAfterInitiation() { + return daysAfterInitiation; + } + + /** + * Checks if a multipart upload is eligible for abort based on its creation time. + * + * @param creationTimestamp The creation time of the multipart upload in milliseconds since epoch + * @return true if the upload should be aborted, false otherwise + */ + public boolean shouldAbort(long creationTimestamp) { + ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC); + ZonedDateTime dateTime = ZonedDateTime.ofInstant( + Instant.ofEpochMilli(creationTimestamp + daysInMilli), ZoneOffset.UTC); + return now.isAfter(dateTime); + } + + @Override + public ActionType getActionType() { + return ActionType.ABORT_INCOMPLETE_MULTIPART_UPLOAD; + } + + /** + * Validates the AbortIncompleteMultipartUpload configuration. 
+ * - DaysAfterInitiation must be specified + * - DaysAfterInitiation must be a positive number greater than zero + * + * @param creationTime The creation time of the lifecycle configuration in milliseconds since epoch + * @throws OMException if the validation fails + */ + @Override + public void valid(long creationTime) throws OMException { + if (daysAfterInitiation == null) { + throw new OMException("Invalid lifecycle configuration: 'DaysAfterInitiation' " + + "must be specified for AbortIncompleteMultipartUpload action.", + OMException.ResultCodes.INVALID_REQUEST); + } + + if (daysAfterInitiation <= 0) { + throw new OMException("'DaysAfterInitiation' for AbortIncompleteMultipartUpload action " + + "must be a positive integer greater than zero.", + OMException.ResultCodes.INVALID_REQUEST); + } + + daysInMilli = TimeUnit.DAYS.toMillis(daysAfterInitiation); + } + + @Override + public LifecycleAction getProtobuf() { + AbortIncompleteMultipartUpload.Builder builder = AbortIncompleteMultipartUpload.newBuilder(); + + if (daysAfterInitiation != null) { + builder.setDaysAfterInitiation(daysAfterInitiation); + } + + return LifecycleAction.newBuilder() + .setAbortIncompleteMultipartUpload(builder).build(); + } + + public static OmLCAbortIncompleteMultipartUpload getFromProtobuf( + AbortIncompleteMultipartUpload abortIncompleteMultipartUpload) { + OmLCAbortIncompleteMultipartUpload.Builder builder = new Builder(); + + if (abortIncompleteMultipartUpload.hasDaysAfterInitiation()) { + builder.setDaysAfterInitiation(abortIncompleteMultipartUpload.getDaysAfterInitiation()); + } + + return builder.build(); + } + + @Override + public String toString() { + return "OmLCAbortIncompleteMultipartUpload{" + + "daysAfterInitiation=" + daysAfterInitiation + + '}'; + } + + /** + * Builder of OmLCAbortIncompleteMultipartUpload. 
+ */ + public static class Builder { + private Integer daysAfterInitiation = null; + + public Builder setDaysAfterInitiation(int days) { + this.daysAfterInitiation = days; + return this; + } + + public OmLCAbortIncompleteMultipartUpload build() { + return new OmLCAbortIncompleteMultipartUpload(this); + } + } +} diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmLCAction.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmLCAction.java index 4db13e20743e..3d746bd1381b 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmLCAction.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmLCAction.java @@ -53,6 +53,7 @@ public interface OmLCAction { */ enum ActionType { EXPIRATION, + ABORT_INCOMPLETE_MULTIPART_UPLOAD, // Future action types can be added here (e.g., TRANSITION) } } diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmLCRule.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmLCRule.java index 00875842655c..10b627790a03 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmLCRule.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmLCRule.java @@ -124,6 +124,21 @@ public OmLCExpiration getExpiration() { return null; } + /** + * Get the AbortIncompleteMultipartUpload action if present. 
+ * + * @return the AbortIncompleteMultipartUpload action if present, null otherwise + */ + @Nullable + public OmLCAbortIncompleteMultipartUpload getAbortIncompleteMultipartUpload() { + for (OmLCAction action : actions) { + if (action instanceof OmLCAbortIncompleteMultipartUpload) { + return (OmLCAbortIncompleteMultipartUpload) action; + } + } + return null; + } + @Nullable public OmLCFilter getFilter() { return filter; @@ -300,6 +315,10 @@ public static OmLCRule getFromProtobuf(LifecycleRule lifecycleRule, BucketLayout if (lifecycleAction.hasExpiration()) { builder.addAction(OmLCExpiration.getFromProtobuf(lifecycleAction.getExpiration())); } + if (lifecycleAction.hasAbortIncompleteMultipartUpload()) { + builder.addAction(OmLCAbortIncompleteMultipartUpload.getFromProtobuf( + lifecycleAction.getAbortIncompleteMultipartUpload())); + } } if (lifecycleRule.hasFilter()) { builder.setFilter(OmLCFilter.getFromProtobuf(lifecycleRule.getFilter(), layout)); diff --git a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmLCAbortIncompleteMultipartUpload.java b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmLCAbortIncompleteMultipartUpload.java new file mode 100644 index 000000000000..1a55751c513f --- /dev/null +++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmLCAbortIncompleteMultipartUpload.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.helpers; + +import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_REQUEST; +import static org.apache.hadoop.ozone.om.helpers.OMLCUtils.assertOMException; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AbortIncompleteMultipartUpload; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.LifecycleAction; +import org.junit.jupiter.api.Test; + +/** + * Test OmLCAbortIncompleteMultipartUpload. 
+ */ +class TestOmLCAbortIncompleteMultipartUpload { + + @Test + public void testCreateValidAbortIncompleteMultipartUpload() { + long currentTime = System.currentTimeMillis(); + + OmLCAbortIncompleteMultipartUpload.Builder abort1 = + new OmLCAbortIncompleteMultipartUpload.Builder() + .setDaysAfterInitiation(1); + assertDoesNotThrow(() -> abort1.build().valid(currentTime)); + + OmLCAbortIncompleteMultipartUpload.Builder abort2 = + new OmLCAbortIncompleteMultipartUpload.Builder() + .setDaysAfterInitiation(7); + assertDoesNotThrow(() -> abort2.build().valid(currentTime)); + + OmLCAbortIncompleteMultipartUpload.Builder abort3 = + new OmLCAbortIncompleteMultipartUpload.Builder() + .setDaysAfterInitiation(365); + assertDoesNotThrow(() -> abort3.build().valid(currentTime)); + } + + @Test + public void testCreateInvalidAbortIncompleteMultipartUpload() { + long currentTime = System.currentTimeMillis(); + + // Null days should fail + OmLCAbortIncompleteMultipartUpload.Builder abort1 = + new OmLCAbortIncompleteMultipartUpload.Builder(); + assertOMException(() -> abort1.build().valid(currentTime), INVALID_REQUEST, + "must be specified for AbortIncompleteMultipartUpload action"); + + // Zero days should fail + OmLCAbortIncompleteMultipartUpload.Builder abort2 = + new OmLCAbortIncompleteMultipartUpload.Builder() + .setDaysAfterInitiation(0); + assertOMException(() -> abort2.build().valid(currentTime), INVALID_REQUEST, + "must be a positive integer greater than zero"); + + // Negative days should fail + OmLCAbortIncompleteMultipartUpload.Builder abort3 = + new OmLCAbortIncompleteMultipartUpload.Builder() + .setDaysAfterInitiation(-1); + assertOMException(() -> abort3.build().valid(currentTime), INVALID_REQUEST, + "must be a positive integer greater than zero"); + + OmLCAbortIncompleteMultipartUpload.Builder abort4 = + new OmLCAbortIncompleteMultipartUpload.Builder() + .setDaysAfterInitiation(-100); + assertOMException(() -> abort4.build().valid(currentTime), INVALID_REQUEST, + 
"must be a positive integer greater than zero"); + } + + @Test + public void testShouldAbort() throws OMException { + long currentTime = System.currentTimeMillis(); + + // Upload created 10 days ago + long uploadCreationTime = currentTime - TimeUnit.DAYS.toMillis(10); + + // Rule: abort after 7 days - should abort + OmLCAbortIncompleteMultipartUpload abort7Days = + new OmLCAbortIncompleteMultipartUpload.Builder() + .setDaysAfterInitiation(7) + .build(); + abort7Days.valid(currentTime); + assertTrue(abort7Days.shouldAbort(uploadCreationTime), + "Upload created 10 days ago should be aborted with 7-day threshold"); + + // Rule: abort after 15 days - should NOT abort + OmLCAbortIncompleteMultipartUpload abort15Days = + new OmLCAbortIncompleteMultipartUpload.Builder() + .setDaysAfterInitiation(15) + .build(); + abort15Days.valid(currentTime); + assertFalse(abort15Days.shouldAbort(uploadCreationTime), + "Upload created 10 days ago should NOT be aborted with 15-day threshold"); + + // Upload created 1 hour ago - should NOT abort + long recentUploadTime = currentTime - TimeUnit.HOURS.toMillis(1); + OmLCAbortIncompleteMultipartUpload abort1Day = + new OmLCAbortIncompleteMultipartUpload.Builder() + .setDaysAfterInitiation(1) + .build(); + abort1Day.valid(currentTime); + assertFalse(abort1Day.shouldAbort(recentUploadTime), + "Upload created 1 hour ago should NOT be aborted with 1-day threshold"); + + // Upload created 1 day + 1 second ago - should abort + long moreThanOneDay = currentTime - TimeUnit.DAYS.toMillis(1) - TimeUnit.SECONDS.toMillis(1); + assertTrue(abort1Day.shouldAbort(moreThanOneDay), + "Upload created more than 1 day ago should be aborted"); + } + + @Test + public void testProtobufConversion() throws OMException { + long currentTime = System.currentTimeMillis(); + + // Create with 7 days + OmLCAbortIncompleteMultipartUpload original = + new OmLCAbortIncompleteMultipartUpload.Builder() + .setDaysAfterInitiation(7) + .build(); + + // Convert to protobuf + 
LifecycleAction proto = original.getProtobuf(); + assertTrue(proto.hasAbortIncompleteMultipartUpload(), + "Protobuf should have AbortIncompleteMultipartUpload"); + + AbortIncompleteMultipartUpload abortProto = proto.getAbortIncompleteMultipartUpload(); + assertEquals(7, abortProto.getDaysAfterInitiation(), + "Days should be preserved in protobuf"); + + // Convert back from protobuf + OmLCAbortIncompleteMultipartUpload fromProto = + OmLCAbortIncompleteMultipartUpload.getFromProtobuf(abortProto); + assertEquals(7, fromProto.getDaysAfterInitiation(), + "Days should be preserved after protobuf round-trip"); + + // Validate the converted object + assertDoesNotThrow(() -> fromProto.valid(currentTime), + "Object from protobuf should be valid"); + } + + @Test + public void testActionType() { + OmLCAbortIncompleteMultipartUpload abort = + new OmLCAbortIncompleteMultipartUpload.Builder() + .setDaysAfterInitiation(7) + .build(); + + assertEquals(OmLCAction.ActionType.ABORT_INCOMPLETE_MULTIPART_UPLOAD, + abort.getActionType(), + "Action type should be ABORT_INCOMPLETE_MULTIPART_UPLOAD"); + } +} diff --git a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmLCRule.java b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmLCRule.java index f81b8afca3ab..15e72e5a8c37 100644 --- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmLCRule.java +++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmLCRule.java @@ -318,4 +318,109 @@ public void testProtobufConversion() throws OMException { assertEquals(30, ruleFromProto3.getExpiration().getDays()); assertNull(ruleFromProto3.getFilter()); } + + @Test + public void testRuleWithAbortIncompleteMultipartUpload() throws OMException { + long currentTime = System.currentTimeMillis(); + + // Test rule with only AbortIncompleteMultipartUpload action + OmLCAbortIncompleteMultipartUpload abortAction = + new 
OmLCAbortIncompleteMultipartUpload.Builder() + .setDaysAfterInitiation(7) + .build(); + + OmLCRule.Builder rule1 = new OmLCRule.Builder() + .setId("abort-incomplete-uploads") + .setEnabled(true) + .setPrefix("uploads/") + .setAction(abortAction); + + OmLCRule builtRule = rule1.build(); + assertDoesNotThrow(() -> builtRule.valid(BucketLayout.DEFAULT, currentTime)); + assertNotNull(builtRule.getAbortIncompleteMultipartUpload()); + assertEquals(7, builtRule.getAbortIncompleteMultipartUpload().getDaysAfterInitiation()); + } + + @Test + public void testRuleWithBothExpirationAndAbortActions() throws OMException { + long currentTime = System.currentTimeMillis(); + + OmLCExpiration expiration = new OmLCExpiration.Builder() + .setDays(30) + .build(); + + OmLCAbortIncompleteMultipartUpload abortAction = + new OmLCAbortIncompleteMultipartUpload.Builder() + .setDaysAfterInitiation(7) + .build(); + + OmLCRule.Builder rule = new OmLCRule.Builder() + .setId("combined-rule") + .setEnabled(true) + .setPrefix("temp/") + .addAction(expiration) + .addAction(abortAction); + + OmLCRule builtRule = rule.build(); + assertDoesNotThrow(() -> builtRule.valid(BucketLayout.DEFAULT, currentTime)); + assertNotNull(builtRule.getExpiration()); + assertNotNull(builtRule.getAbortIncompleteMultipartUpload()); + assertEquals(30, builtRule.getExpiration().getDays()); + assertEquals(7, builtRule.getAbortIncompleteMultipartUpload().getDaysAfterInitiation()); + } + + @Test + public void testProtobufConversionWithAbortAction() throws OMException { + OmLCAbortIncompleteMultipartUpload abortAction = + new OmLCAbortIncompleteMultipartUpload.Builder() + .setDaysAfterInitiation(14) + .build(); + + OmLCRule originalRule = new OmLCRule.Builder() + .setId("test-abort-rule") + .setEnabled(true) + .setPrefix("multipart/") + .setAction(abortAction) + .build(); + + LifecycleRule proto = originalRule.getProtobuf(); + + OmLCRule ruleFromProto = OmLCRule.getFromProtobuf(proto, BucketLayout.DEFAULT); + 
assertEquals("test-abort-rule", ruleFromProto.getId()); + assertTrue(ruleFromProto.isEnabled()); + assertEquals("multipart/", ruleFromProto.getPrefix()); + assertNotNull(ruleFromProto.getAbortIncompleteMultipartUpload()); + assertEquals(14, ruleFromProto.getAbortIncompleteMultipartUpload().getDaysAfterInitiation()); + } + + @Test + public void testProtobufConversionWithBothActions() throws OMException { + OmLCExpiration expiration = new OmLCExpiration.Builder() + .setDays(60) + .build(); + + OmLCAbortIncompleteMultipartUpload abortAction = + new OmLCAbortIncompleteMultipartUpload.Builder() + .setDaysAfterInitiation(5) + .build(); + + OmLCRule originalRule = new OmLCRule.Builder() + .setId("combined-actions") + .setEnabled(true) + .setPrefix("data/") + .addAction(expiration) + .addAction(abortAction) + .build(); + + LifecycleRule proto = originalRule.getProtobuf(); + + OmLCRule ruleFromProto = OmLCRule.getFromProtobuf(proto, BucketLayout.DEFAULT); + assertEquals("combined-actions", ruleFromProto.getId()); + assertTrue(ruleFromProto.isEnabled()); + assertEquals("data/", ruleFromProto.getPrefix()); + assertNotNull(ruleFromProto.getExpiration()); + assertNotNull(ruleFromProto.getAbortIncompleteMultipartUpload()); + assertEquals(60, ruleFromProto.getExpiration().getDays()); + assertEquals(5, ruleFromProto.getAbortIncompleteMultipartUpload().getDaysAfterInitiation()); + } } diff --git a/hadoop-ozone/integration-test-s3/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v1/AbstractS3SDKV1Tests.java b/hadoop-ozone/integration-test-s3/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v1/AbstractS3SDKV1Tests.java index 1b754132c7bd..dfc390176888 100644 --- a/hadoop-ozone/integration-test-s3/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v1/AbstractS3SDKV1Tests.java +++ b/hadoop-ozone/integration-test-s3/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v1/AbstractS3SDKV1Tests.java @@ -33,6 +33,7 @@ import com.amazonaws.AmazonServiceException.ErrorType; import 
com.amazonaws.HttpMethod; import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.AbortIncompleteMultipartUpload; import com.amazonaws.services.s3.model.AbortMultipartUploadRequest; import com.amazonaws.services.s3.model.AccessControlList; import com.amazonaws.services.s3.model.Bucket; @@ -1207,6 +1208,68 @@ public void testS3LifecycleConfigurationGet() { assertEquals(retrievedConfig.getRules().size(), configFromRequest.getRules().size()); } + @Test + public void testGetLifecycleWithAbortIncompleteMultipartUpload() { + final String bucketName = getBucketName(); + s3Client.createBucket(bucketName); + + BucketLifecycleConfiguration configuration = new BucketLifecycleConfiguration(); + List rules = new ArrayList<>(); + + BucketLifecycleConfiguration.Rule rule1 = new BucketLifecycleConfiguration.Rule() + .withId("abort-incomplete-mpu-with-prefix") + .withPrefix("uploads/") + .withStatus(BucketLifecycleConfiguration.ENABLED); + rule1.setAbortIncompleteMultipartUpload( + new AbortIncompleteMultipartUpload().withDaysAfterInitiation(7)); + + BucketLifecycleConfiguration.Rule rule2 = new BucketLifecycleConfiguration.Rule() + .withId("abort-incomplete-mpu-temp") + .withPrefix("temp/") + .withStatus(BucketLifecycleConfiguration.ENABLED); + rule2.setAbortIncompleteMultipartUpload( + new AbortIncompleteMultipartUpload().withDaysAfterInitiation(3)); + + BucketLifecycleConfiguration.Rule rule3 = new BucketLifecycleConfiguration.Rule() + .withId("abort-incomplete-mpu-no-prefix") + .withPrefix("") + .withStatus(BucketLifecycleConfiguration.ENABLED); + rule3.setAbortIncompleteMultipartUpload( + new AbortIncompleteMultipartUpload().withDaysAfterInitiation(30)); + + rules.add(rule1); + rules.add(rule2); + rules.add(rule3); + configuration.setRules(rules); + + // Set lifecycle configuration + s3Client.setBucketLifecycleConfiguration(bucketName, configuration); + + // Get and verify the configuration + BucketLifecycleConfiguration retrievedConfig = + 
s3Client.getBucketLifecycleConfiguration(bucketName); + + assertEquals(3, retrievedConfig.getRules().size()); + + BucketLifecycleConfiguration.Rule retrievedRule1 = retrievedConfig.getRules().get(0); + assertEquals("abort-incomplete-mpu-with-prefix", retrievedRule1.getId()); + assertEquals("uploads/", retrievedRule1.getPrefix()); + assertEquals(BucketLifecycleConfiguration.ENABLED, retrievedRule1.getStatus()); + assertEquals(7, retrievedRule1.getAbortIncompleteMultipartUpload().getDaysAfterInitiation()); + + BucketLifecycleConfiguration.Rule retrievedRule2 = retrievedConfig.getRules().get(1); + assertEquals("abort-incomplete-mpu-temp", retrievedRule2.getId()); + assertEquals("temp/", retrievedRule2.getPrefix()); + assertEquals(BucketLifecycleConfiguration.ENABLED, retrievedRule2.getStatus()); + assertEquals(3, retrievedRule2.getAbortIncompleteMultipartUpload().getDaysAfterInitiation()); + + BucketLifecycleConfiguration.Rule retrievedRule3 = retrievedConfig.getRules().get(2); + assertEquals("abort-incomplete-mpu-no-prefix", retrievedRule3.getId()); + assertEquals("", retrievedRule3.getPrefix()); + assertEquals(BucketLifecycleConfiguration.ENABLED, retrievedRule3.getStatus()); + assertEquals(30, retrievedRule3.getAbortIncompleteMultipartUpload().getDaysAfterInitiation()); + } + @Nested @TestInstance(TestInstance.Lifecycle.PER_CLASS) class PresignedUrlTests { diff --git a/hadoop-ozone/integration-test-s3/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v2/AbstractS3SDKV2Tests.java b/hadoop-ozone/integration-test-s3/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v2/AbstractS3SDKV2Tests.java index 09026dcb9182..17f144b7a8f4 100644 --- a/hadoop-ozone/integration-test-s3/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v2/AbstractS3SDKV2Tests.java +++ b/hadoop-ozone/integration-test-s3/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v2/AbstractS3SDKV2Tests.java @@ -1061,6 +1061,107 @@ public void testReadSnapshotDirectoryUsingS3SDK() throws Exception { 
assertEquals(content, snapshotResponse.asUtf8String()); } + @Test + public void testGetLifecycleWithAbortIncompleteMultipartUpload() { + final String bucketName = getBucketName(); + s3Client.createBucket(b -> b.bucket(bucketName)); + + software.amazon.awssdk.services.s3.model.LifecycleRule rule1 = + software.amazon.awssdk.services.s3.model.LifecycleRule.builder() + .id("abort-incomplete-mpu-with-prefix") + .prefix("uploads/") + .status(software.amazon.awssdk.services.s3.model.ExpirationStatus.ENABLED) + .abortIncompleteMultipartUpload( + software.amazon.awssdk.services.s3.model.AbortIncompleteMultipartUpload.builder() + .daysAfterInitiation(7) + .build()) + .build(); + + software.amazon.awssdk.services.s3.model.LifecycleRule rule2 = + software.amazon.awssdk.services.s3.model.LifecycleRule.builder() + .id("abort-incomplete-mpu-with-tag") + .filter(software.amazon.awssdk.services.s3.model.LifecycleRuleFilter.builder() + .tag(Tag.builder().key("env").value("dev").build()) + .build()) + .status(software.amazon.awssdk.services.s3.model.ExpirationStatus.ENABLED) + .abortIncompleteMultipartUpload( + software.amazon.awssdk.services.s3.model.AbortIncompleteMultipartUpload.builder() + .daysAfterInitiation(14) + .build()) + .build(); + + software.amazon.awssdk.services.s3.model.LifecycleRule rule3 = + software.amazon.awssdk.services.s3.model.LifecycleRule.builder() + .id("abort-incomplete-mpu-with-and-operator") + .filter(software.amazon.awssdk.services.s3.model.LifecycleRuleFilter.builder() + .and(software.amazon.awssdk.services.s3.model.LifecycleRuleAndOperator.builder() + .prefix("temp/") + .tags(Tag.builder().key("type").value("temporary").build()) + .build()) + .build()) + .status(software.amazon.awssdk.services.s3.model.ExpirationStatus.ENABLED) + .abortIncompleteMultipartUpload( + software.amazon.awssdk.services.s3.model.AbortIncompleteMultipartUpload.builder() + .daysAfterInitiation(3) + .build()) + .build(); + + software.amazon.awssdk.services.s3.model.LifecycleRule 
rule4 = + software.amazon.awssdk.services.s3.model.LifecycleRule.builder() + .id("abort-incomplete-mpu-no-filter") + .prefix("") + .status(software.amazon.awssdk.services.s3.model.ExpirationStatus.ENABLED) + .abortIncompleteMultipartUpload( + software.amazon.awssdk.services.s3.model.AbortIncompleteMultipartUpload.builder() + .daysAfterInitiation(30) + .build()) + .build(); + + software.amazon.awssdk.services.s3.model.BucketLifecycleConfiguration configuration = + software.amazon.awssdk.services.s3.model.BucketLifecycleConfiguration.builder() + .rules(rule1, rule2, rule3, rule4) + .build(); + + s3Client.putBucketLifecycleConfiguration(b -> b + .bucket(bucketName) + .lifecycleConfiguration(configuration)); + + software.amazon.awssdk.services.s3.model.GetBucketLifecycleConfigurationResponse response = + s3Client.getBucketLifecycleConfiguration(b -> b.bucket(bucketName)); + + List rules = response.rules(); + assertEquals(4, rules.size()); + + software.amazon.awssdk.services.s3.model.LifecycleRule retrievedRule1 = rules.get(0); + assertEquals("abort-incomplete-mpu-with-prefix", retrievedRule1.id()); + assertEquals("uploads/", retrievedRule1.prefix()); + assertEquals(software.amazon.awssdk.services.s3.model.ExpirationStatus.ENABLED, retrievedRule1.status()); + assertEquals(7, retrievedRule1.abortIncompleteMultipartUpload().daysAfterInitiation()); + + software.amazon.awssdk.services.s3.model.LifecycleRule retrievedRule2 = rules.get(1); + assertEquals("abort-incomplete-mpu-with-tag", retrievedRule2.id()); + assertEquals(software.amazon.awssdk.services.s3.model.ExpirationStatus.ENABLED, retrievedRule2.status()); + assertEquals(14, retrievedRule2.abortIncompleteMultipartUpload().daysAfterInitiation()); + assertEquals("env", retrievedRule2.filter().tag().key()); + assertEquals("dev", retrievedRule2.filter().tag().value()); + + software.amazon.awssdk.services.s3.model.LifecycleRule retrievedRule3 = rules.get(2); + assertEquals("abort-incomplete-mpu-with-and-operator", 
retrievedRule3.id()); + assertEquals(software.amazon.awssdk.services.s3.model.ExpirationStatus.ENABLED, retrievedRule3.status()); + assertEquals(3, retrievedRule3.abortIncompleteMultipartUpload().daysAfterInitiation()); + assertEquals("temp/", retrievedRule3.filter().and().prefix()); + assertEquals(1, retrievedRule3.filter().and().tags().size()); + Tag andTag = retrievedRule3.filter().and().tags().get(0); + assertEquals("type", andTag.key()); + assertEquals("temporary", andTag.value()); + + software.amazon.awssdk.services.s3.model.LifecycleRule retrievedRule4 = rules.get(3); + assertEquals("abort-incomplete-mpu-no-filter", retrievedRule4.id()); + assertEquals("", retrievedRule4.prefix()); + assertEquals(software.amazon.awssdk.services.s3.model.ExpirationStatus.ENABLED, retrievedRule4.status()); + assertEquals(30, retrievedRule4.abortIncompleteMultipartUpload().daysAfterInitiation()); + } + private String getBucketName() { return getBucketName(""); } diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto index 1aecf1c791fb..8a6048806a93 100644 --- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto +++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto @@ -2434,6 +2434,10 @@ message LifecycleExpiration { optional string date = 2; } +message AbortIncompleteMultipartUpload { + optional uint32 daysAfterInitiation = 1; +} + message LifecycleRule { required string id = 1; required bool enabled = 2; @@ -2444,6 +2448,7 @@ message LifecycleRule { message LifecycleAction { optional LifecycleExpiration expiration = 1; + optional AbortIncompleteMultipartUpload abortIncompleteMultipartUpload = 2; } message LifecycleConfiguration { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleService.java 
index 3666401da164..c081adab2440 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleService.java @@ -37,6 +37,7 @@ import java.io.IOException; import java.nio.file.Paths; import java.security.PrivilegedExceptionAction; +import java.time.Instant; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; @@ -79,9 +80,12 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmLCRule; import org.apache.hadoop.ozone.om.helpers.OmLifecycleConfiguration; +import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartUpload; import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils; import org.apache.hadoop.ozone.om.request.OMClientRequest; +import org.apache.hadoop.ozone.om.request.util.OMMultipartUploadUtils; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateDirectoryRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteKeyArgs; @@ -264,6 +268,8 @@ public final class LifecycleActionTask implements BackgroundTask { private long numKeyRenamed = 0; private long sizeKeyRenamed = 0; private long numDirRenamed = 0; + private long numMultipartUploadIterated = 0; + private long numMultipartUploadAborted = 0; public LifecycleActionTask(OmLifecycleConfiguration lcConfig) { this.policy = lcConfig; @@ -366,6 +372,10 @@ public BackgroundTaskResult call() { handleAndClearFullList(bucket, expiredKeyList, false); handleAndClearFullList(bucket, expiredDirList, true); } + + // Process AbortIncompleteMultipartUpload actions + processMultipartUploads(bucket, ruleList); + onSuccess(bucketKey); } @@ -743,6 +753,208 @@ 
private void evaluateBucket(OmBucketInfo bucketInfo, } } + /** + * Process AbortIncompleteMultipartUpload actions for incomplete multipart uploads. + * Iterates through the multipartInfoTable and aborts uploads that match the rule criteria + * and have exceeded the configured days after initiation. + * + * @param bucketInfo the bucket information + * @param ruleList list of lifecycle rules to evaluate + */ + private void processMultipartUploads(OmBucketInfo bucketInfo, List ruleList) { + // Filter rules that have AbortIncompleteMultipartUpload actions + List mpuRules = ruleList.stream() + .filter(r -> r.getAbortIncompleteMultipartUpload() != null) + .collect(Collectors.toList()); + + if (mpuRules.isEmpty()) { + return; + } + + String volumeName = bucketInfo.getVolumeName(); + String bucketName = bucketInfo.getBucketName(); + String bucketPrefix = omMetadataManager.getMultipartKey(volumeName, bucketName, "", ""); + + LOG.debug("Processing AbortIncompleteMultipartUpload actions for bucket {}/{}", volumeName, bucketName); + + LimitedSizeList expiredUploads = new LimitedSizeList<>(listMaxSize); + + try (TableIterator> mpuIterator = + omMetadataManager.getMultipartInfoTable().iterator(bucketPrefix)) { + while (mpuIterator.hasNext()) { + if (!shouldRun()) { + LOG.info("KeyLifecycleService is suspended or disabled. 
" + + "Stopping multipart upload processing for bucket {}.", bucketName); + return; + } + + Table.KeyValue entry = mpuIterator.next(); + OmMultipartKeyInfo mpuKeyInfo = entry.getValue(); + numMultipartUploadIterated++; + + // Extract multipart upload information from the key + OmMultipartUpload upload; + try { + upload = OmMultipartUpload.from(entry.getKey()); + } catch (IllegalArgumentException e) { + LOG.warn("Failed to parse multipart upload key {} in bucket {}/{}, skipping", + entry.getKey(), volumeName, bucketName, e); + continue; + } + + upload.setCreationTime(Instant.ofEpochMilli(mpuKeyInfo.getCreationTime())); + String keyName = upload.getKeyName(); + + // Get the open key to access tags + String multipartOpenKey; + try { + multipartOpenKey = OMMultipartUploadUtils.getMultipartOpenKey( + volumeName, bucketName, keyName, upload.getUploadId(), + omMetadataManager, bucketInfo.getBucketLayout()); + } catch (OMException e) { + LOG.warn("Failed to get multipart open key for {}/{}/{}, skipping", + volumeName, bucketName, keyName, e); + continue; + } + + OmKeyInfo openKeyInfo = omMetadataManager.getOpenKeyTable(bucketInfo.getBucketLayout()) + .get(multipartOpenKey); + if (openKeyInfo == null) { + LOG.warn("Open key not found for multipart upload {}/{}/{}, skipping", + volumeName, bucketName, keyName); + continue; + } + + // Check each rule to see if this upload should be aborted + for (OmLCRule rule : mpuRules) { + if (shouldAbortUpload(openKeyInfo, upload, keyName, rule)) { + if (expiredUploads.isFull()) { + LOG.info("Multipart upload list reached batch size {}, aborting current batch for bucket {}/{}", + listMaxSize, volumeName, bucketName); + abortExpiredMultipartUploadsAndClear(bucketInfo, expiredUploads); + } + + expiredUploads.add(upload); + LOG.debug("Multipart upload {}/{}/{} with uploadId {} will be aborted", + volumeName, bucketName, keyName, upload.getUploadId()); + break; // One rule match is enough + } + } + } + } catch (IOException e) { + 
LOG.warn("Failed to iterate multipartInfoTable for bucket {}/{}", volumeName, bucketName, e); + return; + } + + if (!expiredUploads.isEmpty()) { + LOG.info("{} expired multipart uploads remaining for bucket {}/{}", + expiredUploads.size(), volumeName, bucketName); + abortExpiredMultipartUploadsAndClear(bucketInfo, expiredUploads); + } + } + + /** + * Check if a multipart upload should be aborted based on the lifecycle rule. + * + * @param openKeyInfo the open key information with tags + * @param upload the multipart upload information + * @param keyName the key name of the upload + * @param rule the lifecycle rule to evaluate against + * @return true if the upload should be aborted, false otherwise + */ + private boolean shouldAbortUpload(OmKeyInfo openKeyInfo, OmMultipartUpload upload, + String keyName, OmLCRule rule) { + // Check if upload age exceeds the threshold + if (!rule.getAbortIncompleteMultipartUpload().shouldAbort( + upload.getCreationTime().toEpochMilli())) { + return false; + } + + // Check prefix and tag filtering using the existing rule.match() logic + // The rule.match() method handles both prefix and tag filtering + if (!rule.match(openKeyInfo, keyName)) { + return false; // Prefix or tag doesn't match + } + + return true; + } + + /** + * Abort expired multipart uploads by sending an abort request. 
+ * + * @param bucketInfo the bucket information + * @param expiredUploads list of expired multipart uploads to abort + */ + private void abortExpiredMultipartUploads(OmBucketInfo bucketInfo, List expiredUploads) { + String volumeName = bucketInfo.getVolumeName(); + String bucketName = bucketInfo.getBucketName(); + + List expiredMPUInfoList = expiredUploads.stream() + .map(upload -> OzoneManagerProtocolProtos.ExpiredMultipartUploadInfo.newBuilder() + .setName(upload.getDbKey()) + .build()) + .collect(Collectors.toList()); + + OzoneManagerProtocolProtos.ExpiredMultipartUploadsBucket expiredMPUBucket = + OzoneManagerProtocolProtos.ExpiredMultipartUploadsBucket.newBuilder() + .setVolumeName(volumeName) + .setBucketName(bucketName) + .addAllMultipartUploads(expiredMPUInfoList) + .build(); + + OzoneManagerProtocolProtos.MultipartUploadsExpiredAbortRequest abortRequest = + OzoneManagerProtocolProtos.MultipartUploadsExpiredAbortRequest.newBuilder() + .addExpiredMultipartUploadsPerBucket(expiredMPUBucket) + .build(); + + OMRequest omRequest = OMRequest.newBuilder() + .setCmdType(OzoneManagerProtocolProtos.Type.AbortExpiredMultiPartUploads) + .setMultipartUploadsExpiredAbortRequest(abortRequest) + .setVersion(ClientVersion.CURRENT_VERSION) + .setClientId(clientId.toString()) + .build(); + + try { + long startTime = System.nanoTime(); + OzoneManagerProtocolProtos.OMResponse response = OzoneManagerRatisUtils.submitRequest( + getOzoneManager(), omRequest, clientId, callId.getAndIncrement()); + long endTime = System.nanoTime(); + + if (response != null) { + if (response.getSuccess()) { + numMultipartUploadAborted += expiredUploads.size(); + + LOG.info("Successfully aborted {} multipart uploads for bucket {}/{} in {} ns", + expiredUploads.size(), volumeName, bucketName, endTime - startTime); + } else { + LOG.error("Failed to abort multipart uploads for bucket {}/{}: {}", + volumeName, bucketName, response.getMessage()); + } + } else { + LOG.error("Received null response when 
aborting multipart uploads for bucket {}/{}", + volumeName, bucketName); + } + } catch (ServiceException e) { + LOG.error("Failed to submit abort multipart uploads request for bucket {}/{}", + volumeName, bucketName, e); + } + } + + private void abortExpiredMultipartUploadsAndClear(OmBucketInfo bucketInfo, + LimitedSizeList expiredUploads) { + if (expiredUploads.isEmpty()) { + return; + } + + List uploadList = new ArrayList<>(); + for (int i = 0; i < expiredUploads.size(); i++) { + uploadList.add(expiredUploads.get(i)); + } + + abortExpiredMultipartUploads(bucketInfo, uploadList); + expiredUploads.clear(); + } + /** * If prefix is /dir1/dir2, but dir1 doesn't exist, then it will return exception. * If prefix is /dir1/dir2, but dir2 doesn't exist, then it will return a list with dir1 only. @@ -786,6 +998,7 @@ private void onFailure(String bucketName) { metrics.incrNumFailureTask(); metrics.incNumKeyIterated(numKeyIterated); metrics.incNumDirIterated(numDirIterated); + metrics.incNumMultipartUploadIterated(numMultipartUploadIterated); } private void onSuccess(String bucketName) { @@ -795,9 +1008,13 @@ private void onSuccess(String bucketName) { metrics.incTaskLatencyMs(timeSpent); metrics.incNumKeyIterated(numKeyIterated); metrics.incNumDirIterated(numDirIterated); - LOG.info("Spend {} ms on bucket {} to iterate {} keys and {} dirs, deleted {} keys with {} bytes, " + - "and {} dirs, renamed {} keys with {} bytes, and {} dirs to trash", timeSpent, bucketName, numKeyIterated, - numDirIterated, numKeyDeleted, sizeKeyDeleted, numDirDeleted, numKeyRenamed, sizeKeyRenamed, numDirRenamed); + metrics.incNumMultipartUploadIterated(numMultipartUploadIterated); + metrics.incNumMultipartUploadAborted(numMultipartUploadAborted); + LOG.info("Spend {} ms on bucket {} to iterate {} keys and {} dirs and {} multipart uploads, " + + "deleted {} keys with {} bytes, and {} dirs, renamed {} keys with {} bytes, and {} dirs to trash, " + + "aborted {} multipart uploads", timeSpent, 
bucketName, numKeyIterated, + numDirIterated, numMultipartUploadIterated, numKeyDeleted, sizeKeyDeleted, numDirDeleted, + numKeyRenamed, sizeKeyRenamed, numDirRenamed, numMultipartUploadAborted); } private void handleAndClearFullList(OmBucketInfo bucket, LimitedExpiredObjectList keysList, boolean dir) { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleServiceMetrics.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleServiceMetrics.java index 54539a75c921..b4ffe055396a 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleServiceMetrics.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleServiceMetrics.java @@ -62,6 +62,11 @@ public final class KeyLifecycleServiceMetrics { private MutableGaugeLong sizeKeyDeleted; @Metric("Total size of keys renamed") private MutableGaugeLong sizeKeyRenamed; + @Metric("Number of multipart uploads iterated") + private MutableGaugeLong numMultipartUploadsIterated; + + @Metric("Total multipart uploads aborted") + private MutableGaugeLong numMultipartUploadsAborted; private KeyLifecycleServiceMetrics() { this.registry = new MetricsRegistry(METRICS_SOURCE_NAME); @@ -168,4 +173,24 @@ public void incNumKeyIterated(long keyCount) { public void incNumDirIterated(long dirCount) { numDirIterated.incr(dirCount); } + + public void incNumMultipartUploadIterated(long count) { + numMultipartUploadsIterated.incr(count); + } + + public void incNumMultipartUploadAborted(long count) { + numMultipartUploadsAborted.incr(count); + } + + public MutableGaugeLong getNumMultipartUploadsIterated() { + return numMultipartUploadsIterated; + } + + public MutableGaugeLong getNumMultipartUploadsAborted() { + return numMultipartUploadsAborted; + } + + public MutableGaugeLong getNumSuccessTask() { + return numSuccessTask; + } } diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyLifecycleService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyLifecycleService.java index e499e0dd9ef8..67e46f6151f9 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyLifecycleService.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyLifecycleService.java @@ -91,6 +91,7 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; +import org.apache.hadoop.ozone.om.helpers.OmLCAbortIncompleteMultipartUpload; import org.apache.hadoop.ozone.om.helpers.OmLCExpiration; import org.apache.hadoop.ozone.om.helpers.OmLCFilter; import org.apache.hadoop.ozone.om.helpers.OmLCRule; @@ -1607,6 +1608,136 @@ void testDisableMoveToTrashDeletesDirectly() throws Exception { assertEquals(0, getKeyCount(FILE_SYSTEM_OPTIMIZED) - initialKeyCount); deleteLifecyclePolicy(volumeName, bucketName); } + + @Test + void testAbortIncompleteMultipartUploadWithFilters() throws Exception { + final String volumeName = getTestName(); + final String bucketName = uniqueObjectName("bucket"); + + // Create volume and bucket + createVolumeAndBucket(volumeName, bucketName, OBJECT_STORE, + UserGroupInformation.getCurrentUser().getShortUserName()); + + String owner = UserGroupInformation.getCurrentUser().getShortUserName(); + + // Create multipart uploads with different prefixes + // MPU 1: prefix "uploads/file1" (should match "uploads/" prefix rule) + OmKeyArgs keyArgs1 = new OmKeyArgs.Builder() + .setVolumeName(volumeName) + .setBucketName(bucketName) + .setKeyName("uploads/file1") + .setAcls(Collections.emptyList()) + .setReplicationConfig(RatisReplicationConfig.getInstance(THREE)) + .setLocationInfoList(new ArrayList<>()) + .setOwnerName(owner) + .build(); + 
writeClient.initiateMultipartUpload(keyArgs1); + + // MPU 2: prefix "uploads/file2" (should match "uploads/" prefix rule) + OmKeyArgs keyArgs2 = new OmKeyArgs.Builder() + .setVolumeName(volumeName) + .setBucketName(bucketName) + .setKeyName("uploads/file2") + .setAcls(Collections.emptyList()) + .setReplicationConfig(RatisReplicationConfig.getInstance(THREE)) + .setLocationInfoList(new ArrayList<>()) + .setOwnerName(owner) + .build(); + writeClient.initiateMultipartUpload(keyArgs2); + + // MPU 3: prefix "temp/file3" (should match "temp/" prefix rule) + OmKeyArgs keyArgs3 = new OmKeyArgs.Builder() + .setVolumeName(volumeName) + .setBucketName(bucketName) + .setKeyName("temp/file3") + .setAcls(Collections.emptyList()) + .setReplicationConfig(RatisReplicationConfig.getInstance(THREE)) + .setLocationInfoList(new ArrayList<>()) + .setOwnerName(owner) + .build(); + writeClient.initiateMultipartUpload(keyArgs3); + + // MPU 4: prefix "keep/file4" (should NOT match any rule) + OmKeyArgs keyArgs4 = new OmKeyArgs.Builder() + .setVolumeName(volumeName) + .setBucketName(bucketName) + .setKeyName("keep/file4") + .setAcls(Collections.emptyList()) + .setReplicationConfig(RatisReplicationConfig.getInstance(THREE)) + .setLocationInfoList(new ArrayList<>()) + .setOwnerName(owner) + .build(); + writeClient.initiateMultipartUpload(keyArgs4); + + List rules = new ArrayList<>(); + + // Rule 1: Abort MPUs with prefix "uploads/" after 1 day + OmLCRule rule1 = new OmLCRule.Builder() + .setId("abort-mpu-uploads") + .setEnabled(true) + .setFilter(new OmLCFilter.Builder() + .setPrefix("uploads/") + .build()) + .setAction(new OmLCAbortIncompleteMultipartUpload.Builder() + .setDaysAfterInitiation(1) + .build()) + .build(); + rules.add(rule1); + + // Rule 2: Abort MPUs with prefix "temp/" after 1 day + OmLCRule rule2 = new OmLCRule.Builder() + .setId("abort-mpu-temp") + .setEnabled(true) + .setFilter(new OmLCFilter.Builder() + .setPrefix("temp/") + .build()) + .setAction(new 
OmLCAbortIncompleteMultipartUpload.Builder() + .setDaysAfterInitiation(1) + .build()) + .build(); + rules.add(rule2); + + // Validate the rules have AbortIncompleteMultipartUpload actions + for (OmLCRule rule : rules) { + assertNotNull(rule.getAbortIncompleteMultipartUpload(), + "Rule should have AbortIncompleteMultipartUpload action"); + assertEquals(1, rule.getAbortIncompleteMultipartUpload().getDaysAfterInitiation()); + } + + createLifecyclePolicy(volumeName, bucketName, OBJECT_STORE, rules); + + // Verify lifecycle configuration was stored correctly + String lcKey = "/" + volumeName + "/" + bucketName; + OmLifecycleConfiguration storedConfig = metadataManager.getLifecycleConfigurationTable() + .get(lcKey); + assertNotNull(storedConfig, "Lifecycle configuration should be stored"); + assertEquals(2, storedConfig.getRules().size(), "Should have 2 rules"); + + // Verify rules preserve AbortIncompleteMultipartUpload actions after serialization/deserialization + for (OmLCRule rule : storedConfig.getRules()) { + assertNotNull(rule.getAbortIncompleteMultipartUpload(), + "Stored rule should have AbortIncompleteMultipartUpload action"); + assertEquals(1, rule.getAbortIncompleteMultipartUpload().getDaysAfterInitiation()); + } + + // Verify the lifecycle configuration is valid + storedConfig.valid(); + + // Verify MPUs were created successfully (at least 4 in total across all buckets) + long mpuCount = metadataManager.getMultipartInfoTable().getEstimatedKeyCount(); + assertTrue(mpuCount >= 4, "Expected at least 4 MPUs, but got " + mpuCount); + + // Verify rules can filter correctly + // Rule with prefix "uploads/" should match keys starting with "uploads/" + assertNotNull(rule1.getFilter()); + assertEquals("uploads/", rule1.getFilter().getPrefix()); + + // Rule with prefix "temp/" should match keys starting with "temp/" + assertNotNull(rule2.getFilter()); + assertEquals("temp/", rule2.getFilter().getPrefix()); + + deleteLifecyclePolicy(volumeName, bucketName); + } } /** 
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/S3LifecycleConfiguration.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/S3LifecycleConfiguration.java index 5cbc4a275d21..ff72a322dee3 100644 --- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/S3LifecycleConfiguration.java +++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/S3LifecycleConfiguration.java @@ -28,6 +28,7 @@ import org.apache.hadoop.ozone.client.OzoneBucket; import org.apache.hadoop.ozone.client.OzoneLifecycleConfiguration; import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.om.helpers.OmLCAbortIncompleteMultipartUpload; import org.apache.hadoop.ozone.om.helpers.OmLCExpiration; import org.apache.hadoop.ozone.om.helpers.OmLCFilter; import org.apache.hadoop.ozone.om.helpers.OmLCRule; @@ -72,6 +73,9 @@ public static class Rule { @XmlElement(name = "Expiration") private Expiration expiration; + @XmlElement(name = "AbortIncompleteMultipartUpload") + private AbortIncompleteMultipartUpload abortIncompleteMultipartUpload; + @XmlElement(name = "Filter") private Filter filter; @@ -107,6 +111,14 @@ public void setExpiration(Expiration expiration) { this.expiration = expiration; } + public AbortIncompleteMultipartUpload getAbortIncompleteMultipartUpload() { + return abortIncompleteMultipartUpload; + } + + public void setAbortIncompleteMultipartUpload(AbortIncompleteMultipartUpload abortIncompleteMultipartUpload) { + this.abortIncompleteMultipartUpload = abortIncompleteMultipartUpload; + } + public Filter getFilter() { return filter; } @@ -145,6 +157,24 @@ public void setDate(String date) { } } + /** + * AbortIncompleteMultipartUpload entity for lifecycle rule. 
+ */ + @XmlAccessorType(XmlAccessType.FIELD) + @XmlRootElement(name = "AbortIncompleteMultipartUpload") + public static class AbortIncompleteMultipartUpload { + @XmlElement(name = "DaysAfterInitiation") + private Integer daysAfterInitiation; + + public Integer getDaysAfterInitiation() { + return daysAfterInitiation; + } + + public void setDaysAfterInitiation(Integer daysAfterInitiation) { + this.daysAfterInitiation = daysAfterInitiation; + } + } + /** * Tag entity for filter criteria. */ @@ -293,7 +323,10 @@ private OmLCRule convertToOmRule(Rule rule) throws OMException, OS3Exception { .setPrefix(rule.getPrefix()); if (rule.getExpiration() != null) { - builder.setAction(convertToOmExpiration(rule.getExpiration())); + builder.addAction(convertToOmExpiration(rule.getExpiration())); + } + if (rule.getAbortIncompleteMultipartUpload() != null) { + builder.addAction(convertToOmAbortIncompleteMultipartUpload(rule.getAbortIncompleteMultipartUpload())); } if (rule.getFilter() != null) { builder.setFilter(convertToOmFilter(rule.getFilter())); @@ -321,6 +354,24 @@ private OmLCExpiration convertToOmExpiration(Expiration expiration) throws OMExc return builder.build(); } + /** + * Converts S3 AbortIncompleteMultipartUpload to internal representation. + * + * @param abortIncompleteMultipartUpload the S3 AbortIncompleteMultipartUpload + * @return OmLCAbortIncompleteMultipartUpload internal representation + */ + private OmLCAbortIncompleteMultipartUpload convertToOmAbortIncompleteMultipartUpload( + AbortIncompleteMultipartUpload abortIncompleteMultipartUpload) throws OMException { + OmLCAbortIncompleteMultipartUpload.Builder builder = + new OmLCAbortIncompleteMultipartUpload.Builder(); + + if (abortIncompleteMultipartUpload.getDaysAfterInitiation() != null) { + builder.setDaysAfterInitiation(abortIncompleteMultipartUpload.getDaysAfterInitiation()); + } + + return builder.build(); + } + /** * Converts S3 filter to internal filter. 
* @@ -402,6 +453,10 @@ private static Rule convertFromOzoneRule(OzoneLifecycleConfiguration.OzoneLCRule if (ozoneRule.getExpiration() != null) { rule.setExpiration(convertFromOzoneExpiration(ozoneRule.getExpiration())); } + if (ozoneRule.getAbortIncompleteMultipartUpload() != null) { + rule.setAbortIncompleteMultipartUpload( + convertFromOzoneAbortIncompleteMultipartUpload(ozoneRule.getAbortIncompleteMultipartUpload())); + } if (ozoneRule.getFilter() != null) { rule.setFilter(convertFromOzoneFilter(ozoneRule.getFilter())); } @@ -431,6 +486,25 @@ private static Expiration convertFromOzoneExpiration( return expiration; } + /** + * Converts an Ozone internal AbortIncompleteMultipartUpload to S3 representation. + * + * @param ozoneAbortIncompleteMultipartUpload internal AbortIncompleteMultipartUpload + * @return AbortIncompleteMultipartUpload S3 representation + */ + private static AbortIncompleteMultipartUpload convertFromOzoneAbortIncompleteMultipartUpload( + OzoneLifecycleConfiguration.OzoneLCAbortIncompleteMultipartUpload ozoneAbortIncompleteMultipartUpload) { + + AbortIncompleteMultipartUpload abortIncompleteMultipartUpload = new AbortIncompleteMultipartUpload(); + + if (ozoneAbortIncompleteMultipartUpload.getDaysAfterInitiation() > 0) { + abortIncompleteMultipartUpload.setDaysAfterInitiation( + ozoneAbortIncompleteMultipartUpload.getDaysAfterInitiation()); + } + + return abortIncompleteMultipartUpload; + } + /** * Converts an Ozone internal filter to S3 filter. 
* diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java index 1bb18583c36b..b054174adf7b 100644 --- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java +++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java @@ -725,12 +725,17 @@ private static OzoneLifecycleConfiguration toOzoneLifecycleConfiguration( for (OmLCRule r: omLifecycleConfiguration.getRules()) { OzoneLifecycleConfiguration.OzoneLCExpiration e = null; + OzoneLifecycleConfiguration.OzoneLCAbortIncompleteMultipartUpload a = null; OzoneLifecycleConfiguration.OzoneLCFilter f = null; if (r.getExpiration() != null) { e = new OzoneLifecycleConfiguration.OzoneLCExpiration( r.getExpiration().getDays(), r.getExpiration().getDate()); } + if (r.getAbortIncompleteMultipartUpload() != null) { + a = new OzoneLifecycleConfiguration.OzoneLCAbortIncompleteMultipartUpload( + r.getAbortIncompleteMultipartUpload().getDaysAfterInitiation()); + } if (r.getFilter() != null) { OzoneLifecycleConfiguration.LifecycleAndOperator andOperator = null; if (r.getFilter().getAndOperator() != null) { @@ -742,7 +747,7 @@ private static OzoneLifecycleConfiguration toOzoneLifecycleConfiguration( } rules.add(new OzoneLifecycleConfiguration.OzoneLCRule(r.getId(), - r.getEffectivePrefix(), (r.isEnabled() ? "Enabled" : "Disabled"), e, f)); + r.getEffectivePrefix(), (r.isEnabled() ? 
"Enabled" : "Disabled"), e, a, f)); } return new OzoneLifecycleConfiguration( diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestS3LifecycleConfigurationGet.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestS3LifecycleConfigurationGet.java index 2d602b4f4fcb..0558af9c1f8c 100644 --- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestS3LifecycleConfigurationGet.java +++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestS3LifecycleConfigurationGet.java @@ -83,6 +83,47 @@ public void testGetLifecycleConfiguration() throws Exception { lcc.getRules().get(0).getExpiration().getDays().intValue()); } + @Test + public void testGetLifecycleWithAbortIncompleteMultipartUpload() throws Exception { + String bucketName = "bucket1"; + bucketEndpoint.put(bucketName, getBodyWithAbortAction()); + Response r = bucketEndpoint.get(bucketName); + + assertEquals(HTTP_OK, r.getStatus()); + S3LifecycleConfiguration lcc = (S3LifecycleConfiguration) r.getEntity(); + assertEquals(1, lcc.getRules().size()); + S3LifecycleConfiguration.Rule rule = lcc.getRules().get(0); + + assertEquals("abort-incomplete-uploads", rule.getId()); + assertEquals("uploads/", rule.getPrefix()); + assertEquals("Enabled", rule.getStatus()); + assertEquals(7, rule.getAbortIncompleteMultipartUpload() + .getDaysAfterInitiation().intValue()); + } + + @Test + public void testGetLifecycleWithBothActions() throws Exception { + String bucketName = "bucket1"; + bucketEndpoint.put(bucketName, getBodyWithBothActions()); + Response r = bucketEndpoint.get(bucketName); + + assertEquals(HTTP_OK, r.getStatus()); + S3LifecycleConfiguration lcc = (S3LifecycleConfiguration) r.getEntity(); + assertEquals(1, lcc.getRules().size()); + S3LifecycleConfiguration.Rule rule = lcc.getRules().get(0); + + assertEquals("cleanup-rule", rule.getId()); + assertEquals("temp/", rule.getPrefix()); + assertEquals("Enabled", 
rule.getStatus()); + + // Verify Expiration action + assertEquals(30, rule.getExpiration().getDays().intValue()); + + // Verify AbortIncompleteMultipartUpload action + assertEquals(7, rule.getAbortIncompleteMultipartUpload() + .getDaysAfterInitiation().intValue()); + } + private static InputStream getBody() { String xml = ("" + @@ -96,4 +137,37 @@ private static InputStream getBody() { return new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)); } + + private static InputStream getBodyWithAbortAction() { + String xml = "" + + "" + + "abort-incomplete-uploads" + + "uploads/" + + "Enabled" + + "" + + "7" + + "" + + "" + + ""; + + return new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)); + } + + private static InputStream getBodyWithBothActions() { + String xml = "" + + "" + + "cleanup-rule" + + "temp/" + + "Enabled" + + "" + + "30" + + "" + + "" + + "7" + + "" + + "" + + ""; + + return new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)); + } } diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestS3LifecycleConfigurationPut.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestS3LifecycleConfigurationPut.java index a5f60e59536f..5b294e71a0a0 100644 --- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestS3LifecycleConfigurationPut.java +++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestS3LifecycleConfigurationPut.java @@ -179,6 +179,29 @@ public void testPutLifecycleConfigurationFailsWithNonBucketOwner() } } + @Test + public void testPutLifecycleWithAbortIncompleteMultipartUpload() throws Exception { + assertEquals(HTTP_OK, bucketEndpoint.put("bucket1", withAbortIncompleteMultipartUpload()).getStatus()); + } + + @Test + public void testPutLifecycleWithBothExpirationAndAbort() throws Exception { + assertEquals(HTTP_OK, bucketEndpoint.put("bucket1", withBothExpirationAndAbort()).getStatus()); + } + + @Test + 
public void testPutInvalidAbortIncompleteMultipartUploadConfig() throws Exception { + // Test with zero days - should fail + testInvalidLifecycleConfiguration( + TestS3LifecycleConfigurationPut::withAbortZeroDays, + HTTP_BAD_REQUEST, INVALID_REQUEST.getCode()); + + // Test with negative days - should fail + testInvalidLifecycleConfiguration( + TestS3LifecycleConfigurationPut::withAbortNegativeDays, + HTTP_BAD_REQUEST, INVALID_REQUEST.getCode()); + } + private static InputStream onePrefix() { String xml = ("" + @@ -514,4 +537,67 @@ private InputStream useDuplicateTagInAndOperator() { return new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)); } + + private static InputStream withAbortIncompleteMultipartUpload() { + String xml = "" + + "" + + "abort-incomplete-uploads" + + "uploads/" + + "Enabled" + + "" + + "7" + + "" + + "" + + ""; + + return new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)); + } + + private static InputStream withBothExpirationAndAbort() { + String xml = "" + + "" + + "cleanup-rule" + + "temp/" + + "Enabled" + + "" + + "30" + + "" + + "" + + "7" + + "" + + "" + + ""; + + return new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)); + } + + private static InputStream withAbortZeroDays() { + String xml = "" + + "" + + "invalid-abort" + + "uploads/" + + "Enabled" + + "" + + "0" + + "" + + "" + + ""; + + return new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)); + } + + private static InputStream withAbortNegativeDays() { + String xml = "" + + "" + + "invalid-abort" + + "uploads/" + + "Enabled" + + "" + + "-1" + + "" + + "" + + ""; + + return new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)); + } }