Skip to content

Commit 6355200

Browse files
committed
Added function to parallelise uploads. Still need to address non-thread safe vars and make sure UI updates accordingly
Signed-off-by: Raphael Vieira <raphaelecv@hotmail.com>
1 parent d37732c commit 6355200

1 file changed

Lines changed: 98 additions & 5 deletions

File tree

app/src/main/java/com/nextcloud/client/jobs/upload/FileUploadWorker.kt

Lines changed: 98 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,14 @@ import com.owncloud.android.lib.common.utils.Log_OC
4343
import com.owncloud.android.operations.UploadFileOperation
4444
import com.owncloud.android.ui.notifications.NotificationUtils
4545
import com.owncloud.android.utils.theme.ViewThemeUtils
46+
import kotlinx.coroutines.CoroutineScope
4647
import kotlinx.coroutines.Dispatchers
48+
import kotlinx.coroutines.cancel
49+
import kotlinx.coroutines.coroutineScope
4750
import kotlinx.coroutines.ensureActive
51+
import kotlinx.coroutines.launch
52+
import kotlinx.coroutines.sync.Semaphore
53+
import kotlinx.coroutines.sync.withPermit
4854
import kotlinx.coroutines.withContext
4955
import java.io.File
5056
import kotlin.random.Random
@@ -253,20 +259,108 @@ class FileUploadWorker(
253259
val ocAccount = OwnCloudAccount(user.toPlatformAccount(), context)
254260
val client = OwnCloudClientManagerFactory.getDefaultSingleton().getClientFor(ocAccount, context)
255261

256-
for ((index, upload) in uploads.withIndex()) {
262+
return@withContext parallelUpload(uploads, user, previouslyUploadedFileSize, totalUploadSize, client, accountName)
263+
264+
// return@withContext sequentialUpload(uploads, user, previouslyUploadedFileSize, totalUploadSize, client, accountName)
265+
}
266+
267+
268+
private suspend fun parallelUpload(
269+
uploads: List<OCUpload>?,
270+
user: User,
271+
previouslyUploadedFileSize: Int,
272+
totalUploadSize: Int,
273+
client: OwnCloudClient,
274+
accountName: String
275+
): Result {
276+
val semaphore = Semaphore(5) // Limit to 5 parallel uploads
277+
var quotaExceeded = false
278+
279+
coroutineScope {
280+
for ((index, upload) in uploads?.withIndex()!!) {
281+
if (quotaExceeded) break
282+
ensureActive()
283+
284+
launch {
285+
semaphore.withPermit {
286+
if (quotaExceeded || isStopped) return@launch
287+
288+
if (preferences.isGlobalUploadPaused) {
289+
Log_OC.d(TAG, "Upload is paused, skip uploading files!")
290+
notificationManager.notifyPaused(intents.openUploadListIntent(null))
291+
return@launch
292+
}
293+
294+
if (canExitEarly()) {
295+
notificationManager.showConnectionErrorNotification()
296+
return@launch
297+
}
298+
299+
setWorkerState(user)
300+
val operation = createUploadFileOperation(upload, user)
301+
302+
// NOTE: currentUploadFileOperation is a companion property.
303+
// Parallelizing will cause race conditions here.
304+
// You should ideally move this to a thread-safe collection if needed.
305+
currentUploadFileOperation = operation
306+
307+
val currentIndex = (index + 1)
308+
val currentUploadIndex = (currentIndex + previouslyUploadedFileSize)
309+
310+
// Synchronize notification updates if they aren't thread-safe
311+
synchronized(notificationManager) {
312+
notificationManager.prepareForStart(
313+
operation,
314+
startIntent = intents.openUploadListIntent(operation),
315+
currentUploadIndex = currentUploadIndex,
316+
totalUploadSize = totalUploadSize
317+
)
318+
}
319+
320+
val result = upload(operation, user, client)
321+
322+
val entity = uploadsStorageManager.uploadDao.getUploadById(upload.uploadId, accountName)
323+
uploadsStorageManager.updateStatus(entity, result.isSuccess)
324+
325+
if (result.code == ResultCode.QUOTA_EXCEEDED) {
326+
Log_OC.w(TAG, "Quota exceeded, stopping uploads")
327+
notificationManager.showQuotaExceedNotification(operation)
328+
quotaExceeded = true
329+
this@coroutineScope.cancel("Quota exceeded")
330+
return@launch
331+
}
332+
333+
sendUploadFinishEvent(totalUploadSize, currentUploadIndex, operation, result)
334+
}
335+
}
336+
}
337+
}
338+
339+
return if (quotaExceeded) Result.failure() else Result.success()
340+
}
341+
342+
private suspend fun CoroutineScope.sequentialUpload(
343+
uploads: List<OCUpload>?,
344+
user: User,
345+
previouslyUploadedFileSize: Int,
346+
totalUploadSize: Int,
347+
client: OwnCloudClient,
348+
accountName: String
349+
): Result {
350+
for ((index, upload) in uploads?.withIndex()!!) {
257351
ensureActive()
258352

259353
if (preferences.isGlobalUploadPaused) {
260354
Log_OC.d(TAG, "Upload is paused, skip uploading files!")
261355
notificationManager.notifyPaused(
262356
intents.openUploadListIntent(null)
263357
)
264-
return@withContext Result.success()
358+
return Result.success()
265359
}
266360

267361
if (canExitEarly()) {
268362
notificationManager.showConnectionErrorNotification()
269-
return@withContext Result.failure()
363+
return Result.failure()
270364
}
271365

272366
setWorkerState(user)
@@ -298,9 +392,8 @@ class FileUploadWorker(
298392
sendUploadFinishEvent(totalUploadSize, currentUploadIndex, operation, result)
299393
}
300394

301-
return@withContext Result.success()
395+
return Result.success()
302396
}
303-
304397
private fun sendUploadFinishEvent(
305398
totalUploadSize: Int,
306399
currentUploadIndex: Int,

0 commit comments

Comments
 (0)