-
Notifications
You must be signed in to change notification settings - Fork 17
Expand file tree
/
Copy pathFlagsmithEventService.kt
More file actions
88 lines (74 loc) · 3.08 KB
/
FlagsmithEventService.kt
File metadata and controls
88 lines (74 loc) · 3.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package com.flagsmith.internal
import android.util.Log
import com.flagsmith.entities.FlagEvent
import com.google.gson.Gson
import kotlinx.coroutines.flow.MutableStateFlow
import okhttp3.OkHttpClient
import okhttp3.Request
import okhttp3.Response
import okhttp3.sse.EventSource
import okhttp3.sse.EventSourceListener
import okhttp3.sse.EventSources
import java.util.concurrent.TimeUnit
/**
 * Subscribes to the server-sent-events stream of a single environment and forwards
 * `environment_updated` events to the caller.
 *
 * The connection is opened as soon as the instance is constructed and is re-opened whenever
 * the server closes the stream.
 *
 * @param eventSourceBaseUrl base URL of the event source; a trailing slash is optional.
 * @param environmentKey key of the environment whose stream is requested; it is also passed
 *   to the environment-key interceptor installed on the HTTP client.
 * @param updates callback invoked with a successful [FlagEvent] for every accepted event, or
 *   with a failure when the stream fails or an event payload cannot be parsed.
 * @throws IllegalArgumentException if [eventSourceBaseUrl] is `null`.
 */
internal class FlagsmithEventService constructor(
    private val eventSourceBaseUrl: String?,
    private val environmentKey: String,
    private val updates: (Result<FlagEvent>) -> Unit
) {
    // NOTE(review): the long read/write timeouts presumably exist so that an idle stream is
    // not torn down between events — confirm against the server's keep-alive behaviour.
    private val sseClient = OkHttpClient.Builder()
        .addInterceptor(FlagsmithRetrofitService.envKeyInterceptor(environmentKey))
        .addInterceptor(FlagsmithRetrofitService.userAgentInterceptor())
        .connectTimeout(6, TimeUnit.SECONDS)
        .readTimeout(10, TimeUnit.MINUTES)
        .writeTimeout(10, TimeUnit.MINUTES)
        .build()

    // One shared parser instead of a new instance per event.
    private val gson = Gson()

    // Fail fast with a clear message rather than building a URL that starts with the text
    // "null", and normalise the separator so the base URL works with or without a trailing '/'.
    private val completeEventSourceUrl: String =
        requireNotNull(eventSourceBaseUrl) {
            "eventSourceBaseUrl must not be null to open the event stream"
        }.trimEnd('/') + "/sse/environments/" + environmentKey + "/stream"

    private val sseRequest = Request.Builder()
        .url(completeEventSourceUrl)
        .header("Accept", "application/json")
        .addHeader("Accept", "text/event-stream")
        .build()

    private var currentEventSource: EventSource? = null

    /** Holds the most recently accepted event; starts with a placeholder whose `updatedAt` is 0.0. */
    val sseEventsFlow = MutableStateFlow(FlagEvent(updatedAt = 0.0))

    private val sseEventSourceListener = object : EventSourceListener() {
        override fun onClosed(eventSource: EventSource) {
            super.onClosed(eventSource)
            Log.d(TAG, "onClosed: $eventSource")
            // This isn't uncommon and is the nature of HTTP requests, so just reconnect
            initEventSource()
        }

        override fun onEvent(eventSource: EventSource, id: String?, type: String?, data: String) {
            super.onEvent(eventSource, id, type, data)
            Log.d(TAG, "onEvent: $data")
            // Only non-empty "environment_updated" events are of interest.
            if (type != "environment_updated" || data.isEmpty()) return

            // A bad payload is reported to the caller here rather than escaping the callback.
            val flagEvent: FlagEvent? = try {
                gson.fromJson(data, FlagEvent::class.java)
            } catch (e: com.google.gson.JsonParseException) {
                Log.d(TAG, "onEvent: unparseable payload: ${e.message}")
                updates(Result.failure(e))
                return
            }
            if (flagEvent == null) {
                // The parser produced no object (e.g. a JSON `null` payload).
                updates(Result.failure(IllegalArgumentException("Empty event payload")))
                return
            }

            sseEventsFlow.tryEmit(flagEvent)
            updates(Result.success(flagEvent))
        }

        override fun onFailure(eventSource: EventSource, t: Throwable?, response: Response?) {
            super.onFailure(eventSource, t, response)
            Log.d(TAG, "onFailure: ${t?.message}")
            // The callback always receives a failure, even when no cause was supplied.
            updates(Result.failure(t ?: Throwable("Unknown error")))
        }

        override fun onOpen(eventSource: EventSource, response: Response) {
            super.onOpen(eventSource, response)
            Log.d(TAG, "onOpen: $eventSource")
        }
    }

    init {
        initEventSource()
    }

    /** Cancels the current event source, if any, and opens a new one with the same request. */
    private fun initEventSource() {
        currentEventSource?.cancel()
        currentEventSource = EventSources.createFactory(sseClient)
            .newEventSource(request = sseRequest, listener = sseEventSourceListener)
    }

    companion object {
        private const val TAG = "FlagsmithEventService"
    }
}