Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,14 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut
OMException.ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR);
}

if (!ozoneManager.getVersionManager().isAllowed(OMLayoutFeature.MPU_PARTS_TABLE_SPLIT)
&& multipartKeyInfo.getSchemaVersion() != 0) {
throw new OMException("MPU parts-table split behavior is not allowed " +
"before cluster finalization.",
OMException.ResultCodes.NOT_SUPPORTED_OPERATION_PRIOR_FINALIZATION);
}


multipartKeyInfo = multipartKeyInfo.toBuilder()
.setUpdateID(trxnLogIndex)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,14 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut

multipartKeyInfo = omMetadataManager.getMultipartInfoTable()
.get(multipartKey);

if (!ozoneManager.getVersionManager().isAllowed(OMLayoutFeature.MPU_PARTS_TABLE_SPLIT)
&& multipartKeyInfo.getSchemaVersion() != 0) {
throw new OMException("MPU parts-table split behavior is not allowed " +
"before cluster finalization for commit part request.",
OMException.ResultCodes.NOT_SUPPORTED_OPERATION_PRIOR_FINALIZATION);
}


openKey = getOpenKey(volumeName, bucketName, keyName, omMetadataManager,
clientID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,13 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut

OmMultipartKeyInfo multipartKeyInfo = omMetadataManager
.getMultipartInfoTable().get(multipartKey);

if (!ozoneManager.getVersionManager().isAllowed(OMLayoutFeature.MPU_PARTS_TABLE_SPLIT)
&& multipartKeyInfo.getSchemaVersion() != 0) {
throw new OMException("MPU parts-table split behavior is not allowed " +
"before cluster finalization for commit part request.",
OMException.ResultCodes.NOT_SUPPORTED_OPERATION_PRIOR_FINALIZATION);
}

String ozoneKey = omMetadataManager.getOzoneKey(
volumeName, bucketName, keyName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ public enum OMLayoutFeature implements LayoutFeature {
QUOTA(6, "Ozone quota re-calculate"),
HBASE_SUPPORT(7, "Full support of hsync, lease recovery and listOpenFiles APIs for HBase"),
DELEGATION_TOKEN_SYMMETRIC_SIGN(8, "Delegation token signed by symmetric key"),
SNAPSHOT_DEFRAG(9, "Supporting defragmentation of snapshot");
SNAPSHOT_DEFRAG(9, "Supporting defragmentation of snapshot"),

MPU_PARTS_TABLE_SPLIT(10, "Split multipart table into separate table for parts and key");

/////////////////////////////// /////////////////////////////

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import java.util.Map;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
import org.apache.hadoop.ozone.audit.AuditLogger;
import org.apache.hadoop.ozone.audit.AuditMessage;
import org.apache.hadoop.ozone.om.IOmMetadataReader;
Expand All @@ -47,8 +49,10 @@
import org.apache.hadoop.ozone.om.ResolvedBucket;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.KeyValueUtil;
import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
import org.apache.hadoop.ozone.om.request.OMClientRequest;
import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
import org.apache.hadoop.ozone.om.upgrade.OMLayoutVersionManager;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyLocation;
Expand Down Expand Up @@ -302,6 +306,46 @@ protected OMRequest doPreExecuteCompleteMPU(

}

/**
* Initiate an MPU and optionally rewrite the stored multipart metadata to a
* specific schema version.
*
* <p>The schema version rewrite lets tests emulate post-finalization MPU
* entries without needing the rest of the upgrade pipeline.</p>
*/
protected String initiateMultipartUploadWithSchemaVersion(
String volumeName, String bucketName, String keyName,
byte schemaVersion) throws Exception {
OMRequest initiateMPURequest =
doPreExecuteInitiateMPU(volumeName, bucketName, keyName);

S3InitiateMultipartUploadRequest s3InitiateMultipartUploadRequest =
getS3InitiateMultipartUploadReq(initiateMPURequest);

OMClientResponse omClientResponse =
s3InitiateMultipartUploadRequest.validateAndUpdateCache(ozoneManager,
1L);

String multipartUploadID = omClientResponse.getOMResponse()
.getInitiateMultiPartUploadResponse().getMultipartUploadID();

if (schemaVersion != 0) {
String multipartKey = omMetadataManager.getMultipartKey(volumeName,
bucketName, keyName, multipartUploadID);
OmMultipartKeyInfo multipartKeyInfo = omMetadataManager
.getMultipartInfoTable().get(multipartKey);
assertNotNull(multipartKeyInfo);

omMetadataManager.getMultipartInfoTable().addCacheEntry(
new CacheKey<>(multipartKey),
CacheValue.get(2L, multipartKeyInfo.toBuilder()
.setSchemaVersion(schemaVersion)
.build()));
}

return multipartUploadID;
}

/**
* Perform preExecute of Initiate Multipart upload request for given
* volume, bucket and key name.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,38 @@ public void testValidateAndUpdateCache() throws Exception {

}

@Test
public void testValidateAndUpdateCacheRejectsSchemaVersionOneBeforeFinalization()
throws Exception {
String volumeName = UUID.randomUUID().toString();
String bucketName = UUID.randomUUID().toString();
String keyName = getKeyName();

OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
omMetadataManager, getBucketLayout());

createParentPath(volumeName, bucketName);

String multipartUploadID =
initiateMultipartUploadWithSchemaVersion(volumeName, bucketName,
keyName, (byte) 1);

OMRequest abortMPURequest =
doPreExecuteAbortMPU(volumeName, bucketName, keyName,
multipartUploadID);

S3MultipartUploadAbortRequest s3MultipartUploadAbortRequest =
getS3MultipartUploadAbortReq(abortMPURequest);

// The multipart metadata exists, but schema version 1 is not allowed yet.
OMClientResponse omClientResponse =
s3MultipartUploadAbortRequest.validateAndUpdateCache(ozoneManager, 2L);

assertEquals(OzoneManagerProtocolProtos.Status
.NOT_SUPPORTED_OPERATION_PRIOR_FINALIZATION,
omClientResponse.getOMResponse().getStatus());
}

@Test
public void testValidateAndUpdateCacheMultipartNotFound() throws Exception {
String volumeName = UUID.randomUUID().toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,42 @@ public void testValidateAndUpdateCacheSuccess() throws Exception {
.get(partKey));
}

@Test
public void testValidateAndUpdateCacheRejectsSchemaVersionOneBeforeFinalization()
throws Exception {
String volumeName = UUID.randomUUID().toString();
String bucketName = UUID.randomUUID().toString();
String keyName = getKeyName();

OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
omMetadataManager, getBucketLayout());

createParentPath(volumeName, bucketName);

String multipartUploadID =
initiateMultipartUploadWithSchemaVersion(volumeName, bucketName,
keyName, (byte) 1);

long clientID = Time.now();
OMRequest commitMultipartRequest = doPreExecuteCommitMPU(volumeName,
bucketName, keyName, clientID, multipartUploadID, 1);

S3MultipartUploadCommitPartRequest s3MultipartUploadCommitPartRequest =
getS3MultipartUploadCommitReq(commitMultipartRequest);

addKeyToOpenKeyTable(volumeName, bucketName, keyName, clientID);

// Regular part metadata is present; the upgrade gate should still reject
// this schema version before commit proceeds.
OMClientResponse omClientResponse =
s3MultipartUploadCommitPartRequest.validateAndUpdateCache(ozoneManager,
2L);

assertEquals(OzoneManagerProtocolProtos.Status
.NOT_SUPPORTED_OPERATION_PRIOR_FINALIZATION,
omClientResponse.getOMResponse().getStatus());
}

@Test
public void testValidateAndUpdateCacheMultipartNotFound() throws Exception {
String volumeName = UUID.randomUUID().toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,36 @@ private String checkValidateAndUpdateCacheSuccess(String volumeName,
return multipartUploadID;
}

@Test
public void testValidateAndUpdateCacheRejectsSchemaVersionOneBeforeFinalization()
throws Exception {
String volumeName = UUID.randomUUID().toString();
String bucketName = UUID.randomUUID().toString();
String keyName = getKeyName();

OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
omMetadataManager, getBucketLayout());

String multipartUploadID =
initiateMultipartUploadWithSchemaVersion(volumeName, bucketName,
keyName, (byte) 1);

// The request is still rejected before the empty part-list validation.
OMRequest completeMultipartRequest = doPreExecuteCompleteMPU(volumeName,
bucketName, keyName, multipartUploadID, new ArrayList<>());

S3MultipartUploadCompleteRequest s3MultipartUploadCompleteRequest =
getS3MultipartUploadCompleteReq(completeMultipartRequest);

OMClientResponse omClientResponse =
s3MultipartUploadCompleteRequest.validateAndUpdateCache(ozoneManager,
3L);

assertEquals(OzoneManagerProtocolProtos.Status
.NOT_SUPPORTED_OPERATION_PRIOR_FINALIZATION,
omClientResponse.getOMResponse().getStatus());
}

protected void addVolumeAndBucket(String volumeName, String bucketName)
throws Exception {
OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
Expand Down