-
Notifications
You must be signed in to change notification settings - Fork 30
feat: support concurrent chunk uploads #125
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -13,6 +13,9 @@ import io.appwrite.models.UploadProgress | |
| import kotlinx.coroutines.CoroutineScope | ||
| import kotlinx.coroutines.Dispatchers | ||
| import kotlinx.coroutines.Job | ||
| import kotlinx.coroutines.async | ||
| import kotlinx.coroutines.awaitAll | ||
| import kotlinx.coroutines.coroutineScope | ||
| import kotlinx.coroutines.suspendCancellableCoroutine | ||
| import okhttp3.* | ||
| import okhttp3.Headers.Companion.toHeaders | ||
|
|
@@ -30,6 +33,8 @@ import java.net.CookieManager | |
| import java.net.CookiePolicy | ||
| import java.security.SecureRandom | ||
| import java.security.cert.X509Certificate | ||
| import java.util.concurrent.atomic.AtomicInteger | ||
| import java.util.concurrent.atomic.AtomicLong | ||
| import javax.net.ssl.SSLContext | ||
| import javax.net.ssl.SSLSocketFactory | ||
| import javax.net.ssl.TrustManager | ||
|
|
@@ -49,6 +54,7 @@ class Client @JvmOverloads constructor( | |
| * The size for chunked uploads in bytes. | ||
| */ | ||
| internal const val CHUNK_SIZE = 5*1024*1024; // 5MB | ||
| internal const val MAX_CONCURRENT_UPLOADS = 8 | ||
| internal const val GLOBAL_PREFS = "io.appwrite" | ||
| internal const val COOKIE_PREFS = "myCookie" | ||
| } | ||
|
|
@@ -87,7 +93,7 @@ class Client @JvmOverloads constructor( | |
| "x-sdk-name" to "Android", | ||
| "x-sdk-platform" to "client", | ||
| "x-sdk-language" to "android", | ||
| "x-sdk-version" to "24.1.1", | ||
| "x-sdk-version" to "24.2.0", | ||
| "x-appwrite-response-format" to "1.9.5" | ||
| ) | ||
| config = mutableMapOf() | ||
|
|
@@ -484,12 +490,10 @@ class Client @JvmOverloads constructor( | |
| idParamName: String? = null, | ||
| onProgress: ((UploadProgress) -> Unit)? = null, | ||
| ): T { | ||
| var file: RandomAccessFile? = null | ||
| val input = params[paramName] as InputFile | ||
| val size: Long = when(input.sourceType) { | ||
| "path", "file" -> { | ||
| file = RandomAccessFile(input.path, "r") | ||
| file.length() | ||
| File(input.path).length() | ||
| } | ||
| "bytes" -> { | ||
| (input.data as ByteArray).size.toLong() | ||
|
|
@@ -518,9 +522,9 @@ class Client @JvmOverloads constructor( | |
| ) | ||
| } | ||
|
|
||
| val buffer = ByteArray(CHUNK_SIZE) | ||
| var offset = 0L | ||
| var result: Map<*, *>? = null | ||
| var uploadId: String? = null | ||
|
|
||
| if (idParamName?.isNotEmpty() == true) { | ||
| // Make a request to check if a file already exists | ||
|
|
@@ -533,59 +537,128 @@ class Client @JvmOverloads constructor( | |
| ) | ||
| val chunksUploaded = current["chunksUploaded"] as Long | ||
| offset = chunksUploaded * CHUNK_SIZE | ||
| uploadId = params[idParamName]?.toString() | ||
| result = current | ||
| } | ||
|
|
||
| while (offset < size) { | ||
| when(input.sourceType) { | ||
| fun readChunk(start: Long, end: Long): ByteArray { | ||
| val length = (end - start).toInt() | ||
| return when(input.sourceType) { | ||
| "file", "path" -> { | ||
| file!!.seek(offset) | ||
| file!!.read(buffer) | ||
| RandomAccessFile(input.path, "r").use { chunkFile -> | ||
| val chunk = ByteArray(length) | ||
| chunkFile.seek(start) | ||
| chunkFile.readFully(chunk) | ||
| chunk | ||
| } | ||
| } | ||
| "bytes" -> { | ||
| val end = if (offset + CHUNK_SIZE < size) { | ||
| offset + CHUNK_SIZE - 1 | ||
| } else { | ||
| size - 1 | ||
| } | ||
| (input.data as ByteArray).copyInto( | ||
| buffer, | ||
| startIndex = offset.toInt(), | ||
| endIndex = end.toInt() | ||
| ) | ||
| (input.data as ByteArray).copyOfRange(start.toInt(), end.toInt()) | ||
| } | ||
| else -> throw UnsupportedOperationException() | ||
| } | ||
| } | ||
|
|
||
| params[paramName] = MultipartBody.Part.createFormData( | ||
| val totalChunks = (size + CHUNK_SIZE - 1) / CHUNK_SIZE | ||
|
|
||
| fun isUploadComplete(chunkResult: Map<*, *>): Boolean { | ||
| val chunksUploaded = chunkResult["chunksUploaded"]?.toString()?.toLongOrNull() ?: return false | ||
| val chunksTotal = chunkResult["chunksTotal"]?.toString()?.toLongOrNull() ?: totalChunks | ||
| return chunksUploaded >= chunksTotal | ||
| } | ||
|
|
||
| suspend fun uploadChunk(index: Int, start: Long, end: Long, includeUploadId: Boolean): Map<*, *> { | ||
| val chunkParams = params.toMutableMap() | ||
| val chunkHeaders = headers.toMutableMap() | ||
|
|
||
| if (includeUploadId && uploadId != null) { | ||
| chunkHeaders["x-appwrite-id"] = uploadId!! | ||
| } | ||
|
|
||
| chunkHeaders["Content-Range"] = "bytes $start-${end - 1}/$size" | ||
| chunkParams[paramName] = MultipartBody.Part.createFormData( | ||
| paramName, | ||
| input.filename, | ||
| buffer.toRequestBody() | ||
| readChunk(start, end).toRequestBody() | ||
| ) | ||
|
|
||
| headers["Content-Range"] = | ||
| "bytes $offset-${((offset + CHUNK_SIZE) - 1).coerceAtMost(size - 1)}/$size" | ||
|
|
||
| result = call( | ||
| val chunkResult = call( | ||
| method = "POST", | ||
| path, | ||
| headers, | ||
| params, | ||
| chunkHeaders, | ||
| chunkParams, | ||
| responseType = Map::class.java | ||
| ) | ||
|
|
||
| offset += CHUNK_SIZE | ||
| headers["x-appwrite-id"] = result["\$id"].toString() | ||
| if (index == 0 || uploadId == null) { | ||
| uploadId = chunkResult["\$id"].toString() | ||
| } | ||
|
|
||
| return chunkResult | ||
| } | ||
|
|
||
| if (offset == 0L) { | ||
| val firstChunkEnd = CHUNK_SIZE.toLong().coerceAtMost(size) | ||
| result = uploadChunk(0, 0, firstChunkEnd, false) | ||
| offset = firstChunkEnd | ||
| onProgress?.invoke( | ||
| UploadProgress( | ||
| id = result["\$id"].toString(), | ||
| id = uploadId ?: result!!["\$id"].toString(), | ||
| progress = offset.coerceAtMost(size).toDouble() / size * 100, | ||
| sizeUploaded = offset.coerceAtMost(size), | ||
| chunksTotal = result["chunksTotal"].toString().toInt(), | ||
| chunksUploaded = result["chunksUploaded"].toString().toInt(), | ||
| chunksTotal = result!!["chunksTotal"].toString().toInt(), | ||
| chunksUploaded = result!!["chunksUploaded"].toString().toInt(), | ||
| ) | ||
| ) | ||
| } | ||
|
|
||
| val chunks = mutableListOf<Triple<Int, Long, Long>>() | ||
| var chunkOffset = offset | ||
| while (chunkOffset < size) { | ||
| val end = (chunkOffset + CHUNK_SIZE).coerceAtMost(size) | ||
| chunks.add(Triple((chunkOffset / CHUNK_SIZE).toInt(), chunkOffset, end)) | ||
| chunkOffset = end | ||
| } | ||
|
|
||
| if (chunks.isNotEmpty()) { | ||
| val nextChunk = AtomicInteger(0) | ||
| val completedChunks = AtomicInteger((offset / CHUNK_SIZE).toInt()) | ||
| val uploadedBytes = AtomicLong(offset.coerceAtMost(size)) | ||
|
|
||
| coroutineScope { | ||
| List(MAX_CONCURRENT_UPLOADS.coerceAtMost(chunks.size)) { | ||
| async { | ||
| while (true) { | ||
| val chunkIndex = nextChunk.getAndIncrement() | ||
| if (chunkIndex >= chunks.size) { | ||
| break | ||
| } | ||
|
|
||
| val (index, start, end) = chunks[chunkIndex] | ||
| val chunkResult = uploadChunk(index, start, end, true) | ||
|
|
||
| val chunksUploaded = completedChunks.incrementAndGet() | ||
| val sizeUploaded = uploadedBytes.addAndGet(end - start) | ||
|
|
||
| if (isUploadComplete(chunkResult)) { | ||
| result = chunkResult | ||
| } | ||
|
Comment on lines
+643
to
+645
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
|
||
| onProgress?.invoke( | ||
| UploadProgress( | ||
| id = uploadId ?: chunkResult["\$id"].toString(), | ||
| progress = sizeUploaded.coerceAtMost(size).toDouble() / size * 100, | ||
| sizeUploaded = sizeUploaded.coerceAtMost(size), | ||
| chunksTotal = chunkResult["chunksTotal"].toString().toInt(), | ||
| chunksUploaded = chunksUploaded, | ||
| ) | ||
| ) | ||
|
Comment on lines
+647
to
+655
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Up to 8 async blocks can call |
||
| } | ||
| } | ||
| }.awaitAll() | ||
| } | ||
| } | ||
|
|
||
| return converter(result as Map<String, Any>) | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
uploadIdis not@Volatilebut is read across coroutine threadsuploadIdis written in the sequential section (chunk 0 upload or fromparams[idParamName]) and then read by every concurrentuploadChunkinvocation at theif (includeUploadId && uploadId != null)check. Without@Volatile, the JVM does not guarantee that a write performed on one thread is visible to coroutines dispatched to other threads, even if the write happened-before thecoroutineScope { }call in program order. If a concurrent coroutine reads a stale null, it silently omits thex-appwrite-idheader; the server would then treat the chunk as a new file creation rather than an additional chunk, silently corrupting or duplicating the upload. DeclaringuploadIdas@Volatilecosts nothing and eliminates the risk.