Skip to content

Commit 9fc700d

Browse files
Merge pull request #126 from appwrite/concurrent-chunk-uploads-1-9-x-minimal
feat: support concurrent chunk uploads
2 parents c99ae1b + a6f9e16 commit 9fc700d

1 file changed

Lines changed: 113 additions & 32 deletions

File tree

library/src/main/java/io/appwrite/Client.kt

Lines changed: 113 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ import io.appwrite.models.UploadProgress
1313
import kotlinx.coroutines.CoroutineScope
1414
import kotlinx.coroutines.Dispatchers
1515
import kotlinx.coroutines.Job
16+
import kotlinx.coroutines.async
17+
import kotlinx.coroutines.awaitAll
18+
import kotlinx.coroutines.coroutineScope
1619
import kotlinx.coroutines.suspendCancellableCoroutine
1720
import okhttp3.*
1821
import okhttp3.Headers.Companion.toHeaders
@@ -30,6 +33,9 @@ import java.net.CookieManager
3033
import java.net.CookiePolicy
3134
import java.security.SecureRandom
3235
import java.security.cert.X509Certificate
36+
import java.util.concurrent.atomic.AtomicInteger
37+
import java.util.concurrent.atomic.AtomicLong
38+
import java.util.concurrent.atomic.AtomicReference
3339
import javax.net.ssl.SSLContext
3440
import javax.net.ssl.SSLSocketFactory
3541
import javax.net.ssl.TrustManager
@@ -49,6 +55,7 @@ class Client @JvmOverloads constructor(
4955
* The size for chunked uploads in bytes.
5056
*/
5157
internal const val CHUNK_SIZE = 5*1024*1024; // 5MB
58+
internal const val MAX_CONCURRENT_UPLOADS = 8
5259
internal const val GLOBAL_PREFS = "io.appwrite"
5360
internal const val COOKIE_PREFS = "myCookie"
5461
}
@@ -87,7 +94,7 @@ class Client @JvmOverloads constructor(
8794
"x-sdk-name" to "Android",
8895
"x-sdk-platform" to "client",
8996
"x-sdk-language" to "android",
90-
"x-sdk-version" to "24.1.1",
97+
"x-sdk-version" to "24.2.0",
9198
"x-appwrite-response-format" to "1.9.5"
9299
)
93100
config = mutableMapOf()
@@ -484,12 +491,10 @@ class Client @JvmOverloads constructor(
484491
idParamName: String? = null,
485492
onProgress: ((UploadProgress) -> Unit)? = null,
486493
): T {
487-
var file: RandomAccessFile? = null
488494
val input = params[paramName] as InputFile
489495
val size: Long = when(input.sourceType) {
490496
"path", "file" -> {
491-
file = RandomAccessFile(input.path, "r")
492-
file.length()
497+
File(input.path).length()
493498
}
494499
"bytes" -> {
495500
(input.data as ByteArray).size.toLong()
@@ -518,9 +523,9 @@ class Client @JvmOverloads constructor(
518523
)
519524
}
520525

521-
val buffer = ByteArray(CHUNK_SIZE)
522526
var offset = 0L
523527
var result: Map<*, *>? = null
528+
var uploadId: String? = null
524529

525530
if (idParamName?.isNotEmpty() == true) {
526531
// Make a request to check if a file already exists
@@ -533,59 +538,135 @@ class Client @JvmOverloads constructor(
533538
)
534539
val chunksUploaded = current["chunksUploaded"] as Long
535540
offset = chunksUploaded * CHUNK_SIZE
541+
uploadId = params[idParamName]?.toString()
542+
result = current
536543
}
537544

538-
while (offset < size) {
539-
when(input.sourceType) {
545+
fun readChunk(start: Long, end: Long): ByteArray {
546+
val length = (end - start).toInt()
547+
return when(input.sourceType) {
540548
"file", "path" -> {
541-
file!!.seek(offset)
542-
file!!.read(buffer)
549+
RandomAccessFile(input.path, "r").use { chunkFile ->
550+
val chunk = ByteArray(length)
551+
chunkFile.seek(start)
552+
chunkFile.readFully(chunk)
553+
chunk
554+
}
543555
}
544556
"bytes" -> {
545-
val end = if (offset + CHUNK_SIZE < size) {
546-
offset + CHUNK_SIZE - 1
547-
} else {
548-
size - 1
549-
}
550-
(input.data as ByteArray).copyInto(
551-
buffer,
552-
startIndex = offset.toInt(),
553-
endIndex = end.toInt()
554-
)
557+
(input.data as ByteArray).copyOfRange(start.toInt(), end.toInt())
555558
}
556559
else -> throw UnsupportedOperationException()
557560
}
561+
}
558562

559-
params[paramName] = MultipartBody.Part.createFormData(
563+
val totalChunks = (size + CHUNK_SIZE - 1) / CHUNK_SIZE
564+
565+
fun isUploadComplete(chunkResult: Map<*, *>): Boolean {
566+
val chunksUploaded = chunkResult["chunksUploaded"]?.toString()?.toLongOrNull() ?: return false
567+
val chunksTotal = chunkResult["chunksTotal"]?.toString()?.toLongOrNull() ?: totalChunks
568+
return chunksUploaded >= chunksTotal
569+
}
570+
571+
suspend fun uploadChunk(index: Int, start: Long, end: Long, includeUploadId: Boolean): Map<*, *> {
572+
val chunkParams = params.toMutableMap()
573+
val chunkHeaders = headers.toMutableMap()
574+
575+
if (includeUploadId && uploadId != null) {
576+
chunkHeaders["x-appwrite-id"] = uploadId!!
577+
}
578+
579+
chunkHeaders["Content-Range"] = "bytes $start-${end - 1}/$size"
580+
chunkParams[paramName] = MultipartBody.Part.createFormData(
560581
paramName,
561582
input.filename,
562-
buffer.toRequestBody()
583+
readChunk(start, end).toRequestBody()
563584
)
564585

565-
headers["Content-Range"] =
566-
"bytes $offset-${((offset + CHUNK_SIZE) - 1).coerceAtMost(size - 1)}/$size"
567-
568-
result = call(
586+
val chunkResult = call(
569587
method = "POST",
570588
path,
571-
headers,
572-
params,
589+
chunkHeaders,
590+
chunkParams,
573591
responseType = Map::class.java
574592
)
575593

576-
offset += CHUNK_SIZE
577-
headers["x-appwrite-id"] = result["\$id"].toString()
594+
if (index == 0) {
595+
uploadId = chunkResult["\$id"].toString()
596+
}
597+
598+
return chunkResult
599+
}
600+
601+
if (offset == 0L) {
602+
val firstChunkEnd = CHUNK_SIZE.toLong().coerceAtMost(size)
603+
result = uploadChunk(0, 0, firstChunkEnd, false)
604+
offset = firstChunkEnd
578605
onProgress?.invoke(
579606
UploadProgress(
580-
id = result["\$id"].toString(),
607+
id = uploadId ?: result!!["\$id"].toString(),
581608
progress = offset.coerceAtMost(size).toDouble() / size * 100,
582609
sizeUploaded = offset.coerceAtMost(size),
583-
chunksTotal = result["chunksTotal"].toString().toInt(),
584-
chunksUploaded = result["chunksUploaded"].toString().toInt(),
610+
chunksTotal = result!!["chunksTotal"].toString().toInt(),
611+
chunksUploaded = result!!["chunksUploaded"].toString().toInt(),
585612
)
586613
)
587614
}
588615

616+
val chunks = mutableListOf<Triple<Int, Long, Long>>()
617+
var chunkOffset = offset
618+
while (chunkOffset < size) {
619+
val end = (chunkOffset + CHUNK_SIZE).coerceAtMost(size)
620+
chunks.add(Triple((chunkOffset / CHUNK_SIZE).toInt(), chunkOffset, end))
621+
chunkOffset = end
622+
}
623+
624+
if (chunks.isNotEmpty()) {
625+
val nextChunk = AtomicInteger(0)
626+
val completedChunks = AtomicInteger((offset / CHUNK_SIZE).toInt())
627+
val uploadedBytes = AtomicLong(offset.coerceAtMost(size))
628+
val completedResultRef = AtomicReference<Map<*, *>?>(null)
629+
val lastResultRef = AtomicReference(result)
630+
val progressLock = Any()
631+
632+
coroutineScope {
633+
List(MAX_CONCURRENT_UPLOADS.coerceAtMost(chunks.size)) {
634+
async {
635+
while (true) {
636+
val chunkIndex = nextChunk.getAndIncrement()
637+
if (chunkIndex >= chunks.size) {
638+
break
639+
}
640+
641+
val (index, start, end) = chunks[chunkIndex]
642+
val chunkResult = uploadChunk(index, start, end, true)
643+
644+
val chunksUploaded = completedChunks.incrementAndGet()
645+
val sizeUploaded = uploadedBytes.addAndGet(end - start)
646+
647+
lastResultRef.set(chunkResult)
648+
if (isUploadComplete(chunkResult)) {
649+
completedResultRef.set(chunkResult)
650+
}
651+
652+
synchronized(progressLock) {
653+
onProgress?.invoke(
654+
UploadProgress(
655+
id = uploadId ?: chunkResult["\$id"].toString(),
656+
progress = sizeUploaded.coerceAtMost(size).toDouble() / size * 100,
657+
sizeUploaded = sizeUploaded.coerceAtMost(size),
658+
chunksTotal = chunkResult["chunksTotal"].toString().toInt(),
659+
chunksUploaded = chunksUploaded,
660+
)
661+
)
662+
}
663+
}
664+
}
665+
}.awaitAll()
666+
}
667+
result = completedResultRef.get() ?: lastResultRef.get()
668+
}
669+
589670
return converter(result as Map<String, Any>)
590671
}
591672

0 commit comments

Comments
 (0)