Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/bugfix-AWSSDKforJavav2-4bab915.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "AWS SDK for Java v2",
"contributor": "",
"description": "Ensure that file modification exceptions in AsyncRequestBody#fromFile are propagated correctly."
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import software.amazon.awssdk.core.async.AsyncRequestBodySplitConfiguration;
import software.amazon.awssdk.core.async.CloseableAsyncRequestBody;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.internal.util.Mimetype;
import software.amazon.awssdk.core.internal.util.NoopSubscription;
import software.amazon.awssdk.utils.Logger;
Expand Down Expand Up @@ -71,6 +72,8 @@ public final class FileAsyncRequestBody implements AsyncRequestBody {
private final int chunkSizeInBytes;
private final long position;
private final long numBytesToRead;
private FileTime modifiedTimeAtStart;
private Long sizeAtStart;

private FileAsyncRequestBody(DefaultBuilder builder) {
this.path = builder.path;
Expand All @@ -79,6 +82,27 @@ private FileAsyncRequestBody(DefaultBuilder builder) {
this.position = builder.position == null ? 0 : Validate.isNotNegative(builder.position, "position");
this.numBytesToRead = builder.numBytesToRead == null ? fileLength - this.position :
Validate.isNotNegative(builder.numBytesToRead, "numBytesToRead");
if (builder.modifiedTimeAtStart != null) {
this.modifiedTimeAtStart = builder.modifiedTimeAtStart;
} else {
try {
this.modifiedTimeAtStart = Files.getLastModifiedTime(path);
} catch (IOException e) {
log.debug(() -> "Failed to get last modified time for path " + path, e);
this.modifiedTimeAtStart = null;
Comment thread
alextwoods marked this conversation as resolved.
}
}

if (builder.sizeAtStart != null) {
this.sizeAtStart = builder.sizeAtStart;
} else {
try {
this.sizeAtStart = Files.size(path);
} catch (IOException e) {
log.debug(() -> "Failed to get file size for path " + path, e);
this.sizeAtStart = null;
Comment thread
alextwoods marked this conversation as resolved.
}
}
}

@Override
Expand Down Expand Up @@ -112,6 +136,14 @@ public long numBytesToRead() {
return numBytesToRead;
}

public FileTime modifiedTimeAtStart() {
return modifiedTimeAtStart;
}

public Long sizeAtStart() {
return sizeAtStart;
}

@Override
public Optional<Long> contentLength() {
return Optional.of(numBytesToRead);
Expand All @@ -131,7 +163,7 @@ public void subscribe(Subscriber<? super ByteBuffer> s) {
// We need to synchronize here because the subscriber could call
// request() from within onSubscribe which would potentially
// trigger onNext before onSubscribe is finished.
Subscription subscription = new FileSubscription(channel, s);
Subscription subscription = new FileSubscription(channel, s, modifiedTimeAtStart, sizeAtStart);

synchronized (subscription) {
s.onSubscribe(subscription);
Expand Down Expand Up @@ -203,6 +235,20 @@ public interface Builder extends SdkBuilder<Builder, FileAsyncRequestBody> {
* @return The builder for method chaining.
*/
Builder numBytesToRead(Long numBytesToRead);

/**
* Optional - sets the file modified time at the start of the request.
* @param modifiedTimeAtStart initial file modification time
* @return The builder for method chaining.
*/
Builder modifiedTimeAtStart(FileTime modifiedTimeAtStart);

/**
* Optional - sets the file size in bytes at the start of the request.
* @param sizeAtStart initial file size at start.
* @return The builder for method chaining.
*/
Builder sizeAtStart(Long sizeAtStart);
}

private static final class DefaultBuilder implements Builder {
Expand All @@ -211,6 +257,8 @@ private static final class DefaultBuilder implements Builder {
private Path path;
private Integer chunkSizeInBytes;
private Long numBytesToRead;
private FileTime modifiedTimeAtStart;
private Long sizeAtStart;

@Override
public Builder path(Path path) {
Expand Down Expand Up @@ -240,6 +288,18 @@ public Builder numBytesToRead(Long numBytesToRead) {
return this;
}

@Override
public Builder modifiedTimeAtStart(FileTime modifiedTimeAtStart) {
this.modifiedTimeAtStart = modifiedTimeAtStart;
return this;
}

@Override
public Builder sizeAtStart(Long sizeAtStart) {
this.sizeAtStart = sizeAtStart;
return this;
}

public void setChunkSizeInBytes(Integer chunkSizeInBytes) {
chunkSizeInBytes(chunkSizeInBytes);
}
Expand Down Expand Up @@ -267,13 +327,23 @@ private final class FileSubscription implements Subscription {
private final Object lock = new Object();

private FileSubscription(AsynchronousFileChannel inputChannel,
Subscriber<? super ByteBuffer> subscriber) throws IOException {
Subscriber<? super ByteBuffer> subscriber,
FileTime modifiedTimeAtStart, Long sizeAtStart) throws IOException {
this.inputChannel = inputChannel;
this.subscriber = subscriber;
this.sizeAtStart = inputChannel.size();
this.modifiedTimeAtStart = Files.getLastModifiedTime(path);
this.remainingBytes = new AtomicLong(numBytesToRead);
this.currentPosition = new AtomicLong(position);
if (sizeAtStart != null) {
this.sizeAtStart = sizeAtStart;
} else {
this.sizeAtStart = Files.size(path);
}

if (modifiedTimeAtStart != null) {
this.modifiedTimeAtStart = modifiedTimeAtStart;
} else {
this.modifiedTimeAtStart = Files.getLastModifiedTime(path);
Comment thread
alextwoods marked this conversation as resolved.
}
}

@Override
Expand Down Expand Up @@ -338,12 +408,21 @@ public void completed(Integer result, ByteBuffer attachment) {

int readBytes = attachment.remaining();
currentPosition.addAndGet(readBytes);
remainingBytes.addAndGet(-readBytes);
long remaining = remainingBytes.addAndGet(-readBytes);

// we need to validate the file is unchanged before providing the last bytes to subscriber
// the subscriber (eg: NettyRequestExecutor) may cancel subscription once all expected bytes have
// been received. Validating here ensures errors are correctly signaled.
if (remaining == 0) {
closeFile();
if (!validateFileUnchangedAndSignalErrors()) {
return;
}
}

signalOnNext(attachment);

if (remainingBytes.get() == 0) {
closeFile();
if (remaining == 0) {
signalOnComplete();
}

Expand Down Expand Up @@ -391,42 +470,49 @@ private void signalOnNext(ByteBuffer attachment) {
}

private void signalOnComplete() {
if (!validateFileUnchangedAndSignalErrors()) {
return;
}

synchronized (this) {
if (!done) {
done = true;
subscriber.onComplete();
}
}
}

private boolean validateFileUnchangedAndSignalErrors() {
try {
long sizeAtEnd = Files.size(path);
if (sizeAtStart != sizeAtEnd) {
signalOnError(new IOException("File size changed after reading started. Initial size: " + sizeAtStart + ". "
+ "Current size: " + sizeAtEnd));
return;
signalOnError(SdkClientException.create("File size changed after reading started. Initial size: "
+ sizeAtStart + ". Current size: " + sizeAtEnd));
return false;
}

if (remainingBytes.get() > 0) {
signalOnError(new IOException("Fewer bytes were read than were expected, was the file modified after "
+ "reading started?"));
return;
signalOnError(SdkClientException.create("Fewer bytes were read than were expected, was the file modified "
+ "after reading started?"));
return false;
}

FileTime modifiedTimeAtEnd = Files.getLastModifiedTime(path);
if (modifiedTimeAtStart.compareTo(modifiedTimeAtEnd) != 0) {
signalOnError(new IOException("File last-modified time changed after reading started. Initial modification "
+ "time: " + modifiedTimeAtStart + ". Current modification time: " +
modifiedTimeAtEnd));
return;
signalOnError(SdkClientException.create("File last-modified time changed after reading started. "
+ "Initial modification time: " + modifiedTimeAtStart
+ ". Current modification time: " + modifiedTimeAtEnd));
return false;
}
} catch (NoSuchFileException e) {
signalOnError(new IOException("Unable to check file status after read. Was the file deleted or were its "
+ "permissions changed?", e));
return;
signalOnError(SdkClientException.create("Unable to check file status after read. Was the file deleted"
+ " or were its permissions changed?", e));
return false;
} catch (IOException e) {
signalOnError(new IOException("Unable to check file status after read.", e));
return;
}

synchronized (this) {
if (!done) {
done = true;
subscriber.onComplete();
}
signalOnError(SdkClientException.create("Unable to check file status after read.", e));
return false;
}
return true;
}

private void signalOnError(Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -53,6 +54,8 @@ public final class FileAsyncRequestBodySplitHelper {

private AtomicInteger numAsyncRequestBodiesInFlight = new AtomicInteger(0);
private AtomicInteger chunkIndex = new AtomicInteger(0);
private final FileTime modifiedTimeAtStart;
private final long sizeAtStart;

public FileAsyncRequestBodySplitHelper(FileAsyncRequestBody asyncRequestBody,
AsyncRequestBodySplitConfiguration splitConfiguration) {
Expand All @@ -70,6 +73,8 @@ public FileAsyncRequestBodySplitHelper(FileAsyncRequestBody asyncRequestBody,
splitConfiguration.bufferSizeInBytes();
this.bufferPerAsyncRequestBody = Math.min(asyncRequestBody.chunkSizeInBytes(),
NumericUtils.saturatedCast(totalBufferSize));
this.modifiedTimeAtStart = asyncRequestBody.modifiedTimeAtStart();
this.sizeAtStart = asyncRequestBody.sizeAtStart();
}

public SdkPublisher<AsyncRequestBody> split() {
Expand Down Expand Up @@ -134,6 +139,8 @@ private AsyncRequestBody newFileAsyncRequestBody(SimplePublisher<AsyncRequestBod
.position(position)
.numBytesToRead(numBytesToReadForThisChunk)
.chunkSizeInBytes(bufferPerAsyncRequestBody)
.modifiedTimeAtStart(modifiedTimeAtStart)
.sizeAtStart(sizeAtStart)
.build();
return new FileAsyncRequestBodyWrapper(fileAsyncRequestBody, simplePublisher);
}
Expand Down
Loading
Loading