@@ -53,6 +53,9 @@ import kotlinx.coroutines.sync.Semaphore
5353import kotlinx.coroutines.sync.withPermit
5454import kotlinx.coroutines.withContext
5555import java.io.File
56+ import java.util.concurrent.ConcurrentHashMap
57+ import java.util.concurrent.atomic.AtomicBoolean
58+ import java.util.concurrent.atomic.AtomicInteger
5659import kotlin.random.Random
5760
5861@Suppress(" LongParameterList" , " TooGenericExceptionCaught" )
@@ -81,7 +84,7 @@ class FileUploadWorker(
8184 const val TOTAL_UPLOAD_SIZE = " total_upload_size"
8285 const val SHOW_SAME_FILE_ALREADY_EXISTS_NOTIFICATION = " show_same_file_already_exists_notification"
8386
84- var currentUploadFileOperation : UploadFileOperation ? = null
87+ private val activeUploadFileOperations = ConcurrentHashMap < String , UploadFileOperation >()
8588
8689 private const val UPLOADS_ADDED_MESSAGE = " UPLOADS_ADDED"
8790 private const val UPLOAD_START_MESSAGE = " UPLOAD_START"
@@ -109,20 +112,18 @@ class FileUploadWorker(
109112 fun getUploadFinishMessage (): String = FileUploadWorker ::class .java.name + UPLOAD_FINISH_MESSAGE
110113
111114 fun cancelCurrentUpload (remotePath : String , accountName : String , onCompleted : () -> Unit ) {
112- currentUploadFileOperation?. let {
115+ activeUploadFileOperations.values.forEach {
113116 if (it.remotePath == remotePath && it.user.accountName == accountName) {
114117 it.cancel(ResultCode .USER_CANCELLED )
115- onCompleted()
116118 }
117119 }
120+ onCompleted()
118121 }
119122
120123 fun isUploading (remotePath : String? , accountName : String? ): Boolean {
121- currentUploadFileOperation?.let {
122- return it.remotePath == remotePath && it.user.accountName == accountName
123- }
124-
125- return false
124+ return activeUploadFileOperations.values.any {
125+ it.remotePath == remotePath && it.user.accountName == accountName
126+ }
126127 }
127128
128129 fun getUploadAction (action : String ): Int = when (action) {
@@ -133,7 +134,9 @@ class FileUploadWorker(
133134 }
134135 }
135136
136- private var lastPercent = 0
137+ private val lastPercents = ConcurrentHashMap <String , Int >()
138+ private val lastUpdateTimes = ConcurrentHashMap <String , Long >()
139+
137140 private val notificationId = Random .nextInt()
138141 private val notificationManager = UploadNotificationManager (context, viewThemeUtils, notificationId)
139142 private val intents = FileUploaderIntents (context)
@@ -208,7 +211,7 @@ class FileUploadWorker(
208211 Log_OC .e(TAG , " FileUploadWorker stopped" )
209212
210213 setIdleWorkerState()
211- currentUploadFileOperation?. cancel(null )
214+ activeUploadFileOperations.values.forEach { it. cancel(null ) }
212215 notificationManager.dismissNotification()
213216 }
214217
@@ -217,7 +220,8 @@ class FileUploadWorker(
217220 }
218221
219222 private fun setIdleWorkerState () {
220- WorkerStateObserver .send(WorkerState .FileUploadCompleted (currentUploadFileOperation?.file))
223+ val lastOp = activeUploadFileOperations.values.lastOrNull()
224+ WorkerStateObserver .send(WorkerState .FileUploadCompleted (lastOp?.file))
221225 }
222226
223227 @Suppress(" ReturnCount" , " LongMethod" , " DEPRECATION" )
@@ -260,8 +264,6 @@ class FileUploadWorker(
260264 val client = OwnCloudClientManagerFactory .getDefaultSingleton().getClientFor(ocAccount, context)
261265
262266 return @withContext parallelUpload(uploads, user, previouslyUploadedFileSize, totalUploadSize, client, accountName)
263-
264- // return@withContext sequentialUpload(uploads, user, previouslyUploadedFileSize, totalUploadSize, client, accountName)
265267 }
266268
267269
@@ -273,23 +275,28 @@ class FileUploadWorker(
273275 client : OwnCloudClient ,
274276 accountName : String
275277 ): Result {
278+ if (uploads.isNullOrEmpty()) {
279+ return Result .success()
280+ }
281+
276282 val semaphore = Semaphore (5 ) // Limit to 5 parallel uploads
277- var quotaExceeded = false
283+ val quotaExceeded = AtomicBoolean (false )
284+ val completedCount = AtomicInteger (0 )
278285
279286 coroutineScope {
280- for ((index, upload) in uploads?.withIndex() !! ) {
281- if (quotaExceeded) break
287+ for (upload in uploads) {
288+ if (quotaExceeded.get() ) break
282289 ensureActive()
283290
284291 launch {
285- semaphore.withPermit {
286- if (quotaExceeded || isStopped) return @launch
292+ if (preferences.isGlobalUploadPaused) {
293+ Log_OC .d(TAG , " Upload is paused, skip uploading files!" )
294+ notificationManager.notifyPaused(intents.openUploadListIntent(null ))
295+ return @launch
296+ }
287297
288- if (preferences.isGlobalUploadPaused) {
289- Log_OC .d(TAG , " Upload is paused, skip uploading files!" )
290- notificationManager.notifyPaused(intents.openUploadListIntent(null ))
291- return @launch
292- }
298+ semaphore.withPermit {
299+ if (quotaExceeded.get() || isStopped) return @launch
293300
294301 if (canExitEarly()) {
295302 notificationManager.showConnectionErrorNotification()
@@ -298,102 +305,49 @@ class FileUploadWorker(
298305
299306 setWorkerState(user)
300307 val operation = createUploadFileOperation(upload, user)
301-
302- // NOTE: currentUploadFileOperation is a companion property.
303- // Parallelizing will cause race conditions here.
304- // You should ideally move this to a thread-safe collection if needed.
305- currentUploadFileOperation = operation
306-
307- val currentIndex = (index + 1 )
308- val currentUploadIndex = (currentIndex + previouslyUploadedFileSize)
309-
310- // Synchronize notification updates if they aren't thread-safe
311- synchronized(notificationManager) {
312- notificationManager.prepareForStart(
313- operation,
314- startIntent = intents.openUploadListIntent(operation),
315- currentUploadIndex = currentUploadIndex,
316- totalUploadSize = totalUploadSize
317- )
318- }
308+ activeUploadFileOperations[operation.originalStoragePath] = operation
309+
310+ try {
311+ val currentUploadIndex = previouslyUploadedFileSize + completedCount.incrementAndGet()
312+
313+ // Synchronize notification updates
314+ synchronized(notificationManager) {
315+ notificationManager.prepareForStart(
316+ operation,
317+ startIntent = intents.openUploadListIntent(operation),
318+ currentUploadIndex = currentUploadIndex,
319+ totalUploadSize = totalUploadSize
320+ )
321+ }
319322
320323 val result = upload(operation, user, client)
321324
322325 val entity = uploadsStorageManager.uploadDao.getUploadById(upload.uploadId, accountName)
323326 uploadsStorageManager.updateStatus(entity, result.isSuccess)
324327
325- if (result.code == ResultCode .QUOTA_EXCEEDED ) {
326- Log_OC .w(TAG , " Quota exceeded, stopping uploads" )
327- notificationManager.showQuotaExceedNotification(operation)
328- quotaExceeded = true
329- this @coroutineScope.cancel(" Quota exceeded" )
330- return @launch
331- }
328+ if (result.code == ResultCode .QUOTA_EXCEEDED ) {
329+ Log_OC .w(TAG , " Quota exceeded, stopping uploads" )
330+ notificationManager.showQuotaExceedNotification(operation)
331+ quotaExceeded.set( true )
332+ this @coroutineScope.cancel(" Quota exceeded" )
333+ return @launch
334+ }
332335
333336 sendUploadFinishEvent(totalUploadSize, currentUploadIndex, operation, result)
334- }
337+
338+ } finally {
339+ activeUploadFileOperations.remove(operation.originalStoragePath)
340+ lastPercents.remove(operation.originalStoragePath)
341+ lastUpdateTimes.remove(operation.originalStoragePath)
342+ }
343+ }
335344 }
336345 }
337346 }
338347
339- return if (quotaExceeded) Result .failure() else Result .success()
348+ return if (quotaExceeded.get() ) Result .failure() else Result .success()
340349 }
341350
342- private suspend fun CoroutineScope.sequentialUpload (
343- uploads : List <OCUpload >? ,
344- user : User ,
345- previouslyUploadedFileSize : Int ,
346- totalUploadSize : Int ,
347- client : OwnCloudClient ,
348- accountName : String
349- ): Result {
350- for ((index, upload) in uploads?.withIndex()!! ) {
351- ensureActive()
352-
353- if (preferences.isGlobalUploadPaused) {
354- Log_OC .d(TAG , " Upload is paused, skip uploading files!" )
355- notificationManager.notifyPaused(
356- intents.openUploadListIntent(null )
357- )
358- return Result .success()
359- }
360-
361- if (canExitEarly()) {
362- notificationManager.showConnectionErrorNotification()
363- return Result .failure()
364- }
365-
366- setWorkerState(user)
367- val operation = createUploadFileOperation(upload, user)
368- currentUploadFileOperation = operation
369-
370- val currentIndex = (index + 1 )
371- val currentUploadIndex = (currentIndex + previouslyUploadedFileSize)
372- notificationManager.prepareForStart(
373- operation,
374- startIntent = intents.openUploadListIntent(operation),
375- currentUploadIndex = currentUploadIndex,
376- totalUploadSize = totalUploadSize
377- )
378-
379- val result = withContext(Dispatchers .IO ) {
380- upload(operation, user, client)
381- }
382- val entity = uploadsStorageManager.uploadDao.getUploadById(upload.uploadId, accountName)
383- uploadsStorageManager.updateStatus(entity, result.isSuccess)
384- currentUploadFileOperation = null
385-
386- if (result.code == ResultCode .QUOTA_EXCEEDED ) {
387- Log_OC .w(TAG , " Quota exceeded, stopping uploads" )
388- notificationManager.showQuotaExceedNotification(operation)
389- break
390- }
391-
392- sendUploadFinishEvent(totalUploadSize, currentUploadIndex, operation, result)
393- }
394-
395- return Result .success()
396- }
397351 private fun sendUploadFinishEvent (
398352 totalUploadSize : Int ,
399353 currentUploadIndex : Int ,
@@ -503,20 +457,24 @@ class FileUploadWorker(
503457 totalToTransfer : Long ,
504458 fileAbsoluteName : String
505459 ) {
460+ val operation = activeUploadFileOperations[fileAbsoluteName] ? : return
506461 val percent = getPercent(totalTransferredSoFar, totalToTransfer)
507462 val currentTime = System .currentTimeMillis()
508463
464+ val lastPercent = lastPercents[fileAbsoluteName] ? : 0
465+ val lastUpdateTime = lastUpdateTimes[fileAbsoluteName] ? : 0L
466+
509467 if (percent != lastPercent && (currentTime - lastUpdateTime) >= minProgressUpdateInterval) {
510- notificationManager. run {
511- val accountName = currentUploadFileOperation? .user? .accountName
512- val remotePath = currentUploadFileOperation? .remotePath
468+ synchronized( notificationManager) {
469+ val accountName = operation .user.accountName
470+ val remotePath = operation .remotePath
513471
514- updateUploadProgress(percent, currentUploadFileOperation )
472+ notificationManager. updateUploadProgress(percent, operation )
515473
516474 if (accountName != null && remotePath != null ) {
517475 val key: String = FileUploadHelper .buildRemoteName(accountName, remotePath)
518476 val boundListener = FileUploadHelper .mBoundListeners[key]
519- val filename = currentUploadFileOperation? .fileName ? : " "
477+ val filename = operation .fileName ? : " "
520478
521479 boundListener?.onTransferProgress(
522480 progressRate,
@@ -526,11 +484,10 @@ class FileUploadWorker(
526484 )
527485 }
528486
529- dismissOldErrorNotification(currentUploadFileOperation )
487+ notificationManager. dismissOldErrorNotification(operation )
530488 }
531- lastUpdateTime = currentTime
489+ lastUpdateTimes[fileAbsoluteName] = currentTime
490+ lastPercents[fileAbsoluteName] = percent
532491 }
533-
534- lastPercent = percent
535492 }
536493}
0 commit comments