Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,6 @@ public final class ECKeyOutputStream extends KeyOutputStream
private final Future<Boolean> flushFuture;
private final AtomicLong flushCheckpoint;

/**
* Indicates if an atomic write is required. When set to true,
* the amount of data written must match the declared size during the commit.
* A mismatch will prevent the commit from succeeding.
* This is essential for operations like S3 put to ensure atomicity.
*/
private boolean atomicKeyCreation;

private volatile boolean closed;
private volatile boolean closing;
// how much of data is actually written yet to underlying stream
Expand Down Expand Up @@ -130,7 +122,6 @@ private ECKeyOutputStream(Builder builder) {
return flushStripeFromQueue();
});
this.flushCheckpoint = new AtomicLong(0);
this.atomicKeyCreation = builder.getAtomicKeyCreation();
}

@Override
Expand Down Expand Up @@ -489,12 +480,6 @@ public void close() throws IOException {
Preconditions.checkArgument(writeOffset == offset,
"Expected writeOffset= " + writeOffset
+ " Expected offset=" + offset);
if (atomicKeyCreation) {
long expectedSize = blockOutputStreamEntryPool.getDataSize();
Preconditions.checkState(expectedSize == offset, String.format(
"Expected: %d and actual %d write sizes do not match",
expectedSize, offset));
}
for (CheckedRunnable<IOException> preCommit : preCommits) {
preCommit.run();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,6 @@ public class KeyDataStreamOutput extends AbstractDataStreamOutput

private long clientID;

/**
* Indicates if an atomic write is required. When set to true,
* the amount of data written must match the declared size during the commit.
* A mismatch will prevent the commit from succeeding.
* This is essential for operations like S3 put to ensure atomicity.
*/
private boolean atomicKeyCreation;

private List<CheckedRunnable<IOException>> preCommits = Collections.emptyList();

public void setPreCommits(@Nonnull List<CheckedRunnable<IOException>> preCommits) {
Expand Down Expand Up @@ -129,7 +121,6 @@ public KeyDataStreamOutput() {

this.writeOffset = 0;
this.clientID = 0L;
this.atomicKeyCreation = false;
}

@SuppressWarnings({"parameternumber", "squid:S00107"})
Expand All @@ -140,9 +131,7 @@ public KeyDataStreamOutput(
OzoneManagerProtocol omClient, int chunkSize,
String requestId, ReplicationConfig replicationConfig,
String uploadID, int partNumber, boolean isMultipart,
boolean unsafeByteBufferConversion,
boolean atomicKeyCreation
) {
boolean unsafeByteBufferConversion) {
super(HddsClientUtils.getRetryPolicyByException(
config.getMaxRetryCount(), config.getRetryInterval()));
this.config = config;
Expand All @@ -162,7 +151,6 @@ public KeyDataStreamOutput(
// encrypted bucket.
this.writeOffset = 0;
this.clientID = handler.getId();
this.atomicKeyCreation = atomicKeyCreation;
}

/**
Expand Down Expand Up @@ -457,12 +445,6 @@ public void close() throws IOException {
if (!isException()) {
Preconditions.checkArgument(writeOffset == offset);
}
if (atomicKeyCreation) {
long expectedSize = blockDataStreamOutputEntryPool.getDataSize();
Preconditions.checkArgument(expectedSize == offset,
String.format("Expected: %d and actual %d write sizes do not match",
expectedSize, offset));
}
for (CheckedRunnable<IOException> preCommit : preCommits) {
preCommit.run();
}
Expand Down Expand Up @@ -501,7 +483,6 @@ public static class Builder {
private boolean unsafeByteBufferConversion;
private OzoneClientConfig clientConfig;
private ReplicationConfig replicationConfig;
private boolean atomicKeyCreation = false;

public Builder setMultipartUploadID(String uploadID) {
this.multipartUploadID = uploadID;
Expand Down Expand Up @@ -553,11 +534,6 @@ public Builder setReplicationConfig(ReplicationConfig replConfig) {
return this;
}

public Builder setAtomicKeyCreation(boolean atomicKey) {
this.atomicKeyCreation = atomicKey;
return this;
}

public KeyDataStreamOutput build() {
return new KeyDataStreamOutput(
clientConfig,
Expand All @@ -570,8 +546,7 @@ public KeyDataStreamOutput build() {
multipartUploadID,
multipartNumber,
isMultipartKey,
unsafeByteBufferConversion,
atomicKeyCreation);
unsafeByteBufferConversion);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,6 @@ public class KeyOutputStream extends OutputStream
private long clientID;
private StreamBufferArgs streamBufferArgs;

/**
* Indicates if an atomic write is required. When set to true,
* the amount of data written must match the declared size during the commit.
* A mismatch will prevent the commit from succeeding.
* This is essential for operations like S3 put to ensure atomicity.
*/
private boolean atomicKeyCreation;
private ContainerClientMetrics clientMetrics;
private OzoneManagerVersion ozoneManagerVersion;
private final Lock writeLock = new ReentrantLock();
Expand Down Expand Up @@ -186,7 +179,6 @@ public KeyOutputStream(Builder b) {
this.isException = false;
this.writeOffset = 0;
this.clientID = b.getOpenHandler().getId();
this.atomicKeyCreation = b.getAtomicKeyCreation();
this.streamBufferArgs = b.getStreamBufferArgs();
this.clientMetrics = b.getClientMetrics();
this.ozoneManagerVersion = b.ozoneManagerVersion;
Expand Down Expand Up @@ -656,12 +648,6 @@ private void closeInternal() throws IOException {
if (!isException) {
Preconditions.checkArgument(writeOffset == offset);
}
if (atomicKeyCreation) {
long expectedSize = blockOutputStreamEntryPool.getDataSize();
Preconditions.checkState(expectedSize == offset,
String.format("Expected: %d and actual %d write sizes do not match",
expectedSize, offset));
}
for (CheckedRunnable<IOException> preCommit : preCommits) {
preCommit.run();
}
Expand Down Expand Up @@ -701,7 +687,6 @@ public static class Builder {
private OzoneClientConfig clientConfig;
private ReplicationConfig replicationConfig;
private ContainerClientMetrics clientMetrics;
private boolean atomicKeyCreation = false;
private StreamBufferArgs streamBufferArgs;
private Supplier<ExecutorService> executorServiceSupplier;
private OzoneManagerVersion ozoneManagerVersion;
Expand Down Expand Up @@ -800,11 +785,6 @@ public Builder setReplicationConfig(ReplicationConfig replConfig) {
return this;
}

public Builder setAtomicKeyCreation(boolean atomicKey) {
this.atomicKeyCreation = atomicKey;
return this;
}

public Builder setClientMetrics(ContainerClientMetrics clientMetrics) {
this.clientMetrics = clientMetrics;
return this;
Expand All @@ -814,10 +794,6 @@ public ContainerClientMetrics getClientMetrics() {
return clientMetrics;
}

public boolean getAtomicKeyCreation() {
return atomicKeyCreation;
}

public Builder setExecutorServiceSupplier(Supplier<ExecutorService> executorServiceSupplier) {
this.executorServiceSupplier = executorServiceSupplier;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1473,11 +1473,9 @@ private OmKeyArgs.Builder createWriteKeyArgsBuilder(String volumeName,
private OzoneOutputStream openOutputStream(OmKeyArgs keyArgs, long size)
throws IOException {
OpenKeySession openKey = ozoneManagerClient.openKey(keyArgs);
// For bucket with layout OBJECT_STORE, when create an empty file (size=0),
// OM will set DataSize to OzoneConfigKeys#OZONE_SCM_BLOCK_SIZE,
// which will cause S3G's atomic write length check to fail,
// so reset size to 0 here.
if (isS3GRequest.get() && size == 0) {
// Keep non-positive S3 writes from carrying OM's preallocated block size
// or an internal unknown-length marker into later allocate-block requests.
if (isS3GRequest.get() && size <= 0) {
openKey.getKeyInfo().setDataSize(0);
}
return createOutputStream(openKey);
Expand Down Expand Up @@ -2117,7 +2115,8 @@ public OzoneDataStreamOutput createMultipartStreamKey(
keyOutputStream.addPreallocateBlocks(
openKey.getKeyInfo().getLatestVersionLocations(),
openKey.getOpenVersion());
final OzoneOutputStream secureOut = createSecureOutputStream(openKey, keyOutputStream, null);
final OzoneOutputStream secureOut = createSecureOutputStream(openKey,
keyOutputStream, null);
out = secureOut != null ? secureOut : keyOutputStream;
} else {
out = createMultipartOutputStream(openKey, uploadID, partNumber);
Expand Down Expand Up @@ -2579,7 +2578,8 @@ private OzoneDataStreamOutput createDataStreamOutput(OpenKeySession openKey)
keyOutputStream.addPreallocateBlocks(
openKey.getKeyInfo().getLatestVersionLocations(),
openKey.getOpenVersion());
final OzoneOutputStream secureOut = createSecureOutputStream(openKey, keyOutputStream, null);
final OzoneOutputStream secureOut = createSecureOutputStream(openKey,
keyOutputStream, null);
out = secureOut != null ? secureOut : keyOutputStream;
} else {
out = createOutputStream(openKey);
Expand All @@ -2588,21 +2588,17 @@ private OzoneDataStreamOutput createDataStreamOutput(OpenKeySession openKey)
}

private KeyDataStreamOutput.Builder newKeyOutputStreamBuilder() {
// Amazon S3 never adds partial objects, So for S3 requests we need to
// set atomicKeyCreation to true
// refer: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html
return new KeyDataStreamOutput.Builder()
.setXceiverClientManager(xceiverClientManager)
.setOmClient(ozoneManagerClient)
.enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
.setConfig(clientConfig)
.setAtomicKeyCreation(isS3GRequest.get());
.setConfig(clientConfig);
}

private OzoneOutputStream createOutputStream(OpenKeySession openKey)
throws IOException {
KeyOutputStream keyOutputStream = createKeyOutputStream(openKey)
.build();
KeyOutputStream keyOutputStream = createKeyOutputStream(openKey).build();
return createOutputStream(openKey, keyOutputStream);
}

Expand Down Expand Up @@ -2670,7 +2666,6 @@ private KeyOutputStream.Builder createKeyOutputStream(
.setOmClient(ozoneManagerClient)
.enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
.setConfig(clientConfig)
.setAtomicKeyCreation(isS3GRequest.get())
.setClientMetrics(clientMetrics)
.setExecutorServiceSupplier(writeExecutor)
.setStreamBufferArgs(streamBufferArgs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,38 +220,70 @@ public void testPutKeyWithECReplicationConfig() throws IOException {
}
}

/**
* This test validates that for S3G,
* the key upload process needs to be atomic.
* It simulates two mismatch scenarios where the actual write data size does
* not match the expected size.
*/
@Test
public void testPutKeySizeMismatch() throws IOException {
public void testPutKeySizeMismatchCommitsActualSizeForS3Request()
throws IOException {
String value = new String(new byte[1024], UTF_8);
OzoneBucket bucket = getOzoneBucket();
String keyName = UUID.randomUUID().toString();
String shortKeyName = UUID.randomUUID().toString();
String shortValue = value.substring(0, value.length() - 1);
String longKeyName = UUID.randomUUID().toString();
String longValue = value + "1";
try {
// Simulating first mismatch: Write less data than expected
client.getProxy().setIsS3Request(true);
OzoneOutputStream out1 = bucket.createKey(keyName,
value.getBytes(UTF_8).length, ReplicationType.RATIS, ONE,
new HashMap<>());
out1.write(value.substring(0, value.length() - 1).getBytes(UTF_8));
assertThrows(IllegalStateException.class, out1::close,
"Expected IllegalArgumentException due to size mismatch.");
writeKey(bucket, shortKeyName, value.getBytes(UTF_8).length,
shortValue);
writeKey(bucket, longKeyName, value.getBytes(UTF_8).length,
longValue);
} finally {
client.getProxy().setIsS3Request(false);
}
assertKeyContent(bucket, shortKeyName, shortValue);
assertKeyContent(bucket, longKeyName, longValue);
}

// Simulating second mismatch: Write more data than expected
OzoneOutputStream out2 = bucket.createKey(keyName,
value.getBytes(UTF_8).length, ReplicationType.RATIS, ONE,
new HashMap<>());
value += "1";
out2.write(value.getBytes(UTF_8));
assertThrows(IllegalStateException.class, out2::close,
"Expected IllegalArgumentException due to size mismatch.");
@Test
public void testPutKeyWithUnknownSizeForS3Request() throws IOException {
String value = "sample value";
OzoneBucket bucket = getOzoneBucket();
String keyName = UUID.randomUUID().toString();
try {
client.getProxy().setIsS3Request(true);
writeKey(bucket, keyName, -1, value);
} finally {
client.getProxy().setIsS3Request(false);
}
assertKeyContent(bucket, keyName, value);
}

private void writeKey(OzoneBucket bucket, String keyName, long size,
String value) throws IOException {
try (OzoneOutputStream out = bucket.createKey(keyName, size,
ReplicationType.RATIS, ONE, new HashMap<>())) {
out.write(value.getBytes(UTF_8));
}
}

private void assertKeyContent(OzoneBucket bucket, String keyName,
String value) throws IOException {
byte[] bytes = value.getBytes(UTF_8);
OzoneKey key = bucket.getKey(keyName);
assertEquals(bytes.length, key.getDataSize());

byte[] fileContent = new byte[bytes.length];
try (OzoneInputStream is = bucket.readKey(keyName)) {
int offset = 0;
while (offset < fileContent.length) {
int read = is.read(fileContent, offset, fileContent.length - offset);
if (read == -1) {
break;
}
offset += read;
}
assertEquals(fileContent.length, offset);
assertEquals(-1, is.read());
}
assertEquals(value, new String(fileContent, UTF_8));
}

private OzoneBucket getOzoneBucket() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -695,8 +695,8 @@ protected int getIOBufferSize(long fileLength) {
LOG.warn("buffer size is set to {}", IOUtils.DEFAULT_BUFFER_SIZE);
bufferSize = IOUtils.DEFAULT_BUFFER_SIZE;
}
if (fileLength == 0) {
// for empty file
if (fileLength <= 0) {
// for empty file or unknown length
return bufferSize;
} else {
return fileLength < bufferSize ? (int) fileLength : bufferSize;
Expand Down
Loading