@@ -7,110 +7,159 @@ import io.ktor.client.statement.*
77import io.ktor.http.isSuccess
88import io.ktor.utils.io.*
99import kotlinx.coroutines.Dispatchers
10+ import kotlinx.coroutines.channels.Channel
11+ import kotlinx.coroutines.channels.awaitClose
12+ import kotlinx.coroutines.coroutineScope
13+ import kotlinx.coroutines.delay
1014import kotlinx.coroutines.flow.Flow
11- import kotlinx.coroutines.flow.channelFlow
15+ import kotlinx.coroutines.flow.buffer
16+ import kotlinx.coroutines.flow.callbackFlow
17+ import kotlinx.coroutines.flow.flowOn
1218import kotlinx.coroutines.isActive
19+ import kotlinx.coroutines.launch
1320import kotlinx.coroutines.withContext
1421import zed.rainxch.core.domain.model.DownloadProgress
1522import zed.rainxch.core.domain.network.Downloader
1623import java.io.File
1724import java.io.FileOutputStream
25+ import java.nio.ByteBuffer
1826import java.util.UUID
27+ import java.util.concurrent.atomic.AtomicLong
28+ import kotlin.coroutines.cancellation.CancellationException
1929
2030class DesktopDownloader (
2131 private val http : HttpClient ,
2232 private val files : FileLocationsProvider ,
2333) : Downloader {
2434
25- override fun download (url : String , suggestedFileName : String? ): Flow <DownloadProgress > = channelFlow {
26- withContext(Dispatchers .IO ) {
27- val dir = File (files.userDownloadsDir())
28- if (! dir.exists()) dir.mkdirs()
29-
30- val safeName = (suggestedFileName?.takeIf { it.isNotBlank() }
31- ? : url.substringAfterLast(' /' )
32- .ifBlank { " asset-${UUID .randomUUID()} " })
33- val outFile = File (dir, safeName)
34-
35- if (outFile.exists()) {
36- Logger .d { " Deleting existing file before download: ${outFile.absolutePath} " }
37- outFile.delete()
38- }
35+ override fun download (url : String , suggestedFileName : String? ): Flow <DownloadProgress > =
36+ callbackFlow {
37+ coroutineScope {
38+ val dir = File (files.userDownloadsDir())
39+ if (! dir.exists()) dir.mkdirs()
3940
40- Logger .d { " Downloading: $url to ${outFile.absolutePath} " }
41+ val safeName = (suggestedFileName?.takeIf { it.isNotBlank() }
42+ ? : url.substringAfterLast(' /' )
43+ .ifBlank { " asset-${UUID .randomUUID()} " })
44+ val outFile = File (dir, safeName)
4145
42- val response : HttpResponse = http.get(url)
43- if ( ! response.status.isSuccess()) {
44- throw IllegalStateException ( " Download failed: HTTP ${response.status.value} " )
45- }
46+ if (outFile.exists()) {
47+ Logger .d { " Deleting existing file before download: ${outFile.absolutePath} " }
48+ outFile.delete( )
49+ }
4650
47- val total = response.headers[" Content-Length" ]?.toLongOrNull()
48- val channel = response.bodyAsChannel()
51+ Logger .d { " Downloading: $url to ${outFile.absolutePath} " }
4952
50- try {
51- FileOutputStream (outFile).use { fos ->
52- val buffer = ByteArray (DEFAULT_BUFFER_SIZE )
53- var downloaded = 0L
53+ val response: HttpResponse = http.get(url)
54+ if (! response.status.isSuccess()) {
55+ close(IllegalStateException (" Download failed: HTTP ${response.status.value} " ))
56+ return @coroutineScope
57+ }
5458
55- while (isActive) {
56- val read = channel.readAvailable(buffer, 0 , buffer.size)
57- if (read == - 1 ) break
58- fos.write(buffer, 0 , read)
59- downloaded + = read
59+ val total = response.headers[" Content-Length" ]?.toLongOrNull()
60+ val channel = response.bodyAsChannel()
61+
62+ val downloaded = AtomicLong (0L )
63+
64+ trySend(DownloadProgress (0L , total, if (total != null && total > 0 ) 0 else null ))
65+
66+ val downloadJob = launch(Dispatchers .IO ) {
67+ try {
68+ FileOutputStream (outFile).use { fos ->
69+ val fc = fos.channel
70+
71+ while (isActive) {
72+ val buffer = ByteArray (DEFAULT_BUFFER_SIZE )
73+ val bytesRead = channel.readAvailable(buffer, 0 , buffer.size)
74+ if (bytesRead == - 1 ) break
75+
76+ if (bytesRead > 0 ) {
77+ val byteBuffer = ByteBuffer .wrap(buffer, 0 , bytesRead)
78+ fc.write(byteBuffer)
79+ downloaded.addAndGet(bytesRead.toLong())
80+ }
81+ }
82+ }
83+ Logger .d { " File write complete: ${outFile.absolutePath} " }
84+ } catch (e: CancellationException ) {
85+ if (outFile.exists()) {
86+ outFile.delete()
87+ Logger .d { " Deleted partial file after cancellation: ${outFile.absolutePath} " }
88+ }
89+ throw e
90+ } catch (e: Exception ) {
91+ if (outFile.exists()) {
92+ outFile.delete()
93+ }
94+ throw e
95+ }
96+ }
6097
98+ val progressJob = launch {
99+ while (isActive && downloadJob.isActive) {
100+ val current = downloaded.get()
61101 val percent = if (total != null && total > 0 ) {
62- ((downloaded * 100L ) / total).toInt()
102+ ((current * 100L ) / total).toInt()
63103 } else null
64-
65- trySend( DownloadProgress (downloaded, total, percent) )
104+ trySend( DownloadProgress (current, total, percent))
105+ delay( 50L )
66106 }
67- fos.flush()
68107 }
69108
70- Logger .d { " Download complete: ${outFile.absolutePath} " }
71-
72- trySend(DownloadProgress (total ? : outFile.length(), total, 100 ))
73- } catch (e: CancellationException ) {
74- if (outFile.exists()) {
75- outFile.delete()
76- Logger .d { " Deleted partial file after cancellation: ${outFile.absolutePath} " }
109+ try {
110+ downloadJob.join()
111+ progressJob.cancel()
112+
113+ val finalDownloaded = total ? : outFile.length()
114+ trySend(DownloadProgress (finalDownloaded, total, 100 ))
115+ Logger .d { " Download complete: ${outFile.absolutePath} " }
116+
117+ close()
118+ } catch (e: CancellationException ) {
119+ downloadJob.cancel()
120+ progressJob.cancel()
121+ close(e)
122+ } catch (e: Exception ) {
123+ downloadJob.cancel()
124+ progressJob.cancel()
125+ close(e)
77126 }
78- throw e
79- } finally {
80- close()
81127 }
82- }
83- }
84128
85- override suspend fun saveToFile (url : String , suggestedFileName : String? ): String = withContext(Dispatchers .IO ) {
86- val dir = File (files.userDownloadsDir())
87- val safeName = (suggestedFileName?.takeIf { it.isNotBlank() }
88- ? : url.substringAfterLast(' /' )
89- .ifBlank { " asset-${UUID .randomUUID()} " })
129+ awaitClose { }
130+ }.flowOn(Dispatchers .Default ).buffer(Channel .CONFLATED )
131+
132+ override suspend fun saveToFile (url : String , suggestedFileName : String? ): String =
133+ withContext(Dispatchers .IO ) {
134+ val dir = File (files.userDownloadsDir())
135+ val safeName = (suggestedFileName?.takeIf { it.isNotBlank() }
136+ ? : url.substringAfterLast(' /' )
137+ .ifBlank { " asset-${UUID .randomUUID()} " })
90138
91- val outFile = File (dir, safeName)
139+ val outFile = File (dir, safeName)
92140
93- if (outFile.exists()) {
94- Logger .d { " Deleting existing file before download: ${outFile.absolutePath} " }
95- outFile.delete()
96- }
141+ if (outFile.exists()) {
142+ Logger .d { " Deleting existing file before download: ${outFile.absolutePath} " }
143+ outFile.delete()
144+ }
97145
98- Logger .d { " saveToFile downloading file..." }
99- download(url, suggestedFileName).collect { }
146+ Logger .d { " saveToFile downloading file..." }
147+ download(url, suggestedFileName).collect { }
100148
101- outFile.absolutePath
102- }
149+ outFile.absolutePath
150+ }
103151
104- override suspend fun getDownloadedFilePath (fileName : String ): String? = withContext(Dispatchers .IO ) {
105- val dir = File (files.userDownloadsDir())
106- val file = File (dir, fileName)
152+ override suspend fun getDownloadedFilePath (fileName : String ): String? =
153+ withContext(Dispatchers .IO ) {
154+ val dir = File (files.userDownloadsDir())
155+ val file = File (dir, fileName)
107156
108- if (file.exists() && file.length() > 0 ) {
109- file.absolutePath
110- } else {
111- null
157+ if (file.exists() && file.length() > 0 ) {
158+ file.absolutePath
159+ } else {
160+ null
161+ }
112162 }
113- }
114163
115164 override suspend fun cancelDownload (fileName : String ): Boolean = withContext(Dispatchers .IO ) {
116165 val dir = File (files.userDownloadsDir())
0 commit comments