Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions app/src/main/java/com/nextcloud/talk/chat/ChatActivity.kt
Original file line number Diff line number Diff line change
Expand Up @@ -483,13 +483,7 @@ class ChatActivity :

override fun onChatMessagesReceived(chatMessages: List<ChatMessageJson>) {
chatViewModel.onSignalingChatMessageReceived(chatMessages)

Log.d(
TAG,
"received message in ChatActivity. This is the chat message received via HPB. It would be " +
"nicer to receive it in the ViewModel or Repository directly. " +
"Otherwise it needs to be passed into it from here..."
)
Log.d(TAG, "received signaling message in ChatActivity")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,16 @@ interface ChatMessageRepository : LifecycleAwareManager {

suspend fun startMessagePolling(hasHighPerformanceBackend: Boolean)

/**
* Performs a one-time non-blocking fetch of messages newer than the latest known message.
* Use this to immediately refresh chat after local actions (e.g. file share) that create
* server-side messages which would otherwise only appear after the next insurance-request cycle.
*
* @return `true` if at least one new message was received, `false` when the server returned
* 304 Not Modified or an empty message list.
*/
suspend fun fetchNewMessages(): Boolean

/**
* Loads messages from local storage. If the messages are not found, then it
* synchronizes the database with the server, before retrying exactly once. Only
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,10 +275,12 @@ class OfflineFirstChatRepository @Inject constructor(
}

/**
* Fetches messages newer than latest known message
* Fetches messages newer than latest known message.
*
* @return `true` if at least one new message was received and persisted.
*/
private suspend fun fetchNewMessages() {
var fieldMap = getFieldMap(
override suspend fun fetchNewMessages(): Boolean {
val fieldMap = getFieldMap(
lookIntoFuture = true,
timeout = 0,
includeLastKnown = false,
Expand All @@ -288,7 +290,7 @@ class OfflineFirstChatRepository @Inject constructor(
val networkParams = Bundle()
networkParams.putSerializable(BundleKeys.KEY_FIELD_MAP, fieldMap)

getAndPersistMessages(networkParams)
return getAndPersistMessages(networkParams)
}

override suspend fun loadMoreMessages(
Expand Down Expand Up @@ -497,7 +499,7 @@ class OfflineFirstChatRepository @Inject constructor(
emit(ChatPullResult.Error(IllegalStateException("All attempts failed")))
}.flowOn(Dispatchers.IO)

private suspend fun getAndPersistMessages(bundle: Bundle) {
private suspend fun getAndPersistMessages(bundle: Bundle): Boolean {
if (!networkMonitor.isOnline.value) {
Log.d(TAG, "Device is offline, can't load chat messages from server")
}
Expand Down Expand Up @@ -536,21 +538,26 @@ class OfflineFirstChatRepository @Inject constructor(
lookIntoFuture,
hasHistory
)
return true
} else {
Log.d(TAG, "No new messages to update")
return false
}
}

is ChatPullResult.NotModified -> {
Log.d(TAG, "Server returned NOT_MODIFIED, nothing to update")
return false
}

is ChatPullResult.PreconditionFailed -> {
Log.d(TAG, "Server returned PRECONDITION_FAILED, nothing to update")
return false
}

is ChatPullResult.Error -> {
Log.e(TAG, "Error pulling messages from server", result.throwable)
return false
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import com.nextcloud.talk.data.database.mappers.toDomainModel
import com.nextcloud.talk.data.database.model.ChatMessageEntity
import com.nextcloud.talk.data.user.model.User
import com.nextcloud.talk.extensions.toIntOrZero
import com.nextcloud.talk.jobs.ShareOperationWorker
import com.nextcloud.talk.jobs.UploadAndShareFilesWorker
import com.nextcloud.talk.messagesearch.MessageSearchHelper
import com.nextcloud.talk.models.MessageDraft
Expand Down Expand Up @@ -83,6 +84,7 @@ import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.distinctUntilChangedBy
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.filterNotNull
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flatMapLatest
Expand Down Expand Up @@ -1200,6 +1202,50 @@ class ChatViewModel @AssistedInject constructor(
.launchIn(viewModelScope)
}

private fun observeShareCompleted() {
ShareOperationWorker.shareCompletedFlow
.filter { it == chatRoomToken }
.onEach {
Log.d(TAG, "Share completed for room $chatRoomToken — fetching new messages immediately")
fetchNewMessagesWithRetry()
}
.launchIn(viewModelScope)

UploadAndShareFilesWorker.uploadCompletedFlow
.filter { it == chatRoomToken }
.onEach {
Log.d(TAG, "Upload completed for room $chatRoomToken — fetching new messages immediately")
UploadAndShareFilesWorker.clearUploadCompletedReplay()
fetchNewMessagesWithRetry()
}
.launchIn(viewModelScope)
}

/**
* Retries [ChatMessageRepository.fetchNewMessages] up to [POST_UPLOAD_FETCH_MAX_ATTEMPTS] times,
* waiting [POST_UPLOAD_FETCH_RETRY_DELAY_MS] ms between attempts. Stops as soon as at least one
* new message is received so that the happy-path (server responds quickly) has no unnecessary
* delay, while a slow server still gets a few extra chances before we fall back to the regular
* insurance-request cycle.
*/
private suspend fun fetchNewMessagesWithRetry() {
repeat(POST_UPLOAD_FETCH_MAX_ATTEMPTS) { attempt ->
if (attempt > 0) {
Log.d(
TAG,
"fetchNewMessagesWithRetry: attempt ${attempt + 1}, " +
"waiting ${POST_UPLOAD_FETCH_RETRY_DELAY_MS}ms"
)
delay(POST_UPLOAD_FETCH_RETRY_DELAY_MS)
}
val gotMessages = chatRepository.fetchNewMessages()
if (gotMessages) {
Log.d(TAG, "fetchNewMessagesWithRetry: new messages received on attempt ${attempt + 1}")
return
}
}
Log.d(TAG, "fetchNewMessagesWithRetry: no new messages after $POST_UPLOAD_FETCH_MAX_ATTEMPTS attempts")
}
private fun handleSystemMessages(chatMessageList: List<ChatMessage>): List<ChatMessage> {
fun shouldRemoveMessage(currentMessage: MutableMap.MutableEntry<Int, ChatMessage>): Boolean =
isInfoMessageAboutDeletion(currentMessage) ||
Expand Down Expand Up @@ -1289,6 +1335,7 @@ class ChatViewModel @AssistedInject constructor(

observeConversationAndUserFirstTime()
observeConversationAndUserEveryTime()
observeShareCompleted()
}

fun ConversationModel?.isOneToOneConversation(): Boolean =
Expand Down Expand Up @@ -2181,6 +2228,8 @@ class ChatViewModel @AssistedInject constructor(
private const val MIN_CHARS_FOR_SEARCH = 2
private const val CONTEXT_MESSAGES_LIMIT = 50
private const val LOAD_MORE_MESSAGES_LIMIT = 100
private const val POST_UPLOAD_FETCH_MAX_ATTEMPTS = 4
private const val POST_UPLOAD_FETCH_RETRY_DELAY_MS = 1_500L
}

sealed class OutOfOfficeUIState {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,20 +240,33 @@ OkHttpClient provideHttpClient(Proxy proxy, AppPreferences appPreferences,

public static class HeadersInterceptor implements Interceptor {

private static final String OCS_V1_PATH = "/ocs/v1.php";
private static final String OCS_V2_PATH = "/ocs/v2.php";

@NonNull
@Override
public Response intercept(@NonNull Chain chain) throws IOException {
Request original = chain.request();
Request request = original.newBuilder()
Request.Builder requestBuilder = original.newBuilder()
.header("User-Agent", ApiUtils.getUserAgent())
.header("Accept", "application/json")
.header("OCS-APIRequest", "true")
.header("ngrok-skip-browser-warning", "true")
.method(original.method(), original.body())
.build();
.method(original.method(), original.body());

if (isOcsEndpoint(original)) {
requestBuilder
.header("Accept", "application/json")
.header("OCS-APIRequest", "true");
}

Request request = requestBuilder.build();

return chain.proceed(request);
}

private boolean isOcsEndpoint(@NonNull Request request) {
String path = request.url().encodedPath();
return path.contains(OCS_V1_PATH) || path.contains(OCS_V2_PATH);
}
}

public static class HttpAuthenticator implements Authenticator {
Expand Down
33 changes: 30 additions & 3 deletions app/src/main/java/com/nextcloud/talk/jobs/ShareOperationWorker.kt
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ import com.nextcloud.talk.utils.bundle.BundleKeys.KEY_INTERNAL_USER_ID
import com.nextcloud.talk.utils.bundle.BundleKeys.KEY_META_DATA
import com.nextcloud.talk.utils.bundle.BundleKeys.KEY_ROOM_TOKEN
import io.reactivex.schedulers.Schedulers
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import retrofit2.HttpException
import javax.inject.Inject

@AutoInjector(NextcloudTalkApplication::class)
Expand All @@ -46,6 +49,17 @@ class ShareOperationWorker(context: Context, workerParams: WorkerParameters) : W

override fun doWork(): Result {
for (filePath in filesArray) {
tryCreateShare(filePath)
}
roomToken?.let { _shareCompletedFlow.tryEmit(it) }
return Result.success()
}

@Suppress("TooGenericExceptionCaught")
private fun tryCreateShare(filePath: String?) {
for (attempt in 1..SHARE_MAX_ATTEMPTS) {
var succeeded = false
var shouldRetry = false
ncApi.createRemoteShare(
credentials,
ApiUtils.getSharingUrl(baseUrl!!),
Expand All @@ -56,11 +70,18 @@ class ShareOperationWorker(context: Context, workerParams: WorkerParameters) : W
)
.subscribeOn(Schedulers.io())
.blockingSubscribe(
{},
{ e -> Log.w(TAG, "error while creating RemoteShare", e) }
{ succeeded = true },
{ e ->
if (e is HttpException && e.code() == HTTP_NOT_FOUND && attempt < SHARE_MAX_ATTEMPTS) {
shouldRetry = true
} else {
Log.w(TAG, "error while creating RemoteShare", e)
}
}
)
if (succeeded || !shouldRetry) return
Thread.sleep(SHARE_RETRY_DELAY_MS)
}
return Result.success()
}

init {
Expand All @@ -78,6 +99,12 @@ class ShareOperationWorker(context: Context, workerParams: WorkerParameters) : W

companion object {
private val TAG = ShareOperationWorker::class.simpleName
private const val HTTP_NOT_FOUND = 404
private const val SHARE_MAX_ATTEMPTS = 4
private const val SHARE_RETRY_DELAY_MS = 2000L

private val _shareCompletedFlow: MutableSharedFlow<String> = MutableSharedFlow(extraBufferCapacity = 1)
val shareCompletedFlow: SharedFlow<String> = _shareCompletedFlow

fun shareFile(roomToken: String?, currentUser: User, remotePath: String, metaData: String?) {
val paths: MutableList<String> = ArrayList()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ import com.nextcloud.talk.utils.bundle.BundleKeys.KEY_ROOM_TOKEN
import com.nextcloud.talk.utils.database.user.CurrentUserProviderOld
import com.nextcloud.talk.utils.permissions.PlatformPermissionUtil
import com.nextcloud.talk.utils.preferences.AppPreferences
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.runBlocking
import okhttp3.MediaType.Companion.toMediaTypeOrNull
import okhttp3.OkHttpClient
Expand Down Expand Up @@ -126,6 +129,7 @@ class UploadAndShareFilesWorker(val context: Context, workerParameters: WorkerPa

if (uploadSuccess) {
cancelNotification()
_uploadCompletedFlow.tryEmit(roomToken)
return Result.success()
} else if (isStopped) {
// since work is cancelled the result would be ignored anyways
Expand Down Expand Up @@ -409,13 +413,24 @@ class UploadAndShareFilesWorker(val context: Context, workerParameters: WorkerPa
private const val ROOM_TOKEN = "ROOM_TOKEN"
private const val CONVERSATION_NAME = "CONVERSATION_NAME"
private const val META_DATA = "META_DATA"
private const val CHUNK_UPLOAD_THRESHOLD_SIZE: Long = 1024000
private const val CHUNK_UPLOAD_THRESHOLD_SIZE: Long = 1024 * 1024
private const val NOTIFICATION_FILE_NAME_MAX_LENGTH = 20
private const val THREE_DOTS = "…"
private const val HUNDRED_PERCENT = 100
private const val ZERO_PERCENT = 0
const val REQUEST_PERMISSION = 3123

private val _uploadCompletedFlow: MutableSharedFlow<String> = MutableSharedFlow(
replay = 1,
extraBufferCapacity = 1
)
val uploadCompletedFlow: SharedFlow<String> = _uploadCompletedFlow

@OptIn(ExperimentalCoroutinesApi::class)
fun clearUploadCompletedReplay() {
_uploadCompletedFlow.resetReplayCache()
}

fun requestStoragePermission(activity: Activity) {
when {
Build.VERSION
Expand Down
Loading
Loading