Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ repositories {
Next, add the dependency to your project's `build.gradle(.kts)` file:

```groovy
implementation("io.appwrite:sdk-for-android:24.1.1")
implementation("io.appwrite:sdk-for-android:24.2.0")
```

### Maven
Expand All @@ -49,7 +49,7 @@ Add this to your project's `pom.xml` file:
<dependency>
<groupId>io.appwrite</groupId>
<artifactId>sdk-for-android</artifactId>
<version>24.1.1</version>
<version>24.2.0</version>
</dependency>
</dependencies>
```
Expand Down
137 changes: 105 additions & 32 deletions library/src/main/java/io/appwrite/Client.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ import io.appwrite.models.UploadProgress
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.suspendCancellableCoroutine
import okhttp3.*
import okhttp3.Headers.Companion.toHeaders
Expand All @@ -30,6 +33,8 @@ import java.net.CookieManager
import java.net.CookiePolicy
import java.security.SecureRandom
import java.security.cert.X509Certificate
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLong
import javax.net.ssl.SSLContext
import javax.net.ssl.SSLSocketFactory
import javax.net.ssl.TrustManager
Expand All @@ -49,6 +54,7 @@ class Client @JvmOverloads constructor(
* The size for chunked uploads in bytes.
*/
internal const val CHUNK_SIZE = 5*1024*1024; // 5MB
internal const val MAX_CONCURRENT_UPLOADS = 8
internal const val GLOBAL_PREFS = "io.appwrite"
internal const val COOKIE_PREFS = "myCookie"
}
Expand Down Expand Up @@ -87,7 +93,7 @@ class Client @JvmOverloads constructor(
"x-sdk-name" to "Android",
"x-sdk-platform" to "client",
"x-sdk-language" to "android",
"x-sdk-version" to "24.1.1",
"x-sdk-version" to "24.2.0",
"x-appwrite-response-format" to "1.9.5"
)
config = mutableMapOf()
Expand Down Expand Up @@ -484,12 +490,10 @@ class Client @JvmOverloads constructor(
idParamName: String? = null,
onProgress: ((UploadProgress) -> Unit)? = null,
): T {
var file: RandomAccessFile? = null
val input = params[paramName] as InputFile
val size: Long = when(input.sourceType) {
"path", "file" -> {
file = RandomAccessFile(input.path, "r")
file.length()
File(input.path).length()
}
"bytes" -> {
(input.data as ByteArray).size.toLong()
Expand Down Expand Up @@ -518,9 +522,9 @@ class Client @JvmOverloads constructor(
)
}

val buffer = ByteArray(CHUNK_SIZE)
var offset = 0L
var result: Map<*, *>? = null
var uploadId: String? = null

if (idParamName?.isNotEmpty() == true) {
// Make a request to check if a file already exists
Expand All @@ -533,59 +537,128 @@ class Client @JvmOverloads constructor(
)
val chunksUploaded = current["chunksUploaded"] as Long
offset = chunksUploaded * CHUNK_SIZE
uploadId = params[idParamName]?.toString()
result = current
}

while (offset < size) {
when(input.sourceType) {
fun readChunk(start: Long, end: Long): ByteArray {
val length = (end - start).toInt()
return when(input.sourceType) {
"file", "path" -> {
file!!.seek(offset)
file!!.read(buffer)
RandomAccessFile(input.path, "r").use { chunkFile ->
val chunk = ByteArray(length)
chunkFile.seek(start)
chunkFile.readFully(chunk)
chunk
}
}
"bytes" -> {
val end = if (offset + CHUNK_SIZE < size) {
offset + CHUNK_SIZE - 1
} else {
size - 1
}
(input.data as ByteArray).copyInto(
buffer,
startIndex = offset.toInt(),
endIndex = end.toInt()
)
(input.data as ByteArray).copyOfRange(start.toInt(), end.toInt())
}
else -> throw UnsupportedOperationException()
}
}

params[paramName] = MultipartBody.Part.createFormData(
val totalChunks = (size + CHUNK_SIZE - 1) / CHUNK_SIZE

fun isUploadComplete(chunkResult: Map<*, *>): Boolean {
val chunksUploaded = chunkResult["chunksUploaded"]?.toString()?.toLongOrNull() ?: return false
val chunksTotal = chunkResult["chunksTotal"]?.toString()?.toLongOrNull() ?: totalChunks
return chunksUploaded >= chunksTotal
}

suspend fun uploadChunk(index: Int, start: Long, end: Long, includeUploadId: Boolean): Map<*, *> {
val chunkParams = params.toMutableMap()
val chunkHeaders = headers.toMutableMap()

if (includeUploadId && uploadId != null) {
chunkHeaders["x-appwrite-id"] = uploadId!!
}

chunkHeaders["Content-Range"] = "bytes $start-${end - 1}/$size"
chunkParams[paramName] = MultipartBody.Part.createFormData(
paramName,
input.filename,
buffer.toRequestBody()
readChunk(start, end).toRequestBody()
)

headers["Content-Range"] =
"bytes $offset-${((offset + CHUNK_SIZE) - 1).coerceAtMost(size - 1)}/$size"

result = call(
val chunkResult = call(
method = "POST",
path,
headers,
params,
chunkHeaders,
chunkParams,
responseType = Map::class.java
)

offset += CHUNK_SIZE
headers["x-appwrite-id"] = result["\$id"].toString()
if (index == 0 || uploadId == null) {
uploadId = chunkResult["\$id"].toString()
}
Comment on lines +573 to +595

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 uploadId is not @Volatile but is read across coroutine threads

uploadId is written in the sequential section (chunk 0 upload or from params[idParamName]) and then read by every concurrent uploadChunk invocation at the if (includeUploadId && uploadId != null) check. Without @Volatile, the JVM does not guarantee that a write performed on one thread is visible to coroutines dispatched to other threads, even if the write happened-before the coroutineScope { } call in program order. If a concurrent coroutine reads a stale null, it silently omits the x-appwrite-id header; the server would then treat the chunk as a new file creation rather than an additional chunk, silently corrupting or duplicating the upload. Declaring uploadId as @Volatile costs nothing and eliminates the risk.


return chunkResult
}

if (offset == 0L) {
val firstChunkEnd = CHUNK_SIZE.toLong().coerceAtMost(size)
result = uploadChunk(0, 0, firstChunkEnd, false)
offset = firstChunkEnd
onProgress?.invoke(
UploadProgress(
id = result["\$id"].toString(),
id = uploadId ?: result!!["\$id"].toString(),
progress = offset.coerceAtMost(size).toDouble() / size * 100,
sizeUploaded = offset.coerceAtMost(size),
chunksTotal = result["chunksTotal"].toString().toInt(),
chunksUploaded = result["chunksUploaded"].toString().toInt(),
chunksTotal = result!!["chunksTotal"].toString().toInt(),
chunksUploaded = result!!["chunksUploaded"].toString().toInt(),
)
)
}

val chunks = mutableListOf<Triple<Int, Long, Long>>()
var chunkOffset = offset
while (chunkOffset < size) {
val end = (chunkOffset + CHUNK_SIZE).coerceAtMost(size)
chunks.add(Triple((chunkOffset / CHUNK_SIZE).toInt(), chunkOffset, end))
chunkOffset = end
}

if (chunks.isNotEmpty()) {
val nextChunk = AtomicInteger(0)
val completedChunks = AtomicInteger((offset / CHUNK_SIZE).toInt())
val uploadedBytes = AtomicLong(offset.coerceAtMost(size))

coroutineScope {
List(MAX_CONCURRENT_UPLOADS.coerceAtMost(chunks.size)) {
async {
while (true) {
val chunkIndex = nextChunk.getAndIncrement()
if (chunkIndex >= chunks.size) {
break
}

val (index, start, end) = chunks[chunkIndex]
val chunkResult = uploadChunk(index, start, end, true)

val chunksUploaded = completedChunks.incrementAndGet()
val sizeUploaded = uploadedBytes.addAndGet(end - start)

if (isUploadComplete(chunkResult)) {
result = chunkResult
}
Comment on lines +643 to +645

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Non-volatile result written from concurrent async blocks

result is a plain var captured by up to 8 concurrent async coroutines, each of which may independently evaluate isUploadComplete and write to it without synchronization. On a multi-threaded dispatcher (e.g. Dispatchers.IO), the JVM does not guarantee that the write from one coroutine is ever visible to another without a @Volatile annotation or an AtomicReference. In the worst case, result may still hold the first-chunk response (or the pre-upload current state in a resume path) after awaitAll() if none of the concurrent writes are flushed. The caller on line 662 would then convert stale metadata as if the upload were complete. Mark result as @Volatile or use AtomicReference<Map<*,*>?> to ensure the final write is visible.


onProgress?.invoke(
UploadProgress(
id = uploadId ?: chunkResult["\$id"].toString(),
progress = sizeUploaded.coerceAtMost(size).toDouble() / size * 100,
sizeUploaded = sizeUploaded.coerceAtMost(size),
chunksTotal = chunkResult["chunksTotal"].toString().toInt(),
chunksUploaded = chunksUploaded,
)
)
Comment on lines +647 to +655

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 onProgress invoked concurrently from multiple coroutines

Up to 8 async blocks can call onProgress?.invoke(...) simultaneously on different threads. If the callback updates shared state (e.g. a UI progress bar, a counter, or a LiveData) without its own synchronization, those updates will race. Additionally, because chunk completion ordering is non-deterministic, a callback for a later-indexed chunk may fire before one for an earlier-indexed chunk, so callers that assume monotonically ordered delivery will observe unexpected behaviour. Documenting that this callback may be invoked concurrently from background threads, or funnelling all invocations through a single serialised dispatcher, would make the contract explicit.

}
}
}.awaitAll()
}
}

return converter(result as Map<String, Any>)
}

Expand Down
Loading