diff --git a/Sources/Storage/DownloadEngine.swift b/Sources/Storage/DownloadEngine.swift new file mode 100644 index 00000000..40057c07 --- /dev/null +++ b/Sources/Storage/DownloadEngine.swift @@ -0,0 +1,267 @@ +// +// DownloadEngine.swift +// Storage +// +// Created by Guilherme Souza on 04/05/26. +// + +import Foundation + +#if canImport(FoundationNetworking) + import FoundationNetworking +#endif + +actor DownloadEngine { + enum State { + case idle + /// A download task has been enqueued but the `URLSessionDownloadTask` has not been created yet + /// (e.g. waiting on the auth-token request to complete). Setting this state before scheduling + /// the async work prevents duplicate starts from concurrent ``start()`` or ``resume()`` calls. + case starting + case downloading(URLSessionDownloadTask) + /// The task was intentionally paused via ``pause()``. + /// `resumeData` is nil when the server does not support range requests. + case paused(resumeData: Data?) + case completed(URL) + case failed(StorageError) + case cancelled + + var isTerminal: Bool { + switch self { + case .completed, .failed, .cancelled: return true + default: return false + } + } + } + + private let session: URLSession + private let delegate: DownloadSessionDelegate + private let requestBuilder: @Sendable () async throws -> URLRequest + private let eventsContinuation: AsyncStream>.Continuation + private let resultContinuation: AsyncStream>.Continuation + + private var state: State = .idle + + init( + session: URLSession, + delegate: DownloadSessionDelegate, + requestBuilder: @Sendable @escaping () async throws -> URLRequest, + eventsContinuation: AsyncStream>.Continuation, + resultContinuation: AsyncStream>.Continuation + ) { + self.session = session + self.delegate = delegate + self.requestBuilder = requestBuilder + self.eventsContinuation = eventsContinuation + self.resultContinuation = resultContinuation + } + + func start() async { + guard case .idle = state else { return } + state = .starting + await startDownload(resumeData: nil) + } + + /// Pauses an active download. + /// + /// Calls `URLSessionDownloadTask.cancel(byProducingResumeData:)` to capture any available + /// resume data. If the server sent `Accept-Ranges: bytes` and an `ETag` or `Last-Modified` + /// header, the data will be non-nil and ``resume()`` will restart the download from where + /// it stopped. If resume data is unavailable the download restarts from byte 0 on ``resume()``. + func pause() async { + guard case .downloading(let task) = state else { return } + state = .paused(resumeData: nil) + // Await the resume-data callback so we capture whatever the server provides before + // returning. The actor is suspended during this await; other calls (e.g. cancel()) can + // run in the meantime — the `if case .paused` guard below handles that safely. + let resumeData = await withCheckedContinuation { continuation in + task.cancel(byProducingResumeData: { data in continuation.resume(returning: data) }) + } + if case .paused = state { + state = .paused(resumeData: resumeData) + } + } + + /// Resumes a paused download. + /// + /// If resume data was captured during ``pause()``, the session continues from the last + /// received byte. Otherwise the entire download is restarted from the beginning. + func resume() async { + guard case .paused(let resumeData) = state else { return } + state = .starting + await startDownload(resumeData: resumeData) + } + + /// Cancels the download immediately. + func cancel() { + guard !state.isTerminal else { return } + if case .downloading(let task) = state { + task.cancel() + } + state = .cancelled + let error = StorageError.cancelled + eventsContinuation.yield(.failed(error)) + eventsContinuation.finish() + resultContinuation.yield(.failure(error)) + resultContinuation.finish() + } + + // MARK: - Private + + private func startDownload(resumeData: Data?) async { + guard case .starting = state else { return } + + do { + let urlTask: URLSessionDownloadTask + if let resumeData { + urlTask = session.downloadTask(withResumeData: resumeData) + } else { + let request = try await requestBuilder() + // Re-check: the task may have been cancelled while we awaited the request builder. + guard case .starting = state else { return } + urlTask = session.downloadTask(with: request) + } + + // Capture the task identifier so that stale callbacks from a previously + // cancelled task (e.g. the one cancelled by pause()) are ignored if they + // arrive after a new task has been started by resume(). + let taskIdentifier = urlTask.taskIdentifier + delegate.register( + taskIdentifier: taskIdentifier, + callbacks: DownloadTaskCallbacks( + onProgress: { [weak self] totalBytesWritten, totalBytesExpectedToWrite in + Task { [weak self] in + await self?.didWriteData( + totalBytesWritten: totalBytesWritten, + totalBytesExpectedToWrite: totalBytesExpectedToWrite, + forTask: taskIdentifier + ) + } + }, + onFinished: { [weak self] result in + // The delegate has already moved the file synchronously. + // Notify the engine of the result from a Task so we don't block the delegate queue. + Task { [weak self] in await self?.didFinish(result, forTask: taskIdentifier) } + }, + onCompleteWithError: { [weak self] error in + Task { [weak self] in await self?.didCompleteWithError(error, forTask: taskIdentifier) } + } + ) + ) + + state = .downloading(urlTask) + urlTask.resume() + } catch { + guard !state.isTerminal else { return } + finish(with: .failure(StorageError.from(error))) + } + } + + private func didWriteData( + totalBytesWritten: Int64, + totalBytesExpectedToWrite: Int64, + forTask taskIdentifier: Int + ) { + guard case .downloading(let current) = state, + current.taskIdentifier == taskIdentifier + else { return } + eventsContinuation.yield( + .progress( + TransferProgress( + bytesTransferred: totalBytesWritten, + totalBytes: totalBytesExpectedToWrite + ))) + } + + /// Called after the delegate has moved the downloaded file to a stable temp location. + /// + /// The file move is performed synchronously by `DownloadSessionDelegate` inside + /// `urlSession(_:downloadTask:didFinishDownloadingTo:)` — before URLSession reclaims the + /// original temp file. This method just forwards the result to `finish(with:)`. + private func didFinish(_ result: Result, forTask taskIdentifier: Int) { + guard case .downloading(let current) = state, + current.taskIdentifier == taskIdentifier + else { return } + finish(with: result.mapError { $0 as any Error }) + } + + private func didCompleteWithError(_ error: (any Error)?, forTask taskIdentifier: Int) { + guard let error else { return } // nil means success; handled by didFinishDownloading + + // Only act when the error is for the current active download task. + // Stale URLError.cancelled from a task cancelled by pause() must not affect + // a new task that resume() started — even if that new task is already .downloading. + guard case .downloading(let current) = state, + current.taskIdentifier == taskIdentifier + else { return } + + if (error as? URLError)?.code == .cancelled { + // External cancellation (e.g. system pressure, explicit cancel()) while actively + // downloading. cancel() already sets state to .cancelled before this runs, so + // reaching here means a third-party or system cancellation — shut down cleanly. + cancel() + } else { + finish(with: .failure(StorageError.networkError(underlying: error))) + } + } + + private func finish(with result: Result) { + switch result { + case .success(let url): + state = .completed(url) + eventsContinuation.yield(.completed(url)) + case .failure(let error): + let storageError = error as? StorageError ?? StorageError.networkError(underlying: error) + state = .failed(storageError) + eventsContinuation.yield(.failed(storageError)) + } + eventsContinuation.finish() + resultContinuation.yield(result.mapError { $0 }) + resultContinuation.finish() + } +} + +// MARK: - Factory + +extension DownloadEngine { + static func makeTask( + session: URLSession, + delegate: DownloadSessionDelegate, + requestBuilder: @Sendable @escaping () async throws -> URLRequest + ) -> StorageDownloadTask { + let (eventStream, eventsContinuation) = + AsyncStream>.makeStream() + let (resultStream, resultContinuation) = + AsyncStream>.makeStream(bufferingPolicy: .bufferingNewest(1)) + + let engine = DownloadEngine( + session: session, + delegate: delegate, + requestBuilder: requestBuilder, + eventsContinuation: eventsContinuation, + resultContinuation: resultContinuation + ) + + eventsContinuation.onTermination = { reason in + guard case .cancelled = reason else { return } + Task { await engine.cancel() } + } + + let resultTask = Task { + for await r in resultStream { return try r.get() } + throw StorageError.cancelled + } + + let task = StorageDownloadTask( + events: eventStream, + resultTask: resultTask, + pause: { await engine.pause() }, + resume: { await engine.resume() }, + cancel: { await engine.cancel() } + ) + + Task { await engine.start() } + + return task + } +} diff --git a/Sources/Storage/DownloadSessionDelegate.swift b/Sources/Storage/DownloadSessionDelegate.swift index e1a4f07a..1b81a66c 100644 --- a/Sources/Storage/DownloadSessionDelegate.swift +++ b/Sources/Storage/DownloadSessionDelegate.swift @@ -12,89 +12,56 @@ import Foundation import FoundationNetworking #endif -final class DownloadSessionDelegate: NSObject, URLSessionDownloadDelegate, Sendable { +/// Callbacks registered for a single `URLSessionDownloadTask`. +/// +/// The delegate calls `onFinished` on successful completion and `onCompleteWithError` on +/// failure or explicit cancellation. Both are one-shot: the delegate unregisters the callbacks +/// as soon as either fires. +struct DownloadTaskCallbacks: Sendable { + /// Periodic bytes-received update. `totalBytesWritten` is the running total; + /// `totalBytesExpectedToWrite` is the `Content-Length` (-1 if unknown). + let onProgress: @Sendable (_ totalBytesWritten: Int64, _ totalBytesExpectedToWrite: Int64) -> Void + /// The download finished. The delegate moves the temporary file to a stable location + /// synchronously (before URLSession reclaims it) and passes the result here. + let onFinished: @Sendable (Result) -> Void + /// Called on failure or cancellation. `nil` is never passed (success is delivered via + /// `onFinished`); the parameter is nullable only to match the delegate signature. + let onCompleteWithError: @Sendable (_ error: (any Error)?) -> Void +} - struct DownloadTaskState { - let eventsContinuation: AsyncStream>.Continuation - let resultContinuation: AsyncStream>.Continuation - } +final class DownloadSessionDelegate: NSObject, URLSessionDownloadDelegate, Sendable { struct MutableState { - var tasks: [Int: DownloadTaskState] = [:] + var callbacks: [Int: DownloadTaskCallbacks] = [:] var backgroundCompletionHandler: (@Sendable () -> Void)? } private let state = LockIsolated(MutableState()) + // MARK: - Registration + + func register(taskIdentifier: Int, callbacks: DownloadTaskCallbacks) { + state.withValue { $0.callbacks[taskIdentifier] = callbacks } + } + // MARK: - Task creation - /// Creates a `StorageDownloadTask` backed by this delegate. + /// Creates a `StorageDownloadTask` backed by a `DownloadEngine`. /// /// `buildRequest` is called asynchronously before the underlying /// `URLSessionDownloadTask` is created, so callers can fetch an auth token - /// (via `_HTTPClient.createRequest`) without blocking. + /// without blocking. func makeStorageDownloadTask( in session: URLSession, buildRequest: @escaping @Sendable () async throws -> URLRequest ) -> StorageDownloadTask { - let (eventStream, eventsContinuation) = AsyncStream>.makeStream() - let (resultStream, resultContinuation) = AsyncStream>.makeStream( - bufferingPolicy: .bufferingNewest(1)) - - let urlTaskRef = LockIsolated(nil) - - let resultTask = Task { - for await r in resultStream { return try r.get() } - throw StorageError.cancelled - } - - // Bootstrap task: fetch the token, build the request, then start the download. - let bootstrapTask = Task { - do { - let request = try await buildRequest() - let urlTask = session.downloadTask(with: request) - - state.withValue { - $0.tasks[urlTask.taskIdentifier] = DownloadTaskState( - eventsContinuation: eventsContinuation, - resultContinuation: resultContinuation - ) - } - - urlTaskRef.setValue(urlTask) - urlTask.resume() - } catch { - let storageError = StorageError.from(error) - eventsContinuation.yield(.failed(storageError)) - eventsContinuation.finish() - resultContinuation.yield(.failure(storageError)) - resultContinuation.finish() - } - } - - eventsContinuation.onTermination = { [urlTaskRef] reason in - guard case .cancelled = reason else { return } - urlTaskRef.value?.cancel() - bootstrapTask.cancel() - } - - return StorageDownloadTask( - events: eventStream, - resultTask: resultTask, - pause: { urlTaskRef.value?.suspend() }, - resume: { urlTaskRef.value?.resume() }, - cancel: { - bootstrapTask.cancel() - urlTaskRef.value?.cancel() - // Finish continuations in case the bootstrap was cancelled before the - // URLSessionDownloadTask existed — otherwise observers hang forever. - eventsContinuation.finish() - resultContinuation.finish() - } - ) + DownloadEngine.makeTask(session: session, delegate: self, requestBuilder: buildRequest) } /// Package-level access for tests to drive delegate callbacks directly. + /// + /// Returns the stream and underlying `URLSessionDownloadTask` so tests can call + /// delegate methods directly without going through a real network connection. package func makeDownloadTask( in session: URLSession, request: URLRequest @@ -104,17 +71,40 @@ final class DownloadSessionDelegate: NSObject, URLSessionDownloadDelegate, Senda task: URLSessionDownloadTask ) { let (eventStream, eventsContinuation) = AsyncStream>.makeStream() - let (resultStream, resultContinuation) = AsyncStream>.makeStream( - bufferingPolicy: .bufferingNewest(1)) let urlTask = session.downloadTask(with: request) - state.withValue { - $0.tasks[urlTask.taskIdentifier] = DownloadTaskState( - eventsContinuation: eventsContinuation, - resultContinuation: resultContinuation + + register( + taskIdentifier: urlTask.taskIdentifier, + callbacks: DownloadTaskCallbacks( + onProgress: { totalBytesWritten, totalBytesExpectedToWrite in + eventsContinuation.yield( + .progress( + TransferProgress( + bytesTransferred: totalBytesWritten, + totalBytes: totalBytesExpectedToWrite + ))) + }, + onFinished: { result in + switch result { + case .success(let url): + eventsContinuation.yield(.completed(url)) + case .failure(let error): + eventsContinuation.yield(.failed(error)) + } + eventsContinuation.finish() + }, + onCompleteWithError: { error in + guard let error else { return } + let storageError: StorageError = + (error as? URLError)?.code == .cancelled + ? .cancelled + : .networkError(underlying: error) + eventsContinuation.yield(.failed(storageError)) + eventsContinuation.finish() + } ) - } - _ = resultStream // satisfy unused warning — test uses stream directly via delegate callbacks - _ = resultContinuation + ) + return (eventStream, eventsContinuation, urlTask) } @@ -132,14 +122,8 @@ final class DownloadSessionDelegate: NSObject, URLSessionDownloadDelegate, Senda totalBytesExpectedToWrite: Int64 ) { state.withValue { - guard let taskState = $0.tasks[downloadTask.taskIdentifier] else { return } - taskState.eventsContinuation.yield( - .progress( - TransferProgress( - bytesTransferred: totalBytesWritten, - totalBytes: totalBytesExpectedToWrite - ))) - } + $0.callbacks[downloadTask.taskIdentifier] + }?.onProgress(totalBytesWritten, totalBytesExpectedToWrite) } func urlSession( @@ -147,27 +131,43 @@ final class DownloadSessionDelegate: NSObject, URLSessionDownloadDelegate, Senda downloadTask: URLSessionDownloadTask, didFinishDownloadingTo location: URL ) { - state.withValue { - guard let taskState = $0.tasks[downloadTask.taskIdentifier] else { return } - - let destination = FileManager.default.temporaryDirectory - .appendingPathComponent(UUID().uuidString) - - do { - try FileManager.default.moveItem(at: location, to: destination) - taskState.eventsContinuation.yield(.completed(destination)) - taskState.eventsContinuation.finish() - taskState.resultContinuation.yield(.success(destination)) - taskState.resultContinuation.finish() - } catch { - let storageError = StorageError.fileSystemError(underlying: error) - taskState.eventsContinuation.yield(.failed(storageError)) - taskState.eventsContinuation.finish() - taskState.resultContinuation.yield(.failure(storageError)) - taskState.resultContinuation.finish() + // URLSession calls this method even for 4xx/5xx responses — the error body is + // what was "downloaded". Check the HTTP status code and deliver a typed error + // instead of a file URL when the server signalled failure. + if let httpResponse = downloadTask.response as? HTTPURLResponse, + httpResponse.statusCode >= 400 + { + let data = (try? Data(contentsOf: location)) ?? Data() + let error = StorageError.from(httpResponse: httpResponse, data: data) + let callbacks = state.withValue { + let cb = $0.callbacks[downloadTask.taskIdentifier] + $0.callbacks.removeValue(forKey: downloadTask.taskIdentifier) + return cb } - $0.tasks.removeValue(forKey: downloadTask.taskIdentifier) + callbacks?.onFinished(.failure(error)) + return } + + // Success path: URLSession deletes `location` as soon as this method returns, + // so the file must be moved synchronously — before invoking the callback. + let destination = FileManager.default.temporaryDirectory + .appendingPathComponent(UUID().uuidString) + let moveResult: Result + do { + try FileManager.default.moveItem(at: location, to: destination) + moveResult = .success(destination) + } catch { + moveResult = .failure(StorageError.fileSystemError(underlying: error)) + } + + // Unregister before calling the callback — the engine marks the task terminal on + // receipt, so subsequent delegate calls should be no-ops. + let callbacks = state.withValue { + let cb = $0.callbacks[downloadTask.taskIdentifier] + $0.callbacks.removeValue(forKey: downloadTask.taskIdentifier) + return cb + } + callbacks?.onFinished(moveResult) } func urlSession( @@ -176,23 +176,14 @@ final class DownloadSessionDelegate: NSObject, URLSessionDownloadDelegate, Senda didCompleteWithError error: (any Error)? ) { guard let error else { return } - - state.withValue { - guard let taskState = $0.tasks[task.taskIdentifier] else { return } - - let storageError: StorageError - if (error as? URLError)?.code == .cancelled { - storageError = .cancelled - } else { - storageError = .networkError(underlying: error) - } - - taskState.eventsContinuation.yield(.failed(storageError)) - taskState.eventsContinuation.finish() - taskState.resultContinuation.yield(.failure(storageError)) - taskState.resultContinuation.finish() - $0.tasks.removeValue(forKey: task.taskIdentifier) + // Only unregister on error — success is handled (and unregistered) by + // urlSession(_:downloadTask:didFinishDownloadingTo:) which fires first. + let callbacks = state.withValue { + let cb = $0.callbacks[task.taskIdentifier] + $0.callbacks.removeValue(forKey: task.taskIdentifier) + return cb } + callbacks?.onCompleteWithError(error) } func urlSessionDidFinishEvents(forBackgroundURLSession session: URLSession) { diff --git a/Sources/Storage/StorageError.swift b/Sources/Storage/StorageError.swift index d2695f4d..e372e1e8 100644 --- a/Sources/Storage/StorageError.swift +++ b/Sources/Storage/StorageError.swift @@ -205,6 +205,29 @@ extension StorageError { errorCode: .cancelled ) + /// Parses a Storage server error response from raw HTTP response data. + /// + /// Tries to decode the Supabase Storage JSON error envelope + /// `{ "message": "…", "error": "ErrorCode", "statusCode": "404" }`. + /// Falls back to the raw body string and `StorageErrorCode.unknown` when decoding fails. + static func from(httpResponse: HTTPURLResponse, data: Data) -> StorageError { + struct Body: Decodable { + let message: String? + let error: String? + /// The server sends the status code as a JSON string, e.g. `"404"`. + let statusCode: String? + } + let decoded = try? JSONDecoder().decode(Body.self, from: data) + return StorageError( + message: decoded?.message ?? decoded?.error ?? String(data: data, encoding: .utf8) + ?? "Unknown error", + errorCode: decoded?.error.map(StorageErrorCode.init(_:)) ?? .unknown, + statusCode: decoded?.statusCode.flatMap(Int.init) ?? httpResponse.statusCode, + underlyingResponse: httpResponse, + underlyingData: data + ) + } + /// Converts any `Error` to a `StorageError`. /// /// - Returns `self` when `error` is already a `StorageError`. diff --git a/Sources/Storage/StorageFileAPI.swift b/Sources/Storage/StorageFileAPI.swift index 600f232c..d094beb6 100644 --- a/Sources/Storage/StorageFileAPI.swift +++ b/Sources/Storage/StorageFileAPI.swift @@ -600,6 +600,11 @@ public struct StorageFileAPI: Sendable { /// permanent location before the app exits — the file is not guaranteed to persist across /// launches. /// + /// Downloads support pause and resume. Calling ``StorageDownloadTask/pause()`` suspends the + /// transfer and captures resume data when the server returns `Accept-Ranges: bytes`. Calling + /// ``StorageDownloadTask/resume()`` continues from the last received byte when resume data is + /// available, or restarts from the beginning otherwise. + /// /// When ``StorageClientConfiguration/backgroundDownloadSessionIdentifier`` is set, downloads /// continue while the app is suspended. Wire up /// ``StorageClient/handleBackgroundEvents(forSessionIdentifier:completionHandler:)`` in your @@ -615,11 +620,16 @@ public struct StorageFileAPI: Sendable { /// ## Example /// /// ```swift - /// let url = try await storage.from("avatars").download(path: "user-123/photo.png").value + /// let task = storage.from("videos").download(path: "clip.mp4") + /// + /// // Pause mid-download and resume later + /// await task.pause() + /// await task.resume() /// - /// // Move to a permanent location before the app exits + /// // Await the final file URL + /// let url = try await task.value /// let dest = FileManager.default.urls(for: .documentDirectory, in: .userDomainMask)[0] - /// .appendingPathComponent("photo.png") + /// .appendingPathComponent("clip.mp4") /// try FileManager.default.moveItem(at: url, to: dest) /// ``` @discardableResult diff --git a/Sources/Storage/StorageTransferTask.swift b/Sources/Storage/StorageTransferTask.swift index cb1147d6..a59487bc 100644 --- a/Sources/Storage/StorageTransferTask.swift +++ b/Sources/Storage/StorageTransferTask.swift @@ -38,7 +38,7 @@ import Foundation /// } /// ``` /// -/// **Pause, resume, and cancel (TUS uploads only)** +/// **Pause, resume, and cancel** /// ```swift /// // TUS (resumable) upload — use method: .resumable to force TUS /// let upload = storage.from("videos").upload("clip.mp4", fileURL: fileURL, method: .resumable) @@ -93,15 +93,18 @@ public final class StorageTransferTask: Sendable { /// Suspends the transfer. /// - /// Only supported for TUS (resumable) uploads. For multipart uploads this is a no-op — - /// use ``cancel()`` and re-upload from scratch if you need to stop a multipart transfer. /// For TUS uploads the current in-flight chunk is drained before the task suspends. + /// For downloads, the session captures any available resume data so that ``resume()`` can + /// continue from the last byte received (requires `Accept-Ranges` support on the server). + /// For multipart uploads this is a no-op — use ``cancel()`` and re-upload from scratch. public func pause() async { await _pause() } /// Resumes a previously paused transfer. /// - /// Only supported for TUS (resumable) uploads. For multipart uploads this is a no-op. /// For TUS uploads the server is HEAD-queried to re-sync the byte offset before uploading resumes. + /// For downloads, if resume data was captured during ``pause()`` the download continues from + /// the last received byte; otherwise it restarts from the beginning. + /// For multipart uploads this is a no-op. public func resume() async { await _resume() } /// Cancels the transfer immediately. @@ -219,4 +222,8 @@ public typealias StorageUploadTask = StorageTransferTask /// /// The success value is a `URL` pointing to a temporary file on disk. /// Move or read the file before the app exits — it is not guaranteed to persist across launches. +/// +/// Downloads support ``StorageTransferTask/pause()``, ``StorageTransferTask/resume()``, and +/// ``StorageTransferTask/cancel()``. Pausing captures resume data when the server supports +/// `Accept-Ranges`; resuming then continues from the last received byte. public typealias StorageDownloadTask = StorageTransferTask diff --git a/Tests/IntegrationTests/StorageDownloadIntegrationTests.swift b/Tests/IntegrationTests/StorageDownloadIntegrationTests.swift new file mode 100644 index 00000000..d87428f0 --- /dev/null +++ b/Tests/IntegrationTests/StorageDownloadIntegrationTests.swift @@ -0,0 +1,450 @@ +// +// StorageDownloadIntegrationTests.swift +// +// +// Created by Guilherme Souza on 04/05/26. +// + +// Requires: supabase start && supabase db reset (from Tests/IntegrationTests/) +// Run with: make test-integration + +import Foundation +import Storage +import Testing + +#if canImport(FoundationNetworking) + import FoundationNetworking +#endif + +@Suite(.serialized) +final class StorageDownloadIntegrationTests { + + let storage = StorageClient( + url: URL(string: "\(DotEnv.SUPABASE_URL)/storage/v1")!, + configuration: StorageClientConfiguration( + headers: ["Authorization": "Bearer \(DotEnv.SUPABASE_SECRET_KEY)"], + logger: nil + ) + ) + + /// Creates a fresh private bucket, uploads a blob, then runs the test body. + /// Always cleans up the bucket regardless of success or failure. + private func withUploadedFile( + size: Int = 1 * 1024 * 1024, + byte: UInt8 = 0xAB, + _ body: (_ bucket: String, _ path: String, _ data: Data) async throws -> Void + ) async throws { + let bucketId = "dl-test-\(UUID().uuidString.lowercased())" + let path = "files/\(UUID().uuidString).bin" + let data = Data(repeating: byte, count: size) + + try await storage.createBucket(bucketId, options: BucketOptions(isPublic: false)) + + do { + _ = try await storage.from(bucketId).uploadMultipart(path, data: data).value + try await body(bucketId, path, data) + } catch { + try? await storage.emptyBucket(bucketId) + try? await storage.deleteBucket(bucketId) + throw error + } + + try? await storage.emptyBucket(bucketId) + try? await storage.deleteBucket(bucketId) + } + + // MARK: - Happy path + + /// Basic download completes and the file on disk matches what was uploaded. + @Test func downloadToDiskMatchesUploadedContent() async throws { + try await withUploadedFile { bucket, path, original in + let url = try await storage.from(bucket).download(path: path).value + defer { try? FileManager.default.removeItem(at: url) } + + #expect(FileManager.default.fileExists(atPath: url.path)) + let received = try Data(contentsOf: url) + #expect(received == original) + } + } + + /// `download()` emits at least one `.progress` event before `.completed`. + @Test func downloadEmitsProgressEvents() async throws { + try await withUploadedFile(size: 2 * 1024 * 1024) { bucket, path, _ in + let task = storage.from(bucket).download(path: path) + var progressFractions: [Double] = [] + var completedURL: URL? + + for await event in task.events { + switch event { + case .progress(let p): + progressFractions.append(p.fractionCompleted) + case .completed(let url): + completedURL = url + case .failed(let error): + Issue.record("Unexpected failure: \(error)") + } + } + + #expect(!progressFractions.isEmpty, "Expected at least one progress event") + #expect(progressFractions.allSatisfy { $0 >= 0 && $0 <= 1.0 }) + + let url = try #require(completedURL) + defer { try? FileManager.default.removeItem(at: url) } + #expect(FileManager.default.fileExists(atPath: url.path)) + } + } + + /// `downloadData()` returns the exact bytes that were uploaded. + @Test func downloadDataMatchesUploadedContent() async throws { + try await withUploadedFile(size: 256 * 1024) { bucket, path, original in + let received = try await storage.from(bucket).downloadData(path: path).value + #expect(received == original) + } + } + + /// `.value` and `.events` are independent — consuming both yields consistent results. + @Test func valueAndEventsAreIndependent() async throws { + try await withUploadedFile(size: 512 * 1024) { bucket, path, original in + let task = storage.from(bucket).download(path: path) + + // Consume events in a background task while simultaneously awaiting .value. + async let eventCount: Int = { + var count = 0 + for await _ in task.events { count += 1 } + return count + }() + + let url = try await task.value + defer { try? FileManager.default.removeItem(at: url) } + + let count = await eventCount + #expect(count > 0, "Events stream should have delivered events") + + let received = try Data(contentsOf: url) + #expect(received == original) + } + } + + // MARK: - Pause / resume + + /// Pausing immediately after starting and then resuming delivers the complete file. + @Test func pauseThenResumeCompletesDownload() async throws { + try await withUploadedFile(size: 2 * 1024 * 1024) { bucket, path, original in + let task = storage.from(bucket).download(path: path) + + // Collect the first progress event, then pause. + var paused = false + for await event in task.events { + if case .progress = event, !paused { + paused = true + await task.pause() + break + } + } + + // Resume and collect the rest. + await task.resume() + let url = try await task.value + defer { try? FileManager.default.removeItem(at: url) } + + let received = try Data(contentsOf: url) + #expect(received == original) + } + } + + /// Pausing before any data arrives (fire-pause-resume) still completes correctly. + @Test func pauseBeforeFirstProgressThenResumeCompletes() async throws { + try await withUploadedFile(size: 1 * 1024 * 1024) { bucket, path, original in + let task = storage.from(bucket).download(path: path) + + // Pause immediately — may land before the first progress event. + await task.pause() + await task.resume() + + let url = try await task.value + defer { try? FileManager.default.removeItem(at: url) } + + let received = try Data(contentsOf: url) + #expect(received == original) + } + } + + /// Multiple pause/resume cycles eventually complete the download. + @Test func multiplePauseResumeCyclesCompleteDownload() async throws { + try await withUploadedFile(size: 3 * 1024 * 1024) { bucket, path, original in + let task = storage.from(bucket).download(path: path) + + actor Counters { + var pauseCount = 0 + var progressCount = 0 + func incrementProgress() -> Int { + progressCount += 1 + return progressCount + } + func incrementPause() -> Int { + pauseCount += 1 + return pauseCount + } + func getPauseCount() -> Int { pauseCount } + } + let counters = Counters() + + // Drive the download manually: pause on every other progress event. + Task { + for await event in task.events { + if case .progress = event { + let progress = await counters.incrementProgress() + let pauses = await counters.getPauseCount() + if progress.isMultiple(of: 2) && pauses < 3 { + _ = await counters.incrementPause() + await task.pause() + // Small delay before resuming to let the pause propagate. + try? await Task.sleep(nanoseconds: 50_000_000) + await task.resume() + } + } + } + } + + let url = try await task.value + defer { try? FileManager.default.removeItem(at: url) } + + let received = try Data(contentsOf: url) + #expect(received == original) + #expect(await counters.getPauseCount() > 0, "Expected at least one pause/resume cycle") + } + } + + /// Pausing a download that is already paused is a no-op — a subsequent resume still works. + @Test func doublePauseIsIdempotent() async throws { + try await withUploadedFile(size: 1 * 1024 * 1024) { bucket, path, original in + let task = storage.from(bucket).download(path: path) + + await task.pause() + await task.pause() // second pause — should be a no-op + await task.resume() + + let url = try await task.value + defer { try? FileManager.default.removeItem(at: url) } + + let received = try Data(contentsOf: url) + #expect(received == original) + } + } + + /// Resuming a download that was never paused is a no-op — the download still completes. + @Test func resumeWithoutPauseIsNoop() async throws { + try await withUploadedFile(size: 512 * 1024) { bucket, path, original in + let task = storage.from(bucket).download(path: path) + await task.resume() // no-op: not paused + + let url = try await task.value + defer { try? FileManager.default.removeItem(at: url) } + + let received = try Data(contentsOf: url) + #expect(received == original) + } + } + + /// Resuming an already-completed download is a no-op — value is still available. + @Test func resumeAfterCompletionIsNoop() async throws { + try await withUploadedFile(size: 256 * 1024) { bucket, path, original in + let task = storage.from(bucket).download(path: path) + let url = try await task.value + defer { try? FileManager.default.removeItem(at: url) } + + await task.resume() // no-op: already completed + + let received = try Data(contentsOf: url) + #expect(received == original) + } + } + + // MARK: - Cancel + + /// Cancelling a download causes `.value` to throw with `.cancelled` error code. + @Test func cancelThrowsCancelledError() async throws { + try await withUploadedFile(size: 2 * 1024 * 1024) { bucket, path, _ in + let task = storage.from(bucket).download(path: path) + await task.cancel() + + do { + _ = try await task.value + Issue.record("Expected a StorageError to be thrown") + } catch let error as StorageError { + #expect(error.errorCode == .cancelled) + } + } + } + + /// Cancelling a download causes the `events` stream to end with a `.failed(.cancelled)` event. + @Test func cancelYieldsCancelledEventOnStream() async throws { + try await withUploadedFile(size: 2 * 1024 * 1024) { bucket, path, _ in + let task = storage.from(bucket).download(path: path) + await task.cancel() + + var lastEvent: TransferEvent? + for await event in task.events { lastEvent = event } + + guard case .failed(let error) = lastEvent else { + Issue.record("Expected .failed event, got \(String(describing: lastEvent))") + return + } + #expect(error.errorCode == .cancelled) + } + } + + /// Cancelling mid-download (after at least one progress event) still delivers `.cancelled`. + @Test func cancelMidDownloadDeliversCancelledError() async throws { + try await withUploadedFile(size: 3 * 1024 * 1024) { bucket, path, _ in + let task = storage.from(bucket).download(path: path) + + // Wait for first progress event, then cancel. + for await event in task.events { + if case .progress = event { + await task.cancel() + break + } + } + + do { + _ = try await task.value + Issue.record("Expected a StorageError to be thrown") + } catch let error as StorageError { + #expect(error.errorCode == .cancelled) + } + } + } + + /// Cancelling an already-completed download is a no-op — value is still available. + @Test func cancelAfterCompletionIsNoop() async throws { + try await withUploadedFile(size: 256 * 1024) { bucket, path, original in + let task = storage.from(bucket).download(path: path) + let url = try await task.value + defer { try? FileManager.default.removeItem(at: url) } + + await task.cancel() // no-op: already completed + + // Value should still be readable (it was already stored in the result task). + let url2 = try await task.value + defer { try? FileManager.default.removeItem(at: url2) } + + let received = try Data(contentsOf: url2) + #expect(received == original) + } + } + + /// Cancelling a paused download delivers `.cancelled`, not `.failed(.networkError)`. + @Test func cancelWhilePausedDeliversCancelledError() async throws { + try await withUploadedFile(size: 2 * 1024 * 1024) { bucket, path, _ in + let task = storage.from(bucket).download(path: path) + await task.pause() + await task.cancel() + + do { + _ = try await task.value + Issue.record("Expected a StorageError to be thrown") + } catch let error as StorageError { + #expect(error.errorCode == .cancelled) + } + } + } + + /// Calling `cancel()` multiple times is safe — the second call is a no-op. + @Test func doubleCancelIsIdempotent() async throws { + try await withUploadedFile(size: 1 * 1024 * 1024) { bucket, path, _ in + let task = storage.from(bucket).download(path: path) + await task.cancel() + await task.cancel() // second cancel — should be a no-op + + do { + _ = try await task.value + Issue.record("Expected a StorageError to be thrown") + } catch let error as StorageError { + #expect(error.errorCode == .cancelled) + } + } + } + + // MARK: - Error handling + + /// Downloading a path that does not exist results in a non-cancelled storage error. + @Test func downloadNonExistentPathFails() async throws { + try await withUploadedFile { bucket, _, _ in + let task = storage.from(bucket).download(path: "does/not/exist.bin") + + do { + _ = try await task.value + Issue.record("Expected a StorageError to be thrown") + } catch let error as StorageError { + #expect(error.errorCode != .cancelled) + } + } + } + + /// The `events` stream ends with `.failed` when the path does not exist. + @Test func downloadNonExistentPathEmitsFailedEvent() async throws { + try await withUploadedFile { bucket, _, _ in + let task = storage.from(bucket).download(path: "does/not/exist.bin") + var lastEvent: TransferEvent? + for await event in task.events { lastEvent = event } + + guard case .failed = lastEvent else { + Issue.record("Expected .failed event, got \(String(describing: lastEvent))") + return + } + } + } + + // MARK: - Concurrent downloads + + /// Multiple concurrent downloads from the same bucket all complete correctly. + @Test func concurrentDownloadsCompleteIndependently() async throws { + try await withUploadedFile(size: 512 * 1024, byte: 0x11) { bucket, path1, data1 in + // Upload a second file. + let path2 = "files/\(UUID().uuidString).bin" + let data2 = Data(repeating: 0x22, count: 512 * 1024) + _ = try await storage.from(bucket).uploadMultipart(path2, data: data2).value + + // Capture storage to avoid sending `self` across async-let child tasks. + let s = storage + async let url1 = s.from(bucket).download(path: path1).value + async let url2 = s.from(bucket).download(path: path2).value + + let (u1, u2) = try await (url1, url2) + defer { + try? FileManager.default.removeItem(at: u1) + try? FileManager.default.removeItem(at: u2) + } + + #expect(try Data(contentsOf: u1) == data1) + #expect(try Data(contentsOf: u2) == data2) + } + } + + /// Cancelling one download does not affect a concurrent download. + @Test func cancelOneDownloadDoesNotAffectAnother() async throws { + try await withUploadedFile(size: 2 * 1024 * 1024, byte: 0xAA) { bucket, path1, original in + let path2 = "files/\(UUID().uuidString).bin" + let data2 = Data(repeating: 0xBB, count: 2 * 1024 * 1024) + _ = try await storage.from(bucket).uploadMultipart(path2, data: data2).value + + let task1 = storage.from(bucket).download(path: path1) + let task2 = storage.from(bucket).download(path: path2) + + // Cancel task1 immediately; let task2 run to completion. + await task1.cancel() + + do { + _ = try await task1.value + Issue.record("Expected task1 to throw .cancelled") + } catch let error as StorageError { + #expect(error.errorCode == .cancelled) + } + + let url2 = try await task2.value + defer { try? FileManager.default.removeItem(at: url2) } + #expect(try Data(contentsOf: url2) == data2) + } + } +} diff --git a/Tests/StorageTests/DownloadEngineTests.swift b/Tests/StorageTests/DownloadEngineTests.swift new file mode 100644 index 00000000..cfc88935 --- /dev/null +++ b/Tests/StorageTests/DownloadEngineTests.swift @@ -0,0 +1,157 @@ +// +// DownloadEngineTests.swift +// Storage +// +// Created by Guilherme Souza on 04/05/26. +// + +// Darwin-only for the same reason as StorageDownloadTaskTests: swift-corelibs-foundation +// crashes when a custom URLProtocol intercepts a URLSessionDownloadTask on Linux. +// SequentialMockProtocol calls urlProtocolDidFinishLoading, which triggers the same forced +// cast that traps in _ProtocolClient.urlProtocolDidFinishLoading on Linux. +#if canImport(Darwin) + + import Foundation + import Testing + + @testable import Storage + + @Suite(.serialized) struct DownloadEngineTests { + + static let baseURL = URL(string: "http://localhost:54321/storage/v1")! + static let bucketId = "test-bucket" + + let client: StorageClient + let bucket: StorageFileAPI + + init() { + SequentialMockProtocol.reset() + let config = URLSessionConfiguration.ephemeral + config.protocolClasses = [SequentialMockProtocol.self] + let session = URLSession(configuration: config) + client = StorageClient( + url: Self.baseURL, + configuration: StorageClientConfiguration( + headers: ["Authorization": "Bearer test-token"], + session: session, + logger: nil + ) + ) + bucket = client.from(Self.bucketId) + } + + // MARK: - Basic download + + @Test func downloadCompletesSuccessfully() async throws { + let fileContents = Data("hello download".utf8) + SequentialMockProtocol.responses = [ + (200, ["Content-Length": "\(fileContents.count)"], fileContents) + ] + + let url = try await bucket.download(path: "images/photo.png").value + + #expect(FileManager.default.fileExists(atPath: url.path)) + let receivedData = try Data(contentsOf: url) + #expect(receivedData == fileContents) + try? FileManager.default.removeItem(at: url) + } + + @Test func networkErrorYieldsFailedEvent() async throws { + // No responses → SequentialMockProtocol returns URLError.badServerResponse + SequentialMockProtocol.responses = [] + SequentialMockProtocol.hangWhenExhausted = false + + let task = bucket.download(path: "images/missing.png") + + var lastEvent: TransferEvent? + for await event in task.events { lastEvent = event } + + guard case .failed = lastEvent else { + Issue.record("Expected .failed event, got \(String(describing: lastEvent))") + return + } + } + + // MARK: - Pause / resume + + /// Verifies that pausing a hanging download and then resuming it (from scratch, since no + /// real bytes were transferred and thus no resume data) eventually completes the download. + @Test func pauseAndResumeCompletesDownload() async throws { + // First request hangs — simulates a slow download we want to pause. + SequentialMockProtocol.hangWhenExhausted = true + SequentialMockProtocol.responses = [] + + let task = bucket.download(path: "images/photo.png") + + // Wait for the download task to enter the hanging request. + var hangIter = SequentialMockProtocol.nextHang.makeAsyncIterator() + _ = await hangIter.next() + + // Pause — cancels the hanging URLSessionDownloadTask. + await task.pause() + + // Wait for stopLoading() to be called so we know the cancel reached the protocol. + var cancelIter = SequentialMockProtocol.hangCancelled.makeAsyncIterator() + _ = await cancelIter.next() + + // Provide a response for the resumed download. + // Resume data will be nil (no bytes were actually written), so resume() restarts + // from byte 0. + let fileContents = Data("resumed content".utf8) + SequentialMockProtocol.hangWhenExhausted = false + SequentialMockProtocol.appendResponse( + (200, ["Content-Length": "\(fileContents.count)"], fileContents)) + + await task.resume() + + let url = try await task.value + #expect(FileManager.default.fileExists(atPath: url.path)) + let receivedData = try Data(contentsOf: url) + #expect(receivedData == fileContents) + try? FileManager.default.removeItem(at: url) + } + + // MARK: - Cancel + + @Test func cancelYieldsCancelledError() async throws { + SequentialMockProtocol.hangWhenExhausted = true + SequentialMockProtocol.responses = [] + + let task = bucket.download(path: "images/photo.png") + + var hangIter = SequentialMockProtocol.nextHang.makeAsyncIterator() + _ = await hangIter.next() + + await task.cancel() + + var lastEvent: TransferEvent? + for await event in task.events { lastEvent = event } + + guard case .failed(let error) = lastEvent else { + Issue.record("Expected .failed event, got \(String(describing: lastEvent))") + return + } + #expect(error.errorCode == .cancelled) + } + + @Test func cancelThrowsFromValue() async throws { + SequentialMockProtocol.hangWhenExhausted = true + SequentialMockProtocol.responses = [] + + let task = bucket.download(path: "images/photo.png") + + var hangIter = SequentialMockProtocol.nextHang.makeAsyncIterator() + _ = await hangIter.next() + + await task.cancel() + + do { + _ = try await task.value + Issue.record("Expected an error to be thrown") + } catch let error as StorageError { + #expect(error.errorCode == .cancelled) + } + } + } + +#endif // canImport(Darwin)