-
Notifications
You must be signed in to change notification settings - Fork 19
fix: fix CipherSubscriber to only call onNext once per request #456
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
30 commits
Select commit
Hold shift + click to select a range
caeac1d
m
e05b523
m
e11731a
m
b2fd263
m
62028e1
m
478e870
m
9ad2d16
m
f029007
m
11ad1d7
m
c23a407
m
dde8989
m
e7cd2e2
m
daf1493
m
9febcce
m
faf99ee
m
84fe71d
m
84be7aa
m
12be3e3
m
53d62d6
m
b30a380
m
2a6c43d
m
05ec321
includes debugging
kessplas 0af0ad6
remove logging
kessplas da80e66
restore subscriber
kessplas 8ca7c00
finalBytes only once
kessplas e8914d4
improve taglength, comment code
kessplas 1564b49
more code comments
kessplas 31270f2
convert to block comment
kessplas adb5a41
remove log4j properties
kessplas 1604c4e
feedback
kessplas File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -11,22 +11,26 @@ | |||||||||||||
| import javax.crypto.Cipher; | ||||||||||||||
| import java.nio.ByteBuffer; | ||||||||||||||
| import java.security.GeneralSecurityException; | ||||||||||||||
| import java.util.concurrent.atomic.AtomicBoolean; | ||||||||||||||
| import java.util.concurrent.atomic.AtomicLong; | ||||||||||||||
|
|
||||||||||||||
| public class CipherSubscriber implements Subscriber<ByteBuffer> { | ||||||||||||||
| private final AtomicLong contentRead = new AtomicLong(0); | ||||||||||||||
| private final Subscriber<? super ByteBuffer> wrappedSubscriber; | ||||||||||||||
| private Cipher cipher; | ||||||||||||||
| private final Cipher cipher; | ||||||||||||||
| private final Long contentLength; | ||||||||||||||
| private boolean isLastPart; | ||||||||||||||
| private final boolean isLastPart; | ||||||||||||||
| private final int tagLength; | ||||||||||||||
| private final AtomicBoolean finalBytesCalled = new AtomicBoolean(false); | ||||||||||||||
|
|
||||||||||||||
| private byte[] outputBuffer; | ||||||||||||||
|
|
||||||||||||||
| CipherSubscriber(Subscriber<? super ByteBuffer> wrappedSubscriber, Long contentLength, CryptographicMaterials materials, byte[] iv, boolean isLastPart) { | ||||||||||||||
| this.wrappedSubscriber = wrappedSubscriber; | ||||||||||||||
| this.contentLength = contentLength; | ||||||||||||||
| cipher = materials.getCipher(iv); | ||||||||||||||
| this.cipher = materials.getCipher(iv); | ||||||||||||||
| this.isLastPart = isLastPart; | ||||||||||||||
| this.tagLength = materials.algorithmSuite().cipherTagLengthBytes(); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| CipherSubscriber(Subscriber<? super ByteBuffer> wrappedSubscriber, Long contentLength, CryptographicMaterials materials, byte[] iv) { | ||||||||||||||
|
|
@@ -46,20 +50,48 @@ public void onNext(ByteBuffer byteBuffer) { | |||||||||||||
| if (amountToReadFromByteBuffer > 0) { | ||||||||||||||
| byte[] buf = BinaryUtils.copyBytesFrom(byteBuffer, amountToReadFromByteBuffer); | ||||||||||||||
| outputBuffer = cipher.update(buf, 0, amountToReadFromByteBuffer); | ||||||||||||||
|
|
||||||||||||||
| if (outputBuffer == null || outputBuffer.length == 0) { | ||||||||||||||
| // The underlying data is too short to fill in the block cipher. | ||||||||||||||
| // Note that while the JCE Javadoc specifies that the outputBuffer is null in this case, | ||||||||||||||
| // in practice SunJCE and ACCP return an empty buffer instead, hence checks for | ||||||||||||||
| // null OR length == 0. | ||||||||||||||
| if (contentRead.get() == contentLength) { | ||||||||||||||
| if (contentRead.get() + tagLength >= contentLength) { | ||||||||||||||
| // All content has been read, so complete to get the final bytes | ||||||||||||||
| this.onComplete(); | ||||||||||||||
| finalBytes(); | ||||||||||||||
| return; | ||||||||||||||
| } | ||||||||||||||
| // Otherwise, wait for more bytes. To avoid blocking, | ||||||||||||||
| // send an empty buffer to the wrapped subscriber. | ||||||||||||||
| wrappedSubscriber.onNext(ByteBuffer.allocate(0)); | ||||||||||||||
| } else { | ||||||||||||||
| wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); | ||||||||||||||
| /* | ||||||||||||||
| Check if stream has read all expected content. | ||||||||||||||
| Once all content has been read, call `finalBytes`. | ||||||||||||||
|
|
||||||||||||||
| This determines that all content has been read by checking if | ||||||||||||||
| the amount of data read so far plus the tag length is at least the content length. | ||||||||||||||
| Once this is true, downstream will never call `request` again | ||||||||||||||
| (beyond the current request that is being responded to in this onNext invocation.) | ||||||||||||||
| As a result, this class can only call `wrappedSubscriber.onNext` one more time. | ||||||||||||||
| (Reactive streams require that downstream sends a `request(n)` | ||||||||||||||
| to indicate it is ready for more data, and upstream responds to that request by calling `onNext`. | ||||||||||||||
| The `n` in request is the maximum number of `onNext` calls that downstream | ||||||||||||||
| will allow upstream to make, and seems to always be 1 for the AsyncBodySubscriber.) | ||||||||||||||
| Since this class can only call `wrappedSubscriber.onNext` once, | ||||||||||||||
| it must send all remaining data in the next onNext call, | ||||||||||||||
| including the result of cipher.doFinal(), if applicable. | ||||||||||||||
| Calling `wrappedSubscriber.onNext` more than once for `request(1)` | ||||||||||||||
| violates the Reactive Streams specification and can cause exceptions downstream. | ||||||||||||||
| */ | ||||||||||||||
| if (contentRead.get() + tagLength >= contentLength) { | ||||||||||||||
| // All content has been read; complete the stream. | ||||||||||||||
| finalBytes(); | ||||||||||||||
| } else { | ||||||||||||||
| // Needs to read more data, so send the data downstream, | ||||||||||||||
| // expecting that downstream will continue to request more data. | ||||||||||||||
| wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
| } else { | ||||||||||||||
| // Do nothing | ||||||||||||||
|
|
@@ -91,21 +123,63 @@ public void onError(Throwable t) { | |||||||||||||
|
|
||||||||||||||
| @Override | ||||||||||||||
| public void onComplete() { | ||||||||||||||
| // In rare cases, e.g. when the last part of a low-level MPU has 0 length, | ||||||||||||||
| // onComplete will be called before onNext is called once. | ||||||||||||||
| if (contentRead.get() + tagLength <= contentLength) { | ||||||||||||||
| finalBytes(); | ||||||||||||||
| } | ||||||||||||||
|
Comment on lines
+128
to
+130
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this just be
Suggested change
Since if they're equal, then we know the finalBytes already happened? Maybe? |
||||||||||||||
| wrappedSubscriber.onComplete(); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| /** | ||||||||||||||
| * Finalize encryption, including calculating the auth tag for AES-GCM. | ||||||||||||||
| * As such this method MUST only be called once, which is enforced using | ||||||||||||||
| * `finalBytesCalled`. | ||||||||||||||
| */ | ||||||||||||||
| private void finalBytes() { | ||||||||||||||
| if (!finalBytesCalled.compareAndSet(false, true)) { | ||||||||||||||
| // already called, don't repeat | ||||||||||||||
| return; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| // If this isn't the last part, skip doFinal and just send outputBuffer downstream. | ||||||||||||||
| // doFinal requires that all parts have been processed to compute the tag, | ||||||||||||||
| // so the tag will only be computed when the last part is processed. | ||||||||||||||
| if (!isLastPart) { | ||||||||||||||
| // If this isn't the last part, skip doFinal, we aren't done | ||||||||||||||
| wrappedSubscriber.onComplete(); | ||||||||||||||
| wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); | ||||||||||||||
| return; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| // If this is the last part, compute doFinal and include its result in the value sent downstream. | ||||||||||||||
| // The result of doFinal MUST be included with the bytes that were in outputBuffer in the final onNext call. | ||||||||||||||
| byte[] finalBytes; | ||||||||||||||
| try { | ||||||||||||||
| outputBuffer = cipher.doFinal(); | ||||||||||||||
| // Send the final bytes to the wrapped subscriber | ||||||||||||||
| wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); | ||||||||||||||
| finalBytes = cipher.doFinal(); | ||||||||||||||
| } catch (final GeneralSecurityException exception) { | ||||||||||||||
| // Even if doFinal fails, downstream still expects to receive the bytes that were in outputBuffer | ||||||||||||||
| wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer)); | ||||||||||||||
| // Forward error, else the wrapped subscriber waits indefinitely | ||||||||||||||
| wrappedSubscriber.onError(exception); | ||||||||||||||
| throw new S3EncryptionClientSecurityException(exception.getMessage(), exception); | ||||||||||||||
| } | ||||||||||||||
| wrappedSubscriber.onComplete(); | ||||||||||||||
|
|
||||||||||||||
| // Combine the bytes from outputBuffer and finalBytes into one onNext call. | ||||||||||||||
| // Downstream has requested one item in its request method, so this class can only call onNext once. | ||||||||||||||
| // This single onNext call must contain both the bytes from outputBuffer and the tag. | ||||||||||||||
| byte[] combinedBytes; | ||||||||||||||
| if (outputBuffer != null && outputBuffer.length > 0 && finalBytes != null && finalBytes.length > 0) { | ||||||||||||||
| combinedBytes = new byte[outputBuffer.length + finalBytes.length]; | ||||||||||||||
| System.arraycopy(outputBuffer, 0, combinedBytes, 0, outputBuffer.length); | ||||||||||||||
| System.arraycopy(finalBytes, 0, combinedBytes, outputBuffer.length, finalBytes.length); | ||||||||||||||
| } else if (outputBuffer != null && outputBuffer.length > 0) { | ||||||||||||||
| combinedBytes = outputBuffer; | ||||||||||||||
| } else if (finalBytes != null && finalBytes.length > 0) { | ||||||||||||||
| combinedBytes = finalBytes; | ||||||||||||||
| } else { | ||||||||||||||
| combinedBytes = new byte[0]; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| wrappedSubscriber.onNext(ByteBuffer.wrap(combinedBytes)); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| } | ||||||||||||||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @kessplas
Is the lack of calling
onComplete()afterfinalBytes()deliberate? There seems to be a change in behaviour here. edit: this doesn't seem to be a problem, I've downloaded the src directly and confirmed that re-adding it makes no difference.FWIW, I'm looking into a failure in our software that has been introduced with
v3.3.3of this SDK, and it seems to relate to our Kotlin Flows not completing normally and instead "cancelling", which leads to our completion handlers not firing properly. My current working theory is that the missingonComplete()here is leading to the Kotlin cancellation, when the upload Flow has completed sending all ByteBuffers and shutdown, but without the subscriber having properly calledonComplete(). edit: this is not the case, and I'm still confusedAny idea what might be going on here? Are you sure that this change was safe?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @davidconnard ,
This change made the
CipherSubscribermore spec compliant asonNext()MUST be only called once for oneonNext()signal. The spec is not abundantly clear about the validity of signalingonComplete()from withinonNext()but intuitively, theonComplete()signal should come from the publisher once all data is published and flow through the subscribers accordingly; callingonComplete()from withinonNext()subverts this expectation.We don't specifically support Kotlin, as we don't test against the Kotlin SDK. That said, I would recommend opening an issue for this, as it's easier for us to track than comments on a specific PR. Thanks!
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @kessplas for the response ... Yesterday, I didn't yet have enough information to raise an issue. I've done some further digging, and I've uncovered where the change in behaviour is coming from.
In the AWS SDK, in the
netty-nio-client, in theNettyRequestExecutor.StreamedRequestclass, in theonNext()method, there is a check forshouldContinuePublishing(), which checks if the content-length has been reached. When it has been reached, the subscription is cancelled, and anonComplete()call is made. See https://github.com/aws/aws-sdk-java-v2/blob/7009f86260a3d77f811c1dde31679d1297c1cc01/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/NettyRequestExecutor.java#L453-L456It is this subscription cancellation that is causing my test failure. Encountering this subscription cancellation behaviour is new (as far as my test is concerned). Before this change, the final content-length would only be reached (ie. final bytes sent) as a side-effect of the
onComplete()call, and the subscription cancellation would have no actual effect (ie. would not fire ononError()for the publisher). With your changes, the final content-length is reached in the terminalonNext()call, and the cancellation (triggered by the AWS SDK) fires at that point (prior to publisher completion), leading to anonError()call, which triggers different completion behaviour in the Kotlin Flow.So, this isn't a problem with your change, and apologies for that! Previously, the
CipherPublisherwas hiding this early cancellation behaviour of the AWS SDK from our flow, and your change has simply exposed our code to this behaviour in the underlying AWS SDK. If I remove the AWS S3 Encryption SDK from the picture, and apply our Flow transformer (with its completion handler) over the raw AWS S3 SDK, then I see the exact same behaviour - our completion handler is invoked (with an error) after cancellation of the subscription/flow).The solution for us appears to be to restructure our flow / publisher logic. For reference, when our requests to S3 are using the AWS S3 encryption SDK (which is not all the time), we are wrapping the byte stream publisher in another publisher, which performs additional verification of the plaintext (original bytes) checksum prior to sending to S3. To date, we had used
onNext()calls to accumulate the plaintext checksum, and we performed the verification in theonComplete()call. While that worked, on reflection, it does not seem to be the correct way to verify this, and it only worked because of theCipherSubscriberpreviously delayed sending the final bytes to S3 (and therefore, delayed the cancellation) until theonComplete()call (rather than having it fire from the terminalonNext()call).The right solution seems to be to change our code to perform the verification in the terminal
onNext()call (by tracking the byte count, like the AWS SDK is doing). This will verify the plaintext checksum before sending the final bytes to the wrapped subscriber (ie.CipherSubscriber) in the chainedonNext()call.