Skip to content

Commit 24b809a

Browse files
committed
feat(firehose): add exception hierarchy, storage abstraction, InMemoryRecordStorage, and tests
1 parent deab741 commit 24b809a

10 files changed

Lines changed: 847 additions & 2 deletions

packages/kinesis/amplify_firehose_dart/lib/amplify_firehose_dart.dart

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,6 @@
44
/// Amplify Amazon Data Firehose client for Dart.
55
library;
66

7-
// SDK client (for escape hatch)
8-
// Exports will be added as implementation PRs land.
7+
// Exceptions
8+
export 'src/exception/amplify_firehose_exception.dart'
9+
hide defaultRecoverySuggestion;
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
import 'package:amplify_core/amplify_core.dart';
5+
import 'package:amplify_firehose_dart/src/exception/record_cache_exception.dart';
6+
7+
/// Default recovery suggestion for errors.
///
/// Used as the fallback [AmplifyException.recoverySuggestion] when an
/// unexpected error is wrapped and no more specific guidance exists.
/// Library-internal: the package's public export hides this symbol.
const String defaultRecoverySuggestion =
    'Inspect the underlying error for more details.';
10+
11+
/// {@template amplify_firehose.amplify_firehose_exception}
/// Base exception for Amplify Amazon Data Firehose errors.
/// {@endtemplate}
sealed class AmplifyFirehoseException extends AmplifyException {
  /// {@macro amplify_firehose.amplify_firehose_exception}
  const AmplifyFirehoseException(
    super.message, {
    super.recoverySuggestion,
    super.underlyingException,
  });

  /// Converts an arbitrary [error] into the matching
  /// [AmplifyFirehoseException] subtype.
  ///
  /// Errors that are already [AmplifyFirehoseException]s pass through
  /// unchanged; internal [RecordCacheException] subtypes are translated to
  /// their public counterparts; anything else becomes a
  /// [FirehoseUnknownException].
  static AmplifyFirehoseException from(Object error) {
    // NOTE: the RecordCacheException checks must precede the generic
    // Exception fallback, since RecordCacheException implements Exception.
    if (error is AmplifyFirehoseException) {
      return error;
    }
    if (error is RecordCacheValidationException) {
      return FirehoseValidationException(
        error.message,
        recoverySuggestion: error.recoverySuggestion,
      );
    }
    if (error is RecordCacheLimitExceededException) {
      return FirehoseLimitExceededException(
        message: error.message,
        recoverySuggestion: error.recoverySuggestion,
      );
    }
    if (error is RecordCacheDatabaseException) {
      return FirehoseStorageException(
        error.message,
        recoverySuggestion: error.recoverySuggestion,
        underlyingException: error.cause,
      );
    }
    if (error is Exception) {
      return FirehoseUnknownException(
        error.toString(),
        underlyingException: error,
      );
    }
    return FirehoseUnknownException(error.toString());
  }
}
48+
49+
/// {@template amplify_firehose.firehose_storage_exception}
/// Thrown when a local cache/database error occurs.
///
/// Produced by [AmplifyFirehoseException.from] when an internal
/// [RecordCacheDatabaseException] crosses the public API boundary; the
/// original storage error is preserved in [underlyingException].
/// {@endtemplate}
final class FirehoseStorageException extends AmplifyFirehoseException {
  /// {@macro amplify_firehose.firehose_storage_exception}
  const FirehoseStorageException(
    super.message, {
    super.recoverySuggestion,
    super.underlyingException,
  });

  /// Stable name for this exception type (overrides the base-class hook so
  /// reporting is independent of minification/obfuscation of runtimeType).
  @override
  String get runtimeTypeName => 'FirehoseStorageException';
}
63+
64+
/// {@template amplify_firehose.firehose_limit_exceeded_exception}
/// Thrown when the local cache is full.
/// {@endtemplate}
final class FirehoseLimitExceededException extends AmplifyFirehoseException {
  /// {@macro amplify_firehose.firehose_limit_exceeded_exception}
  ///
  /// [message] and [recoverySuggestion] fall back to generic defaults when
  /// omitted. [underlyingException] optionally preserves the original error
  /// (e.g. the `cause` of an internal cache exception), matching the other
  /// exception types in this hierarchy; omitting it keeps the previous
  /// behavior.
  const FirehoseLimitExceededException({
    String? message,
    String? recoverySuggestion,
    Object? underlyingException,
  }) : super(
          message ?? 'Cache is full',
          recoverySuggestion:
              recoverySuggestion ?? 'Call flush() or clearCache().',
          underlyingException: underlyingException,
        );

  /// Stable name for this exception type.
  @override
  String get runtimeTypeName => 'FirehoseLimitExceededException';
}
81+
82+
/// {@template amplify_firehose.firehose_validation_exception}
/// Thrown when record input validation fails (e.g. oversized record).
/// {@endtemplate}
final class FirehoseValidationException extends AmplifyFirehoseException {
  /// {@macro amplify_firehose.firehose_validation_exception}
  ///
  /// [underlyingException] optionally preserves the original error (the
  /// internal [RecordCacheValidationException] carries a `cause` that was
  /// previously dropped at this boundary); omitting it keeps the previous
  /// behavior.
  const FirehoseValidationException(
    super.message, {
    super.recoverySuggestion,
    super.underlyingException,
  });

  /// Stable name for this exception type.
  @override
  String get runtimeTypeName => 'FirehoseValidationException';
}
92+
93+
/// {@template amplify_firehose.firehose_unknown_exception}
/// Catch-all for unexpected errors.
///
/// [AmplifyFirehoseException.from] uses this type for any error that is
/// neither an [AmplifyFirehoseException] nor a [RecordCacheException];
/// when the error is an [Exception] it is retained as
/// [underlyingException].
/// {@endtemplate}
final class FirehoseUnknownException extends AmplifyFirehoseException {
  /// {@macro amplify_firehose.firehose_unknown_exception}
  const FirehoseUnknownException(super.message, {super.underlyingException});

  /// Stable name for this exception type.
  @override
  String get runtimeTypeName => 'FirehoseUnknownException';
}
103+
104+
/// {@template amplify_firehose.client_closed_exception}
/// Thrown when an operation is attempted on a closed client.
///
/// Carries a fixed message and recovery suggestion; there is nothing
/// caller-specific to report, so the constructor takes no arguments.
/// {@endtemplate}
final class ClientClosedException extends AmplifyFirehoseException {
  /// {@macro amplify_firehose.client_closed_exception}
  const ClientClosedException()
      : super(
          'Client has been closed',
          recoverySuggestion: 'Create a new AmplifyFirehoseClient instance.',
        );

  /// Stable name for this exception type.
  @override
  String get runtimeTypeName => 'ClientClosedException';
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
/// Internal error type used by RecordClient / RecordStorage.
///
/// Mapped to the public AmplifyFirehoseException hierarchy at the
/// AmplifyFirehoseClient boundary via `AmplifyFirehoseException.from`.
sealed class RecordCacheException implements Exception {
  /// Creates a [RecordCacheException].
  const RecordCacheException(this.message, this.recoverySuggestion, [this.cause]);

  /// A message describing the error.
  final String message;

  /// A suggestion for how to recover from the error.
  final String recoverySuggestion;

  /// The underlying cause of the error, if any.
  final Object? cause;

  @override
  String toString() {
    // Use the concrete runtime type so subclasses (e.g.
    // RecordCacheDatabaseException) identify themselves correctly instead
    // of always reporting the base-class name.
    final buf = StringBuffer('$runtimeType: $message');
    if (cause != null) buf.write('\nCaused by: $cause');
    return buf.toString();
  }
}
28+
29+
/// Database operation failed.
///
/// Thrown (or used to wrap unexpected errors) by RecordStorage for
/// failures in the underlying persistence layer; mapped to the public
/// `FirehoseStorageException`, which preserves [cause].
final class RecordCacheDatabaseException extends RecordCacheException {
  /// Creates a [RecordCacheDatabaseException].
  const RecordCacheDatabaseException(
    super.message,
    super.recoverySuggestion, [
    super.cause,
  ]);
}
38+
39+
/// Cache limit exceeded — no space for new records.
///
/// Thrown by RecordStorage when adding a record would push the cache past
/// its byte budget; mapped to the public `FirehoseLimitExceededException`.
final class RecordCacheLimitExceededException extends RecordCacheException {
  /// Creates a [RecordCacheLimitExceededException].
  const RecordCacheLimitExceededException(
    super.message,
    super.recoverySuggestion, [
    super.cause,
  ]);
}
48+
49+
/// Record input validation failed (e.g. oversized record).
///
/// Thrown by RecordStorage before any write is attempted; mapped to the
/// public `FirehoseValidationException`.
final class RecordCacheValidationException extends RecordCacheException {
  /// Creates a [RecordCacheValidationException].
  const RecordCacheValidationException(
    super.message,
    super.recoverySuggestion, [
    super.cause,
  ]);
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
import 'dart:typed_data';
5+
6+
/// Internal representation of a record to be sent to Firehose.
///
/// Unlike Kinesis Data Streams, Firehose records carry no partition key,
/// so a record's size is simply the length of its data blob.
final class RecordInput {
  /// Creates a new Firehose record.
  RecordInput({
    required this.data,
    required this.streamName,
    required this.createdAt,
  }) : dataSize = data.length;

  /// Creates a Firehose record stamped with the current wall-clock time.
  factory RecordInput.now({
    required Uint8List data,
    required String streamName,
  }) =>
      RecordInput(
        data: data,
        streamName: streamName,
        createdAt: DateTime.now(),
      );

  /// The data blob to send to Firehose.
  final Uint8List data;

  /// The name of the Firehose delivery stream.
  final String streamName;

  /// The size of the record in bytes (data blob only).
  ///
  /// Per AWS docs, the per-record size limit for Firehose is 1,000 KiB;
  /// unlike Kinesis Data Streams, no partition key contributes to the
  /// size calculation.
  final int dataSize;

  /// Timestamp of when the record was created.
  final DateTime createdAt;
}
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
import 'package:amplify_firehose_dart/src/exception/amplify_firehose_exception.dart'
5+
show defaultRecoverySuggestion;
6+
import 'package:amplify_firehose_dart/src/exception/record_cache_exception.dart';
7+
import 'package:amplify_firehose_dart/src/firehose_limits.dart' as limits;
8+
import 'package:amplify_firehose_dart/src/impl/firehose_record.dart';
9+
import 'package:amplify_firehose_dart/src/model/record.dart';
10+
import 'package:meta/meta.dart';
11+
12+
export 'package:amplify_firehose_dart/src/model/record.dart';
13+
14+
/// {@template amplify_firehose.record_storage}
/// Abstract base class for record persistence.
///
/// Concrete subclasses supply the platform-specific backing store (SQLite
/// on the VM, IndexedDB on the web, an in-memory fallback elsewhere).
/// Record-size and cache-limit validation happens here in [addRecord];
/// subclasses only implement [writeRecord] to perform the actual write.
///
/// Every public method funnels unexpected errors into a
/// [RecordCacheDatabaseException]. Subclasses signal known failures by
/// throwing [RecordCacheException] subtypes; any other thrown object is
/// caught and wrapped automatically.
/// {@endtemplate}
abstract class RecordStorage {
  /// {@macro amplify_firehose.record_storage}
  RecordStorage({
    required int maxCacheBytes,
    int maxRecordsPerBatch = limits.maxRecordsPerBatch,
    int maxBytesPerBatch = limits.maxBatchSizeBytes,
    int maxRecordSizeBytes = limits.maxRecordSizeBytes,
    int initialCachedSize = 0,
  })  : _maxCacheBytes = maxCacheBytes,
        _maxRecordsPerBatch = maxRecordsPerBatch,
        _maxBytesPerBatch = maxBytesPerBatch,
        _maxRecordSizeBytes = maxRecordSizeBytes,
        cachedSize = initialCachedSize;

  final int _maxCacheBytes;
  final int _maxRecordsPerBatch;
  final int _maxBytesPerBatch;
  final int _maxRecordSizeBytes;

  /// The current total cached size in bytes.
  @protected
  int cachedSize;

  /// The maximum cache size in bytes.
  int get maxCacheBytes => _maxCacheBytes;

  /// Maximum number of records per batch.
  int get maxRecordsPerBatch => _maxRecordsPerBatch;

  /// Maximum total bytes per batch.
  int get maxBytesPerBatch => _maxBytesPerBatch;

  /// Validates and saves a record to storage.
  ///
  /// Throws [RecordCacheValidationException] on invalid input,
  /// [RecordCacheLimitExceededException] if the cache is full, and
  /// [RecordCacheDatabaseException] on storage errors.
  Future<void> addRecord(RecordInput record) {
    return _guarded('Failed to add record to cache', () async {
      // Reject records that exceed the per-record size limit.
      if (record.dataSize > _maxRecordSizeBytes) {
        throw RecordCacheValidationException(
          'Record size (${record.dataSize} bytes) exceeds the maximum '
          'of $_maxRecordSizeBytes bytes.',
          'Reduce the record payload size.',
        );
      }
      // Reject records that would push the cache past its byte budget.
      if (cachedSize + record.dataSize > _maxCacheBytes) {
        throw RecordCacheLimitExceededException(
          'Cache size limit exceeded: '
          '${cachedSize + record.dataSize} bytes > $_maxCacheBytes bytes',
          'Call flush() to send cached records or increase cache size limit.',
        );
      }
      // Only count the bytes once the write has actually succeeded.
      await writeRecord(record);
      cachedSize += record.dataSize;
    });
  }

  /// Retrieves records grouped by stream.
  Future<Map<String, List<Record>>> getRecordsByStream() {
    return _guarded(
      'Could not retrieve records from storage',
      doGetRecordsByStream,
    );
  }

  /// Deletes records by their IDs and refreshes [cachedSize].
  Future<void> deleteRecords(Iterable<int> ids) {
    return _guarded('Failed to delete records from cache', () async {
      await doDeleteRecords(ids);
      // Re-query rather than subtract: the storage layer is authoritative
      // for the post-deletion size.
      cachedSize = await doQueryCacheSize();
    });
  }

  /// Increments the retry count for the specified records.
  Future<void> incrementRetryCount(Iterable<int> ids) {
    return _guarded(
      'Failed to increment retry count',
      () => doIncrementRetryCount(ids),
    );
  }

  /// Returns the total number of cached records.
  Future<int> getRecordCount() {
    return _guarded('Failed to get record count', doGetRecordCount);
  }

  /// Deletes all records and resets [cachedSize] to 0.
  Future<void> clearRecords() {
    return _guarded('Failed to clear cache', () async {
      await doClearRecords();
      cachedSize = 0;
    });
  }

  /// Closes the storage and releases resources.
  Future<void> close() {
    return _guarded('Failed to close storage', doClose);
  }

  /// Writes a validated record to the underlying storage.
  @protected
  Future<void> writeRecord(RecordInput record);

  /// Retrieves records grouped by stream name.
  @protected
  Future<Map<String, List<Record>>> doGetRecordsByStream();

  /// Deletes records by their IDs.
  @protected
  Future<void> doDeleteRecords(Iterable<int> ids);

  /// Increments the retry count for the specified records.
  @protected
  Future<void> doIncrementRetryCount(Iterable<int> ids);

  /// Returns the total number of cached records.
  @protected
  Future<int> doGetRecordCount();

  /// Deletes all records (without updating [cachedSize] — the base class
  /// resets it to 0).
  @protected
  Future<void> doClearRecords();

  /// Returns the current total cache size in bytes from the underlying
  /// storage. Called by the base class after deletions.
  @protected
  Future<int> doQueryCacheSize();

  /// Closes the storage and releases resources.
  @protected
  Future<void> doClose();

  /// Runs [operation], letting known [RecordCacheException]s propagate and
  /// wrapping any other thrown object in a [RecordCacheDatabaseException]
  /// that carries [message] and the original error as its cause.
  Future<T> _guarded<T>(String message, Future<T> Function() operation) async {
    try {
      return await operation();
    } on RecordCacheException {
      rethrow;
    } on Object catch (e) {
      throw RecordCacheDatabaseException(message, defaultRecoverySuggestion, e);
    }
  }
}

0 commit comments

Comments
 (0)