diff --git a/aws-kinesis/src/androidTest/java/com/amplifyframework/kinesis/KinesisDataStreamsInstrumentationTest.kt b/aws-kinesis/src/androidTest/java/com/amplifyframework/kinesis/KinesisDataStreamsInstrumentationTest.kt index 4dec484eeb..c9509782c0 100644 --- a/aws-kinesis/src/androidTest/java/com/amplifyframework/kinesis/KinesisDataStreamsInstrumentationTest.kt +++ b/aws-kinesis/src/androidTest/java/com/amplifyframework/kinesis/KinesisDataStreamsInstrumentationTest.kt @@ -24,12 +24,11 @@ import com.amplifyframework.core.configuration.AmplifyOutputs import com.amplifyframework.foundation.credentials.AwsCredentials import com.amplifyframework.foundation.credentials.AwsCredentialsProvider import com.amplifyframework.foundation.credentials.toAwsCredentialsProvider -import com.amplifyframework.foundation.result.Result +import com.amplifyframework.foundation.result.get import com.amplifyframework.kinesis.test.R import com.amplifyframework.recordcache.FlushStrategy import com.amplifyframework.testutils.Resources import com.amplifyframework.testutils.assertions.shouldBeFailure -import com.amplifyframework.testutils.assertions.shouldBeSuccess import com.amplifyframework.testutils.sync.SynchronousAuth import io.kotest.matchers.booleans.shouldBeFalse import io.kotest.matchers.booleans.shouldBeTrue @@ -129,22 +128,21 @@ class KinesisDataStreamsInstrumentationTest { /** Record data and flush — verify records were actually flushed. */ @Test fun testRecordAndFlush(): Unit = runBlocking { - val result = kinesis.record( + kinesis.record( data = "test-record".toByteArray(), partitionKey = "partition-1", streamName = STREAM_NAME - ) - result.shouldBeSuccess() + ).get() val flushResult = kinesis.flush() - flushResult.shouldBeSuccess().data.recordsFlushed shouldBeGreaterThan 0 + flushResult.get().recordsFlushed shouldBeGreaterThan 0 } /** Flush with no cached records returns zero flushed. */ @Test fun testFlushWhenEmpty(): Unit = runBlocking { val flushResult = kinesis.flush() - val data = flushResult.shouldBeSuccess().data + val data = flushResult.get() data.recordsFlushed shouldBe 0 data.flushInProgress.shouldBeFalse() } @@ -154,17 +152,16 @@ class KinesisDataStreamsInstrumentationTest { fun testRecordWhileDisabledDropsRecords(): Unit = runBlocking { kinesis.disable() - val result = kinesis.record( + // record() returns success even when disabled (silently dropped) + kinesis.record( data = "dropped-record".toByteArray(), partitionKey = "partition-1", streamName = STREAM_NAME - ) - // record() returns success even when disabled (silently dropped) - result.shouldBeSuccess() + ).get() kinesis.enable() val flushResult = kinesis.flush() - flushResult.shouldBeSuccess().data.recordsFlushed shouldBe 0 + flushResult.get().recordsFlushed shouldBe 0 } /** Enable → record → disable → enable → flush verifies only pre-disable records flush. */ @@ -175,7 +172,7 @@ class KinesisDataStreamsInstrumentationTest { data = "before-disable".toByteArray(), partitionKey = "partition-1", streamName = STREAM_NAME - ) + ).get() kinesis.disable() @@ -184,13 +181,13 @@ class KinesisDataStreamsInstrumentationTest { data = "while-disabled".toByteArray(), partitionKey = "partition-1", streamName = STREAM_NAME - ) + ).get() kinesis.enable() val flushResult = kinesis.flush() // Only the pre-disable record should be flushed - flushResult.shouldBeSuccess().data.recordsFlushed shouldBe 1 + flushResult.get().recordsFlushed shouldBe 1 } /** Two concurrent flushes — one should return flushInProgress = true. */ @@ -202,21 +199,17 @@ class KinesisDataStreamsInstrumentationTest { data = "record-$i".toByteArray(), partitionKey = "partition-1", streamName = STREAM_NAME - ) + ).get() } val results = listOf( - async { kinesis.flush() }, - async { kinesis.flush() } + async { kinesis.flush().get() }, + async { kinesis.flush().get() } ).awaitAll() - val successResults = results.filter { it is Result.Success } - successResults.size shouldBe 2 - - val flushDatas = successResults.map { (it as Result.Success).data } // At least one should have done actual work, and one may report flushInProgress - val anyFlushed = flushDatas.any { it.recordsFlushed > 0 } - val anyInProgress = flushDatas.any { it.flushInProgress } + val anyFlushed = results.any { it.recordsFlushed > 0 } + val anyInProgress = results.any { it.flushInProgress } // Either both flushed (if first completed before second started) or one was skipped (anyFlushed || anyInProgress).shouldBeTrue() } @@ -247,7 +240,7 @@ class KinesisDataStreamsInstrumentationTest { data = bigData, partitionKey = "partition-1", streamName = STREAM_NAME - ) + ).get() // This should exceed the 100-byte limit val result = smallCacheKinesis.record( @@ -270,13 +263,13 @@ class KinesisDataStreamsInstrumentationTest { data = "to-be-cleared".toByteArray(), partitionKey = "partition-1", streamName = STREAM_NAME - ) + ).get() val clearResult = kinesis.clearCache() - clearResult.shouldBeSuccess().data.recordsCleared shouldBeGreaterThan 0 + clearResult.get().recordsCleared shouldBeGreaterThan 0 val flushResult = kinesis.flush() - flushResult.shouldBeSuccess().data.recordsFlushed shouldBe 0 + flushResult.get().recordsFlushed shouldBe 0 } // --------------------------------------------------------------- @@ -301,10 +294,7 @@ class KinesisDataStreamsInstrumentationTest { streamName = STREAM_NAME ) - val flushResult = kinesis.flush() - flushResult.shouldBeSuccess() - flushResult.data.recordsFlushed shouldBe 1 - + kinesis.flush().get().recordsFlushed shouldBe 1 kinesis.clearCache() } @@ -334,13 +324,10 @@ class KinesisDataStreamsInstrumentationTest { data = "bad-creds-record".toByteArray(), partitionKey = "partition-1", streamName = STREAM_NAME - ) + ).get() // SDK exceptions are handled silently - flush returns Success - val flushResult = badKinesis.flush() - flushResult.shouldBeSuccess() - // Records are not flushed (SDK error), but the operation succeeds - flushResult.data.recordsFlushed shouldBe 0 + badKinesis.flush().get().recordsFlushed shouldBe 0 } finally { badKinesis.disable() badKinesis.clearCache() @@ -374,37 +361,37 @@ class KinesisDataStreamsInstrumentationTest { retryKinesis.enable() try { - retryKinesis.clearCache() + retryKinesis.clearCache().get() // Record to a nonexistent stream and a valid stream retryKinesis.record( data = "invalid-stream-record".toByteArray(), partitionKey = "partition-1", streamName = "nonexistent-stream-name" - ) + ).get() retryKinesis.record( data = "valid-stream-record".toByteArray(), partitionKey = "partition-1", streamName = STREAM_NAME - ) + ).get() // First flush: valid record should be flushed, invalid stays val firstFlush = retryKinesis.flush() - firstFlush.shouldBeSuccess().data.recordsFlushed shouldBe 1 + firstFlush.get().recordsFlushed shouldBe 1 // Flush maxRetries more times (flushes 2–6) to exhaust retries on the invalid record repeat(maxRetries) { val result = retryKinesis.flush() - result.shouldBeSuccess().data.recordsFlushed shouldBe 0 + result.get().recordsFlushed shouldBe 0 } // Final flush: invalid record should have been evicted, nothing left val finalFlush = retryKinesis.flush() - finalFlush.shouldBeSuccess().data.recordsFlushed shouldBe 0 + finalFlush.get().recordsFlushed shouldBe 0 // Confirm cache is truly empty val clearResult = retryKinesis.clearCache() - clearResult.shouldBeSuccess().data.recordsCleared shouldBe 0 + clearResult.get().recordsCleared shouldBe 0 } finally { retryKinesis.disable() retryKinesis.clearCache() @@ -420,16 +407,14 @@ class KinesisDataStreamsInstrumentationTest { fun testHighVolumeRecordAndFlush(): Unit = runBlocking { val count = 50 repeat(count) { i -> - val result = kinesis.record( + kinesis.record( data = "stress-record-$i".toByteArray(), partitionKey = "partition-${i % 5}", streamName = STREAM_NAME - ) - result.shouldBeSuccess() + ).get() } - val flushResult = kinesis.flush() - flushResult.shouldBeSuccess().data.recordsFlushed shouldBe count + kinesis.flush().get().recordsFlushed shouldBe count } /** Record + flush in a loop — verify consistency across cycles. */ @@ -445,11 +430,9 @@ class KinesisDataStreamsInstrumentationTest { data = "cycle-$cycle-record-$i".toByteArray(), partitionKey = "partition-1", streamName = STREAM_NAME - ) + ).get() } - val flushResult = kinesis.flush() - flushResult.shouldBeSuccess() - totalFlushed += flushResult.data.recordsFlushed + totalFlushed += kinesis.flush().get().recordsFlushed } totalFlushed shouldBe (cycles * recordsPerCycle) @@ -474,10 +457,8 @@ class KinesisDataStreamsInstrumentationTest { // Flusher: calls flush() every 500ms until all producers are done + one final drain val flusher = async { while (!producersDone.get()) { - val result = kinesis.flush() - if (result is Result.Success) { - totalFlushed.addAndGet(result.data.recordsFlushed) - } + val result = kinesis.flush().get() + totalFlushed.addAndGet(result.recordsFlushed) delay(500) } } @@ -490,7 +471,7 @@ class KinesisDataStreamsInstrumentationTest { data = "stress-p$p-r$i".toByteArray(), partitionKey = "partition-${p % 3}", streamName = STREAM_NAME - ) + ).get() } } } @@ -501,13 +482,10 @@ class KinesisDataStreamsInstrumentationTest { flusher.await() // Final drain flush to pick up anything the periodic flusher missed - val drainResult = kinesis.flush() - drainResult.shouldBeSuccess() - totalFlushed.addAndGet(drainResult.data.recordsFlushed) + totalFlushed.addAndGet(kinesis.flush().get().recordsFlushed) // Second drain to confirm nothing is left - val finalResult = kinesis.flush() - finalResult.shouldBeSuccess().data.recordsFlushed shouldBe 0 + kinesis.flush().get().recordsFlushed shouldBe 0 totalFlushed.get() shouldBe totalExpected } @@ -526,21 +504,18 @@ class KinesisDataStreamsInstrumentationTest { val recordCount = 1100 // Exceeds MAX_RECORDS_PER_STREAM (500) — requires 3 batches repeat(recordCount) { i -> - val result = kinesis.record( + kinesis.record( data = "batch-record-$i".toByteArray(), partitionKey = "partition-${i % 10}", streamName = STREAM_NAME - ) - result.shouldBeSuccess() + ).get() } // Single flush should drain all 1100 records across three batches (500 + 500 + 100) - val flushResult = kinesis.flush() - flushResult.shouldBeSuccess().data.recordsFlushed shouldBe recordCount + kinesis.flush().get().recordsFlushed shouldBe recordCount // Nothing left - val secondFlush = kinesis.flush() - secondFlush.shouldBeSuccess().data.recordsFlushed shouldBe 0 + kinesis.flush().get().recordsFlushed shouldBe 0 } // --------------------------------------------------------------- @@ -572,14 +547,13 @@ class KinesisDataStreamsInstrumentationTest { data = "auto-start-record".toByteArray(), partitionKey = "partition-1", streamName = STREAM_NAME - ) + ).get() // Wait for auto-flush to trigger (3s interval + buffer) delay(6_000) // After auto-flush, a manual flush should find nothing left - val flushResult = defaultKinesis.flush() - flushResult.shouldBeSuccess().data.recordsFlushed shouldBe 0 + defaultKinesis.flush().get().recordsFlushed shouldBe 0 } finally { defaultKinesis.disable() defaultKinesis.clearCache() @@ -606,14 +580,13 @@ class KinesisDataStreamsInstrumentationTest { data = "auto-flush-record".toByteArray(), partitionKey = "partition-1", streamName = STREAM_NAME - ) + ).get() // Wait for auto-flush to trigger (5s interval + buffer) delay(8_000) // After auto-flush, a manual flush should find nothing left - val flushResult = autoFlushKinesis.flush() - flushResult.shouldBeSuccess().data.recordsFlushed shouldBe 0 + autoFlushKinesis.flush().get().recordsFlushed shouldBe 0 } finally { autoFlushKinesis.disable() autoFlushKinesis.clearCache() @@ -647,16 +620,14 @@ class KinesisDataStreamsInstrumentationTest { utf8ByteCount shouldBe 1024 // 256 emojis × 4 bytes each // Record with the emoji partition key - val result = kinesis.record( + kinesis.record( data = "test-data-with-emoji-partition-key".toByteArray(), partitionKey = emojiPartitionKey, streamName = STREAM_NAME - ) - result.shouldBeSuccess() + ).get() // Flush and verify the record was sent successfully - val flushResult = kinesis.flush() - flushResult.shouldBeSuccess().data.recordsFlushed shouldBe 1 + kinesis.flush().get().recordsFlushed shouldBe 1 } // --------------------------------------------------------------- @@ -694,23 +665,18 @@ class KinesisDataStreamsInstrumentationTest { repeat(recordCount) { i -> val partitionKey = "k".repeat(200) + "-$i" val data = ByteArray(recordDataSize) { (i % 256).toByte() } - val result = largeKinesis.record( + largeKinesis.record( data = data, partitionKey = partitionKey, streamName = STREAM_NAME - ) - result.shouldBeSuccess() + ).get() } // Flush sends all records across multiple batches (up to 10 MiB per batch) - val flush1 = largeKinesis.flush() - flush1.shouldBeSuccess() - val flushed1 = flush1.data.recordsFlushed - flushed1 shouldBe recordCount + largeKinesis.flush().get().recordsFlushed shouldBe recordCount // Second flush: nothing left - val flush2 = largeKinesis.flush() - flush2.shouldBeSuccess().data.recordsFlushed shouldBe 0 + largeKinesis.flush().get().recordsFlushed shouldBe 0 } finally { largeKinesis.disable() largeKinesis.clearCache() @@ -739,15 +705,13 @@ class KinesisDataStreamsInstrumentationTest { .shouldBeInstanceOf() // Now record a valid small record and flush — client should still work - val validResult = kinesis.record( + kinesis.record( data = "still-works".toByteArray(), partitionKey = "partition-1", streamName = STREAM_NAME - ) - validResult.shouldBeSuccess() + ).get() - val flushResult = kinesis.flush() - flushResult.shouldBeSuccess().data.recordsFlushed shouldBe 1 + kinesis.flush().get().recordsFlushed shouldBe 1 } // --------------------------------------------------------------- diff --git a/foundation/src/commonMain/kotlin/com/amplifyframework/foundation/result/Get.kt b/foundation/src/commonMain/kotlin/com/amplifyframework/foundation/result/Get.kt index ab7c94cde9..d111e2c484 100644 --- a/foundation/src/commonMain/kotlin/com/amplifyframework/foundation/result/Get.kt +++ b/foundation/src/commonMain/kotlin/com/amplifyframework/foundation/result/Get.kt @@ -32,6 +32,12 @@ fun Result.getOrThrow(): T { } } +/** + * Returns the Success data or throws the Failure error. This is an alias of getOrThrow + */ +@InternalAmplifyApi +fun Result.get() = getOrThrow() + /** * Returns the Success data or null in the case of Failure */ @@ -43,3 +49,15 @@ fun Result.getOrNull(): T? { } return if (this is Result.Success) data else null } + +/** + * Returns the Failure data or null in the case of Success + */ +@InternalAmplifyApi +fun Result<*, E>.errorOrNull(): E? { + contract { + returnsNotNull() implies (this@errorOrNull is Result.Failure) + returns(null) implies (this@errorOrNull is Result.Success) + } + return if (this is Result.Failure) error else null +} diff --git a/testutils/src/main/java/com/amplifyframework/testutils/assertions/ResultAssertions.kt b/testutils/src/main/java/com/amplifyframework/testutils/assertions/ResultAssertions.kt index 7a59d65c8d..f3c77d8658 100644 --- a/testutils/src/main/java/com/amplifyframework/testutils/assertions/ResultAssertions.kt +++ b/testutils/src/main/java/com/amplifyframework/testutils/assertions/ResultAssertions.kt @@ -18,6 +18,8 @@ package com.amplifyframework.testutils.assertions import com.amplifyframework.foundation.result.Result +import com.amplifyframework.foundation.result.errorOrNull +import com.amplifyframework.foundation.result.getOrNull import io.kotest.matchers.Matcher import io.kotest.matchers.MatcherResult import io.kotest.matchers.should @@ -28,16 +30,16 @@ import kotlin.contracts.contract private fun beFailure() = Matcher> { value -> MatcherResult( value is Result.Failure, - { "result expected to be failure but was success" }, - { "result expected to not be failure but was failure" } + { "result expected to be failure but was success with data: ${value.getOrNull()}" }, + { "result expected to not be failure but was failure with error: ${value.errorOrNull()}" } ) } private fun beSuccess() = Matcher> { value -> MatcherResult( value is Result.Success, - { "result expected to be success but was failure" }, - { "result expected to not be success but was success" } + { "result expected to be success but was failure with error: ${value.errorOrNull()}" }, + { "result expected to not be success but was success with data: ${value.getOrNull()}" } ) }