Skip to content

Commit c506a13

Browse files
authored
Merge pull request #6183 from nextcloud/bugfix/noid/improve-attachment-upload
Uploads hardening
2 parents 2e95018 + 7e8105d commit c506a13

10 files changed

Lines changed: 251 additions & 67 deletions

File tree

app/src/main/java/com/nextcloud/talk/chat/ChatActivity.kt

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -483,13 +483,7 @@ class ChatActivity :
483483

484484
override fun onChatMessagesReceived(chatMessages: List<ChatMessageJson>) {
485485
chatViewModel.onSignalingChatMessageReceived(chatMessages)
486-
487-
Log.d(
488-
TAG,
489-
"received message in ChatActivity. This is the chat message received via HPB. It would be " +
490-
"nicer to receive it in the ViewModel or Repository directly. " +
491-
"Otherwise it needs to be passed into it from here..."
492-
)
486+
Log.d(TAG, "received signaling message in ChatActivity")
493487
}
494488
}
495489

app/src/main/java/com/nextcloud/talk/chat/data/ChatMessageRepository.kt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,16 @@ interface ChatMessageRepository : LifecycleAwareManager {
6060

6161
suspend fun startMessagePolling(hasHighPerformanceBackend: Boolean)
6262

63+
/**
64+
* Performs a one-time non-blocking fetch of messages newer than the latest known message.
65+
* Use this to immediately refresh chat after local actions (e.g. file share) that create
66+
* server-side messages which would otherwise only appear after the next insurance-request cycle.
67+
*
68+
* @return `true` if at least one new message was received, `false` when the server returned
69+
* 304 Not Modified or an empty message list.
70+
*/
71+
suspend fun fetchNewMessages(): Boolean
72+
6373
/**
6474
* Loads messages from local storage. If the messages are not found, then it
6575
* synchronizes the database with the server, before retrying exactly once. Only

app/src/main/java/com/nextcloud/talk/chat/data/network/OfflineFirstChatRepository.kt

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -275,10 +275,12 @@ class OfflineFirstChatRepository @Inject constructor(
275275
}
276276

277277
/**
278-
* Fetches messages newer than latest known message
278+
* Fetches messages newer than latest known message.
279+
*
280+
* @return `true` if at least one new message was received and persisted.
279281
*/
280-
private suspend fun fetchNewMessages() {
281-
var fieldMap = getFieldMap(
282+
override suspend fun fetchNewMessages(): Boolean {
283+
val fieldMap = getFieldMap(
282284
lookIntoFuture = true,
283285
timeout = 0,
284286
includeLastKnown = false,
@@ -288,7 +290,7 @@ class OfflineFirstChatRepository @Inject constructor(
288290
val networkParams = Bundle()
289291
networkParams.putSerializable(BundleKeys.KEY_FIELD_MAP, fieldMap)
290292

291-
getAndPersistMessages(networkParams)
293+
return getAndPersistMessages(networkParams)
292294
}
293295

294296
override suspend fun loadMoreMessages(
@@ -497,7 +499,7 @@ class OfflineFirstChatRepository @Inject constructor(
497499
emit(ChatPullResult.Error(IllegalStateException("All attempts failed")))
498500
}.flowOn(Dispatchers.IO)
499501

500-
private suspend fun getAndPersistMessages(bundle: Bundle) {
502+
private suspend fun getAndPersistMessages(bundle: Bundle): Boolean {
501503
if (!networkMonitor.isOnline.value) {
502504
Log.d(TAG, "Device is offline, can't load chat messages from server")
503505
}
@@ -536,21 +538,26 @@ class OfflineFirstChatRepository @Inject constructor(
536538
lookIntoFuture,
537539
hasHistory
538540
)
541+
return true
539542
} else {
540543
Log.d(TAG, "No new messages to update")
544+
return false
541545
}
542546
}
543547

544548
is ChatPullResult.NotModified -> {
545549
Log.d(TAG, "Server returned NOT_MODIFIED, nothing to update")
550+
return false
546551
}
547552

548553
is ChatPullResult.PreconditionFailed -> {
549554
Log.d(TAG, "Server returned PRECONDITION_FAILED, nothing to update")
555+
return false
550556
}
551557

552558
is ChatPullResult.Error -> {
553559
Log.e(TAG, "Error pulling messages from server", result.throwable)
560+
return false
554561
}
555562
}
556563
}

app/src/main/java/com/nextcloud/talk/chat/viewmodels/ChatViewModel.kt

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import com.nextcloud.talk.data.database.mappers.toDomainModel
3737
import com.nextcloud.talk.data.database.model.ChatMessageEntity
3838
import com.nextcloud.talk.data.user.model.User
3939
import com.nextcloud.talk.extensions.toIntOrZero
40+
import com.nextcloud.talk.jobs.ShareOperationWorker
4041
import com.nextcloud.talk.jobs.UploadAndShareFilesWorker
4142
import com.nextcloud.talk.messagesearch.MessageSearchHelper
4243
import com.nextcloud.talk.models.MessageDraft
@@ -83,6 +84,7 @@ import kotlinx.coroutines.flow.combine
8384
import kotlinx.coroutines.flow.debounce
8485
import kotlinx.coroutines.flow.distinctUntilChanged
8586
import kotlinx.coroutines.flow.distinctUntilChangedBy
87+
import kotlinx.coroutines.flow.filter
8688
import kotlinx.coroutines.flow.filterNotNull
8789
import kotlinx.coroutines.flow.first
8890
import kotlinx.coroutines.flow.flatMapLatest
@@ -1200,6 +1202,50 @@ class ChatViewModel @AssistedInject constructor(
12001202
.launchIn(viewModelScope)
12011203
}
12021204

1205+
private fun observeShareCompleted() {
1206+
ShareOperationWorker.shareCompletedFlow
1207+
.filter { it == chatRoomToken }
1208+
.onEach {
1209+
Log.d(TAG, "Share completed for room $chatRoomToken — fetching new messages immediately")
1210+
fetchNewMessagesWithRetry()
1211+
}
1212+
.launchIn(viewModelScope)
1213+
1214+
UploadAndShareFilesWorker.uploadCompletedFlow
1215+
.filter { it == chatRoomToken }
1216+
.onEach {
1217+
Log.d(TAG, "Upload completed for room $chatRoomToken — fetching new messages immediately")
1218+
UploadAndShareFilesWorker.clearUploadCompletedReplay()
1219+
fetchNewMessagesWithRetry()
1220+
}
1221+
.launchIn(viewModelScope)
1222+
}
1223+
1224+
/**
1225+
* Retries [ChatMessageRepository.fetchNewMessages] up to [POST_UPLOAD_FETCH_MAX_ATTEMPTS] times,
1226+
* waiting [POST_UPLOAD_FETCH_RETRY_DELAY_MS] ms between attempts. Stops as soon as at least one
1227+
* new message is received so that the happy-path (server responds quickly) has no unnecessary
1228+
* delay, while a slow server still gets a few extra chances before we fall back to the regular
1229+
* insurance-request cycle.
1230+
*/
1231+
private suspend fun fetchNewMessagesWithRetry() {
1232+
repeat(POST_UPLOAD_FETCH_MAX_ATTEMPTS) { attempt ->
1233+
if (attempt > 0) {
1234+
Log.d(
1235+
TAG,
1236+
"fetchNewMessagesWithRetry: attempt ${attempt + 1}, " +
1237+
"waiting ${POST_UPLOAD_FETCH_RETRY_DELAY_MS}ms"
1238+
)
1239+
delay(POST_UPLOAD_FETCH_RETRY_DELAY_MS)
1240+
}
1241+
val gotMessages = chatRepository.fetchNewMessages()
1242+
if (gotMessages) {
1243+
Log.d(TAG, "fetchNewMessagesWithRetry: new messages received on attempt ${attempt + 1}")
1244+
return
1245+
}
1246+
}
1247+
Log.d(TAG, "fetchNewMessagesWithRetry: no new messages after $POST_UPLOAD_FETCH_MAX_ATTEMPTS attempts")
1248+
}
12031249
private fun handleSystemMessages(chatMessageList: List<ChatMessage>): List<ChatMessage> {
12041250
fun shouldRemoveMessage(currentMessage: MutableMap.MutableEntry<Int, ChatMessage>): Boolean =
12051251
isInfoMessageAboutDeletion(currentMessage) ||
@@ -1289,6 +1335,7 @@ class ChatViewModel @AssistedInject constructor(
12891335

12901336
observeConversationAndUserFirstTime()
12911337
observeConversationAndUserEveryTime()
1338+
observeShareCompleted()
12921339
}
12931340

12941341
fun ConversationModel?.isOneToOneConversation(): Boolean =
@@ -2181,6 +2228,8 @@ class ChatViewModel @AssistedInject constructor(
21812228
private const val MIN_CHARS_FOR_SEARCH = 2
21822229
private const val CONTEXT_MESSAGES_LIMIT = 50
21832230
private const val LOAD_MORE_MESSAGES_LIMIT = 100
2231+
private const val POST_UPLOAD_FETCH_MAX_ATTEMPTS = 4
2232+
private const val POST_UPLOAD_FETCH_RETRY_DELAY_MS = 1_500L
21842233
}
21852234

21862235
sealed class OutOfOfficeUIState {

app/src/main/java/com/nextcloud/talk/dagger/modules/RestModule.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -240,20 +240,33 @@ OkHttpClient provideHttpClient(Proxy proxy, AppPreferences appPreferences,
240240

241241
public static class HeadersInterceptor implements Interceptor {
242242

243+
private static final String OCS_V1_PATH = "/ocs/v1.php";
244+
private static final String OCS_V2_PATH = "/ocs/v2.php";
245+
243246
@NonNull
244247
@Override
245248
public Response intercept(@NonNull Chain chain) throws IOException {
246249
Request original = chain.request();
247-
Request request = original.newBuilder()
250+
Request.Builder requestBuilder = original.newBuilder()
248251
.header("User-Agent", ApiUtils.getUserAgent())
249-
.header("Accept", "application/json")
250-
.header("OCS-APIRequest", "true")
251252
.header("ngrok-skip-browser-warning", "true")
252-
.method(original.method(), original.body())
253-
.build();
253+
.method(original.method(), original.body());
254+
255+
if (isOcsEndpoint(original)) {
256+
requestBuilder
257+
.header("Accept", "application/json")
258+
.header("OCS-APIRequest", "true");
259+
}
260+
261+
Request request = requestBuilder.build();
254262

255263
return chain.proceed(request);
256264
}
265+
266+
private boolean isOcsEndpoint(@NonNull Request request) {
267+
String path = request.url().encodedPath();
268+
return path.contains(OCS_V1_PATH) || path.contains(OCS_V2_PATH);
269+
}
257270
}
258271

259272
public static class HttpAuthenticator implements Authenticator {

app/src/main/java/com/nextcloud/talk/jobs/ShareOperationWorker.kt

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ import com.nextcloud.talk.utils.bundle.BundleKeys.KEY_INTERNAL_USER_ID
2626
import com.nextcloud.talk.utils.bundle.BundleKeys.KEY_META_DATA
2727
import com.nextcloud.talk.utils.bundle.BundleKeys.KEY_ROOM_TOKEN
2828
import io.reactivex.schedulers.Schedulers
29+
import kotlinx.coroutines.flow.MutableSharedFlow
30+
import kotlinx.coroutines.flow.SharedFlow
31+
import retrofit2.HttpException
2932
import javax.inject.Inject
3033

3134
@AutoInjector(NextcloudTalkApplication::class)
@@ -46,6 +49,17 @@ class ShareOperationWorker(context: Context, workerParams: WorkerParameters) : W
4649

4750
override fun doWork(): Result {
4851
for (filePath in filesArray) {
52+
tryCreateShare(filePath)
53+
}
54+
roomToken?.let { _shareCompletedFlow.tryEmit(it) }
55+
return Result.success()
56+
}
57+
58+
@Suppress("TooGenericExceptionCaught")
59+
private fun tryCreateShare(filePath: String?) {
60+
for (attempt in 1..SHARE_MAX_ATTEMPTS) {
61+
var succeeded = false
62+
var shouldRetry = false
4963
ncApi.createRemoteShare(
5064
credentials,
5165
ApiUtils.getSharingUrl(baseUrl!!),
@@ -56,11 +70,18 @@ class ShareOperationWorker(context: Context, workerParams: WorkerParameters) : W
5670
)
5771
.subscribeOn(Schedulers.io())
5872
.blockingSubscribe(
59-
{},
60-
{ e -> Log.w(TAG, "error while creating RemoteShare", e) }
73+
{ succeeded = true },
74+
{ e ->
75+
if (e is HttpException && e.code() == HTTP_NOT_FOUND && attempt < SHARE_MAX_ATTEMPTS) {
76+
shouldRetry = true
77+
} else {
78+
Log.w(TAG, "error while creating RemoteShare", e)
79+
}
80+
}
6181
)
82+
if (succeeded || !shouldRetry) return
83+
Thread.sleep(SHARE_RETRY_DELAY_MS)
6284
}
63-
return Result.success()
6485
}
6586

6687
init {
@@ -78,6 +99,12 @@ class ShareOperationWorker(context: Context, workerParams: WorkerParameters) : W
7899

79100
companion object {
80101
private val TAG = ShareOperationWorker::class.simpleName
102+
private const val HTTP_NOT_FOUND = 404
103+
private const val SHARE_MAX_ATTEMPTS = 4
104+
private const val SHARE_RETRY_DELAY_MS = 2000L
105+
106+
private val _shareCompletedFlow: MutableSharedFlow<String> = MutableSharedFlow(extraBufferCapacity = 1)
107+
val shareCompletedFlow: SharedFlow<String> = _shareCompletedFlow
81108

82109
fun shareFile(roomToken: String?, currentUser: User, remotePath: String, metaData: String?) {
83110
val paths: MutableList<String> = ArrayList()

app/src/main/java/com/nextcloud/talk/jobs/UploadAndShareFilesWorker.kt

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ import com.nextcloud.talk.utils.bundle.BundleKeys.KEY_ROOM_TOKEN
5050
import com.nextcloud.talk.utils.database.user.CurrentUserProviderOld
5151
import com.nextcloud.talk.utils.permissions.PlatformPermissionUtil
5252
import com.nextcloud.talk.utils.preferences.AppPreferences
53+
import kotlinx.coroutines.ExperimentalCoroutinesApi
54+
import kotlinx.coroutines.flow.MutableSharedFlow
55+
import kotlinx.coroutines.flow.SharedFlow
5356
import kotlinx.coroutines.runBlocking
5457
import okhttp3.MediaType.Companion.toMediaTypeOrNull
5558
import okhttp3.OkHttpClient
@@ -126,6 +129,7 @@ class UploadAndShareFilesWorker(val context: Context, workerParameters: WorkerPa
126129

127130
if (uploadSuccess) {
128131
cancelNotification()
132+
_uploadCompletedFlow.tryEmit(roomToken)
129133
return Result.success()
130134
} else if (isStopped) {
131135
// since work is cancelled the result would be ignored anyways
@@ -409,13 +413,24 @@ class UploadAndShareFilesWorker(val context: Context, workerParameters: WorkerPa
409413
private const val ROOM_TOKEN = "ROOM_TOKEN"
410414
private const val CONVERSATION_NAME = "CONVERSATION_NAME"
411415
private const val META_DATA = "META_DATA"
412-
private const val CHUNK_UPLOAD_THRESHOLD_SIZE: Long = 1024000
416+
private const val CHUNK_UPLOAD_THRESHOLD_SIZE: Long = 1024 * 1024
413417
private const val NOTIFICATION_FILE_NAME_MAX_LENGTH = 20
414418
private const val THREE_DOTS = ""
415419
private const val HUNDRED_PERCENT = 100
416420
private const val ZERO_PERCENT = 0
417421
const val REQUEST_PERMISSION = 3123
418422

423+
private val _uploadCompletedFlow: MutableSharedFlow<String> = MutableSharedFlow(
424+
replay = 1,
425+
extraBufferCapacity = 1
426+
)
427+
val uploadCompletedFlow: SharedFlow<String> = _uploadCompletedFlow
428+
429+
@OptIn(ExperimentalCoroutinesApi::class)
430+
fun clearUploadCompletedReplay() {
431+
_uploadCompletedFlow.resetReplayCache()
432+
}
433+
419434
fun requestStoragePermission(activity: Activity) {
420435
when {
421436
Build.VERSION

0 commit comments

Comments
 (0)