diff --git a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java index 628798572..6e5ba90cb 100644 --- a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java +++ b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java @@ -11,22 +11,26 @@ import javax.crypto.Cipher; import java.nio.ByteBuffer; import java.security.GeneralSecurityException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; public class CipherSubscriber implements Subscriber { private final AtomicLong contentRead = new AtomicLong(0); private final Subscriber wrappedSubscriber; - private Cipher cipher; + private final Cipher cipher; private final Long contentLength; - private boolean isLastPart; + private final boolean isLastPart; + private final int tagLength; + private final AtomicBoolean finalBytesCalled = new AtomicBoolean(false); private byte[] outputBuffer; CipherSubscriber(Subscriber wrappedSubscriber, Long contentLength, CryptographicMaterials materials, byte[] iv, boolean isLastPart) { this.wrappedSubscriber = wrappedSubscriber; this.contentLength = contentLength; - cipher = materials.getCipher(iv); + this.cipher = materials.getCipher(iv); this.isLastPart = isLastPart; + this.tagLength = materials.algorithmSuite().cipherTagLengthBytes(); } CipherSubscriber(Subscriber wrappedSubscriber, Long contentLength, CryptographicMaterials materials, byte[] iv) { @@ -46,20 +50,48 @@ public void onNext(ByteBuffer byteBuffer) { if (amountToReadFromByteBuffer > 0) { byte[] buf = BinaryUtils.copyBytesFrom(byteBuffer, amountToReadFromByteBuffer); outputBuffer = cipher.update(buf, 0, amountToReadFromByteBuffer); + if (outputBuffer == null || outputBuffer.length == 0) { // The underlying data is too short to fill in the block cipher. // Note that while the JCE Javadoc specifies that the outputBuffer is null in this case, // in practice SunJCE and ACCP return an empty buffer instead, hence checks for // null OR length == 0. - if (contentRead.get() == contentLength) { + if (contentRead.get() + tagLength >= contentLength) { // All content has been read, so complete to get the final bytes - this.onComplete(); + finalBytes(); + return; } // Otherwise, wait for more bytes. To avoid blocking, // send an empty buffer to the wrapped subscriber. wrappedSubscriber.onNext(ByteBuffer.allocate(0)); } else { - wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); + /* + Check if stream has read all expected content. + Once all content has been read, call `finalBytes`. + + This determines that all content has been read by checking if + the amount of data read so far plus the tag length is at least the content length. + Once this is true, downstream will never call `request` again + (beyond the current request that is being responded to in this onNext invocation.) + As a result, this class can only call `wrappedSubscriber.onNext` one more time. + (Reactive streams require that downstream sends a `request(n)` + to indicate it is ready for more data, and upstream responds to that request by calling `onNext`. + The `n` in request is the maximum number of `onNext` calls that downstream + will allow upstream to make, and seems to always be 1 for the AsyncBodySubscriber.) + Since this class can only call `wrappedSubscriber.onNext` once, + it must send all remaining data in the next onNext call, + including the result of cipher.doFinal(), if applicable. + Calling `wrappedSubscriber.onNext` more than once for `request(1)` + violates the Reactive Streams specification and can cause exceptions downstream. + */ + if (contentRead.get() + tagLength >= contentLength) { + // All content has been read; complete the stream. + finalBytes(); + } else { + // Needs to read more data, so send the data downstream, + // expecting that downstream will continue to request more data. + wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); + } } } else { // Do nothing @@ -91,21 +123,63 @@ public void onError(Throwable t) { @Override public void onComplete() { + // In rare cases, e.g. when the last part of a low-level MPU has 0 length, + // onComplete will be called before onNext is called once. + if (contentRead.get() + tagLength <= contentLength) { + finalBytes(); + } + wrappedSubscriber.onComplete(); + } + + /** + * Finalize encryption, including calculating the auth tag for AES-GCM. + * As such this method MUST only be called once, which is enforced using + * `finalBytesCalled`. + */ + private void finalBytes() { + if (!finalBytesCalled.compareAndSet(false, true)) { + // already called, don't repeat + return; + } + + // If this isn't the last part, skip doFinal and just send outputBuffer downstream. + // doFinal requires that all parts have been processed to compute the tag, + // so the tag will only be computed when the last part is processed. if (!isLastPart) { - // If this isn't the last part, skip doFinal, we aren't done - wrappedSubscriber.onComplete(); + wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); return; } + + // If this is the last part, compute doFinal and include its result in the value sent downstream. + // The result of doFinal MUST be included with the bytes that were in outputBuffer in the final onNext call. + byte[] finalBytes; try { - outputBuffer = cipher.doFinal(); - // Send the final bytes to the wrapped subscriber - wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); + finalBytes = cipher.doFinal(); } catch (final GeneralSecurityException exception) { + // Even if doFinal fails, downstream still expects to receive the bytes that were in outputBuffer + wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); // Forward error, else the wrapped subscriber waits indefinitely wrappedSubscriber.onError(exception); throw new S3EncryptionClientSecurityException(exception.getMessage(), exception); } - wrappedSubscriber.onComplete(); + + // Combine the bytes from outputBuffer and finalBytes into one onNext call. + // Downstream has requested one item in its request method, so this class can only call onNext once. + // This single onNext call must contain both the bytes from outputBuffer and the tag. + byte[] combinedBytes; + if (outputBuffer != null && outputBuffer.length > 0 && finalBytes != null && finalBytes.length > 0) { + combinedBytes = new byte[outputBuffer.length + finalBytes.length]; + System.arraycopy(outputBuffer, 0, combinedBytes, 0, outputBuffer.length); + System.arraycopy(finalBytes, 0, combinedBytes, outputBuffer.length, finalBytes.length); + } else if (outputBuffer != null && outputBuffer.length > 0) { + combinedBytes = outputBuffer; + } else if (finalBytes != null && finalBytes.length > 0) { + combinedBytes = finalBytes; + } else { + combinedBytes = new byte[0]; + } + + wrappedSubscriber.onNext(ByteBuffer.wrap(combinedBytes)); } } \ No newline at end of file diff --git a/src/test/java/software/amazon/encryption/s3/S3AsyncEncryptionClientTest.java b/src/test/java/software/amazon/encryption/s3/S3AsyncEncryptionClientTest.java index ec0e90707..be4521411 100644 --- a/src/test/java/software/amazon/encryption/s3/S3AsyncEncryptionClientTest.java +++ b/src/test/java/software/amazon/encryption/s3/S3AsyncEncryptionClientTest.java @@ -729,6 +729,8 @@ public void copyObjectTransparentlyAsync() { * Test which artificially limits the size of buffers using {@link TinyBufferAsyncRequestBody}. * This tests edge cases where network conditions result in buffers with length shorter than * the cipher's block size. + * Note that TinyAsyncRequestBody is not fully spec-compliant, and will cause IllegalStateExceptions + * to be logged when debug logging is enabled. * @throws IOException */ @Test diff --git a/src/test/java/software/amazon/encryption/s3/S3EncryptionClientRangedGetCompatibilityTest.java b/src/test/java/software/amazon/encryption/s3/S3EncryptionClientRangedGetCompatibilityTest.java index 305dda8ee..a6809e55c 100644 --- a/src/test/java/software/amazon/encryption/s3/S3EncryptionClientRangedGetCompatibilityTest.java +++ b/src/test/java/software/amazon/encryption/s3/S3EncryptionClientRangedGetCompatibilityTest.java @@ -264,6 +264,7 @@ public void AsyncAesCbcV1toV3RangedGet(Object keyMaterial) { assertEquals("klmnopqrst0", output); // Valid start index within input and end index out of range, returns object from start index to End of Stream + // This causes a spurious NPE to be logged when debug logging is enabled. objectResponse = v3Client.getObject(builder -> builder .bucket(BUCKET) .range("bytes=190-300") @@ -288,6 +289,7 @@ public void AsyncAesCbcV1toV3RangedGet(Object keyMaterial) { assertEquals(input, output); // Invalid range starting index and ending index greater than object length but within Cipher Block size, returns empty object + // This causes a spurious NPE to be logged when debug logging is enabled. objectResponse = v3Client.getObject(builder -> builder .bucket(BUCKET) .range("bytes=216-217") diff --git a/src/test/java/software/amazon/encryption/s3/utils/TinyBufferAsyncRequestBody.java b/src/test/java/software/amazon/encryption/s3/utils/TinyBufferAsyncRequestBody.java index 50c4ba6e0..14b3f7c99 100644 --- a/src/test/java/software/amazon/encryption/s3/utils/TinyBufferAsyncRequestBody.java +++ b/src/test/java/software/amazon/encryption/s3/utils/TinyBufferAsyncRequestBody.java @@ -10,6 +10,9 @@ * AsyncRequestBody which wraps another AsyncRequestBody with a {@link TinyBufferSubscriber}. * This is useful for testing poor network conditions where buffers may not be larger than * the cipher's block size. + * DO NOT USE THIS IN PRODUCTION. In addition to degraded performance, + * it will cause IllegalStateExceptions in the base Subscriber as it does not comply + * with the Reactive Streaming spec. */ public class TinyBufferAsyncRequestBody implements AsyncRequestBody {