diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneLifecycleConfiguration.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneLifecycleConfiguration.java index ab4b881e65dc..a325f3820d07 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneLifecycleConfiguration.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneLifecycleConfiguration.java @@ -62,6 +62,21 @@ public Integer getDays() { } } + /** + * A class that encapsulates OzoneLCAbortIncompleteMultipartUpload. + */ + public static class OzoneLCAbortIncompleteMultipartUpload { + private final Integer daysAfterInitiation; + + public OzoneLCAbortIncompleteMultipartUpload(Integer daysAfterInitiation) { + this.daysAfterInitiation = daysAfterInitiation; + } + + public Integer getDaysAfterInitiation() { + return daysAfterInitiation; + } + } + /** * A class that encapsulates {@link org.apache.hadoop.ozone.om.helpers.OmLifecycleRuleAndOperator}. */ @@ -120,14 +135,17 @@ public static class OzoneLCRule { private final String prefix; private final String status; private final OzoneLCExpiration expiration; + private final OzoneLCAbortIncompleteMultipartUpload abortIncompleteMultipartUpload; private final OzoneLCFilter filter; public OzoneLCRule(String id, String prefix, String status, - OzoneLCExpiration expiration, OzoneLCFilter filter) { + OzoneLCExpiration expiration, OzoneLCAbortIncompleteMultipartUpload abortIncompleteMultipartUpload, + OzoneLCFilter filter) { this.id = id; this.prefix = prefix; this.status = status; this.expiration = expiration; + this.abortIncompleteMultipartUpload = abortIncompleteMultipartUpload; this.filter = filter; } @@ -147,6 +165,10 @@ public OzoneLCExpiration getExpiration() { return expiration; } + public OzoneLCAbortIncompleteMultipartUpload getAbortIncompleteMultipartUpload() { + return abortIncompleteMultipartUpload; + } + public OzoneLCFilter getFilter() { return filter; } @@ -181,6 +203,12 @@ public static OzoneLifecycleConfiguration fromOmLifecycleConfiguration( r.getExpiration().getDays(), r.getExpiration().getDate()); } + OzoneLifecycleConfiguration.OzoneLCAbortIncompleteMultipartUpload a = null; + if (r.getAbortIncompleteMultipartUpload() != null) { + a = new OzoneLifecycleConfiguration.OzoneLCAbortIncompleteMultipartUpload( + r.getAbortIncompleteMultipartUpload().getDaysAfterInitiation()); + } + OzoneLifecycleConfiguration.OzoneLCFilter f = null; if (r.getFilter() != null) { LifecycleAndOperator andOperator = null; @@ -193,7 +221,7 @@ public static OzoneLifecycleConfiguration fromOmLifecycleConfiguration( } rules.add(new OzoneLifecycleConfiguration.OzoneLCRule(r.getId(), - r.getPrefix(), (r.isEnabled() ? "Enabled" : "Disabled"), e, f)); + r.getPrefix(), (r.isEnabled() ? "Enabled" : "Disabled"), e, a, f)); } return new OzoneLifecycleConfiguration(lifecycleConfiguration.getVolume(), diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmLCAbortIncompleteMultipartUpload.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmLCAbortIncompleteMultipartUpload.java new file mode 100644 index 000000000000..a9c06c28f7c5 --- /dev/null +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmLCAbortIncompleteMultipartUpload.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.helpers; + +import jakarta.annotation.Nullable; +import java.time.Instant; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AbortIncompleteMultipartUpload; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.LifecycleAction; + +/** + * A class that encapsulates lifecycle rule AbortIncompleteMultipartUpload action. + * This class extends OmLCAction and represents the AbortIncompleteMultipartUpload + * action type in lifecycle configuration. + */ +public final class OmLCAbortIncompleteMultipartUpload implements OmLCAction { + private final Integer daysAfterInitiation; + private long daysInMilli; + + private OmLCAbortIncompleteMultipartUpload() { + throw new UnsupportedOperationException("Default constructor is not supported. Use Builder."); + } + + private OmLCAbortIncompleteMultipartUpload(Builder builder) { + this.daysAfterInitiation = builder.daysAfterInitiation; + } + + @Nullable + public Integer getDaysAfterInitiation() { + return daysAfterInitiation; + } + + /** + * Checks if a multipart upload is eligible for abort based on its creation time. + * + * @param creationTimestamp The creation time of the multipart upload in milliseconds since epoch + * @return true if the upload should be aborted, false otherwise + */ + public boolean shouldAbort(long creationTimestamp) { + ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC); + ZonedDateTime dateTime = ZonedDateTime.ofInstant( + Instant.ofEpochMilli(creationTimestamp + daysInMilli), ZoneOffset.UTC); + return now.isAfter(dateTime); + } + + @Override + public ActionType getActionType() { + return ActionType.ABORT_INCOMPLETE_MULTIPART_UPLOAD; + } + + /** + * Validates the AbortIncompleteMultipartUpload configuration. + * - DaysAfterInitiation must be specified + * - DaysAfterInitiation must be a positive number greater than zero + * + * @param creationTime The creation time of the lifecycle configuration in milliseconds since epoch + * @throws OMException if the validation fails + */ + @Override + public void valid(long creationTime) throws OMException { + if (daysAfterInitiation == null) { + throw new OMException("Invalid lifecycle configuration: 'DaysAfterInitiation' " + + "must be specified for AbortIncompleteMultipartUpload action.", + OMException.ResultCodes.INVALID_REQUEST); + } + + if (daysAfterInitiation <= 0) { + throw new OMException("'DaysAfterInitiation' for AbortIncompleteMultipartUpload action " + + "must be a positive integer greater than zero.", + OMException.ResultCodes.INVALID_REQUEST); + } + + daysInMilli = TimeUnit.DAYS.toMillis(daysAfterInitiation); + } + + @Override + public LifecycleAction getProtobuf() { + AbortIncompleteMultipartUpload.Builder builder = AbortIncompleteMultipartUpload.newBuilder(); + + if (daysAfterInitiation != null) { + builder.setDaysAfterInitiation(daysAfterInitiation); + } + + return LifecycleAction.newBuilder() + .setAbortIncompleteMultipartUpload(builder).build(); + } + + public static OmLCAbortIncompleteMultipartUpload getFromProtobuf( + AbortIncompleteMultipartUpload abortIncompleteMultipartUpload) { + OmLCAbortIncompleteMultipartUpload.Builder builder = new Builder(); + + if (abortIncompleteMultipartUpload.hasDaysAfterInitiation()) { + builder.setDaysAfterInitiation(abortIncompleteMultipartUpload.getDaysAfterInitiation()); + } + + return builder.build(); + } + + @Override + public String toString() { + return "OmLCAbortIncompleteMultipartUpload{" + + "daysAfterInitiation=" + daysAfterInitiation + + '}'; + } + + /** + * Builder of OmLCAbortIncompleteMultipartUpload. + */ + public static class Builder { + private Integer daysAfterInitiation = null; + + public Builder setDaysAfterInitiation(int days) { + this.daysAfterInitiation = days; + return this; + } + + public OmLCAbortIncompleteMultipartUpload build() { + return new OmLCAbortIncompleteMultipartUpload(this); + } + } +} diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmLCAction.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmLCAction.java index 4db13e20743e..3d746bd1381b 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmLCAction.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmLCAction.java @@ -53,6 +53,7 @@ public interface OmLCAction { */ enum ActionType { EXPIRATION, + ABORT_INCOMPLETE_MULTIPART_UPLOAD, // Future action types can be added here (e.g., TRANSITION) } } diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmLCRule.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmLCRule.java index 00875842655c..10b627790a03 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmLCRule.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmLCRule.java @@ -124,6 +124,21 @@ public OmLCExpiration getExpiration() { return null; } + /** + * Get the AbortIncompleteMultipartUpload action if present. + * + * @return the AbortIncompleteMultipartUpload action if present, null otherwise + */ + @Nullable + public OmLCAbortIncompleteMultipartUpload getAbortIncompleteMultipartUpload() { + for (OmLCAction action : actions) { + if (action instanceof OmLCAbortIncompleteMultipartUpload) { + return (OmLCAbortIncompleteMultipartUpload) action; + } + } + return null; + } + @Nullable public OmLCFilter getFilter() { return filter; @@ -300,6 +315,10 @@ public static OmLCRule getFromProtobuf(LifecycleRule lifecycleRule, BucketLayout if (lifecycleAction.hasExpiration()) { builder.addAction(OmLCExpiration.getFromProtobuf(lifecycleAction.getExpiration())); } + if (lifecycleAction.hasAbortIncompleteMultipartUpload()) { + builder.addAction(OmLCAbortIncompleteMultipartUpload.getFromProtobuf( + lifecycleAction.getAbortIncompleteMultipartUpload())); + } } if (lifecycleRule.hasFilter()) { builder.setFilter(OmLCFilter.getFromProtobuf(lifecycleRule.getFilter(), layout)); diff --git a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmLCAbortIncompleteMultipartUpload.java b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmLCAbortIncompleteMultipartUpload.java new file mode 100644 index 000000000000..1a55751c513f --- /dev/null +++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmLCAbortIncompleteMultipartUpload.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.helpers; + +import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_REQUEST; +import static org.apache.hadoop.ozone.om.helpers.OMLCUtils.assertOMException; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AbortIncompleteMultipartUpload; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.LifecycleAction; +import org.junit.jupiter.api.Test; + +/** + * Test OmLCAbortIncompleteMultipartUpload. + */ +class TestOmLCAbortIncompleteMultipartUpload { + + @Test + public void testCreateValidAbortIncompleteMultipartUpload() { + long currentTime = System.currentTimeMillis(); + + OmLCAbortIncompleteMultipartUpload.Builder abort1 = + new OmLCAbortIncompleteMultipartUpload.Builder() + .setDaysAfterInitiation(1); + assertDoesNotThrow(() -> abort1.build().valid(currentTime)); + + OmLCAbortIncompleteMultipartUpload.Builder abort2 = + new OmLCAbortIncompleteMultipartUpload.Builder() + .setDaysAfterInitiation(7); + assertDoesNotThrow(() -> abort2.build().valid(currentTime)); + + OmLCAbortIncompleteMultipartUpload.Builder abort3 = + new OmLCAbortIncompleteMultipartUpload.Builder() + .setDaysAfterInitiation(365); + assertDoesNotThrow(() -> abort3.build().valid(currentTime)); + } + + @Test + public void testCreateInvalidAbortIncompleteMultipartUpload() { + long currentTime = System.currentTimeMillis(); + + // Null days should fail + OmLCAbortIncompleteMultipartUpload.Builder abort1 = + new OmLCAbortIncompleteMultipartUpload.Builder(); + assertOMException(() -> abort1.build().valid(currentTime), INVALID_REQUEST, + "must be specified for AbortIncompleteMultipartUpload action"); + + // Zero days should fail + OmLCAbortIncompleteMultipartUpload.Builder abort2 = + new OmLCAbortIncompleteMultipartUpload.Builder() + .setDaysAfterInitiation(0); + assertOMException(() -> abort2.build().valid(currentTime), INVALID_REQUEST, + "must be a positive integer greater than zero"); + + // Negative days should fail + OmLCAbortIncompleteMultipartUpload.Builder abort3 = + new OmLCAbortIncompleteMultipartUpload.Builder() + .setDaysAfterInitiation(-1); + assertOMException(() -> abort3.build().valid(currentTime), INVALID_REQUEST, + "must be a positive integer greater than zero"); + + OmLCAbortIncompleteMultipartUpload.Builder abort4 = + new OmLCAbortIncompleteMultipartUpload.Builder() + .setDaysAfterInitiation(-100); + assertOMException(() -> abort4.build().valid(currentTime), INVALID_REQUEST, + "must be a positive integer greater than zero"); + } + + @Test + public void testShouldAbort() throws OMException { + long currentTime = System.currentTimeMillis(); + + // Upload created 10 days ago + long uploadCreationTime = currentTime - TimeUnit.DAYS.toMillis(10); + + // Rule: abort after 7 days - should abort + OmLCAbortIncompleteMultipartUpload abort7Days = + new OmLCAbortIncompleteMultipartUpload.Builder() + .setDaysAfterInitiation(7) + .build(); + abort7Days.valid(currentTime); + assertTrue(abort7Days.shouldAbort(uploadCreationTime), + "Upload created 10 days ago should be aborted with 7-day threshold"); + + // Rule: abort after 15 days - should NOT abort + OmLCAbortIncompleteMultipartUpload abort15Days = + new OmLCAbortIncompleteMultipartUpload.Builder() + .setDaysAfterInitiation(15) + .build(); + abort15Days.valid(currentTime); + assertFalse(abort15Days.shouldAbort(uploadCreationTime), + "Upload created 10 days ago should NOT be aborted with 15-day threshold"); + + // Upload created 1 hour ago - should NOT abort + long recentUploadTime = currentTime - TimeUnit.HOURS.toMillis(1); + OmLCAbortIncompleteMultipartUpload abort1Day = + new OmLCAbortIncompleteMultipartUpload.Builder() + .setDaysAfterInitiation(1) + .build(); + abort1Day.valid(currentTime); + assertFalse(abort1Day.shouldAbort(recentUploadTime), + "Upload created 1 hour ago should NOT be aborted with 1-day threshold"); + + // Upload created 1 day + 1 second ago - should abort + long moreThanOneDay = currentTime - TimeUnit.DAYS.toMillis(1) - TimeUnit.SECONDS.toMillis(1); + assertTrue(abort1Day.shouldAbort(moreThanOneDay), + "Upload created more than 1 day ago should be aborted"); + } + + @Test + public void testProtobufConversion() throws OMException { + long currentTime = System.currentTimeMillis(); + + // Create with 7 days + OmLCAbortIncompleteMultipartUpload original = + new OmLCAbortIncompleteMultipartUpload.Builder() + .setDaysAfterInitiation(7) + .build(); + + // Convert to protobuf + LifecycleAction proto = original.getProtobuf(); + assertTrue(proto.hasAbortIncompleteMultipartUpload(), + "Protobuf should have AbortIncompleteMultipartUpload"); + + AbortIncompleteMultipartUpload abortProto = proto.getAbortIncompleteMultipartUpload(); + assertEquals(7, abortProto.getDaysAfterInitiation(), + "Days should be preserved in protobuf"); + + // Convert back from protobuf + OmLCAbortIncompleteMultipartUpload fromProto = + OmLCAbortIncompleteMultipartUpload.getFromProtobuf(abortProto); + assertEquals(7, fromProto.getDaysAfterInitiation(), + "Days should be preserved after protobuf round-trip"); + + // Validate the converted object + assertDoesNotThrow(() -> fromProto.valid(currentTime), + "Object from protobuf should be valid"); + } + + @Test + public void testActionType() { + OmLCAbortIncompleteMultipartUpload abort = + new OmLCAbortIncompleteMultipartUpload.Builder() + .setDaysAfterInitiation(7) + .build(); + + assertEquals(OmLCAction.ActionType.ABORT_INCOMPLETE_MULTIPART_UPLOAD, + abort.getActionType(), + "Action type should be ABORT_INCOMPLETE_MULTIPART_UPLOAD"); + } +} diff --git a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmLCRule.java b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmLCRule.java index f81b8afca3ab..15e72e5a8c37 100644 --- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmLCRule.java +++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmLCRule.java @@ -318,4 +318,109 @@ public void testProtobufConversion() throws OMException { assertEquals(30, ruleFromProto3.getExpiration().getDays()); assertNull(ruleFromProto3.getFilter()); } + + @Test + public void testRuleWithAbortIncompleteMultipartUpload() throws OMException { + long currentTime = System.currentTimeMillis(); + + // Test rule with only AbortIncompleteMultipartUpload action + OmLCAbortIncompleteMultipartUpload abortAction = + new OmLCAbortIncompleteMultipartUpload.Builder() + .setDaysAfterInitiation(7) + .build(); + + OmLCRule.Builder rule1 = new OmLCRule.Builder() + .setId("abort-incomplete-uploads") + .setEnabled(true) + .setPrefix("uploads/") + .setAction(abortAction); + + OmLCRule builtRule = rule1.build(); + assertDoesNotThrow(() -> builtRule.valid(BucketLayout.DEFAULT, currentTime)); + assertNotNull(builtRule.getAbortIncompleteMultipartUpload()); + assertEquals(7, builtRule.getAbortIncompleteMultipartUpload().getDaysAfterInitiation()); + } + + @Test + public void testRuleWithBothExpirationAndAbortActions() throws OMException { + long currentTime = System.currentTimeMillis(); + + OmLCExpiration expiration = new OmLCExpiration.Builder() + .setDays(30) + .build(); + + OmLCAbortIncompleteMultipartUpload abortAction = + new OmLCAbortIncompleteMultipartUpload.Builder() + .setDaysAfterInitiation(7) + .build(); + + OmLCRule.Builder rule = new OmLCRule.Builder() + .setId("combined-rule") + .setEnabled(true) + .setPrefix("temp/") + .addAction(expiration) + .addAction(abortAction); + + OmLCRule builtRule = rule.build(); + assertDoesNotThrow(() -> builtRule.valid(BucketLayout.DEFAULT, currentTime)); + assertNotNull(builtRule.getExpiration()); + assertNotNull(builtRule.getAbortIncompleteMultipartUpload()); + assertEquals(30, builtRule.getExpiration().getDays()); + assertEquals(7, builtRule.getAbortIncompleteMultipartUpload().getDaysAfterInitiation()); + } + + @Test + public void testProtobufConversionWithAbortAction() throws OMException { + OmLCAbortIncompleteMultipartUpload abortAction = + new OmLCAbortIncompleteMultipartUpload.Builder() + .setDaysAfterInitiation(14) + .build(); + + OmLCRule originalRule = new OmLCRule.Builder() + .setId("test-abort-rule") + .setEnabled(true) + .setPrefix("multipart/") + .setAction(abortAction) + .build(); + + LifecycleRule proto = originalRule.getProtobuf(); + + OmLCRule ruleFromProto = OmLCRule.getFromProtobuf(proto, BucketLayout.DEFAULT); + assertEquals("test-abort-rule", ruleFromProto.getId()); + assertTrue(ruleFromProto.isEnabled()); + assertEquals("multipart/", ruleFromProto.getPrefix()); + assertNotNull(ruleFromProto.getAbortIncompleteMultipartUpload()); + assertEquals(14, ruleFromProto.getAbortIncompleteMultipartUpload().getDaysAfterInitiation()); + } + + @Test + public void testProtobufConversionWithBothActions() throws OMException { + OmLCExpiration expiration = new OmLCExpiration.Builder() + .setDays(60) + .build(); + + OmLCAbortIncompleteMultipartUpload abortAction = + new OmLCAbortIncompleteMultipartUpload.Builder() + .setDaysAfterInitiation(5) + .build(); + + OmLCRule originalRule = new OmLCRule.Builder() + .setId("combined-actions") + .setEnabled(true) + .setPrefix("data/") + .addAction(expiration) + .addAction(abortAction) + .build(); + + LifecycleRule proto = originalRule.getProtobuf(); + + OmLCRule ruleFromProto = OmLCRule.getFromProtobuf(proto, BucketLayout.DEFAULT); + assertEquals("combined-actions", ruleFromProto.getId()); + assertTrue(ruleFromProto.isEnabled()); + assertEquals("data/", ruleFromProto.getPrefix()); + assertNotNull(ruleFromProto.getExpiration()); + assertNotNull(ruleFromProto.getAbortIncompleteMultipartUpload()); + assertEquals(60, ruleFromProto.getExpiration().getDays()); + assertEquals(5, ruleFromProto.getAbortIncompleteMultipartUpload().getDaysAfterInitiation()); + } } diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto index 1aecf1c791fb..8a6048806a93 100644 --- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto +++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto @@ -2434,6 +2434,10 @@ message LifecycleExpiration { optional string date = 2; } +message AbortIncompleteMultipartUpload { + optional uint32 daysAfterInitiation = 1; +} + message LifecycleRule { required string id = 1; required bool enabled = 2; @@ -2444,6 +2448,7 @@ message LifecycleRule { message LifecycleAction { optional LifecycleExpiration expiration = 1; + optional AbortIncompleteMultipartUpload abortIncompleteMultipartUpload = 2; } message LifecycleConfiguration { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleService.java index 3666401da164..c872ea6a3610 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleService.java @@ -37,6 +37,7 @@ import java.io.IOException; import java.nio.file.Paths; import java.security.PrivilegedExceptionAction; +import java.time.Instant; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; @@ -79,6 +80,8 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmLCRule; import org.apache.hadoop.ozone.om.helpers.OmLifecycleConfiguration; +import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartUpload; import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils; import org.apache.hadoop.ozone.om.request.OMClientRequest; @@ -366,6 +369,10 @@ public BackgroundTaskResult call() { handleAndClearFullList(bucket, expiredKeyList, false); handleAndClearFullList(bucket, expiredDirList, true); } + + // Process AbortIncompleteMultipartUpload actions + processMultipartUploads(bucket, ruleList); + onSuccess(bucketKey); } @@ -743,6 +750,156 @@ private void evaluateBucket(OmBucketInfo bucketInfo, } } + /** + * Process AbortIncompleteMultipartUpload actions for incomplete multipart uploads. + * Iterates through the multipartInfoTable and aborts uploads that match the rule criteria + * and have exceeded the configured days after initiation. + * + * @param bucketInfo the bucket information + * @param ruleList list of lifecycle rules to evaluate + */ + private void processMultipartUploads(OmBucketInfo bucketInfo, List ruleList) { + // Filter rules that have AbortIncompleteMultipartUpload actions + List mpuRules = ruleList.stream() + .filter(r -> r.getAbortIncompleteMultipartUpload() != null) + .collect(Collectors.toList()); + + if (mpuRules.isEmpty()) { + return; + } + + String volumeName = bucketInfo.getVolumeName(); + String bucketName = bucketInfo.getBucketName(); + String bucketPrefix = omMetadataManager.getMultipartKey(volumeName, bucketName, "", ""); + + LOG.debug("Processing AbortIncompleteMultipartUpload actions for bucket {}/{}", volumeName, bucketName); + + List expiredUploads = new ArrayList<>(); + + try (TableIterator> mpuIterator = + omMetadataManager.getMultipartInfoTable().iterator(bucketPrefix)) { + while (mpuIterator.hasNext()) { + if (!shouldRun()) { + LOG.info("KeyLifecycleService is suspended or disabled. " + + "Stopping multipart upload processing for bucket {}.", bucketName); + return; + } + + Table.KeyValue entry = mpuIterator.next(); + OmMultipartKeyInfo mpuKeyInfo = entry.getValue(); + + // Extract multipart upload information from the key + OmMultipartUpload upload = OmMultipartUpload.from(entry.getKey()); + upload.setCreationTime(Instant.ofEpochMilli(mpuKeyInfo.getCreationTime())); + String keyName = upload.getKeyName(); + + // Check each rule to see if this upload should be aborted + for (OmLCRule rule : mpuRules) { + if (shouldAbortUpload(upload, keyName, rule)) { + expiredUploads.add(upload); + LOG.debug("Multipart upload {}/{}/{} with uploadId {} will be aborted", + volumeName, bucketName, keyName, upload.getUploadId()); + break; // One rule match is enough + } + } + } + } catch (IOException e) { + LOG.warn("Failed to iterate multipartInfoTable for bucket {}/{}", volumeName, bucketName, e); + return; + } + + if (!expiredUploads.isEmpty()) { + LOG.info("{} expired multipart uploads found for bucket {}/{}", + expiredUploads.size(), volumeName, bucketName); + abortExpiredMultipartUploads(bucketInfo, expiredUploads); + } + } + + /** + * Check if a multipart upload should be aborted based on the lifecycle rule. + * + * @param upload the multipart upload information + * @param keyName the key name of the upload + * @param rule the lifecycle rule to evaluate against + * @return true if the upload should be aborted, false otherwise + */ + private boolean shouldAbortUpload(OmMultipartUpload upload, String keyName, OmLCRule rule) { + // Check if upload age exceeds the threshold + if (!rule.getAbortIncompleteMultipartUpload().shouldAbort( + upload.getCreationTime().toEpochMilli())) { + return false; + } + + // Check prefix matching + String effectivePrefix = rule.getEffectivePrefix(); + if (effectivePrefix != null && !keyName.startsWith(effectivePrefix)) { + return false; + } + + // TODO: Add tag filtering support when multipart uploads support tags + + return true; + } + + /** + * Abort expired multipart uploads by sending an abort request. + * + * @param bucketInfo the bucket information + * @param expiredUploads list of expired multipart uploads to abort + */ + private void abortExpiredMultipartUploads(OmBucketInfo bucketInfo, List expiredUploads) { + String volumeName = bucketInfo.getVolumeName(); + String bucketName = bucketInfo.getBucketName(); + + List expiredMPUInfoList = expiredUploads.stream() + .map(upload -> OzoneManagerProtocolProtos.ExpiredMultipartUploadInfo.newBuilder() + .setName(upload.getDbKey()) + .build()) + .collect(Collectors.toList()); + + OzoneManagerProtocolProtos.ExpiredMultipartUploadsBucket expiredMPUBucket = + OzoneManagerProtocolProtos.ExpiredMultipartUploadsBucket.newBuilder() + .setVolumeName(volumeName) + .setBucketName(bucketName) + .addAllMultipartUploads(expiredMPUInfoList) + .build(); + + OzoneManagerProtocolProtos.MultipartUploadsExpiredAbortRequest abortRequest = + OzoneManagerProtocolProtos.MultipartUploadsExpiredAbortRequest.newBuilder() + .addExpiredMultipartUploadsPerBucket(expiredMPUBucket) + .build(); + + OMRequest omRequest = OMRequest.newBuilder() + .setCmdType(OzoneManagerProtocolProtos.Type.AbortExpiredMultiPartUploads) + .setMultipartUploadsExpiredAbortRequest(abortRequest) + .setVersion(ClientVersion.CURRENT_VERSION) + .setClientId(clientId.toString()) + .build(); + + try { + long startTime = System.nanoTime(); + OzoneManagerProtocolProtos.OMResponse response = OzoneManagerRatisUtils.submitRequest( + getOzoneManager(), omRequest, clientId, callId.getAndIncrement()); + long endTime = System.nanoTime(); + + if (response != null) { + if (response.getSuccess()) { + LOG.info("Successfully aborted {} multipart uploads for bucket {}/{} in {} ns", + expiredUploads.size(), volumeName, bucketName, endTime - startTime); + } else { + LOG.error("Failed to abort multipart uploads for bucket {}/{}: {}", + volumeName, bucketName, response.getMessage()); + } + } else { + LOG.error("Received null response when aborting multipart uploads for bucket {}/{}", + volumeName, bucketName); + } + } catch (ServiceException e) { + LOG.error("Failed to submit abort multipart uploads request for bucket {}/{}", + volumeName, bucketName, e); + } + } + /** * If prefix is /dir1/dir2, but dir1 doesn't exist, then it will return exception. * If prefix is /dir1/dir2, but dir2 doesn't exist, then it will return a list with dir1 only. diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/S3LifecycleConfiguration.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/S3LifecycleConfiguration.java index 5cbc4a275d21..ff72a322dee3 100644 --- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/S3LifecycleConfiguration.java +++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/S3LifecycleConfiguration.java @@ -28,6 +28,7 @@ import org.apache.hadoop.ozone.client.OzoneBucket; import org.apache.hadoop.ozone.client.OzoneLifecycleConfiguration; import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.om.helpers.OmLCAbortIncompleteMultipartUpload; import org.apache.hadoop.ozone.om.helpers.OmLCExpiration; import org.apache.hadoop.ozone.om.helpers.OmLCFilter; import org.apache.hadoop.ozone.om.helpers.OmLCRule; @@ -72,6 +73,9 @@ public static class Rule { @XmlElement(name = "Expiration") private Expiration expiration; + @XmlElement(name = "AbortIncompleteMultipartUpload") + private AbortIncompleteMultipartUpload abortIncompleteMultipartUpload; + @XmlElement(name = "Filter") private Filter filter; @@ -107,6 +111,14 @@ public void setExpiration(Expiration expiration) { this.expiration = expiration; } + public AbortIncompleteMultipartUpload getAbortIncompleteMultipartUpload() { + return abortIncompleteMultipartUpload; + } + + public void setAbortIncompleteMultipartUpload(AbortIncompleteMultipartUpload abortIncompleteMultipartUpload) { + this.abortIncompleteMultipartUpload = abortIncompleteMultipartUpload; + } + public Filter getFilter() { return filter; } @@ -145,6 +157,24 @@ public void setDate(String date) { } } + /** + * AbortIncompleteMultipartUpload entity for lifecycle rule. + */ + @XmlAccessorType(XmlAccessType.FIELD) + @XmlRootElement(name = "AbortIncompleteMultipartUpload") + public static class AbortIncompleteMultipartUpload { + @XmlElement(name = "DaysAfterInitiation") + private Integer daysAfterInitiation; + + public Integer getDaysAfterInitiation() { + return daysAfterInitiation; + } + + public void setDaysAfterInitiation(Integer daysAfterInitiation) { + this.daysAfterInitiation = daysAfterInitiation; + } + } + /** * Tag entity for filter criteria. */ @@ -293,7 +323,10 @@ private OmLCRule convertToOmRule(Rule rule) throws OMException, OS3Exception { .setPrefix(rule.getPrefix()); if (rule.getExpiration() != null) { - builder.setAction(convertToOmExpiration(rule.getExpiration())); + builder.addAction(convertToOmExpiration(rule.getExpiration())); + } + if (rule.getAbortIncompleteMultipartUpload() != null) { + builder.addAction(convertToOmAbortIncompleteMultipartUpload(rule.getAbortIncompleteMultipartUpload())); } if (rule.getFilter() != null) { builder.setFilter(convertToOmFilter(rule.getFilter())); @@ -321,6 +354,24 @@ private OmLCExpiration convertToOmExpiration(Expiration expiration) throws OMExc return builder.build(); } + /** + * Converts S3 AbortIncompleteMultipartUpload to internal representation. + * + * @param abortIncompleteMultipartUpload the S3 AbortIncompleteMultipartUpload + * @return OmLCAbortIncompleteMultipartUpload internal representation + */ + private OmLCAbortIncompleteMultipartUpload convertToOmAbortIncompleteMultipartUpload( + AbortIncompleteMultipartUpload abortIncompleteMultipartUpload) throws OMException { + OmLCAbortIncompleteMultipartUpload.Builder builder = + new OmLCAbortIncompleteMultipartUpload.Builder(); + + if (abortIncompleteMultipartUpload.getDaysAfterInitiation() != null) { + builder.setDaysAfterInitiation(abortIncompleteMultipartUpload.getDaysAfterInitiation()); + } + + return builder.build(); + } + /** * Converts S3 filter to internal filter. * @@ -402,6 +453,10 @@ private static Rule convertFromOzoneRule(OzoneLifecycleConfiguration.OzoneLCRule if (ozoneRule.getExpiration() != null) { rule.setExpiration(convertFromOzoneExpiration(ozoneRule.getExpiration())); } + if (ozoneRule.getAbortIncompleteMultipartUpload() != null) { + rule.setAbortIncompleteMultipartUpload( + convertFromOzoneAbortIncompleteMultipartUpload(ozoneRule.getAbortIncompleteMultipartUpload())); + } if (ozoneRule.getFilter() != null) { rule.setFilter(convertFromOzoneFilter(ozoneRule.getFilter())); } @@ -431,6 +486,25 @@ private static Expiration convertFromOzoneExpiration( return expiration; } + /** + * Converts an Ozone internal AbortIncompleteMultipartUpload to S3 representation. + * + * @param ozoneAbortIncompleteMultipartUpload internal AbortIncompleteMultipartUpload + * @return AbortIncompleteMultipartUpload S3 representation + */ + private static AbortIncompleteMultipartUpload convertFromOzoneAbortIncompleteMultipartUpload( + OzoneLifecycleConfiguration.OzoneLCAbortIncompleteMultipartUpload ozoneAbortIncompleteMultipartUpload) { + + AbortIncompleteMultipartUpload abortIncompleteMultipartUpload = new AbortIncompleteMultipartUpload(); + + if (ozoneAbortIncompleteMultipartUpload.getDaysAfterInitiation() > 0) { + abortIncompleteMultipartUpload.setDaysAfterInitiation( + ozoneAbortIncompleteMultipartUpload.getDaysAfterInitiation()); + } + + return abortIncompleteMultipartUpload; + } + /** * Converts an Ozone internal filter to S3 filter. * diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java index 1bb18583c36b..b054174adf7b 100644 --- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java +++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java @@ -725,12 +725,17 @@ private static OzoneLifecycleConfiguration toOzoneLifecycleConfiguration( for (OmLCRule r: omLifecycleConfiguration.getRules()) { OzoneLifecycleConfiguration.OzoneLCExpiration e = null; + OzoneLifecycleConfiguration.OzoneLCAbortIncompleteMultipartUpload a = null; OzoneLifecycleConfiguration.OzoneLCFilter f = null; if (r.getExpiration() != null) { e = new OzoneLifecycleConfiguration.OzoneLCExpiration( r.getExpiration().getDays(), r.getExpiration().getDate()); } + if (r.getAbortIncompleteMultipartUpload() != null) { + a = new OzoneLifecycleConfiguration.OzoneLCAbortIncompleteMultipartUpload( + r.getAbortIncompleteMultipartUpload().getDaysAfterInitiation()); + } if (r.getFilter() != null) { OzoneLifecycleConfiguration.LifecycleAndOperator andOperator = null; if (r.getFilter().getAndOperator() != null) { @@ -742,7 +747,7 @@ private static OzoneLifecycleConfiguration toOzoneLifecycleConfiguration( } rules.add(new OzoneLifecycleConfiguration.OzoneLCRule(r.getId(), - r.getEffectivePrefix(), (r.isEnabled() ? "Enabled" : "Disabled"), e, f)); + r.getEffectivePrefix(), (r.isEnabled() ? "Enabled" : "Disabled"), e, a, f)); } return new OzoneLifecycleConfiguration( diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestS3LifecycleConfigurationGet.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestS3LifecycleConfigurationGet.java index 2d602b4f4fcb..0558af9c1f8c 100644 --- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestS3LifecycleConfigurationGet.java +++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestS3LifecycleConfigurationGet.java @@ -83,6 +83,47 @@ public void testGetLifecycleConfiguration() throws Exception { lcc.getRules().get(0).getExpiration().getDays().intValue()); } + @Test + public void testGetLifecycleWithAbortIncompleteMultipartUpload() throws Exception { + String bucketName = "bucket1"; + bucketEndpoint.put(bucketName, getBodyWithAbortAction()); + Response r = bucketEndpoint.get(bucketName); + + assertEquals(HTTP_OK, r.getStatus()); + S3LifecycleConfiguration lcc = (S3LifecycleConfiguration) r.getEntity(); + assertEquals(1, lcc.getRules().size()); + S3LifecycleConfiguration.Rule rule = lcc.getRules().get(0); + + assertEquals("abort-incomplete-uploads", rule.getId()); + assertEquals("uploads/", rule.getPrefix()); + assertEquals("Enabled", rule.getStatus()); + assertEquals(7, rule.getAbortIncompleteMultipartUpload() + .getDaysAfterInitiation().intValue()); + } + + @Test + public void testGetLifecycleWithBothActions() throws Exception { + String bucketName = "bucket1"; + bucketEndpoint.put(bucketName, getBodyWithBothActions()); + Response r = bucketEndpoint.get(bucketName); + + assertEquals(HTTP_OK, r.getStatus()); + S3LifecycleConfiguration lcc = (S3LifecycleConfiguration) r.getEntity(); + assertEquals(1, lcc.getRules().size()); + S3LifecycleConfiguration.Rule rule = lcc.getRules().get(0); + + assertEquals("cleanup-rule", rule.getId()); + assertEquals("temp/", rule.getPrefix()); + assertEquals("Enabled", rule.getStatus()); + + // Verify Expiration action + assertEquals(30, rule.getExpiration().getDays().intValue()); + + // Verify AbortIncompleteMultipartUpload action + assertEquals(7, rule.getAbortIncompleteMultipartUpload() + .getDaysAfterInitiation().intValue()); + } + private static InputStream getBody() { String xml = ("" + @@ -96,4 +137,37 @@ private static InputStream getBody() { return new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)); } + + private static InputStream getBodyWithAbortAction() { + String xml = "" + + "" + + "abort-incomplete-uploads" + + "uploads/" + + "Enabled" + + "" + + "7" + + "" + + "" + + ""; + + return new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)); + } + + private static InputStream getBodyWithBothActions() { + String xml = "" + + "" + + "cleanup-rule" + + "temp/" + + "Enabled" + + "" + + "30" + + "" + + "" + + "7" + + "" + + "" + + ""; + + return new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)); + } } diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestS3LifecycleConfigurationPut.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestS3LifecycleConfigurationPut.java index a5f60e59536f..5b294e71a0a0 100644 --- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestS3LifecycleConfigurationPut.java +++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestS3LifecycleConfigurationPut.java @@ -179,6 +179,29 @@ public void testPutLifecycleConfigurationFailsWithNonBucketOwner() } } + @Test + public void testPutLifecycleWithAbortIncompleteMultipartUpload() throws Exception { + assertEquals(HTTP_OK, bucketEndpoint.put("bucket1", withAbortIncompleteMultipartUpload()).getStatus()); + } + + @Test + public void testPutLifecycleWithBothExpirationAndAbort() throws Exception { + assertEquals(HTTP_OK, bucketEndpoint.put("bucket1", withBothExpirationAndAbort()).getStatus()); + } + + @Test + public void testPutInvalidAbortIncompleteMultipartUploadConfig() throws Exception { + // Test with zero days - should fail + testInvalidLifecycleConfiguration( + TestS3LifecycleConfigurationPut::withAbortZeroDays, + HTTP_BAD_REQUEST, INVALID_REQUEST.getCode()); + + // Test with negative days - should fail + testInvalidLifecycleConfiguration( + TestS3LifecycleConfigurationPut::withAbortNegativeDays, + HTTP_BAD_REQUEST, INVALID_REQUEST.getCode()); + } + private static InputStream onePrefix() { String xml = ("" + @@ -514,4 +537,67 @@ private InputStream useDuplicateTagInAndOperator() { return new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)); } + + private static InputStream withAbortIncompleteMultipartUpload() { + String xml = "" + + "" + + "abort-incomplete-uploads" + + "uploads/" + + "Enabled" + + "" + + "7" + + "" + + "" + + ""; + + return new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)); + } + + private static InputStream withBothExpirationAndAbort() { + String xml = "" + + "" + + "cleanup-rule" + + "temp/" + + "Enabled" + + "" + + "30" + + "" + + "" + + "7" + + "" + + "" + + ""; + + return new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)); + } + + private static InputStream withAbortZeroDays() { + String xml = "" + + "" + + "invalid-abort" + + "uploads/" + + "Enabled" + + "" + + "0" + + "" + + "" + + ""; + + return new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)); + } + + private static InputStream withAbortNegativeDays() { + String xml = "" + + "" + + "invalid-abort" + + "uploads/" + + "Enabled" + + "" + + "-1" + + "" + + "" + + ""; + + return new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)); + } }