-
Notifications
You must be signed in to change notification settings - Fork 130
Expand file tree
/
Copy pathAmplifyKinesisClient.kt
More file actions
232 lines (215 loc) · 8.62 KB
/
AmplifyKinesisClient.kt
File metadata and controls
232 lines (215 loc) · 8.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
package com.amplifyframework.kinesis
import android.content.Context
import aws.sdk.kotlin.runtime.http.operation.customUserAgentMetadata
import aws.sdk.kotlin.services.kinesis.KinesisClient
import aws.smithy.kotlin.runtime.client.RequestInterceptorContext
import aws.smithy.kotlin.runtime.http.interceptors.HttpInterceptor
import com.amplifyframework.annotations.InternalAmplifyApi
import com.amplifyframework.foundation.credentials.AwsCredentials
import com.amplifyframework.foundation.credentials.AwsCredentialsProvider
import com.amplifyframework.foundation.credentials.toSmithyProvider
import com.amplifyframework.foundation.logging.AmplifyLogging
import com.amplifyframework.foundation.logging.Logger
import com.amplifyframework.foundation.result.Result
import com.amplifyframework.foundation.result.mapFailure
import com.amplifyframework.recordcache.AutoFlushScheduler
import com.amplifyframework.recordcache.ClearCacheResult
import com.amplifyframework.recordcache.FlushResult
import com.amplifyframework.recordcache.FlushStrategy
import com.amplifyframework.recordcache.FlushStrategy.Interval
import com.amplifyframework.recordcache.RecordClient
import com.amplifyframework.recordcache.RecordData
import com.amplifyframework.recordcache.RecordInput
import com.amplifyframework.recordcache.RecordResult
import com.amplifyframework.recordcache.SQLiteRecordStorage
import kotlin.system.measureTimeMillis
/**
* Kinesis supports up to 500 records per stream.
* See [the docs](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html)
*/
private const val MAX_RECORDS_PER_STREAM = 500
/**
* A client for sending data to Amazon Kinesis Data Streams.
*
* Provides automatic batching, retry logic, and local caching for high-throughput
* data streaming to Kinesis with configurable flush strategies.
*
* Example usage:
* ```kotlin
* // Bridge V2 Auth to foundation credentials via Smithy types
* val credentialsProvider = CognitoCredentialsProvider().toAwsCredentialsProvider()
*
* val kinesis = AmplifyKinesisClient(
* context = applicationContext,
* region = "us-east-1",
* credentialsProvider = credentialsProvider
* )
*
* // Record data
* kinesis.record(
* data = "Hello Kinesis".toByteArray(),
* streamName = "my-stream",
* partitionKey = "partition-1"
* )
*
* // Flush cached records
* val result = kinesis.flush()
* ```
*
* @param context Android application context for database access
* @param region AWS region where the Kinesis stream is located
* @param credentialsProvider AWS credentials for authentication. Use
* `CognitoCredentialsProvider().toAwsCredentialsProvider()` to bridge from V2 Auth.
* @param options Configuration options with sensible defaults
*/
@OptIn(InternalAmplifyApi::class)
class AmplifyKinesisClient(
val context: Context,
val region: String,
val credentialsProvider: AwsCredentialsProvider<out AwsCredentials>,
options: AmplifyKinesisClientOptions = AmplifyKinesisClientOptions.defaults()
) {
private val logger: Logger = AmplifyLogging.logger<AmplifyKinesisClient>()
/** The underlying SDK [KinesisClient] for direct access. */
val kinesisClient: KinesisClient = KinesisClient {
this.region = this@AmplifyKinesisClient.region
this.credentialsProvider = this@AmplifyKinesisClient.credentialsProvider.toSmithyProvider()
// Add user agent metadata for tracking Kinesis feature usage
this.interceptors += object : HttpInterceptor {
override suspend fun modifyBeforeSerialization(context: RequestInterceptorContext<Any>): Any {
context.executionContext.customUserAgentMetadata.add(
"kinesis",
BuildConfig.VERSION_NAME
)
return super.modifyBeforeSerialization(context)
}
}
options.configureClient?.applyConfiguration(this)
}
private val recordClient: RecordClient = RecordClient(
sender = KinesisRecordSender(
kinesisClient = kinesisClient,
maxRetries = options.maxRetries
),
storage = SQLiteRecordStorage(
context = context,
identifier = region,
maxRecordsByStream = MAX_RECORDS_PER_STREAM,
maxBytes = options.cacheMaxBytes
)
)
private val scheduler: AutoFlushScheduler?
@Volatile private var isEnabled = true
init {
scheduler = when (options.flushStrategy) {
is FlushStrategy.Interval -> AutoFlushScheduler(
options.flushStrategy,
client = recordClient
)
is FlushStrategy.None -> null
}
// Auto-start the scheduler if present
scheduler?.start()
}
/**
* Records data to the specified Kinesis stream.
*
* @param data The data to record as byte array
* @param partitionKey The partition key for the record
* @param streamName The name of the Kinesis stream
* @return Result.success(RecordData) on success, or Result.failure with:
* - [AmplifyKinesisLimitExceededException] (cache full)
* - [AmplifyKinesisStorageException] (database errors)
*/
suspend fun record(data: ByteArray, partitionKey: String, streamName: String): RecordResult {
if (!isEnabled) {
logger.debug { "Record collection is disabled, dropping record" }
return Result.Success(RecordData())
}
logger.verbose { "Recording to stream: $streamName" }
return logOp(
operation = { recordClient.record(RecordInput(streamName, partitionKey, data)).wrapError() },
logSuccess = { _, timeMs ->
logger.debug { "Record completed successfully in ${timeMs}ms" }
},
logFailure = { error, timeMs ->
logger.warn { "Record failed in ${timeMs}ms: ${error?.message}" }
}
)
}
/**
* Flushes all cached records to their respective Kinesis streams.
*
* @return Result.success(FlushData) on success, or Result.failure with:
* - [AmplifyKinesisServiceException] (API failures)
* - [AmplifyKinesisStorageException] (database errors)
* - [AmplifyKinesisUnknownException] (unexpected failures)
*/
suspend fun flush(): FlushResult {
logger.info { "Starting flush" }
return logOp(
operation = { recordClient.flush().wrapError() },
logSuccess = { data, timeMs ->
logger.info {
"Flush completed successfully in ${timeMs}ms - ${data.recordsFlushed} records flushed"
}
},
logFailure = { error, timeMs ->
logger.warn { "Flush failed in ${timeMs}ms: ${error?.message}" }
}
)
}
/**
* Clears all cached records from local storage.
*
* @return Result.success(ClearCacheData) on success, or Result.failure with:
* - [AmplifyKinesisStorageException] (database errors)
*/
suspend fun clearCache(): ClearCacheResult {
logger.info { "Clearing cache" }
return logOp(
operation = { recordClient.clearCache().wrapError() },
logSuccess = { data, timeMs ->
logger.info {
"Clear cache completed successfully in ${timeMs}ms - ${data.recordsCleared} records cleared"
}
},
logFailure = { error, timeMs ->
logger.warn { "Clear cache failed in ${timeMs}ms: ${error?.message}" }
}
)
}
/**
* Enables record collection and automatic flushing of cached records.
*/
fun enable() {
isEnabled = true
scheduler?.start()
}
/**
* Disables record collection and automatic flushing. Records submitted while
* disabled are silently dropped. Already-cached records remain in storage.
*/
fun disable() {
isEnabled = false
scheduler?.disable()
}
private fun <T> Result<T, Throwable>.wrapError(): Result<T, AmplifyKinesisException> = mapFailure {
AmplifyKinesisException.from(it)
}
private suspend inline fun <T> logOp(
operation: suspend () -> Result<T, AmplifyKinesisException>,
logSuccess: (T, Long) -> Unit,
logFailure: (Throwable?, Long) -> Unit
): Result<T, AmplifyKinesisException> {
val result: Result<T, AmplifyKinesisException>
val timeMs = measureTimeMillis {
result = operation()
}
when (result) {
is Result.Failure -> logFailure(result.error, timeMs)
is Result.Success -> logSuccess(result.data, timeMs)
}
return result
}
}