diff --git a/library/src/main/java/io/appwrite/Client.kt b/library/src/main/java/io/appwrite/Client.kt index ef9d8f5..98d86a3 100644 --- a/library/src/main/java/io/appwrite/Client.kt +++ b/library/src/main/java/io/appwrite/Client.kt @@ -13,6 +13,9 @@ import io.appwrite.models.UploadProgress import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.suspendCancellableCoroutine import okhttp3.* import okhttp3.Headers.Companion.toHeaders @@ -30,6 +33,9 @@ import java.net.CookieManager import java.net.CookiePolicy import java.security.SecureRandom import java.security.cert.X509Certificate +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.atomic.AtomicReference import javax.net.ssl.SSLContext import javax.net.ssl.SSLSocketFactory import javax.net.ssl.TrustManager @@ -49,6 +55,7 @@ class Client @JvmOverloads constructor( * The size for chunked uploads in bytes. */ internal const val CHUNK_SIZE = 5*1024*1024; // 5MB + internal const val MAX_CONCURRENT_UPLOADS = 8 internal const val GLOBAL_PREFS = "io.appwrite" internal const val COOKIE_PREFS = "myCookie" } @@ -87,7 +94,7 @@ class Client @JvmOverloads constructor( "x-sdk-name" to "Android", "x-sdk-platform" to "client", "x-sdk-language" to "android", - "x-sdk-version" to "24.1.1", + "x-sdk-version" to "24.2.0", "x-appwrite-response-format" to "1.9.5" ) config = mutableMapOf() @@ -484,12 +491,10 @@ class Client @JvmOverloads constructor( idParamName: String? = null, onProgress: ((UploadProgress) -> Unit)? = null, ): T { - var file: RandomAccessFile? = null val input = params[paramName] as InputFile val size: Long = when(input.sourceType) { "path", "file" -> { - file = RandomAccessFile(input.path, "r") - file.length() + File(input.path).length() } "bytes" -> { (input.data as ByteArray).size.toLong() @@ -518,9 +523,9 @@ class Client @JvmOverloads constructor( ) } - val buffer = ByteArray(CHUNK_SIZE) var offset = 0L var result: Map<*, *>? = null + var uploadId: String? = null if (idParamName?.isNotEmpty() == true) { // Make a request to check if a file already exists @@ -533,59 +538,135 @@ class Client @JvmOverloads constructor( ) val chunksUploaded = current["chunksUploaded"] as Long offset = chunksUploaded * CHUNK_SIZE + uploadId = params[idParamName]?.toString() + result = current } - while (offset < size) { - when(input.sourceType) { + fun readChunk(start: Long, end: Long): ByteArray { + val length = (end - start).toInt() + return when(input.sourceType) { "file", "path" -> { - file!!.seek(offset) - file!!.read(buffer) + RandomAccessFile(input.path, "r").use { chunkFile -> + val chunk = ByteArray(length) + chunkFile.seek(start) + chunkFile.readFully(chunk) + chunk + } } "bytes" -> { - val end = if (offset + CHUNK_SIZE < size) { - offset + CHUNK_SIZE - 1 - } else { - size - 1 - } - (input.data as ByteArray).copyInto( - buffer, - startIndex = offset.toInt(), - endIndex = end.toInt() - ) + (input.data as ByteArray).copyOfRange(start.toInt(), end.toInt()) } else -> throw UnsupportedOperationException() } + } - params[paramName] = MultipartBody.Part.createFormData( + val totalChunks = (size + CHUNK_SIZE - 1) / CHUNK_SIZE + + fun isUploadComplete(chunkResult: Map<*, *>): Boolean { + val chunksUploaded = chunkResult["chunksUploaded"]?.toString()?.toLongOrNull() ?: return false + val chunksTotal = chunkResult["chunksTotal"]?.toString()?.toLongOrNull() ?: totalChunks + return chunksUploaded >= chunksTotal + } + + suspend fun uploadChunk(index: Int, start: Long, end: Long, includeUploadId: Boolean): Map<*, *> { + val chunkParams = params.toMutableMap() + val chunkHeaders = headers.toMutableMap() + + if (includeUploadId && uploadId != null) { + chunkHeaders["x-appwrite-id"] = uploadId!! + } + + chunkHeaders["Content-Range"] = "bytes $start-${end - 1}/$size" + chunkParams[paramName] = MultipartBody.Part.createFormData( paramName, input.filename, - buffer.toRequestBody() + readChunk(start, end).toRequestBody() ) - headers["Content-Range"] = - "bytes $offset-${((offset + CHUNK_SIZE) - 1).coerceAtMost(size - 1)}/$size" - - result = call( + val chunkResult = call( method = "POST", path, - headers, - params, + chunkHeaders, + chunkParams, responseType = Map::class.java ) - offset += CHUNK_SIZE - headers["x-appwrite-id"] = result["\$id"].toString() + if (index == 0) { + uploadId = chunkResult["\$id"].toString() + } + + return chunkResult + } + + if (offset == 0L) { + val firstChunkEnd = CHUNK_SIZE.toLong().coerceAtMost(size) + result = uploadChunk(0, 0, firstChunkEnd, false) + offset = firstChunkEnd onProgress?.invoke( UploadProgress( - id = result["\$id"].toString(), + id = uploadId ?: result!!["\$id"].toString(), progress = offset.coerceAtMost(size).toDouble() / size * 100, sizeUploaded = offset.coerceAtMost(size), - chunksTotal = result["chunksTotal"].toString().toInt(), - chunksUploaded = result["chunksUploaded"].toString().toInt(), + chunksTotal = result!!["chunksTotal"].toString().toInt(), + chunksUploaded = result!!["chunksUploaded"].toString().toInt(), ) ) } + val chunks = mutableListOf>() + var chunkOffset = offset + while (chunkOffset < size) { + val end = (chunkOffset + CHUNK_SIZE).coerceAtMost(size) + chunks.add(Triple((chunkOffset / CHUNK_SIZE).toInt(), chunkOffset, end)) + chunkOffset = end + } + + if (chunks.isNotEmpty()) { + val nextChunk = AtomicInteger(0) + val completedChunks = AtomicInteger((offset / CHUNK_SIZE).toInt()) + val uploadedBytes = AtomicLong(offset.coerceAtMost(size)) + val completedResultRef = AtomicReference?>(null) + val lastResultRef = AtomicReference(result) + val progressLock = Any() + + coroutineScope { + List(MAX_CONCURRENT_UPLOADS.coerceAtMost(chunks.size)) { + async { + while (true) { + val chunkIndex = nextChunk.getAndIncrement() + if (chunkIndex >= chunks.size) { + break + } + + val (index, start, end) = chunks[chunkIndex] + val chunkResult = uploadChunk(index, start, end, true) + + val chunksUploaded = completedChunks.incrementAndGet() + val sizeUploaded = uploadedBytes.addAndGet(end - start) + + lastResultRef.set(chunkResult) + if (isUploadComplete(chunkResult)) { + completedResultRef.set(chunkResult) + } + + synchronized(progressLock) { + onProgress?.invoke( + UploadProgress( + id = uploadId ?: chunkResult["\$id"].toString(), + progress = sizeUploaded.coerceAtMost(size).toDouble() / size * 100, + sizeUploaded = sizeUploaded.coerceAtMost(size), + chunksTotal = chunkResult["chunksTotal"].toString().toInt(), + chunksUploaded = chunksUploaded, + ) + ) + } + } + } + }.awaitAll() + } + result = completedResultRef.get() ?: lastResultRef.get() + } + return converter(result as Map) }