From 0456ab35ed3023050f1316d4b2a1273cff03016b Mon Sep 17 00:00:00 2001 From: Torsten Dittmann Date: Thu, 21 May 2026 17:19:32 +0400 Subject: [PATCH 1/2] feat: support concurrent chunk uploads --- library/src/main/java/io/appwrite/Client.kt | 137 +++++++++++++++----- 1 file changed, 105 insertions(+), 32 deletions(-) diff --git a/library/src/main/java/io/appwrite/Client.kt b/library/src/main/java/io/appwrite/Client.kt index ef9d8f5..bd47222 100644 --- a/library/src/main/java/io/appwrite/Client.kt +++ b/library/src/main/java/io/appwrite/Client.kt @@ -13,6 +13,9 @@ import io.appwrite.models.UploadProgress import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.suspendCancellableCoroutine import okhttp3.* import okhttp3.Headers.Companion.toHeaders @@ -30,6 +33,8 @@ import java.net.CookieManager import java.net.CookiePolicy import java.security.SecureRandom import java.security.cert.X509Certificate +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicLong import javax.net.ssl.SSLContext import javax.net.ssl.SSLSocketFactory import javax.net.ssl.TrustManager @@ -49,6 +54,7 @@ class Client @JvmOverloads constructor( * The size for chunked uploads in bytes. */ internal const val CHUNK_SIZE = 5*1024*1024; // 5MB + internal const val MAX_CONCURRENT_UPLOADS = 8 internal const val GLOBAL_PREFS = "io.appwrite" internal const val COOKIE_PREFS = "myCookie" } @@ -87,7 +93,7 @@ class Client @JvmOverloads constructor( "x-sdk-name" to "Android", "x-sdk-platform" to "client", "x-sdk-language" to "android", - "x-sdk-version" to "24.1.1", + "x-sdk-version" to "24.2.0", "x-appwrite-response-format" to "1.9.5" ) config = mutableMapOf() @@ -484,12 +490,10 @@ class Client @JvmOverloads constructor( idParamName: String? = null, onProgress: ((UploadProgress) -> Unit)? = null, ): T { - var file: RandomAccessFile? = null val input = params[paramName] as InputFile val size: Long = when(input.sourceType) { "path", "file" -> { - file = RandomAccessFile(input.path, "r") - file.length() + File(input.path).length() } "bytes" -> { (input.data as ByteArray).size.toLong() @@ -518,9 +522,9 @@ class Client @JvmOverloads constructor( ) } - val buffer = ByteArray(CHUNK_SIZE) var offset = 0L var result: Map<*, *>? = null + var uploadId: String? = null if (idParamName?.isNotEmpty() == true) { // Make a request to check if a file already exists @@ -533,59 +537,128 @@ class Client @JvmOverloads constructor( ) val chunksUploaded = current["chunksUploaded"] as Long offset = chunksUploaded * CHUNK_SIZE + uploadId = params[idParamName]?.toString() + result = current } - while (offset < size) { - when(input.sourceType) { + fun readChunk(start: Long, end: Long): ByteArray { + val length = (end - start).toInt() + return when(input.sourceType) { "file", "path" -> { - file!!.seek(offset) - file!!.read(buffer) + RandomAccessFile(input.path, "r").use { chunkFile -> + val chunk = ByteArray(length) + chunkFile.seek(start) + chunkFile.readFully(chunk) + chunk + } } "bytes" -> { - val end = if (offset + CHUNK_SIZE < size) { - offset + CHUNK_SIZE - 1 - } else { - size - 1 - } - (input.data as ByteArray).copyInto( - buffer, - startIndex = offset.toInt(), - endIndex = end.toInt() - ) + (input.data as ByteArray).copyOfRange(start.toInt(), end.toInt()) } else -> throw UnsupportedOperationException() } + } - params[paramName] = MultipartBody.Part.createFormData( + val totalChunks = (size + CHUNK_SIZE - 1) / CHUNK_SIZE + + fun isUploadComplete(chunkResult: Map<*, *>): Boolean { + val chunksUploaded = chunkResult["chunksUploaded"]?.toString()?.toLongOrNull() ?: return false + val chunksTotal = chunkResult["chunksTotal"]?.toString()?.toLongOrNull() ?: totalChunks + return chunksUploaded >= chunksTotal + } + + suspend fun uploadChunk(index: Int, start: Long, end: Long, includeUploadId: Boolean): Map<*, *> { + val chunkParams = params.toMutableMap() + val chunkHeaders = headers.toMutableMap() + + if (includeUploadId && uploadId != null) { + chunkHeaders["x-appwrite-id"] = uploadId!! + } + + chunkHeaders["Content-Range"] = "bytes $start-${end - 1}/$size" + chunkParams[paramName] = MultipartBody.Part.createFormData( paramName, input.filename, - buffer.toRequestBody() + readChunk(start, end).toRequestBody() ) - headers["Content-Range"] = - "bytes $offset-${((offset + CHUNK_SIZE) - 1).coerceAtMost(size - 1)}/$size" - - result = call( + val chunkResult = call( method = "POST", path, - headers, - params, + chunkHeaders, + chunkParams, responseType = Map::class.java ) - offset += CHUNK_SIZE - headers["x-appwrite-id"] = result["\$id"].toString() + if (index == 0 || uploadId == null) { + uploadId = chunkResult["\$id"].toString() + } + + return chunkResult + } + + if (offset == 0L) { + val firstChunkEnd = CHUNK_SIZE.toLong().coerceAtMost(size) + result = uploadChunk(0, 0, firstChunkEnd, false) + offset = firstChunkEnd onProgress?.invoke( UploadProgress( - id = result["\$id"].toString(), + id = uploadId ?: result!!["\$id"].toString(), progress = offset.coerceAtMost(size).toDouble() / size * 100, sizeUploaded = offset.coerceAtMost(size), - chunksTotal = result["chunksTotal"].toString().toInt(), - chunksUploaded = result["chunksUploaded"].toString().toInt(), + chunksTotal = result!!["chunksTotal"].toString().toInt(), + chunksUploaded = result!!["chunksUploaded"].toString().toInt(), ) ) } + val chunks = mutableListOf>() + var chunkOffset = offset + while (chunkOffset < size) { + val end = (chunkOffset + CHUNK_SIZE).coerceAtMost(size) + chunks.add(Triple((chunkOffset / CHUNK_SIZE).toInt(), chunkOffset, end)) + chunkOffset = end + } + + if (chunks.isNotEmpty()) { + val nextChunk = AtomicInteger(0) + val completedChunks = AtomicInteger((offset / CHUNK_SIZE).toInt()) + val uploadedBytes = AtomicLong(offset.coerceAtMost(size)) + + coroutineScope { + List(MAX_CONCURRENT_UPLOADS.coerceAtMost(chunks.size)) { + async { + while (true) { + val chunkIndex = nextChunk.getAndIncrement() + if (chunkIndex >= chunks.size) { + break + } + + val (index, start, end) = chunks[chunkIndex] + val chunkResult = uploadChunk(index, start, end, true) + + val chunksUploaded = completedChunks.incrementAndGet() + val sizeUploaded = uploadedBytes.addAndGet(end - start) + + if (isUploadComplete(chunkResult)) { + result = chunkResult + } + + onProgress?.invoke( + UploadProgress( + id = uploadId ?: chunkResult["\$id"].toString(), + progress = sizeUploaded.coerceAtMost(size).toDouble() / size * 100, + sizeUploaded = sizeUploaded.coerceAtMost(size), + chunksTotal = chunkResult["chunksTotal"].toString().toInt(), + chunksUploaded = chunksUploaded, + ) + ) + } + } + }.awaitAll() + } + } + return converter(result as Map) } From a6f9e16bc02ed4639905e663339896332ef97bef Mon Sep 17 00:00:00 2001 From: Torsten Dittmann Date: Thu, 21 May 2026 20:45:46 +0400 Subject: [PATCH 2/2] feat: support concurrent chunk uploads --- library/src/main/java/io/appwrite/Client.kt | 28 +++++++++++++-------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/library/src/main/java/io/appwrite/Client.kt b/library/src/main/java/io/appwrite/Client.kt index bd47222..98d86a3 100644 --- a/library/src/main/java/io/appwrite/Client.kt +++ b/library/src/main/java/io/appwrite/Client.kt @@ -35,6 +35,7 @@ import java.security.SecureRandom import java.security.cert.X509Certificate import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.atomic.AtomicReference import javax.net.ssl.SSLContext import javax.net.ssl.SSLSocketFactory import javax.net.ssl.TrustManager @@ -590,7 +591,7 @@ class Client @JvmOverloads constructor( responseType = Map::class.java ) - if (index == 0 || uploadId == null) { + if (index == 0) { uploadId = chunkResult["\$id"].toString() } @@ -624,6 +625,9 @@ class Client @JvmOverloads constructor( val nextChunk = AtomicInteger(0) val completedChunks = AtomicInteger((offset / CHUNK_SIZE).toInt()) val uploadedBytes = AtomicLong(offset.coerceAtMost(size)) + val completedResultRef = AtomicReference?>(null) + val lastResultRef = AtomicReference(result) + val progressLock = Any() coroutineScope { List(MAX_CONCURRENT_UPLOADS.coerceAtMost(chunks.size)) { @@ -640,23 +644,27 @@ class Client @JvmOverloads constructor( val chunksUploaded = completedChunks.incrementAndGet() val sizeUploaded = uploadedBytes.addAndGet(end - start) + lastResultRef.set(chunkResult) if (isUploadComplete(chunkResult)) { - result = chunkResult + completedResultRef.set(chunkResult) } - onProgress?.invoke( - UploadProgress( - id = uploadId ?: chunkResult["\$id"].toString(), - progress = sizeUploaded.coerceAtMost(size).toDouble() / size * 100, - sizeUploaded = sizeUploaded.coerceAtMost(size), - chunksTotal = chunkResult["chunksTotal"].toString().toInt(), - chunksUploaded = chunksUploaded, + synchronized(progressLock) { + onProgress?.invoke( + UploadProgress( + id = uploadId ?: chunkResult["\$id"].toString(), + progress = sizeUploaded.coerceAtMost(size).toDouble() / size * 100, + sizeUploaded = sizeUploaded.coerceAtMost(size), + chunksTotal = chunkResult["chunksTotal"].toString().toInt(), + chunksUploaded = chunksUploaded, + ) ) - ) + } } } }.awaitAll() } + result = completedResultRef.get() ?: lastResultRef.get() } return converter(result as Map)