-
Notifications
You must be signed in to change notification settings - Fork 281
feat(firehose): add exceptions and record storage types #6828
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
ekjotmultani
wants to merge
1
commit into
feat/amplify-firehose-client
Choose a base branch
from
feat/firehose-exceptions-storage
base: feat/amplify-firehose-client
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
117 changes: 117 additions & 0 deletions
117
packages/kinesis/amplify_firehose_dart/lib/src/exception/amplify_firehose_exception.dart
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,117 @@ | ||
| // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| import 'package:amplify_core/amplify_core.dart'; | ||
| import 'package:amplify_firehose_dart/src/exception/record_cache_exception.dart'; | ||
|
|
||
| /// Default recovery suggestion for errors. | ||
| const String defaultRecoverySuggestion = | ||
| 'Inspect the underlying error for more details.'; | ||
|
|
||
| /// {@template amplify_firehose.amplify_firehose_exception} | ||
| /// Base exception for Amplify Amazon Data Firehose errors. | ||
| /// {@endtemplate} | ||
| sealed class AmplifyFirehoseException extends AmplifyException { | ||
| /// {@macro amplify_firehose.amplify_firehose_exception} | ||
| const AmplifyFirehoseException( | ||
| super.message, { | ||
| super.recoverySuggestion, | ||
| super.underlyingException, | ||
| }); | ||
|
|
||
| /// Maps an arbitrary error into the appropriate [AmplifyFirehoseException] | ||
| /// subtype. If [error] is already an [AmplifyFirehoseException], it is | ||
| /// returned as-is. [RecordCacheException] subtypes are mapped to their | ||
| /// corresponding public exception types. | ||
| static AmplifyFirehoseException from(Object error) => switch (error) { | ||
| final AmplifyFirehoseException e => e, | ||
| final RecordCacheValidationException e => FirehoseValidationException( | ||
| e.message, | ||
| recoverySuggestion: e.recoverySuggestion, | ||
| ), | ||
| final RecordCacheLimitExceededException e => FirehoseLimitExceededException( | ||
| message: e.message, | ||
| recoverySuggestion: e.recoverySuggestion, | ||
| ), | ||
| final RecordCacheDatabaseException e => FirehoseStorageException( | ||
| e.message, | ||
| recoverySuggestion: e.recoverySuggestion, | ||
| underlyingException: e.cause, | ||
| ), | ||
| final Exception e => FirehoseUnknownException( | ||
| e.toString(), | ||
| underlyingException: e, | ||
| ), | ||
| _ => FirehoseUnknownException(error.toString()), | ||
| }; | ||
| } | ||
|
|
||
| /// {@template amplify_firehose.firehose_storage_exception} | ||
| /// Thrown when a local cache/database error occurs. | ||
| /// {@endtemplate} | ||
| final class FirehoseStorageException extends AmplifyFirehoseException { | ||
| /// {@macro amplify_firehose.firehose_storage_exception} | ||
| const FirehoseStorageException( | ||
| super.message, { | ||
| super.recoverySuggestion, | ||
| super.underlyingException, | ||
| }); | ||
|
|
||
| @override | ||
| String get runtimeTypeName => 'FirehoseStorageException'; | ||
| } | ||
|
|
||
| /// {@template amplify_firehose.firehose_limit_exceeded_exception} | ||
| /// Thrown when the local cache is full. | ||
| /// {@endtemplate} | ||
| final class FirehoseLimitExceededException extends AmplifyFirehoseException { | ||
| /// {@macro amplify_firehose.firehose_limit_exceeded_exception} | ||
| const FirehoseLimitExceededException({ | ||
| String? message, | ||
| String? recoverySuggestion, | ||
| }) : super( | ||
| message ?? 'Cache is full', | ||
| recoverySuggestion: | ||
| recoverySuggestion ?? 'Call flush() or clearCache().', | ||
| ); | ||
|
|
||
| @override | ||
| String get runtimeTypeName => 'FirehoseLimitExceededException'; | ||
| } | ||
|
|
||
| /// {@template amplify_firehose.firehose_validation_exception} | ||
| /// Thrown when record input validation fails (e.g. oversized record). | ||
| /// {@endtemplate} | ||
| final class FirehoseValidationException extends AmplifyFirehoseException { | ||
| /// {@macro amplify_firehose.firehose_validation_exception} | ||
| const FirehoseValidationException(super.message, {super.recoverySuggestion}); | ||
|
|
||
| @override | ||
| String get runtimeTypeName => 'FirehoseValidationException'; | ||
| } | ||
|
|
||
| /// {@template amplify_firehose.firehose_unknown_exception} | ||
| /// Catch-all for unexpected errors. | ||
| /// {@endtemplate} | ||
| final class FirehoseUnknownException extends AmplifyFirehoseException { | ||
| /// {@macro amplify_firehose.firehose_unknown_exception} | ||
| const FirehoseUnknownException(super.message, {super.underlyingException}); | ||
|
|
||
| @override | ||
| String get runtimeTypeName => 'FirehoseUnknownException'; | ||
| } | ||
|
|
||
| /// {@template amplify_firehose.client_closed_exception} | ||
| /// Thrown when an operation is attempted on a closed client. | ||
| /// {@endtemplate} | ||
| final class ClientClosedException extends AmplifyFirehoseException { | ||
| /// {@macro amplify_firehose.client_closed_exception} | ||
| const ClientClosedException() | ||
| : super( | ||
| 'Client has been closed', | ||
| recoverySuggestion: 'Create a new AmplifyFirehoseClient instance.', | ||
| ); | ||
|
|
||
| @override | ||
| String get runtimeTypeName => 'ClientClosedException'; | ||
| } |
57 changes: 57 additions & 0 deletions
57
packages/kinesis/amplify_firehose_dart/lib/src/exception/record_cache_exception.dart
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,57 @@ | ||
| // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| /// Internal error type used by RecordClient / RecordStorage. | ||
| /// | ||
| /// Mapped to the public AmplifyFirehoseException hierarchy at the | ||
| /// AmplifyFirehoseClient boundary via `AmplifyFirehoseException.from`. | ||
| sealed class RecordCacheException implements Exception { | ||
| /// Creates a [RecordCacheException]. | ||
| const RecordCacheException(this.message, this.recoverySuggestion, [this.cause]); | ||
|
|
||
| /// A message describing the error. | ||
| final String message; | ||
|
|
||
| /// A suggestion for how to recover from the error. | ||
| final String recoverySuggestion; | ||
|
|
||
| /// The underlying cause of the error, if any. | ||
| final Object? cause; | ||
|
|
||
| @override | ||
| String toString() { | ||
| final buf = StringBuffer('RecordCacheException: $message'); | ||
| if (cause != null) buf.write('\nCaused by: $cause'); | ||
| return buf.toString(); | ||
| } | ||
| } | ||
|
|
||
| /// Database operation failed. | ||
| final class RecordCacheDatabaseException extends RecordCacheException { | ||
| /// Creates a [RecordCacheDatabaseException]. | ||
| const RecordCacheDatabaseException( | ||
| super.message, | ||
| super.recoverySuggestion, [ | ||
| super.cause, | ||
| ]); | ||
| } | ||
|
|
||
| /// Cache limit exceeded — no space for new records. | ||
| final class RecordCacheLimitExceededException extends RecordCacheException { | ||
| /// Creates a [RecordCacheLimitExceededException]. | ||
| const RecordCacheLimitExceededException( | ||
| super.message, | ||
| super.recoverySuggestion, [ | ||
| super.cause, | ||
| ]); | ||
| } | ||
|
|
||
| /// Record input validation failed (e.g. oversized record). | ||
| final class RecordCacheValidationException extends RecordCacheException { | ||
| /// Creates a [RecordCacheValidationException]. | ||
| const RecordCacheValidationException( | ||
| super.message, | ||
| super.recoverySuggestion, [ | ||
| super.cause, | ||
| ]); | ||
| } |
45 changes: 45 additions & 0 deletions
45
packages/kinesis/amplify_firehose_dart/lib/src/impl/firehose_record.dart
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,45 @@ | ||
| // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| import 'dart:typed_data'; | ||
|
|
||
| /// Internal representation of a record to be sent to Firehose. | ||
| /// | ||
| /// Unlike Kinesis Data Streams, Firehose records do not have a | ||
| /// partition key. The record size is simply the data blob length. | ||
| final class RecordInput { | ||
| /// Creates a new Firehose record. | ||
| RecordInput({ | ||
| required this.data, | ||
| required this.streamName, | ||
| required this.createdAt, | ||
| }) : dataSize = data.length; | ||
|
|
||
| /// Creates a Firehose record with the current timestamp. | ||
| factory RecordInput.now({ | ||
| required Uint8List data, | ||
| required String streamName, | ||
| }) { | ||
| return RecordInput( | ||
| data: data, | ||
| streamName: streamName, | ||
| createdAt: DateTime.now(), | ||
| ); | ||
| } | ||
|
|
||
| /// The data blob to send to Firehose. | ||
| final Uint8List data; | ||
|
|
||
| /// The name of the Firehose delivery stream. | ||
| final String streamName; | ||
|
|
||
| /// The size of the record in bytes (data blob only). | ||
| /// | ||
| /// Per AWS docs, the per-record size limit for Firehose is 1,000 KiB. | ||
| /// Unlike Kinesis Data Streams, there is no partition key to include | ||
| /// in the size calculation. | ||
| final int dataSize; | ||
|
|
||
| /// Timestamp of when the record was created. | ||
| final DateTime createdAt; | ||
| } |
156 changes: 156 additions & 0 deletions
156
packages/kinesis/amplify_firehose_dart/lib/src/impl/storage/record_storage.dart
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,156 @@ | ||
| // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| import 'package:amplify_firehose_dart/src/exception/amplify_firehose_exception.dart' | ||
| show defaultRecoverySuggestion; | ||
| import 'package:amplify_firehose_dart/src/exception/record_cache_exception.dart'; | ||
| import 'package:amplify_firehose_dart/src/firehose_limits.dart' as limits; | ||
| import 'package:amplify_firehose_dart/src/impl/firehose_record.dart'; | ||
| import 'package:amplify_firehose_dart/src/model/record.dart'; | ||
| import 'package:meta/meta.dart'; | ||
|
|
||
| export 'package:amplify_firehose_dart/src/model/record.dart'; | ||
|
|
||
| /// {@template amplify_firehose.record_storage} | ||
| /// Abstract base class for record persistence. | ||
| /// | ||
| /// Implementations provide platform-specific storage (SQLite on VM, | ||
| /// IndexedDB on web, in-memory fallback). Validation of record size | ||
| /// and cache limits is handled here in [addRecord]; subclasses | ||
| /// implement [writeRecord] for the actual write. | ||
| /// | ||
| /// All public methods wrap unexpected errors as | ||
| /// [RecordCacheDatabaseException]. Subclasses throw | ||
| /// [RecordCacheException] subtypes for known errors; anything else is | ||
| /// caught and wrapped automatically. | ||
| /// {@endtemplate} | ||
| abstract class RecordStorage { | ||
| /// {@macro amplify_firehose.record_storage} | ||
| RecordStorage({ | ||
| required int maxCacheBytes, | ||
| int maxRecordsPerBatch = limits.maxRecordsPerBatch, | ||
| int maxBytesPerBatch = limits.maxBatchSizeBytes, | ||
| int maxRecordSizeBytes = limits.maxRecordSizeBytes, | ||
| int initialCachedSize = 0, | ||
| }) : _maxCacheBytes = maxCacheBytes, | ||
| _maxRecordsPerBatch = maxRecordsPerBatch, | ||
| _maxBytesPerBatch = maxBytesPerBatch, | ||
| _maxRecordSizeBytes = maxRecordSizeBytes, | ||
| cachedSize = initialCachedSize; | ||
|
|
||
| final int _maxCacheBytes; | ||
| final int _maxRecordsPerBatch; | ||
| final int _maxBytesPerBatch; | ||
| final int _maxRecordSizeBytes; | ||
|
|
||
| /// The current total cached size in bytes. | ||
| @protected | ||
| int cachedSize; | ||
|
|
||
| /// The maximum cache size in bytes. | ||
| int get maxCacheBytes => _maxCacheBytes; | ||
|
|
||
| /// Maximum number of records per batch. | ||
| int get maxRecordsPerBatch => _maxRecordsPerBatch; | ||
|
|
||
| /// Maximum total bytes per batch. | ||
| int get maxBytesPerBatch => _maxBytesPerBatch; | ||
|
|
||
| /// Validates and saves a record to storage. | ||
| /// Throws [RecordCacheValidationException] on invalid input. | ||
| /// Throws [RecordCacheLimitExceededException] if the cache is full. | ||
| /// Throws [RecordCacheDatabaseException] on storage errors. | ||
| Future<void> addRecord(RecordInput record) => | ||
| _wrap('Failed to add record to cache', () async { | ||
| if (record.dataSize > _maxRecordSizeBytes) { | ||
| throw RecordCacheValidationException( | ||
| 'Record size (${record.dataSize} bytes) exceeds the maximum ' | ||
| 'of $_maxRecordSizeBytes bytes.', | ||
| 'Reduce the record payload size.', | ||
| ); | ||
| } | ||
| if (cachedSize + record.dataSize > _maxCacheBytes) { | ||
| throw RecordCacheLimitExceededException( | ||
| 'Cache size limit exceeded: ' | ||
| '${cachedSize + record.dataSize} bytes > $_maxCacheBytes bytes', | ||
| 'Call flush() to send cached records or increase cache size limit.', | ||
| ); | ||
| } | ||
| await writeRecord(record); | ||
| cachedSize += record.dataSize; | ||
| }); | ||
|
|
||
| /// Retrieves records grouped by stream. | ||
| Future<Map<String, List<Record>>> getRecordsByStream() => | ||
| _wrap('Could not retrieve records from storage', doGetRecordsByStream); | ||
|
|
||
| /// Deletes records by their IDs and refreshes [cachedSize]. | ||
| Future<void> deleteRecords(Iterable<int> ids) => | ||
| _wrap('Failed to delete records from cache', () async { | ||
| await doDeleteRecords(ids); | ||
| cachedSize = await doQueryCacheSize(); | ||
| }); | ||
|
|
||
| /// Increments the retry count for the specified records. | ||
| Future<void> incrementRetryCount(Iterable<int> ids) => _wrap( | ||
| 'Failed to increment retry count', | ||
| () => doIncrementRetryCount(ids), | ||
| ); | ||
|
|
||
| /// Returns the total number of cached records. | ||
| Future<int> getRecordCount() => | ||
| _wrap('Failed to get record count', doGetRecordCount); | ||
|
|
||
| /// Deletes all records and resets [cachedSize] to 0. | ||
| Future<void> clearRecords() => _wrap('Failed to clear cache', () async { | ||
| await doClearRecords(); | ||
| cachedSize = 0; | ||
| }); | ||
|
|
||
| /// Closes the storage and releases resources. | ||
| Future<void> close() => _wrap('Failed to close storage', doClose); | ||
|
|
||
| /// Writes a validated record to the underlying storage. | ||
| @protected | ||
| Future<void> writeRecord(RecordInput record); | ||
|
|
||
| /// Retrieves records grouped by stream name. | ||
| @protected | ||
| Future<Map<String, List<Record>>> doGetRecordsByStream(); | ||
|
|
||
| /// Deletes records by their IDs. | ||
| @protected | ||
| Future<void> doDeleteRecords(Iterable<int> ids); | ||
|
|
||
| /// Increments the retry count for the specified records. | ||
| @protected | ||
| Future<void> doIncrementRetryCount(Iterable<int> ids); | ||
|
|
||
| /// Returns the total number of cached records. | ||
| @protected | ||
| Future<int> doGetRecordCount(); | ||
|
|
||
| /// Deletes all records (without updating [cachedSize] — the base class | ||
| /// resets it to 0). | ||
| @protected | ||
| Future<void> doClearRecords(); | ||
|
|
||
| /// Returns the current total cache size in bytes from the underlying | ||
| /// storage. Called by the base class after deletions. | ||
| @protected | ||
| Future<int> doQueryCacheSize(); | ||
|
|
||
| /// Closes the storage and releases resources. | ||
| @protected | ||
| Future<void> doClose(); | ||
|
|
||
| Future<T> _wrap<T>(String message, Future<T> Function() operation) async { | ||
| try { | ||
| return await operation(); | ||
| } on RecordCacheException { | ||
| rethrow; | ||
| } on Object catch (e) { | ||
| throw RecordCacheDatabaseException(message, defaultRecoverySuggestion, e); | ||
| } | ||
| } | ||
| } | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we avoid re-implementing those for the firehose client?