From caeac1dfa96d920971a114a84efe7130b8cf1aba Mon Sep 17 00:00:00 2001 From: Lucas McDonald Date: Fri, 25 Apr 2025 09:56:58 -0700 Subject: [PATCH 01/30] m --- .../s3/internal/CipherSubscriber.java | 60 ++++++++++--------- 1 file changed, 31 insertions(+), 29 deletions(-) diff --git a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java index 628798572..587b9283a 100644 --- a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java +++ b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java @@ -47,23 +47,39 @@ public void onNext(ByteBuffer byteBuffer) { byte[] buf = BinaryUtils.copyBytesFrom(byteBuffer, amountToReadFromByteBuffer); outputBuffer = cipher.update(buf, 0, amountToReadFromByteBuffer); if (outputBuffer == null || outputBuffer.length == 0) { - // The underlying data is too short to fill in the block cipher. - // Note that while the JCE Javadoc specifies that the outputBuffer is null in this case, - // in practice SunJCE and ACCP return an empty buffer instead, hence checks for - // null OR length == 0. - if (contentRead.get() == contentLength) { - // All content has been read, so complete to get the final bytes - this.onComplete(); - } - // Otherwise, wait for more bytes. To avoid blocking, - // send an empty buffer to the wrapped subscriber. - wrappedSubscriber.onNext(ByteBuffer.allocate(0)); + // No bytes provided from upstream; just return. + // No need to emit empty bytes to downstream; this can be misinterpreted. + return; } else { - wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); + boolean atEnd = isLastPart && contentRead.get() + amountToReadFromByteBuffer >= contentLength; + + if (atEnd) { + // If all content has been read, send the final bytes in this onNext call. + // The final bytes must be sent with the final onNext call, not during the onComplete call. + byte[] finalBytes; + try { + finalBytes = cipher.doFinal(); + } catch (final GeneralSecurityException exception) { + wrappedSubscriber.onError(exception); + throw new S3EncryptionClientSecurityException(exception.getMessage(), exception); + } + + // Combine outputBuffer and finalBytes if both exist + byte[] combinedBuffer; + if (outputBuffer != null && outputBuffer.length > 0) { + combinedBuffer = new byte[outputBuffer.length + finalBytes.length]; + System.arraycopy(outputBuffer, 0, combinedBuffer, 0, outputBuffer.length); + System.arraycopy(finalBytes, 0, combinedBuffer, outputBuffer.length, finalBytes.length); + } else { + combinedBuffer = finalBytes; + } + wrappedSubscriber.onNext(ByteBuffer.wrap(combinedBuffer)); + return; + } else { + // Not at end; send content so far + wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); + } } - } else { - // Do nothing - wrappedSubscriber.onNext(byteBuffer); } } @@ -91,20 +107,6 @@ public void onError(Throwable t) { @Override public void onComplete() { - if (!isLastPart) { - // If this isn't the last part, skip doFinal, we aren't done - wrappedSubscriber.onComplete(); - return; - } - try { - outputBuffer = cipher.doFinal(); - // Send the final bytes to the wrapped subscriber - wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); - } catch (final GeneralSecurityException exception) { - // Forward error, else the wrapped subscriber waits indefinitely - wrappedSubscriber.onError(exception); - throw new S3EncryptionClientSecurityException(exception.getMessage(), exception); - } wrappedSubscriber.onComplete(); } From e05b5232b2d95a9d4fb76615b480a27a59acdc6e Mon Sep 17 00:00:00 2001 From: Lucas McDonald Date: Fri, 25 Apr 2025 10:03:36 -0700 Subject: [PATCH 02/30] m --- .../amazon/encryption/s3/internal/CipherSubscriber.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java index 587b9283a..26571c30a 100644 --- a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java +++ b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java @@ -47,9 +47,8 @@ public void onNext(ByteBuffer byteBuffer) { byte[] buf = BinaryUtils.copyBytesFrom(byteBuffer, amountToReadFromByteBuffer); outputBuffer = cipher.update(buf, 0, amountToReadFromByteBuffer); if (outputBuffer == null || outputBuffer.length == 0) { - // No bytes provided from upstream; just return. - // No need to emit empty bytes to downstream; this can be misinterpreted. - return; + // No bytes provided from upstream; to avoid blocking, send an empty buffer to the wrapped subscriber. + wrappedSubscriber.onNext(ByteBuffer.allocate(0)); } else { boolean atEnd = isLastPart && contentRead.get() + amountToReadFromByteBuffer >= contentLength; @@ -80,6 +79,9 @@ public void onNext(ByteBuffer byteBuffer) { wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); } } + } else { + // Do nothing + wrappedSubscriber.onNext(byteBuffer); } } From e11731ab94ad76bc1439e6a6d965c41cd3e84a3f Mon Sep 17 00:00:00 2001 From: Lucas McDonald Date: Fri, 25 Apr 2025 10:05:08 -0700 Subject: [PATCH 03/30] m --- .../software/amazon/encryption/s3/internal/CipherSubscriber.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java index 26571c30a..3d78332f5 100644 --- a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java +++ b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java @@ -73,7 +73,6 @@ public void onNext(ByteBuffer byteBuffer) { combinedBuffer = finalBytes; } wrappedSubscriber.onNext(ByteBuffer.wrap(combinedBuffer)); - return; } else { // Not at end; send content so far wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); From b2fd26327e43de9b119643a7763b3e36bfd76ee7 Mon Sep 17 00:00:00 2001 From: Lucas McDonald Date: Fri, 25 Apr 2025 12:03:31 -0700 Subject: [PATCH 04/30] m --- .../s3/internal/CipherSubscriber.java | 105 ++++++++++++------ 1 file changed, 73 insertions(+), 32 deletions(-) diff --git a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java index 3d78332f5..1047b120b 100644 --- a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java +++ b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java @@ -11,6 +11,7 @@ import javax.crypto.Cipher; import java.nio.ByteBuffer; import java.security.GeneralSecurityException; +import java.util.Arrays; import java.util.concurrent.atomic.AtomicLong; public class CipherSubscriber implements Subscriber { @@ -23,6 +24,7 @@ public class CipherSubscriber implements Subscriber { private byte[] outputBuffer; CipherSubscriber(Subscriber wrappedSubscriber, Long contentLength, CryptographicMaterials materials, byte[] iv, boolean isLastPart) { + System.out.println("[CipherSubscriber] Constructor called with contentLength: " + contentLength + ", isLastPart: " + isLastPart); this.wrappedSubscriber = wrappedSubscriber; this.contentLength = contentLength; cipher = materials.getCipher(iv); @@ -36,78 +38,117 @@ public class CipherSubscriber implements Subscriber { @Override public void onSubscribe(Subscription s) { - wrappedSubscriber.onSubscribe(s); + System.out.println("[CipherSubscriber] onSubscribe called with subscription: " + s); + + wrappedSubscriber.onSubscribe(new Subscription() { + @Override + public void request(long n) { + System.out.println("[CipherSubscriber] Request called for " + n + " items"); + s.request(n); + } + + @Override + public void cancel() { + System.out.println("[CipherSubscriber] Cancel called"); + s.cancel(); + } + }); } @Override public void onNext(ByteBuffer byteBuffer) { + System.out.println("[CipherSubscriber] onNext called with buffer size: " + byteBuffer.remaining()); int amountToReadFromByteBuffer = getAmountToReadFromByteBuffer(byteBuffer); + System.out.println("[CipherSubscriber] amountToReadFromByteBuffer: " + amountToReadFromByteBuffer); if (amountToReadFromByteBuffer > 0) { + System.out.println("[CipherSubscriber] Processing chunk of size: " + amountToReadFromByteBuffer); byte[] buf = BinaryUtils.copyBytesFrom(byteBuffer, amountToReadFromByteBuffer); + System.out.println("[CipherSubscriber] Copied " + buf.length + " bytes from input buffer"); + outputBuffer = cipher.update(buf, 0, amountToReadFromByteBuffer); - if (outputBuffer == null || outputBuffer.length == 0) { - // No bytes provided from upstream; to avoid blocking, send an empty buffer to the wrapped subscriber. - wrappedSubscriber.onNext(ByteBuffer.allocate(0)); - } else { - boolean atEnd = isLastPart && contentRead.get() + amountToReadFromByteBuffer >= contentLength; - - if (atEnd) { - // If all content has been read, send the final bytes in this onNext call. - // The final bytes must be sent with the final onNext call, not during the onComplete call. - byte[] finalBytes; - try { - finalBytes = cipher.doFinal(); - } catch (final GeneralSecurityException exception) { - wrappedSubscriber.onError(exception); - throw new S3EncryptionClientSecurityException(exception.getMessage(), exception); - } - - // Combine outputBuffer and finalBytes if both exist - byte[] combinedBuffer; - if (outputBuffer != null && outputBuffer.length > 0) { - combinedBuffer = new byte[outputBuffer.length + finalBytes.length]; - System.arraycopy(outputBuffer, 0, combinedBuffer, 0, outputBuffer.length); - System.arraycopy(finalBytes, 0, combinedBuffer, outputBuffer.length, finalBytes.length); - } else { - combinedBuffer = finalBytes; - } - wrappedSubscriber.onNext(ByteBuffer.wrap(combinedBuffer)); + System.out.println("[CipherSubscriber] Cipher update produced output buffer of length: " + (outputBuffer != null ? outputBuffer.length : 0)); + + boolean atEnd = isLastPart && contentRead.get() >= contentLength - 16; + System.out.println("[CipherSubscriber] atEnd: " + atEnd + " (isLastPart: " + isLastPart + ", contentRead: " + contentRead.get() + ", contentLength: " + contentLength + ")"); + + if (atEnd) { + System.out.println("[CipherSubscriber] Processing final bytes"); + // The final bytes must be sent with the final onNext call, not during the onComplete call. + byte[] finalBytes; + try { + finalBytes = cipher.doFinal(); + System.out.println("[CipherSubscriber] Cipher doFinal produced " + finalBytes.length + " bytes"); + } catch (final GeneralSecurityException exception) { + System.out.println("[CipherSubscriber] Error during doFinal: " + exception.getMessage()); + wrappedSubscriber.onError(exception); + throw new S3EncryptionClientSecurityException(exception.getMessage(), exception); + } + + // Combine outputBuffer and finalBytes if both exist + byte[] combinedBuffer; + if (outputBuffer != null && outputBuffer.length > 0) { + System.out.println("[CipherSubscriber] Combining outputBuffer (" + outputBuffer.length + " bytes) with finalBytes (" + finalBytes.length + " bytes)"); + combinedBuffer = new byte[outputBuffer.length + finalBytes.length]; + System.arraycopy(outputBuffer, 0, combinedBuffer, 0, outputBuffer.length); + System.arraycopy(finalBytes, 0, combinedBuffer, outputBuffer.length, finalBytes.length); + System.out.println("[CipherSubscriber] Combined buffer total length: " + combinedBuffer.length); } else { - // Not at end; send content so far - wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); + System.out.println("[CipherSubscriber] Using only finalBytes (" + finalBytes.length + " bytes)"); + combinedBuffer = finalBytes; } + + System.out.println("[CipherSubscriber] Sending combined buffer to wrapped subscriber"); + wrappedSubscriber.onNext(ByteBuffer.wrap(combinedBuffer)); + return; + } else if (outputBuffer == null || outputBuffer.length == 0) { + System.out.println("[CipherSubscriber] No bytes from cipher update, sending empty buffer"); + // No bytes provided from upstream; to avoid blocking, send an empty buffer to the wrapped subscriber. + return; + } else { + System.out.println("[CipherSubscriber] Sending " + outputBuffer.length + " bytes to wrapped subscriber"); + // Not at end; send content so far + wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); } } else { - // Do nothing - wrappedSubscriber.onNext(byteBuffer); + System.out.println("[CipherSubscriber] No bytes to read from input buffer"); + return; } } private int getAmountToReadFromByteBuffer(ByteBuffer byteBuffer) { + System.out.println("[CipherSubscriber] getAmountToReadFromByteBuffer called with buffer remaining: " + byteBuffer.remaining()); + System.out.println("[CipherSubscriber] Current contentRead: " + contentRead.get() + ", contentLength: " + contentLength); + // If content length is null, we should include everything in the cipher because the stream is essentially // unbounded. if (contentLength == null) { + System.out.println("[CipherSubscriber] No content length specified, reading entire buffer: " + byteBuffer.remaining()); return byteBuffer.remaining(); } long amountReadSoFar = contentRead.getAndAdd(byteBuffer.remaining()); long amountRemaining = Math.max(0, contentLength - amountReadSoFar); + System.out.println("[CipherSubscriber] amountReadSoFar: " + amountReadSoFar + ", amountRemaining: " + amountRemaining); if (amountRemaining > byteBuffer.remaining()) { + System.out.println("[CipherSubscriber] More remaining than buffer size, reading entire buffer: " + byteBuffer.remaining()); return byteBuffer.remaining(); } else { + System.out.println("[CipherSubscriber] Reading partial buffer: " + amountRemaining); return Math.toIntExact(amountRemaining); } } @Override public void onError(Throwable t) { + System.out.println("[CipherSubscriber] onError called: " + t.getMessage()); wrappedSubscriber.onError(t); } @Override public void onComplete() { + System.out.println("[CipherSubscriber] onComplete called"); wrappedSubscriber.onComplete(); } From 62028e17ee8afabf5f9ae2245b988251ea9fd664 Mon Sep 17 00:00:00 2001 From: Lucas McDonald Date: Fri, 25 Apr 2025 13:05:53 -0700 Subject: [PATCH 05/30] m --- .../s3/internal/CipherSubscriber.java | 111 ++++++++++-------- 1 file changed, 65 insertions(+), 46 deletions(-) diff --git a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java index 1047b120b..70f29c34e 100644 --- a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java +++ b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java @@ -11,7 +11,6 @@ import javax.crypto.Cipher; import java.nio.ByteBuffer; import java.security.GeneralSecurityException; -import java.util.Arrays; import java.util.concurrent.atomic.AtomicLong; public class CipherSubscriber implements Subscriber { @@ -20,15 +19,16 @@ public class CipherSubscriber implements Subscriber { private Cipher cipher; private final Long contentLength; private boolean isLastPart; + private boolean finalized; private byte[] outputBuffer; CipherSubscriber(Subscriber wrappedSubscriber, Long contentLength, CryptographicMaterials materials, byte[] iv, boolean isLastPart) { - System.out.println("[CipherSubscriber] Constructor called with contentLength: " + contentLength + ", isLastPart: " + isLastPart); this.wrappedSubscriber = wrappedSubscriber; this.contentLength = contentLength; cipher = materials.getCipher(iv); this.isLastPart = isLastPart; + this.finalized = false; } CipherSubscriber(Subscriber wrappedSubscriber, Long contentLength, CryptographicMaterials materials, byte[] iv) { @@ -39,17 +39,16 @@ public class CipherSubscriber implements Subscriber { @Override public void onSubscribe(Subscription s) { System.out.println("[CipherSubscriber] onSubscribe called with subscription: " + s); - wrappedSubscriber.onSubscribe(new Subscription() { @Override public void request(long n) { - System.out.println("[CipherSubscriber] Request called for " + n + " items"); + System.out.println("[CipherSubscriber] New request received for " + n + " items"); s.request(n); } @Override public void cancel() { - System.out.println("[CipherSubscriber] Cancel called"); + System.out.println("[CipherSubscriber] Subscription cancelled"); s.cancel(); } }); @@ -58,6 +57,7 @@ public void cancel() { @Override public void onNext(ByteBuffer byteBuffer) { System.out.println("[CipherSubscriber] onNext called with buffer size: " + byteBuffer.remaining()); + System.out.println("[CipherSubscriber] isLastPart: " + isLastPart); int amountToReadFromByteBuffer = getAmountToReadFromByteBuffer(byteBuffer); System.out.println("[CipherSubscriber] amountToReadFromByteBuffer: " + amountToReadFromByteBuffer); @@ -69,50 +69,53 @@ public void onNext(ByteBuffer byteBuffer) { outputBuffer = cipher.update(buf, 0, amountToReadFromByteBuffer); System.out.println("[CipherSubscriber] Cipher update produced output buffer of length: " + (outputBuffer != null ? outputBuffer.length : 0)); - boolean atEnd = isLastPart && contentRead.get() >= contentLength - 16; - System.out.println("[CipherSubscriber] atEnd: " + atEnd + " (isLastPart: " + isLastPart + ", contentRead: " + contentRead.get() + ", contentLength: " + contentLength + ")"); - - if (atEnd) { - System.out.println("[CipherSubscriber] Processing final bytes"); - // The final bytes must be sent with the final onNext call, not during the onComplete call. - byte[] finalBytes; - try { - finalBytes = cipher.doFinal(); - System.out.println("[CipherSubscriber] Cipher doFinal produced " + finalBytes.length + " bytes"); - } catch (final GeneralSecurityException exception) { - System.out.println("[CipherSubscriber] Error during doFinal: " + exception.getMessage()); - wrappedSubscriber.onError(exception); - throw new S3EncryptionClientSecurityException(exception.getMessage(), exception); - } - - // Combine outputBuffer and finalBytes if both exist - byte[] combinedBuffer; - if (outputBuffer != null && outputBuffer.length > 0) { - System.out.println("[CipherSubscriber] Combining outputBuffer (" + outputBuffer.length + " bytes) with finalBytes (" + finalBytes.length + " bytes)"); - combinedBuffer = new byte[outputBuffer.length + finalBytes.length]; - System.arraycopy(outputBuffer, 0, combinedBuffer, 0, outputBuffer.length); - System.arraycopy(finalBytes, 0, combinedBuffer, outputBuffer.length, finalBytes.length); - System.out.println("[CipherSubscriber] Combined buffer total length: " + combinedBuffer.length); - } else { - System.out.println("[CipherSubscriber] Using only finalBytes (" + finalBytes.length + " bytes)"); - combinedBuffer = finalBytes; - } - - System.out.println("[CipherSubscriber] Sending combined buffer to wrapped subscriber"); - wrappedSubscriber.onNext(ByteBuffer.wrap(combinedBuffer)); - return; - } else if (outputBuffer == null || outputBuffer.length == 0) { - System.out.println("[CipherSubscriber] No bytes from cipher update, sending empty buffer"); + if (outputBuffer == null || outputBuffer.length == 0) { + System.out.println("[CipherSubscriber] No output from cipher update"); // No bytes provided from upstream; to avoid blocking, send an empty buffer to the wrapped subscriber. - return; + wrappedSubscriber.onNext(ByteBuffer.allocate(0)); } else { - System.out.println("[CipherSubscriber] Sending " + outputBuffer.length + " bytes to wrapped subscriber"); - // Not at end; send content so far - wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); + boolean atEnd = isLastPart && contentRead.get() + amountToReadFromByteBuffer >= contentLength; + System.out.println("[CipherSubscriber] atEnd: " + atEnd + " (contentRead: " + contentRead.get() + ", contentLength: " + contentLength + ")"); + + if (atEnd) { + System.out.println("[CipherSubscriber] Processing final bytes"); + // If all content has been read, send the final bytes in this onNext call. + // The final bytes must be sent with the final onNext call, not during the onComplete call. + byte[] finalBytes; + try { + finalBytes = cipher.doFinal(); + finalized = true; + System.out.println("[CipherSubscriber] Cipher doFinal produced " + finalBytes.length + " bytes"); + } catch (final GeneralSecurityException exception) { + System.out.println("[CipherSubscriber] Error during doFinal: " + exception.getMessage()); + wrappedSubscriber.onError(exception); + throw new S3EncryptionClientSecurityException(exception.getMessage(), exception); + } + + // Combine outputBuffer and finalBytes if both exist + byte[] combinedBuffer; + if (outputBuffer != null && outputBuffer.length > 0) { + System.out.println("[CipherSubscriber] Combining outputBuffer (" + outputBuffer.length + " bytes) with finalBytes (" + finalBytes.length + " bytes)"); + combinedBuffer = new byte[outputBuffer.length + finalBytes.length]; + System.arraycopy(outputBuffer, 0, combinedBuffer, 0, outputBuffer.length); + System.arraycopy(finalBytes, 0, combinedBuffer, outputBuffer.length, finalBytes.length); + System.out.println("[CipherSubscriber] Combined buffer total length: " + combinedBuffer.length); + } else { + System.out.println("[CipherSubscriber] Using only finalBytes (" + finalBytes.length + " bytes)"); + combinedBuffer = finalBytes; + } + System.out.println("[CipherSubscriber] Sending combined buffer to wrapped subscriber"); + wrappedSubscriber.onNext(ByteBuffer.wrap(combinedBuffer)); + } else { + System.out.println("[CipherSubscriber] Sending " + outputBuffer.length + " bytes to wrapped subscriber"); + // Not at end; send content so far + wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); + } } } else { - System.out.println("[CipherSubscriber] No bytes to read from input buffer"); - return; + System.out.println("[CipherSubscriber] No bytes to read from input buffer, forwarding original buffer"); + // Do nothing + wrappedSubscriber.onNext(byteBuffer); } } @@ -148,7 +151,23 @@ public void onError(Throwable t) { @Override public void onComplete() { - System.out.println("[CipherSubscriber] onComplete called"); + if (!isLastPart) { + // If this isn't the last part, skip doFinal, we aren't done + wrappedSubscriber.onComplete(); + return; + } if (finalized) { + wrappedSubscriber.onComplete(); + return; + } + try { + outputBuffer = cipher.doFinal(); + // Send the final bytes to the wrapped subscriber + wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); + } catch (final GeneralSecurityException exception) { + // Forward error, else the wrapped subscriber waits indefinitely + wrappedSubscriber.onError(exception); + throw new S3EncryptionClientSecurityException(exception.getMessage(), exception); + } wrappedSubscriber.onComplete(); } From 478e8708d86917f600fa8a5904318c60b5e8f60f Mon Sep 17 00:00:00 2001 From: Lucas McDonald Date: Fri, 25 Apr 2025 13:14:26 -0700 Subject: [PATCH 06/30] m --- .../s3/internal/CipherSubscriber.java | 29 ------------------- 1 file changed, 29 deletions(-) diff --git a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java index 70f29c34e..65a8b635c 100644 --- a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java +++ b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java @@ -38,17 +38,14 @@ public class CipherSubscriber implements Subscriber { @Override public void onSubscribe(Subscription s) { - System.out.println("[CipherSubscriber] onSubscribe called with subscription: " + s); wrappedSubscriber.onSubscribe(new Subscription() { @Override public void request(long n) { - System.out.println("[CipherSubscriber] New request received for " + n + " items"); s.request(n); } @Override public void cancel() { - System.out.println("[CipherSubscriber] Subscription cancelled"); s.cancel(); } }); @@ -56,38 +53,26 @@ public void cancel() { @Override public void onNext(ByteBuffer byteBuffer) { - System.out.println("[CipherSubscriber] onNext called with buffer size: " + byteBuffer.remaining()); - System.out.println("[CipherSubscriber] isLastPart: " + isLastPart); int amountToReadFromByteBuffer = getAmountToReadFromByteBuffer(byteBuffer); - System.out.println("[CipherSubscriber] amountToReadFromByteBuffer: " + amountToReadFromByteBuffer); if (amountToReadFromByteBuffer > 0) { - System.out.println("[CipherSubscriber] Processing chunk of size: " + amountToReadFromByteBuffer); byte[] buf = BinaryUtils.copyBytesFrom(byteBuffer, amountToReadFromByteBuffer); - System.out.println("[CipherSubscriber] Copied " + buf.length + " bytes from input buffer"); - outputBuffer = cipher.update(buf, 0, amountToReadFromByteBuffer); - System.out.println("[CipherSubscriber] Cipher update produced output buffer of length: " + (outputBuffer != null ? outputBuffer.length : 0)); if (outputBuffer == null || outputBuffer.length == 0) { - System.out.println("[CipherSubscriber] No output from cipher update"); // No bytes provided from upstream; to avoid blocking, send an empty buffer to the wrapped subscriber. wrappedSubscriber.onNext(ByteBuffer.allocate(0)); } else { boolean atEnd = isLastPart && contentRead.get() + amountToReadFromByteBuffer >= contentLength; - System.out.println("[CipherSubscriber] atEnd: " + atEnd + " (contentRead: " + contentRead.get() + ", contentLength: " + contentLength + ")"); if (atEnd) { - System.out.println("[CipherSubscriber] Processing final bytes"); // If all content has been read, send the final bytes in this onNext call. // The final bytes must be sent with the final onNext call, not during the onComplete call. byte[] finalBytes; try { finalBytes = cipher.doFinal(); finalized = true; - System.out.println("[CipherSubscriber] Cipher doFinal produced " + finalBytes.length + " bytes"); } catch (final GeneralSecurityException exception) { - System.out.println("[CipherSubscriber] Error during doFinal: " + exception.getMessage()); wrappedSubscriber.onError(exception); throw new S3EncryptionClientSecurityException(exception.getMessage(), exception); } @@ -95,57 +80,43 @@ public void onNext(ByteBuffer byteBuffer) { // Combine outputBuffer and finalBytes if both exist byte[] combinedBuffer; if (outputBuffer != null && outputBuffer.length > 0) { - System.out.println("[CipherSubscriber] Combining outputBuffer (" + outputBuffer.length + " bytes) with finalBytes (" + finalBytes.length + " bytes)"); combinedBuffer = new byte[outputBuffer.length + finalBytes.length]; System.arraycopy(outputBuffer, 0, combinedBuffer, 0, outputBuffer.length); System.arraycopy(finalBytes, 0, combinedBuffer, outputBuffer.length, finalBytes.length); - System.out.println("[CipherSubscriber] Combined buffer total length: " + combinedBuffer.length); } else { - System.out.println("[CipherSubscriber] Using only finalBytes (" + finalBytes.length + " bytes)"); combinedBuffer = finalBytes; } - System.out.println("[CipherSubscriber] Sending combined buffer to wrapped subscriber"); wrappedSubscriber.onNext(ByteBuffer.wrap(combinedBuffer)); } else { - System.out.println("[CipherSubscriber] Sending " + outputBuffer.length + " bytes to wrapped subscriber"); // Not at end; send content so far wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); } } } else { - System.out.println("[CipherSubscriber] No bytes to read from input buffer, forwarding original buffer"); // Do nothing wrappedSubscriber.onNext(byteBuffer); } } private int getAmountToReadFromByteBuffer(ByteBuffer byteBuffer) { - System.out.println("[CipherSubscriber] getAmountToReadFromByteBuffer called with buffer remaining: " + byteBuffer.remaining()); - System.out.println("[CipherSubscriber] Current contentRead: " + contentRead.get() + ", contentLength: " + contentLength); - // If content length is null, we should include everything in the cipher because the stream is essentially // unbounded. if (contentLength == null) { - System.out.println("[CipherSubscriber] No content length specified, reading entire buffer: " + byteBuffer.remaining()); return byteBuffer.remaining(); } long amountReadSoFar = contentRead.getAndAdd(byteBuffer.remaining()); long amountRemaining = Math.max(0, contentLength - amountReadSoFar); - System.out.println("[CipherSubscriber] amountReadSoFar: " + amountReadSoFar + ", amountRemaining: " + amountRemaining); if (amountRemaining > byteBuffer.remaining()) { - System.out.println("[CipherSubscriber] More remaining than buffer size, reading entire buffer: " + byteBuffer.remaining()); return byteBuffer.remaining(); } else { - System.out.println("[CipherSubscriber] Reading partial buffer: " + amountRemaining); return Math.toIntExact(amountRemaining); } } @Override public void onError(Throwable t) { - System.out.println("[CipherSubscriber] onError called: " + t.getMessage()); wrappedSubscriber.onError(t); } From 9ad2d16930ffcacc1dbfcffa1e4c234ae1308a2c Mon Sep 17 00:00:00 2001 From: Lucas McDonald Date: Fri, 25 Apr 2025 13:20:16 -0700 Subject: [PATCH 07/30] m --- .../amazon/encryption/s3/internal/CipherSubscriber.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java index 65a8b635c..04b0a05ca 100644 --- a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java +++ b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java @@ -63,7 +63,7 @@ public void onNext(ByteBuffer byteBuffer) { // No bytes provided from upstream; to avoid blocking, send an empty buffer to the wrapped subscriber. wrappedSubscriber.onNext(ByteBuffer.allocate(0)); } else { - boolean atEnd = isLastPart && contentRead.get() + amountToReadFromByteBuffer >= contentLength; + boolean atEnd = isLastPart && contentRead.get() >= contentLength; if (atEnd) { // If all content has been read, send the final bytes in this onNext call. From f029007c4208cc2c5e1e82748d37fb657a7db58e Mon Sep 17 00:00:00 2001 From: Lucas McDonald Date: Fri, 25 Apr 2025 13:35:20 -0700 Subject: [PATCH 08/30] m --- .../s3/internal/CipherSubscriber.java | 41 ++++++++++++++++++- 1 file changed, 39 insertions(+), 2 deletions(-) diff --git a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java index 04b0a05ca..ba9ff2303 100644 --- a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java +++ b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java @@ -53,26 +53,41 @@ public void cancel() { @Override public void onNext(ByteBuffer byteBuffer) { + System.out.println("[CipherSubscriber] onNext called with buffer size: " + byteBuffer.remaining()); + System.out.println("[CipherSubscriber] isLastPart: " + isLastPart); int amountToReadFromByteBuffer = getAmountToReadFromByteBuffer(byteBuffer); + System.out.println("[CipherSubscriber] amountToReadFromByteBuffer: " + amountToReadFromByteBuffer); if (amountToReadFromByteBuffer > 0) { + System.out.println("[CipherSubscriber] Processing chunk of size: " + amountToReadFromByteBuffer); byte[] buf = BinaryUtils.copyBytesFrom(byteBuffer, amountToReadFromByteBuffer); + System.out.println("[CipherSubscriber] Copied " + buf.length + " bytes from input buffer"); + outputBuffer = cipher.update(buf, 0, amountToReadFromByteBuffer); + System.out.println("[CipherSubscriber] Cipher update produced output buffer of length: " + (outputBuffer != null ? outputBuffer.length : 0)); if (outputBuffer == null || outputBuffer.length == 0) { + System.out.println("[CipherSubscriber] No output from cipher update"); // No bytes provided from upstream; to avoid blocking, send an empty buffer to the wrapped subscriber. wrappedSubscriber.onNext(ByteBuffer.allocate(0)); } else { - boolean atEnd = isLastPart && contentRead.get() >= contentLength; + boolean atEnd = isLastPart && contentRead.get() >= contentLength - 16; + System.out.println("[CipherSubscriber] atEnd check - isLastPart: " + isLastPart + + ", contentRead: " + contentRead.get() + + ", contentLength: " + contentLength + + ", atEnd: " + atEnd); if (atEnd) { + System.out.println("[CipherSubscriber] Processing final bytes"); // If all content has been read, send the final bytes in this onNext call. // The final bytes must be sent with the final onNext call, not during the onComplete call. byte[] finalBytes; try { finalBytes = cipher.doFinal(); finalized = true; + System.out.println("[CipherSubscriber] Cipher doFinal produced " + finalBytes.length + " bytes"); } catch (final GeneralSecurityException exception) { + System.out.println("[CipherSubscriber] Error during doFinal: " + exception.getMessage()); wrappedSubscriber.onError(exception); throw new S3EncryptionClientSecurityException(exception.getMessage(), exception); } @@ -80,65 +95,87 @@ public void onNext(ByteBuffer byteBuffer) { // Combine outputBuffer and finalBytes if both exist byte[] combinedBuffer; if (outputBuffer != null && outputBuffer.length > 0) { + System.out.println("[CipherSubscriber] Combining outputBuffer (" + outputBuffer.length + " bytes) with finalBytes (" + finalBytes.length + " bytes)"); combinedBuffer = new byte[outputBuffer.length + finalBytes.length]; System.arraycopy(outputBuffer, 0, combinedBuffer, 0, outputBuffer.length); System.arraycopy(finalBytes, 0, combinedBuffer, outputBuffer.length, finalBytes.length); + System.out.println("[CipherSubscriber] Combined buffer total length: " + combinedBuffer.length); } else { + System.out.println("[CipherSubscriber] Using only finalBytes (" + finalBytes.length + " bytes)"); combinedBuffer = finalBytes; } + System.out.println("[CipherSubscriber] Sending combined buffer to wrapped subscriber"); wrappedSubscriber.onNext(ByteBuffer.wrap(combinedBuffer)); } else { + System.out.println("[CipherSubscriber] Sending " + outputBuffer.length + " bytes to wrapped subscriber"); // Not at end; send content so far wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); } } } else { + System.out.println("[CipherSubscriber] No bytes to read from input buffer, forwarding original buffer"); // Do nothing wrappedSubscriber.onNext(byteBuffer); } } private int getAmountToReadFromByteBuffer(ByteBuffer byteBuffer) { + System.out.println("[CipherSubscriber] getAmountToReadFromByteBuffer called with buffer remaining: " + byteBuffer.remaining()); + System.out.println("[CipherSubscriber] Current contentRead: " + contentRead.get() + ", contentLength: " + contentLength); + // If content length is null, we should include everything in the cipher because the stream is essentially // unbounded. if (contentLength == null) { + System.out.println("[CipherSubscriber] No content length specified, reading entire buffer: " + byteBuffer.remaining()); return byteBuffer.remaining(); } long amountReadSoFar = contentRead.getAndAdd(byteBuffer.remaining()); long amountRemaining = Math.max(0, contentLength - amountReadSoFar); + System.out.println("[CipherSubscriber] amountReadSoFar: " + amountReadSoFar + ", amountRemaining: " + amountRemaining); if (amountRemaining > byteBuffer.remaining()) { + System.out.println("[CipherSubscriber] More remaining than buffer size, reading entire buffer: " + byteBuffer.remaining()); return byteBuffer.remaining(); } else { + System.out.println("[CipherSubscriber] Reading partial buffer: " + amountRemaining); return Math.toIntExact(amountRemaining); } } @Override public void onError(Throwable t) { + System.out.println("[CipherSubscriber] onError called: " + t.getMessage()); wrappedSubscriber.onError(t); } @Override public void onComplete() { + System.out.println("[CipherSubscriber] onComplete called, isLastPart: " + isLastPart); if (!isLastPart) { + System.out.println("[CipherSubscriber] Not last part, skipping doFinal"); // If this isn't the last part, skip doFinal, we aren't done wrappedSubscriber.onComplete(); return; - } if (finalized) { + } + if (finalized) { + System.out.println("[CipherSubscriber] Finalized"); wrappedSubscriber.onComplete(); return; } try { + System.out.println("[CipherSubscriber] Calling cipher.doFinal()"); outputBuffer = cipher.doFinal(); + System.out.println("[CipherSubscriber] doFinal produced " + (outputBuffer != null ? outputBuffer.length : 0) + " bytes"); // Send the final bytes to the wrapped subscriber wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); } catch (final GeneralSecurityException exception) { + System.out.println("[CipherSubscriber] Error during doFinal: " + exception.getMessage()); // Forward error, else the wrapped subscriber waits indefinitely wrappedSubscriber.onError(exception); throw new S3EncryptionClientSecurityException(exception.getMessage(), exception); } + System.out.println("[CipherSubscriber] Completing wrapped subscriber"); wrappedSubscriber.onComplete(); } From 11ad1d7a29e1a667e44c8a7b1518c8afb01411c1 Mon Sep 17 00:00:00 2001 From: Lucas McDonald Date: Mon, 28 Apr 2025 11:23:30 -0700 Subject: [PATCH 09/30] m --- .../s3/internal/CipherSubscriber.java | 117 +++++------------- 1 file changed, 31 insertions(+), 86 deletions(-) diff --git a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java index ba9ff2303..92aefeef0 100644 --- a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java +++ b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java @@ -19,7 +19,7 @@ public class CipherSubscriber implements Subscriber { private Cipher cipher; private final Long contentLength; private boolean isLastPart; - private boolean finalized; + private boolean onCompleteCalled = false; private byte[] outputBuffer; @@ -28,7 +28,6 @@ public class CipherSubscriber implements Subscriber { this.contentLength = contentLength; cipher = materials.getCipher(iv); this.isLastPart = isLastPart; - this.finalized = false; } CipherSubscriber(Subscriber wrappedSubscriber, Long contentLength, CryptographicMaterials materials, byte[] iv) { @@ -38,144 +37,90 @@ public class CipherSubscriber implements Subscriber { @Override public void onSubscribe(Subscription s) { - wrappedSubscriber.onSubscribe(new Subscription() { - @Override - public void request(long n) { - s.request(n); - } - - @Override - public void cancel() { - s.cancel(); - } - }); + wrappedSubscriber.onSubscribe(s); } @Override public void onNext(ByteBuffer byteBuffer) { - System.out.println("[CipherSubscriber] onNext called with buffer size: " + byteBuffer.remaining()); - System.out.println("[CipherSubscriber] isLastPart: " + isLastPart); int amountToReadFromByteBuffer = getAmountToReadFromByteBuffer(byteBuffer); - System.out.println("[CipherSubscriber] amountToReadFromByteBuffer: " + amountToReadFromByteBuffer); if (amountToReadFromByteBuffer > 0) { - System.out.println("[CipherSubscriber] Processing chunk of size: " + amountToReadFromByteBuffer); byte[] buf = BinaryUtils.copyBytesFrom(byteBuffer, amountToReadFromByteBuffer); - System.out.println("[CipherSubscriber] Copied " + buf.length + " bytes from input buffer"); outputBuffer = cipher.update(buf, 0, amountToReadFromByteBuffer); - System.out.println("[CipherSubscriber] Cipher update produced output buffer of length: " + (outputBuffer != null ? outputBuffer.length : 0)); if (outputBuffer == null || outputBuffer.length == 0) { - System.out.println("[CipherSubscriber] No output from cipher update"); - // No bytes provided from upstream; to avoid blocking, send an empty buffer to the wrapped subscriber. + if (contentRead.get() == contentLength) { + this.onComplete(); + } wrappedSubscriber.onNext(ByteBuffer.allocate(0)); } else { - boolean atEnd = isLastPart && contentRead.get() >= contentLength - 16; - System.out.println("[CipherSubscriber] atEnd check - isLastPart: " + isLastPart + - ", contentRead: " + contentRead.get() + - ", contentLength: " + contentLength + - ", atEnd: " + atEnd); - - if (atEnd) { - System.out.println("[CipherSubscriber] Processing final bytes"); - // If all content has been read, send the final bytes in this onNext call. - // The final bytes must be sent with the final onNext call, not during the onComplete call. - byte[] finalBytes; - try { - finalBytes = cipher.doFinal(); - finalized = true; - System.out.println("[CipherSubscriber] Cipher doFinal produced " + finalBytes.length + " bytes"); - } catch (final GeneralSecurityException exception) { - System.out.println("[CipherSubscriber] Error during doFinal: " + exception.getMessage()); - wrappedSubscriber.onError(exception); - throw new S3EncryptionClientSecurityException(exception.getMessage(), exception); - } - - // Combine outputBuffer and finalBytes if both exist - byte[] combinedBuffer; - if (outputBuffer != null && outputBuffer.length > 0) { - System.out.println("[CipherSubscriber] Combining outputBuffer (" + outputBuffer.length + " bytes) with finalBytes (" + finalBytes.length + " bytes)"); - combinedBuffer = new byte[outputBuffer.length + finalBytes.length]; - System.arraycopy(outputBuffer, 0, combinedBuffer, 0, outputBuffer.length); - System.arraycopy(finalBytes, 0, combinedBuffer, outputBuffer.length, finalBytes.length); - System.out.println("[CipherSubscriber] Combined buffer total length: " + combinedBuffer.length); - } else { - System.out.println("[CipherSubscriber] Using only finalBytes (" + finalBytes.length + " bytes)"); - combinedBuffer = finalBytes; - } - System.out.println("[CipherSubscriber] Sending combined buffer to wrapped subscriber"); - wrappedSubscriber.onNext(ByteBuffer.wrap(combinedBuffer)); - } else { - System.out.println("[CipherSubscriber] Sending " + outputBuffer.length + " bytes to wrapped subscriber"); - // Not at end; send content so far + Long amount = isLastPart ? contentLength - 31 : contentLength - 15; + if (contentRead.get() < amount) { wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); + } else { + this.onComplete(); } } } else { - System.out.println("[CipherSubscriber] No bytes to read from input buffer, forwarding original buffer"); - // Do nothing wrappedSubscriber.onNext(byteBuffer); } } private int getAmountToReadFromByteBuffer(ByteBuffer byteBuffer) { - System.out.println("[CipherSubscriber] getAmountToReadFromByteBuffer called with buffer remaining: " + byteBuffer.remaining()); - System.out.println("[CipherSubscriber] Current contentRead: " + contentRead.get() + ", contentLength: " + contentLength); - - // If content length is null, we should include everything in the cipher because the stream is essentially - // unbounded. if (contentLength == null) { - System.out.println("[CipherSubscriber] No content length specified, reading entire buffer: " + byteBuffer.remaining()); return byteBuffer.remaining(); } long amountReadSoFar = contentRead.getAndAdd(byteBuffer.remaining()); long amountRemaining = Math.max(0, contentLength - amountReadSoFar); - System.out.println("[CipherSubscriber] amountReadSoFar: " + amountReadSoFar + ", amountRemaining: " + amountRemaining); if (amountRemaining > byteBuffer.remaining()) { - System.out.println("[CipherSubscriber] More remaining than buffer size, reading entire buffer: " + byteBuffer.remaining()); return byteBuffer.remaining(); } else { - System.out.println("[CipherSubscriber] Reading partial buffer: " + amountRemaining); return Math.toIntExact(amountRemaining); } } @Override public void onError(Throwable t) { - System.out.println("[CipherSubscriber] onError called: " + t.getMessage()); wrappedSubscriber.onError(t); } @Override public void onComplete() { - System.out.println("[CipherSubscriber] onComplete called, isLastPart: " + isLastPart); - if (!isLastPart) { - System.out.println("[CipherSubscriber] Not last part, skipping doFinal"); - // If this isn't the last part, skip doFinal, we aren't done - wrappedSubscriber.onComplete(); + if (onCompleteCalled) { return; } - if (finalized) { - System.out.println("[CipherSubscriber] Finalized"); + onCompleteCalled = true; + if (!isLastPart) { + wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); wrappedSubscriber.onComplete(); return; } try { - System.out.println("[CipherSubscriber] Calling cipher.doFinal()"); - outputBuffer = cipher.doFinal(); - System.out.println("[CipherSubscriber] doFinal produced " + (outputBuffer != null ? outputBuffer.length : 0) + " bytes"); - // Send the final bytes to the wrapped subscriber - wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); + byte[] finalBytes = cipher.doFinal(); + + byte[] combinedBytes; + if (outputBuffer != null && outputBuffer.length > 0 && finalBytes != null && finalBytes.length > 0) { + combinedBytes = new byte[outputBuffer.length + finalBytes.length]; + System.arraycopy(outputBuffer, 0, combinedBytes, 0, outputBuffer.length); + System.arraycopy(finalBytes, 0, combinedBytes, outputBuffer.length, finalBytes.length); + } else if (outputBuffer != null && outputBuffer.length > 0) { + combinedBytes = outputBuffer; + } else if (finalBytes != null && finalBytes.length > 0) { + combinedBytes = finalBytes; + } else { + combinedBytes = new byte[0]; + } + + if (combinedBytes.length > 0) { + wrappedSubscriber.onNext(ByteBuffer.wrap(combinedBytes)); + } } catch (final GeneralSecurityException exception) { - System.out.println("[CipherSubscriber] Error during doFinal: " + exception.getMessage()); - // Forward error, else the wrapped subscriber waits indefinitely wrappedSubscriber.onError(exception); throw new S3EncryptionClientSecurityException(exception.getMessage(), exception); } - System.out.println("[CipherSubscriber] Completing wrapped subscriber"); wrappedSubscriber.onComplete(); } From c23a4077a300b4a9f7a2e11acd9de86a331f2279 Mon Sep 17 00:00:00 2001 From: Lucas McDonald Date: Mon, 28 Apr 2025 11:38:49 -0700 Subject: [PATCH 10/30] m --- .../s3/internal/CipherSubscriber.java | 51 ++++++++++++++++++- 1 file changed, 50 insertions(+), 1 deletion(-) diff --git a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java index 92aefeef0..7007b7371 100644 --- a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java +++ b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java @@ -20,6 +20,7 @@ public class CipherSubscriber implements Subscriber { private final Long contentLength; private boolean isLastPart; private boolean onCompleteCalled = false; + private final AtomicLong outstandingRequests = new AtomicLong(0); private byte[] outputBuffer; @@ -37,53 +38,90 @@ public class CipherSubscriber implements Subscriber { @Override public void onSubscribe(Subscription s) { - wrappedSubscriber.onSubscribe(s); + System.out.println("[CipherSubscriber] onSubscribe called with subscription: " + s); + wrappedSubscriber.onSubscribe(new Subscription() { + @Override + public void request(long n) { + System.out.println("[CipherSubscriber] New request received for " + n + " items"); + outstandingRequests.addAndGet(n); + System.out.println("[CipherSubscriber] Current outstanding requests: " + outstandingRequests.get()); + s.request(n); + } + + @Override + public void cancel() { + System.out.println("[CipherSubscriber] Subscription cancelled"); + s.cancel(); + } + }); } @Override public void onNext(ByteBuffer byteBuffer) { + System.out.println("[CipherSubscriber] onNext called with buffer size: " + byteBuffer.remaining()); + System.out.println("[CipherSubscriber] isLastPart: " + isLastPart); int amountToReadFromByteBuffer = getAmountToReadFromByteBuffer(byteBuffer); + System.out.println("[CipherSubscriber] amountToReadFromByteBuffer: " + amountToReadFromByteBuffer); if (amountToReadFromByteBuffer > 0) { + System.out.println("[CipherSubscriber] Processing chunk of size: " + amountToReadFromByteBuffer); byte[] buf = BinaryUtils.copyBytesFrom(byteBuffer, amountToReadFromByteBuffer); + System.out.println("[CipherSubscriber] Copied " + buf.length + " bytes from input buffer"); outputBuffer = cipher.update(buf, 0, amountToReadFromByteBuffer); + System.out.println("[CipherSubscriber] Cipher update produced output buffer of length: " + (outputBuffer != null ? outputBuffer.length : 0)); if (outputBuffer == null || outputBuffer.length == 0) { + System.out.println("[CipherSubscriber] No output from cipher update"); + System.out.println("[CipherSubscriber] contentRead: " + contentRead.get() + ", contentLength: " + contentLength); if (contentRead.get() == contentLength) { + System.out.println("[CipherSubscriber] All content read (contentRead: " + contentRead.get() + ", contentLength: " + contentLength + "), calling onComplete"); this.onComplete(); } + System.out.println("[CipherSubscriber] Sending empty buffer to wrapped subscriber"); wrappedSubscriber.onNext(ByteBuffer.allocate(0)); } else { + System.out.println("[CipherSubscriber] Sending " + outputBuffer.length + " bytes to wrapped subscriber"); + System.out.println("[CipherSubscriber] contentRead: " + contentRead.get() + ", contentLength: " + contentLength); Long amount = isLastPart ? contentLength - 31 : contentLength - 15; if (contentRead.get() < amount) { wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); } else { + System.out.println("[CipherSubscriber] All content read (contentRead: " + contentRead.get() + ", contentLength: " + contentLength + "), calling onComplete"); this.onComplete(); } } } else { + System.out.println("[CipherSubscriber] No bytes to read from input buffer, forwarding original buffer"); wrappedSubscriber.onNext(byteBuffer); } } private int getAmountToReadFromByteBuffer(ByteBuffer byteBuffer) { + System.out.println("[CipherSubscriber] getAmountToReadFromByteBuffer called with buffer remaining: " + byteBuffer.remaining()); + System.out.println("[CipherSubscriber] Current contentRead: " + contentRead.get() + ", contentLength: " + contentLength); + if (contentLength == null) { + System.out.println("[CipherSubscriber] No content length specified, reading entire buffer: " + byteBuffer.remaining()); return byteBuffer.remaining(); } long amountReadSoFar = contentRead.getAndAdd(byteBuffer.remaining()); long amountRemaining = Math.max(0, contentLength - amountReadSoFar); + System.out.println("[CipherSubscriber] amountReadSoFar: " + amountReadSoFar + ", amountRemaining: " + amountRemaining); if (amountRemaining > byteBuffer.remaining()) { + System.out.println("[CipherSubscriber] More remaining than buffer size, reading entire buffer: " + byteBuffer.remaining()); return byteBuffer.remaining(); } else { + System.out.println("[CipherSubscriber] Reading partial buffer: " + amountRemaining); return Math.toIntExact(amountRemaining); } } @Override public void onError(Throwable t) { + System.out.println("[CipherSubscriber] onError called: " + t.getMessage()); wrappedSubscriber.onError(t); } @@ -93,34 +131,45 @@ public void onComplete() { return; } onCompleteCalled = true; + System.out.println("[CipherSubscriber] onComplete called, isLastPart: " + isLastPart); if (!isLastPart) { + System.out.println("[CipherSubscriber] Not last part, skipping doFinal"); wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); wrappedSubscriber.onComplete(); return; } try { + System.out.println("[CipherSubscriber] Calling cipher.doFinal()"); byte[] finalBytes = cipher.doFinal(); + System.out.println("[CipherSubscriber] doFinal produced " + (finalBytes != null ? finalBytes.length : 0) + " bytes"); byte[] combinedBytes; if (outputBuffer != null && outputBuffer.length > 0 && finalBytes != null && finalBytes.length > 0) { + System.out.println("[CipherSubscriber] Combining outputBuffer (" + outputBuffer.length + " bytes) with finalBytes (" + finalBytes.length + " bytes)"); combinedBytes = new byte[outputBuffer.length + finalBytes.length]; System.arraycopy(outputBuffer, 0, combinedBytes, 0, outputBuffer.length); System.arraycopy(finalBytes, 0, combinedBytes, outputBuffer.length, finalBytes.length); } else if (outputBuffer != null && outputBuffer.length > 0) { + System.out.println("[CipherSubscriber] Using only outputBuffer (" + outputBuffer.length + " bytes)"); combinedBytes = outputBuffer; } else if (finalBytes != null && finalBytes.length > 0) { + System.out.println("[CipherSubscriber] Using only finalBytes (" + finalBytes.length + " bytes)"); combinedBytes = finalBytes; } else { + System.out.println("[CipherSubscriber] No bytes to send"); combinedBytes = new byte[0]; } if (combinedBytes.length > 0) { + System.out.println("[CipherSubscriber] Sending combined bytes to wrapped subscriber of length " + combinedBytes.length); wrappedSubscriber.onNext(ByteBuffer.wrap(combinedBytes)); } } catch (final GeneralSecurityException exception) { + System.out.println("[CipherSubscriber] Error during doFinal: " + exception.getMessage()); wrappedSubscriber.onError(exception); throw new S3EncryptionClientSecurityException(exception.getMessage(), exception); } + System.out.println("[CipherSubscriber] Completing wrapped subscriber"); wrappedSubscriber.onComplete(); } From dde89892394607754bdca4209c866c82836dfa59 Mon Sep 17 00:00:00 2001 From: Lucas McDonald Date: Mon, 28 Apr 2025 12:39:33 -0700 Subject: [PATCH 11/30] m --- .../s3/internal/CipherSubscriber.java | 60 ++++++++++--------- 1 file changed, 33 insertions(+), 27 deletions(-) diff --git a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java index 7007b7371..08510842a 100644 --- a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java +++ b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java @@ -58,6 +58,11 @@ public void cancel() { @Override public void onNext(ByteBuffer byteBuffer) { + System.out.println("[CipherSubscriber] ByteBuffer content: " + byteBuffer.toString()); +// while (byteBuffer.hasRemaining()) { +// byte b = byteBuffer.get(); +// System.out.printf("%02x ", b); // Print as hex +// } System.out.println("[CipherSubscriber] onNext called with buffer size: " + byteBuffer.remaining()); System.out.println("[CipherSubscriber] isLastPart: " + isLastPart); int amountToReadFromByteBuffer = getAmountToReadFromByteBuffer(byteBuffer); @@ -67,7 +72,7 @@ public void onNext(ByteBuffer byteBuffer) { System.out.println("[CipherSubscriber] Processing chunk of size: " + amountToReadFromByteBuffer); byte[] buf = BinaryUtils.copyBytesFrom(byteBuffer, amountToReadFromByteBuffer); System.out.println("[CipherSubscriber] Copied " + buf.length + " bytes from input buffer"); - + outputBuffer = cipher.update(buf, 0, amountToReadFromByteBuffer); System.out.println("[CipherSubscriber] Cipher update produced output buffer of length: " + (outputBuffer != null ? outputBuffer.length : 0)); @@ -87,8 +92,8 @@ public void onNext(ByteBuffer byteBuffer) { if (contentRead.get() < amount) { wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); } else { - System.out.println("[CipherSubscriber] All content read (contentRead: " + contentRead.get() + ", contentLength: " + contentLength + "), calling onComplete"); - this.onComplete(); + System.out.println("[CipherSubscriber] All content read (contentRead: " + contentRead.get() + ", contentLength: " + contentLength + "), waiting for onComplete"); +// this.onComplete(); } } } else { @@ -138,37 +143,38 @@ public void onComplete() { wrappedSubscriber.onComplete(); return; } + byte[] finalBytes; try { System.out.println("[CipherSubscriber] Calling cipher.doFinal()"); - byte[] finalBytes = cipher.doFinal(); - System.out.println("[CipherSubscriber] doFinal produced " + (finalBytes != null ? finalBytes.length : 0) + " bytes"); - - byte[] combinedBytes; - if (outputBuffer != null && outputBuffer.length > 0 && finalBytes != null && finalBytes.length > 0) { - System.out.println("[CipherSubscriber] Combining outputBuffer (" + outputBuffer.length + " bytes) with finalBytes (" + finalBytes.length + " bytes)"); - combinedBytes = new byte[outputBuffer.length + finalBytes.length]; - System.arraycopy(outputBuffer, 0, combinedBytes, 0, outputBuffer.length); - System.arraycopy(finalBytes, 0, combinedBytes, outputBuffer.length, finalBytes.length); - } else if (outputBuffer != null && outputBuffer.length > 0) { - System.out.println("[CipherSubscriber] Using only outputBuffer (" + outputBuffer.length + " bytes)"); - combinedBytes = outputBuffer; - } else if (finalBytes != null && finalBytes.length > 0) { - System.out.println("[CipherSubscriber] Using only finalBytes (" + finalBytes.length + " bytes)"); - combinedBytes = finalBytes; - } else { - System.out.println("[CipherSubscriber] No bytes to send"); - combinedBytes = new byte[0]; - } - - if (combinedBytes.length > 0) { - System.out.println("[CipherSubscriber] Sending combined bytes to wrapped subscriber of length " + combinedBytes.length); - wrappedSubscriber.onNext(ByteBuffer.wrap(combinedBytes)); - } + finalBytes = cipher.doFinal(); } catch (final GeneralSecurityException exception) { System.out.println("[CipherSubscriber] Error during doFinal: " + exception.getMessage()); wrappedSubscriber.onError(exception); throw new S3EncryptionClientSecurityException(exception.getMessage(), exception); } + System.out.println("[CipherSubscriber] doFinal produced " + (finalBytes != null ? finalBytes.length : 0) + " bytes"); + + byte[] combinedBytes; + if (outputBuffer != null && outputBuffer.length > 0 && finalBytes != null && finalBytes.length > 0) { + System.out.println("[CipherSubscriber] Combining outputBuffer (" + outputBuffer.length + " bytes) with finalBytes (" + finalBytes.length + " bytes)"); + combinedBytes = new byte[outputBuffer.length + finalBytes.length]; + System.arraycopy(outputBuffer, 0, combinedBytes, 0, outputBuffer.length); + System.arraycopy(finalBytes, 0, combinedBytes, outputBuffer.length, finalBytes.length); + } else if (outputBuffer != null && outputBuffer.length > 0) { + System.out.println("[CipherSubscriber] Using only outputBuffer (" + outputBuffer.length + " bytes)"); + combinedBytes = outputBuffer; + } else if (finalBytes != null && finalBytes.length > 0) { + System.out.println("[CipherSubscriber] Using only finalBytes (" + finalBytes.length + " bytes)"); + combinedBytes = finalBytes; + } else { + System.out.println("[CipherSubscriber] No bytes to send"); + combinedBytes = new byte[0]; + } + + if (combinedBytes.length > 0) { + System.out.println("[CipherSubscriber] Sending combined bytes to wrapped subscriber of length " + combinedBytes.length); + wrappedSubscriber.onNext(ByteBuffer.wrap(combinedBytes)); + } System.out.println("[CipherSubscriber] Completing wrapped subscriber"); wrappedSubscriber.onComplete(); } From e7cd2e2a00a08bf5499c3ddacc21f0791c6a8fa3 Mon Sep 17 00:00:00 2001 From: Lucas McDonald Date: Mon, 28 Apr 2025 15:04:04 -0700 Subject: [PATCH 12/30] m --- .../s3/internal/CipherSubscriber.java | 112 ++++++------------ 1 file changed, 35 insertions(+), 77 deletions(-) diff --git a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java index 08510842a..aea0d06bf 100644 --- a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java +++ b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java @@ -20,7 +20,6 @@ public class CipherSubscriber implements Subscriber { private final Long contentLength; private boolean isLastPart; private boolean onCompleteCalled = false; - private final AtomicLong outstandingRequests = new AtomicLong(0); private byte[] outputBuffer; @@ -38,95 +37,62 @@ public class CipherSubscriber implements Subscriber { @Override public void onSubscribe(Subscription s) { - System.out.println("[CipherSubscriber] onSubscribe called with subscription: " + s); - wrappedSubscriber.onSubscribe(new Subscription() { - @Override - public void request(long n) { - System.out.println("[CipherSubscriber] New request received for " + n + " items"); - outstandingRequests.addAndGet(n); - System.out.println("[CipherSubscriber] Current outstanding requests: " + outstandingRequests.get()); - s.request(n); - } - - @Override - public void cancel() { - System.out.println("[CipherSubscriber] Subscription cancelled"); - s.cancel(); - } - }); + wrappedSubscriber.onSubscribe(s); } @Override public void onNext(ByteBuffer byteBuffer) { - System.out.println("[CipherSubscriber] ByteBuffer content: " + byteBuffer.toString()); -// while (byteBuffer.hasRemaining()) { -// byte b = byteBuffer.get(); -// System.out.printf("%02x ", b); // Print as hex -// } - System.out.println("[CipherSubscriber] onNext called with buffer size: " + byteBuffer.remaining()); - System.out.println("[CipherSubscriber] isLastPart: " + isLastPart); int amountToReadFromByteBuffer = getAmountToReadFromByteBuffer(byteBuffer); - System.out.println("[CipherSubscriber] amountToReadFromByteBuffer: " + amountToReadFromByteBuffer); if (amountToReadFromByteBuffer > 0) { - System.out.println("[CipherSubscriber] Processing chunk of size: " + amountToReadFromByteBuffer); byte[] buf = BinaryUtils.copyBytesFrom(byteBuffer, amountToReadFromByteBuffer); - System.out.println("[CipherSubscriber] Copied " + buf.length + " bytes from input buffer"); - outputBuffer = cipher.update(buf, 0, amountToReadFromByteBuffer); - System.out.println("[CipherSubscriber] Cipher update produced output buffer of length: " + (outputBuffer != null ? outputBuffer.length : 0)); - if (outputBuffer == null || outputBuffer.length == 0) { - System.out.println("[CipherSubscriber] No output from cipher update"); - System.out.println("[CipherSubscriber] contentRead: " + contentRead.get() + ", contentLength: " + contentLength); + // The underlying data is too short to fill in the block cipher. + // Note that while the JCE Javadoc specifies that the outputBuffer is null in this case, + // in practice SunJCE and ACCP return an empty buffer instead, hence checks for + // null OR length == 0. if (contentRead.get() == contentLength) { - System.out.println("[CipherSubscriber] All content read (contentRead: " + contentRead.get() + ", contentLength: " + contentLength + "), calling onComplete"); + // All content has been read, so complete to get the final bytes this.onComplete(); } - System.out.println("[CipherSubscriber] Sending empty buffer to wrapped subscriber"); + // Otherwise, wait for more bytes. To avoid blocking, + // send an empty buffer to the wrapped subscriber. wrappedSubscriber.onNext(ByteBuffer.allocate(0)); } else { - System.out.println("[CipherSubscriber] Sending " + outputBuffer.length + " bytes to wrapped subscriber"); - System.out.println("[CipherSubscriber] contentRead: " + contentRead.get() + ", contentLength: " + contentLength); Long amount = isLastPart ? contentLength - 31 : contentLength - 15; if (contentRead.get() < amount) { wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); } else { + // Done, wait for upstream to signal onComplete System.out.println("[CipherSubscriber] All content read (contentRead: " + contentRead.get() + ", contentLength: " + contentLength + "), waiting for onComplete"); -// this.onComplete(); } } } else { - System.out.println("[CipherSubscriber] No bytes to read from input buffer, forwarding original buffer"); + // Do nothing wrappedSubscriber.onNext(byteBuffer); } } private int getAmountToReadFromByteBuffer(ByteBuffer byteBuffer) { - System.out.println("[CipherSubscriber] getAmountToReadFromByteBuffer called with buffer remaining: " + byteBuffer.remaining()); - System.out.println("[CipherSubscriber] Current contentRead: " + contentRead.get() + ", contentLength: " + contentLength); - + // If content length is null, we should include everything in the cipher because the stream is essentially + // unbounded. if (contentLength == null) { - System.out.println("[CipherSubscriber] No content length specified, reading entire buffer: " + byteBuffer.remaining()); return byteBuffer.remaining(); } long amountReadSoFar = contentRead.getAndAdd(byteBuffer.remaining()); long amountRemaining = Math.max(0, contentLength - amountReadSoFar); - System.out.println("[CipherSubscriber] amountReadSoFar: " + amountReadSoFar + ", amountRemaining: " + amountRemaining); if (amountRemaining > byteBuffer.remaining()) { - System.out.println("[CipherSubscriber] More remaining than buffer size, reading entire buffer: " + byteBuffer.remaining()); return byteBuffer.remaining(); } else { - System.out.println("[CipherSubscriber] Reading partial buffer: " + amountRemaining); return Math.toIntExact(amountRemaining); } } @Override public void onError(Throwable t) { - System.out.println("[CipherSubscriber] onError called: " + t.getMessage()); wrappedSubscriber.onError(t); } @@ -136,47 +102,39 @@ public void onComplete() { return; } onCompleteCalled = true; - System.out.println("[CipherSubscriber] onComplete called, isLastPart: " + isLastPart); if (!isLastPart) { - System.out.println("[CipherSubscriber] Not last part, skipping doFinal"); + // If this isn't the last part, skip doFinal, we aren't done wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); wrappedSubscriber.onComplete(); return; } - byte[] finalBytes; try { - System.out.println("[CipherSubscriber] Calling cipher.doFinal()"); - finalBytes = cipher.doFinal(); + byte[] finalBytes = cipher.doFinal(); + + byte[] combinedBytes; + if (outputBuffer != null && outputBuffer.length > 0 && finalBytes != null && finalBytes.length > 0) { + combinedBytes = new byte[outputBuffer.length + finalBytes.length]; + System.arraycopy(outputBuffer, 0, combinedBytes, 0, outputBuffer.length); + System.arraycopy(finalBytes, 0, combinedBytes, outputBuffer.length, finalBytes.length); + } else if (outputBuffer != null && outputBuffer.length > 0) { + combinedBytes = outputBuffer; + } else if (finalBytes != null && finalBytes.length > 0) { + combinedBytes = finalBytes; + } else { + combinedBytes = new byte[0]; + } + + if (combinedBytes.length > 0) { + wrappedSubscriber.onNext(ByteBuffer.wrap(combinedBytes)); + } + wrappedSubscriber.onComplete(); } catch (final GeneralSecurityException exception) { - System.out.println("[CipherSubscriber] Error during doFinal: " + exception.getMessage()); + // Forward error, else the wrapped subscriber waits indefinitely + wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); wrappedSubscriber.onError(exception); + wrappedSubscriber.onComplete(); throw new S3EncryptionClientSecurityException(exception.getMessage(), exception); } - System.out.println("[CipherSubscriber] doFinal produced " + (finalBytes != null ? finalBytes.length : 0) + " bytes"); - - byte[] combinedBytes; - if (outputBuffer != null && outputBuffer.length > 0 && finalBytes != null && finalBytes.length > 0) { - System.out.println("[CipherSubscriber] Combining outputBuffer (" + outputBuffer.length + " bytes) with finalBytes (" + finalBytes.length + " bytes)"); - combinedBytes = new byte[outputBuffer.length + finalBytes.length]; - System.arraycopy(outputBuffer, 0, combinedBytes, 0, outputBuffer.length); - System.arraycopy(finalBytes, 0, combinedBytes, outputBuffer.length, finalBytes.length); - } else if (outputBuffer != null && outputBuffer.length > 0) { - System.out.println("[CipherSubscriber] Using only outputBuffer (" + outputBuffer.length + " bytes)"); - combinedBytes = outputBuffer; - } else if (finalBytes != null && finalBytes.length > 0) { - System.out.println("[CipherSubscriber] Using only finalBytes (" + finalBytes.length + " bytes)"); - combinedBytes = finalBytes; - } else { - System.out.println("[CipherSubscriber] No bytes to send"); - combinedBytes = new byte[0]; - } - - if (combinedBytes.length > 0) { - System.out.println("[CipherSubscriber] Sending combined bytes to wrapped subscriber of length " + combinedBytes.length); - wrappedSubscriber.onNext(ByteBuffer.wrap(combinedBytes)); - } - System.out.println("[CipherSubscriber] Completing wrapped subscriber"); - wrappedSubscriber.onComplete(); } } \ No newline at end of file From daf14932aa4c15a2172e952312c88741721cdd44 Mon Sep 17 00:00:00 2001 From: Lucas McDonald Date: Mon, 28 Apr 2025 15:16:43 -0700 Subject: [PATCH 13/30] m --- .../software/amazon/encryption/s3/internal/CipherSubscriber.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java index aea0d06bf..45bcc12f5 100644 --- a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java +++ b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java @@ -66,6 +66,7 @@ public void onNext(ByteBuffer byteBuffer) { } else { // Done, wait for upstream to signal onComplete System.out.println("[CipherSubscriber] All content read (contentRead: " + contentRead.get() + ", contentLength: " + contentLength + "), waiting for onComplete"); + this.onComplete(); } } } else { From 9febcce7f42384c9762696b8d802bebc94fbee0b Mon Sep 17 00:00:00 2001 From: Lucas McDonald Date: Mon, 28 Apr 2025 15:44:37 -0700 Subject: [PATCH 14/30] m --- .../s3/internal/CipherSubscriber.java | 33 +++++++++++++++++-- 1 file changed, 30 insertions(+), 3 deletions(-) diff --git a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java index 45bcc12f5..166b8566a 100644 --- a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java +++ b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java @@ -42,11 +42,16 @@ public void onSubscribe(Subscription s) { @Override public void onNext(ByteBuffer byteBuffer) { + System.out.println("[CipherSubscriber] onNext called with buffer size: " + byteBuffer.remaining()); int amountToReadFromByteBuffer = getAmountToReadFromByteBuffer(byteBuffer); + System.out.println("[CipherSubscriber] Amount to read from buffer: " + amountToReadFromByteBuffer); if (amountToReadFromByteBuffer > 0) { byte[] buf = BinaryUtils.copyBytesFrom(byteBuffer, amountToReadFromByteBuffer); + System.out.println("[CipherSubscriber] Copied " + buf.length + " bytes from input buffer"); outputBuffer = cipher.update(buf, 0, amountToReadFromByteBuffer); + System.out.println("[CipherSubscriber] Cipher update produced " + (outputBuffer != null ? outputBuffer.length : 0) + " bytes"); + if (outputBuffer == null || outputBuffer.length == 0) { // The underlying data is too short to fill in the block cipher. // Note that while the JCE Javadoc specifies that the outputBuffer is null in this case, @@ -54,23 +59,29 @@ public void onNext(ByteBuffer byteBuffer) { // null OR length == 0. if (contentRead.get() == contentLength) { // All content has been read, so complete to get the final bytes + System.out.println("[CipherSubscriber] All content read, calling onComplete"); this.onComplete(); } // Otherwise, wait for more bytes. To avoid blocking, // send an empty buffer to the wrapped subscriber. + System.out.println("[CipherSubscriber] Sending empty buffer to wrapped subscriber"); wrappedSubscriber.onNext(ByteBuffer.allocate(0)); } else { - Long amount = isLastPart ? contentLength - 31 : contentLength - 15; + Long amount = isLastPart ? contentLength - (cipher.getBlockSize()) : contentLength - (cipher.getBlockSize()); + System.out.println("[CipherSubscriber] Amount: " + amount); + System.out.println("[CipherSubscriber] Content read: " + contentRead.get()); + System.out.println("content.get() - amount: " + (contentRead.get() - amount)); if (contentRead.get() < amount) { + System.out.println("[CipherSubscriber] Sending output buffer of size " + outputBuffer.length + " to wrapped subscriber"); wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); } else { - // Done, wait for upstream to signal onComplete - System.out.println("[CipherSubscriber] All content read (contentRead: " + contentRead.get() + ", contentLength: " + contentLength + "), waiting for onComplete"); + System.out.println("[CipherSubscriber] Content read threshold reached, calling onComplete"); this.onComplete(); } } } else { // Do nothing + System.out.println("[CipherSubscriber] No data to process, forwarding buffer directly"); wrappedSubscriber.onNext(byteBuffer); } } @@ -79,58 +90,74 @@ private int getAmountToReadFromByteBuffer(ByteBuffer byteBuffer) { // If content length is null, we should include everything in the cipher because the stream is essentially // unbounded. if (contentLength == null) { + System.out.println("[CipherSubscriber] Content length is null, reading entire buffer: " + byteBuffer.remaining()); return byteBuffer.remaining(); } long amountReadSoFar = contentRead.getAndAdd(byteBuffer.remaining()); long amountRemaining = Math.max(0, contentLength - amountReadSoFar); + System.out.println("[CipherSubscriber] Amount read so far: " + amountReadSoFar + ", remaining: " + amountRemaining); if (amountRemaining > byteBuffer.remaining()) { + System.out.println("[CipherSubscriber] Reading entire buffer: " + byteBuffer.remaining()); return byteBuffer.remaining(); } else { + System.out.println("[CipherSubscriber] Reading partial buffer: " + amountRemaining); return Math.toIntExact(amountRemaining); } } @Override public void onError(Throwable t) { + System.out.println("[CipherSubscriber] Error occurred: " + t.getMessage()); wrappedSubscriber.onError(t); } @Override public void onComplete() { + System.out.println("[CipherSubscriber] onComplete called"); if (onCompleteCalled) { + System.out.println("[CipherSubscriber] onComplete already called, returning"); return; } onCompleteCalled = true; if (!isLastPart) { // If this isn't the last part, skip doFinal, we aren't done + System.out.println("[CipherSubscriber] Not last part, skipping doFinal"); wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); wrappedSubscriber.onComplete(); return; } try { + System.out.println("[CipherSubscriber] Calling cipher.doFinal()"); byte[] finalBytes = cipher.doFinal(); + System.out.println("[CipherSubscriber] doFinal produced " + (finalBytes != null ? finalBytes.length : 0) + " bytes"); byte[] combinedBytes; if (outputBuffer != null && outputBuffer.length > 0 && finalBytes != null && finalBytes.length > 0) { + System.out.println("[CipherSubscriber] Combining outputBuffer (" + outputBuffer.length + " bytes) with finalBytes (" + finalBytes.length + " bytes)"); combinedBytes = new byte[outputBuffer.length + finalBytes.length]; System.arraycopy(outputBuffer, 0, combinedBytes, 0, outputBuffer.length); System.arraycopy(finalBytes, 0, combinedBytes, outputBuffer.length, finalBytes.length); } else if (outputBuffer != null && outputBuffer.length > 0) { + System.out.println("[CipherSubscriber] Using only outputBuffer (" + outputBuffer.length + " bytes)"); combinedBytes = outputBuffer; } else if (finalBytes != null && finalBytes.length > 0) { + System.out.println("[CipherSubscriber] Using only finalBytes (" + finalBytes.length + " bytes)"); combinedBytes = finalBytes; } else { + System.out.println("[CipherSubscriber] No bytes to send"); combinedBytes = new byte[0]; } if (combinedBytes.length > 0) { + System.out.println("[CipherSubscriber] Sending combined bytes to wrapped subscriber of length " + combinedBytes.length); wrappedSubscriber.onNext(ByteBuffer.wrap(combinedBytes)); } wrappedSubscriber.onComplete(); } catch (final GeneralSecurityException exception) { // Forward error, else the wrapped subscriber waits indefinitely + System.out.println("[CipherSubscriber] Security exception during doFinal: " + exception.getMessage()); wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); wrappedSubscriber.onError(exception); wrappedSubscriber.onComplete(); From faf99ee6bcb317fa211fda6581d2a42055f9251a Mon Sep 17 00:00:00 2001 From: Lucas McDonald Date: Mon, 28 Apr 2025 16:03:33 -0700 Subject: [PATCH 15/30] m --- .../s3/internal/CipherSubscriber.java | 86 ++++++++----------- 1 file changed, 37 insertions(+), 49 deletions(-) diff --git a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java index 166b8566a..e97851564 100644 --- a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java +++ b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java @@ -42,15 +42,11 @@ public void onSubscribe(Subscription s) { @Override public void onNext(ByteBuffer byteBuffer) { - System.out.println("[CipherSubscriber] onNext called with buffer size: " + byteBuffer.remaining()); int amountToReadFromByteBuffer = getAmountToReadFromByteBuffer(byteBuffer); - System.out.println("[CipherSubscriber] Amount to read from buffer: " + amountToReadFromByteBuffer); if (amountToReadFromByteBuffer > 0) { byte[] buf = BinaryUtils.copyBytesFrom(byteBuffer, amountToReadFromByteBuffer); - System.out.println("[CipherSubscriber] Copied " + buf.length + " bytes from input buffer"); outputBuffer = cipher.update(buf, 0, amountToReadFromByteBuffer); - System.out.println("[CipherSubscriber] Cipher update produced " + (outputBuffer != null ? outputBuffer.length : 0) + " bytes"); if (outputBuffer == null || outputBuffer.length == 0) { // The underlying data is too short to fill in the block cipher. @@ -59,29 +55,30 @@ public void onNext(ByteBuffer byteBuffer) { // null OR length == 0. if (contentRead.get() == contentLength) { // All content has been read, so complete to get the final bytes - System.out.println("[CipherSubscriber] All content read, calling onComplete"); this.onComplete(); } // Otherwise, wait for more bytes. To avoid blocking, // send an empty buffer to the wrapped subscriber. - System.out.println("[CipherSubscriber] Sending empty buffer to wrapped subscriber"); wrappedSubscriber.onNext(ByteBuffer.allocate(0)); } else { - Long amount = isLastPart ? contentLength - (cipher.getBlockSize()) : contentLength - (cipher.getBlockSize()); - System.out.println("[CipherSubscriber] Amount: " + amount); - System.out.println("[CipherSubscriber] Content read: " + contentRead.get()); - System.out.println("content.get() - amount: " + (contentRead.get() - amount)); + // cipher.update will only return a block of data if it has been provided a full block of data. + // If it has been provided a partial block of data, it will not return partial data. + // If the CipherSubscriber is done sending data, but the total amount of data is not a multiple of the block size, + // the amount of content returned by the cipher will be less than the contentLength by at most the block size. + // Calling `doFinal` will return the remaining bytes along with the tag. + Long amount = contentLength - cipher.getBlockSize(); if (contentRead.get() < amount) { - System.out.println("[CipherSubscriber] Sending output buffer of size " + outputBuffer.length + " to wrapped subscriber"); + // If the amount of data read so far is less than the amount of data that should have been read, + // send the data downstream, expecting that downstream will request more data. wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); } else { - System.out.println("[CipherSubscriber] Content read threshold reached, calling onComplete"); + // If the amount of data read so far is at least the amount of data that should have been read, + // complete the stream, as downstream will not request any more data. this.onComplete(); } } } else { // Do nothing - System.out.println("[CipherSubscriber] No data to process, forwarding buffer directly"); wrappedSubscriber.onNext(byteBuffer); } } @@ -90,79 +87,70 @@ private int getAmountToReadFromByteBuffer(ByteBuffer byteBuffer) { // If content length is null, we should include everything in the cipher because the stream is essentially // unbounded. if (contentLength == null) { - System.out.println("[CipherSubscriber] Content length is null, reading entire buffer: " + byteBuffer.remaining()); return byteBuffer.remaining(); } long amountReadSoFar = contentRead.getAndAdd(byteBuffer.remaining()); long amountRemaining = Math.max(0, contentLength - amountReadSoFar); - System.out.println("[CipherSubscriber] Amount read so far: " + amountReadSoFar + ", remaining: " + amountRemaining); if (amountRemaining > byteBuffer.remaining()) { - System.out.println("[CipherSubscriber] Reading entire buffer: " + byteBuffer.remaining()); return byteBuffer.remaining(); } else { - System.out.println("[CipherSubscriber] Reading partial buffer: " + amountRemaining); return Math.toIntExact(amountRemaining); } } @Override public void onError(Throwable t) { - System.out.println("[CipherSubscriber] Error occurred: " + t.getMessage()); wrappedSubscriber.onError(t); } @Override public void onComplete() { - System.out.println("[CipherSubscriber] onComplete called"); if (onCompleteCalled) { - System.out.println("[CipherSubscriber] onComplete already called, returning"); return; } onCompleteCalled = true; if (!isLastPart) { // If this isn't the last part, skip doFinal, we aren't done - System.out.println("[CipherSubscriber] Not last part, skipping doFinal"); wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); wrappedSubscriber.onComplete(); return; } - try { - System.out.println("[CipherSubscriber] Calling cipher.doFinal()"); - byte[] finalBytes = cipher.doFinal(); - System.out.println("[CipherSubscriber] doFinal produced " + (finalBytes != null ? finalBytes.length : 0) + " bytes"); - - byte[] combinedBytes; - if (outputBuffer != null && outputBuffer.length > 0 && finalBytes != null && finalBytes.length > 0) { - System.out.println("[CipherSubscriber] Combining outputBuffer (" + outputBuffer.length + " bytes) with finalBytes (" + finalBytes.length + " bytes)"); - combinedBytes = new byte[outputBuffer.length + finalBytes.length]; - System.arraycopy(outputBuffer, 0, combinedBytes, 0, outputBuffer.length); - System.arraycopy(finalBytes, 0, combinedBytes, outputBuffer.length, finalBytes.length); - } else if (outputBuffer != null && outputBuffer.length > 0) { - System.out.println("[CipherSubscriber] Using only outputBuffer (" + outputBuffer.length + " bytes)"); - combinedBytes = outputBuffer; - } else if (finalBytes != null && finalBytes.length > 0) { - System.out.println("[CipherSubscriber] Using only finalBytes (" + finalBytes.length + " bytes)"); - combinedBytes = finalBytes; - } else { - System.out.println("[CipherSubscriber] No bytes to send"); - combinedBytes = new byte[0]; - } - if (combinedBytes.length > 0) { - System.out.println("[CipherSubscriber] Sending combined bytes to wrapped subscriber of length " + combinedBytes.length); - wrappedSubscriber.onNext(ByteBuffer.wrap(combinedBytes)); - } - wrappedSubscriber.onComplete(); + byte[] finalBytes = null; + try { + finalBytes = cipher.doFinal(); } catch (final GeneralSecurityException exception) { - // Forward error, else the wrapped subscriber waits indefinitely - System.out.println("[CipherSubscriber] Security exception during doFinal: " + exception.getMessage()); + // Even if doFinal fails, downstream still expects to receive the bytes that were in outputBuffer wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); + // Forward error, else the wrapped subscriber waits indefinitely wrappedSubscriber.onError(exception); + // Even though doFinal failed, downstream still expects to receive onComplete signal wrappedSubscriber.onComplete(); throw new S3EncryptionClientSecurityException(exception.getMessage(), exception); } + + // Combine the bytes from outputBuffer and finalBytes into one onNext call. + // Downstream has requested `1` in its request method, so this class can only call onNext once. + // This onNext call must contain both the bytes from outputBuffer and the tag. + byte[] combinedBytes; + if (outputBuffer != null && outputBuffer.length > 0 && finalBytes != null && finalBytes.length > 0) { + combinedBytes = new byte[outputBuffer.length + finalBytes.length]; + System.arraycopy(outputBuffer, 0, combinedBytes, 0, outputBuffer.length); + System.arraycopy(finalBytes, 0, combinedBytes, outputBuffer.length, finalBytes.length); + } else if (outputBuffer != null && outputBuffer.length > 0) { + combinedBytes = outputBuffer; + } else if (finalBytes != null && finalBytes.length > 0) { + combinedBytes = finalBytes; + } else { + combinedBytes = new byte[0]; + } + + if (combinedBytes.length > 0) { + wrappedSubscriber.onNext(ByteBuffer.wrap(combinedBytes)); + } + wrappedSubscriber.onComplete(); } } \ No newline at end of file From 84fe71d5420d0e32fe13543954d3e6b467af9cfa Mon Sep 17 00:00:00 2001 From: Lucas McDonald Date: Mon, 28 Apr 2025 17:28:29 -0700 Subject: [PATCH 16/30] m --- .../s3/internal/CipherSubscriber.java | 53 ++++++++++++------- 1 file changed, 33 insertions(+), 20 deletions(-) diff --git a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java index e97851564..64f4e2f7d 100644 --- a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java +++ b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java @@ -19,6 +19,7 @@ public class CipherSubscriber implements Subscriber { private Cipher cipher; private final Long contentLength; private boolean isLastPart; + private int tagLength; private boolean onCompleteCalled = false; private byte[] outputBuffer; @@ -28,6 +29,15 @@ public class CipherSubscriber implements Subscriber { this.contentLength = contentLength; cipher = materials.getCipher(iv); this.isLastPart = isLastPart; + + // Determine the tag length based on the cipher algorithm + if (cipher.getAlgorithm().contains("GCM")) { + tagLength = 16; + } else if (cipher.getAlgorithm().contains("CBC") || cipher.getAlgorithm().contains("CTR")) { + tagLength = 0; + } else { + throw new IllegalArgumentException("Unsupported cipher type: " + cipher.getAlgorithm()); + } } CipherSubscriber(Subscriber wrappedSubscriber, Long contentLength, CryptographicMaterials materials, byte[] iv) { @@ -61,20 +71,19 @@ public void onNext(ByteBuffer byteBuffer) { // send an empty buffer to the wrapped subscriber. wrappedSubscriber.onNext(ByteBuffer.allocate(0)); } else { - // cipher.update will only return a block of data if it has been provided a full block of data. - // If it has been provided a partial block of data, it will not return partial data. - // If the CipherSubscriber is done sending data, but the total amount of data is not a multiple of the block size, - // the amount of content returned by the cipher will be less than the contentLength by at most the block size. - // Calling `doFinal` will return the remaining bytes along with the tag. - Long amount = contentLength - cipher.getBlockSize(); - if (contentRead.get() < amount) { - // If the amount of data read so far is less than the amount of data that should have been read, - // send the data downstream, expecting that downstream will request more data. - wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); - } else { - // If the amount of data read so far is at least the amount of data that should have been read, - // complete the stream, as downstream will not request any more data. + // Once all content has been read, call onComplete. + // This class can identify when all content has been read because the amount of data read so far + // plus the tag length will equal the content length. + if (contentRead.get() + tagLength == contentLength) { + // All content has been read, so complete the stream. + // The next onNext call MUST include all bytes, including the result of cipher.doFinal(). + // Sending any additional onNext calls violates the Reactive Streams specification + // and can lead to issues. this.onComplete(); + } else { + // Needs to read more data, so send the data downstream, + // expecting that downstream will continue to request more data. + wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); } } } else { @@ -107,17 +116,23 @@ public void onError(Throwable t) { @Override public void onComplete() { + // onComplete can be signalled to CipherSubscriber multiple times, + // but additional calls should be deduped. if (onCompleteCalled) { return; } onCompleteCalled = true; + + // If this isn't the last part, skip doFinal and just send outputBuffer downstream. if (!isLastPart) { - // If this isn't the last part, skip doFinal, we aren't done + // First, propagate the bytes that were in outputBuffer downstream. wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); + // Then, propagate the onComplete signal downstream. wrappedSubscriber.onComplete(); return; } + // If this is the last part, include the result of doFinal in the value sent downstream. byte[] finalBytes = null; try { finalBytes = cipher.doFinal(); @@ -126,14 +141,14 @@ public void onComplete() { wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); // Forward error, else the wrapped subscriber waits indefinitely wrappedSubscriber.onError(exception); - // Even though doFinal failed, downstream still expects to receive onComplete signal + // Even though doFinal failed, propagate the onComplete signal downstream. wrappedSubscriber.onComplete(); throw new S3EncryptionClientSecurityException(exception.getMessage(), exception); } // Combine the bytes from outputBuffer and finalBytes into one onNext call. - // Downstream has requested `1` in its request method, so this class can only call onNext once. - // This onNext call must contain both the bytes from outputBuffer and the tag. + // Downstream has requested one item in its request method, so this class can only call onNext once. + // This single onNext call must contain both the bytes from outputBuffer and the tag. byte[] combinedBytes; if (outputBuffer != null && outputBuffer.length > 0 && finalBytes != null && finalBytes.length > 0) { combinedBytes = new byte[outputBuffer.length + finalBytes.length]; @@ -147,9 +162,7 @@ public void onComplete() { combinedBytes = new byte[0]; } - if (combinedBytes.length > 0) { - wrappedSubscriber.onNext(ByteBuffer.wrap(combinedBytes)); - } + wrappedSubscriber.onNext(ByteBuffer.wrap(combinedBytes)); wrappedSubscriber.onComplete(); } From 84be7aa2a8fc825845d882c6d2ac6c0b9b761d0e Mon Sep 17 00:00:00 2001 From: Lucas McDonald Date: Mon, 28 Apr 2025 17:37:16 -0700 Subject: [PATCH 17/30] m --- .../amazon/encryption/s3/internal/CipherSubscriber.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java index 64f4e2f7d..4176f1626 100644 --- a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java +++ b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java @@ -73,8 +73,8 @@ public void onNext(ByteBuffer byteBuffer) { } else { // Once all content has been read, call onComplete. // This class can identify when all content has been read because the amount of data read so far - // plus the tag length will equal the content length. - if (contentRead.get() + tagLength == contentLength) { + // plus the tag length exceeds the content length. + if (contentRead.get() + tagLength >= contentLength) { // All content has been read, so complete the stream. // The next onNext call MUST include all bytes, including the result of cipher.doFinal(). // Sending any additional onNext calls violates the Reactive Streams specification From 12be3e35ec9d25ada7e5e716d1603e1b674afece Mon Sep 17 00:00:00 2001 From: Lucas McDonald Date: Tue, 29 Apr 2025 09:45:39 -0700 Subject: [PATCH 18/30] m --- .../s3/internal/CipherSubscriber.java | 39 +++++++++++++------ 1 file changed, 28 insertions(+), 11 deletions(-) diff --git a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java index 4176f1626..e8efa5486 100644 --- a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java +++ b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java @@ -30,7 +30,8 @@ public class CipherSubscriber implements Subscriber { cipher = materials.getCipher(iv); this.isLastPart = isLastPart; - // Determine the tag length based on the cipher algorithm + // Determine the tag length based on the cipher algorithm. + // This class uses the tag length to identify the end of the stream before the onComplete signal is sent. if (cipher.getAlgorithm().contains("GCM")) { tagLength = 16; } else if (cipher.getAlgorithm().contains("CBC") || cipher.getAlgorithm().contains("CTR")) { @@ -71,14 +72,25 @@ public void onNext(ByteBuffer byteBuffer) { // send an empty buffer to the wrapped subscriber. wrappedSubscriber.onNext(ByteBuffer.allocate(0)); } else { + // Check if stream has read all expected content. // Once all content has been read, call onComplete. - // This class can identify when all content has been read because the amount of data read so far - // plus the tag length exceeds the content length. + // + // This class determines that all content has been read by checking if + // the amount of data read so far plus the tag length is at least the content length. + // Once this is true, downstream will never call `request` again + // (beyond the current request that is being responded to in this onNext invocation.) + // As a result, this class can only call `wrappedSubscriber.onNext` one more time. + // (Reactive streams require that downstream sends a `request(n)` + // to indicate it is ready for more data, and upstream responds to that request by calling `onNext`. + // The `n` in request is the maximum number of `onNext` calls that downstream + // will allow upstream to make, and seems to always be 1 for the AsyncBodySubscriber.) + // Since this class can only call `wrappedSubscriber.onNext` once, + // it must send all remaining data in the next onNext call, + // including the result of cipher.doFinal(), if applicable. + // Calling `wrappedSubscriber.onNext` more than once violates the Reactive Streams specification + // and can cause exceptions downstream. if (contentRead.get() + tagLength >= contentLength) { - // All content has been read, so complete the stream. - // The next onNext call MUST include all bytes, including the result of cipher.doFinal(). - // Sending any additional onNext calls violates the Reactive Streams specification - // and can lead to issues. + // All content has been read; complete the stream. this.onComplete(); } else { // Needs to read more data, so send the data downstream, @@ -116,14 +128,17 @@ public void onError(Throwable t) { @Override public void onComplete() { - // onComplete can be signalled to CipherSubscriber multiple times, - // but additional calls should be deduped. + // onComplete can be signalled to CipherSubscriber multiple times + // but additional calls should be deduped to avoid calling onNext multiple times + // and raising exceptions. if (onCompleteCalled) { return; } onCompleteCalled = true; // If this isn't the last part, skip doFinal and just send outputBuffer downstream. + // doFinal requires that all parts have been processed to compute the tag, + // so the tag will only be computed when the last part is processed. if (!isLastPart) { // First, propagate the bytes that were in outputBuffer downstream. wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); @@ -132,7 +147,9 @@ public void onComplete() { return; } - // If this is the last part, include the result of doFinal in the value sent downstream. + // If this is the last part, compute doFinal and include its result in the value sent downstream. + // The result of doFinal MUST be included with the bytes that were in outputBuffer in the final onNext call. + // When this class calculates it has read all content byte[] finalBytes = null; try { finalBytes = cipher.doFinal(); @@ -141,7 +158,7 @@ public void onComplete() { wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); // Forward error, else the wrapped subscriber waits indefinitely wrappedSubscriber.onError(exception); - // Even though doFinal failed, propagate the onComplete signal downstream. + // Even though doFinal failed, propagate the onComplete signal downstream wrappedSubscriber.onComplete(); throw new S3EncryptionClientSecurityException(exception.getMessage(), exception); } From 53d62d6757c76aabb3be1086163b3b19e7789c3b Mon Sep 17 00:00:00 2001 From: Lucas McDonald Date: Tue, 29 Apr 2025 09:46:56 -0700 Subject: [PATCH 19/30] m --- .../amazon/encryption/s3/internal/CipherSubscriber.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java index e8efa5486..8fadb7851 100644 --- a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java +++ b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java @@ -87,10 +87,12 @@ public void onNext(ByteBuffer byteBuffer) { // Since this class can only call `wrappedSubscriber.onNext` once, // it must send all remaining data in the next onNext call, // including the result of cipher.doFinal(), if applicable. - // Calling `wrappedSubscriber.onNext` more than once violates the Reactive Streams specification - // and can cause exceptions downstream. + // Calling `wrappedSubscriber.onNext` more than once for `request(1)` + // violates the Reactive Streams specification and can cause exceptions downstream. if (contentRead.get() + tagLength >= contentLength) { // All content has been read; complete the stream. + // (Signalling onComplete from here is Reactive Streams-spec compliant; + // this class is allowed to call onComplete, even if upstream has not yet signaled onComplete.) this.onComplete(); } else { // Needs to read more data, so send the data downstream, @@ -128,7 +130,7 @@ public void onError(Throwable t) { @Override public void onComplete() { - // onComplete can be signalled to CipherSubscriber multiple times + // onComplete can be signalled to CipherSubscriber multiple times, // but additional calls should be deduped to avoid calling onNext multiple times // and raising exceptions. if (onCompleteCalled) { @@ -149,7 +151,6 @@ public void onComplete() { // If this is the last part, compute doFinal and include its result in the value sent downstream. // The result of doFinal MUST be included with the bytes that were in outputBuffer in the final onNext call. - // When this class calculates it has read all content byte[] finalBytes = null; try { finalBytes = cipher.doFinal(); From b30a380c67c86b88c2e8105ca67932a8feafba26 Mon Sep 17 00:00:00 2001 From: Lucas McDonald Date: Wed, 30 Apr 2025 15:41:43 -0700 Subject: [PATCH 20/30] m --- .../s3/internal/CipherSubscriber.java | 63 ++++++++++++++++--- 1 file changed, 54 insertions(+), 9 deletions(-) diff --git a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java index 8fadb7851..3163a8a2d 100644 --- a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java +++ b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java @@ -48,29 +48,49 @@ public class CipherSubscriber implements Subscriber { @Override public void onSubscribe(Subscription s) { - wrappedSubscriber.onSubscribe(s); + System.out.println("[CipherSubscriber] onSubscribe called with subscription: " + s); + wrappedSubscriber.onSubscribe(new Subscription() { + @Override + public void request(long n) { + System.out.println("[CipherSubscriber] Request received for " + n + " items"); + s.request(n); + } + + @Override + public void cancel() { + System.out.println("[CipherSubscriber] Subscription cancelled"); + s.cancel(); + } + }); } @Override public void onNext(ByteBuffer byteBuffer) { + System.out.println("[CipherSubscriber] onNext called with buffer size: " + byteBuffer.remaining() + ", contentRead: " + contentRead.get() + ", contentLength: " + contentLength); int amountToReadFromByteBuffer = getAmountToReadFromByteBuffer(byteBuffer); + System.out.println("[CipherSubscriber] Amount to read from buffer: " + amountToReadFromByteBuffer); if (amountToReadFromByteBuffer > 0) { byte[] buf = BinaryUtils.copyBytesFrom(byteBuffer, amountToReadFromByteBuffer); + System.out.println("[CipherSubscriber] Copied " + buf.length + " bytes from input buffer"); outputBuffer = cipher.update(buf, 0, amountToReadFromByteBuffer); + System.out.println("[CipherSubscriber] Cipher update produced " + (outputBuffer != null ? outputBuffer.length : 0) + " bytes"); if (outputBuffer == null || outputBuffer.length == 0) { // The underlying data is too short to fill in the block cipher. // Note that while the JCE Javadoc specifies that the outputBuffer is null in this case, // in practice SunJCE and ACCP return an empty buffer instead, hence checks for // null OR length == 0. - if (contentRead.get() == contentLength) { + if (contentRead.get() + tagLength >= contentLength) { // All content has been read, so complete to get the final bytes - this.onComplete(); + System.out.println("[CipherSubscriber] All content read (" + contentRead.get() + " bytes), proceeding to finalBytes"); + finalBytes(); + return; } // Otherwise, wait for more bytes. To avoid blocking, // send an empty buffer to the wrapped subscriber. - wrappedSubscriber.onNext(ByteBuffer.allocate(0)); + System.out.println("[CipherSubscriber] Sending empty buffer to wrapped subscriber"); +// wrappedSubscriber.onNext(ByteBuffer.allocate(0)); } else { // Check if stream has read all expected content. // Once all content has been read, call onComplete. @@ -80,7 +100,7 @@ public void onNext(ByteBuffer byteBuffer) { // Once this is true, downstream will never call `request` again // (beyond the current request that is being responded to in this onNext invocation.) // As a result, this class can only call `wrappedSubscriber.onNext` one more time. - // (Reactive streams require that downstream sends a `request(n)` + // (Reactive streams require that downstream sends a `request(n)` // to indicate it is ready for more data, and upstream responds to that request by calling `onNext`. // The `n` in request is the maximum number of `onNext` calls that downstream // will allow upstream to make, and seems to always be 1 for the AsyncBodySubscriber.) @@ -89,19 +109,23 @@ public void onNext(ByteBuffer byteBuffer) { // including the result of cipher.doFinal(), if applicable. // Calling `wrappedSubscriber.onNext` more than once for `request(1)` // violates the Reactive Streams specification and can cause exceptions downstream. + System.out.println("[CipherSubscriber] Checking content read threshold: contentRead=" + contentRead.get() + ", tagLength=" + tagLength + ", contentLength=" + contentLength); if (contentRead.get() + tagLength >= contentLength) { // All content has been read; complete the stream. // (Signalling onComplete from here is Reactive Streams-spec compliant; // this class is allowed to call onComplete, even if upstream has not yet signaled onComplete.) - this.onComplete(); + System.out.println("[CipherSubscriber] Content read threshold reached, proceeding to finalBytes"); + finalBytes(); } else { // Needs to read more data, so send the data downstream, // expecting that downstream will continue to request more data. + System.out.println("[CipherSubscriber] Sending " + outputBuffer.length + " bytes to wrapped subscriber"); wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); } } } else { // Do nothing + System.out.println("[CipherSubscriber] No data to process, forwarding buffer directly"); wrappedSubscriber.onNext(byteBuffer); } } @@ -110,30 +134,42 @@ private int getAmountToReadFromByteBuffer(ByteBuffer byteBuffer) { // If content length is null, we should include everything in the cipher because the stream is essentially // unbounded. if (contentLength == null) { + System.out.println("[CipherSubscriber] Content length is null, reading entire buffer: " + byteBuffer.remaining()); return byteBuffer.remaining(); } long amountReadSoFar = contentRead.getAndAdd(byteBuffer.remaining()); long amountRemaining = Math.max(0, contentLength - amountReadSoFar); + System.out.println("[CipherSubscriber] Buffer read calculation - read: " + amountReadSoFar + ", remaining: " + amountRemaining + ", buffer size: " + byteBuffer.remaining()); if (amountRemaining > byteBuffer.remaining()) { + System.out.println("[CipherSubscriber] Reading entire buffer: " + byteBuffer.remaining()); return byteBuffer.remaining(); } else { + System.out.println("[CipherSubscriber] Reading partial buffer: " + amountRemaining); return Math.toIntExact(amountRemaining); } } @Override public void onError(Throwable t) { + System.out.println("[CipherSubscriber] Error occurred: " + t.getMessage()); wrappedSubscriber.onError(t); } @Override public void onComplete() { + System.out.println("[CipherSubscriber] onComplete called"); + wrappedSubscriber.onComplete(); + } + + public void finalBytes() { + System.out.println("[CipherSubscriber] finalBytes called, isLastPart: " + isLastPart + ", onCompleteCalled: " + onCompleteCalled); // onComplete can be signalled to CipherSubscriber multiple times, // but additional calls should be deduped to avoid calling onNext multiple times // and raising exceptions. if (onCompleteCalled) { + System.out.println("[CipherSubscriber] finalBytes already called, returning"); return; } onCompleteCalled = true; @@ -142,10 +178,11 @@ public void onComplete() { // doFinal requires that all parts have been processed to compute the tag, // so the tag will only be computed when the last part is processed. if (!isLastPart) { + System.out.println("[CipherSubscriber] Not last part, sending output buffer of size: " + (outputBuffer != null ? outputBuffer.length : 0)); // First, propagate the bytes that were in outputBuffer downstream. wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); // Then, propagate the onComplete signal downstream. - wrappedSubscriber.onComplete(); +// wrappedSubscriber.onComplete(); return; } @@ -153,14 +190,17 @@ public void onComplete() { // The result of doFinal MUST be included with the bytes that were in outputBuffer in the final onNext call. byte[] finalBytes = null; try { + System.out.println("[CipherSubscriber] Calling cipher.doFinal()"); finalBytes = cipher.doFinal(); + System.out.println("[CipherSubscriber] doFinal produced " + (finalBytes != null ? finalBytes.length : 0) + " bytes"); } catch (final GeneralSecurityException exception) { // Even if doFinal fails, downstream still expects to receive the bytes that were in outputBuffer + System.out.println("[CipherSubscriber] Security exception during doFinal: " + exception.getMessage()); wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); // Forward error, else the wrapped subscriber waits indefinitely wrappedSubscriber.onError(exception); // Even though doFinal failed, propagate the onComplete signal downstream - wrappedSubscriber.onComplete(); +// wrappedSubscriber.onComplete(); throw new S3EncryptionClientSecurityException(exception.getMessage(), exception); } @@ -169,19 +209,24 @@ public void onComplete() { // This single onNext call must contain both the bytes from outputBuffer and the tag. byte[] combinedBytes; if (outputBuffer != null && outputBuffer.length > 0 && finalBytes != null && finalBytes.length > 0) { + System.out.println("[CipherSubscriber] Combining outputBuffer (" + outputBuffer.length + " bytes) with finalBytes (" + finalBytes.length + " bytes)"); combinedBytes = new byte[outputBuffer.length + finalBytes.length]; System.arraycopy(outputBuffer, 0, combinedBytes, 0, outputBuffer.length); System.arraycopy(finalBytes, 0, combinedBytes, outputBuffer.length, finalBytes.length); } else if (outputBuffer != null && outputBuffer.length > 0) { + System.out.println("[CipherSubscriber] Using only outputBuffer (" + outputBuffer.length + " bytes)"); combinedBytes = outputBuffer; } else if (finalBytes != null && finalBytes.length > 0) { + System.out.println("[CipherSubscriber] Using only finalBytes (" + finalBytes.length + " bytes)"); combinedBytes = finalBytes; } else { + System.out.println("[CipherSubscriber] No bytes to send"); combinedBytes = new byte[0]; } + System.out.println("[CipherSubscriber] Sending combined bytes to wrapped subscriber of length " + combinedBytes.length); wrappedSubscriber.onNext(ByteBuffer.wrap(combinedBytes)); - wrappedSubscriber.onComplete(); +// wrappedSubscriber.onComplete(); } } \ No newline at end of file From 2a6c43d8cb85c733ae5205e9e7e3036ea18765e8 Mon Sep 17 00:00:00 2001 From: Lucas McDonald Date: Wed, 30 Apr 2025 15:53:50 -0700 Subject: [PATCH 21/30] m --- .../s3/internal/CipherSubscriber.java | 61 +------------------ 1 file changed, 3 insertions(+), 58 deletions(-) diff --git a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java index 3163a8a2d..b87c80b14 100644 --- a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java +++ b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java @@ -20,7 +20,6 @@ public class CipherSubscriber implements Subscriber { private final Long contentLength; private boolean isLastPart; private int tagLength; - private boolean onCompleteCalled = false; private byte[] outputBuffer; @@ -48,33 +47,16 @@ public class CipherSubscriber implements Subscriber { @Override public void onSubscribe(Subscription s) { - System.out.println("[CipherSubscriber] onSubscribe called with subscription: " + s); - wrappedSubscriber.onSubscribe(new Subscription() { - @Override - public void request(long n) { - System.out.println("[CipherSubscriber] Request received for " + n + " items"); - s.request(n); - } - - @Override - public void cancel() { - System.out.println("[CipherSubscriber] Subscription cancelled"); - s.cancel(); - } - }); + wrappedSubscriber.onSubscribe(s); } @Override public void onNext(ByteBuffer byteBuffer) { - System.out.println("[CipherSubscriber] onNext called with buffer size: " + byteBuffer.remaining() + ", contentRead: " + contentRead.get() + ", contentLength: " + contentLength); int amountToReadFromByteBuffer = getAmountToReadFromByteBuffer(byteBuffer); - System.out.println("[CipherSubscriber] Amount to read from buffer: " + amountToReadFromByteBuffer); if (amountToReadFromByteBuffer > 0) { byte[] buf = BinaryUtils.copyBytesFrom(byteBuffer, amountToReadFromByteBuffer); - System.out.println("[CipherSubscriber] Copied " + buf.length + " bytes from input buffer"); outputBuffer = cipher.update(buf, 0, amountToReadFromByteBuffer); - System.out.println("[CipherSubscriber] Cipher update produced " + (outputBuffer != null ? outputBuffer.length : 0) + " bytes"); if (outputBuffer == null || outputBuffer.length == 0) { // The underlying data is too short to fill in the block cipher. @@ -83,14 +65,12 @@ public void onNext(ByteBuffer byteBuffer) { // null OR length == 0. if (contentRead.get() + tagLength >= contentLength) { // All content has been read, so complete to get the final bytes - System.out.println("[CipherSubscriber] All content read (" + contentRead.get() + " bytes), proceeding to finalBytes"); finalBytes(); return; } // Otherwise, wait for more bytes. To avoid blocking, // send an empty buffer to the wrapped subscriber. - System.out.println("[CipherSubscriber] Sending empty buffer to wrapped subscriber"); -// wrappedSubscriber.onNext(ByteBuffer.allocate(0)); + wrappedSubscriber.onNext(ByteBuffer.allocate(0)); } else { // Check if stream has read all expected content. // Once all content has been read, call onComplete. @@ -109,23 +89,19 @@ public void onNext(ByteBuffer byteBuffer) { // including the result of cipher.doFinal(), if applicable. // Calling `wrappedSubscriber.onNext` more than once for `request(1)` // violates the Reactive Streams specification and can cause exceptions downstream. - System.out.println("[CipherSubscriber] Checking content read threshold: contentRead=" + contentRead.get() + ", tagLength=" + tagLength + ", contentLength=" + contentLength); if (contentRead.get() + tagLength >= contentLength) { // All content has been read; complete the stream. // (Signalling onComplete from here is Reactive Streams-spec compliant; // this class is allowed to call onComplete, even if upstream has not yet signaled onComplete.) - System.out.println("[CipherSubscriber] Content read threshold reached, proceeding to finalBytes"); finalBytes(); } else { // Needs to read more data, so send the data downstream, // expecting that downstream will continue to request more data. - System.out.println("[CipherSubscriber] Sending " + outputBuffer.length + " bytes to wrapped subscriber"); wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); } } } else { // Do nothing - System.out.println("[CipherSubscriber] No data to process, forwarding buffer directly"); wrappedSubscriber.onNext(byteBuffer); } } @@ -134,73 +110,48 @@ private int getAmountToReadFromByteBuffer(ByteBuffer byteBuffer) { // If content length is null, we should include everything in the cipher because the stream is essentially // unbounded. if (contentLength == null) { - System.out.println("[CipherSubscriber] Content length is null, reading entire buffer: " + byteBuffer.remaining()); return byteBuffer.remaining(); } long amountReadSoFar = contentRead.getAndAdd(byteBuffer.remaining()); long amountRemaining = Math.max(0, contentLength - amountReadSoFar); - System.out.println("[CipherSubscriber] Buffer read calculation - read: " + amountReadSoFar + ", remaining: " + amountRemaining + ", buffer size: " + byteBuffer.remaining()); if (amountRemaining > byteBuffer.remaining()) { - System.out.println("[CipherSubscriber] Reading entire buffer: " + byteBuffer.remaining()); return byteBuffer.remaining(); } else { - System.out.println("[CipherSubscriber] Reading partial buffer: " + amountRemaining); return Math.toIntExact(amountRemaining); } } @Override public void onError(Throwable t) { - System.out.println("[CipherSubscriber] Error occurred: " + t.getMessage()); wrappedSubscriber.onError(t); } @Override public void onComplete() { - System.out.println("[CipherSubscriber] onComplete called"); wrappedSubscriber.onComplete(); } public void finalBytes() { - System.out.println("[CipherSubscriber] finalBytes called, isLastPart: " + isLastPart + ", onCompleteCalled: " + onCompleteCalled); - // onComplete can be signalled to CipherSubscriber multiple times, - // but additional calls should be deduped to avoid calling onNext multiple times - // and raising exceptions. - if (onCompleteCalled) { - System.out.println("[CipherSubscriber] finalBytes already called, returning"); - return; - } - onCompleteCalled = true; - // If this isn't the last part, skip doFinal and just send outputBuffer downstream. // doFinal requires that all parts have been processed to compute the tag, // so the tag will only be computed when the last part is processed. if (!isLastPart) { - System.out.println("[CipherSubscriber] Not last part, sending output buffer of size: " + (outputBuffer != null ? outputBuffer.length : 0)); - // First, propagate the bytes that were in outputBuffer downstream. wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); - // Then, propagate the onComplete signal downstream. -// wrappedSubscriber.onComplete(); return; } // If this is the last part, compute doFinal and include its result in the value sent downstream. // The result of doFinal MUST be included with the bytes that were in outputBuffer in the final onNext call. - byte[] finalBytes = null; + byte[] finalBytes; try { - System.out.println("[CipherSubscriber] Calling cipher.doFinal()"); finalBytes = cipher.doFinal(); - System.out.println("[CipherSubscriber] doFinal produced " + (finalBytes != null ? finalBytes.length : 0) + " bytes"); } catch (final GeneralSecurityException exception) { // Even if doFinal fails, downstream still expects to receive the bytes that were in outputBuffer - System.out.println("[CipherSubscriber] Security exception during doFinal: " + exception.getMessage()); wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); // Forward error, else the wrapped subscriber waits indefinitely wrappedSubscriber.onError(exception); - // Even though doFinal failed, propagate the onComplete signal downstream -// wrappedSubscriber.onComplete(); throw new S3EncryptionClientSecurityException(exception.getMessage(), exception); } @@ -209,24 +160,18 @@ public void finalBytes() { // This single onNext call must contain both the bytes from outputBuffer and the tag. byte[] combinedBytes; if (outputBuffer != null && outputBuffer.length > 0 && finalBytes != null && finalBytes.length > 0) { - System.out.println("[CipherSubscriber] Combining outputBuffer (" + outputBuffer.length + " bytes) with finalBytes (" + finalBytes.length + " bytes)"); combinedBytes = new byte[outputBuffer.length + finalBytes.length]; System.arraycopy(outputBuffer, 0, combinedBytes, 0, outputBuffer.length); System.arraycopy(finalBytes, 0, combinedBytes, outputBuffer.length, finalBytes.length); } else if (outputBuffer != null && outputBuffer.length > 0) { - System.out.println("[CipherSubscriber] Using only outputBuffer (" + outputBuffer.length + " bytes)"); combinedBytes = outputBuffer; } else if (finalBytes != null && finalBytes.length > 0) { - System.out.println("[CipherSubscriber] Using only finalBytes (" + finalBytes.length + " bytes)"); combinedBytes = finalBytes; } else { - System.out.println("[CipherSubscriber] No bytes to send"); combinedBytes = new byte[0]; } - System.out.println("[CipherSubscriber] Sending combined bytes to wrapped subscriber of length " + combinedBytes.length); wrappedSubscriber.onNext(ByteBuffer.wrap(combinedBytes)); -// wrappedSubscriber.onComplete(); } } \ No newline at end of file From 05ec321b48d5a012648807a4a0452bb339a4cf83 Mon Sep 17 00:00:00 2001 From: Kess Plasmeier Date: Wed, 30 Apr 2025 17:04:40 -0700 Subject: [PATCH 22/30] includes debugging --- .../s3/internal/CipherSubscriber.java | 46 ++++++++++++++++++- 1 file changed, 45 insertions(+), 1 deletion(-) diff --git a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java index b87c80b14..4156fa5a0 100644 --- a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java +++ b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java @@ -47,16 +47,33 @@ public class CipherSubscriber implements Subscriber { @Override public void onSubscribe(Subscription s) { - wrappedSubscriber.onSubscribe(s); + //System.out.println("[CipherSubscriber] onSubscribe called with subscription: " + s); + wrappedSubscriber.onSubscribe(new Subscription() { + @Override + public void request(long n) { + //System.out.println("[CipherSubscriber] Request received for " + n + " items"); + s.request(n); + } + + @Override + public void cancel() { + //System.out.println("[CipherSubscriber] Subscription cancelled"); + s.cancel(); + } + }); } @Override public void onNext(ByteBuffer byteBuffer) { + //System.out.println("[CipherSubscriber] onNext called with buffer size: " + byteBuffer.remaining() + ", contentRead: " + contentRead.get() + ", contentLength: " + contentLength); int amountToReadFromByteBuffer = getAmountToReadFromByteBuffer(byteBuffer); + //System.out.println("[CipherSubscriber] Amount to read from buffer: " + amountToReadFromByteBuffer); if (amountToReadFromByteBuffer > 0) { byte[] buf = BinaryUtils.copyBytesFrom(byteBuffer, amountToReadFromByteBuffer); + //System.out.println("[CipherSubscriber] Copied " + buf.length + " bytes from input buffer"); outputBuffer = cipher.update(buf, 0, amountToReadFromByteBuffer); + //System.out.println("[CipherSubscriber] Cipher update produced " + (outputBuffer != null ? outputBuffer.length : 0) + " bytes"); if (outputBuffer == null || outputBuffer.length == 0) { // The underlying data is too short to fill in the block cipher. @@ -65,11 +82,13 @@ public void onNext(ByteBuffer byteBuffer) { // null OR length == 0. if (contentRead.get() + tagLength >= contentLength) { // All content has been read, so complete to get the final bytes + //System.out.println("[CipherSubscriber] All content read (" + contentRead.get() + " bytes), proceeding to finalBytes"); finalBytes(); return; } // Otherwise, wait for more bytes. To avoid blocking, // send an empty buffer to the wrapped subscriber. + //System.out.println("[CipherSubscriber] Sending empty buffer to wrapped subscriber"); wrappedSubscriber.onNext(ByteBuffer.allocate(0)); } else { // Check if stream has read all expected content. @@ -89,19 +108,23 @@ public void onNext(ByteBuffer byteBuffer) { // including the result of cipher.doFinal(), if applicable. // Calling `wrappedSubscriber.onNext` more than once for `request(1)` // violates the Reactive Streams specification and can cause exceptions downstream. + //System.out.println("[CipherSubscriber] Checking content read threshold: contentRead=" + contentRead.get() + ", tagLength=" + tagLength + ", contentLength=" + contentLength); if (contentRead.get() + tagLength >= contentLength) { // All content has been read; complete the stream. // (Signalling onComplete from here is Reactive Streams-spec compliant; // this class is allowed to call onComplete, even if upstream has not yet signaled onComplete.) + //System.out.println("[CipherSubscriber] Content read threshold (" + contentRead.get() + ") reached, proceeding to finalBytes"); finalBytes(); } else { // Needs to read more data, so send the data downstream, // expecting that downstream will continue to request more data. + //System.out.println("[CipherSubscriber] Sending " + outputBuffer.length + " bytes to wrapped subscriber"); wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); } } } else { // Do nothing + //System.out.println("[CipherSubscriber] No data to process, forwarding buffer directly"); wrappedSubscriber.onNext(byteBuffer); } } @@ -110,26 +133,38 @@ private int getAmountToReadFromByteBuffer(ByteBuffer byteBuffer) { // If content length is null, we should include everything in the cipher because the stream is essentially // unbounded. if (contentLength == null) { + //System.out.println("[CipherSubscriber] Content length is null, reading entire buffer: " + byteBuffer.remaining()); return byteBuffer.remaining(); } long amountReadSoFar = contentRead.getAndAdd(byteBuffer.remaining()); long amountRemaining = Math.max(0, contentLength - amountReadSoFar); + //System.out.println("[CipherSubscriber] Buffer read calculation - read: " + amountReadSoFar + ", remaining: " + amountRemaining + ", buffer size: " + byteBuffer.remaining()); if (amountRemaining > byteBuffer.remaining()) { + //System.out.println("[CipherSubscriber] Reading entire buffer: " + byteBuffer.remaining()); return byteBuffer.remaining(); } else { + //System.out.println("[CipherSubscriber] Reading partial buffer: " + amountRemaining); return Math.toIntExact(amountRemaining); } } @Override public void onError(Throwable t) { + //System.out.println("[CipherSubscriber] Error occurred: " + t.getMessage()); wrappedSubscriber.onError(t); } @Override public void onComplete() { + //System.out.println("[CipherSubscriber] onComplete called"); + if (contentRead.get() < contentLength) { + //System.out.println("[CipherSubscriber] onComplete called prematurely! The content read is " + contentRead.get() + " but the contentLength is " + contentLength); + //System.out.println("try just calling finalBytes() straight up"); + finalBytes(); + //System.out.println("now let it complete"); + } wrappedSubscriber.onComplete(); } @@ -137,7 +172,9 @@ public void finalBytes() { // If this isn't the last part, skip doFinal and just send outputBuffer downstream. // doFinal requires that all parts have been processed to compute the tag, // so the tag will only be computed when the last part is processed. + //System.out.println("[CipherSubscriber] finalBytes called, isLastPart: " + isLastPart); if (!isLastPart) { + //System.out.println("[CipherSubscriber] Not last part, sending output buffer of size: " + (outputBuffer != null ? outputBuffer.length : 0)); wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); return; } @@ -146,9 +183,11 @@ public void finalBytes() { // The result of doFinal MUST be included with the bytes that were in outputBuffer in the final onNext call. byte[] finalBytes; try { + //System.out.println("[CipherSubscriber] Calling cipher.doFinal()"); finalBytes = cipher.doFinal(); } catch (final GeneralSecurityException exception) { // Even if doFinal fails, downstream still expects to receive the bytes that were in outputBuffer + //System.out.println("[CipherSubscriber] Security exception during doFinal: " + exception.getMessage()); wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); // Forward error, else the wrapped subscriber waits indefinitely wrappedSubscriber.onError(exception); @@ -160,17 +199,22 @@ public void finalBytes() { // This single onNext call must contain both the bytes from outputBuffer and the tag. byte[] combinedBytes; if (outputBuffer != null && outputBuffer.length > 0 && finalBytes != null && finalBytes.length > 0) { + //System.out.println("[CipherSubscriber] Combining outputBuffer (" + outputBuffer.length + " bytes) with finalBytes (" + finalBytes.length + " bytes)"); combinedBytes = new byte[outputBuffer.length + finalBytes.length]; System.arraycopy(outputBuffer, 0, combinedBytes, 0, outputBuffer.length); System.arraycopy(finalBytes, 0, combinedBytes, outputBuffer.length, finalBytes.length); } else if (outputBuffer != null && outputBuffer.length > 0) { + //System.out.println("[CipherSubscriber] Using only outputBuffer (" + outputBuffer.length + " bytes)"); combinedBytes = outputBuffer; } else if (finalBytes != null && finalBytes.length > 0) { + //System.out.println("[CipherSubscriber] Using only finalBytes (" + finalBytes.length + " bytes)"); combinedBytes = finalBytes; } else { + //System.out.println("[CipherSubscriber] No bytes to send"); combinedBytes = new byte[0]; } + //System.out.println("[CipherSubscriber] Sending combined bytes to wrapped subscriber of length " + combinedBytes.length); wrappedSubscriber.onNext(ByteBuffer.wrap(combinedBytes)); } From 0af0ad6503d223ea20f94e7739b06122c80a4faf Mon Sep 17 00:00:00 2001 From: Kess Plasmeier Date: Wed, 30 Apr 2025 17:16:57 -0700 Subject: [PATCH 23/30] remove logging --- .../s3/internal/CipherSubscriber.java | 33 ++----------------- src/main/resources/log4j.properties | 4 +++ 2 files changed, 6 insertions(+), 31 deletions(-) create mode 100644 src/main/resources/log4j.properties diff --git a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java index 4156fa5a0..d8f255146 100644 --- a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java +++ b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java @@ -47,17 +47,14 @@ public class CipherSubscriber implements Subscriber { @Override public void onSubscribe(Subscription s) { - //System.out.println("[CipherSubscriber] onSubscribe called with subscription: " + s); wrappedSubscriber.onSubscribe(new Subscription() { @Override public void request(long n) { - //System.out.println("[CipherSubscriber] Request received for " + n + " items"); s.request(n); } @Override public void cancel() { - //System.out.println("[CipherSubscriber] Subscription cancelled"); s.cancel(); } }); @@ -65,15 +62,11 @@ public void cancel() { @Override public void onNext(ByteBuffer byteBuffer) { - //System.out.println("[CipherSubscriber] onNext called with buffer size: " + byteBuffer.remaining() + ", contentRead: " + contentRead.get() + ", contentLength: " + contentLength); int amountToReadFromByteBuffer = getAmountToReadFromByteBuffer(byteBuffer); - //System.out.println("[CipherSubscriber] Amount to read from buffer: " + amountToReadFromByteBuffer); if (amountToReadFromByteBuffer > 0) { byte[] buf = BinaryUtils.copyBytesFrom(byteBuffer, amountToReadFromByteBuffer); - //System.out.println("[CipherSubscriber] Copied " + buf.length + " bytes from input buffer"); outputBuffer = cipher.update(buf, 0, amountToReadFromByteBuffer); - //System.out.println("[CipherSubscriber] Cipher update produced " + (outputBuffer != null ? outputBuffer.length : 0) + " bytes"); if (outputBuffer == null || outputBuffer.length == 0) { // The underlying data is too short to fill in the block cipher. @@ -82,13 +75,11 @@ public void onNext(ByteBuffer byteBuffer) { // null OR length == 0. if (contentRead.get() + tagLength >= contentLength) { // All content has been read, so complete to get the final bytes - //System.out.println("[CipherSubscriber] All content read (" + contentRead.get() + " bytes), proceeding to finalBytes"); finalBytes(); return; } // Otherwise, wait for more bytes. To avoid blocking, // send an empty buffer to the wrapped subscriber. - //System.out.println("[CipherSubscriber] Sending empty buffer to wrapped subscriber"); wrappedSubscriber.onNext(ByteBuffer.allocate(0)); } else { // Check if stream has read all expected content. @@ -108,23 +99,19 @@ public void onNext(ByteBuffer byteBuffer) { // including the result of cipher.doFinal(), if applicable. // Calling `wrappedSubscriber.onNext` more than once for `request(1)` // violates the Reactive Streams specification and can cause exceptions downstream. - //System.out.println("[CipherSubscriber] Checking content read threshold: contentRead=" + contentRead.get() + ", tagLength=" + tagLength + ", contentLength=" + contentLength); if (contentRead.get() + tagLength >= contentLength) { // All content has been read; complete the stream. // (Signalling onComplete from here is Reactive Streams-spec compliant; // this class is allowed to call onComplete, even if upstream has not yet signaled onComplete.) - //System.out.println("[CipherSubscriber] Content read threshold (" + contentRead.get() + ") reached, proceeding to finalBytes"); finalBytes(); } else { // Needs to read more data, so send the data downstream, // expecting that downstream will continue to request more data. - //System.out.println("[CipherSubscriber] Sending " + outputBuffer.length + " bytes to wrapped subscriber"); wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); } } } else { // Do nothing - //System.out.println("[CipherSubscriber] No data to process, forwarding buffer directly"); wrappedSubscriber.onNext(byteBuffer); } } @@ -133,37 +120,30 @@ private int getAmountToReadFromByteBuffer(ByteBuffer byteBuffer) { // If content length is null, we should include everything in the cipher because the stream is essentially // unbounded. if (contentLength == null) { - //System.out.println("[CipherSubscriber] Content length is null, reading entire buffer: " + byteBuffer.remaining()); return byteBuffer.remaining(); } long amountReadSoFar = contentRead.getAndAdd(byteBuffer.remaining()); long amountRemaining = Math.max(0, contentLength - amountReadSoFar); - //System.out.println("[CipherSubscriber] Buffer read calculation - read: " + amountReadSoFar + ", remaining: " + amountRemaining + ", buffer size: " + byteBuffer.remaining()); if (amountRemaining > byteBuffer.remaining()) { - //System.out.println("[CipherSubscriber] Reading entire buffer: " + byteBuffer.remaining()); return byteBuffer.remaining(); } else { - //System.out.println("[CipherSubscriber] Reading partial buffer: " + amountRemaining); return Math.toIntExact(amountRemaining); } } @Override public void onError(Throwable t) { - //System.out.println("[CipherSubscriber] Error occurred: " + t.getMessage()); wrappedSubscriber.onError(t); } @Override public void onComplete() { - //System.out.println("[CipherSubscriber] onComplete called"); + // In rare cases, e.g. when the last part of a low-level MPU has 0 length, + // onComplete will be called before onNext is called once. if (contentRead.get() < contentLength) { - //System.out.println("[CipherSubscriber] onComplete called prematurely! The content read is " + contentRead.get() + " but the contentLength is " + contentLength); - //System.out.println("try just calling finalBytes() straight up"); finalBytes(); - //System.out.println("now let it complete"); } wrappedSubscriber.onComplete(); } @@ -172,9 +152,7 @@ public void finalBytes() { // If this isn't the last part, skip doFinal and just send outputBuffer downstream. // doFinal requires that all parts have been processed to compute the tag, // so the tag will only be computed when the last part is processed. - //System.out.println("[CipherSubscriber] finalBytes called, isLastPart: " + isLastPart); if (!isLastPart) { - //System.out.println("[CipherSubscriber] Not last part, sending output buffer of size: " + (outputBuffer != null ? outputBuffer.length : 0)); wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); return; } @@ -183,11 +161,9 @@ public void finalBytes() { // The result of doFinal MUST be included with the bytes that were in outputBuffer in the final onNext call. byte[] finalBytes; try { - //System.out.println("[CipherSubscriber] Calling cipher.doFinal()"); finalBytes = cipher.doFinal(); } catch (final GeneralSecurityException exception) { // Even if doFinal fails, downstream still expects to receive the bytes that were in outputBuffer - //System.out.println("[CipherSubscriber] Security exception during doFinal: " + exception.getMessage()); wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); // Forward error, else the wrapped subscriber waits indefinitely wrappedSubscriber.onError(exception); @@ -199,22 +175,17 @@ public void finalBytes() { // This single onNext call must contain both the bytes from outputBuffer and the tag. byte[] combinedBytes; if (outputBuffer != null && outputBuffer.length > 0 && finalBytes != null && finalBytes.length > 0) { - //System.out.println("[CipherSubscriber] Combining outputBuffer (" + outputBuffer.length + " bytes) with finalBytes (" + finalBytes.length + " bytes)"); combinedBytes = new byte[outputBuffer.length + finalBytes.length]; System.arraycopy(outputBuffer, 0, combinedBytes, 0, outputBuffer.length); System.arraycopy(finalBytes, 0, combinedBytes, outputBuffer.length, finalBytes.length); } else if (outputBuffer != null && outputBuffer.length > 0) { - //System.out.println("[CipherSubscriber] Using only outputBuffer (" + outputBuffer.length + " bytes)"); combinedBytes = outputBuffer; } else if (finalBytes != null && finalBytes.length > 0) { - //System.out.println("[CipherSubscriber] Using only finalBytes (" + finalBytes.length + " bytes)"); combinedBytes = finalBytes; } else { - //System.out.println("[CipherSubscriber] No bytes to send"); combinedBytes = new byte[0]; } - //System.out.println("[CipherSubscriber] Sending combined bytes to wrapped subscriber of length " + combinedBytes.length); wrappedSubscriber.onNext(ByteBuffer.wrap(combinedBytes)); } diff --git a/src/main/resources/log4j.properties b/src/main/resources/log4j.properties new file mode 100644 index 000000000..225d11dc2 --- /dev/null +++ b/src/main/resources/log4j.properties @@ -0,0 +1,4 @@ +log4j.properties: +log4j.rootLogger=ERROR,stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout From da80e6631bca36b9f6e3bdf17781e9a9068d6eec Mon Sep 17 00:00:00 2001 From: Kess Plasmeier Date: Wed, 30 Apr 2025 17:18:25 -0700 Subject: [PATCH 24/30] restore subscriber --- .../encryption/s3/internal/CipherSubscriber.java | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java index d8f255146..e45bba99f 100644 --- a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java +++ b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java @@ -47,17 +47,7 @@ public class CipherSubscriber implements Subscriber { @Override public void onSubscribe(Subscription s) { - wrappedSubscriber.onSubscribe(new Subscription() { - @Override - public void request(long n) { - s.request(n); - } - - @Override - public void cancel() { - s.cancel(); - } - }); + wrappedSubscriber.onSubscribe(s); } @Override From 8ca7c008cf9f3e49fc4501cc3b9012b1a46fcb6d Mon Sep 17 00:00:00 2001 From: Kess Plasmeier Date: Wed, 30 Apr 2025 17:26:51 -0700 Subject: [PATCH 25/30] finalBytes only once --- .../amazon/encryption/s3/internal/CipherSubscriber.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java index e45bba99f..194c7c8c8 100644 --- a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java +++ b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java @@ -11,6 +11,7 @@ import javax.crypto.Cipher; import java.nio.ByteBuffer; import java.security.GeneralSecurityException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; public class CipherSubscriber implements Subscriber { @@ -20,6 +21,7 @@ public class CipherSubscriber implements Subscriber { private final Long contentLength; private boolean isLastPart; private int tagLength; + private AtomicBoolean finalBytesCalled = new AtomicBoolean(false); private byte[] outputBuffer; @@ -139,6 +141,11 @@ public void onComplete() { } public void finalBytes() { + if (!finalBytesCalled.compareAndSet(false, true)) { + // already called, don't repeat + return; + } + // If this isn't the last part, skip doFinal and just send outputBuffer downstream. // doFinal requires that all parts have been processed to compute the tag, // so the tag will only be computed when the last part is processed. From e8914d4469d67d184b5e1ccd86e0e6cb579febae Mon Sep 17 00:00:00 2001 From: Kess Plasmeier Date: Thu, 1 May 2025 09:52:28 -0700 Subject: [PATCH 26/30] improve taglength, comment code --- .../s3/internal/CipherSubscriber.java | 28 ++++++++----------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java index 194c7c8c8..6ca6ffd74 100644 --- a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java +++ b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java @@ -17,29 +17,20 @@ public class CipherSubscriber implements Subscriber { private final AtomicLong contentRead = new AtomicLong(0); private final Subscriber wrappedSubscriber; - private Cipher cipher; + private final Cipher cipher; private final Long contentLength; - private boolean isLastPart; - private int tagLength; - private AtomicBoolean finalBytesCalled = new AtomicBoolean(false); + private final boolean isLastPart; + private final int tagLength; + private final AtomicBoolean finalBytesCalled = new AtomicBoolean(false); private byte[] outputBuffer; CipherSubscriber(Subscriber wrappedSubscriber, Long contentLength, CryptographicMaterials materials, byte[] iv, boolean isLastPart) { this.wrappedSubscriber = wrappedSubscriber; this.contentLength = contentLength; - cipher = materials.getCipher(iv); + this.cipher = materials.getCipher(iv); this.isLastPart = isLastPart; - - // Determine the tag length based on the cipher algorithm. - // This class uses the tag length to identify the end of the stream before the onComplete signal is sent. - if (cipher.getAlgorithm().contains("GCM")) { - tagLength = 16; - } else if (cipher.getAlgorithm().contains("CBC") || cipher.getAlgorithm().contains("CTR")) { - tagLength = 0; - } else { - throw new IllegalArgumentException("Unsupported cipher type: " + cipher.getAlgorithm()); - } + this.tagLength = materials.algorithmSuite().cipherTagLengthBytes(); } CipherSubscriber(Subscriber wrappedSubscriber, Long contentLength, CryptographicMaterials materials, byte[] iv) { @@ -140,7 +131,12 @@ public void onComplete() { wrappedSubscriber.onComplete(); } - public void finalBytes() { + /** + * Finalize encryption, including calculating the auth tag for AES-GCM. + * As such this method MUST only be called once, which is enforced using + * `finalBytesCalled`. + */ + private void finalBytes() { if (!finalBytesCalled.compareAndSet(false, true)) { // already called, don't repeat return; From 1564b49b6351fdf27dbef47f1ffd6c9e34b0a878 Mon Sep 17 00:00:00 2001 From: Kess Plasmeier Date: Thu, 1 May 2025 10:27:59 -0700 Subject: [PATCH 27/30] more code comments --- .../amazon/encryption/s3/S3AsyncEncryptionClientTest.java | 2 ++ .../s3/S3EncryptionClientRangedGetCompatibilityTest.java | 2 ++ .../amazon/encryption/s3/utils/TinyBufferAsyncRequestBody.java | 3 +++ 3 files changed, 7 insertions(+) diff --git a/src/test/java/software/amazon/encryption/s3/S3AsyncEncryptionClientTest.java b/src/test/java/software/amazon/encryption/s3/S3AsyncEncryptionClientTest.java index ec0e90707..be4521411 100644 --- a/src/test/java/software/amazon/encryption/s3/S3AsyncEncryptionClientTest.java +++ b/src/test/java/software/amazon/encryption/s3/S3AsyncEncryptionClientTest.java @@ -729,6 +729,8 @@ public void copyObjectTransparentlyAsync() { * Test which artificially limits the size of buffers using {@link TinyBufferAsyncRequestBody}. * This tests edge cases where network conditions result in buffers with length shorter than * the cipher's block size. + * Note that TinyAsyncRequestBody is not fully spec-compliant, and will cause IllegalStateExceptions + * to be logged when debug logging is enabled. * @throws IOException */ @Test diff --git a/src/test/java/software/amazon/encryption/s3/S3EncryptionClientRangedGetCompatibilityTest.java b/src/test/java/software/amazon/encryption/s3/S3EncryptionClientRangedGetCompatibilityTest.java index 305dda8ee..a6809e55c 100644 --- a/src/test/java/software/amazon/encryption/s3/S3EncryptionClientRangedGetCompatibilityTest.java +++ b/src/test/java/software/amazon/encryption/s3/S3EncryptionClientRangedGetCompatibilityTest.java @@ -264,6 +264,7 @@ public void AsyncAesCbcV1toV3RangedGet(Object keyMaterial) { assertEquals("klmnopqrst0", output); // Valid start index within input and end index out of range, returns object from start index to End of Stream + // This causes a spurious NPE to be logged when debug logging is enabled. objectResponse = v3Client.getObject(builder -> builder .bucket(BUCKET) .range("bytes=190-300") @@ -288,6 +289,7 @@ public void AsyncAesCbcV1toV3RangedGet(Object keyMaterial) { assertEquals(input, output); // Invalid range starting index and ending index greater than object length but within Cipher Block size, returns empty object + // This causes a spurious NPE to be logged when debug logging is enabled. objectResponse = v3Client.getObject(builder -> builder .bucket(BUCKET) .range("bytes=216-217") diff --git a/src/test/java/software/amazon/encryption/s3/utils/TinyBufferAsyncRequestBody.java b/src/test/java/software/amazon/encryption/s3/utils/TinyBufferAsyncRequestBody.java index 50c4ba6e0..14b3f7c99 100644 --- a/src/test/java/software/amazon/encryption/s3/utils/TinyBufferAsyncRequestBody.java +++ b/src/test/java/software/amazon/encryption/s3/utils/TinyBufferAsyncRequestBody.java @@ -10,6 +10,9 @@ * AsyncRequestBody which wraps another AsyncRequestBody with a {@link TinyBufferSubscriber}. * This is useful for testing poor network conditions where buffers may not be larger than * the cipher's block size. + * DO NOT USE THIS IN PRODUCTION. In addition to degraded performance, + * it will cause IllegalStateExceptions in the base Subscriber as it does not comply + * with the Reactive Streaming spec. */ public class TinyBufferAsyncRequestBody implements AsyncRequestBody { From 31270f2919b862af3d90aa0e3d506a372be15306 Mon Sep 17 00:00:00 2001 From: Kess Plasmeier Date: Thu, 1 May 2025 10:51:27 -0700 Subject: [PATCH 28/30] convert to block comment --- .../s3/internal/CipherSubscriber.java | 36 ++++++++++--------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java index 6ca6ffd74..dce69a065 100644 --- a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java +++ b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java @@ -65,23 +65,25 @@ public void onNext(ByteBuffer byteBuffer) { // send an empty buffer to the wrapped subscriber. wrappedSubscriber.onNext(ByteBuffer.allocate(0)); } else { - // Check if stream has read all expected content. - // Once all content has been read, call onComplete. - // - // This class determines that all content has been read by checking if - // the amount of data read so far plus the tag length is at least the content length. - // Once this is true, downstream will never call `request` again - // (beyond the current request that is being responded to in this onNext invocation.) - // As a result, this class can only call `wrappedSubscriber.onNext` one more time. - // (Reactive streams require that downstream sends a `request(n)` - // to indicate it is ready for more data, and upstream responds to that request by calling `onNext`. - // The `n` in request is the maximum number of `onNext` calls that downstream - // will allow upstream to make, and seems to always be 1 for the AsyncBodySubscriber.) - // Since this class can only call `wrappedSubscriber.onNext` once, - // it must send all remaining data in the next onNext call, - // including the result of cipher.doFinal(), if applicable. - // Calling `wrappedSubscriber.onNext` more than once for `request(1)` - // violates the Reactive Streams specification and can cause exceptions downstream. + /* + Check if stream has read all expected content. + Once all content has been read, call onComplete. + + This determines that all content has been read by checking if + the amount of data read so far plus the tag length is at least the content length. + Once this is true, downstream will never call `request` again + (beyond the current request that is being responded to in this onNext invocation.) + As a result, this class can only call `wrappedSubscriber.onNext` one more time. + (Reactive streams require that downstream sends a `request(n)` + to indicate it is ready for more data, and upstream responds to that request by calling `onNext`. + The `n` in request is the maximum number of `onNext` calls that downstream + will allow upstream to make, and seems to always be 1 for the AsyncBodySubscriber.) + Since this class can only call `wrappedSubscriber.onNext` once, + it must send all remaining data in the next onNext call, + including the result of cipher.doFinal(), if applicable. + Calling `wrappedSubscriber.onNext` more than once for `request(1)` + violates the Reactive Streams specification and can cause exceptions downstream. + */ if (contentRead.get() + tagLength >= contentLength) { // All content has been read; complete the stream. // (Signalling onComplete from here is Reactive Streams-spec compliant; From adb5a41ff97f06aeb7ff134762feed60acc1a94c Mon Sep 17 00:00:00 2001 From: Kess Plasmeier Date: Thu, 1 May 2025 10:53:32 -0700 Subject: [PATCH 29/30] remove log4j properties --- src/main/resources/log4j.properties | 4 ---- 1 file changed, 4 deletions(-) delete mode 100644 src/main/resources/log4j.properties diff --git a/src/main/resources/log4j.properties b/src/main/resources/log4j.properties deleted file mode 100644 index 225d11dc2..000000000 --- a/src/main/resources/log4j.properties +++ /dev/null @@ -1,4 +0,0 @@ -log4j.properties: -log4j.rootLogger=ERROR,stdout -log4j.appender.stdout=org.apache.log4j.ConsoleAppender -log4j.appender.stdout.layout=org.apache.log4j.PatternLayout From 1604c4eda0dbeda37d0d6d5980d2788112952962 Mon Sep 17 00:00:00 2001 From: Kess Plasmeier Date: Thu, 1 May 2025 14:02:11 -0700 Subject: [PATCH 30/30] feedback --- .../amazon/encryption/s3/internal/CipherSubscriber.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java index dce69a065..6e5ba90cb 100644 --- a/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java +++ b/src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java @@ -67,7 +67,7 @@ public void onNext(ByteBuffer byteBuffer) { } else { /* Check if stream has read all expected content. - Once all content has been read, call onComplete. + Once all content has been read, call `finalBytes`. This determines that all content has been read by checking if the amount of data read so far plus the tag length is at least the content length. @@ -86,8 +86,6 @@ public void onNext(ByteBuffer byteBuffer) { */ if (contentRead.get() + tagLength >= contentLength) { // All content has been read; complete the stream. - // (Signalling onComplete from here is Reactive Streams-spec compliant; - // this class is allowed to call onComplete, even if upstream has not yet signaled onComplete.) finalBytes(); } else { // Needs to read more data, so send the data downstream, @@ -127,7 +125,7 @@ public void onError(Throwable t) { public void onComplete() { // In rare cases, e.g. when the last part of a low-level MPU has 0 length, // onComplete will be called before onNext is called once. - if (contentRead.get() < contentLength) { + if (contentRead.get() + tagLength <= contentLength) { finalBytes(); } wrappedSubscriber.onComplete();