Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 113 additions & 32 deletions library/src/main/java/io/appwrite/Client.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ import io.appwrite.models.UploadProgress
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.suspendCancellableCoroutine
import okhttp3.*
import okhttp3.Headers.Companion.toHeaders
Expand All @@ -30,6 +33,9 @@ import java.net.CookieManager
import java.net.CookiePolicy
import java.security.SecureRandom
import java.security.cert.X509Certificate
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.AtomicReference
import javax.net.ssl.SSLContext
import javax.net.ssl.SSLSocketFactory
import javax.net.ssl.TrustManager
Expand All @@ -49,6 +55,7 @@ class Client @JvmOverloads constructor(
* The size for chunked uploads in bytes.
*/
internal const val CHUNK_SIZE = 5*1024*1024; // 5MB
internal const val MAX_CONCURRENT_UPLOADS = 8
internal const val GLOBAL_PREFS = "io.appwrite"
internal const val COOKIE_PREFS = "myCookie"
}
Expand Down Expand Up @@ -87,7 +94,7 @@ class Client @JvmOverloads constructor(
"x-sdk-name" to "Android",
"x-sdk-platform" to "client",
"x-sdk-language" to "android",
"x-sdk-version" to "24.1.1",
"x-sdk-version" to "24.2.0",
"x-appwrite-response-format" to "1.9.5"
)
config = mutableMapOf()
Expand Down Expand Up @@ -484,12 +491,10 @@ class Client @JvmOverloads constructor(
idParamName: String? = null,
onProgress: ((UploadProgress) -> Unit)? = null,
): T {
var file: RandomAccessFile? = null
val input = params[paramName] as InputFile
val size: Long = when(input.sourceType) {
"path", "file" -> {
file = RandomAccessFile(input.path, "r")
file.length()
File(input.path).length()
}
"bytes" -> {
(input.data as ByteArray).size.toLong()
Expand Down Expand Up @@ -518,9 +523,9 @@ class Client @JvmOverloads constructor(
)
}

val buffer = ByteArray(CHUNK_SIZE)
var offset = 0L
var result: Map<*, *>? = null
var uploadId: String? = null

if (idParamName?.isNotEmpty() == true) {
// Make a request to check if a file already exists
Expand All @@ -533,59 +538,135 @@ class Client @JvmOverloads constructor(
)
val chunksUploaded = current["chunksUploaded"] as Long
offset = chunksUploaded * CHUNK_SIZE
uploadId = params[idParamName]?.toString()
result = current
}

while (offset < size) {
when(input.sourceType) {
fun readChunk(start: Long, end: Long): ByteArray {
val length = (end - start).toInt()
return when(input.sourceType) {
"file", "path" -> {
file!!.seek(offset)
file!!.read(buffer)
RandomAccessFile(input.path, "r").use { chunkFile ->
val chunk = ByteArray(length)
chunkFile.seek(start)
chunkFile.readFully(chunk)
chunk
}
}
Comment thread
TorstenDittmann marked this conversation as resolved.
"bytes" -> {
val end = if (offset + CHUNK_SIZE < size) {
offset + CHUNK_SIZE - 1
} else {
size - 1
}
(input.data as ByteArray).copyInto(
buffer,
startIndex = offset.toInt(),
endIndex = end.toInt()
)
(input.data as ByteArray).copyOfRange(start.toInt(), end.toInt())
}
else -> throw UnsupportedOperationException()
}
}

params[paramName] = MultipartBody.Part.createFormData(
val totalChunks = (size + CHUNK_SIZE - 1) / CHUNK_SIZE

fun isUploadComplete(chunkResult: Map<*, *>): Boolean {
val chunksUploaded = chunkResult["chunksUploaded"]?.toString()?.toLongOrNull() ?: return false
val chunksTotal = chunkResult["chunksTotal"]?.toString()?.toLongOrNull() ?: totalChunks
return chunksUploaded >= chunksTotal
}

suspend fun uploadChunk(index: Int, start: Long, end: Long, includeUploadId: Boolean): Map<*, *> {
val chunkParams = params.toMutableMap()
val chunkHeaders = headers.toMutableMap()

if (includeUploadId && uploadId != null) {
chunkHeaders["x-appwrite-id"] = uploadId!!
}

chunkHeaders["Content-Range"] = "bytes $start-${end - 1}/$size"
chunkParams[paramName] = MultipartBody.Part.createFormData(
paramName,
input.filename,
buffer.toRequestBody()
readChunk(start, end).toRequestBody()
)

headers["Content-Range"] =
"bytes $offset-${((offset + CHUNK_SIZE) - 1).coerceAtMost(size - 1)}/$size"

result = call(
val chunkResult = call(
method = "POST",
path,
headers,
params,
chunkHeaders,
chunkParams,
responseType = Map::class.java
)

offset += CHUNK_SIZE
headers["x-appwrite-id"] = result["\$id"].toString()
if (index == 0) {
uploadId = chunkResult["\$id"].toString()
}
Comment thread
greptile-apps[bot] marked this conversation as resolved.

return chunkResult
}

if (offset == 0L) {
val firstChunkEnd = CHUNK_SIZE.toLong().coerceAtMost(size)
result = uploadChunk(0, 0, firstChunkEnd, false)
offset = firstChunkEnd
onProgress?.invoke(
UploadProgress(
id = result["\$id"].toString(),
id = uploadId ?: result!!["\$id"].toString(),
progress = offset.coerceAtMost(size).toDouble() / size * 100,
sizeUploaded = offset.coerceAtMost(size),
chunksTotal = result["chunksTotal"].toString().toInt(),
chunksUploaded = result["chunksUploaded"].toString().toInt(),
chunksTotal = result!!["chunksTotal"].toString().toInt(),
chunksUploaded = result!!["chunksUploaded"].toString().toInt(),
)
)
}

val chunks = mutableListOf<Triple<Int, Long, Long>>()
var chunkOffset = offset
while (chunkOffset < size) {
val end = (chunkOffset + CHUNK_SIZE).coerceAtMost(size)
chunks.add(Triple((chunkOffset / CHUNK_SIZE).toInt(), chunkOffset, end))
chunkOffset = end
}

if (chunks.isNotEmpty()) {
val nextChunk = AtomicInteger(0)
val completedChunks = AtomicInteger((offset / CHUNK_SIZE).toInt())
val uploadedBytes = AtomicLong(offset.coerceAtMost(size))
val completedResultRef = AtomicReference<Map<*, *>?>(null)
val lastResultRef = AtomicReference(result)
val progressLock = Any()

coroutineScope {
List(MAX_CONCURRENT_UPLOADS.coerceAtMost(chunks.size)) {
async {
while (true) {
val chunkIndex = nextChunk.getAndIncrement()
if (chunkIndex >= chunks.size) {
break
}

val (index, start, end) = chunks[chunkIndex]
val chunkResult = uploadChunk(index, start, end, true)

val chunksUploaded = completedChunks.incrementAndGet()
val sizeUploaded = uploadedBytes.addAndGet(end - start)

lastResultRef.set(chunkResult)
if (isUploadComplete(chunkResult)) {
completedResultRef.set(chunkResult)
}
Comment thread
greptile-apps[bot] marked this conversation as resolved.

synchronized(progressLock) {
onProgress?.invoke(
UploadProgress(
id = uploadId ?: chunkResult["\$id"].toString(),
progress = sizeUploaded.coerceAtMost(size).toDouble() / size * 100,
sizeUploaded = sizeUploaded.coerceAtMost(size),
chunksTotal = chunkResult["chunksTotal"].toString().toInt(),
chunksUploaded = chunksUploaded,
)
)
}
}
}
}.awaitAll()
}
result = completedResultRef.get() ?: lastResultRef.get()
}

return converter(result as Map<String, Any>)
}

Expand Down