@@ -17,29 +17,31 @@ import failchat.chat.findTyped
1717import failchat.chat.handlers.BraceEscaper
1818import failchat.util.CoroutineExceptionLogger
1919import failchat.util.value
20+ import failchat.youtube.LiveChatResponse.Action
2021import kotlinx.coroutines.CoroutineScope
2122import kotlinx.coroutines.Dispatchers
22- import kotlinx.coroutines.Job
2323import kotlinx.coroutines.cancel
24+ import kotlinx.coroutines.delay
25+ import kotlinx.coroutines.isActive
2426import kotlinx.coroutines.launch
2527import mu.KotlinLogging
2628import java.util.concurrent.atomic.AtomicReference
29+ import kotlin.coroutines.cancellation.CancellationException
2730
2831class YoutubeChatClient (
2932 override val callbacks : ChatClientCallbacks ,
3033 private val youtubeClient : YoutubeClient ,
3134 private val messageIdGenerator : MessageIdGenerator ,
3235 private val history : ChatMessageHistory ,
3336 private val videoId : String
34- ) : ChatClient, CoroutineScope by CoroutineScope(Dispatchers .Default ) {
37+ ) : ChatClient, CoroutineScope by CoroutineScope(Dispatchers .Default + CoroutineExceptionLogger ) {
3538
3639 private companion object {
3740 val logger = KotlinLogging .logger {}
3841 val roleToBadgeMap = mapOf (
3942 RoleBadges .verified.description to RoleBadges .verified,
4043 " Owner" to RoleBadges .streamer,
4144 RoleBadges .moderator.description to RoleBadges .moderator
42-
4345 )
4446 val roleBadgeToColorMap = mapOf (
4547 RoleBadges .streamer to YoutubeColors .streamer,
@@ -65,57 +67,79 @@ class YoutubeChatClient(
6567 if (! statusChanged) {
6668 error(" Chat client status: ${atomicStatus.value} " )
6769 }
68- logger.info { " Starting youtube client" }
70+ logger.info( " Starting youtube client" )
6971
70- val job = launch {
71- val initialParameters = youtubeClient.getNewLiveChatSessionData(videoId)
72- logger.info { " Initial youtube parameters: $initialParameters " }
72+ launchWatcher()
73+ }
7374
74- val statusUpdated = atomicStatus.compareAndSet(ChatClientStatus .CONNECTING , ChatClientStatus .CONNECTED )
75- if (statusUpdated) {
76- callbacks.onStatusUpdate(StatusUpdate (Origin .YOUTUBE , OriginStatus .CONNECTED ))
75+ private fun launchWatcher () = launch {
76+ while (isActive) {
77+ try {
78+ listenForMessages()
79+ } catch (e: CancellationException ) {
80+ // do nothing
81+ } catch (e: Throwable ) {
82+ logger.error(e) { " Error occurred in youtube chat listener" }
83+ atomicStatus.set(ChatClientStatus .ERROR )
84+ callbacks.onStatusUpdate(StatusUpdate (Origin .YOUTUBE , OriginStatus .DISCONNECTED ))
85+ delay(5000 )
7786 }
87+ }
88+ atomicStatus.set(ChatClientStatus .OFFLINE )
89+ logger.info(" Youtube watcher was stopped" )
90+ }
7891
79- highlightHandler.setChannelTitle(initialParameters.channelName)
92+ private suspend fun listenForMessages () {
93+ var parameters = youtubeClient.getNewLiveChatSessionData(videoId)
94+ logger.info { " Initial youtube parameters: $parameters " }
8095
81- val actionChannel = with (youtubeClient) {
82- pollLiveChatActions(initialParameters)
83- }
84- for (action in actionChannel) {
85- if (action.isModerationAction()) {
86- val channelIdToDeleteMessages = action.markChatItemsByAuthorAsDeletedAction!! .externalChannelId
96+ atomicStatus.set(ChatClientStatus .CONNECTED )
97+ callbacks.onStatusUpdate(StatusUpdate (Origin .YOUTUBE , OriginStatus .CONNECTED ))
8798
88- val messagesToDelete = history.findTyped<YoutubeMessage > {
89- channelIdToDeleteMessages == it.author.id
90- }
91- messagesToDelete.forEach {
92- callbacks.onChatMessageDeleted(it)
93- }
99+ highlightHandler.setChannelTitle(parameters.channelName)
94100
101+ while (isActive) {
102+ val liveChatContinuation = youtubeClient.getLiveChatResponse(parameters)
103+ .continuationContents
104+ .liveChatContinuation
105+
106+ for (action in liveChatContinuation.actions) {
107+ if (action.isModerationAction()) {
108+ handleModerationAction(action)
95109 } else {
96- val message = action.toChatMessage() ? : continue
97- messageHandlers.forEach {
98- it.handleMessage(message)
99- }
100- callbacks.onChatMessage(message)
110+ handleChatMessageAction(action)
101111 }
102112 }
113+
114+ val continuationData = liveChatContinuation.continuations.first().anyContinuation()
115+ parameters = parameters.copy(nextContinuation = continuationData.continuation)
116+
117+ delay(continuationData.timeoutMs.toLong())
103118 }
119+ }
104120
105- job.invokeOnCompletion { e ->
106- if (e != null ) {
107- atomicStatus.set(ChatClientStatus .ERROR )
108- } else {
109- atomicStatus.set(ChatClientStatus .OFFLINE )
110- }
111- callbacks.onStatusUpdate(StatusUpdate (Origin .YOUTUBE , OriginStatus .DISCONNECTED ))
121+ private fun handleChatMessageAction (action : Action ) {
122+ val message = action.toChatMessage() ? : return
123+ messageHandlers.forEach {
124+ it.handleMessage(message)
125+ }
126+ callbacks.onChatMessage(message)
127+ }
128+
129+ private suspend fun handleModerationAction (action : Action ) {
130+ val channelIdToDeleteMessages = action.markChatItemsByAuthorAsDeletedAction!! .externalChannelId
131+
132+ val messagesToDelete = history.findTyped<YoutubeMessage > {
133+ channelIdToDeleteMessages == it.author.id
134+ }
135+ messagesToDelete.forEach {
136+ callbacks.onChatMessageDeleted(it)
112137 }
113138 }
114139
115140 override fun stop () {
116141 logger.info { " Stopping youtube client" }
117142 cancel()
118- atomicStatus.value = ChatClientStatus .OFFLINE
119143 }
120144
121145 private fun LiveChatResponse.Action.toChatMessage (): YoutubeMessage ? {
@@ -199,5 +223,4 @@ class YoutubeChatClient(
199223 description = liveChatAuthorBadgeRenderer.tooltip
200224 )
201225 }
202-
203226}
0 commit comments