diff --git a/packages/kinesis/amplify_firehose_dart/lib/amplify_firehose_dart.dart b/packages/kinesis/amplify_firehose_dart/lib/amplify_firehose_dart.dart
index 36c4c485d4..51188a435b 100644
--- a/packages/kinesis/amplify_firehose_dart/lib/amplify_firehose_dart.dart
+++ b/packages/kinesis/amplify_firehose_dart/lib/amplify_firehose_dart.dart
@@ -4,5 +4,6 @@
 /// Amplify Amazon Data Firehose client for Dart.
 library;
 
-// SDK client (for escape hatch)
-// Exports will be added as implementation PRs land.
+// Exceptions
+export 'src/exception/amplify_firehose_exception.dart'
+    hide defaultRecoverySuggestion;
diff --git a/packages/kinesis/amplify_firehose_dart/lib/src/exception/amplify_firehose_exception.dart b/packages/kinesis/amplify_firehose_dart/lib/src/exception/amplify_firehose_exception.dart
new file mode 100644
index 0000000000..681ef19f5f
--- /dev/null
+++ b/packages/kinesis/amplify_firehose_dart/lib/src/exception/amplify_firehose_exception.dart
@@ -0,0 +1,117 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+import 'package:amplify_core/amplify_core.dart';
+import 'package:amplify_firehose_dart/src/exception/record_cache_exception.dart';
+
+/// Default recovery suggestion for errors.
+const String defaultRecoverySuggestion =
+    'Inspect the underlying error for more details.';
+
+/// {@template amplify_firehose.amplify_firehose_exception}
+/// Base exception for Amplify Amazon Data Firehose errors.
+/// {@endtemplate}
+sealed class AmplifyFirehoseException extends AmplifyException {
+  /// {@macro amplify_firehose.amplify_firehose_exception}
+  const AmplifyFirehoseException(
+    super.message, {
+    super.recoverySuggestion,
+    super.underlyingException,
+  });
+
+  /// Maps an arbitrary error into the appropriate [AmplifyFirehoseException]
+  /// subtype. If [error] is already an [AmplifyFirehoseException], it is
+  /// returned as-is. [RecordCacheException] subtypes are mapped to their
+  /// corresponding public exception types.
+  static AmplifyFirehoseException from(Object error) => switch (error) {
+    final AmplifyFirehoseException e => e,
+    final RecordCacheValidationException e => FirehoseValidationException(
+      e.message,
+      recoverySuggestion: e.recoverySuggestion,
+    ),
+    final RecordCacheLimitExceededException e => FirehoseLimitExceededException(
+      message: e.message,
+      recoverySuggestion: e.recoverySuggestion,
+    ),
+    final RecordCacheDatabaseException e => FirehoseStorageException(
+      e.message,
+      recoverySuggestion: e.recoverySuggestion,
+      underlyingException: e.cause,
+    ),
+    final Exception e => FirehoseUnknownException(
+      e.toString(),
+      underlyingException: e,
+    ),
+    _ => FirehoseUnknownException(error.toString()),
+  };
+}
+
+/// {@template amplify_firehose.firehose_storage_exception}
+/// Thrown when a local cache/database error occurs.
+/// {@endtemplate}
+final class FirehoseStorageException extends AmplifyFirehoseException {
+  /// {@macro amplify_firehose.firehose_storage_exception}
+  const FirehoseStorageException(
+    super.message, {
+    super.recoverySuggestion,
+    super.underlyingException,
+  });
+
+  @override
+  String get runtimeTypeName => 'FirehoseStorageException';
+}
+
+/// {@template amplify_firehose.firehose_limit_exceeded_exception}
+/// Thrown when the local cache is full.
+/// {@endtemplate}
+final class FirehoseLimitExceededException extends AmplifyFirehoseException {
+  /// {@macro amplify_firehose.firehose_limit_exceeded_exception}
+  const FirehoseLimitExceededException({
+    String? message,
+    String? recoverySuggestion,
+  }) : super(
+         message ?? 'Cache is full',
+         recoverySuggestion:
+             recoverySuggestion ?? 'Call flush() or clearCache().',
+       );
+
+  @override
+  String get runtimeTypeName => 'FirehoseLimitExceededException';
+}
+
+/// {@template amplify_firehose.firehose_validation_exception}
+/// Thrown when record input validation fails (e.g. oversized record).
+/// {@endtemplate}
+final class FirehoseValidationException extends AmplifyFirehoseException {
+  /// {@macro amplify_firehose.firehose_validation_exception}
+  const FirehoseValidationException(super.message, {super.recoverySuggestion});
+
+  @override
+  String get runtimeTypeName => 'FirehoseValidationException';
+}
+
+/// {@template amplify_firehose.firehose_unknown_exception}
+/// Catch-all for unexpected errors.
+/// {@endtemplate}
+final class FirehoseUnknownException extends AmplifyFirehoseException {
+  /// {@macro amplify_firehose.firehose_unknown_exception}
+  const FirehoseUnknownException(super.message, {super.underlyingException});
+
+  @override
+  String get runtimeTypeName => 'FirehoseUnknownException';
+}
+
+/// {@template amplify_firehose.client_closed_exception}
+/// Thrown when an operation is attempted on a closed client.
+/// {@endtemplate}
+final class ClientClosedException extends AmplifyFirehoseException {
+  /// {@macro amplify_firehose.client_closed_exception}
+  const ClientClosedException()
+    : super(
+        'Client has been closed',
+        recoverySuggestion: 'Create a new AmplifyFirehoseClient instance.',
+      );
+
+  @override
+  String get runtimeTypeName => 'ClientClosedException';
+}
diff --git a/packages/kinesis/amplify_firehose_dart/lib/src/exception/record_cache_exception.dart b/packages/kinesis/amplify_firehose_dart/lib/src/exception/record_cache_exception.dart
new file mode 100644
index 0000000000..2f69529ffe
--- /dev/null
+++ b/packages/kinesis/amplify_firehose_dart/lib/src/exception/record_cache_exception.dart
@@ -0,0 +1,57 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+/// Internal error type used by RecordClient / RecordStorage.
+///
+/// Mapped to the public AmplifyFirehoseException hierarchy at the
+/// AmplifyFirehoseClient boundary via `AmplifyFirehoseException.from`.
+sealed class RecordCacheException implements Exception {
+  /// Creates a [RecordCacheException].
+  const RecordCacheException(this.message, this.recoverySuggestion, [this.cause]);
+
+  /// A message describing the error.
+  final String message;
+
+  /// A suggestion for how to recover from the error.
+  final String recoverySuggestion;
+
+  /// The underlying cause of the error, if any.
+  final Object? cause;
+
+  @override
+  String toString() {
+    final buf = StringBuffer('RecordCacheException: $message');
+    if (cause != null) buf.write('\nCaused by: $cause');
+    return buf.toString();
+  }
+}
+
+/// Database operation failed.
+final class RecordCacheDatabaseException extends RecordCacheException {
+  /// Creates a [RecordCacheDatabaseException].
+  const RecordCacheDatabaseException(
+    super.message,
+    super.recoverySuggestion, [
+    super.cause,
+  ]);
+}
+
+/// Cache limit exceeded — no space for new records.
+final class RecordCacheLimitExceededException extends RecordCacheException {
+  /// Creates a [RecordCacheLimitExceededException].
+  const RecordCacheLimitExceededException(
+    super.message,
+    super.recoverySuggestion, [
+    super.cause,
+  ]);
+}
+
+/// Record input validation failed (e.g. oversized record).
+final class RecordCacheValidationException extends RecordCacheException {
+  /// Creates a [RecordCacheValidationException].
+  const RecordCacheValidationException(
+    super.message,
+    super.recoverySuggestion, [
+    super.cause,
+  ]);
+}
diff --git a/packages/kinesis/amplify_firehose_dart/lib/src/impl/firehose_record.dart b/packages/kinesis/amplify_firehose_dart/lib/src/impl/firehose_record.dart
new file mode 100644
index 0000000000..82983ef843
--- /dev/null
+++ b/packages/kinesis/amplify_firehose_dart/lib/src/impl/firehose_record.dart
@@ -0,0 +1,45 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+import 'dart:typed_data';
+
+/// Internal representation of a record to be sent to Firehose.
+///
+/// Unlike Kinesis Data Streams, Firehose records do not have a
+/// partition key. The record size is simply the data blob length.
+final class RecordInput {
+  /// Creates a new Firehose record.
+  RecordInput({
+    required this.data,
+    required this.streamName,
+    required this.createdAt,
+  }) : dataSize = data.length;
+
+  /// Creates a Firehose record with the current timestamp.
+  factory RecordInput.now({
+    required Uint8List data,
+    required String streamName,
+  }) {
+    return RecordInput(
+      data: data,
+      streamName: streamName,
+      createdAt: DateTime.now(),
+    );
+  }
+
+  /// The data blob to send to Firehose.
+  final Uint8List data;
+
+  /// The name of the Firehose delivery stream.
+  final String streamName;
+
+  /// The size of the record in bytes (data blob only).
+  ///
+  /// Per AWS docs, the per-record size limit for Firehose is 1,000 KiB.
+  /// Unlike Kinesis Data Streams, there is no partition key to include
+  /// in the size calculation.
+  final int dataSize;
+
+  /// Timestamp of when the record was created.
+  final DateTime createdAt;
+}
diff --git a/packages/kinesis/amplify_firehose_dart/lib/src/impl/storage/record_storage.dart b/packages/kinesis/amplify_firehose_dart/lib/src/impl/storage/record_storage.dart
new file mode 100644
index 0000000000..226f3bbc39
--- /dev/null
+++ b/packages/kinesis/amplify_firehose_dart/lib/src/impl/storage/record_storage.dart
@@ -0,0 +1,156 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+import 'package:amplify_firehose_dart/src/exception/amplify_firehose_exception.dart'
+    show defaultRecoverySuggestion;
+import 'package:amplify_firehose_dart/src/exception/record_cache_exception.dart';
+import 'package:amplify_firehose_dart/src/firehose_limits.dart' as limits;
+import 'package:amplify_firehose_dart/src/impl/firehose_record.dart';
+import 'package:amplify_firehose_dart/src/model/record.dart';
+import 'package:meta/meta.dart';
+
+export 'package:amplify_firehose_dart/src/model/record.dart';
+
+/// {@template amplify_firehose.record_storage}
+/// Abstract base class for record persistence.
+///
+/// Implementations provide platform-specific storage (SQLite on VM,
+/// IndexedDB on web, in-memory fallback). Validation of record size
+/// and cache limits is handled here in [addRecord]; subclasses
+/// implement [writeRecord] for the actual write.
+///
+/// All public methods wrap unexpected errors as
+/// [RecordCacheDatabaseException]. Subclasses throw
+/// [RecordCacheException] subtypes for known errors; anything else is
+/// caught and wrapped automatically.
+/// {@endtemplate}
+abstract class RecordStorage {
+  /// {@macro amplify_firehose.record_storage}
+  RecordStorage({
+    required int maxCacheBytes,
+    int maxRecordsPerBatch = limits.maxRecordsPerBatch,
+    int maxBytesPerBatch = limits.maxBatchSizeBytes,
+    int maxRecordSizeBytes = limits.maxRecordSizeBytes,
+    int initialCachedSize = 0,
+  }) : _maxCacheBytes = maxCacheBytes,
+       _maxRecordsPerBatch = maxRecordsPerBatch,
+       _maxBytesPerBatch = maxBytesPerBatch,
+       _maxRecordSizeBytes = maxRecordSizeBytes,
+       cachedSize = initialCachedSize;
+
+  final int _maxCacheBytes;
+  final int _maxRecordsPerBatch;
+  final int _maxBytesPerBatch;
+  final int _maxRecordSizeBytes;
+
+  /// The current total cached size in bytes.
+  @protected
+  int cachedSize;
+
+  /// The maximum cache size in bytes.
+  int get maxCacheBytes => _maxCacheBytes;
+
+  /// Maximum number of records per batch.
+  int get maxRecordsPerBatch => _maxRecordsPerBatch;
+
+  /// Maximum total bytes per batch.
+  int get maxBytesPerBatch => _maxBytesPerBatch;
+
+  /// Validates and saves a record to storage.
+  /// Throws [RecordCacheValidationException] on invalid input.
+  /// Throws [RecordCacheLimitExceededException] if the cache is full.
+  /// Throws [RecordCacheDatabaseException] on storage errors.
+  Future<void> addRecord(RecordInput record) =>
+      _wrap('Failed to add record to cache', () async {
+        if (record.dataSize > _maxRecordSizeBytes) {
+          throw RecordCacheValidationException(
+            'Record size (${record.dataSize} bytes) exceeds the maximum '
+            'of $_maxRecordSizeBytes bytes.',
+            'Reduce the record payload size.',
+          );
+        }
+        if (cachedSize + record.dataSize > _maxCacheBytes) {
+          throw RecordCacheLimitExceededException(
+            'Cache size limit exceeded: '
+            '${cachedSize + record.dataSize} bytes > $_maxCacheBytes bytes',
+            'Call flush() to send cached records or increase cache size limit.',
+          );
+        }
+        await writeRecord(record);
+        cachedSize += record.dataSize;
+      });
+
+  /// Retrieves records grouped by stream.
+  Future<Map<String, List<Record>>> getRecordsByStream() =>
+      _wrap('Could not retrieve records from storage', doGetRecordsByStream);
+
+  /// Deletes records by their IDs and refreshes [cachedSize].
+  Future<void> deleteRecords(Iterable<int> ids) =>
+      _wrap('Failed to delete records from cache', () async {
+        await doDeleteRecords(ids);
+        cachedSize = await doQueryCacheSize();
+      });
+
+  /// Increments the retry count for the specified records.
+  Future<void> incrementRetryCount(Iterable<int> ids) => _wrap(
+    'Failed to increment retry count',
+    () => doIncrementRetryCount(ids),
+  );
+
+  /// Returns the total number of cached records.
+  Future<int> getRecordCount() =>
+      _wrap('Failed to get record count', doGetRecordCount);
+
+  /// Deletes all records and resets [cachedSize] to 0.
+  Future<void> clearRecords() => _wrap('Failed to clear cache', () async {
+    await doClearRecords();
+    cachedSize = 0;
+  });
+
+  /// Closes the storage and releases resources.
+  Future<void> close() => _wrap('Failed to close storage', doClose);
+
+  /// Writes a validated record to the underlying storage.
+  @protected
+  Future<void> writeRecord(RecordInput record);
+
+  /// Retrieves records grouped by stream name.
+  @protected
+  Future<Map<String, List<Record>>> doGetRecordsByStream();
+
+  /// Deletes records by their IDs.
+  @protected
+  Future<void> doDeleteRecords(Iterable<int> ids);
+
+  /// Increments the retry count for the specified records.
+  @protected
+  Future<void> doIncrementRetryCount(Iterable<int> ids);
+
+  /// Returns the total number of cached records.
+  @protected
+  Future<int> doGetRecordCount();
+
+  /// Deletes all records (without updating [cachedSize] — the base class
+  /// resets it to 0).
+  @protected
+  Future<void> doClearRecords();
+
+  /// Returns the current total cache size in bytes from the underlying
+  /// storage. Called by the base class after deletions.
+  @protected
+  Future<int> doQueryCacheSize();
+
+  /// Closes the storage and releases resources.
+  @protected
+  Future<void> doClose();
+
+  /// Runs [operation], passing known cache errors through and wrapping
+  /// anything else as a [RecordCacheDatabaseException] with [message].
+  Future<T> _wrap<T>(String message, Future<T> Function() operation) async {
+    try {
+      return await operation();
+    } on RecordCacheException {
+      rethrow;
+    } on Object catch (e) {
+      throw RecordCacheDatabaseException(message, defaultRecoverySuggestion, e);
+    }
+  }
+}
diff --git a/packages/kinesis/amplify_firehose_dart/lib/src/impl/storage/record_storage_memory.dart b/packages/kinesis/amplify_firehose_dart/lib/src/impl/storage/record_storage_memory.dart
new file mode 100644
index 0000000000..868853aa2e
--- /dev/null
+++ b/packages/kinesis/amplify_firehose_dart/lib/src/impl/storage/record_storage_memory.dart
@@ -0,0 +1,109 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+import 'dart:collection';
+
+import 'package:amplify_firehose_dart/src/impl/firehose_record.dart';
+import 'package:amplify_firehose_dart/src/impl/storage/record_storage.dart';
+
+/// {@template amplify_firehose.in_memory_record_storage}
+/// In-memory [RecordStorage] fallback for web when IndexedDB is unavailable.
+/// Records are not persisted.
+/// {@endtemplate}
+final class InMemoryRecordStorage extends RecordStorage {
+  /// {@macro amplify_firehose.in_memory_record_storage}
+  InMemoryRecordStorage({
+    required super.maxCacheBytes,
+    super.maxRecordsPerBatch,
+    super.maxBytesPerBatch,
+    super.maxRecordSizeBytes,
+  });
+
+  int _nextId = 1;
+  final LinkedHashMap<int, Record> _records = LinkedHashMap<int, Record>();
+
+  @override
+  Future<void> writeRecord(RecordInput record) async {
+    final id = _nextId++;
+    _records[id] = Record(
+      id: id,
+      streamName: record.streamName,
+      data: record.data,
+      dataSize: record.dataSize,
+      retryCount: 0,
+      createdAt: record.createdAt.millisecondsSinceEpoch,
+    );
+  }
+
+  @override
+  Future<Map<String, List<Record>>> doGetRecordsByStream() async {
+    final sorted = _records.values.toList()
+      ..sort((a, b) {
+        final cmp = a.streamName.compareTo(b.streamName);
+        if (cmp != 0) return cmp;
+        return a.id.compareTo(b.id);
+      });
+
+    final result = <String, List<Record>>{};
+    final streamSizes = <String, int>{};
+    final streamCounts = <String, int>{};
+
+    for (final record in sorted) {
+      final stream = record.streamName;
+      final count = streamCounts[stream] ?? 0;
+      final size = streamSizes[stream] ?? 0;
+      if (count >= maxRecordsPerBatch) continue;
+      if (size + record.dataSize > maxBytesPerBatch) continue;
+
+      result.putIfAbsent(stream, () => []).add(record);
+      streamCounts[stream] = count + 1;
+      streamSizes[stream] = size + record.dataSize;
+    }
+    return result;
+  }
+
+  @override
+  Future<void> doDeleteRecords(Iterable<int> ids) async {
+    for (final id in ids) {
+      _records.remove(id);
+    }
+  }
+
+  @override
+  Future<int> doQueryCacheSize() async {
+    var total = 0;
+    for (final record in _records.values) {
+      total += record.dataSize;
+    }
+    return total;
+  }
+
+  @override
+  Future<void> doIncrementRetryCount(Iterable<int> ids) async {
+    for (final id in ids) {
+      final record = _records[id];
+      if (record == null) continue;
+      _records[id] = Record(
+        id: record.id,
+        streamName: record.streamName,
+        data: record.data,
+        dataSize: record.dataSize,
+        retryCount: record.retryCount + 1,
+        createdAt: record.createdAt,
+      );
+    }
+  }
+
+  @override
+  Future<int> doGetRecordCount() async => _records.length;
+
+  @override
+  Future<void> doClearRecords() async {
+    _records.clear();
+  }
+
+  @override
+  Future<void> doClose() async {
+    await clearRecords();
+  }
+}
diff --git a/packages/kinesis/amplify_firehose_dart/lib/src/model/record.dart b/packages/kinesis/amplify_firehose_dart/lib/src/model/record.dart
new file mode 100644
index 0000000000..5234eef7d5
--- /dev/null
+++ b/packages/kinesis/amplify_firehose_dart/lib/src/model/record.dart
@@ -0,0 +1,45 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+import 'dart:typed_data';
+
+/// {@template amplify_firehose.record}
+/// A record persisted in local storage, ready to be flushed to Firehose.
+///
+/// This is a plain Dart class with no ORM coupling, shared across all
+/// storage backends (SQLite, IndexedDB, In-Memory).
+/// {@endtemplate}
+final class Record {
+  /// {@macro amplify_firehose.record}
+  const Record({
+    required this.id,
+    required this.streamName,
+    required this.data,
+    required this.dataSize,
+    required this.retryCount,
+    required this.createdAt,
+  });
+
+  /// Auto-incrementing primary key.
+  final int id;
+
+  /// The name of the Firehose delivery stream.
+  final String streamName;
+
+  /// The data blob to send to Firehose.
+  final Uint8List data;
+
+  /// The size of the data blob in bytes.
+  final int dataSize;
+
+  /// The number of times this record has been retried.
+  final int retryCount;
+
+  /// Unix timestamp (milliseconds) of when the record was created.
+  final int createdAt;
+
+  @override
+  String toString() =>
+      'Record(id: $id, streamName: $streamName, '
+      'dataSize: $dataSize, retryCount: $retryCount)';
+}
diff --git a/packages/kinesis/amplify_firehose_dart/test/amplify_firehose_exception_test.dart b/packages/kinesis/amplify_firehose_dart/test/amplify_firehose_exception_test.dart
new file mode 100644
index 0000000000..8ddd538105
--- /dev/null
+++ b/packages/kinesis/amplify_firehose_dart/test/amplify_firehose_exception_test.dart
@@ -0,0 +1,71 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+import 'package:amplify_firehose_dart/src/exception/amplify_firehose_exception.dart';
+import 'package:amplify_firehose_dart/src/exception/record_cache_exception.dart';
+import 'package:test/test.dart';
+
+void main() {
+  group('AmplifyFirehoseException.from', () {
+    test('passes through AmplifyFirehoseException unchanged', () {
+      const original = FirehoseStorageException('msg');
+      final result = AmplifyFirehoseException.from(original);
+      expect(identical(result, original), isTrue);
+    });
+
+    test(
+      'converts RecordCacheValidationException to FirehoseValidationException',
+      () {
+        const cause = RecordCacheValidationException('bad input', 'fix it');
+        final result = AmplifyFirehoseException.from(cause);
+        expect(result, isA<FirehoseValidationException>());
+        expect(result.message, 'bad input');
+        expect(result.recoverySuggestion, 'fix it');
+      },
+    );
+
+    test(
+      'converts RecordCacheDatabaseException to FirehoseStorageException',
+      () {
+        final underlying = Exception('sqlite error');
+        final cause = RecordCacheDatabaseException(
+          'db error',
+          'retry',
+          underlying,
+        );
+        final result = AmplifyFirehoseException.from(cause);
+        expect(result, isA<FirehoseStorageException>());
+        expect(result.message, 'db error');
+        expect(result.recoverySuggestion, 'retry');
+        expect(result.underlyingException, underlying);
+      },
+    );
+
+    test('converts RecordCacheLimitExceededException '
+        'to FirehoseLimitExceededException', () {
+      const cause = RecordCacheLimitExceededException(
+        'cache full',
+        'flush first',
+      );
+      final result = AmplifyFirehoseException.from(cause);
+      expect(result, isA<FirehoseLimitExceededException>());
+      expect(result.message, 'cache full');
+      expect(result.recoverySuggestion, 'flush first');
+    });
+
+    test('converts unknown Exception to FirehoseUnknownException', () {
+      final cause = Exception('something unexpected');
+      final result = AmplifyFirehoseException.from(cause);
+      expect(result, isA<FirehoseUnknownException>());
+      expect(result.underlyingException, cause);
+    });
+
+    test('converts non-Exception error to FirehoseUnknownException', () {
+      const error = 'a string error';
+      final result = AmplifyFirehoseException.from(error);
+      expect(result, isA<FirehoseUnknownException>());
+      expect(result.message, 'a string error');
+      expect(result.underlyingException, isNull);
+    });
+  });
+}
diff --git a/packages/kinesis/amplify_firehose_dart/test/common/mocktail_mocks.dart b/packages/kinesis/amplify_firehose_dart/test/common/mocktail_mocks.dart
new file mode 100644
index 0000000000..a6093b0ab5
--- /dev/null
+++ b/packages/kinesis/amplify_firehose_dart/test/common/mocktail_mocks.dart
@@ -0,0 +1,44 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+/// Shared mock classes for Firehose tests.
+library;
+
+import 'dart:async';
+
+import 'package:amplify_firehose_dart/src/sdk/src/firehose/firehose_client.dart';
+import 'package:amplify_foundation_dart/amplify_foundation_dart.dart'
+    as foundation;
+import 'package:aws_common/aws_common.dart';
+import 'package:mocktail/mocktail.dart';
+import 'package:smithy/smithy.dart';
+
+// =============================================================================
+// Smithy/SDK Mocks
+// =============================================================================
+
+/// Creates a mock [SmithyOperation] that returns the result of [fn].
+SmithyOperation<T> mockSmithyOperation<T>(FutureOr<T> Function() fn) =>
+    SmithyOperation(
+      CancelableOperation.fromFuture(Future.value(fn())),
+      operationName: '',
+      requestProgress: const Stream.empty(),
+      responseProgress: const Stream.empty(),
+    );
+
+/// Mock implementation of [FirehoseClient].
+class MockFirehoseClient extends Mock implements FirehoseClient {}
+
+/// Mock implementation of [SmithyOperation].
+class MockSmithyOperation<T> extends Mock implements SmithyOperation<T> {}
+
+/// Mock implementation of [AWSHttpException].
+class MockAWSHttpException extends Mock implements AWSHttpException {}
+
+// =============================================================================
+// Fake Implementations
+// =============================================================================
+
+/// Fake implementation of [foundation.AWSCredentialsProvider] for testing.
+class FakeAWSCredentialsProvider extends Fake
+    implements foundation.AWSCredentialsProvider {}
diff --git a/packages/kinesis/amplify_firehose_dart/test/in_memory_record_storage_test.dart b/packages/kinesis/amplify_firehose_dart/test/in_memory_record_storage_test.dart
new file mode 100644
index 0000000000..9b15852631
--- /dev/null
+++ b/packages/kinesis/amplify_firehose_dart/test/in_memory_record_storage_test.dart
@@ -0,0 +1,200 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+import 'dart:typed_data';
+
+import 'package:amplify_firehose_dart/src/exception/record_cache_exception.dart';
+import 'package:amplify_firehose_dart/src/impl/firehose_record.dart';
+import 'package:amplify_firehose_dart/src/impl/storage/record_storage.dart';
+import 'package:amplify_firehose_dart/src/impl/storage/record_storage_memory.dart';
+import 'package:test/test.dart';
+
+void main() {
+  group('InMemoryRecordStorage', () {
+    late InMemoryRecordStorage storage;
+
+    setUp(() {
+      storage = InMemoryRecordStorage(maxCacheBytes: 10 * 1024 * 1024);
+    });
+
+    tearDown(() async {
+      await storage.close();
+    });
+
+    /// Helper to get all records as a flat list.
+    Future<List<Record>> getAllRecords() async {
+      final byStream = await storage.getRecordsByStream();
+      return byStream.values.expand((r) => r).toList();
+    }
+
+    group('addRecord', () {
+      test('saves and retrieves a record', () async {
+        await storage.addRecord(
+          RecordInput.now(
+            data: Uint8List.fromList([1, 2, 3, 4, 5]),
+            streamName: 'test-stream',
+          ),
+        );
+
+        final retrieved = await getAllRecords();
+        expect(retrieved, hasLength(1));
+        expect(retrieved.first.streamName, equals('test-stream'));
+        expect(
+          retrieved.first.data,
+          equals(Uint8List.fromList([1, 2, 3, 4, 5])),
+        );
+        expect(retrieved.first.retryCount, equals(0));
+      });
+
+      test('rejects oversized records', () async {
+        final oversized = Uint8List(1000 * 1024 + 1);
+        expect(
+          () => storage.addRecord(
+            RecordInput.now(data: oversized, streamName: 'stream'),
+          ),
+          throwsA(isA<RecordCacheValidationException>()),
+        );
+      });
+
+      test('rejects records when cache is full', () async {
+        final smallStorage = InMemoryRecordStorage(maxCacheBytes: 10);
+        addTearDown(smallStorage.close);
+
+        await smallStorage.addRecord(
+          RecordInput.now(
+            data: Uint8List.fromList([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
+            streamName: 'stream',
+          ),
+        );
+
+        expect(
+          () => smallStorage.addRecord(
+            RecordInput.now(
+              data: Uint8List.fromList([1]),
+              streamName: 'stream',
+            ),
+          ),
+          throwsA(isA<RecordCacheLimitExceededException>()),
+        );
+      });
+    });
+
+    group('deleteRecords', () {
+      test('removes correct records by ID', () async {
+        for (var i = 0; i < 5; i++) {
+          await storage.addRecord(
+            RecordInput.now(
+              data: Uint8List.fromList([i]),
+              streamName: 'stream',
+            ),
+          );
+        }
+
+        final allRecords = await getAllRecords();
+        expect(allRecords, hasLength(5));
+
+        final idsToDelete = [allRecords[1].id, allRecords[3].id];
+        await storage.deleteRecords(idsToDelete);
+
+        final remaining = await getAllRecords();
+        expect(remaining, hasLength(3));
+        expect(remaining.map((r) => r.id), isNot(contains(idsToDelete[0])));
+        expect(remaining.map((r) => r.id), isNot(contains(idsToDelete[1])));
+      });
+
+      test('handles empty ID list gracefully', () async {
+        await storage.addRecord(
+          RecordInput.now(data: Uint8List.fromList([1]), streamName: 'stream'),
+        );
+
+        await storage.deleteRecords([]);
+        expect(await getAllRecords(), hasLength(1));
+      });
+    });
+
+    group('incrementRetryCount', () {
+      test('increments retry count correctly', () async {
+        await storage.addRecord(
+          RecordInput.now(data: Uint8List.fromList([1]), streamName: 'stream'),
+        );
+
+        var records = await getAllRecords();
+        expect(records.first.retryCount, equals(0));
+
+        await storage.incrementRetryCount([records.first.id]);
+        records = await getAllRecords();
+        expect(records.first.retryCount, equals(1));
+
+        await storage.incrementRetryCount([records.first.id]);
+        records = await getAllRecords();
+        expect(records.first.retryCount, equals(2));
+      });
+    });
+
+    group('getRecordsByStream', () {
+      test('returns records grouped by stream name', () async {
+        await storage.addRecord(
+          RecordInput.now(
+            data: Uint8List.fromList([1]),
+            streamName: 'stream-a',
+          ),
+        );
+        await storage.addRecord(
+          RecordInput.now(
+            data: Uint8List.fromList([2]),
+            streamName: 'stream-b',
+          ),
+        );
+        await storage.addRecord(
+          RecordInput.now(
+            data: Uint8List.fromList([3]),
+            streamName: 'stream-a',
+          ),
+        );
+
+        final result = await storage.getRecordsByStream();
+        expect(result.keys, containsAll(['stream-a', 'stream-b']));
+        expect(result['stream-a'], hasLength(2));
+        expect(result['stream-b'], hasLength(1));
+      });
+
+      test('returns empty map when no records', () async {
+        final result = await storage.getRecordsByStream();
+        expect(result, isEmpty);
+      });
+    });
+
+    group('clearRecords', () {
+      test('removes all records', () async {
+        for (var i = 0; i < 5; i++) {
+          await storage.addRecord(
+            RecordInput.now(
+              data: Uint8List.fromList([i]),
+              streamName: 'stream',
+            ),
+          );
+        }
+        expect(await getAllRecords(), hasLength(5));
+
+        await storage.clearRecords();
+
+        expect(await getAllRecords(), isEmpty);
+      });
+    });
+
+    group('getRecordCount', () {
+      test('returns correct count', () async {
+        expect(await storage.getRecordCount(), equals(0));
+        for (var i = 0; i < 3; i++) {
+          await storage.addRecord(
+            RecordInput.now(
+              data: Uint8List.fromList([i]),
+              streamName: 'stream',
+            ),
+          );
+        }
+        expect(await storage.getRecordCount(), equals(3));
+      });
+    });
+  });
+}