Skip to content

Commit 12651f1

Browse files
committed
feat(firehose): add FirehoseSender, AmplifyFirehoseClient, and client options
- FirehoseSender implements shared Sender interface (calls PutRecordBatch) - AmplifyFirehoseClient with create(), record(), flush(), clearCache(), enable(), disable(), close() — mirrors KDS client structure - AmplifyFirehoseClientOptions (cacheMaxBytes, maxRetries, flushStrategy) - record() computes dataSize as data.length (no partition key) - Uses shared RecordClient, AutoFlushScheduler, platform storage - SDK escape hatch via firehoseClient getter - withRecordClient constructor for testing
1 parent e623f6d commit 12651f1

4 files changed

Lines changed: 342 additions & 4 deletions

File tree

packages/kinesis/amplify_firehose_dart/lib/amplify_firehose_dart.dart

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,17 @@ library;
77
// Re-export shared types used in the public API
88
export 'package:amplify_record_cache_dart/amplify_record_cache_dart.dart'
99
show
10-
FlushStrategy,
10+
ClearCacheData,
11+
FlushData,
1112
FlushInterval,
1213
FlushNone,
13-
FlushData,
14-
RecordData,
15-
ClearCacheData;
14+
FlushStrategy,
15+
RecordData;
1616

17+
// Main client
18+
export 'src/amplify_firehose_client.dart';
19+
// Options
20+
export 'src/amplify_firehose_client_options.dart';
1721
// Exceptions
1822
export 'src/exception/amplify_firehose_exception.dart';
1923
// SDK client (for escape hatch)
Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
import 'dart:async';
5+
import 'dart:typed_data';
6+
7+
import 'package:amplify_firehose_dart/src/amplify_firehose_client_options.dart';
8+
import 'package:amplify_firehose_dart/src/exception/amplify_firehose_exception.dart';
9+
import 'package:amplify_firehose_dart/src/firehose_limits.dart' as limits;
10+
import 'package:amplify_firehose_dart/src/impl/firehose_sender.dart';
11+
import 'package:amplify_firehose_dart/src/sdk/firehose.dart' as sdk;
12+
import 'package:amplify_firehose_dart/src/version.dart';
13+
import 'package:amplify_foundation_dart/amplify_foundation_dart.dart'
14+
as foundation
15+
show packageVersion;
16+
import 'package:amplify_foundation_dart/amplify_foundation_dart.dart'
17+
hide packageVersion;
18+
import 'package:amplify_foundation_dart_bridge/amplify_foundation_dart_bridge.dart';
19+
import 'package:amplify_record_cache_dart/amplify_record_cache_dart.dart';
20+
import 'package:smithy/smithy.dart' show WithUserAgent;
21+
22+
/// User agent component identifying this library.
23+
const _userAgentComponent =
24+
'md/amplify-firehose#$packageVersion '
25+
'lib/amplify-flutter#${foundation.packageVersion}';
26+
27+
/// {@template amplify_firehose.amplify_firehose_client}
28+
/// Client for recording and streaming data to Amazon Data Firehose.
29+
///
30+
/// Provides offline-capable data streaming with:
31+
/// - Local persistence for offline support (SQLite on VM, IndexedDB on web)
32+
/// - Automatic retry for failed records
33+
/// - Configurable batching (up to 500 records or 4 MB per batch)
34+
/// - Interval-based automatic flushing
35+
///
36+
/// This is the Dart-only implementation. For Flutter apps, use the
37+
/// `amplify_firehose` package which resolves the storage path
38+
/// automatically via `path_provider`.
39+
/// {@endtemplate}
40+
class AmplifyFirehoseClient {
41+
AmplifyFirehoseClient._({
42+
required String region,
43+
required AmplifyFirehoseClientOptions options,
44+
required RecordClient recordClient,
45+
required sdk.FirehoseClient firehoseClient,
46+
AutoFlushScheduler? scheduler,
47+
}) : _region = region,
48+
_options = options,
49+
_recordClient = recordClient,
50+
_firehoseClient = firehoseClient,
51+
_scheduler = scheduler,
52+
_logger = AmplifyLogging.logger('AmplifyFirehoseClient');
53+
54+
/// Creates a client with a pre-configured [RecordClient] (for testing).
55+
AmplifyFirehoseClient.withRecordClient({
56+
required RecordClient recordClient,
57+
String region = 'us-east-1',
58+
AmplifyFirehoseClientOptions? options,
59+
}) : _region = region,
60+
_options = options ?? const AmplifyFirehoseClientOptions(),
61+
_recordClient = recordClient,
62+
_firehoseClient = null,
63+
_scheduler = null,
64+
_logger = AmplifyLogging.logger('AmplifyFirehoseClient');
65+
66+
/// {@macro amplify_firehose.amplify_firehose_client}
67+
static Future<AmplifyFirehoseClient> create({
68+
required String region,
69+
required AWSCredentialsProvider credentialsProvider,
70+
required FutureOr<String>? storagePath,
71+
AmplifyFirehoseClientOptions? options,
72+
}) async {
73+
final opts = options ?? const AmplifyFirehoseClientOptions();
74+
75+
final storage = await createPlatformRecordStorage(
76+
identifier: region,
77+
storagePath: storagePath,
78+
maxCacheBytes: opts.cacheMaxBytes,
79+
maxRecordsPerBatch: limits.maxRecordsPerBatch,
80+
maxBytesPerBatch: limits.maxBatchSizeBytes,
81+
maxRecordSizeBytes: limits.maxRecordSizeBytes,
82+
dbPrefix: 'firehose_records',
83+
storeName: 'firehose_records',
84+
);
85+
86+
final firehoseClient = sdk.FirehoseClient(
87+
region: region,
88+
credentialsProvider: SmithyCredentialsProviderBridge(credentialsProvider),
89+
requestInterceptors: [const WithUserAgent(_userAgentComponent)],
90+
);
91+
92+
final recordClient = RecordClient(
93+
storage: storage,
94+
sender: FirehoseSender(
95+
firehoseClient: firehoseClient,
96+
maxRetries: opts.maxRetries,
97+
),
98+
maxRetries: opts.maxRetries,
99+
);
100+
101+
final scheduler = switch (opts.flushStrategy) {
102+
FlushInterval(:final interval) => AutoFlushScheduler(
103+
interval: interval,
104+
client: recordClient,
105+
)..start(),
106+
FlushNone() => null,
107+
};
108+
109+
return AmplifyFirehoseClient._(
110+
region: region,
111+
options: opts,
112+
recordClient: recordClient,
113+
firehoseClient: firehoseClient,
114+
scheduler: scheduler,
115+
);
116+
}
117+
118+
final String _region;
119+
final AmplifyFirehoseClientOptions _options;
120+
final RecordClient _recordClient;
121+
final sdk.FirehoseClient? _firehoseClient;
122+
final Logger _logger;
123+
final AutoFlushScheduler? _scheduler;
124+
bool _enabled = true;
125+
bool _closed = false;
126+
127+
/// The AWS region for this client.
128+
String get region => _region;
129+
130+
/// The configuration options for this client.
131+
AmplifyFirehoseClientOptions get options => _options;
132+
133+
/// Whether the client is currently enabled.
134+
bool get isEnabled => _enabled;
135+
136+
/// Whether the client has been closed.
137+
bool get isClosed => _closed;
138+
139+
/// Direct access to the underlying Firehose SDK client.
140+
sdk.FirehoseClient get firehoseClient {
141+
final client = _firehoseClient;
142+
if (client == null) {
143+
throw StateError(
144+
'firehoseClient is not available on clients created with '
145+
'withRecordClient.',
146+
);
147+
}
148+
return client;
149+
}
150+
151+
/// Records data to be sent to a Firehose delivery stream.
152+
///
153+
/// Returns [Result.ok] with [RecordData] on success, or [Result.error] with:
154+
/// - [FirehoseValidationException] for invalid input (e.g. oversized record)
155+
/// - [FirehoseLimitExceededException] if the cache is full
156+
/// - [FirehoseStorageException] for database errors
157+
Future<Result<RecordData>> record({
158+
required Uint8List data,
159+
required String streamName,
160+
}) async {
161+
if (_closed) return const Result.error(FirehoseClientClosedException());
162+
if (!isEnabled) {
163+
_logger.debug('Record collection is disabled, dropping record');
164+
return const Result.ok(RecordData());
165+
}
166+
_logger.verbose('Recording to stream: $streamName');
167+
final input = RecordInput.now(
168+
data: data,
169+
streamName: streamName,
170+
dataSize: data.length,
171+
);
172+
return _wrapError(() => _recordClient.record(input));
173+
}
174+
175+
/// Flushes cached records to their respective Firehose streams.
176+
Future<Result<FlushData>> flush() async {
177+
if (_closed) return const Result.error(FirehoseClientClosedException());
178+
_logger.verbose('Starting flush');
179+
return _wrapError(_recordClient.flush);
180+
}
181+
182+
/// Clears all cached records from local storage.
183+
Future<Result<ClearCacheData>> clearCache() async {
184+
if (_closed) return const Result.error(FirehoseClientClosedException());
185+
_logger.verbose('Clearing cache');
186+
return _wrapError(_recordClient.clearCache);
187+
}
188+
189+
/// Enables the client to accept and flush records.
190+
void enable() {
191+
_logger.info('Enabling record collection and automatic flushing');
192+
_enabled = true;
193+
_scheduler?.start();
194+
}
195+
196+
/// Disables record collection and automatic flushing.
197+
void disable() {
198+
_logger.info('Disabling record collection and automatic flushing');
199+
_enabled = false;
200+
_scheduler?.stop();
201+
}
202+
203+
/// Closes the client and releases all resources.
204+
Future<void> close() async {
205+
_closed = true;
206+
_scheduler?.stop();
207+
await _recordClient.close();
208+
}
209+
210+
Future<Result<T>> _wrapError<T>(Future<T> Function() operation) async {
211+
try {
212+
final value = await operation();
213+
return Result.ok(value);
214+
} on Object catch (e) {
215+
final wrapped = AmplifyFirehoseException.from(e);
216+
_logger.warn('Operation failed: ${wrapped.message}', e);
217+
return Result.error(wrapped);
218+
}
219+
}
220+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
import 'package:amplify_firehose_dart/src/amplify_firehose_client.dart'
5+
show AmplifyFirehoseClient;
6+
import 'package:amplify_record_cache_dart/amplify_record_cache_dart.dart';
7+
8+
/// {@template amplify_firehose.amplify_firehose_client_options}
9+
/// Configuration options for [AmplifyFirehoseClient].
10+
/// {@endtemplate}
11+
final class AmplifyFirehoseClientOptions {
12+
/// {@macro amplify_firehose.amplify_firehose_client_options}
13+
const AmplifyFirehoseClientOptions({
14+
this.cacheMaxBytes = 5 * 1024 * 1024,
15+
this.maxRetries = 5,
16+
this.flushStrategy = const FlushInterval(),
17+
});
18+
19+
/// Maximum size of the local cache in bytes.
20+
///
21+
/// Defaults to 5 MB.
22+
final int cacheMaxBytes;
23+
24+
/// Maximum number of retry attempts for failed records.
25+
///
26+
/// Defaults to 5.
27+
final int maxRetries;
28+
29+
/// Strategy for automatic flushing of cached records.
30+
///
31+
/// Defaults to [FlushInterval] with a 30-second interval.
32+
final FlushStrategy flushStrategy;
33+
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
import 'package:amplify_firehose_dart/src/sdk/firehose.dart' as sdk;
5+
import 'package:amplify_record_cache_dart/amplify_record_cache_dart.dart';
6+
7+
/// {@template amplify_firehose.firehose_sender}
8+
/// Handles communication with Amazon Data Firehose.
9+
///
10+
/// Takes a pre-configured [sdk.FirehoseClient] and owns the retry-count
11+
/// categorization so that all error codes are treated as retryable
12+
/// until the record exceeds `maxRetries`.
13+
/// {@endtemplate}
14+
class FirehoseSender implements Sender {
15+
/// {@macro amplify_firehose.firehose_sender}
16+
FirehoseSender({
17+
required sdk.FirehoseClient firehoseClient,
18+
required int maxRetries,
19+
}) : _firehoseClient = firehoseClient,
20+
_maxRetries = maxRetries;
21+
22+
final sdk.FirehoseClient _firehoseClient;
23+
final int _maxRetries;
24+
25+
@override
26+
Future<SendResult> sendBatch({
27+
required String streamName,
28+
required List<Record> records,
29+
}) async {
30+
if (records.isEmpty) {
31+
return const SendResult(
32+
successfulIds: [],
33+
retryableIds: [],
34+
failedIds: [],
35+
);
36+
}
37+
38+
final requestRecords = records
39+
.map((record) => sdk.Record(data: record.data))
40+
.toList();
41+
42+
final request = sdk.PutRecordBatchInput(
43+
deliveryStreamName: streamName,
44+
records: requestRecords,
45+
);
46+
47+
final response = await _firehoseClient.putRecordBatch(request).result;
48+
return _splitResponse(response, records);
49+
}
50+
51+
SendResult _splitResponse(
52+
sdk.PutRecordBatchOutput response,
53+
List<Record> records,
54+
) {
55+
final successfulIds = <int>[];
56+
final retryableIds = <int>[];
57+
final failedIds = <int>[];
58+
59+
final resultEntries = response.requestResponses.toList();
60+
61+
for (var i = 0; i < resultEntries.length; i++) {
62+
final entry = resultEntries[i];
63+
final recordId = records[i].id;
64+
final retryCount = records[i].retryCount;
65+
66+
if (entry.errorCode == null) {
67+
successfulIds.add(recordId);
68+
} else if (retryCount >= _maxRetries) {
69+
failedIds.add(recordId);
70+
} else {
71+
retryableIds.add(recordId);
72+
}
73+
}
74+
75+
return SendResult(
76+
successfulIds: successfulIds,
77+
retryableIds: retryableIds,
78+
failedIds: failedIds,
79+
);
80+
}
81+
}

0 commit comments

Comments
 (0)