Skip to content

Commit 533ec49

Browse files
committed
Apply crud throttle, better error handling
1 parent 81c7121 commit 533ec49

3 files changed

Lines changed: 74 additions & 15 deletions

File tree

Sources/PowerSync/Implementation/sync/HttpClient.swift

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,23 @@ import Foundation
99
/// a general-purpose HTTP client.
1010
protocol HttpClient: Sendable {
1111
/// Start streaming a `/sync/stream` response body, emitting individual lines.
12+
///
13+
/// Throws an ``UnexpectedResponseError`` if the response can't be interpreted as sync lines.
1214
func receiveSyncLines(request: URLRequest) async throws -> (HTTPURLResponse, any SyncLineResponse)
1315

1416
/// Read a full response body.
1517
func readFully(request: URLRequest) async throws -> (HTTPURLResponse, Data)
1618
}
1719

20+
struct UnexpectedResponseError: Error, CustomDebugStringConvertible {
21+
let response: HTTPURLResponse
22+
let message: String
23+
24+
var debugDescription: String {
25+
message
26+
}
27+
}
28+
1829
protocol SyncLineResponse: Sendable, AsyncSequence where AsyncIterator: SyncLineResponseIterator {}
1930

2031
protocol SyncLineResponseIterator: AsyncIteratorProtocol {
@@ -31,11 +42,15 @@ struct PlatformHttpClient: HttpClient {
3142
let session: URLSession
3243

3344
func receiveSyncLines(request: URLRequest) async throws -> (HTTPURLResponse, any SyncLineResponse) {
34-
let (bytes, response) = try await session.bytes(for: request)
45+
let (bytes, originalResponse) = try await session.bytes(for: request)
46+
let response = originalResponse as! HTTPURLResponse
3547
let jsonStreamMimeType = "application/x-ndjson"
36-
48+
3749
if response.mimeType != jsonStreamMimeType {
38-
throw PowerSyncError.operationFailed(message: "Invalid sync lines response, (expected \(jsonStreamMimeType), got \(response.mimeType, default: "")")
50+
throw UnexpectedResponseError(
51+
response: response,
52+
message: "Invalid sync lines response, (expected \(jsonStreamMimeType), got \(response.mimeType, default: "")"
53+
)
3954
}
4055

4156
struct PlatformSyncLineResponse<Base>: SyncLineResponse where Base : AsyncSequence, Base.Element == UInt8, Base: Sendable {
@@ -57,7 +72,7 @@ struct PlatformHttpClient: HttpClient {
5772
}
5873
}
5974

60-
return (response as! HTTPURLResponse, PlatformSyncLineResponse(lines: bytes.lines))
75+
return (response, PlatformSyncLineResponse(lines: bytes.lines))
6176
}
6277

6378
func readFully(request: URLRequest) async throws -> (HTTPURLResponse, Data) {

Sources/PowerSync/Implementation/sync/StreamingSyncClient.swift

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ final class StreamingSyncClient: Sendable {
3838

3939
private func uploadLoop(signals: SyncSignals) async throws {
4040
// TODO: Replace with better watch mechanism once we've dropped the Kotlin dependency and can use onChange.
41-
let watch = try db.watch(sql: "SELECT 1 FROM ps_crud LIMIT 1", parameters: [], mapper: { _ in () })
41+
let options = WatchOptions(sql: "SELECT 1 FROM ps_crud LIMIT 1", throttle: options.crudThrottle, mapper: { _ in ()})
42+
let watch = try db.watch(options: options)
4243
.dropFirst() // Skip initial result, we just want to watch changes
4344
.map { _ in () }
4445
let allTriggers = AsyncAlgorithms.merge(watch, signals.signalCrudUpload.subscribe())
@@ -131,18 +132,21 @@ The next upload iteration will be delayed.
131132
try tx.execute(sql: "UPDATE ps_buckets SET target_op = CAST(? AS INTEGER) WHERE name = '$local'", parameters: [opId])
132133
}
133134
}
134-
135+
136+
private func handleCommonResponseErrors(response: HTTPURLResponse) async {
137+
if response.statusCode == 401 {
138+
await self.invalidateCredentials()
139+
}
140+
}
141+
135142
private func getWriteCheckpoint() async throws -> String {
136143
let clientId = try await db.get("SELECT powersync_client_id()") { try $0.getString(index: 0) }
137144
let (_, request) = try await authenticatedRequest { endpoint in
138145
endpoint.path += "/write-checkpoint2.json"
139146
endpoint.queryItems = [.init(name: "client_id", value: clientId)]
140147
}
141148
let (response, data) = try await httpClient.readFully(request: request)
142-
143-
if response.statusCode == 401 {
144-
await self.invalidateCredentials()
145-
}
149+
await self.handleCommonResponseErrors(response: response)
146150
if response.statusCode != 200 {
147151
throw PowerSyncError.operationFailed(message: "Error getting write checkpoint: \(response.statusCode)")
148152
}
@@ -209,14 +213,23 @@ The next upload iteration will be delayed.
209213
httpRequest.setValue("application/x-ndjson", forHTTPHeaderField: "Accept")
210214
httpRequest.httpBody = try StreamingSyncClient.jsonEncoder.encode(request)
211215

212-
let (response, stream) = try await httpClient.receiveSyncLines(request: httpRequest)
213-
if response.statusCode == 401 {
214-
await invalidateCredentials()
216+
let response: HTTPURLResponse
217+
let stream: any SyncLineResponse
218+
do {
219+
(response, stream) = try await httpClient.receiveSyncLines(request: httpRequest)
220+
} catch {
221+
if let responseError = error as? UnexpectedResponseError {
222+
await handleCommonResponseErrors(response: responseError.response)
223+
}
224+
225+
throw error
215226
}
227+
228+
await handleCommonResponseErrors(response: response)
216229
if response.statusCode != 200 {
217230
throw PowerSyncError.operationFailed(message: "POST \(url) failed with status code \(response.statusCode)")
218231
}
219-
232+
220233
return ControlInvocationsFromStream(sequence: stream)
221234
}
222235

Tests/PowerSyncTests/SyncTests.swift

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import AsyncAlgorithms
2+
import Foundation
23
@testable import PowerSync
34
import Testing
45

@@ -226,7 +227,37 @@ class InMemorySyncIntegrationTests {
226227
await waitForStatus(db.currentStatus) { $0.connected }
227228
try #require(await connector.fetchCredentialsCalls == 2)
228229
}
229-
230+
231+
@Test func handlesThrowing401Response() async throws {
232+
final actor BackendConnector: PowerSyncBackendConnectorProtocol {
233+
var fetchCredentialsCalls = 0
234+
235+
func fetchCredentials() async throws -> PowerSyncCredentials? {
236+
fetchCredentialsCalls += 1
237+
return testCredentials
238+
}
239+
240+
func uploadData(database: any PowerSyncDatabaseProtocol) async throws {}
241+
}
242+
243+
let connector = BackendConnector()
244+
let channel = AsyncThrowingChannel<PowerSync.SyncLine, any Error>()
245+
let db = openDatabase(MockHttpClient { request in
246+
if await connector.fetchCredentialsCalls == 1 {
247+
// On a real 401 response, the platform client would throw because the body can't be interpreted as sync lines.
248+
// This verifies the sync client can recognize that and reset credentials.
249+
let response = HTTPURLResponse(url: request.url!, statusCode: 401, httpVersion: nil, headerFields: nil)!
250+
throw UnexpectedResponseError(response: response, message: "Expected error to retry fetching credentials")
251+
} else {
252+
return channel
253+
}
254+
})
255+
256+
try await db.connect(connector: connector, options: ConnectOptions(retryDelay: 0))
257+
await waitForStatus(db.currentStatus) { $0.connected }
258+
try #require(await connector.fetchCredentialsCalls == 2)
259+
}
260+
230261
@Test func tokenThrows() async throws {
231262
actor BackendConnector: PowerSyncBackendConnectorProtocol {
232263
var isFirstFetchCall = true

0 commit comments

Comments
 (0)