Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 107 additions & 66 deletions hadoop-hdds/docs/content/design/s3-conditional-requests.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -1547,6 +1547,37 @@ void testRewriteKeyIfMatchSuccess(BucketLayout layout) throws IOException {
.getMetadata().get(ETAG));
}

@ParameterizedTest
@EnumSource
void testRewriteKeyIfMatchFailsDueToOutdatedGenerationAtCommit(
BucketLayout layout) throws IOException {
checkFeatureEnable(OzoneManagerVersion.ATOMIC_REWRITE_KEY);
OzoneBucket bucket = createBucket(layout);
OzoneKeyDetails keyDetails = createTestKeyWithETag(bucket);
String etag = keyDetails.getMetadata().get(ETAG);
byte[] overwriteContent = "overwrite".getBytes(UTF_8);

OzoneOutputStream out = null;
try {
out = bucket.rewriteKeyIfMatch(
keyDetails.getName(), keyDetails.getDataSize(), etag,
RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE),
keyDetails.getMetadata(), Collections.emptyMap());
out.write("rewrite".getBytes(UTF_8));

createTestKey(bucket, keyDetails.getName(), overwriteContent);

OMException e = assertThrows(OMException.class, out::close);
assertEquals(ATOMIC_WRITE_CONFLICT, e.getResult());
} finally {
if (out != null) {
out.close();
}
}

assertKeyContent(bucket, keyDetails.getName(), overwriteContent);
}

@ParameterizedTest
@EnumSource
void testRewriteKeyIfMatchFailsWithWrongETag(BucketLayout layout) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,47 @@ public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
KeyArgs resolvedKeyArgs =
resolveBucketAndCheckOpenKeyAcls(newKeyArgs.build(), ozoneManager,
IAccessAuthorizer.ACLType.WRITE, commitKeyRequest.getClientID());
validateConditionalCommitAtAdmission(ozoneManager.getMetadataManager(),
resolvedKeyArgs, commitKeyRequest.getClientID());

return request.toBuilder()
.setCommitKeyRequest(commitKeyRequest.toBuilder()
.setKeyArgs(resolvedKeyArgs)).build();
}

protected void validateConditionalCommitAtAdmission(
OMMetadataManager omMetadataManager, KeyArgs keyArgs, long clientId)
throws IOException {
OmKeyInfo openKeyInfo = getOpenKeyInfoForCommitAdmission(
omMetadataManager, keyArgs, clientId);
if (openKeyInfo == null) {
return;
}

validateAtomicRewriteAtCommit(
getCommittedKeyInfoForCommitAdmission(omMetadataManager, keyArgs),
openKeyInfo.getExpectedDataGeneration(), null);
}

protected OmKeyInfo getOpenKeyInfoForCommitAdmission(
OMMetadataManager omMetadataManager, KeyArgs keyArgs, long clientId)
throws IOException {
String dbOpenKey = omMetadataManager.getOpenKey(
keyArgs.getVolumeName(), keyArgs.getBucketName(),
keyArgs.getKeyName(), clientId);
return omMetadataManager.getOpenKeyTable(getBucketLayout())
.get(dbOpenKey);
}

protected OmKeyInfo getCommittedKeyInfoForCommitAdmission(
OMMetadataManager omMetadataManager, KeyArgs keyArgs)
throws IOException {
String dbOzoneKey = omMetadataManager.getOzoneKey(
keyArgs.getVolumeName(), keyArgs.getBucketName(),
keyArgs.getKeyName());
return omMetadataManager.getKeyTable(getBucketLayout()).get(dbOzoneKey);
}

@Override
@SuppressWarnings("methodlength")
public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, ExecutionContext context) {
Expand Down Expand Up @@ -302,7 +337,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut
}
}

validateAtomicRewrite(keyToDelete, omKeyInfo, auditMap);
validateAtomicRewriteAtCommit(keyToDelete, omKeyInfo, auditMap);
// Optimistic locking validation has passed. Now set the rewrite fields to null so they are
// not persisted in the key table.
// Combination
Expand Down Expand Up @@ -616,32 +651,4 @@ public static OMRequest disallowRecovery(
}
return req;
}

protected void validateAtomicRewrite(OmKeyInfo existing, OmKeyInfo toCommit, Map<String, String> auditMap)
throws OMException {
if (toCommit.getExpectedDataGeneration() != null) {
// These values are not passed in the request keyArgs, so add them into the auditMap if they are present
// in the open key entry.
Long expectedGen = toCommit.getExpectedDataGeneration();
auditMap.put(OzoneConsts.REWRITE_GENERATION, String.valueOf(expectedGen));

if (expectedGen == OzoneConsts.EXPECTED_GEN_CREATE_IF_ABSENT) {
if (existing != null) {
throw new OMException("Atomic create-if-not-exists conflicted with an existing key",
OMException.ResultCodes.ATOMIC_WRITE_CONFLICT);
}
} else {
if (existing == null) {
throw new OMException("Atomic rewrite conflicted because the key no longer exists",
OMException.ResultCodes.ATOMIC_WRITE_CONFLICT);
}
if (expectedGen != existing.getUpdateID()) {
throw new OMException("Cannot commit as current generation (" + existing.getUpdateID() +
") does not match the expected generation to rewrite (" + expectedGen + ")",
OMException.ResultCodes.ATOMIC_WRITE_CONFLICT);
}
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@

package org.apache.hadoop.ozone.om.request.key;

import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_FOUND;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.DIRECTORY_NOT_FOUND;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_ALREADY_CLOSED;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_UNDER_LEASE_RECOVERY;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_NOT_FOUND;
import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.LeveledResource.BUCKET_LOCK;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -73,6 +76,62 @@ public OMKeyCommitRequestWithFSO(OMRequest omRequest,
super(omRequest, bucketLayout);
}

@Override
protected OmKeyInfo getOpenKeyInfoForCommitAdmission(
OMMetadataManager omMetadataManager, KeyArgs keyArgs, long clientId)
throws IOException {
try {
OmFSOFile fsoFile = getOmFSOFileForCommitAdmission(
omMetadataManager, keyArgs);
return OMFileRequest.getOmKeyInfoFromFileTable(true,
omMetadataManager, fsoFile.getOpenFileName(clientId),
keyArgs.getKeyName());
} catch (OMException ex) {
if (isMissingNamespaceForCommitAdmission(ex)) {
return null;
}
throw ex;
}
}

@Override
protected OmKeyInfo getCommittedKeyInfoForCommitAdmission(
OMMetadataManager omMetadataManager, KeyArgs keyArgs)
throws IOException {
try {
OmFSOFile fsoFile = getOmFSOFileForCommitAdmission(
omMetadataManager, keyArgs);
return omMetadataManager.getKeyTable(getBucketLayout())
.get(fsoFile.getOzonePathKey());
} catch (OMException ex) {
if (isMissingNamespaceForCommitAdmission(ex)) {
return null;
}
throw ex;
}
}

private OmFSOFile getOmFSOFileForCommitAdmission(
OMMetadataManager omMetadataManager, KeyArgs keyArgs)
throws IOException {
String keyName = keyArgs.getKeyName();
String errMsg = "Cannot create file : " + keyName
+ " as parent directory doesn't exist";
return new OmFSOFile.Builder()
.setVolumeName(keyArgs.getVolumeName())
.setBucketName(keyArgs.getBucketName())
.setKeyName(keyName)
.setOmMetadataManager(omMetadataManager)
.setErrMsg(errMsg)
.build();
}

private boolean isMissingNamespaceForCommitAdmission(OMException ex) {
return ex.getResult() == VOLUME_NOT_FOUND
|| ex.getResult() == BUCKET_NOT_FOUND
|| ex.getResult() == DIRECTORY_NOT_FOUND;
}

@Override
@SuppressWarnings("methodlength")
public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, ExecutionContext context) {
Expand Down Expand Up @@ -242,7 +301,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut
// creation after the knob turned on.
Map<String, RepeatedOmKeyInfo> oldKeyVersionsToDeleteMap = null;

validateAtomicRewrite(keyToDelete, omKeyInfo, auditMap);
validateAtomicRewriteAtCommit(keyToDelete, omKeyInfo, auditMap);
// Optimistic locking validation has passed. Now set the rewrite fields to null so they are
// not persisted in the key table.
omKeyInfo.setExpectedDataGeneration(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,8 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut
keyName);
OmKeyInfo dbKeyInfo = omMetadataManager.getKeyTable(getBucketLayout())
.getIfExist(dbKeyName);
validateAtomicRewrite(dbKeyInfo, keyArgs);
keyArgs = validateAndRewriteIfMatchAsExpectedGeneration(keyArgs, dbKeyInfo);
keyArgs = resolveConditionalWriteAtAdmission(
dbKeyInfo, keyArgs, auditMap);

OmBucketInfo bucketInfo =
getBucketInfo(omMetadataManager, volumeName, bucketName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,8 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut
dbFileInfo = OMFileRequest.getOmKeyInfoFromFileTable(false,
omMetadataManager, dbFileKey, keyName);
}
validateAtomicRewrite(dbFileInfo, keyArgs);
keyArgs = validateAndRewriteIfMatchAsExpectedGeneration(
keyArgs, dbFileInfo);
keyArgs = resolveConditionalWriteAtAdmission(
dbFileInfo, keyArgs, auditMap);

// Check if a file or directory exists with same key name.
if (pathInfoFSO.getDirectoryResult() == DIRECTORY_EXISTS) {
Expand Down
Loading