Skip to content

Commit 63e1c23

Browse files
committed
fix: cleanup and edge cases
1 parent 878dc6a commit 63e1c23

22 files changed

Lines changed: 399 additions & 118 deletions

package/expo-package/android/src/newarch/com/streamchatexpo/StreamMultipartUploaderModule.kt

Lines changed: 65 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,14 @@ import com.facebook.react.bridge.Arguments
44
import com.facebook.react.bridge.Promise
55
import com.facebook.react.bridge.ReactApplicationContext
66
import com.facebook.react.bridge.ReadableArray
7+
import com.facebook.react.bridge.ReadableMap
8+
import com.facebook.react.bridge.UiThreadUtil
79
import com.facebook.react.modules.core.DeviceEventManagerModule
810
import com.streamchatreactnative.shared.upload.StreamMultipartUploadRequestParser
911
import com.streamchatreactnative.shared.upload.StreamMultipartUploader
10-
import java.util.concurrent.Executors
12+
import java.util.concurrent.LinkedBlockingQueue
13+
import java.util.concurrent.ThreadPoolExecutor
14+
import java.util.concurrent.TimeUnit
1115

1216
class StreamMultipartUploaderModule(
1317
reactContext: ReactApplicationContext,
@@ -29,65 +33,90 @@ class StreamMultipartUploaderModule(
2933
method: String,
3034
headers: ReadableArray,
3135
parts: ReadableArray,
32-
progress: com.facebook.react.bridge.ReadableMap?,
36+
progress: ReadableMap?,
37+
timeoutMs: Double?,
3338
promise: Promise,
3439
) {
35-
executor.execute {
40+
val request =
3641
try {
37-
val request = StreamMultipartUploadRequestParser.parse(
42+
StreamMultipartUploadRequestParser.parse(
3843
uploadId = uploadId,
3944
url = url,
4045
method = method,
4146
headers = headers,
4247
parts = parts,
4348
progress = progress,
49+
timeoutMs = timeoutMs,
4450
)
45-
val response =
46-
StreamMultipartUploader.upload(reactApplicationContext, request) { loaded, total ->
47-
emitProgress(uploadId, loaded, total)
48-
}
51+
} catch (error: Throwable) {
52+
promise.reject("stream_multipart_upload_error", error.message, error)
53+
return
54+
}
4955

50-
val payload = Arguments.createMap().apply {
51-
putString("body", response.body)
52-
putArray("headers", Arguments.createArray().apply {
53-
response.headers.forEach { (name, value) ->
54-
pushMap(
55-
Arguments.createMap().apply {
56-
putString("name", name)
57-
putString("value", value)
58-
},
59-
)
56+
try {
57+
executor.execute {
58+
try {
59+
val response =
60+
StreamMultipartUploader.upload(reactApplicationContext, request) { loaded, total ->
61+
emitProgress(uploadId, loaded, total)
6062
}
61-
})
62-
putDouble("status", response.status.toDouble())
63-
putString("statusText", response.statusText)
63+
64+
val payload = Arguments.createMap().apply {
65+
putString("body", response.body)
66+
putArray("headers", Arguments.createArray().apply {
67+
response.headers.forEach { (name, value) ->
68+
pushMap(
69+
Arguments.createMap().apply {
70+
putString("name", name)
71+
putString("value", value)
72+
},
73+
)
74+
}
75+
})
76+
putDouble("status", response.status.toDouble())
77+
putString("statusText", response.statusText)
78+
}
79+
promise.resolve(payload)
80+
} catch (error: Throwable) {
81+
promise.reject("stream_multipart_upload_error", error.message, error)
6482
}
65-
promise.resolve(payload)
66-
} catch (error: Throwable) {
67-
promise.reject("stream_multipart_upload_error", error.message, error)
6883
}
84+
} catch (error: Throwable) {
85+
promise.reject("stream_multipart_upload_error", error.message, error)
6986
}
7087
}
7188

7289
private fun emitProgress(uploadId: String, loaded: Long, total: Long?) {
73-
val payload = Arguments.createMap().apply {
74-
putDouble("loaded", loaded.toDouble())
75-
if (total != null) {
76-
putDouble("total", total.toDouble())
77-
} else {
78-
putNull("total")
90+
UiThreadUtil.runOnUiThread {
91+
val payload = Arguments.createMap().apply {
92+
putDouble("loaded", loaded.toDouble())
93+
if (total != null) {
94+
putDouble("total", total.toDouble())
95+
} else {
96+
putNull("total")
97+
}
98+
putString("uploadId", uploadId)
7999
}
80-
putString("uploadId", uploadId)
81-
}
82100

83-
reactApplicationContext
84-
.getJSModule(DeviceEventManagerModule.RCTDeviceEventEmitter::class.java)
85-
.emit(PROGRESS_EVENT_NAME, payload)
101+
reactApplicationContext
102+
.getJSModule(DeviceEventManagerModule.RCTDeviceEventEmitter::class.java)
103+
.emit(PROGRESS_EVENT_NAME, payload)
104+
}
86105
}
87106

88107
companion object {
89108
const val NAME = "StreamMultipartUploader"
90109
private const val PROGRESS_EVENT_NAME = "streamMultipartUploadProgress"
91-
private val executor = Executors.newCachedThreadPool()
110+
private val maxConcurrentUploads = Runtime.getRuntime().availableProcessors().coerceIn(2, 4)
111+
private val executor =
112+
ThreadPoolExecutor(
113+
maxConcurrentUploads,
114+
maxConcurrentUploads,
115+
30L,
116+
TimeUnit.SECONDS,
117+
LinkedBlockingQueue(64),
118+
).apply {
119+
allowCoreThreadTimeOut(true)
120+
}
92121
}
93122
}

package/expo-package/src/native/NativeStreamMultipartUploader.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ export interface Spec extends TurboModule {
4545
headers: ReadonlyArray<UploadHeader>,
4646
parts: ReadonlyArray<UploadPart>,
4747
progress?: UploadProgressConfig | null,
48+
timeoutMs?: number | null,
4849
): Promise<UploadResponse>;
4950
}
5051

package/expo-package/src/native/multipartUploader.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ type MultipartUploadRequest = {
4343
parts: UploadPart[];
4444
progress?: UploadProgressConfig;
4545
signal?: NativeMultipartAbortSignal;
46+
timeoutMs?: number;
4647
uploadId: string;
4748
url: string;
4849
};
@@ -58,6 +59,7 @@ export const uploadMultipart = async ({
5859
parts,
5960
progress,
6061
signal,
62+
timeoutMs,
6163
uploadId,
6264
url,
6365
}: MultipartUploadRequest): Promise<MultipartUploadResponse> => {
@@ -118,6 +120,7 @@ export const uploadMultipart = async ({
118120
toUploadHeaders(headers),
119121
parts,
120122
progress ?? {},
123+
timeoutMs,
121124
);
122125

123126
if (signal?.aborted) {

package/expo-package/src/optionalDependencies/multipartUpload.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ export const multipartUpload = async ({
4141
parts,
4242
progress,
4343
signal,
44+
timeoutMs,
4445
url,
4546
}: NativeMultipartUploadRequest) => {
4647
const resolvedParts = await Promise.all(parts.map(resolvePartUri));
@@ -52,6 +53,7 @@ export const multipartUpload = async ({
5253
parts: resolvedParts,
5354
progress,
5455
signal,
56+
timeoutMs,
5557
uploadId: `stream-upload-${Date.now()}-${Math.random().toString(16).slice(2)}`,
5658
url,
5759
});

package/native-package/android/src/newarch/com/streamchatreactnative/StreamMultipartUploaderModule.kt

Lines changed: 65 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,14 @@ import com.facebook.react.bridge.Arguments
44
import com.facebook.react.bridge.Promise
55
import com.facebook.react.bridge.ReactApplicationContext
66
import com.facebook.react.bridge.ReadableArray
7+
import com.facebook.react.bridge.ReadableMap
8+
import com.facebook.react.bridge.UiThreadUtil
79
import com.facebook.react.modules.core.DeviceEventManagerModule
810
import com.streamchatreactnative.shared.upload.StreamMultipartUploadRequestParser
911
import com.streamchatreactnative.shared.upload.StreamMultipartUploader
10-
import java.util.concurrent.Executors
12+
import java.util.concurrent.LinkedBlockingQueue
13+
import java.util.concurrent.ThreadPoolExecutor
14+
import java.util.concurrent.TimeUnit
1115

1216
class StreamMultipartUploaderModule(
1317
reactContext: ReactApplicationContext,
@@ -29,65 +33,90 @@ class StreamMultipartUploaderModule(
2933
method: String,
3034
headers: ReadableArray,
3135
parts: ReadableArray,
32-
progress: com.facebook.react.bridge.ReadableMap?,
36+
progress: ReadableMap?,
37+
timeoutMs: Double?,
3338
promise: Promise,
3439
) {
35-
executor.execute {
40+
val request =
3641
try {
37-
val request = StreamMultipartUploadRequestParser.parse(
42+
StreamMultipartUploadRequestParser.parse(
3843
uploadId = uploadId,
3944
url = url,
4045
method = method,
4146
headers = headers,
4247
parts = parts,
4348
progress = progress,
49+
timeoutMs = timeoutMs,
4450
)
45-
val response =
46-
StreamMultipartUploader.upload(reactApplicationContext, request) { loaded, total ->
47-
emitProgress(uploadId, loaded, total)
48-
}
51+
} catch (error: Throwable) {
52+
promise.reject("stream_multipart_upload_error", error.message, error)
53+
return
54+
}
4955

50-
val payload = Arguments.createMap().apply {
51-
putString("body", response.body)
52-
putArray("headers", Arguments.createArray().apply {
53-
response.headers.forEach { (name, value) ->
54-
pushMap(
55-
Arguments.createMap().apply {
56-
putString("name", name)
57-
putString("value", value)
58-
},
59-
)
56+
try {
57+
executor.execute {
58+
try {
59+
val response =
60+
StreamMultipartUploader.upload(reactApplicationContext, request) { loaded, total ->
61+
emitProgress(uploadId, loaded, total)
6062
}
61-
})
62-
putDouble("status", response.status.toDouble())
63-
putString("statusText", response.statusText)
63+
64+
val payload = Arguments.createMap().apply {
65+
putString("body", response.body)
66+
putArray("headers", Arguments.createArray().apply {
67+
response.headers.forEach { (name, value) ->
68+
pushMap(
69+
Arguments.createMap().apply {
70+
putString("name", name)
71+
putString("value", value)
72+
},
73+
)
74+
}
75+
})
76+
putDouble("status", response.status.toDouble())
77+
putString("statusText", response.statusText)
78+
}
79+
promise.resolve(payload)
80+
} catch (error: Throwable) {
81+
promise.reject("stream_multipart_upload_error", error.message, error)
6482
}
65-
promise.resolve(payload)
66-
} catch (error: Throwable) {
67-
promise.reject("stream_multipart_upload_error", error.message, error)
6883
}
84+
} catch (error: Throwable) {
85+
promise.reject("stream_multipart_upload_error", error.message, error)
6986
}
7087
}
7188

7289
private fun emitProgress(uploadId: String, loaded: Long, total: Long?) {
73-
val payload = Arguments.createMap().apply {
74-
putDouble("loaded", loaded.toDouble())
75-
if (total != null) {
76-
putDouble("total", total.toDouble())
77-
} else {
78-
putNull("total")
90+
UiThreadUtil.runOnUiThread {
91+
val payload = Arguments.createMap().apply {
92+
putDouble("loaded", loaded.toDouble())
93+
if (total != null) {
94+
putDouble("total", total.toDouble())
95+
} else {
96+
putNull("total")
97+
}
98+
putString("uploadId", uploadId)
7999
}
80-
putString("uploadId", uploadId)
81-
}
82100

83-
reactApplicationContext
84-
.getJSModule(DeviceEventManagerModule.RCTDeviceEventEmitter::class.java)
85-
.emit(PROGRESS_EVENT_NAME, payload)
101+
reactApplicationContext
102+
.getJSModule(DeviceEventManagerModule.RCTDeviceEventEmitter::class.java)
103+
.emit(PROGRESS_EVENT_NAME, payload)
104+
}
86105
}
87106

88107
companion object {
89108
const val NAME = "StreamMultipartUploader"
90109
private const val PROGRESS_EVENT_NAME = "streamMultipartUploadProgress"
91-
private val executor = Executors.newCachedThreadPool()
110+
private val maxConcurrentUploads = Runtime.getRuntime().availableProcessors().coerceIn(2, 4)
111+
private val executor =
112+
ThreadPoolExecutor(
113+
maxConcurrentUploads,
114+
maxConcurrentUploads,
115+
30L,
116+
TimeUnit.SECONDS,
117+
LinkedBlockingQueue(64),
118+
).apply {
119+
allowCoreThreadTimeOut(true)
120+
}
92121
}
93122
}

package/native-package/src/native/NativeStreamMultipartUploader.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ export interface Spec extends TurboModule {
4545
headers: ReadonlyArray<UploadHeader>,
4646
parts: ReadonlyArray<UploadPart>,
4747
progress?: UploadProgressConfig | null,
48+
timeoutMs?: number | null,
4849
): Promise<UploadResponse>;
4950
}
5051

package/native-package/src/native/multipartUploader.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ type MultipartUploadRequest = {
4343
parts: UploadPart[];
4444
progress?: UploadProgressConfig;
4545
signal?: NativeMultipartAbortSignal;
46+
timeoutMs?: number;
4647
uploadId: string;
4748
url: string;
4849
};
@@ -58,6 +59,7 @@ export const uploadMultipart = async ({
5859
parts,
5960
progress,
6061
signal,
62+
timeoutMs,
6163
uploadId,
6264
url,
6365
}: MultipartUploadRequest): Promise<MultipartUploadResponse> => {
@@ -118,6 +120,7 @@ export const uploadMultipart = async ({
118120
toUploadHeaders(headers),
119121
parts,
120122
progress ?? {},
123+
timeoutMs,
121124
);
122125

123126
if (signal?.aborted) {

package/native-package/src/optionalDependencies/multipartUpload.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ export const multipartUpload = async ({
4141
parts,
4242
progress,
4343
signal,
44+
timeoutMs,
4445
url,
4546
}: NativeMultipartUploadRequest) => {
4647
const resolvedParts = await Promise.all(parts.map(resolvePartUri));
@@ -52,6 +53,7 @@ export const multipartUpload = async ({
5253
parts: resolvedParts,
5354
progress,
5455
signal,
56+
timeoutMs,
5557
uploadId: `stream-upload-${Date.now()}-${Math.random().toString(16).slice(2)}`,
5658
url,
5759
});

package/shared-native/android/upload/StreamMultipartUploadModels.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ data class StreamMultipartUploadRequest(
55
val method: String,
66
val parts: List<StreamMultipartUploadPart>,
77
val progress: StreamMultipartUploadProgressOptions?,
8+
val timeoutMs: Long?,
89
val uploadId: String,
910
val url: String,
1011
)

0 commit comments

Comments
 (0)