Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@
/// Amplify Amazon Data Firehose client for Dart.
library;

// SDK client (for escape hatch)
// Additional exports will be added as implementation PRs land.
// Exceptions
export 'src/exception/amplify_firehose_exception.dart'
hide defaultRecoverySuggestion;
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

import 'package:amplify_core/amplify_core.dart';
import 'package:amplify_firehose_dart/src/exception/record_cache_exception.dart';

/// Default recovery suggestion for errors.
///
/// Used when wrapping unexpected errors whose source provides no more
/// specific suggestion (e.g. wrapped database errors). Hidden from the
/// public library export.
const String defaultRecoverySuggestion =
    'Inspect the underlying error for more details.';

/// {@template amplify_firehose.amplify_firehose_exception}
/// Base exception for Amplify Amazon Data Firehose errors.
/// {@endtemplate}
sealed class AmplifyFirehoseException extends AmplifyException {
  /// {@macro amplify_firehose.amplify_firehose_exception}
  const AmplifyFirehoseException(
    super.message, {
    super.recoverySuggestion,
    super.underlyingException,
  });

  /// Converts [error] into the appropriate [AmplifyFirehoseException]
  /// subtype.
  ///
  /// Already-mapped [AmplifyFirehoseException]s pass through unchanged.
  /// Internal [RecordCacheException] subtypes are translated to their
  /// public counterparts; anything else becomes a
  /// [FirehoseUnknownException].
  static AmplifyFirehoseException from(Object error) {
    if (error is AmplifyFirehoseException) return error;
    if (error is RecordCacheValidationException) {
      return FirehoseValidationException(
        error.message,
        recoverySuggestion: error.recoverySuggestion,
      );
    }
    if (error is RecordCacheLimitExceededException) {
      return FirehoseLimitExceededException(
        message: error.message,
        recoverySuggestion: error.recoverySuggestion,
      );
    }
    if (error is RecordCacheDatabaseException) {
      return FirehoseStorageException(
        error.message,
        recoverySuggestion: error.recoverySuggestion,
        underlyingException: error.cause,
      );
    }
    if (error is Exception) {
      return FirehoseUnknownException(
        error.toString(),
        underlyingException: error,
      );
    }
    return FirehoseUnknownException(error.toString());
  }
}

/// {@template amplify_firehose.firehose_storage_exception}
/// Thrown when a local cache/database error occurs.
/// {@endtemplate}
///
/// Mapped from the internal `RecordCacheDatabaseException` by
/// [AmplifyFirehoseException.from]; the original database error is
/// surfaced via [underlyingException].
final class FirehoseStorageException extends AmplifyFirehoseException {
  /// {@macro amplify_firehose.firehose_storage_exception}
  const FirehoseStorageException(
    super.message, {
    super.recoverySuggestion,
    super.underlyingException,
  });

  @override
  String get runtimeTypeName => 'FirehoseStorageException';
}

/// {@template amplify_firehose.firehose_limit_exceeded_exception}
/// Thrown when the local cache is full.
/// {@endtemplate}
final class FirehoseLimitExceededException extends AmplifyFirehoseException {
  /// {@macro amplify_firehose.firehose_limit_exceeded_exception}
  ///
  /// [message] and [recoverySuggestion] are both optional; default text
  /// is substituted for whichever is omitted.
  const FirehoseLimitExceededException({
    String? message,
    String? recoverySuggestion,
  }) : super(
          message ?? 'Cache is full',
          recoverySuggestion:
              recoverySuggestion ?? 'Call flush() or clearCache().',
        );

  @override
  String get runtimeTypeName => 'FirehoseLimitExceededException';
}

/// {@template amplify_firehose.firehose_validation_exception}
/// Thrown when record input validation fails (e.g. oversized record).
/// {@endtemplate}
///
/// Mapped from the internal `RecordCacheValidationException` by
/// [AmplifyFirehoseException.from]; no underlying exception is attached.
final class FirehoseValidationException extends AmplifyFirehoseException {
  /// {@macro amplify_firehose.firehose_validation_exception}
  const FirehoseValidationException(super.message, {super.recoverySuggestion});

  @override
  String get runtimeTypeName => 'FirehoseValidationException';
}

/// {@template amplify_firehose.firehose_unknown_exception}
/// Catch-all for unexpected errors.
/// {@endtemplate}
///
/// Produced by [AmplifyFirehoseException.from] for errors matching no
/// more specific subtype; the original error is retained in
/// [underlyingException] when it was an [Exception].
final class FirehoseUnknownException extends AmplifyFirehoseException {
  /// {@macro amplify_firehose.firehose_unknown_exception}
  const FirehoseUnknownException(super.message, {super.underlyingException});

  @override
  String get runtimeTypeName => 'FirehoseUnknownException';
}

/// {@template amplify_firehose.client_closed_exception}
/// Thrown when an operation is attempted on a closed client.
/// {@endtemplate}
final class ClientClosedException extends AmplifyFirehoseException {
  /// {@macro amplify_firehose.client_closed_exception}
  ///
  /// Carries a fixed message and recovery suggestion — per the
  /// suggestion text, a closed client is replaced, not reopened.
  const ClientClosedException()
      : super(
          'Client has been closed',
          recoverySuggestion: 'Create a new AmplifyFirehoseClient instance.',
        );

  @override
  String get runtimeTypeName => 'ClientClosedException';
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

/// Internal error type used by RecordClient / RecordStorage.
///
/// Mapped to the public AmplifyFirehoseException hierarchy at the
/// AmplifyFirehoseClient boundary via `AmplifyFirehoseException.from`.
sealed class RecordCacheException implements Exception {
  /// Creates a [RecordCacheException].
  const RecordCacheException(
    this.message,
    this.recoverySuggestion, [
    this.cause,
  ]);

  /// A message describing the error.
  final String message;

  /// A suggestion for how to recover from the error.
  final String recoverySuggestion;

  /// The underlying cause of the error, if any.
  final Object? cause;

  @override
  String toString() {
    // Report the concrete subtype (e.g. RecordCacheDatabaseException)
    // rather than the hard-coded base-class name, so logs identify
    // which failure actually occurred.
    final buf = StringBuffer('$runtimeType: $message');
    if (cause != null) {
      buf.write('\nCaused by: $cause');
    }
    return buf.toString();
  }
}

/// Database operation failed.
final class RecordCacheDatabaseException extends RecordCacheException {
  /// Creates a [RecordCacheDatabaseException].
  const RecordCacheDatabaseException(
    super.message,
    super.recoverySuggestion, [
    super.cause,
  ]);
}

/// Cache limit exceeded — no space for new records.
final class RecordCacheLimitExceededException extends RecordCacheException {
  /// Creates a [RecordCacheLimitExceededException].
  const RecordCacheLimitExceededException(
    super.message,
    super.recoverySuggestion, [
    super.cause,
  ]);
}

/// Record input validation failed (e.g. oversized record).
final class RecordCacheValidationException extends RecordCacheException {
  /// Creates a [RecordCacheValidationException].
  const RecordCacheValidationException(
    super.message,
    super.recoverySuggestion, [
    super.cause,
  ]);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

import 'dart:typed_data';

/// Internal representation of a record destined for Firehose.
///
/// Unlike Kinesis Data Streams, Firehose records carry no partition
/// key, so a record's size is simply the length of its data blob.
final class RecordInput {
  /// Creates a new Firehose record.
  RecordInput({
    required this.data,
    required this.streamName,
    required this.createdAt,
  }) : dataSize = data.lengthInBytes;

  /// Creates a Firehose record stamped with the current time.
  factory RecordInput.now({
    required Uint8List data,
    required String streamName,
  }) =>
      RecordInput(
        data: data,
        streamName: streamName,
        createdAt: DateTime.now(),
      );

  /// The data blob to send to Firehose.
  final Uint8List data;

  /// The name of the Firehose delivery stream.
  final String streamName;

  /// The size of the record in bytes (data blob only).
  ///
  /// Per AWS docs, the per-record size limit for Firehose is 1,000 KiB.
  /// Unlike Kinesis Data Streams, there is no partition key to include
  /// in the size calculation.
  final int dataSize;

  /// Timestamp of when the record was created.
  final DateTime createdAt;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we avoid re-implementing those for the firehose client?

// SPDX-License-Identifier: Apache-2.0

import 'package:amplify_firehose_dart/src/exception/amplify_firehose_exception.dart'
show defaultRecoverySuggestion;
import 'package:amplify_firehose_dart/src/exception/record_cache_exception.dart';
import 'package:amplify_firehose_dart/src/firehose_limits.dart' as limits;
import 'package:amplify_firehose_dart/src/impl/firehose_record.dart';
import 'package:amplify_firehose_dart/src/model/record.dart';
import 'package:meta/meta.dart';

export 'package:amplify_firehose_dart/src/model/record.dart';

/// {@template amplify_firehose.record_storage}
/// Abstract base class for record persistence.
///
/// Implementations provide platform-specific storage (SQLite on VM,
/// IndexedDB on web, in-memory fallback). Validation of record size
/// and cache limits is handled here in [addRecord]; subclasses
/// implement [writeRecord] for the actual write.
///
/// All public methods wrap unexpected errors as
/// [RecordCacheDatabaseException]. Subclasses throw
/// [RecordCacheException] subtypes for known errors; anything else is
/// caught and wrapped automatically.
/// {@endtemplate}
abstract class RecordStorage {
  /// {@macro amplify_firehose.record_storage}
  RecordStorage({
    required int maxCacheBytes,
    int maxRecordsPerBatch = limits.maxRecordsPerBatch,
    int maxBytesPerBatch = limits.maxBatchSizeBytes,
    int maxRecordSizeBytes = limits.maxRecordSizeBytes,
    int initialCachedSize = 0,
  })  : _maxCacheBytes = maxCacheBytes,
        _maxRecordsPerBatch = maxRecordsPerBatch,
        _maxBytesPerBatch = maxBytesPerBatch,
        _maxRecordSizeBytes = maxRecordSizeBytes,
        cachedSize = initialCachedSize;

  final int _maxCacheBytes;
  final int _maxRecordsPerBatch;
  final int _maxBytesPerBatch;
  final int _maxRecordSizeBytes;

  /// The current total cached size in bytes.
  ///
  /// Maintained incrementally by [addRecord] and refreshed from the
  /// underlying storage after deletions (see [deleteRecords]).
  @protected
  int cachedSize;

  /// The maximum cache size in bytes.
  int get maxCacheBytes => _maxCacheBytes;

  /// Maximum number of records per batch.
  int get maxRecordsPerBatch => _maxRecordsPerBatch;

  /// Maximum total bytes per batch.
  int get maxBytesPerBatch => _maxBytesPerBatch;

  /// Maximum size of a single record in bytes.
  ///
  /// Exposed for consistency with the other limit getters, so callers
  /// can pre-validate payloads instead of relying on [addRecord]
  /// throwing a [RecordCacheValidationException].
  int get maxRecordSizeBytes => _maxRecordSizeBytes;

  /// Validates and saves a record to storage.
  ///
  /// [cachedSize] is only incremented after [writeRecord] succeeds, so
  /// a failed write does not corrupt the tracked size.
  ///
  /// Throws [RecordCacheValidationException] on invalid input.
  /// Throws [RecordCacheLimitExceededException] if the cache is full.
  /// Throws [RecordCacheDatabaseException] on storage errors.
  Future<void> addRecord(RecordInput record) =>
      _wrap('Failed to add record to cache', () async {
        if (record.dataSize > _maxRecordSizeBytes) {
          throw RecordCacheValidationException(
            'Record size (${record.dataSize} bytes) exceeds the maximum '
            'of $_maxRecordSizeBytes bytes.',
            'Reduce the record payload size.',
          );
        }
        if (cachedSize + record.dataSize > _maxCacheBytes) {
          throw RecordCacheLimitExceededException(
            'Cache size limit exceeded: '
            '${cachedSize + record.dataSize} bytes > $_maxCacheBytes bytes',
            'Call flush() to send cached records or increase cache size limit.',
          );
        }
        await writeRecord(record);
        cachedSize += record.dataSize;
      });

  /// Retrieves records grouped by stream.
  Future<Map<String, List<Record>>> getRecordsByStream() =>
      _wrap('Could not retrieve records from storage', doGetRecordsByStream);

  /// Deletes records by their IDs and refreshes [cachedSize].
  Future<void> deleteRecords(Iterable<int> ids) =>
      _wrap('Failed to delete records from cache', () async {
        await doDeleteRecords(ids);
        // Re-query rather than decrementing locally: the authoritative
        // size after a deletion comes from the underlying storage.
        cachedSize = await doQueryCacheSize();
      });

  /// Increments the retry count for the specified records.
  Future<void> incrementRetryCount(Iterable<int> ids) => _wrap(
        'Failed to increment retry count',
        () => doIncrementRetryCount(ids),
      );

  /// Returns the total number of cached records.
  Future<int> getRecordCount() =>
      _wrap('Failed to get record count', doGetRecordCount);

  /// Deletes all records and resets [cachedSize] to 0.
  Future<void> clearRecords() => _wrap('Failed to clear cache', () async {
        await doClearRecords();
        cachedSize = 0;
      });

  /// Closes the storage and releases resources.
  Future<void> close() => _wrap('Failed to close storage', doClose);

  /// Writes a validated record to the underlying storage.
  @protected
  Future<void> writeRecord(RecordInput record);

  /// Retrieves records grouped by stream name.
  @protected
  Future<Map<String, List<Record>>> doGetRecordsByStream();

  /// Deletes records by their IDs.
  @protected
  Future<void> doDeleteRecords(Iterable<int> ids);

  /// Increments the retry count for the specified records.
  @protected
  Future<void> doIncrementRetryCount(Iterable<int> ids);

  /// Returns the total number of cached records.
  @protected
  Future<int> doGetRecordCount();

  /// Deletes all records (without updating [cachedSize] — the base class
  /// resets it to 0).
  @protected
  Future<void> doClearRecords();

  /// Returns the current total cache size in bytes from the underlying
  /// storage. Called by the base class after deletions.
  @protected
  Future<int> doQueryCacheSize();

  /// Closes the storage and releases resources.
  @protected
  Future<void> doClose();

  /// Runs [operation], rethrowing known [RecordCacheException]s and
  /// wrapping anything else in a [RecordCacheDatabaseException] that
  /// carries [message] and the original error as its cause.
  Future<T> _wrap<T>(String message, Future<T> Function() operation) async {
    try {
      return await operation();
    } on RecordCacheException {
      rethrow;
    } on Object catch (e) {
      throw RecordCacheDatabaseException(message, defaultRecoverySuggestion, e);
    }
  }
}
Loading
Loading