Skip to content

Commit e1eec5f

Browse files
committed
test(firehose): add client and sender tests, remove unused defaultRecoverySuggestion
- AmplifyFirehoseClient tests: initialization, record(), flush(), clearCache(), enable/disable, close, closed-state errors - FirehoseSender tests: request building, response categorization (success/retryable/failed), empty records handling - Remove unused defaultRecoverySuggestion from exception file - Clean up pubspec_overrides (remove stale amplify_kinesis_dart override)
1 parent 12651f1 commit e1eec5f

3 files changed

Lines changed: 355 additions & 4 deletions

File tree

packages/kinesis/amplify_firehose_dart/lib/src/exception/amplify_firehose_exception.dart

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,6 @@
44
import 'package:amplify_core/amplify_core.dart';
55
import 'package:amplify_record_cache_dart/amplify_record_cache_dart.dart';
66

7-
/// Default recovery suggestion for errors.
8-
const String defaultRecoverySuggestion =
9-
'Inspect the underlying error for more details.';
10-
117
/// {@template amplify_firehose.amplify_firehose_exception}
128
/// Base exception for Amplify Firehose errors.
139
/// {@endtemplate}
Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
import 'dart:typed_data';
5+
6+
import 'package:amplify_firehose_dart/amplify_firehose_dart.dart';
7+
import 'package:amplify_foundation_dart/amplify_foundation_dart.dart'
8+
show Error, Ok;
9+
import 'package:amplify_record_cache_dart/amplify_record_cache_dart.dart';
10+
import 'package:mocktail/mocktail.dart';
11+
import 'package:test/test.dart';
12+
13+
class _MockRecordClient extends Mock implements RecordClient {}
14+
15+
void main() {
16+
group('AmplifyFirehoseClient', () {
17+
late _MockRecordClient mockRecordClient;
18+
19+
setUpAll(() {
20+
registerFallbackValue(
21+
RecordInput.now(data: Uint8List(0), streamName: '', dataSize: 0),
22+
);
23+
});
24+
25+
setUp(() {
26+
mockRecordClient = _MockRecordClient();
27+
when(
28+
() => mockRecordClient.record(any()),
29+
).thenAnswer((_) async => const RecordData());
30+
when(
31+
() => mockRecordClient.flush(),
32+
).thenAnswer((_) async => const FlushData());
33+
when(
34+
() => mockRecordClient.clearCache(),
35+
).thenAnswer((_) async => const ClearCacheData());
36+
when(() => mockRecordClient.close()).thenAnswer((_) async {});
37+
});
38+
39+
tearDown(() {
40+
resetMocktailState();
41+
reset(mockRecordClient);
42+
});
43+
44+
group('initialization', () {
45+
test('initializes with default options', () {
46+
final client = AmplifyFirehoseClient.withRecordClient(
47+
recordClient: mockRecordClient,
48+
);
49+
expect(client.region, equals('us-east-1'));
50+
expect(client.options.cacheMaxBytes, equals(5 * 1024 * 1024));
51+
expect(client.options.maxRetries, equals(5));
52+
expect(client.options.flushStrategy, isA<FlushInterval>());
53+
});
54+
55+
test('initializes with custom options', () {
56+
const opts = AmplifyFirehoseClientOptions(
57+
cacheMaxBytes: 10 * 1024 * 1024,
58+
maxRetries: 10,
59+
flushStrategy: FlushInterval(interval: Duration(minutes: 1)),
60+
);
61+
final client = AmplifyFirehoseClient.withRecordClient(
62+
recordClient: mockRecordClient,
63+
region: 'eu-west-1',
64+
options: opts,
65+
);
66+
expect(client.region, equals('eu-west-1'));
67+
expect(client.options.cacheMaxBytes, equals(10 * 1024 * 1024));
68+
expect(client.options.maxRetries, equals(10));
69+
expect(
70+
(client.options.flushStrategy as FlushInterval).interval,
71+
equals(const Duration(minutes: 1)),
72+
);
73+
});
74+
});
75+
76+
group('record()', () {
77+
test('delegates to RecordClient with correct dataSize', () async {
78+
final client = AmplifyFirehoseClient.withRecordClient(
79+
recordClient: mockRecordClient,
80+
);
81+
final data = Uint8List.fromList([1, 2, 3, 4, 5]);
82+
final result = await client.record(data: data, streamName: 'my-stream');
83+
84+
final captured =
85+
verify(() => mockRecordClient.record(captureAny())).captured.single
86+
as RecordInput;
87+
expect(captured.data, equals(data));
88+
expect(captured.streamName, equals('my-stream'));
89+
expect(captured.dataSize, equals(5));
90+
expect(captured.partitionKey, isNull);
91+
expect(result, isA<Ok<RecordData>>());
92+
});
93+
94+
test('returns RecordData when disabled', () async {
95+
final client = AmplifyFirehoseClient.withRecordClient(
96+
recordClient: mockRecordClient,
97+
)..disable();
98+
final result = await client.record(
99+
data: Uint8List.fromList([1]),
100+
streamName: 'stream',
101+
);
102+
verifyNever(() => mockRecordClient.record(any()));
103+
expect(result, isA<Ok<RecordData>>());
104+
});
105+
106+
test('returns error when closed', () async {
107+
final client = AmplifyFirehoseClient.withRecordClient(
108+
recordClient: mockRecordClient,
109+
);
110+
await client.close();
111+
final result = await client.record(
112+
data: Uint8List.fromList([1]),
113+
streamName: 'stream',
114+
);
115+
expect(result, isA<Error<RecordData>>());
116+
expect(
117+
(result as Error<RecordData>).error,
118+
isA<FirehoseClientClosedException>(),
119+
);
120+
});
121+
});
122+
123+
group('flush()', () {
124+
test('delegates to RecordClient', () async {
125+
when(
126+
() => mockRecordClient.flush(),
127+
).thenAnswer((_) async => const FlushData(recordsFlushed: 5));
128+
final client = AmplifyFirehoseClient.withRecordClient(
129+
recordClient: mockRecordClient,
130+
);
131+
final result = await client.flush();
132+
verify(() => mockRecordClient.flush()).called(1);
133+
expect(result, isA<Ok<FlushData>>());
134+
expect((result as Ok<FlushData>).value.recordsFlushed, equals(5));
135+
});
136+
137+
test('returns error when closed', () async {
138+
final client = AmplifyFirehoseClient.withRecordClient(
139+
recordClient: mockRecordClient,
140+
);
141+
await client.close();
142+
final result = await client.flush();
143+
expect(result, isA<Error<FlushData>>());
144+
expect(
145+
(result as Error<FlushData>).error,
146+
isA<FirehoseClientClosedException>(),
147+
);
148+
});
149+
});
150+
151+
group('clearCache()', () {
152+
test('delegates to RecordClient', () async {
153+
when(
154+
() => mockRecordClient.clearCache(),
155+
).thenAnswer((_) async => const ClearCacheData(recordsCleared: 3));
156+
final client = AmplifyFirehoseClient.withRecordClient(
157+
recordClient: mockRecordClient,
158+
);
159+
final result = await client.clearCache();
160+
verify(() => mockRecordClient.clearCache()).called(1);
161+
expect(result, isA<Ok<ClearCacheData>>());
162+
expect((result as Ok<ClearCacheData>).value.recordsCleared, equals(3));
163+
});
164+
});
165+
166+
group('enable() / disable()', () {
167+
test('enable sets isEnabled to true', () {
168+
final client = AmplifyFirehoseClient.withRecordClient(
169+
recordClient: mockRecordClient,
170+
)..disable();
171+
expect(client.isEnabled, isFalse);
172+
client.enable();
173+
expect(client.isEnabled, isTrue);
174+
});
175+
176+
test('disable sets isEnabled to false', () {
177+
final client = AmplifyFirehoseClient.withRecordClient(
178+
recordClient: mockRecordClient,
179+
);
180+
expect(client.isEnabled, isTrue);
181+
client.disable();
182+
expect(client.isEnabled, isFalse);
183+
});
184+
});
185+
186+
group('close()', () {
187+
test('delegates to RecordClient', () async {
188+
final client = AmplifyFirehoseClient.withRecordClient(
189+
recordClient: mockRecordClient,
190+
);
191+
await client.close();
192+
verify(() => mockRecordClient.close()).called(1);
193+
expect(client.isClosed, isTrue);
194+
});
195+
});
196+
});
197+
}
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
/// Tests for FirehoseSender request building and response categorization.
5+
library;
6+
7+
import 'dart:async';
8+
import 'dart:typed_data';
9+
10+
import 'package:amplify_firehose_dart/src/impl/firehose_sender.dart';
11+
import 'package:amplify_firehose_dart/src/sdk/firehose.dart' as sdk;
12+
import 'package:amplify_firehose_dart/src/sdk/src/firehose/firehose_client.dart';
13+
import 'package:amplify_record_cache_dart/amplify_record_cache_dart.dart';
14+
import 'package:aws_common/aws_common.dart';
15+
import 'package:mocktail/mocktail.dart';
16+
import 'package:smithy/smithy.dart';
17+
import 'package:test/test.dart';
18+
19+
class _MockFirehoseClient extends Mock implements FirehoseClient {}
20+
21+
SmithyOperation<T> _mockOp<T>(FutureOr<T> Function() fn) => SmithyOperation(
22+
CancelableOperation.fromFuture(Future.value(fn())),
23+
operationName: '',
24+
requestProgress: const Stream.empty(),
25+
responseProgress: const Stream.empty(),
26+
);
27+
28+
Record _testRecord({
29+
required int id,
30+
required Uint8List data,
31+
int retryCount = 0,
32+
}) {
33+
return Record(
34+
id: id,
35+
streamName: 'test-stream',
36+
data: data,
37+
dataSize: data.length,
38+
retryCount: retryCount,
39+
createdAt: 0,
40+
);
41+
}
42+
43+
void main() {
44+
late _MockFirehoseClient mockClient;
45+
const maxRetries = 3;
46+
47+
setUpAll(() {
48+
registerFallbackValue(
49+
sdk.PutRecordBatchInput(
50+
deliveryStreamName: 'test',
51+
records: [sdk.Record(data: Uint8List(0))],
52+
),
53+
);
54+
});
55+
56+
setUp(() {
57+
mockClient = _MockFirehoseClient();
58+
});
59+
60+
tearDown(() {
61+
resetMocktailState();
62+
reset(mockClient);
63+
});
64+
65+
group('FirehoseSender', () {
66+
test('builds correct PutRecordBatchInput', () async {
67+
when(() => mockClient.putRecordBatch(any())).thenReturn(
68+
_mockOp(
69+
() => sdk.PutRecordBatchOutput(
70+
failedPutCount: 0,
71+
requestResponses: [
72+
sdk.PutRecordBatchResponseEntry(recordId: 'r1'),
73+
sdk.PutRecordBatchResponseEntry(recordId: 'r2'),
74+
],
75+
),
76+
),
77+
);
78+
79+
final sender = FirehoseSender(
80+
firehoseClient: mockClient,
81+
maxRetries: maxRetries,
82+
);
83+
84+
await sender.sendBatch(
85+
streamName: 'my-stream',
86+
records: [
87+
_testRecord(id: 1, data: Uint8List.fromList([1, 2, 3])),
88+
_testRecord(id: 2, data: Uint8List.fromList([4, 5, 6])),
89+
],
90+
);
91+
92+
final captured =
93+
verify(() => mockClient.putRecordBatch(captureAny())).captured.single
94+
as sdk.PutRecordBatchInput;
95+
96+
expect(captured.deliveryStreamName, equals('my-stream'));
97+
expect(captured.records, hasLength(2));
98+
expect(captured.records[0].data, equals(Uint8List.fromList([1, 2, 3])));
99+
expect(captured.records[1].data, equals(Uint8List.fromList([4, 5, 6])));
100+
});
101+
102+
test('categorizes response into success, retryable, failed', () async {
103+
when(() => mockClient.putRecordBatch(any())).thenReturn(
104+
_mockOp(
105+
() => sdk.PutRecordBatchOutput(
106+
failedPutCount: 2,
107+
requestResponses: [
108+
sdk.PutRecordBatchResponseEntry(recordId: 'r1'),
109+
sdk.PutRecordBatchResponseEntry(
110+
errorCode: 'ServiceUnavailableException',
111+
),
112+
sdk.PutRecordBatchResponseEntry(errorCode: 'InternalFailure'),
113+
],
114+
),
115+
),
116+
);
117+
118+
final sender = FirehoseSender(
119+
firehoseClient: mockClient,
120+
maxRetries: maxRetries,
121+
);
122+
123+
final result = await sender.sendBatch(
124+
streamName: 'test-stream',
125+
records: [
126+
_testRecord(id: 1, data: Uint8List.fromList([1])),
127+
_testRecord(id: 2, data: Uint8List.fromList([2]), retryCount: 1),
128+
_testRecord(
129+
id: 3,
130+
data: Uint8List.fromList([3]),
131+
retryCount: maxRetries,
132+
),
133+
],
134+
);
135+
136+
expect(result.successfulIds, equals([1]));
137+
expect(result.retryableIds, equals([2]));
138+
expect(result.failedIds, equals([3]));
139+
});
140+
141+
test('returns empty result for empty records', () async {
142+
final sender = FirehoseSender(
143+
firehoseClient: mockClient,
144+
maxRetries: maxRetries,
145+
);
146+
147+
final result = await sender.sendBatch(
148+
streamName: 'test-stream',
149+
records: [],
150+
);
151+
152+
expect(result.successfulIds, isEmpty);
153+
expect(result.retryableIds, isEmpty);
154+
expect(result.failedIds, isEmpty);
155+
verifyNever(() => mockClient.putRecordBatch(any()));
156+
});
157+
});
158+
}

0 commit comments

Comments
 (0)