@@ -43,10 +43,14 @@ import kotlinx.coroutines.flow.flow
4343import kotlinx.coroutines.flow.flowOf
4444import kotlinx.coroutines.flow.flowOn
4545import kotlinx.coroutines.flow.map
46+ import kotlinx.coroutines.flow.mapNotNull
47+ import kotlinx.coroutines.flow.take
48+ import kotlinx.coroutines.flow.timeout
4649import retrofit2.HttpException
4750import java.io.IOException
4851import javax.inject.Inject
4952import kotlin.collections.map
53+ import kotlin.time.Duration.Companion.microseconds
5054
5155@Suppress(" LargeClass" , " TooManyFunctions" )
5256class OfflineFirstChatRepository @Inject constructor(
@@ -433,35 +437,35 @@ class OfflineFirstChatRepository @Inject constructor(
433437 // _messageFlow.emit(triple)
434438 // }
435439
436- private suspend fun hasToLoadPreviousMessagesFromServer (beforeMessageId : Long , amountToCheck : Int ): Boolean {
437- val loadFromServer: Boolean
438-
439- val blockForMessage = getBlockOfMessage(beforeMessageId.toInt())
440-
441- if (blockForMessage == null ) {
442- Log .d(TAG , " No blocks for this message were found so we have to ask server" )
443- loadFromServer = true
444- } else if (! blockForMessage.hasHistory) {
445- Log .d(TAG , " The last chatBlock is reached so we won't request server for older messages" )
446- loadFromServer = false
447- } else {
448- val amountBetween = chatDao.getCountBetweenMessageIds(
449- internalConversationId,
450- beforeMessageId,
451- blockForMessage.oldestMessageId,
452- threadId
453- )
454- loadFromServer = amountBetween < amountToCheck
455-
456- Log .d(
457- TAG ,
458- " Amount between messageId " + beforeMessageId + " and " + blockForMessage.oldestMessageId +
459- " is: " + amountBetween + " and $amountToCheck were needed, so 'loadFromServer' is " +
460- loadFromServer
461- )
462- }
463- return loadFromServer
464- }
440+ // private suspend fun hasToLoadPreviousMessagesFromServer(beforeMessageId: Long, amountToCheck: Int): Boolean {
441+ // val loadFromServer: Boolean
442+ //
443+ // val blockForMessage = getBlockOfMessage(beforeMessageId.toInt())
444+ //
445+ // if (blockForMessage == null) {
446+ // Log.d(TAG, "No blocks for this message were found so we have to ask server")
447+ // loadFromServer = true
448+ // } else if (!blockForMessage.hasHistory) {
449+ // Log.d(TAG, "The last chatBlock is reached so we won't request server for older messages")
450+ // loadFromServer = false
451+ // } else {
452+ // val amountBetween = chatDao.getCountBetweenMessageIds(
453+ // internalConversationId,
454+ // beforeMessageId,
455+ // blockForMessage.oldestMessageId,
456+ // threadId
457+ // )
458+ // loadFromServer = amountBetween < amountToCheck
459+ //
460+ // Log.d(
461+ // TAG,
462+ // "Amount between messageId " + beforeMessageId + " and " + blockForMessage.oldestMessageId +
463+ // " is: " + amountBetween + " and $amountToCheck were needed, so 'loadFromServer' is " +
464+ // loadFromServer
465+ // )
466+ // }
467+ // return loadFromServer
468+ // }
465469
466470 @Suppress(" LongParameterList" )
467471 private fun getFieldMap (
@@ -497,30 +501,38 @@ class OfflineFirstChatRepository @Inject constructor(
497501 override suspend fun getNumberOfThreadReplies (threadId : Long ): Int =
498502 chatDao.getNumberOfThreadReplies(internalConversationId, threadId)
499503
500- override suspend fun getMessage (messageId : Long , bundle : Bundle ) = flowOf<ChatMessage >()
501- // override suspend fun getMessage(messageId: Long, bundle: Bundle): Flow<ChatMessage> {
502- // Log.d(TAG, "Get message with id $messageId")
503- // // val loadFromServer = hasToLoadPreviousMessagesFromServer(messageId, 1)
504- // //
505- // // if (loadFromServer) {
506- // val fieldMap = getFieldMap(
507- // lookIntoFuture = false,
508- // timeout = 0,
509- // includeLastKnown = true,
510- // lastKnown = messageId.toInt(),
511- // limit = 1
512- // )
513- // bundle.putSerializable(BundleKeys.KEY_FIELD_MAP, fieldMap)
514- //
515- // Log.d(TAG, "Starting online request for single message (e.g. a reply)")
516- // getAndPersistMessages(bundle)
517- // // }
518- // // we cant just expect here that sync succeeded??
519- // return chatDao.getChatMessageForConversation(
520- // internalConversationId,
521- // messageId
522- // ).map(ChatMessageEntity::asModel)
523- // }
504+ override suspend fun getMessage (
505+ messageId : Long ,
506+ bundle : Bundle
507+ ): Flow <ChatMessage > {
508+ Log .d(TAG , " Get message with id $messageId " )
509+
510+ val localMessage = chatDao.getChatMessageOnce(
511+ internalConversationId,
512+ messageId
513+ )
514+
515+ if (localMessage == null ) {
516+ val fieldMap = getFieldMap(
517+ lookIntoFuture = false ,
518+ timeout = 0 ,
519+ includeLastKnown = true ,
520+ lastKnown = messageId.toInt(),
521+ limit = 1
522+ )
523+ bundle.putSerializable(BundleKeys .KEY_FIELD_MAP , fieldMap)
524+
525+ Log .d(TAG , " Starting online request for single message" )
526+ getAndPersistMessages(bundle)
527+ }
528+
529+ return chatDao
530+ .getChatMessageForConversationNullable(internalConversationId, messageId)
531+ .mapNotNull { it?.asModel() }
532+ .take(1 )
533+ .timeout(5_000 .microseconds)
534+ .catch { /* timeout -> emit nothing */ }
535+ }
524536
525537 fun pullMessagesFlow (bundle : Bundle ): Flow <ChatPullResult > = flow {
526538 val fieldMap = bundle.getSerializable(BundleKeys .KEY_FIELD_MAP ) as HashMap <String , Int >
0 commit comments