diff --git a/.github/workflows/amplify_firehose_dart.yaml b/.github/workflows/amplify_firehose_dart.yaml new file mode 100644 index 00000000000..74614db994a --- /dev/null +++ b/.github/workflows/amplify_firehose_dart.yaml @@ -0,0 +1,83 @@ +# Generated with aft. To update, run: `aft generate workflows` +name: amplify_firehose_dart +on: + push: + branches: + - main + - stable + paths: + - '.github/workflows/amplify_firehose_dart.yaml' + - '.github/workflows/dart_vm.yaml' + - 'packages/amplify_core/lib/**/*.dart' + - 'packages/amplify_core/pubspec.yaml' + - 'packages/amplify_foundation/amplify_foundation_dart/lib/**/*.dart' + - 'packages/amplify_foundation/amplify_foundation_dart/pubspec.yaml' + - 'packages/amplify_foundation/amplify_foundation_dart_bridge/lib/**/*.dart' + - 'packages/amplify_foundation/amplify_foundation_dart_bridge/pubspec.yaml' + - 'packages/amplify_lints/lib/**/*.yaml' + - 'packages/amplify_lints/pubspec.yaml' + - 'packages/aws_common/lib/**/*.dart' + - 'packages/aws_common/pubspec.yaml' + - 'packages/aws_signature_v4/lib/**/*.dart' + - 'packages/aws_signature_v4/pubspec.yaml' + - 'packages/common/amplify_db_common_dart/lib/**/*.dart' + - 'packages/common/amplify_db_common_dart/pubspec.yaml' + - 'packages/kinesis/amplify_firehose_dart/**/*.dart' + - 'packages/kinesis/amplify_firehose_dart/**/*.yaml' + - 'packages/kinesis/amplify_firehose_dart/lib/**/*' + - 'packages/kinesis/amplify_firehose_dart/test/**/*' + - 'packages/smithy/smithy/lib/**/*.dart' + - 'packages/smithy/smithy/pubspec.yaml' + - 'packages/smithy/smithy_aws/lib/**/*.dart' + - 'packages/smithy/smithy_aws/pubspec.yaml' + pull_request: + paths: + - '.github/workflows/amplify_firehose_dart.yaml' + - '.github/workflows/dart_vm.yaml' + - 'packages/amplify_core/lib/**/*.dart' + - 'packages/amplify_core/pubspec.yaml' + - 'packages/amplify_foundation/amplify_foundation_dart/lib/**/*.dart' + - 'packages/amplify_foundation/amplify_foundation_dart/pubspec.yaml' + - 'packages/amplify_foundation/amplify_foundation_dart_bridge/lib/**/*.dart' + - 'packages/amplify_foundation/amplify_foundation_dart_bridge/pubspec.yaml' + - 'packages/amplify_lints/lib/**/*.yaml' + - 'packages/amplify_lints/pubspec.yaml' + - 'packages/aws_common/lib/**/*.dart' + - 'packages/aws_common/pubspec.yaml' + - 'packages/aws_signature_v4/lib/**/*.dart' + - 'packages/aws_signature_v4/pubspec.yaml' + - 'packages/common/amplify_db_common_dart/lib/**/*.dart' + - 'packages/common/amplify_db_common_dart/pubspec.yaml' + - 'packages/kinesis/amplify_firehose_dart/**/*.dart' + - 'packages/kinesis/amplify_firehose_dart/**/*.yaml' + - 'packages/kinesis/amplify_firehose_dart/lib/**/*' + - 'packages/kinesis/amplify_firehose_dart/test/**/*' + - 'packages/smithy/smithy/lib/**/*.dart' + - 'packages/smithy/smithy/pubspec.yaml' + - 'packages/smithy/smithy_aws/lib/**/*.dart' + - 'packages/smithy/smithy_aws/pubspec.yaml' + schedule: + - cron: "0 13 * * 1" # Every Monday at 06:00 PST + workflow_dispatch: +defaults: + run: + shell: bash + +# These permissions are needed to interact with GitHub's OIDC Token endpoint. +permissions: + id-token: write + contents: read + +# Cancels in-progress job when there is another push to same ref. 
+# https://docs.github.com/en/actions/using-jobs/using-concurrency#example-only-cancel-in-progress-jobs-or-runs-for-the-current-workflow +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +jobs: + test: + uses: ./.github/workflows/dart_vm.yaml + secrets: inherit + with: + package-name: amplify_firehose_dart + working-directory: packages/kinesis/amplify_firehose_dart diff --git a/.github/workflows/amplify_kinesis.yaml b/.github/workflows/amplify_kinesis.yaml index 3c71abc57cb..d27eb6e2b14 100644 --- a/.github/workflows/amplify_kinesis.yaml +++ b/.github/workflows/amplify_kinesis.yaml @@ -28,6 +28,8 @@ on: - 'packages/kinesis/amplify_kinesis/test/**/*' - 'packages/kinesis/amplify_kinesis_dart/lib/**/*.dart' - 'packages/kinesis/amplify_kinesis_dart/pubspec.yaml' + - 'packages/kinesis/amplify_record_cache_dart/lib/**/*.dart' + - 'packages/kinesis/amplify_record_cache_dart/pubspec.yaml' - 'packages/smithy/smithy/lib/**/*.dart' - 'packages/smithy/smithy/pubspec.yaml' - 'packages/smithy/smithy_aws/lib/**/*.dart' @@ -56,6 +58,8 @@ on: - 'packages/kinesis/amplify_kinesis/test/**/*' - 'packages/kinesis/amplify_kinesis_dart/lib/**/*.dart' - 'packages/kinesis/amplify_kinesis_dart/pubspec.yaml' + - 'packages/kinesis/amplify_record_cache_dart/lib/**/*.dart' + - 'packages/kinesis/amplify_record_cache_dart/pubspec.yaml' - 'packages/smithy/smithy/lib/**/*.dart' - 'packages/smithy/smithy/pubspec.yaml' - 'packages/smithy/smithy_aws/lib/**/*.dart' diff --git a/.github/workflows/amplify_kinesis_dart.yaml b/.github/workflows/amplify_kinesis_dart.yaml index baaa6e452e2..bb65205e7ad 100644 --- a/.github/workflows/amplify_kinesis_dart.yaml +++ b/.github/workflows/amplify_kinesis_dart.yaml @@ -27,6 +27,8 @@ on: - 'packages/kinesis/amplify_kinesis_dart/**/*.yaml' - 'packages/kinesis/amplify_kinesis_dart/lib/**/*' - 'packages/kinesis/amplify_kinesis_dart/test/**/*' + - 'packages/kinesis/amplify_record_cache_dart/lib/**/*.dart' + - 'packages/kinesis/amplify_record_cache_dart/pubspec.yaml' - 'packages/smithy/smithy/lib/**/*.dart' - 'packages/smithy/smithy/pubspec.yaml' - 'packages/smithy/smithy_aws/lib/**/*.dart' @@ -54,6 +56,8 @@ on: - 'packages/kinesis/amplify_kinesis_dart/**/*.yaml' - 'packages/kinesis/amplify_kinesis_dart/lib/**/*' - 'packages/kinesis/amplify_kinesis_dart/test/**/*' + - 'packages/kinesis/amplify_record_cache_dart/lib/**/*.dart' + - 'packages/kinesis/amplify_record_cache_dart/pubspec.yaml' - 'packages/smithy/smithy/lib/**/*.dart' - 'packages/smithy/smithy/pubspec.yaml' - 'packages/smithy/smithy_aws/lib/**/*.dart' diff --git a/.github/workflows/amplify_kinesis_example.yaml b/.github/workflows/amplify_kinesis_example.yaml index 160ac0b1731..54da2af128c 100644 --- a/.github/workflows/amplify_kinesis_example.yaml +++ b/.github/workflows/amplify_kinesis_example.yaml @@ -57,6 +57,8 @@ on: - 'packages/kinesis/amplify_kinesis/pubspec.yaml' - 'packages/kinesis/amplify_kinesis_dart/lib/**/*.dart' - 'packages/kinesis/amplify_kinesis_dart/pubspec.yaml' + - 'packages/kinesis/amplify_record_cache_dart/lib/**/*.dart' + - 'packages/kinesis/amplify_record_cache_dart/pubspec.yaml' - 'packages/secure_storage/amplify_secure_storage/android/**/*' - 'packages/secure_storage/amplify_secure_storage/ios/**/*' - 'packages/secure_storage/amplify_secure_storage/lib/**/*.dart' @@ -127,6 +129,8 @@ on: - 'packages/kinesis/amplify_kinesis/pubspec.yaml' - 'packages/kinesis/amplify_kinesis_dart/lib/**/*.dart' - 'packages/kinesis/amplify_kinesis_dart/pubspec.yaml' + - 
'packages/kinesis/amplify_record_cache_dart/lib/**/*.dart' + - 'packages/kinesis/amplify_record_cache_dart/pubspec.yaml' - 'packages/secure_storage/amplify_secure_storage/android/**/*' - 'packages/secure_storage/amplify_secure_storage/ios/**/*' - 'packages/secure_storage/amplify_secure_storage/lib/**/*.dart' diff --git a/.github/workflows/amplify_record_cache_dart.yaml b/.github/workflows/amplify_record_cache_dart.yaml new file mode 100644 index 00000000000..b60dcf6b9a8 --- /dev/null +++ b/.github/workflows/amplify_record_cache_dart.yaml @@ -0,0 +1,75 @@ +# Generated with aft. To update, run: `aft generate workflows` +name: amplify_record_cache_dart +on: + push: + branches: + - main + - stable + paths: + - '.github/workflows/amplify_record_cache_dart.yaml' + - '.github/workflows/dart_vm.yaml' + - 'packages/amplify_core/lib/**/*.dart' + - 'packages/amplify_core/pubspec.yaml' + - 'packages/amplify_foundation/amplify_foundation_dart/lib/**/*.dart' + - 'packages/amplify_foundation/amplify_foundation_dart/pubspec.yaml' + - 'packages/amplify_lints/lib/**/*.yaml' + - 'packages/amplify_lints/pubspec.yaml' + - 'packages/aws_common/lib/**/*.dart' + - 'packages/aws_common/pubspec.yaml' + - 'packages/aws_signature_v4/lib/**/*.dart' + - 'packages/aws_signature_v4/pubspec.yaml' + - 'packages/common/amplify_db_common_dart/lib/**/*.dart' + - 'packages/common/amplify_db_common_dart/pubspec.yaml' + - 'packages/kinesis/amplify_record_cache_dart/**/*.dart' + - 'packages/kinesis/amplify_record_cache_dart/**/*.yaml' + - 'packages/kinesis/amplify_record_cache_dart/lib/**/*' + - 'packages/kinesis/amplify_record_cache_dart/test/**/*' + - 'packages/smithy/smithy/lib/**/*.dart' + - 'packages/smithy/smithy/pubspec.yaml' + pull_request: + paths: + - '.github/workflows/amplify_record_cache_dart.yaml' + - '.github/workflows/dart_vm.yaml' + - 'packages/amplify_core/lib/**/*.dart' + - 'packages/amplify_core/pubspec.yaml' + - 'packages/amplify_foundation/amplify_foundation_dart/lib/**/*.dart' + - 'packages/amplify_foundation/amplify_foundation_dart/pubspec.yaml' + - 'packages/amplify_lints/lib/**/*.yaml' + - 'packages/amplify_lints/pubspec.yaml' + - 'packages/aws_common/lib/**/*.dart' + - 'packages/aws_common/pubspec.yaml' + - 'packages/aws_signature_v4/lib/**/*.dart' + - 'packages/aws_signature_v4/pubspec.yaml' + - 'packages/common/amplify_db_common_dart/lib/**/*.dart' + - 'packages/common/amplify_db_common_dart/pubspec.yaml' + - 'packages/kinesis/amplify_record_cache_dart/**/*.dart' + - 'packages/kinesis/amplify_record_cache_dart/**/*.yaml' + - 'packages/kinesis/amplify_record_cache_dart/lib/**/*' + - 'packages/kinesis/amplify_record_cache_dart/test/**/*' + - 'packages/smithy/smithy/lib/**/*.dart' + - 'packages/smithy/smithy/pubspec.yaml' + schedule: + - cron: "0 13 * * 1" # Every Monday at 06:00 PST + workflow_dispatch: +defaults: + run: + shell: bash + +# These permissions are needed to interact with GitHub's OIDC Token endpoint. +permissions: + id-token: write + contents: read + +# Cancels in-progress job when there is another push to same ref. 
+# https://docs.github.com/en/actions/using-jobs/using-concurrency#example-only-cancel-in-progress-jobs-or-runs-for-the-current-workflow +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +jobs: + test: + uses: ./.github/workflows/dart_vm.yaml + secrets: inherit + with: + package-name: amplify_record_cache_dart + working-directory: packages/kinesis/amplify_record_cache_dart diff --git a/packages/kinesis/amplify_kinesis_dart/lib/amplify_kinesis_dart.dart b/packages/kinesis/amplify_kinesis_dart/lib/amplify_kinesis_dart.dart index 761db258e44..ced24275aa0 100644 --- a/packages/kinesis/amplify_kinesis_dart/lib/amplify_kinesis_dart.dart +++ b/packages/kinesis/amplify_kinesis_dart/lib/amplify_kinesis_dart.dart @@ -4,18 +4,22 @@ /// Amplify Kinesis Data Streams client for Dart. library; +// Re-export shared types used in the public API +export 'package:amplify_record_cache_dart/amplify_record_cache_dart.dart' + show + FlushStrategy, + FlushInterval, + FlushNone, + FlushData, + RecordData, + ClearCacheData; + // Main client export 'src/amplify_kinesis_client.dart'; // Options export 'src/amplify_kinesis_client_options.dart'; // Exceptions export 'src/exception/amplify_kinesis_exception.dart'; -// Flush strategies -export 'src/flush_strategy/flush_strategy.dart'; -// Return types -export 'src/model/clear_cache_data.dart'; -export 'src/model/flush_data.dart'; -export 'src/model/record_data.dart'; // SDK client (for escape hatch) export 'src/sdk/kinesis.dart' show diff --git a/packages/kinesis/amplify_kinesis_dart/lib/src/amplify_kinesis_client.dart b/packages/kinesis/amplify_kinesis_dart/lib/src/amplify_kinesis_client.dart index 638edab1483..b9ddd5ae924 100644 --- a/packages/kinesis/amplify_kinesis_dart/lib/src/amplify_kinesis_client.dart +++ b/packages/kinesis/amplify_kinesis_dart/lib/src/amplify_kinesis_client.dart @@ -12,17 +12,12 @@ import 'package:amplify_foundation_dart/amplify_foundation_dart.dart' import 'package:amplify_foundation_dart_bridge/amplify_foundation_dart_bridge.dart'; import 'package:amplify_kinesis_dart/src/amplify_kinesis_client_options.dart'; import 'package:amplify_kinesis_dart/src/exception/amplify_kinesis_exception.dart'; -import 'package:amplify_kinesis_dart/src/flush_strategy/flush_strategy.dart'; -import 'package:amplify_kinesis_dart/src/impl/auto_flush_scheduler.dart'; import 'package:amplify_kinesis_dart/src/impl/kinesis_record.dart'; import 'package:amplify_kinesis_dart/src/impl/kinesis_sender.dart'; -import 'package:amplify_kinesis_dart/src/impl/record_client.dart'; -import 'package:amplify_kinesis_dart/src/impl/storage/platform/record_storage_platform.dart'; -import 'package:amplify_kinesis_dart/src/model/clear_cache_data.dart'; -import 'package:amplify_kinesis_dart/src/model/flush_data.dart'; -import 'package:amplify_kinesis_dart/src/model/record_data.dart'; +import 'package:amplify_kinesis_dart/src/kinesis_limits.dart' as limits; import 'package:amplify_kinesis_dart/src/sdk/kinesis.dart'; import 'package:amplify_kinesis_dart/src/version.dart'; +import 'package:amplify_record_cache_dart/amplify_record_cache_dart.dart'; import 'package:smithy/smithy.dart' show WithUserAgent; /// User agent component identifying this library. 
@@ -114,6 +109,11 @@ class AmplifyKinesisClient { identifier: region, storagePath: storagePath, maxCacheBytes: opts.cacheMaxBytes, + maxRecordsPerBatch: limits.maxRecordsPerStream, + maxBytesPerBatch: limits.maxPutRecordsSizeBytes, + maxRecordSizeBytes: limits.maxRecordSizeBytes, + dbPrefix: 'kinesis_records', + storeName: 'kinesis_records', ); final kinesisClient = KinesisClient( @@ -208,8 +208,21 @@ class AmplifyKinesisClient { _logger.debug('Record collection is disabled, dropping record'); return const Result.ok(RecordData()); } + // KDS-specific partition key validation + final codePoints = partitionKey.runes.length; + if (codePoints == 0 || codePoints > limits.maxPartitionKeyLength) { + return Result.error( + KinesisValidationException( + 'Partition key length ($codePoints) is outside the allowed ' + 'range of 1-${limits.maxPartitionKeyLength} characters.', + recoverySuggestion: + 'Use a partition key between 1 and ' + '${limits.maxPartitionKeyLength} characters.', + ), + ); + } _logger.verbose('Recording to stream: $streamName'); - final kinesisRecord = RecordInput.now( + final kinesisRecord = createKinesisRecordInputNow( data: data, partitionKey: partitionKey, streamName: streamName, diff --git a/packages/kinesis/amplify_kinesis_dart/lib/src/amplify_kinesis_client_options.dart b/packages/kinesis/amplify_kinesis_dart/lib/src/amplify_kinesis_client_options.dart index c2b896be47f..82b66d15045 100644 --- a/packages/kinesis/amplify_kinesis_dart/lib/src/amplify_kinesis_client_options.dart +++ b/packages/kinesis/amplify_kinesis_dart/lib/src/amplify_kinesis_client_options.dart @@ -3,7 +3,7 @@ import 'package:amplify_kinesis_dart/src/amplify_kinesis_client.dart' show AmplifyKinesisClient; -import 'package:amplify_kinesis_dart/src/flush_strategy/flush_strategy.dart'; +import 'package:amplify_record_cache_dart/amplify_record_cache_dart.dart'; /// {@template amplify_kinesis.amplify_kinesis_client_options} /// Configuration options for [AmplifyKinesisClient]. diff --git a/packages/kinesis/amplify_kinesis_dart/lib/src/exception/amplify_kinesis_exception.dart b/packages/kinesis/amplify_kinesis_dart/lib/src/exception/amplify_kinesis_exception.dart index 815649bca50..43192b0019d 100644 --- a/packages/kinesis/amplify_kinesis_dart/lib/src/exception/amplify_kinesis_exception.dart +++ b/packages/kinesis/amplify_kinesis_dart/lib/src/exception/amplify_kinesis_exception.dart @@ -2,11 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 import 'package:amplify_core/amplify_core.dart'; -import 'package:amplify_kinesis_dart/src/exception/record_cache_exception.dart'; - -/// Default recovery suggestion for errors. -const String defaultRecoverySuggestion = - 'Inspect the underlying error for more details.'; +import 'package:amplify_record_cache_dart/amplify_record_cache_dart.dart'; /// {@template amplify_kinesis.amplify_kinesis_exception} /// Base exception for Amplify Kinesis Data Streams errors. diff --git a/packages/kinesis/amplify_kinesis_dart/lib/src/impl/kinesis_record.dart b/packages/kinesis/amplify_kinesis_dart/lib/src/impl/kinesis_record.dart index e1bb025c2fe..91a560ec50e 100644 --- a/packages/kinesis/amplify_kinesis_dart/lib/src/impl/kinesis_record.dart +++ b/packages/kinesis/amplify_kinesis_dart/lib/src/impl/kinesis_record.dart @@ -4,46 +4,39 @@ import 'dart:convert'; import 'dart:typed_data'; -/// Internal representation of a record to be sent to Kinesis. -final class RecordInput { - /// Creates a new Kinesis record. 
- RecordInput({ - required this.data, - required this.partitionKey, - required this.streamName, - required this.createdAt, - }) : dataSize = data.length + utf8.encode(partitionKey).length; - - /// Creates a Kinesis record with the current timestamp. - factory RecordInput.now({ - required Uint8List data, - required String partitionKey, - required String streamName, - }) { - return RecordInput( - data: data, - partitionKey: partitionKey, - streamName: streamName, - createdAt: DateTime.now(), - ); - } - - /// The data blob to send to Kinesis. - final Uint8List data; - - /// The partition key for the record. - final String partitionKey; - - /// The name of the Kinesis Data Stream. - final String streamName; - - /// The size of the record in bytes (data blob + partition key). - /// - /// Per AWS docs, the record size limit applies to the total size of the - /// partition key and data blob combined. Computed once at construction - /// to avoid repeated UTF-8 encoding of the partition key. - final int dataSize; +import 'package:amplify_record_cache_dart/amplify_record_cache_dart.dart'; + +/// Creates a [RecordInput] for Kinesis Data Streams. +/// +/// Unlike the generic `RecordInput`, this factory computes `dataSize` +/// as `data.length + utf8.encode(partitionKey).length` per the KDS +/// PutRecords API spec. +RecordInput createKinesisRecordInput({ + required Uint8List data, + required String partitionKey, + required String streamName, + required DateTime createdAt, +}) { + return RecordInput( + data: data, + streamName: streamName, + partitionKey: partitionKey, + dataSize: data.length + utf8.encode(partitionKey).length, + createdAt: createdAt, + ); +} - /// Timestamp of when the record was created. - final DateTime createdAt; +/// Creates a [RecordInput] for Kinesis Data Streams with the current +/// timestamp. +RecordInput createKinesisRecordInputNow({ + required Uint8List data, + required String partitionKey, + required String streamName, +}) { + return createKinesisRecordInput( + data: data, + partitionKey: partitionKey, + streamName: streamName, + createdAt: DateTime.now(), + ); } diff --git a/packages/kinesis/amplify_kinesis_dart/lib/src/impl/kinesis_sender.dart b/packages/kinesis/amplify_kinesis_dart/lib/src/impl/kinesis_sender.dart index a33eefb3fa1..7d77636e9d6 100644 --- a/packages/kinesis/amplify_kinesis_dart/lib/src/impl/kinesis_sender.dart +++ b/packages/kinesis/amplify_kinesis_dart/lib/src/impl/kinesis_sender.dart @@ -1,34 +1,8 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 -import 'package:amplify_kinesis_dart/src/model/record.dart'; import 'package:amplify_kinesis_dart/src/sdk/kinesis.dart'; - -/// Result of a PutRecords operation. -/// -/// Records are categorized into three buckets: -/// - [successfulIds]: records that were accepted by Kinesis. -/// - [retryableIds]: records that failed with any error code but have not -/// yet exceeded the retry limit. These will be retried in the next flush. -/// - [failedIds]: records that have exceeded the retry limit and should be -/// deleted from the cache. -final class PutRecordsResult { - /// Creates a new [PutRecordsResult]. - const PutRecordsResult({ - required this.successfulIds, - required this.retryableIds, - required this.failedIds, - }); - - /// IDs of records that were successfully sent. - final List successfulIds; - - /// IDs of records that failed but can be retried (retry count < max). 
- final List retryableIds; - - /// IDs of records that exceeded the retry limit and should be deleted. - final List failedIds; -} +import 'package:amplify_record_cache_dart/amplify_record_cache_dart.dart'; /// {@template amplify_kinesis.kinesis_sender} /// Handles communication with AWS Kinesis Data Streams. @@ -37,7 +11,7 @@ final class PutRecordsResult { /// categorization so that all error codes are treated as retryable /// until the record exceeds `maxRetries`. /// {@endtemplate} -class KinesisSender { +class KinesisSender implements Sender { /// {@macro amplify_kinesis.kinesis_sender} KinesisSender({required KinesisClient kinesisClient, required int maxRetries}) : _kinesisClient = kinesisClient, @@ -46,18 +20,13 @@ class KinesisSender { final KinesisClient _kinesisClient; final int _maxRetries; - /// Sends records to a Kinesis stream and categorizes the response. - /// - /// Each record in the response is categorized as: - /// - successful: no error code - /// - failed: has an error code AND retry count >= [_maxRetries] - /// - retryable: has an error code AND retry count < [_maxRetries] - Future putRecords({ + @override + Future sendBatch({ required String streamName, required List records, }) async { if (records.isEmpty) { - return const PutRecordsResult( + return const SendResult( successfulIds: [], retryableIds: [], failedIds: [], @@ -68,7 +37,7 @@ class KinesisSender { .map( (record) => PutRecordsRequestEntry( data: record.data, - partitionKey: record.partitionKey, + partitionKey: record.partitionKey!, ), ) .toList(); @@ -84,10 +53,7 @@ class KinesisSender { /// Splits the PutRecords response into successful, retryable, and failed /// record IDs based on error codes and retry counts. - PutRecordsResult _splitResponse( - PutRecordsResponse response, - List records, - ) { + SendResult _splitResponse(PutRecordsResponse response, List records) { final successfulIds = []; final retryableIds = []; final failedIds = []; @@ -111,7 +77,7 @@ class KinesisSender { } } - return PutRecordsResult( + return SendResult( successfulIds: successfulIds, retryableIds: retryableIds, failedIds: failedIds, diff --git a/packages/kinesis/amplify_kinesis_dart/pubspec.yaml b/packages/kinesis/amplify_kinesis_dart/pubspec.yaml index 69e6378bc7e..550419bd08c 100644 --- a/packages/kinesis/amplify_kinesis_dart/pubspec.yaml +++ b/packages/kinesis/amplify_kinesis_dart/pubspec.yaml @@ -17,26 +17,23 @@ environment: dependencies: amplify_core: ">=2.10.0 <2.11.0" - amplify_db_common_dart: ">=0.4.17 <0.5.0" amplify_foundation_dart: ">=2.11.0 <2.12.0" amplify_foundation_dart_bridge: ">=2.11.0 <2.12.0" + amplify_record_cache_dart: ">=0.1.0 <0.2.0" aws_common: ">=0.7.12 <0.8.0" aws_signature_v4: ">=0.6.10 <0.7.0" built_collection: ^5.1.1 built_value: ^8.10.1 - drift: ^2.25.0 meta: ^1.16.0 smithy: ">=0.7.10 <0.8.0" smithy_aws: ">=0.7.10 <0.8.0" - synchronized: ^3.3.0 - web: ^1.1.1 dev_dependencies: amplify_lints: ">=3.1.4 <3.2.0" - build_runner: ^2.4.15 - build_version: ^2.1.1 - built_value_generator: ^8.10.1 - drift_dev: ^2.25.1 + # drift is a dev dependency because it's only needed for tests + # (NativeDatabase.memory()). Runtime drift comes transitively via + # amplify_record_cache_dart. 
+ drift: ^2.25.0 fake_async: ^1.3.0 mocktail: ^1.0.0 test: ^1.22.1 diff --git a/packages/kinesis/amplify_kinesis_dart/test/amplify_kinesis_exception_test.dart b/packages/kinesis/amplify_kinesis_dart/test/amplify_kinesis_exception_test.dart index 61f9de630b1..5a7106f8436 100644 --- a/packages/kinesis/amplify_kinesis_dart/test/amplify_kinesis_exception_test.dart +++ b/packages/kinesis/amplify_kinesis_dart/test/amplify_kinesis_exception_test.dart @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 import 'package:amplify_kinesis_dart/src/exception/amplify_kinesis_exception.dart'; -import 'package:amplify_kinesis_dart/src/exception/record_cache_exception.dart'; +import 'package:amplify_record_cache_dart/amplify_record_cache_dart.dart'; import 'package:test/test.dart'; void main() { @@ -16,7 +16,7 @@ void main() { test( 'converts RecordCacheValidationException to KinesisValidationException', () { - final cause = RecordCacheValidationException('bad input', 'fix it'); + const cause = RecordCacheValidationException('bad input', 'fix it'); final result = AmplifyKinesisException.from(cause); expect(result, isA()); expect(result.message, 'bad input'); @@ -44,7 +44,7 @@ void main() { test( 'converts RecordCacheLimitExceededException to KinesisLimitExceededException', () { - final cause = RecordCacheLimitExceededException( + const cause = RecordCacheLimitExceededException( 'cache full', 'flush first', ); diff --git a/packages/kinesis/amplify_kinesis_dart/test/auto_flush_scheduler_test.dart b/packages/kinesis/amplify_kinesis_dart/test/auto_flush_scheduler_test.dart index bf574220d71..c8e1591dfa6 100644 --- a/packages/kinesis/amplify_kinesis_dart/test/auto_flush_scheduler_test.dart +++ b/packages/kinesis/amplify_kinesis_dart/test/auto_flush_scheduler_test.dart @@ -1,9 +1,7 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 -import 'package:amplify_kinesis_dart/src/impl/auto_flush_scheduler.dart'; -import 'package:amplify_kinesis_dart/src/impl/record_client.dart'; -import 'package:amplify_kinesis_dart/src/model/flush_data.dart'; +import 'package:amplify_record_cache_dart/amplify_record_cache_dart.dart'; import 'package:fake_async/fake_async.dart'; import 'package:mocktail/mocktail.dart'; import 'package:test/test.dart'; diff --git a/packages/kinesis/amplify_kinesis_dart/test/common/mocktail_mocks.dart b/packages/kinesis/amplify_kinesis_dart/test/common/mocktail_mocks.dart index f77e0098ecf..58a4526d99e 100644 --- a/packages/kinesis/amplify_kinesis_dart/test/common/mocktail_mocks.dart +++ b/packages/kinesis/amplify_kinesis_dart/test/common/mocktail_mocks.dart @@ -9,8 +9,8 @@ import 'dart:async'; import 'package:amplify_foundation_dart/amplify_foundation_dart.dart' as foundation; import 'package:amplify_kinesis_dart/src/impl/kinesis_sender.dart'; -import 'package:amplify_kinesis_dart/src/impl/record_client.dart'; import 'package:amplify_kinesis_dart/src/sdk/src/kinesis/kinesis_client.dart'; +import 'package:amplify_record_cache_dart/amplify_record_cache_dart.dart'; import 'package:aws_common/aws_common.dart'; import 'package:mocktail/mocktail.dart'; import 'package:smithy/smithy.dart'; @@ -20,13 +20,6 @@ import 'package:smithy/smithy.dart'; // ============================================================================= /// Creates a mock [SmithyOperation] that returns the result of [fn]. 
-/// -/// Use this helper to mock SDK client method returns: -/// ```dart -/// when(() => mockClient.putRecords(any())).thenReturn( -/// mockSmithyOperation(() => PutRecordsResponse(...)), -/// ); -/// ``` SmithyOperation mockSmithyOperation(FutureOr Function() fn) => SmithyOperation( CancelableOperation.fromFuture(Future.value(fn())), @@ -39,13 +32,6 @@ SmithyOperation mockSmithyOperation(FutureOr Function() fn) => class MockKinesisClient extends Mock implements KinesisClient {} /// Mock implementation of [SmithyOperation]. -/// -/// Use when you need to throw exceptions from SDK operations: -/// ```dart -/// final mockOperation = MockSmithyOperation(); -/// when(() => mockOperation.result).thenThrow(SomeException()); -/// when(() => mockClient.putRecords(any())).thenReturn(mockOperation); -/// ``` class MockSmithyOperation extends Mock implements SmithyOperation {} /// Mock implementation of [AWSHttpException]. diff --git a/packages/kinesis/amplify_kinesis_dart/test/helpers/test_database.dart b/packages/kinesis/amplify_kinesis_dart/test/helpers/test_database.dart index ebde2530ee1..a334ecb2a4c 100644 --- a/packages/kinesis/amplify_kinesis_dart/test/helpers/test_database.dart +++ b/packages/kinesis/amplify_kinesis_dart/test/helpers/test_database.dart @@ -1,10 +1,10 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 -import 'package:amplify_kinesis_dart/src/db/kinesis_record_database.dart'; +import 'package:amplify_record_cache_dart/amplify_record_cache_dart.dart'; import 'package:drift/native.dart'; /// Creates an in-memory database for testing. -KinesisRecordDatabase createTestDatabase() { - return KinesisRecordDatabase.forTesting(NativeDatabase.memory()); +RecordCacheDatabase createTestDatabase() { + return RecordCacheDatabase.forTesting(NativeDatabase.memory()); } diff --git a/packages/kinesis/amplify_kinesis_dart/test/in_memory_record_storage_test.dart b/packages/kinesis/amplify_kinesis_dart/test/in_memory_record_storage_test.dart index 16acbc9b1a2..bc494cf011e 100644 --- a/packages/kinesis/amplify_kinesis_dart/test/in_memory_record_storage_test.dart +++ b/packages/kinesis/amplify_kinesis_dart/test/in_memory_record_storage_test.dart @@ -4,8 +4,7 @@ import 'dart:typed_data'; import 'package:amplify_kinesis_dart/src/impl/kinesis_record.dart'; -import 'package:amplify_kinesis_dart/src/impl/storage/record_storage.dart'; -import 'package:amplify_kinesis_dart/src/impl/storage/record_storage_memory.dart'; +import 'package:amplify_record_cache_dart/amplify_record_cache_dart.dart'; import 'package:test/test.dart'; void main() { @@ -13,7 +12,12 @@ void main() { late InMemoryRecordStorage storage; setUp(() { - storage = InMemoryRecordStorage(maxCacheBytes: 10 * 1024 * 1024); + storage = InMemoryRecordStorage( + maxCacheBytes: 10 * 1024 * 1024, + maxRecordsPerBatch: 500, + maxBytesPerBatch: 10 * 1024 * 1024, + maxRecordSizeBytes: 10 * 1024 * 1024, + ); }); tearDown(() async { @@ -29,7 +33,7 @@ void main() { group('addRecord', () { test('saves and retrieves a record', () async { await storage.addRecord( - RecordInput.now( + createKinesisRecordInputNow( data: Uint8List.fromList([1, 2, 3, 4, 5]), partitionKey: 'test-partition', streamName: 'test-stream', @@ -52,7 +56,7 @@ void main() { test('removes correct records by ID', () async { for (var i = 0; i < 5; i++) { await storage.addRecord( - RecordInput.now( + createKinesisRecordInputNow( data: Uint8List.fromList([i]), partitionKey: 'pk-$i', streamName: 'stream', @@ -74,7 +78,7 @@ void 
main() { test('handles empty ID list gracefully', () async { await storage.addRecord( - RecordInput.now( + createKinesisRecordInputNow( data: Uint8List.fromList([1]), partitionKey: 'pk', streamName: 'stream', @@ -89,7 +93,7 @@ void main() { group('incrementRetryCount', () { test('increments retry count correctly', () async { await storage.addRecord( - RecordInput.now( + createKinesisRecordInputNow( data: Uint8List.fromList([1]), partitionKey: 'pk', streamName: 'stream', @@ -112,21 +116,21 @@ void main() { group('getRecordsByStream', () { test('returns records grouped by stream name', () async { await storage.addRecord( - RecordInput.now( + createKinesisRecordInputNow( data: Uint8List.fromList([1]), partitionKey: 'pk', streamName: 'stream-a', ), ); await storage.addRecord( - RecordInput.now( + createKinesisRecordInputNow( data: Uint8List.fromList([2]), partitionKey: 'pk', streamName: 'stream-b', ), ); await storage.addRecord( - RecordInput.now( + createKinesisRecordInputNow( data: Uint8List.fromList([3]), partitionKey: 'pk', streamName: 'stream-a', @@ -149,7 +153,7 @@ void main() { test('removes all records', () async { for (var i = 0; i < 5; i++) { await storage.addRecord( - RecordInput.now( + createKinesisRecordInputNow( data: Uint8List.fromList([i]), partitionKey: 'pk-$i', streamName: 'stream', @@ -169,7 +173,7 @@ void main() { expect(await storage.getRecordCount(), equals(0)); for (var i = 0; i < 3; i++) { await storage.addRecord( - RecordInput.now( + createKinesisRecordInputNow( data: Uint8List.fromList([i]), partitionKey: 'pk-$i', streamName: 'stream', diff --git a/packages/kinesis/amplify_kinesis_dart/test/kinesis_data_streams_test.dart b/packages/kinesis/amplify_kinesis_dart/test/kinesis_data_streams_test.dart index 13f107cce5c..522986197d5 100644 --- a/packages/kinesis/amplify_kinesis_dart/test/kinesis_data_streams_test.dart +++ b/packages/kinesis/amplify_kinesis_dart/test/kinesis_data_streams_test.dart @@ -8,11 +8,8 @@ import 'package:amplify_foundation_dart/amplify_foundation_dart.dart' import 'package:amplify_kinesis_dart/src/amplify_kinesis_client.dart'; import 'package:amplify_kinesis_dart/src/amplify_kinesis_client_options.dart'; import 'package:amplify_kinesis_dart/src/exception/amplify_kinesis_exception.dart'; -import 'package:amplify_kinesis_dart/src/flush_strategy/flush_strategy.dart'; import 'package:amplify_kinesis_dart/src/impl/kinesis_record.dart'; -import 'package:amplify_kinesis_dart/src/model/clear_cache_data.dart'; -import 'package:amplify_kinesis_dart/src/model/flush_data.dart'; -import 'package:amplify_kinesis_dart/src/model/record_data.dart'; +import 'package:amplify_record_cache_dart/amplify_record_cache_dart.dart'; import 'package:mocktail/mocktail.dart'; import 'package:test/test.dart'; @@ -24,7 +21,11 @@ void main() { setUpAll(() { registerFallbackValue( - RecordInput.now(data: Uint8List(0), partitionKey: '', streamName: ''), + createKinesisRecordInputNow( + data: Uint8List(0), + partitionKey: '', + streamName: '', + ), ); }); diff --git a/packages/kinesis/amplify_kinesis_dart/test/kinesis_sender_test.dart b/packages/kinesis/amplify_kinesis_dart/test/kinesis_sender_test.dart index c3a95a760c6..485cdc54ee2 100644 --- a/packages/kinesis/amplify_kinesis_dart/test/kinesis_sender_test.dart +++ b/packages/kinesis/amplify_kinesis_dart/test/kinesis_sender_test.dart @@ -10,8 +10,8 @@ library; import 'dart:typed_data'; import 'package:amplify_kinesis_dart/src/impl/kinesis_sender.dart'; -import 'package:amplify_kinesis_dart/src/model/record.dart'; import 
'package:amplify_kinesis_dart/src/sdk/kinesis.dart'; +import 'package:amplify_record_cache_dart/amplify_record_cache_dart.dart'; import 'package:mocktail/mocktail.dart'; import 'package:test/test.dart'; @@ -79,7 +79,7 @@ void main() { final sender = _DirectMockSender(mockClient, maxRetries: maxRetries); - await sender.putRecords( + await sender.sendBatch( streamName: 'my-stream', records: [ _testRecord( @@ -132,7 +132,7 @@ void main() { final sender = _DirectMockSender(mockClient, maxRetries: maxRetries); - final result = await sender.putRecords( + final result = await sender.sendBatch( streamName: 'test-stream', records: [ _testRecord( diff --git a/packages/kinesis/amplify_kinesis_dart/test/record_client_concurrent_flush_test.dart b/packages/kinesis/amplify_kinesis_dart/test/record_client_concurrent_flush_test.dart index ec6fc52ad76..75028ed81d3 100644 --- a/packages/kinesis/amplify_kinesis_dart/test/record_client_concurrent_flush_test.dart +++ b/packages/kinesis/amplify_kinesis_dart/test/record_client_concurrent_flush_test.dart @@ -11,10 +11,7 @@ import 'dart:async'; import 'dart:typed_data'; import 'package:amplify_kinesis_dart/src/impl/kinesis_record.dart'; -import 'package:amplify_kinesis_dart/src/impl/kinesis_sender.dart'; -import 'package:amplify_kinesis_dart/src/impl/record_client.dart'; -import 'package:amplify_kinesis_dart/src/impl/storage/record_storage_sqlite.dart'; -import 'package:amplify_kinesis_dart/src/model/record.dart'; +import 'package:amplify_record_cache_dart/amplify_record_cache_dart.dart'; import 'package:test/test.dart'; import 'helpers/test_database.dart'; @@ -28,6 +25,9 @@ void main() { final storage = SqliteRecordStorage( database: db, maxCacheBytes: 1024 * 1024, + maxRecordsPerBatch: 500, + maxBytesPerBatch: 10 * 1024 * 1024, + maxRecordSizeBytes: 10 * 1024 * 1024, ); final sender = _GatedSender(); final client = RecordClient( @@ -39,7 +39,7 @@ void main() { // Seed records for (var i = 0; i < 5; i++) { await client.record( - RecordInput.now( + createKinesisRecordInputNow( data: Uint8List.fromList([i]), partitionKey: 'key$i', streamName: 'test-stream', @@ -47,7 +47,7 @@ void main() { ); } - // Launch flush1 — it will block inside putRecords until we complete + // Launch flush1 — it will block inside sendBatch until we complete // the gate completer. final flush1 = client.flush(); @@ -78,31 +78,28 @@ void main() { }); } -/// A sender that blocks inside [putRecords] until [gate] is completed, +/// A sender that blocks inside [sendBatch] until [gate] is completed, /// giving the test deterministic control over when the flush finishes. -class _GatedSender implements KinesisSender { - /// Completes when [putRecords] is entered, signaling that the flush +class _GatedSender implements Sender { + /// Completes when [sendBatch] is entered, signaling that the flush /// is in progress and holding the `_flushing` flag. final Completer entered = Completer(); - /// The test completes this to unblock [putRecords]. + /// The test completes this to unblock [sendBatch]. final Completer gate = Completer(); @override - dynamic noSuchMethod(Invocation invocation) => super.noSuchMethod(invocation); - - @override - Future putRecords({ + Future sendBatch({ required String streamName, required List records, }) async { - // Signal that we're inside putRecords (flush is in progress). + // Signal that we're inside sendBatch (flush is in progress). if (!entered.isCompleted) entered.complete(); // Block until the test says go. 
await gate.future; - return PutRecordsResult( + return SendResult( successfulIds: records.map((r) => r.id).toList(), failedIds: const [], retryableIds: const [], diff --git a/packages/kinesis/amplify_kinesis_dart/test/record_client_test.dart b/packages/kinesis/amplify_kinesis_dart/test/record_client_test.dart index 19942d358d2..b66eafc8624 100644 --- a/packages/kinesis/amplify_kinesis_dart/test/record_client_test.dart +++ b/packages/kinesis/amplify_kinesis_dart/test/record_client_test.dart @@ -3,7 +3,7 @@ /// Tests for RecordClient. /// -/// Uses mocktail mocks for KinesisSender with pre-built PutRecordsResult +/// Uses mocktail mocks for KinesisSender with pre-built SendResult /// values and explicit IDs, rather than behavioral test doubles with /// callback logic. library; @@ -11,13 +11,8 @@ library; import 'dart:typed_data'; import 'package:amplify_kinesis_dart/src/impl/kinesis_record.dart'; -import 'package:amplify_kinesis_dart/src/impl/kinesis_sender.dart'; -import 'package:amplify_kinesis_dart/src/impl/record_client.dart'; -import 'package:amplify_kinesis_dart/src/impl/storage/record_storage.dart'; -import 'package:amplify_kinesis_dart/src/impl/storage/record_storage_sqlite.dart'; -import 'package:amplify_kinesis_dart/src/model/clear_cache_data.dart'; -import 'package:amplify_kinesis_dart/src/model/flush_data.dart'; import 'package:amplify_kinesis_dart/src/sdk/kinesis.dart'; +import 'package:amplify_record_cache_dart/amplify_record_cache_dart.dart'; import 'package:mocktail/mocktail.dart'; import 'package:test/test.dart'; @@ -34,7 +29,10 @@ void main() { final db = createTestDatabase(); storage = SqliteRecordStorage( database: db, - maxCacheBytes: 1024, // 1KB for testing + maxCacheBytes: 1024, + maxRecordsPerBatch: 500, + maxBytesPerBatch: 10 * 1024 * 1024, + maxRecordSizeBytes: 10 * 1024 * 1024, ); mockSender = MockKinesisSender(); client = RecordClient( @@ -59,7 +57,7 @@ void main() { group('record()', () { test('accepts records when enabled', () async { await client.record( - RecordInput.now( + createKinesisRecordInputNow( data: Uint8List.fromList([1, 2, 3]), partitionKey: 'pk', streamName: 'stream', @@ -75,7 +73,7 @@ void main() { test('sends all cached records and returns FlushData', () async { for (var i = 0; i < 3; i++) { await client.record( - RecordInput.now( + createKinesisRecordInputNow( data: Uint8List.fromList([i]), partitionKey: 'pk-$i', streamName: 'stream', @@ -85,14 +83,13 @@ void main() { final allRecords = await getAllRecords(); - // Mock: all records succeed when( - () => mockSender.putRecords( + () => mockSender.sendBatch( streamName: any(named: 'streamName'), records: any(named: 'records'), ), ).thenAnswer( - (_) async => PutRecordsResult( + (_) async => SendResult( successfulIds: allRecords.map((r) => r.id).toList(), retryableIds: [], failedIds: [], @@ -104,7 +101,7 @@ void main() { expect(result, isA()); expect(result.recordsFlushed, equals(3)); verify( - () => mockSender.putRecords( + () => mockSender.sendBatch( streamName: any(named: 'streamName'), records: any(named: 'records'), ), @@ -113,36 +110,35 @@ void main() { test('separates records by stream', () async { await client.record( - RecordInput.now( + createKinesisRecordInputNow( data: Uint8List.fromList([1]), partitionKey: 'pk', streamName: 'stream-a', ), ); await client.record( - RecordInput.now( + createKinesisRecordInputNow( data: Uint8List.fromList([2]), partitionKey: 'pk', streamName: 'stream-b', ), ); await client.record( - RecordInput.now( + createKinesisRecordInputNow( data: 
Uint8List.fromList([3]), partitionKey: 'pk', streamName: 'stream-a', ), ); - // Mock: all records succeed for any stream when( - () => mockSender.putRecords( + () => mockSender.sendBatch( streamName: any(named: 'streamName'), records: any(named: 'records'), ), ).thenAnswer((invocation) async { final records = invocation.namedArguments[#records] as List; - return PutRecordsResult( + return SendResult( successfulIds: records.map((r) => r.id).toList(), retryableIds: [], failedIds: [], @@ -153,7 +149,7 @@ void main() { expect(result.recordsFlushed, equals(3)); verify( - () => mockSender.putRecords( + () => mockSender.sendBatch( streamName: any(named: 'streamName'), records: any(named: 'records'), ), @@ -162,7 +158,7 @@ void main() { test('deletes successful records after send', () async { await client.record( - RecordInput.now( + createKinesisRecordInputNow( data: Uint8List.fromList([1]), partitionKey: 'pk', streamName: 'stream', @@ -171,12 +167,12 @@ void main() { final allRecords = await getAllRecords(); when( - () => mockSender.putRecords( + () => mockSender.sendBatch( streamName: any(named: 'streamName'), records: any(named: 'records'), ), ).thenAnswer( - (_) async => PutRecordsResult( + (_) async => SendResult( successfulIds: allRecords.map((r) => r.id).toList(), retryableIds: [], failedIds: [], @@ -191,7 +187,7 @@ void main() { test('handles mixed success, retryable, and failed', () async { for (var i = 0; i < 3; i++) { await client.record( - RecordInput.now( + createKinesisRecordInputNow( data: Uint8List.fromList([i]), partitionKey: 'pk-$i', streamName: 'stream', @@ -206,14 +202,13 @@ void main() { await storage.incrementRetryCount([allRecords[2].id]); } - // Mock: record 1 succeeds, record 2 retryable, record 3 failed when( - () => mockSender.putRecords( + () => mockSender.sendBatch( streamName: any(named: 'streamName'), records: any(named: 'records'), ), ).thenAnswer( - (_) async => PutRecordsResult( + (_) async => SendResult( successfulIds: [allRecords[0].id], retryableIds: [allRecords[1].id], failedIds: [allRecords[2].id], @@ -235,7 +230,7 @@ void main() { () async { for (var i = 0; i < 3; i++) { await client.record( - RecordInput.now( + createKinesisRecordInputNow( data: Uint8List.fromList([i]), partitionKey: 'key$i', streamName: 'stream', @@ -244,7 +239,7 @@ void main() { } when( - () => mockSender.putRecords( + () => mockSender.sendBatch( streamName: any(named: 'streamName'), records: any(named: 'records'), ), @@ -265,59 +260,62 @@ void main() { }, ); - test('deletes records at max retries when non-SDK error occurs', () async { - for (var i = 0; i < 3; i++) { - await client.record( - RecordInput.now( - data: Uint8List.fromList([i]), - partitionKey: 'key$i', - streamName: 'stream', - ), - ); - } + test( + 'deletes records at max retries when non-SDK error occurs', + () async { + for (var i = 0; i < 3; i++) { + await client.record( + createKinesisRecordInputNow( + data: Uint8List.fromList([i]), + partitionKey: 'key$i', + streamName: 'stream', + ), + ); + } - // Set records 2 and 3 to max retries (3) - final allRecords = await getAllRecords(); - for (var i = 0; i < 3; i++) { - await storage.incrementRetryCount([ - allRecords[1].id, - allRecords[2].id, - ]); - } + // Set records 2 and 3 to max retries (3) + final allRecords = await getAllRecords(); + for (var i = 0; i < 3; i++) { + await storage.incrementRetryCount([ + allRecords[1].id, + allRecords[2].id, + ]); + } - when( - () => mockSender.putRecords( - streamName: any(named: 'streamName'), - records: any(named: 'records'), - ), - 
).thenThrow(Exception('Network error')); + when( + () => mockSender.sendBatch( + streamName: any(named: 'streamName'), + records: any(named: 'records'), + ), + ).thenThrow(Exception('Network error')); - try { - await client.flush(); - fail('Expected flush to throw'); - } on Exception { - // Expected — non-SDK errors are rethrown - } + try { + await client.flush(); + fail('Expected flush to throw'); + } on Exception { + // Expected — non-SDK errors are rethrown + } - // Only record 1 should remain (records 2 and 3 deleted at max retries) - final remaining = await getAllRecords(); - expect(remaining, hasLength(1)); - expect(remaining[0].id, equals(allRecords[0].id)); - expect(remaining[0].retryCount, equals(1)); - }); + // Only record 1 should remain (records 2 and 3 at max retries) + final remaining = await getAllRecords(); + expect(remaining, hasLength(1)); + expect(remaining[0].id, equals(allRecords[0].id)); + expect(remaining[0].retryCount, equals(1)); + }, + ); test( 'invalid stream records do not block valid stream flushes', () async { await client.record( - RecordInput.now( + createKinesisRecordInputNow( data: Uint8List.fromList([1, 2, 3]), partitionKey: 'pk', streamName: 'invalid-stream', ), ); await client.record( - RecordInput.now( + createKinesisRecordInputNow( data: Uint8List.fromList([4, 5, 6]), partitionKey: 'pk', streamName: 'valid-stream', @@ -330,19 +328,19 @@ void main() { ); when( - () => mockSender.putRecords( + () => mockSender.sendBatch( streamName: 'invalid-stream', records: any(named: 'records'), ), ).thenThrow(ResourceNotFoundException(message: 'Stream not found')); when( - () => mockSender.putRecords( + () => mockSender.sendBatch( streamName: 'valid-stream', records: any(named: 'records'), ), ).thenAnswer( - (_) async => PutRecordsResult( + (_) async => SendResult( successfulIds: [validRecord.id], retryableIds: [], failedIds: [], @@ -356,7 +354,7 @@ void main() { test('non-SDK errors abort the flush', () async { await client.record( - RecordInput.now( + createKinesisRecordInputNow( data: Uint8List.fromList([1, 2, 3]), partitionKey: 'pk', streamName: 'stream', @@ -364,7 +362,7 @@ void main() { ); when( - () => mockSender.putRecords( + () => mockSender.sendBatch( streamName: any(named: 'streamName'), records: any(named: 'records'), ), @@ -378,7 +376,7 @@ void main() { test('removes all cached records and returns ClearCacheData', () async { for (var i = 0; i < 5; i++) { await client.record( - RecordInput.now( + createKinesisRecordInputNow( data: Uint8List.fromList([i]), partitionKey: 'pk-$i', streamName: 'stream', diff --git a/packages/kinesis/amplify_kinesis_dart/test/record_validation_test.dart b/packages/kinesis/amplify_kinesis_dart/test/record_validation_test.dart index 46604c31e58..a0160c8c265 100644 --- a/packages/kinesis/amplify_kinesis_dart/test/record_validation_test.dart +++ b/packages/kinesis/amplify_kinesis_dart/test/record_validation_test.dart @@ -18,13 +18,13 @@ library; import 'dart:convert'; import 'dart:typed_data'; -import 'package:amplify_kinesis_dart/src/exception/record_cache_exception.dart'; +import 'package:amplify_foundation_dart/amplify_foundation_dart.dart' + show Error, Ok; +import 'package:amplify_kinesis_dart/src/amplify_kinesis_client.dart'; +import 'package:amplify_kinesis_dart/src/exception/amplify_kinesis_exception.dart'; import 'package:amplify_kinesis_dart/src/impl/kinesis_record.dart'; -import 'package:amplify_kinesis_dart/src/impl/kinesis_sender.dart'; -import 'package:amplify_kinesis_dart/src/impl/record_client.dart'; -import 
'package:amplify_kinesis_dart/src/impl/storage/record_storage.dart'; -import 'package:amplify_kinesis_dart/src/impl/storage/record_storage_sqlite.dart'; import 'package:amplify_kinesis_dart/src/kinesis_limits.dart' as limits; +import 'package:amplify_record_cache_dart/amplify_record_cache_dart.dart'; import 'package:test/test.dart'; import 'helpers/test_database.dart'; @@ -48,7 +48,13 @@ void main() { setUp(() { final db = createTestDatabase(); - storage = SqliteRecordStorage(database: db, maxCacheBytes: 10000); + storage = SqliteRecordStorage( + database: db, + maxCacheBytes: 10000, + maxRecordsPerBatch: 500, + maxBytesPerBatch: 10 * 1024 * 1024, + maxRecordSizeBytes: 10 * 1024 * 1024, + ); client = createClient(storage: storage); }); @@ -62,12 +68,16 @@ void main() { group('per-record size limit', () { test('record exactly at max size is accepted', () async { - // RecordClient validates against limits.maxRecordSizeBytes (10 MiB). - // Use a large-cache client so the cache limit doesn't interfere. + // Close the default client so we can create one with a larger cache. + await client.close(); + final largeDb = createTestDatabase(); final largeStorage = SqliteRecordStorage( database: largeDb, maxCacheBytes: 20 * 1024 * 1024, + maxRecordsPerBatch: 500, + maxBytesPerBatch: 20 * 1024 * 1024, + maxRecordSizeBytes: 10 * 1024 * 1024, ); final largeClient = createClient(storage: largeStorage); @@ -78,7 +88,7 @@ void main() { ); await largeClient.record( - RecordInput.now( + createKinesisRecordInputNow( data: exactLimitData, partitionKey: partitionKey, streamName: 'stream', @@ -90,7 +100,8 @@ void main() { .toList(); expect(records, hasLength(1)); - await largeClient.close(); + // Reassign so tearDown closes this client instead. + client = largeClient; }); test('record exceeding max size by one byte is rejected', () async { @@ -102,7 +113,7 @@ void main() { expect( () => client.record( - RecordInput.now( + createKinesisRecordInputNow( data: oversizedData, partitionKey: partitionKey, streamName: 'stream', @@ -121,7 +132,7 @@ void main() { test('dataSize accounts for partition key bytes', () { final partitionKey = 'k' * 10; // 10 bytes UTF-8 final data = Uint8List(50); - final record = RecordInput.now( + final record = createKinesisRecordInputNow( data: data, partitionKey: partitionKey, streamName: 'stream', @@ -134,7 +145,7 @@ void main() { // Each emoji is 4 bytes in UTF-8, 2 emojis = 8 bytes const partitionKey = '😀😀'; final data = Uint8List(10); - final record = RecordInput.now( + final record = createKinesisRecordInputNow( data: data, partitionKey: partitionKey, streamName: 'stream', @@ -156,6 +167,9 @@ void main() { final tightStorage = SqliteRecordStorage( database: tightDb, maxCacheBytes: 80, + maxRecordsPerBatch: 500, + maxBytesPerBatch: 10 * 1024 * 1024, + maxRecordSizeBytes: 10 * 1024 * 1024, ); final tightClient = createClient(storage: tightStorage); @@ -165,7 +179,7 @@ void main() { // First record: 40 bytes — fits in 80-byte cache await tightClient.record( - RecordInput.now( + createKinesisRecordInputNow( data: data, partitionKey: partitionKey, streamName: 'stream', @@ -174,7 +188,7 @@ void main() { // Second record: 40 more → total 80 — still fits await tightClient.record( - RecordInput.now( + createKinesisRecordInputNow( data: data, partitionKey: partitionKey, streamName: 'stream', @@ -184,7 +198,7 @@ void main() { // Third record: 40 more → total 120 > 80 limit expect( () => tightClient.record( - RecordInput.now( + createKinesisRecordInputNow( data: data, partitionKey: partitionKey, 
streamName: 'stream', @@ -203,27 +217,34 @@ void main() { // --------------------------------------------------------------- group('partition key validation', () { + late AmplifyKinesisClient kinesisClient; + + setUp(() { + kinesisClient = AmplifyKinesisClient.withRecordClient( + recordClient: client, + ); + }); + test('empty partition key is rejected', () async { + final result = await kinesisClient.record( + data: Uint8List.fromList([1, 2, 3]), + partitionKey: '', + streamName: 'stream', + ); + expect(result, isA>()); expect( - () => client.record( - RecordInput.now( - data: Uint8List.fromList([1, 2, 3]), - partitionKey: '', - streamName: 'stream', - ), - ), - throwsA(isA()), + (result as Error).error, + isA(), ); }); test('partition key at max length 256 code points is accepted', () async { - await client.record( - RecordInput.now( - data: Uint8List.fromList([1]), - partitionKey: 'k' * 256, - streamName: 'stream', - ), + final result = await kinesisClient.record( + data: Uint8List.fromList([1]), + partitionKey: 'k' * 256, + streamName: 'stream', ); + expect(result, isA>()); final records = (await storage.getRecordsByStream()).values .expand((r) => r) @@ -232,15 +253,15 @@ void main() { }); test('partition key exceeding 256 code points is rejected', () async { + final result = await kinesisClient.record( + data: Uint8List.fromList([1]), + partitionKey: 'k' * 257, + streamName: 'stream', + ); + expect(result, isA>()); expect( - () => client.record( - RecordInput.now( - data: Uint8List.fromList([1]), - partitionKey: 'k' * 257, - streamName: 'stream', - ), - ), - throwsA(isA()), + (result as Error).error, + isA(), ); }); @@ -248,13 +269,12 @@ void main() { // Each emoji (😀) is 1 code point but 4 bytes in UTF-8. // 10 emoji = 10 code points (within 256 limit). final partitionKey = '😀' * 10; - await client.record( - RecordInput.now( - data: Uint8List.fromList([1]), - partitionKey: partitionKey, - streamName: 'stream', - ), + final result = await kinesisClient.record( + data: Uint8List.fromList([1]), + partitionKey: partitionKey, + streamName: 'stream', ); + expect(result, isA>()); final records = (await storage.getRecordsByStream()).values .expand((r) => r) @@ -267,15 +287,15 @@ void main() { () async { // 257 emoji = 257 code points > 256 limit final partitionKey = '😀' * 257; + final result = await kinesisClient.record( + data: Uint8List.fromList([1]), + partitionKey: partitionKey, + streamName: 'stream', + ); + expect(result, isA>()); expect( - () => client.record( - RecordInput.now( - data: Uint8List.fromList([1]), - partitionKey: partitionKey, - streamName: 'stream', - ), - ), - throwsA(isA()), + (result as Error).error, + isA(), ); }, ); @@ -292,7 +312,7 @@ void main() { // Oversized record should be rejected expect( () => client.record( - RecordInput.now( + createKinesisRecordInputNow( data: Uint8List(limits.maxRecordSizeBytes), partitionKey: 'k' * 20, streamName: 'stream', @@ -303,7 +323,7 @@ void main() { // Valid record should still work await client.record( - RecordInput.now( + createKinesisRecordInputNow( data: Uint8List.fromList([1, 2, 3]), partitionKey: 'a', streamName: 'stream', @@ -320,16 +340,13 @@ void main() { } /// No-op sender for validation tests that don't need to send records. 
-class _NoOpSender implements KinesisSender { - @override - dynamic noSuchMethod(Invocation invocation) => super.noSuchMethod(invocation); - +class _NoOpSender implements Sender { @override - Future putRecords({ + Future sendBatch({ required String streamName, required List records, }) async { - return PutRecordsResult( + return SendResult( successfulIds: records.map((r) => r.id).toList(), failedIds: const [], retryableIds: const [], diff --git a/packages/kinesis/amplify_kinesis_dart/test/sqlite_record_storage_cache_accuracy_test.dart b/packages/kinesis/amplify_kinesis_dart/test/sqlite_record_storage_cache_accuracy_test.dart index 1a13a96aae7..f8328f82d62 100644 --- a/packages/kinesis/amplify_kinesis_dart/test/sqlite_record_storage_cache_accuracy_test.dart +++ b/packages/kinesis/amplify_kinesis_dart/test/sqlite_record_storage_cache_accuracy_test.dart @@ -11,7 +11,7 @@ library; import 'dart:typed_data'; import 'package:amplify_kinesis_dart/src/impl/kinesis_record.dart'; -import 'package:amplify_kinesis_dart/src/impl/storage/record_storage_sqlite.dart'; +import 'package:amplify_record_cache_dart/amplify_record_cache_dart.dart'; import 'package:test/test.dart'; import 'helpers/test_database.dart'; @@ -22,7 +22,13 @@ void main() { setUp(() { final db = createTestDatabase(); - storage = SqliteRecordStorage(database: db, maxCacheBytes: 1024 * 1024); + storage = SqliteRecordStorage( + database: db, + maxCacheBytes: 1024 * 1024, + maxRecordsPerBatch: 500, + maxBytesPerBatch: 10 * 1024 * 1024, + maxRecordSizeBytes: 10 * 1024 * 1024, + ); }); tearDown(() async { @@ -35,14 +41,14 @@ void main() { test('cached size matches database after add operations', () async { await storage.addRecord( - RecordInput.now( + createKinesisRecordInputNow( data: Uint8List.fromList([1, 2, 3]), partitionKey: 'a', streamName: 'stream1', ), ); await storage.addRecord( - RecordInput.now( + createKinesisRecordInputNow( data: Uint8List.fromList([4, 5, 6, 7]), partitionKey: 'b', streamName: 'stream1', @@ -56,21 +62,21 @@ void main() { test('cached size matches database after delete operations', () async { await storage.addRecord( - RecordInput.now( + createKinesisRecordInputNow( data: Uint8List.fromList([1, 2, 3]), partitionKey: 'a', streamName: 'stream1', ), ); await storage.addRecord( - RecordInput.now( + createKinesisRecordInputNow( data: Uint8List.fromList([4, 5, 6, 7]), partitionKey: 'b', streamName: 'stream1', ), ); await storage.addRecord( - RecordInput.now( + createKinesisRecordInputNow( data: Uint8List.fromList([8, 9]), partitionKey: 'c', streamName: 'stream2', @@ -90,14 +96,14 @@ void main() { test('cached size matches database after clear operations', () async { await storage.addRecord( - RecordInput.now( + createKinesisRecordInputNow( data: Uint8List.fromList([1, 2, 3]), partitionKey: 'a', streamName: 'stream1', ), ); await storage.addRecord( - RecordInput.now( + createKinesisRecordInputNow( data: Uint8List.fromList([4, 5]), partitionKey: 'b', streamName: 'stream2', @@ -113,7 +119,7 @@ void main() { test('cached size remains accurate through mixed operations', () async { // "a"(1) + data(5) = 6 await storage.addRecord( - RecordInput.now( + createKinesisRecordInputNow( data: Uint8List.fromList([1, 2, 3, 4, 5]), partitionKey: 'a', streamName: 'stream1', @@ -121,7 +127,7 @@ void main() { ); // "b"(1) + data(3) = 4 await storage.addRecord( - RecordInput.now( + createKinesisRecordInputNow( data: Uint8List.fromList([6, 7, 8]), partitionKey: 'b', streamName: 'stream2', @@ -142,7 +148,7 @@ void main() { // Add another 
record: "c"(1) + data(2) = 3 await storage.addRecord( - RecordInput.now( + createKinesisRecordInputNow( data: Uint8List.fromList([9, 10]), partitionKey: 'c', streamName: 'stream3', @@ -174,7 +180,7 @@ void main() { for (var i = 0; i < recordsPerProducer; i++) { final key = 'producer${p}_record$i'; await storage.addRecord( - RecordInput.now( + createKinesisRecordInputNow( data: Uint8List(recordSize), partitionKey: key, streamName: 'stream$p', @@ -194,7 +200,7 @@ void main() { if (records.isNotEmpty) { final toDelete = records.first; await storage.deleteRecords([toDelete.id]); - deleted.add(toDelete.partitionKey); + deleted.add(toDelete.partitionKey ?? ''); } } deletedKeys.add(deleted); @@ -215,7 +221,9 @@ void main() { expect(finalCacheSize, equals(expectedCacheSize)); // Verify every created key is either in DB or was deleted - final remainingKeys = finalRecords.map((r) => r.partitionKey).toSet(); + final remainingKeys = finalRecords + .map((r) => r.partitionKey ?? '') + .toSet(); final allCreatedKeys = createdKeys.values .expand((keys) => keys) .toSet(); @@ -253,21 +261,21 @@ void main() { 'getRecordsByStream with empty excludingIds returns all records', () async { await storage.addRecord( - RecordInput.now( + createKinesisRecordInputNow( data: Uint8List.fromList([1]), partitionKey: 'key1', streamName: 'stream1', ), ); await storage.addRecord( - RecordInput.now( + createKinesisRecordInputNow( data: Uint8List.fromList([2]), partitionKey: 'key2', streamName: 'stream1', ), ); await storage.addRecord( - RecordInput.now( + createKinesisRecordInputNow( data: Uint8List.fromList([3]), partitionKey: 'key3', streamName: 'stream2', diff --git a/packages/kinesis/amplify_record_cache_dart/analysis_options.yaml b/packages/kinesis/amplify_record_cache_dart/analysis_options.yaml new file mode 100644 index 00000000000..01538d576cf --- /dev/null +++ b/packages/kinesis/amplify_record_cache_dart/analysis_options.yaml @@ -0,0 +1,5 @@ +include: package:amplify_lints/library.yaml + +analyzer: + exclude: + - '**/*.g.dart' diff --git a/packages/kinesis/amplify_record_cache_dart/lib/amplify_record_cache_dart.dart b/packages/kinesis/amplify_record_cache_dart/lib/amplify_record_cache_dart.dart new file mode 100644 index 00000000000..d6b3883c4af --- /dev/null +++ b/packages/kinesis/amplify_record_cache_dart/lib/amplify_record_cache_dart.dart @@ -0,0 +1,28 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +/// Shared record caching infrastructure for Amplify streaming clients. 
+library; + +// Client +export 'src/client/auto_flush_scheduler.dart'; +export 'src/client/record_client.dart'; +// Database +export 'src/db/record_cache_database.dart' show RecordCacheDatabase; +// Exceptions +export 'src/exception/record_cache_exception.dart'; +// Flush strategy +export 'src/flush_strategy/flush_strategy.dart'; +// Models +export 'src/model/clear_cache_data.dart'; +export 'src/model/flush_data.dart'; +export 'src/model/record.dart'; +export 'src/model/record_data.dart'; +export 'src/model/record_input.dart'; +// Sender +export 'src/sender/sender.dart'; +// Storage +export 'src/storage/platform/record_storage_platform.dart'; +export 'src/storage/record_storage.dart'; +export 'src/storage/record_storage_memory.dart'; +export 'src/storage/record_storage_sqlite.dart'; diff --git a/packages/kinesis/amplify_kinesis_dart/lib/src/impl/auto_flush_scheduler.dart b/packages/kinesis/amplify_record_cache_dart/lib/src/client/auto_flush_scheduler.dart similarity index 91% rename from packages/kinesis/amplify_kinesis_dart/lib/src/impl/auto_flush_scheduler.dart rename to packages/kinesis/amplify_record_cache_dart/lib/src/client/auto_flush_scheduler.dart index 3332abfa806..3220eb30fa6 100644 --- a/packages/kinesis/amplify_kinesis_dart/lib/src/impl/auto_flush_scheduler.dart +++ b/packages/kinesis/amplify_record_cache_dart/lib/src/client/auto_flush_scheduler.dart @@ -4,9 +4,9 @@ import 'dart:async'; import 'package:amplify_foundation_dart/amplify_foundation_dart.dart'; -import 'package:amplify_kinesis_dart/src/impl/record_client.dart'; +import 'package:amplify_record_cache_dart/src/client/record_client.dart'; -/// {@template amplify_kinesis.auto_flush_scheduler} +/// {@template amplify_record_cache.auto_flush_scheduler} /// Manages automatic flush scheduling at a fixed interval. /// /// Takes a [Duration] interval and a [RecordClient]. @@ -17,7 +17,7 @@ import 'package:amplify_kinesis_dart/src/impl/record_client.dart'; /// a new one, preventing duplicate concurrent loops. 
/// {@endtemplate} final class AutoFlushScheduler { - /// {@macro amplify_kinesis.auto_flush_scheduler} + /// {@macro amplify_record_cache.auto_flush_scheduler} AutoFlushScheduler({required Duration interval, required RecordClient client}) : _interval = interval, _client = client; diff --git a/packages/kinesis/amplify_kinesis_dart/lib/src/impl/record_client.dart b/packages/kinesis/amplify_record_cache_dart/lib/src/client/record_client.dart similarity index 83% rename from packages/kinesis/amplify_kinesis_dart/lib/src/impl/record_client.dart rename to packages/kinesis/amplify_record_cache_dart/lib/src/client/record_client.dart index 873ce07d116..9d564418ca2 100644 --- a/packages/kinesis/amplify_kinesis_dart/lib/src/impl/record_client.dart +++ b/packages/kinesis/amplify_record_cache_dart/lib/src/client/record_client.dart @@ -2,17 +2,16 @@ // SPDX-License-Identifier: Apache-2.0 import 'package:amplify_foundation_dart/amplify_foundation_dart.dart'; -import 'package:amplify_kinesis_dart/src/impl/kinesis_record.dart'; -import 'package:amplify_kinesis_dart/src/impl/kinesis_sender.dart'; -import 'package:amplify_kinesis_dart/src/impl/storage/record_storage.dart'; -import 'package:amplify_kinesis_dart/src/model/clear_cache_data.dart'; -import 'package:amplify_kinesis_dart/src/model/flush_data.dart'; -import 'package:amplify_kinesis_dart/src/model/record_data.dart' - show RecordData; +import 'package:amplify_record_cache_dart/src/model/clear_cache_data.dart'; +import 'package:amplify_record_cache_dart/src/model/flush_data.dart'; +import 'package:amplify_record_cache_dart/src/model/record_data.dart'; +import 'package:amplify_record_cache_dart/src/model/record_input.dart'; +import 'package:amplify_record_cache_dart/src/sender/sender.dart'; +import 'package:amplify_record_cache_dart/src/storage/record_storage.dart'; import 'package:smithy/smithy.dart' show SmithyHttpException, UnknownSmithyHttpException; -/// {@template amplify_kinesis.record_client} +/// {@template amplify_record_cache.record_client} /// Orchestrates record operations: storage, sending, and retry logic. /// /// - `record()` delegates directly to `storage.addRecord()` (validation @@ -21,22 +20,25 @@ import 'package:smithy/smithy.dart' /// stream and sends it. /// {@endtemplate} class RecordClient { - /// {@macro amplify_kinesis.record_client} + /// {@macro amplify_record_cache.record_client} RecordClient({ required RecordStorage storage, - required KinesisSender sender, + required Sender sender, required int maxRetries, }) : _storage = storage, _sender = sender, _maxRetries = maxRetries; final RecordStorage _storage; - final KinesisSender _sender; + final Sender _sender; final int _maxRetries; final Logger _logger = AmplifyLogging.logger('RecordClient'); bool _flushing = false; + /// Provides access to the underlying storage (for testing). + RecordStorage get storage => _storage; + /// Records data to the local cache. /// /// Delegates to [RecordStorage.addRecord] which handles validation @@ -48,7 +50,7 @@ class RecordClient { return const RecordData(); } - /// Flushes cached records to Kinesis. + /// Flushes cached records to the streaming service. /// /// Single-pass: retrieves one batch of records per stream, sends each /// batch, and returns. Records beyond the per-stream limit are picked @@ -79,8 +81,7 @@ class RecordClient { ? 'HTTP ${e.statusCode}: ${e.body}' : e.message; _logger.warn( - 'Kinesis SDK error flushing stream $streamName: $details. ' - 'Skipping', + 'SDK error flushing stream $streamName: $details. 
Skipping', ); await _handleFailedRequest(records); } catch (e) { @@ -97,7 +98,7 @@ class RecordClient { } Future _sendStreamBatch(String streamName, List records) async { - final result = await _sender.putRecords( + final result = await _sender.sendBatch( streamName: streamName, records: records, ); diff --git a/packages/kinesis/amplify_kinesis_dart/lib/src/db/kinesis_record_database.dart b/packages/kinesis/amplify_record_cache_dart/lib/src/db/record_cache_database.dart similarity index 58% rename from packages/kinesis/amplify_kinesis_dart/lib/src/db/kinesis_record_database.dart rename to packages/kinesis/amplify_record_cache_dart/lib/src/db/record_cache_database.dart index 3f72a6da0b2..c03289937c6 100644 --- a/packages/kinesis/amplify_kinesis_dart/lib/src/db/kinesis_record_database.dart +++ b/packages/kinesis/amplify_record_cache_dart/lib/src/db/record_cache_database.dart @@ -6,26 +6,30 @@ import 'dart:async'; import 'package:amplify_db_common_dart/amplify_db_common_dart.dart'; import 'package:drift/drift.dart'; -part 'kinesis_record_database.g.dart'; +part 'record_cache_database.g.dart'; -/// Schema of the KinesisRecords table in SQLite. +/// Schema of the cached records table in SQLite. /// -/// When updating this schema, please bump [KinesisRecordDatabase.schemaVersion]. +/// The `partitionKey` column is present for Kinesis Data Streams +/// compatibility. Firehose clients write an empty string. +/// +/// When updating this schema, please bump +/// [RecordCacheDatabase.schemaVersion]. @DataClassName('DriftStoredRecord') class KinesisRecords extends Table { /// Auto-incrementing primary key. IntColumn get id => integer().autoIncrement()(); - /// The name of the Kinesis Data Stream. + /// The name of the target stream. TextColumn get streamName => text()(); - /// The partition key for the record. - TextColumn get partitionKey => text()(); + /// The partition key (empty string for services that don't use it). + TextColumn get partitionKey => text().withDefault(const Constant(''))(); - /// The data blob to send to Kinesis. + /// The data blob to send. BlobColumn get data => blob()(); - /// The size of the data blob in bytes. + /// The size of the record in bytes. IntColumn get dataSize => integer()(); /// The number of times this record has been retried. @@ -35,30 +39,33 @@ class KinesisRecords extends Table { IntColumn get createdAt => integer()(); } -/// {@template amplify_kinesis.kinesis_record_database} -/// Drift database for managing stored Kinesis records. +/// {@template amplify_record_cache.record_cache_database} +/// Drift database for managing cached records. /// {@endtemplate} @DriftDatabase(tables: [KinesisRecords]) -class KinesisRecordDatabase extends _$KinesisRecordDatabase { - /// {@macro amplify_kinesis.kinesis_record_database} +class RecordCacheDatabase extends _$RecordCacheDatabase { + /// {@macro amplify_record_cache.record_cache_database} /// + /// [dbPrefix] is the database name prefix (e.g. `kinesis_records`, + /// `firehose_records`). /// [identifier] is used to namespace the database (typically the AWS region). - /// [storagePath] is the directory path for the database file - factory KinesisRecordDatabase({ + /// [storagePath] is the directory path for the database file. + factory RecordCacheDatabase({ + required String dbPrefix, required String identifier, required FutureOr? 
storagePath, }) { final driftQueryExecutor = connect( - name: 'kinesis_records_$identifier', + name: '${dbPrefix}_$identifier', path: storagePath, ); - return KinesisRecordDatabase._(driftQueryExecutor); + return RecordCacheDatabase._(driftQueryExecutor); } /// Creates a database with a custom query executor (for testing). - KinesisRecordDatabase.forTesting(super.executor); + RecordCacheDatabase.forTesting(super.executor); - KinesisRecordDatabase._(super.driftQueryExecutor); + RecordCacheDatabase._(super.driftQueryExecutor); // Bump this number whenever you change or add a table definition. @override @@ -69,7 +76,6 @@ class KinesisRecordDatabase extends _$KinesisRecordDatabase { return MigrationStrategy( onCreate: (Migrator m) async { await m.createAll(); - // Indices matching the Android schema. await customStatement( 'CREATE INDEX IF NOT EXISTS idx_stream_id ' 'ON kinesis_records(stream_name, id)', diff --git a/packages/kinesis/amplify_kinesis_dart/lib/src/db/kinesis_record_database.g.dart b/packages/kinesis/amplify_record_cache_dart/lib/src/db/record_cache_database.g.dart similarity index 94% rename from packages/kinesis/amplify_kinesis_dart/lib/src/db/kinesis_record_database.g.dart rename to packages/kinesis/amplify_record_cache_dart/lib/src/db/record_cache_database.g.dart index e76bbb21b87..20545c89084 100644 --- a/packages/kinesis/amplify_kinesis_dart/lib/src/db/kinesis_record_database.g.dart +++ b/packages/kinesis/amplify_record_cache_dart/lib/src/db/record_cache_database.g.dart @@ -1,6 +1,6 @@ // GENERATED CODE - DO NOT MODIFY BY HAND -part of 'kinesis_record_database.dart'; +part of 'record_cache_database.dart'; // ignore_for_file: type=lint class $KinesisRecordsTable extends KinesisRecords @@ -42,7 +42,8 @@ class $KinesisRecordsTable extends KinesisRecords aliasedName, false, type: DriftSqlType.string, - requiredDuringInsert: true, + requiredDuringInsert: false, + defaultValue: const Constant(''), ); static const VerificationMeta _dataMeta = const VerificationMeta('data'); @override @@ -128,8 +129,6 @@ class $KinesisRecordsTable extends KinesisRecords _partitionKeyMeta, ), ); - } else if (isInserting) { - context.missing(_partitionKeyMeta); } if (data.containsKey('data')) { context.handle( @@ -212,16 +211,16 @@ class DriftStoredRecord extends DataClass /// Auto-incrementing primary key. final int id; - /// The name of the Kinesis Data Stream. + /// The name of the target stream. final String streamName; - /// The partition key for the record. + /// The partition key (empty string for services that don't use it). final String partitionKey; - /// The data blob to send to Kinesis. + /// The data blob to send. final Uint8List data; - /// The size of the data blob in bytes. + /// The size of the record in bytes. final int dataSize; /// The number of times this record has been retried. 
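The RecordCacheDatabase factory above namespaces the SQLite database per service via dbPrefix and identifier. A hedged construction sketch follows; it is not part of the diff — the prefix values come from the factory's doc comment, the path is hypothetical, and the String type argument on storagePath is inferred because the diff text above lost its generics.

// Illustrative sketch only, not part of the change.
final kinesisDb = RecordCacheDatabase(
  dbPrefix: 'kinesis_records',         // a Firehose client would pass e.g. 'firehose_records'
  identifier: 'us-west-2',             // typically the AWS region
  storagePath: '/path/to/app-support', // hypothetical directory for the database file
);
// Per the factory body above, the Drift connection is opened under the
// name '<dbPrefix>_<identifier>', keeping each service's cache separate.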
@@ -384,13 +383,12 @@ class KinesisRecordsCompanion extends UpdateCompanion { KinesisRecordsCompanion.insert({ this.id = const Value.absent(), required String streamName, - required String partitionKey, + this.partitionKey = const Value.absent(), required Uint8List data, required int dataSize, this.retryCount = const Value.absent(), required int createdAt, }) : streamName = Value(streamName), - partitionKey = Value(partitionKey), data = Value(data), dataSize = Value(dataSize), createdAt = Value(createdAt); @@ -476,10 +474,9 @@ class KinesisRecordsCompanion extends UpdateCompanion { } } -abstract class _$KinesisRecordDatabase extends GeneratedDatabase { - _$KinesisRecordDatabase(QueryExecutor e) : super(e); - $KinesisRecordDatabaseManager get managers => - $KinesisRecordDatabaseManager(this); +abstract class _$RecordCacheDatabase extends GeneratedDatabase { + _$RecordCacheDatabase(QueryExecutor e) : super(e); + $RecordCacheDatabaseManager get managers => $RecordCacheDatabaseManager(this); late final $KinesisRecordsTable kinesisRecords = $KinesisRecordsTable(this); @override Iterable> get allTables => @@ -492,7 +489,7 @@ typedef $$KinesisRecordsTableCreateCompanionBuilder = KinesisRecordsCompanion Function({ Value id, required String streamName, - required String partitionKey, + Value partitionKey, required Uint8List data, required int dataSize, Value retryCount, @@ -510,7 +507,7 @@ typedef $$KinesisRecordsTableUpdateCompanionBuilder = }); class $$KinesisRecordsTableFilterComposer - extends Composer<_$KinesisRecordDatabase, $KinesisRecordsTable> { + extends Composer<_$RecordCacheDatabase, $KinesisRecordsTable> { $$KinesisRecordsTableFilterComposer({ required super.$db, required super.$table, @@ -555,7 +552,7 @@ class $$KinesisRecordsTableFilterComposer } class $$KinesisRecordsTableOrderingComposer - extends Composer<_$KinesisRecordDatabase, $KinesisRecordsTable> { + extends Composer<_$RecordCacheDatabase, $KinesisRecordsTable> { $$KinesisRecordsTableOrderingComposer({ required super.$db, required super.$table, @@ -600,7 +597,7 @@ class $$KinesisRecordsTableOrderingComposer } class $$KinesisRecordsTableAnnotationComposer - extends Composer<_$KinesisRecordDatabase, $KinesisRecordsTable> { + extends Composer<_$RecordCacheDatabase, $KinesisRecordsTable> { $$KinesisRecordsTableAnnotationComposer({ required super.$db, required super.$table, @@ -639,7 +636,7 @@ class $$KinesisRecordsTableAnnotationComposer class $$KinesisRecordsTableTableManager extends RootTableManager< - _$KinesisRecordDatabase, + _$RecordCacheDatabase, $KinesisRecordsTable, DriftStoredRecord, $$KinesisRecordsTableFilterComposer, @@ -650,7 +647,7 @@ class $$KinesisRecordsTableTableManager ( DriftStoredRecord, BaseReferences< - _$KinesisRecordDatabase, + _$RecordCacheDatabase, $KinesisRecordsTable, DriftStoredRecord >, @@ -659,7 +656,7 @@ class $$KinesisRecordsTableTableManager PrefetchHooks Function() > { $$KinesisRecordsTableTableManager( - _$KinesisRecordDatabase db, + _$RecordCacheDatabase db, $KinesisRecordsTable table, ) : super( TableManagerState( @@ -693,7 +690,7 @@ class $$KinesisRecordsTableTableManager ({ Value id = const Value.absent(), required String streamName, - required String partitionKey, + Value partitionKey = const Value.absent(), required Uint8List data, required int dataSize, Value retryCount = const Value.absent(), @@ -717,7 +714,7 @@ class $$KinesisRecordsTableTableManager typedef $$KinesisRecordsTableProcessedTableManager = ProcessedTableManager< - _$KinesisRecordDatabase, + _$RecordCacheDatabase, 
$KinesisRecordsTable, DriftStoredRecord, $$KinesisRecordsTableFilterComposer, @@ -728,7 +725,7 @@ typedef $$KinesisRecordsTableProcessedTableManager = ( DriftStoredRecord, BaseReferences< - _$KinesisRecordDatabase, + _$RecordCacheDatabase, $KinesisRecordsTable, DriftStoredRecord >, @@ -737,9 +734,9 @@ typedef $$KinesisRecordsTableProcessedTableManager = PrefetchHooks Function() >; -class $KinesisRecordDatabaseManager { - final _$KinesisRecordDatabase _db; - $KinesisRecordDatabaseManager(this._db); +class $RecordCacheDatabaseManager { + final _$RecordCacheDatabase _db; + $RecordCacheDatabaseManager(this._db); $$KinesisRecordsTableTableManager get kinesisRecords => $$KinesisRecordsTableTableManager(_db, _db.kinesisRecords); } diff --git a/packages/kinesis/amplify_kinesis_dart/lib/src/exception/record_cache_exception.dart b/packages/kinesis/amplify_record_cache_dart/lib/src/exception/record_cache_exception.dart similarity index 73% rename from packages/kinesis/amplify_kinesis_dart/lib/src/exception/record_cache_exception.dart rename to packages/kinesis/amplify_record_cache_dart/lib/src/exception/record_cache_exception.dart index 969cd892d2d..a5152118c22 100644 --- a/packages/kinesis/amplify_kinesis_dart/lib/src/exception/record_cache_exception.dart +++ b/packages/kinesis/amplify_record_cache_dart/lib/src/exception/record_cache_exception.dart @@ -1,13 +1,19 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 +/// {@template amplify_record_cache.record_cache_exception} /// Internal error type used by RecordClient / RecordStorage. /// -/// Mapped to the public AmplifyKinesisException hierarchy at the -/// AmplifyKinesisClient boundary via `AmplifyKinesisException.from`. +/// Mapped to the public exception hierarchy at the client boundary +/// (e.g. `AmplifyKinesisException.from` or `AmplifyFirehoseException.from`). +/// {@endtemplate} sealed class RecordCacheException implements Exception { /// Creates a [RecordCacheException]. - RecordCacheException(this.message, this.recoverySuggestion, [this.cause]); + const RecordCacheException( + this.message, + this.recoverySuggestion, [ + this.cause, + ]); /// A message describing the error. final String message; @@ -29,7 +35,7 @@ sealed class RecordCacheException implements Exception { /// Database operation failed. final class RecordCacheDatabaseException extends RecordCacheException { /// Creates a [RecordCacheDatabaseException]. - RecordCacheDatabaseException( + const RecordCacheDatabaseException( super.message, super.recoverySuggestion, [ super.cause, @@ -39,18 +45,17 @@ final class RecordCacheDatabaseException extends RecordCacheException { /// Cache limit exceeded — no space for new records. final class RecordCacheLimitExceededException extends RecordCacheException { /// Creates a [RecordCacheLimitExceededException]. - RecordCacheLimitExceededException( + const RecordCacheLimitExceededException( super.message, super.recoverySuggestion, [ super.cause, ]); } -/// Record input validation failed (e.g. oversized record, invalid partition -/// key). +/// Record input validation failed (e.g. oversized record). final class RecordCacheValidationException extends RecordCacheException { /// Creates a [RecordCacheValidationException]. 
- RecordCacheValidationException( + const RecordCacheValidationException( super.message, super.recoverySuggestion, [ super.cause, diff --git a/packages/kinesis/amplify_kinesis_dart/lib/src/flush_strategy/flush_strategy.dart b/packages/kinesis/amplify_record_cache_dart/lib/src/flush_strategy/flush_strategy.dart similarity index 71% rename from packages/kinesis/amplify_kinesis_dart/lib/src/flush_strategy/flush_strategy.dart rename to packages/kinesis/amplify_record_cache_dart/lib/src/flush_strategy/flush_strategy.dart index 63b0c62135d..106d803be95 100644 --- a/packages/kinesis/amplify_kinesis_dart/lib/src/flush_strategy/flush_strategy.dart +++ b/packages/kinesis/amplify_record_cache_dart/lib/src/flush_strategy/flush_strategy.dart @@ -1,7 +1,7 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 -/// {@template amplify_kinesis.flush_strategy} +/// {@template amplify_record_cache.flush_strategy} /// Determines when automatic flushing of cached records occurs. /// /// Available strategies: @@ -9,25 +9,25 @@ /// - [FlushNone]: Disable automatic flushing entirely /// {@endtemplate} sealed class FlushStrategy { - /// {@macro amplify_kinesis.flush_strategy} + /// {@macro amplify_record_cache.flush_strategy} const FlushStrategy(); } -/// {@template amplify_kinesis.interval_flush_strategy} +/// {@template amplify_record_cache.interval_flush_strategy} /// A flush strategy that triggers automatic flushes at a fixed interval. /// {@endtemplate} final class FlushInterval extends FlushStrategy { - /// {@macro amplify_kinesis.interval_flush_strategy} + /// {@macro amplify_record_cache.interval_flush_strategy} const FlushInterval({this.interval = const Duration(seconds: 30)}); /// The interval between automatic flush operations. final Duration interval; } -/// {@template amplify_kinesis.none_flush_strategy} +/// {@template amplify_record_cache.none_flush_strategy} /// A flush strategy that disables automatic flushing. /// {@endtemplate} final class FlushNone extends FlushStrategy { - /// {@macro amplify_kinesis.none_flush_strategy} + /// {@macro amplify_record_cache.none_flush_strategy} const FlushNone(); } diff --git a/packages/kinesis/amplify_kinesis_dart/lib/src/model/clear_cache_data.dart b/packages/kinesis/amplify_record_cache_dart/lib/src/model/clear_cache_data.dart similarity index 80% rename from packages/kinesis/amplify_kinesis_dart/lib/src/model/clear_cache_data.dart rename to packages/kinesis/amplify_record_cache_dart/lib/src/model/clear_cache_data.dart index e0016ac18ef..b8d8a0e38da 100644 --- a/packages/kinesis/amplify_kinesis_dart/lib/src/model/clear_cache_data.dart +++ b/packages/kinesis/amplify_record_cache_dart/lib/src/model/clear_cache_data.dart @@ -1,11 +1,11 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 -/// {@template amplify_kinesis.clear_cache_data} +/// {@template amplify_record_cache.clear_cache_data} /// Data returned from a clearCache operation. /// {@endtemplate} final class ClearCacheData { - /// {@macro amplify_kinesis.clear_cache_data} + /// {@macro amplify_record_cache.clear_cache_data} const ClearCacheData({this.recordsCleared = 0}); /// The number of records that were cleared from the cache. 
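Since FlushStrategy has only the two variants shown above, selecting one is purely a matter of construction. A minimal sketch, assuming nothing beyond the constructors in flush_strategy.dart (the 30-second default comes from FlushInterval's default parameter; the variable names are illustrative):

// Sketch only: choosing how cached records are flushed.
const FlushStrategy everyThirtySeconds = FlushInterval();                        // default interval
const FlushStrategy everyFiveSeconds = FlushInterval(interval: Duration(seconds: 5));
const FlushStrategy manualOnly = FlushNone();                                    // caller flushes explicitly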
diff --git a/packages/kinesis/amplify_kinesis_dart/lib/src/model/flush_data.dart b/packages/kinesis/amplify_record_cache_dart/lib/src/model/flush_data.dart similarity index 71% rename from packages/kinesis/amplify_kinesis_dart/lib/src/model/flush_data.dart rename to packages/kinesis/amplify_record_cache_dart/lib/src/model/flush_data.dart index b908b88be8a..b69f968554b 100644 --- a/packages/kinesis/amplify_kinesis_dart/lib/src/model/flush_data.dart +++ b/packages/kinesis/amplify_record_cache_dart/lib/src/model/flush_data.dart @@ -1,11 +1,11 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 -/// {@template amplify_kinesis.flush_data} +/// {@template amplify_record_cache.flush_data} /// Data returned from a flush operation. /// {@endtemplate} final class FlushData { - /// {@macro amplify_kinesis.flush_data} + /// {@macro amplify_record_cache.flush_data} const FlushData({this.recordsFlushed = 0, this.flushInProgress = false}); /// The number of records successfully flushed. @@ -16,5 +16,6 @@ final class FlushData { @override String toString() => - 'FlushData(recordsFlushed: $recordsFlushed, flushInProgress: $flushInProgress)'; + 'FlushData(recordsFlushed: $recordsFlushed, ' + 'flushInProgress: $flushInProgress)'; } diff --git a/packages/kinesis/amplify_kinesis_dart/lib/src/model/record.dart b/packages/kinesis/amplify_record_cache_dart/lib/src/model/record.dart similarity index 68% rename from packages/kinesis/amplify_kinesis_dart/lib/src/model/record.dart rename to packages/kinesis/amplify_record_cache_dart/lib/src/model/record.dart index 77e89a89d11..3ee0f9f0fb1 100644 --- a/packages/kinesis/amplify_kinesis_dart/lib/src/model/record.dart +++ b/packages/kinesis/amplify_record_cache_dart/lib/src/model/record.dart @@ -3,37 +3,38 @@ import 'dart:typed_data'; -/// {@template amplify_kinesis.record} -/// A record persisted in local storage, ready to be flushed to Kinesis. +/// {@template amplify_record_cache.record} +/// A record persisted in local storage, ready to be flushed to a streaming +/// service (Kinesis Data Streams, Firehose, etc.). /// /// This is a plain Dart class with no ORM coupling, shared across all /// storage backends (SQLite, IndexedDB, In-Memory). /// {@endtemplate} final class Record { - /// {@macro amplify_kinesis.record} + /// {@macro amplify_record_cache.record} const Record({ required this.id, required this.streamName, - required this.partitionKey, required this.data, required this.dataSize, required this.retryCount, required this.createdAt, + this.partitionKey, }); /// Auto-incrementing primary key. final int id; - /// The name of the Kinesis Data Stream. + /// The name of the target stream. final String streamName; - /// The partition key for the record. - final String partitionKey; + /// Optional partition key (used by Kinesis Data Streams, null for Firehose). + final String? partitionKey; - /// The data blob to send to Kinesis. + /// The data blob to send. final Uint8List data; - /// The size of the data blob in bytes. + /// The size of the record in bytes. final int dataSize; /// The number of times this record has been retried. 
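FlushData, shown a little earlier in flush_data.dart, is the value a flush operation resolves with. Assuming RecordClient.flush() returns a Future<FlushData> — the return type is not visible in the hunks above, so this is an inference — a caller might interpret it roughly like this:

// Hedged sketch: `client` is assumed to be an existing RecordClient.
final FlushData flushData = await client.flush();
if (flushData.flushInProgress) {
  // Presumably another flush was already running when this call started.
} else {
  print('Flushed ${flushData.recordsFlushed} cached record(s).');
}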
@@ -45,6 +46,5 @@ final class Record { @override String toString() => 'Record(id: $id, streamName: $streamName, ' - 'partitionKey: $partitionKey, dataSize: $dataSize, ' - 'retryCount: $retryCount)'; + 'dataSize: $dataSize, retryCount: $retryCount)'; } diff --git a/packages/kinesis/amplify_kinesis_dart/lib/src/model/record_data.dart b/packages/kinesis/amplify_record_cache_dart/lib/src/model/record_data.dart similarity index 73% rename from packages/kinesis/amplify_kinesis_dart/lib/src/model/record_data.dart rename to packages/kinesis/amplify_record_cache_dart/lib/src/model/record_data.dart index 149c2302406..5421fbd4ab4 100644 --- a/packages/kinesis/amplify_kinesis_dart/lib/src/model/record_data.dart +++ b/packages/kinesis/amplify_record_cache_dart/lib/src/model/record_data.dart @@ -1,11 +1,11 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 -/// {@template amplify_kinesis.record_data} +/// {@template amplify_record_cache.record_data} /// Data returned from a record operation. /// {@endtemplate} final class RecordData { - /// {@macro amplify_kinesis.record_data} + /// {@macro amplify_record_cache.record_data} const RecordData(); @override diff --git a/packages/kinesis/amplify_record_cache_dart/lib/src/model/record_input.dart b/packages/kinesis/amplify_record_cache_dart/lib/src/model/record_input.dart new file mode 100644 index 00000000000..f622c604219 --- /dev/null +++ b/packages/kinesis/amplify_record_cache_dart/lib/src/model/record_input.dart @@ -0,0 +1,54 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +import 'dart:typed_data'; + +/// {@template amplify_record_cache.record_input} +/// A record to be persisted in the cache before sending. +/// +/// The [dataSize] is provided by the caller so that each service can +/// compute it according to its own rules: +/// - Kinesis Data Streams: `data.length + utf8.encode(partitionKey).length` +/// - Firehose: `data.length` +/// {@endtemplate} +final class RecordInput { + /// Creates a new record input. + RecordInput({ + required this.data, + required this.streamName, + required this.dataSize, + required this.createdAt, + this.partitionKey, + }); + + /// Creates a record input with the current timestamp. + factory RecordInput.now({ + required Uint8List data, + required String streamName, + required int dataSize, + String? partitionKey, + }) { + return RecordInput( + data: data, + streamName: streamName, + dataSize: dataSize, + createdAt: DateTime.now(), + partitionKey: partitionKey, + ); + } + + /// The data blob. + final Uint8List data; + + /// The name of the target stream. + final String streamName; + + /// Optional partition key (used by KDS, null for Firehose). + final String? partitionKey; + + /// The size of the record in bytes (caller-computed). + final int dataSize; + + /// Timestamp of when the record was created. + final DateTime createdAt; +} diff --git a/packages/kinesis/amplify_record_cache_dart/lib/src/sender/sender.dart b/packages/kinesis/amplify_record_cache_dart/lib/src/sender/sender.dart new file mode 100644 index 00000000000..0fe6cde261d --- /dev/null +++ b/packages/kinesis/amplify_record_cache_dart/lib/src/sender/sender.dart @@ -0,0 +1,45 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +import 'package:amplify_record_cache_dart/src/model/record.dart'; + +/// Result of a batch send operation. 
+/// +/// Records are categorized into three buckets: +/// - [successfulIds]: records accepted by the service. +/// - [retryableIds]: records that failed but can be retried. +/// - [failedIds]: records that exceeded the retry limit and should be deleted. +final class SendResult { + /// Creates a new [SendResult]. + const SendResult({ + required this.successfulIds, + required this.retryableIds, + required this.failedIds, + }); + + /// IDs of records that were successfully sent. + final List successfulIds; + + /// IDs of records that failed but can be retried (retry count < max). + final List retryableIds; + + /// IDs of records that exceeded the retry limit and should be deleted. + final List failedIds; +} + +/// {@template amplify_record_cache.sender} +/// Abstract interface for sending a batch of records to a streaming service. +/// +/// Implementations handle the service-specific API call (e.g. Kinesis +/// `PutRecords`, Firehose `PutRecordBatch`) and categorize the response +/// into successful, retryable, and failed records. +/// {@endtemplate} +// ignore: one_member_abstracts +abstract interface class Sender { + /// Sends a batch of records to the specified stream and returns the + /// categorized result. + Future sendBatch({ + required String streamName, + required List records, + }); +} diff --git a/packages/kinesis/amplify_kinesis_dart/lib/src/impl/storage/platform/record_storage_platform.dart b/packages/kinesis/amplify_record_cache_dart/lib/src/storage/platform/record_storage_platform.dart similarity index 100% rename from packages/kinesis/amplify_kinesis_dart/lib/src/impl/storage/platform/record_storage_platform.dart rename to packages/kinesis/amplify_record_cache_dart/lib/src/storage/platform/record_storage_platform.dart diff --git a/packages/kinesis/amplify_kinesis_dart/lib/src/impl/storage/platform/record_storage_platform_stub.dart b/packages/kinesis/amplify_record_cache_dart/lib/src/storage/platform/record_storage_platform_stub.dart similarity index 73% rename from packages/kinesis/amplify_kinesis_dart/lib/src/impl/storage/platform/record_storage_platform_stub.dart rename to packages/kinesis/amplify_record_cache_dart/lib/src/storage/platform/record_storage_platform_stub.dart index 2cbf1693390..eda24335ef2 100644 --- a/packages/kinesis/amplify_kinesis_dart/lib/src/impl/storage/platform/record_storage_platform_stub.dart +++ b/packages/kinesis/amplify_record_cache_dart/lib/src/storage/platform/record_storage_platform_stub.dart @@ -3,7 +3,7 @@ import 'dart:async'; -import 'package:amplify_kinesis_dart/src/impl/storage/record_storage.dart'; +import 'package:amplify_record_cache_dart/src/storage/record_storage.dart'; /// Creates a platform-specific [RecordStorage] instance. /// @@ -13,6 +13,11 @@ Future createPlatformRecordStorage({ required String identifier, required FutureOr? 
storagePath, required int maxCacheBytes, + required int maxRecordsPerBatch, + required int maxBytesPerBatch, + required int maxRecordSizeBytes, + required String dbPrefix, + required String storeName, }) { throw UnsupportedError( 'Cannot create RecordStorage: no platform implementation available.', diff --git a/packages/kinesis/amplify_kinesis_dart/lib/src/impl/storage/platform/record_storage_platform_vm.dart b/packages/kinesis/amplify_record_cache_dart/lib/src/storage/platform/record_storage_platform_vm.dart similarity index 58% rename from packages/kinesis/amplify_kinesis_dart/lib/src/impl/storage/platform/record_storage_platform_vm.dart rename to packages/kinesis/amplify_record_cache_dart/lib/src/storage/platform/record_storage_platform_vm.dart index 273caba33af..9ee9d72b91a 100644 --- a/packages/kinesis/amplify_kinesis_dart/lib/src/impl/storage/platform/record_storage_platform_vm.dart +++ b/packages/kinesis/amplify_record_cache_dart/lib/src/storage/platform/record_storage_platform_vm.dart @@ -4,26 +4,35 @@ import 'dart:async'; import 'package:amplify_foundation_dart/amplify_foundation_dart.dart'; -import 'package:amplify_kinesis_dart/src/db/kinesis_record_database.dart'; -import 'package:amplify_kinesis_dart/src/impl/storage/record_storage.dart'; -import 'package:amplify_kinesis_dart/src/impl/storage/record_storage_sqlite.dart'; +import 'package:amplify_record_cache_dart/src/db/record_cache_database.dart'; +import 'package:amplify_record_cache_dart/src/storage/record_storage.dart'; +import 'package:amplify_record_cache_dart/src/storage/record_storage_sqlite.dart'; /// Creates a [SqliteRecordStorage] for VM platforms. Future createPlatformRecordStorage({ required String identifier, required FutureOr? storagePath, required int maxCacheBytes, + required int maxRecordsPerBatch, + required int maxBytesPerBatch, + required int maxRecordSizeBytes, + required String dbPrefix, + required String storeName, }) async { assert(storagePath != null, 'storagePath is required on VM platforms.'); AmplifyLogging.logger( 'RecordStorage', ).info('Using SQLite storage (path: $storagePath)'); - final database = KinesisRecordDatabase( + final database = RecordCacheDatabase( + dbPrefix: dbPrefix, identifier: identifier, storagePath: storagePath, ); return SqliteRecordStorage.create( database: database, maxCacheBytes: maxCacheBytes, + maxRecordsPerBatch: maxRecordsPerBatch, + maxBytesPerBatch: maxBytesPerBatch, + maxRecordSizeBytes: maxRecordSizeBytes, ); } diff --git a/packages/kinesis/amplify_kinesis_dart/lib/src/impl/storage/platform/record_storage_platform_web.dart b/packages/kinesis/amplify_record_cache_dart/lib/src/storage/platform/record_storage_platform_web.dart similarity index 59% rename from packages/kinesis/amplify_kinesis_dart/lib/src/impl/storage/platform/record_storage_platform_web.dart rename to packages/kinesis/amplify_record_cache_dart/lib/src/storage/platform/record_storage_platform_web.dart index 8b96184e92c..f1b37aeb35f 100644 --- a/packages/kinesis/amplify_kinesis_dart/lib/src/impl/storage/platform/record_storage_platform_web.dart +++ b/packages/kinesis/amplify_record_cache_dart/lib/src/storage/platform/record_storage_platform_web.dart @@ -4,9 +4,9 @@ import 'dart:async'; import 'package:amplify_foundation_dart/amplify_foundation_dart.dart'; -import 'package:amplify_kinesis_dart/src/impl/storage/record_storage.dart'; -import 'package:amplify_kinesis_dart/src/impl/storage/record_storage_indexeddb.dart'; -import 'package:amplify_kinesis_dart/src/impl/storage/record_storage_memory.dart'; 
+import 'package:amplify_record_cache_dart/src/storage/record_storage.dart'; +import 'package:amplify_record_cache_dart/src/storage/record_storage_indexeddb.dart'; +import 'package:amplify_record_cache_dart/src/storage/record_storage_memory.dart'; /// Creates a web [RecordStorage] instance. /// @@ -16,6 +16,11 @@ Future createPlatformRecordStorage({ required String identifier, required FutureOr? storagePath, required int maxCacheBytes, + required int maxRecordsPerBatch, + required int maxBytesPerBatch, + required int maxRecordSizeBytes, + required String dbPrefix, + required String storeName, }) async { final logger = AmplifyLogging.logger('RecordStorage'); // storagePath is ignored on web. @@ -24,11 +29,21 @@ Future createPlatformRecordStorage({ return IndexedDbRecordStorage.create( identifier: identifier, maxCacheBytes: maxCacheBytes, + maxRecordsPerBatch: maxRecordsPerBatch, + maxBytesPerBatch: maxBytesPerBatch, + maxRecordSizeBytes: maxRecordSizeBytes, + dbPrefix: dbPrefix, + storeName: storeName, ); } logger.warn( 'IndexedDB is not available. Falling back to in-memory storage. ' 'Records will be lost when the page is closed.', ); - return InMemoryRecordStorage(maxCacheBytes: maxCacheBytes); + return InMemoryRecordStorage( + maxCacheBytes: maxCacheBytes, + maxRecordsPerBatch: maxRecordsPerBatch, + maxBytesPerBatch: maxBytesPerBatch, + maxRecordSizeBytes: maxRecordSizeBytes, + ); } diff --git a/packages/kinesis/amplify_kinesis_dart/lib/src/impl/storage/record_storage.dart b/packages/kinesis/amplify_record_cache_dart/lib/src/storage/record_storage.dart similarity index 67% rename from packages/kinesis/amplify_kinesis_dart/lib/src/impl/storage/record_storage.dart rename to packages/kinesis/amplify_record_cache_dart/lib/src/storage/record_storage.dart index 5833c12d7d0..91cf6784e27 100644 --- a/packages/kinesis/amplify_kinesis_dart/lib/src/impl/storage/record_storage.dart +++ b/packages/kinesis/amplify_record_cache_dart/lib/src/storage/record_storage.dart @@ -1,23 +1,27 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 -import 'package:amplify_kinesis_dart/src/exception/amplify_kinesis_exception.dart' - show defaultRecoverySuggestion; -import 'package:amplify_kinesis_dart/src/exception/record_cache_exception.dart'; -import 'package:amplify_kinesis_dart/src/impl/kinesis_record.dart'; -import 'package:amplify_kinesis_dart/src/kinesis_limits.dart' as limits; -import 'package:amplify_kinesis_dart/src/model/record.dart'; +import 'package:amplify_record_cache_dart/src/exception/record_cache_exception.dart'; +import 'package:amplify_record_cache_dart/src/model/record.dart'; +import 'package:amplify_record_cache_dart/src/model/record_input.dart'; import 'package:meta/meta.dart'; -export 'package:amplify_kinesis_dart/src/model/record.dart'; +export 'package:amplify_record_cache_dart/src/model/record.dart'; -/// {@template amplify_kinesis.record_storage} +/// Default recovery suggestion for wrapped database errors. +const defaultRecoverySuggestion = + 'This is an internal error. Please report it as a bug.'; + +/// {@template amplify_record_cache.record_storage} /// Abstract base class for record persistence. /// /// Implementations provide platform-specific storage (SQLite on VM, -/// IndexedDB on web, in-memory fallback). Validation of partition key -/// length, record size, and cache limits is handled here in [addRecord]; -/// subclasses implement [writeRecord] for the actual write. +/// IndexedDB on web, in-memory fallback). 
Validation of record size +/// and cache limits is handled here in [addRecord]; subclasses +/// implement [writeRecord] for the actual write. +/// +/// Service-specific validation (e.g. partition key length for KDS) +/// should be performed by the client before calling [addRecord]. /// /// All public methods wrap unexpected errors as /// [RecordCacheDatabaseException]. Subclasses throw @@ -25,26 +29,23 @@ export 'package:amplify_kinesis_dart/src/model/record.dart'; /// caught and wrapped automatically. /// {@endtemplate} abstract class RecordStorage { - /// {@macro amplify_kinesis.record_storage} + /// {@macro amplify_record_cache.record_storage} RecordStorage({ required int maxCacheBytes, - int maxRecordsPerStream = limits.maxRecordsPerStream, - int maxBytesPerStream = limits.maxPutRecordsSizeBytes, - int maxRecordSizeBytes = limits.maxRecordSizeBytes, - int maxPartitionKeyLength = limits.maxPartitionKeyLength, + required int maxRecordsPerBatch, + required int maxBytesPerBatch, + required int maxRecordSizeBytes, int initialCachedSize = 0, }) : _maxCacheBytes = maxCacheBytes, - _maxRecordsPerStream = maxRecordsPerStream, - _maxBytesPerStream = maxBytesPerStream, + _maxRecordsPerBatch = maxRecordsPerBatch, + _maxBytesPerBatch = maxBytesPerBatch, _maxRecordSizeBytes = maxRecordSizeBytes, - _maxPartitionKeyLength = maxPartitionKeyLength, cachedSize = initialCachedSize; final int _maxCacheBytes; - final int _maxRecordsPerStream; - final int _maxBytesPerStream; + final int _maxRecordsPerBatch; + final int _maxBytesPerBatch; final int _maxRecordSizeBytes; - final int _maxPartitionKeyLength; /// The current total cached size in bytes. @protected @@ -53,11 +54,11 @@ abstract class RecordStorage { /// The maximum cache size in bytes. int get maxCacheBytes => _maxCacheBytes; - /// Maximum number of records per stream in a single batch. - int get maxRecordsPerStream => _maxRecordsPerStream; + /// Maximum number of records per batch. + int get maxRecordsPerBatch => _maxRecordsPerBatch; - /// Maximum total bytes per stream in a single batch. - int get maxBytesPerStream => _maxBytesPerStream; + /// Maximum total bytes per batch. + int get maxBytesPerBatch => _maxBytesPerBatch; /// Validates and saves a record to storage. /// Throws [RecordCacheValidationException] on invalid input. @@ -65,20 +66,11 @@ abstract class RecordStorage { /// Throws [RecordCacheDatabaseException] on storage errors. 
Future addRecord(RecordInput record) => _wrap('Failed to add record to cache', () async { - final codePoints = record.partitionKey.runes.length; - if (codePoints == 0 || codePoints > _maxPartitionKeyLength) { - throw RecordCacheValidationException( - 'Partition key length ($codePoints) is outside the allowed ' - 'range of 1-$_maxPartitionKeyLength characters.', - 'Use a partition key between 1 and ' - '$_maxPartitionKeyLength characters.', - ); - } if (record.dataSize > _maxRecordSizeBytes) { throw RecordCacheValidationException( 'Record size (${record.dataSize} bytes) exceeds the maximum ' - 'of $_maxRecordSizeBytes bytes (partition key + data blob).', - 'Reduce the record payload size or use a shorter partition key.', + 'of $_maxRecordSizeBytes bytes.', + 'Reduce the record payload size.', ); } if (cachedSize + record.dataSize > _maxCacheBytes) { diff --git a/packages/kinesis/amplify_kinesis_dart/lib/src/impl/storage/record_storage_indexeddb.dart b/packages/kinesis/amplify_record_cache_dart/lib/src/storage/record_storage_indexeddb.dart similarity index 77% rename from packages/kinesis/amplify_kinesis_dart/lib/src/impl/storage/record_storage_indexeddb.dart rename to packages/kinesis/amplify_record_cache_dart/lib/src/storage/record_storage_indexeddb.dart index 3b6a82fbe11..edcf9ca4742 100644 --- a/packages/kinesis/amplify_kinesis_dart/lib/src/impl/storage/record_storage_indexeddb.dart +++ b/packages/kinesis/amplify_record_cache_dart/lib/src/storage/record_storage_indexeddb.dart @@ -5,13 +5,13 @@ import 'dart:async'; import 'dart:js_interop'; import 'dart:js_interop_unsafe'; -import 'package:amplify_kinesis_dart/src/impl/kinesis_record.dart'; -import 'package:amplify_kinesis_dart/src/impl/storage/record_storage.dart'; +import 'package:amplify_record_cache_dart/src/model/record_input.dart'; +import 'package:amplify_record_cache_dart/src/storage/record_storage.dart'; // ignore: implementation_imports import 'package:aws_common/src/js/indexed_db.dart'; import 'package:web/web.dart'; -/// {@template amplify_kinesis.indexeddb_record_storage} +/// {@template amplify_record_cache.indexeddb_record_storage} /// IndexedDB-backed [RecordStorage] implementation for web. /// /// Use [create] to open the database and eagerly compute the initial @@ -20,29 +20,48 @@ import 'package:web/web.dart'; final class IndexedDbRecordStorage extends RecordStorage { IndexedDbRecordStorage._({ required super.maxCacheBytes, + required super.maxRecordsPerBatch, + required super.maxBytesPerBatch, + required super.maxRecordSizeBytes, required super.initialCachedSize, required IDBDatabase database, - }) : _database = database; + required String storeName, + }) : _database = database, + _storeName = storeName; - /// {@macro amplify_kinesis.indexeddb_record_storage} + /// {@macro amplify_record_cache.indexeddb_record_storage} /// /// Opens the IndexedDB database and eagerly computes the initial /// cache size. + /// + /// [dbPrefix] is used to namespace the database (e.g. `amplify_kinesis_`, + /// `amplify_firehose_`). + /// [storeName] is the object store name (e.g. `kinesis_records`, + /// `firehose_records`). 
static Future create({ required int maxCacheBytes, + required int maxRecordsPerBatch, + required int maxBytesPerBatch, + required int maxRecordSizeBytes, required String identifier, + required String dbPrefix, + required String storeName, }) async { - final database = await _openDatabase('amplify_kinesis_$identifier'); - final initialSize = await _computeCacheSize(database); + final database = await _openDatabase('$dbPrefix$identifier', storeName); + final initialSize = await _computeCacheSize(database, storeName); return IndexedDbRecordStorage._( maxCacheBytes: maxCacheBytes, + maxRecordsPerBatch: maxRecordsPerBatch, + maxBytesPerBatch: maxBytesPerBatch, + maxRecordSizeBytes: maxRecordSizeBytes, initialCachedSize: initialSize, database: database, + storeName: storeName, ); } final IDBDatabase _database; - static const _storeName = 'kinesis_records'; + final String _storeName; /// Returns an object store handle within a new transaction. IDBObjectStore _getStore([String mode = 'readwrite']) { @@ -54,7 +73,7 @@ final class IndexedDbRecordStorage extends RecordStorage { Future writeRecord(RecordInput record) async { final obj = JSObject() ..setProperty('stream_name'.toJS, record.streamName.toJS) - ..setProperty('partition_key'.toJS, record.partitionKey.toJS) + ..setProperty('partition_key'.toJS, (record.partitionKey ?? '').toJS) ..setProperty('data'.toJS, record.data.toJS) ..setProperty('data_size'.toJS, record.dataSize.toJS) ..setProperty('retry_count'.toJS, 0.toJS) @@ -82,8 +101,8 @@ final class IndexedDbRecordStorage extends RecordStorage { final stream = record.streamName; final count = streamCounts[stream] ?? 0; final size = streamSizes[stream] ?? 0; - if (count >= maxRecordsPerStream) continue; - if (size + record.dataSize > maxBytesPerStream) continue; + if (count >= maxRecordsPerBatch) continue; + if (size + record.dataSize > maxBytesPerBatch) continue; result.putIfAbsent(stream, () => []).add(record); streamCounts[stream] = count + 1; @@ -104,7 +123,7 @@ final class IndexedDbRecordStorage extends RecordStorage { } @override - Future doQueryCacheSize() => _computeCacheSize(_database); + Future doQueryCacheSize() => _computeCacheSize(_database, _storeName); @override Future doIncrementRetryCount(Iterable ids) async { @@ -161,10 +180,11 @@ final class IndexedDbRecordStorage extends RecordStorage { } static Record _jsToRecord(JSObject obj) { + final pk = obj.getProperty('partition_key'.toJS).toDart; return Record( id: obj.getProperty('id'.toJS).toDartInt, streamName: obj.getProperty('stream_name'.toJS).toDart, - partitionKey: obj.getProperty('partition_key'.toJS).toDart, + partitionKey: pk.isEmpty ? null : pk, data: (obj.getProperty('data'.toJS)).toDart, dataSize: obj.getProperty('data_size'.toJS).toDartInt, retryCount: obj.getProperty('retry_count'.toJS).toDartInt, @@ -173,7 +193,10 @@ final class IndexedDbRecordStorage extends RecordStorage { } /// Opens an IndexedDB database, creating the object store if needed. - static Future _openDatabase(String dbName) async { + static Future _openDatabase( + String dbName, + String storeName, + ) async { final db = indexedDB; if (db == null) { throw StateError('IndexedDB is not available'); @@ -182,10 +205,10 @@ final class IndexedDbRecordStorage extends RecordStorage { void onUpgradeNeeded(IDBVersionChangeEvent event) { final database = event.target?.getProperty('result'.toJS); final names = database?.objectStoreNames; - if (!(names?.contains(_storeName) ?? false)) { + if (!(names?.contains(storeName) ?? false)) { database! 
.createObjectStore( - _storeName, + storeName, IDBObjectStoreParameters(keyPath: 'id'.toJS, autoIncrement: true), ) .createIndex( @@ -205,9 +228,12 @@ final class IndexedDbRecordStorage extends RecordStorage { } /// Computes cache size from DB using a cursor. - static Future _computeCacheSize(IDBDatabase database) async { - final tx = database.transaction(_storeName.toJS, 'readonly'); - final store = tx.objectStore(_storeName); + static Future _computeCacheSize( + IDBDatabase database, + String storeName, + ) async { + final tx = database.transaction(storeName.toJS, 'readonly'); + final store = tx.objectStore(storeName); final request = store.openCursor(); final completer = Completer(); var total = 0; @@ -241,7 +267,7 @@ final class IndexedDbRecordStorage extends RecordStorage { static Future checkIsSupported() async { if (indexedDB == null) return false; try { - final request = indexedDB!.open('kinesis_idb_test', 1); + final request = indexedDB!.open('record_cache_idb_test', 1); await request.future; return true; } on Object { diff --git a/packages/kinesis/amplify_kinesis_dart/lib/src/impl/storage/record_storage_memory.dart b/packages/kinesis/amplify_record_cache_dart/lib/src/storage/record_storage_memory.dart similarity index 84% rename from packages/kinesis/amplify_kinesis_dart/lib/src/impl/storage/record_storage_memory.dart rename to packages/kinesis/amplify_record_cache_dart/lib/src/storage/record_storage_memory.dart index 7131c6ae9a0..24708ffa5b2 100644 --- a/packages/kinesis/amplify_kinesis_dart/lib/src/impl/storage/record_storage_memory.dart +++ b/packages/kinesis/amplify_record_cache_dart/lib/src/storage/record_storage_memory.dart @@ -3,21 +3,20 @@ import 'dart:collection'; -import 'package:amplify_kinesis_dart/src/impl/kinesis_record.dart'; -import 'package:amplify_kinesis_dart/src/impl/storage/record_storage.dart'; +import 'package:amplify_record_cache_dart/src/model/record_input.dart'; +import 'package:amplify_record_cache_dart/src/storage/record_storage.dart'; -/// {@template amplify_kinesis.in_memory_record_storage} +/// {@template amplify_record_cache.in_memory_record_storage} /// In-memory [RecordStorage] fallback for web when IndexedDB is unavailable. /// Records are not persisted. /// {@endtemplate} final class InMemoryRecordStorage extends RecordStorage { - /// {@macro amplify_kinesis.in_memory_record_storage} + /// {@macro amplify_record_cache.in_memory_record_storage} InMemoryRecordStorage({ required super.maxCacheBytes, - super.maxRecordsPerStream, - super.maxBytesPerStream, - super.maxRecordSizeBytes, - super.maxPartitionKeyLength, + required super.maxRecordsPerBatch, + required super.maxBytesPerBatch, + required super.maxRecordSizeBytes, }); int _nextId = 1; @@ -54,8 +53,8 @@ final class InMemoryRecordStorage extends RecordStorage { final stream = record.streamName; final count = streamCounts[stream] ?? 0; final size = streamSizes[stream] ?? 
0; - if (count >= maxRecordsPerStream) continue; - if (size + record.dataSize > maxBytesPerStream) continue; + if (count >= maxRecordsPerBatch) continue; + if (size + record.dataSize > maxBytesPerBatch) continue; result.putIfAbsent(stream, () => []).add(record); streamCounts[stream] = count + 1; diff --git a/packages/kinesis/amplify_kinesis_dart/lib/src/impl/storage/record_storage_sqlite.dart b/packages/kinesis/amplify_record_cache_dart/lib/src/storage/record_storage_sqlite.dart similarity index 75% rename from packages/kinesis/amplify_kinesis_dart/lib/src/impl/storage/record_storage_sqlite.dart rename to packages/kinesis/amplify_record_cache_dart/lib/src/storage/record_storage_sqlite.dart index d3b5a88c9e3..a5f1c6677d5 100644 --- a/packages/kinesis/amplify_kinesis_dart/lib/src/impl/storage/record_storage_sqlite.dart +++ b/packages/kinesis/amplify_record_cache_dart/lib/src/storage/record_storage_sqlite.dart @@ -3,55 +3,60 @@ import 'dart:async'; -import 'package:amplify_kinesis_dart/src/db/kinesis_record_database.dart'; -import 'package:amplify_kinesis_dart/src/impl/kinesis_record.dart'; -import 'package:amplify_kinesis_dart/src/impl/storage/record_storage.dart'; +import 'package:amplify_record_cache_dart/src/db/record_cache_database.dart'; +import 'package:amplify_record_cache_dart/src/model/record_input.dart'; +import 'package:amplify_record_cache_dart/src/storage/record_storage.dart'; import 'package:drift/drift.dart'; import 'package:meta/meta.dart'; -/// {@template amplify_kinesis.sqlite_record_storage} +/// {@template amplify_record_cache.sqlite_record_storage} /// SQLite-backed [RecordStorage] implementation using Drift. /// /// Used on VM (iOS, macOS, Linux, Windows, Android) platforms. /// {@endtemplate} final class SqliteRecordStorage extends RecordStorage { - /// {@macro amplify_kinesis.sqlite_record_storage} + /// {@macro amplify_record_cache.sqlite_record_storage} /// /// Prefer [create] for production use — it eagerly queries the cache /// size from the database. This constructor is available for tests /// where the database starts empty. SqliteRecordStorage({ - required KinesisRecordDatabase database, + required RecordCacheDatabase database, required super.maxCacheBytes, - super.maxRecordsPerStream, - super.maxBytesPerStream, - super.maxRecordSizeBytes, - super.maxPartitionKeyLength, + required super.maxRecordsPerBatch, + required super.maxBytesPerBatch, + required super.maxRecordSizeBytes, super.initialCachedSize, }) : _db = database; - /// {@macro amplify_kinesis.sqlite_record_storage} + /// {@macro amplify_record_cache.sqlite_record_storage} /// - /// Opens the database. + /// Opens the database and eagerly queries the cache size. static Future create({ - required KinesisRecordDatabase database, + required RecordCacheDatabase database, required int maxCacheBytes, + required int maxRecordsPerBatch, + required int maxBytesPerBatch, + required int maxRecordSizeBytes, }) async { final initialSize = await _queryCacheSize(database); return SqliteRecordStorage( database: database, maxCacheBytes: maxCacheBytes, + maxRecordsPerBatch: maxRecordsPerBatch, + maxBytesPerBatch: maxBytesPerBatch, + maxRecordSizeBytes: maxRecordSizeBytes, initialCachedSize: initialSize, ); } - final KinesisRecordDatabase _db; + final RecordCacheDatabase _db; /// Provides access to the underlying database (for testing). - KinesisRecordDatabase get database => _db; + RecordCacheDatabase get database => _db; /// Queries the current cache size from the database. 
- static Future _queryCacheSize(KinesisRecordDatabase db) async { + static Future _queryCacheSize(RecordCacheDatabase db) async { final query = db.selectOnly(db.kinesisRecords) ..addColumns([db.kinesisRecords.dataSize.sum()]); final result = await query.getSingleOrNull(); @@ -70,7 +75,7 @@ final class SqliteRecordStorage extends RecordStorage { .insert( KinesisRecordsCompanion.insert( streamName: record.streamName, - partitionKey: record.partitionKey, + partitionKey: Value(record.partitionKey ?? ''), data: record.data, dataSize: record.dataSize, createdAt: record.createdAt.millisecondsSinceEpoch, @@ -83,19 +88,21 @@ final class SqliteRecordStorage extends RecordStorage { final results = await _db .customSelect( ''' - SELECT id, stream_name, partition_key, data, data_size, retry_count, created_at + SELECT id, stream_name, partition_key, data, data_size, retry_count, + created_at FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY stream_name ORDER BY id) as rn, - SUM(data_size) OVER (PARTITION BY stream_name ORDER BY id) as running_size + SUM(data_size) OVER (PARTITION BY stream_name ORDER BY id) + as running_size FROM kinesis_records ) WHERE rn <= ?1 AND running_size <= ?2 ORDER BY stream_name, id ''', variables: [ - Variable.withInt(maxRecordsPerStream), - Variable.withInt(maxBytesPerStream), + Variable.withInt(maxRecordsPerBatch), + Variable.withInt(maxBytesPerBatch), ], readsFrom: {_db.kinesisRecords}, ) @@ -148,10 +155,11 @@ final class SqliteRecordStorage extends RecordStorage { } Record _rowToRecord(QueryRow row) { + final pk = row.read('partition_key'); return Record( id: row.read('id'), streamName: row.read('stream_name'), - partitionKey: row.read('partition_key'), + partitionKey: pk.isEmpty ? null : pk, data: row.read('data'), dataSize: row.read('data_size'), retryCount: row.read('retry_count'), diff --git a/packages/kinesis/amplify_record_cache_dart/pubspec.yaml b/packages/kinesis/amplify_record_cache_dart/pubspec.yaml new file mode 100644 index 00000000000..60d61f47791 --- /dev/null +++ b/packages/kinesis/amplify_record_cache_dart/pubspec.yaml @@ -0,0 +1,24 @@ +name: amplify_record_cache_dart +description: Shared record caching infrastructure for Amplify streaming clients (Kinesis Data Streams, Firehose). Provides offline storage, batching, retry logic, and auto-flush scheduling. +version: 0.1.0 +publish_to: none + +environment: + sdk: ^3.9.0 + +dependencies: + amplify_db_common_dart: ">=0.4.17 <0.5.0" + amplify_foundation_dart: ">=2.11.0 <2.12.0" + aws_common: ">=0.7.12 <0.8.0" + drift: ^2.25.0 + meta: ^1.16.0 + smithy: ">=0.7.10 <0.8.0" + web: ^1.1.1 + +dev_dependencies: + amplify_lints: ">=3.1.4 <3.2.0" + build_runner: ^2.4.15 + drift_dev: ^2.25.1 + fake_async: ^1.3.0 + mocktail: ^1.0.0 + test: ^1.22.1
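Taken together, the pieces introduced in this diff compose into a service-specific client roughly as follows. This is a hedged sketch, not part of the change: StubSender, buildRecordClient, and every numeric limit are invented for illustration, and the generic type arguments (Future<SendResult>, List<Record>) are inferred because they were stripped from the diff text above.

import 'package:amplify_record_cache_dart/amplify_record_cache_dart.dart';

/// Hypothetical sender: a real implementation would wrap the service's batch
/// API (Kinesis PutRecords or Firehose PutRecordBatch) and split the response
/// into the three SendResult buckets; this stub reports every record as sent.
final class StubSender implements Sender {
  @override
  Future<SendResult> sendBatch({
    required String streamName,
    required List<Record> records,
  }) async {
    return SendResult(
      successfulIds: records.map((r) => r.id).toList(),
      retryableIds: const [],
      failedIds: const [],
    );
  }
}

Future<RecordClient> buildRecordClient(RecordCacheDatabase database) async {
  final storage = await SqliteRecordStorage.create(
    database: database,
    maxCacheBytes: 5 * 1024 * 1024,     // all limits here are illustrative
    maxRecordsPerBatch: 500,
    maxBytesPerBatch: 4 * 1024 * 1024,
    maxRecordSizeBytes: 1024 * 1024,
  );
  // An AutoFlushScheduler(interval: ..., client: ...) could additionally be
  // constructed to flush on a timer; its start/stop methods are not shown in
  // the hunks above, so they are omitted from this sketch.
  return RecordClient(
    storage: storage,
    sender: StubSender(),
    maxRetries: 3,
  );
}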