From 991bd57de7677f4d2126195bc3b4b643b67c349f Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 15 Apr 2026 10:10:09 +0200 Subject: [PATCH 1/7] Implement crud batches in Swift instead of Kotlin --- CHANGELOG.md | 4 + .../Kotlin/KotlinPowerSyncDatabaseImpl.swift | 24 +++- .../PowerSync/Kotlin/db/KotlinCrudBatch.swift | 27 ---- .../PowerSync/Kotlin/db/KotlinCrudEntry.swift | 44 ------- .../Kotlin/db/KotlinCrudTransaction.swift | 22 ---- .../Kotlin/db/KotlinCrudTransactions.swift | 39 ------ .../Protocol/PowerSyncDatabaseProtocol.swift | 2 +- Sources/PowerSync/Protocol/db/CrudBatch.swift | 39 ++++-- Sources/PowerSync/Protocol/db/CrudEntry.swift | 118 +++++++++++++++--- .../Protocol/db/CrudTransaction.swift | 83 ++++++++++-- Sources/PowerSync/Protocol/db/JsonParam.swift | 37 ++++++ Tests/PowerSyncTests/JsonParamTests.swift | 28 +++++ 12 files changed, 297 insertions(+), 170 deletions(-) delete mode 100644 Sources/PowerSync/Kotlin/db/KotlinCrudBatch.swift delete mode 100644 Sources/PowerSync/Kotlin/db/KotlinCrudEntry.swift delete mode 100644 Sources/PowerSync/Kotlin/db/KotlinCrudTransaction.swift delete mode 100644 Sources/PowerSync/Kotlin/db/KotlinCrudTransactions.swift create mode 100644 Tests/PowerSyncTests/JsonParamTests.swift diff --git a/CHANGELOG.md b/CHANGELOG.md index fbe5a80..6eb4d7e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## 1.14.0 (unreleased) + +* + ## 1.13.1 * Don't attempt to create WebSocket connections on watchOS. diff --git a/Sources/PowerSync/Kotlin/KotlinPowerSyncDatabaseImpl.swift b/Sources/PowerSync/Kotlin/KotlinPowerSyncDatabaseImpl.swift index faea484..5a729ce 100644 --- a/Sources/PowerSync/Kotlin/KotlinPowerSyncDatabaseImpl.swift +++ b/Sources/PowerSync/Kotlin/KotlinPowerSyncDatabaseImpl.swift @@ -73,16 +73,30 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol, } func getCrudBatch(limit: Int32 = 100) async throws -> CrudBatch? { - guard let base = try await kotlinDatabase.getCrudBatch(limit: limit) else { + var entries = try await getAll( + sql: "SELECT id, tx_id, data FROM ps_crud ORDER BY id ASC LIMIT ?", + parameters: [Int64(limit + 1)], + mapper: CrudEntry.fromCursor + ) + + if entries.isEmpty { return nil } - return try KotlinCrudBatch( - batch: base + + let hasMore = entries.count > limit + if hasMore { + entries.removeLast() + } + + return CrudBatch( + hasMore: hasMore, + crud: entries, + db: self ) } - func getCrudTransactions() -> any CrudTransactions { - return KotlinCrudTransactions(db: kotlinDatabase) + func getCrudTransactions() -> CrudTransactions { + return CrudTransactions(db: self) } func getPowerSyncVersion() async throws -> String { diff --git a/Sources/PowerSync/Kotlin/db/KotlinCrudBatch.swift b/Sources/PowerSync/Kotlin/db/KotlinCrudBatch.swift deleted file mode 100644 index f94c29c..0000000 --- a/Sources/PowerSync/Kotlin/db/KotlinCrudBatch.swift +++ /dev/null @@ -1,27 +0,0 @@ -import PowerSyncKotlin - -/// Implements `CrudBatch` using the Kotlin SDK -struct KotlinCrudBatch: CrudBatch { - let batch: PowerSyncKotlin.CrudBatch - let crud: [CrudEntry] - - init( - batch: PowerSyncKotlin.CrudBatch) - throws - { - self.batch = batch - self.crud = try batch.crud.map { try KotlinCrudEntry( - entry: $0 - ) } - } - - var hasMore: Bool { - batch.hasMore - } - - func complete( - writeCheckpoint: String? - ) async throws { - _ = try await batch.complete.invoke(p1: writeCheckpoint) - } -} diff --git a/Sources/PowerSync/Kotlin/db/KotlinCrudEntry.swift b/Sources/PowerSync/Kotlin/db/KotlinCrudEntry.swift deleted file mode 100644 index 6433854..0000000 --- a/Sources/PowerSync/Kotlin/db/KotlinCrudEntry.swift +++ /dev/null @@ -1,44 +0,0 @@ -import PowerSyncKotlin - -/// Implements `CrudEntry` using the KotlinSDK -struct KotlinCrudEntry : CrudEntry { - let entry: PowerSyncKotlin.CrudEntry - let op: UpdateType - - init ( - entry: PowerSyncKotlin.CrudEntry - ) throws { - self.entry = entry - self.op = try UpdateType.fromString(entry.op.name) - } - - var id: String { - entry.id - } - - var clientId: Int64 { - Int64(entry.clientId) - } - - var table: String { - entry.table - } - - var transactionId: Int64? { - entry.transactionId?.int64Value - } - - var metadata: String? { - entry.metadata - } - - var opData: [String : String?]? { - /// Kotlin represents this as Map, but this is - /// converted to [String: Any] by SKIEE - entry.opData?.mapValues { $0 as? String } - } - - var previousValues: [String : String?]? { - entry.previousValues?.mapValues { $0 as? String } - } -} diff --git a/Sources/PowerSync/Kotlin/db/KotlinCrudTransaction.swift b/Sources/PowerSync/Kotlin/db/KotlinCrudTransaction.swift deleted file mode 100644 index fe4e2f5..0000000 --- a/Sources/PowerSync/Kotlin/db/KotlinCrudTransaction.swift +++ /dev/null @@ -1,22 +0,0 @@ -import PowerSyncKotlin - -/// Implements `CrudTransaction` using the Kotlin SDK -struct KotlinCrudTransaction: CrudTransaction { - let transaction: PowerSyncKotlin.CrudTransaction - let crud: [CrudEntry] - - init(transaction: PowerSyncKotlin.CrudTransaction) throws { - self.transaction = transaction - self.crud = try transaction.crud.map { try KotlinCrudEntry( - entry: $0 - ) } - } - - var transactionId: Int64? { - transaction.transactionId?.int64Value - } - - func complete(writeCheckpoint: String?) async throws { - _ = try await transaction.complete.invoke(p1: writeCheckpoint) - } -} diff --git a/Sources/PowerSync/Kotlin/db/KotlinCrudTransactions.swift b/Sources/PowerSync/Kotlin/db/KotlinCrudTransactions.swift deleted file mode 100644 index 866e76e..0000000 --- a/Sources/PowerSync/Kotlin/db/KotlinCrudTransactions.swift +++ /dev/null @@ -1,39 +0,0 @@ -import PowerSyncKotlin - -struct KotlinCrudTransactions: CrudTransactions { - typealias Element = KotlinCrudTransaction - - private let db: KotlinPowerSyncDatabase - - init(db: KotlinPowerSyncDatabase) { - self.db = db - } - - public func makeAsyncIterator() -> CrudTransactionIterator { - let kotlinIterator = errorHandledCrudTransactions(db: self.db).makeAsyncIterator() - return CrudTransactionIterator(inner: kotlinIterator) - } - - struct CrudTransactionIterator: CrudTransactionsIterator { - private var inner: PowerSyncKotlin.SkieSwiftFlowIterator - - internal init(inner: PowerSyncKotlin.SkieSwiftFlowIterator) { - self.inner = inner - } - - public mutating func next() async throws -> KotlinCrudTransaction? { - if let innerTx = await self.inner.next() { - if let success = innerTx as? PowerSyncResult.Success { - let tx = success.value as! PowerSyncKotlin.CrudTransaction - return try KotlinCrudTransaction(transaction: tx) - } else if let failure = innerTx as? PowerSyncResult.Failure { - try throwPowerSyncException(exception: failure.exception) - } - - fatalError("unreachable") - } else { - return nil - } - } - } -} diff --git a/Sources/PowerSync/Protocol/PowerSyncDatabaseProtocol.swift b/Sources/PowerSync/Protocol/PowerSyncDatabaseProtocol.swift index 05e2477..a88020e 100644 --- a/Sources/PowerSync/Protocol/PowerSyncDatabaseProtocol.swift +++ b/Sources/PowerSync/Protocol/PowerSyncDatabaseProtocol.swift @@ -223,7 +223,7 @@ public protocol PowerSyncDatabaseProtocol: Queries, Sendable { /// ```Swift /// /// ``` - func getCrudTransactions() -> any CrudTransactions + func getCrudTransactions() -> CrudTransactions /// Convenience method to get the current version of PowerSync. func getPowerSyncVersion() async throws -> String diff --git a/Sources/PowerSync/Protocol/db/CrudBatch.swift b/Sources/PowerSync/Protocol/db/CrudBatch.swift index 6c770f4..1376b02 100644 --- a/Sources/PowerSync/Protocol/db/CrudBatch.swift +++ b/Sources/PowerSync/Protocol/db/CrudBatch.swift @@ -1,20 +1,29 @@ import Foundation -/// A transaction of client-side changes. -public protocol CrudBatch: Sendable { +/// A collection of client-side changes. +public struct CrudBatch: Sendable { /// Indicates if there are additional Crud items in the queue which are not included in this batch - var hasMore: Bool { get } + let hasMore: Bool /// List of client-side changes. - var crud: [any CrudEntry] { get } + let crud: [CrudEntry] + + private let db: PowerSyncDatabaseProtocol + + internal init(hasMore: Bool, crud: [CrudEntry], db: PowerSyncDatabaseProtocol) { + self.hasMore = hasMore + self.crud = crud + self.db = db + } /// Call to remove the changes from the local queue, once successfully uploaded. /// /// `writeCheckpoint` is optional. - func complete(writeCheckpoint: String?) async throws -} - -public extension CrudBatch { + func complete(writeCheckpoint: String?) async throws { + let lastId = crud.last!.clientId + try await completeCrudItems(self.db, lastId) + } + /// Call to remove the changes from the local queue, once successfully uploaded. func complete() async throws { try await self.complete( @@ -22,3 +31,17 @@ public extension CrudBatch { ) } } + +internal func completeCrudItems(_ db: any PowerSyncDatabaseProtocol, _ lastItemId: Int64, writeCheckpoint: String? = nil) async throws { + return try await db.writeTransaction { tx in + try tx.execute(sql: "DELETE FROM ps_crud WHERE id <= ?", parameters: [lastItemId]) + if writeCheckpoint != nil { + let hasCrud = (try tx.getOptional(sql: "SELECT 1 FROM ps_crud", parameters: nil) { cursor in () }) != nil + if !hasCrud { + try tx.execute(sql: "UPDATE ps_buckets SET target_op = CAST(? AS INTEGER) WHERE name = '$local'", parameters: [writeCheckpoint]) + return + } + } + try tx.execute(sql: "UPDATE ps_buckets SET target_op = 9223372036854775807 WHERE name = '$local'", parameters: nil) + } +} diff --git a/Sources/PowerSync/Protocol/db/CrudEntry.swift b/Sources/PowerSync/Protocol/db/CrudEntry.swift index 85ecfeb..32330d2 100644 --- a/Sources/PowerSync/Protocol/db/CrudEntry.swift +++ b/Sources/PowerSync/Protocol/db/CrudEntry.swift @@ -1,3 +1,5 @@ +import Foundation + /// Represents the type of CRUD update operation that can be performed on a row. public enum UpdateType: String, Codable, Sendable { /// A row has been inserted or replaced @@ -28,22 +30,22 @@ public enum UpdateType: String, Codable, Sendable { } /// Represents a CRUD (Create, Read, Update, Delete) entry in the system. -public protocol CrudEntry: Sendable { +public struct CrudEntry: Sendable { /// The unique identifier of the entry. - var id: String { get } - + let id: String + /// The client ID associated with the entry. - var clientId: Int64 { get } - + let clientId: Int64 + /// The type of update operation performed on the entry. - var op: UpdateType { get } + let op: UpdateType /// The name of the table where the entry resides. - var table: String { get } - + let table: String + /// The transaction ID associated with the entry, if any. - var transactionId: Int64? { get } - + let transactionId: Int64? + /// User-defined metadata that can be attached to writes. /// /// This is the value the `_metadata` column had when the write to the database was made, @@ -51,13 +53,101 @@ public protocol CrudEntry: Sendable { /// /// Note that the `_metadata` column and this field are only available when ``Table/trackMetadata`` /// is enabled. - var metadata: String? { get } - + let metadata: String? + /// The operation data associated with the entry, represented as a dictionary of column names to their values. - var opData: [String: String?]? { get } + let opDataTyped: JsonParam? + + /// The operation data associated with the entry, represented as a dictionary of column names to their values. + /// + /// Consider using ``CrudEntry/opDataTyped`` instead, which provides values as typed JSON. + var opData: [String: String?]? { + get { + opDataTyped?.mapValues { value in + do { + return try CrudEntry.jsonValueToString(value) + } catch { + return nil + } + } + } + } /// Previous values before this change. /// /// These values can be tracked for `UPDATE` statements when ``Table/trackPreviousValues`` is enabled. - var previousValues: [String: String?]? { get } + let previousValuesTyped: JsonParam? + + /// Previous values before this change. + /// + /// These values can be tracked for `UPDATE` statements when ``Table/trackPreviousValues`` is enabled. + /// + /// Consider using ``CrudEntry/previousValuesTyped`` instead, which provides values as typed JSON. + var previousValues: [String: String?]? { + get { + previousValuesTyped?.mapValues { value in + do { + return try CrudEntry.jsonValueToString(value) + } catch { + return nil + } + } + } + } + + private let nonExhaustive: Void // Prevent initialization outside of this package + + internal static func fromCursor(cursor: borrowing SqlCursor) throws -> CrudEntry { + let id = try cursor.getInt64(index: 0) + let txId = cursor.getInt64Optional(index: 1) + let data = try cursor.getString(index: 2) + + struct CrudJsonEntry: Decodable { + let id: String + let op: UpdateType + let data: JsonParam? + let type: String + let metadata: String? + let old: JsonParam? + } + + let decoder = JSONDecoder() + var entry: CrudJsonEntry + do { + entry = try decoder.decode(CrudJsonEntry.self, from: data.data(using: .utf8)!) + } catch { + throw error + } + + return CrudEntry( + id: entry.id, + clientId: id, + op: entry.op, + table: entry.type, + transactionId: txId, + metadata: entry.metadata, + opDataTyped: entry.data, + previousValuesTyped: entry.old, + nonExhaustive: () + ) + } + + private static func jsonValueToString(_ value: JsonValue?) throws -> String? { + try value.map { value in + switch (value) { + case .string(let value): + return value + case .int(let value): + return String(value) + case .double(let value): + return String(value) + case .bool(let value): + return String(value) + case .null: + return "null" + case .array(_), .object(_): + throw PowerSyncError.operationFailed(message: "Invalid array/object in CRUD data, should be string") + } + } + } } diff --git a/Sources/PowerSync/Protocol/db/CrudTransaction.swift b/Sources/PowerSync/Protocol/db/CrudTransaction.swift index 8a7dacf..4ce630e 100644 --- a/Sources/PowerSync/Protocol/db/CrudTransaction.swift +++ b/Sources/PowerSync/Protocol/db/CrudTransaction.swift @@ -1,22 +1,32 @@ import Foundation + /// A transaction of client-side changes. -public protocol CrudTransaction: Sendable { +public struct CrudTransaction: Sendable { /// Unique transaction id. /// /// If nil, this contains a list of changes recorded without an explicit transaction associated. - var transactionId: Int64? { get } + let transactionId: Int64 /// List of client-side changes. - var crud: [any CrudEntry] { get } + let crud: [CrudEntry] + + private let db: any PowerSyncDatabaseProtocol + + internal init(transactionId: Int64, crud: [CrudEntry], db: any PowerSyncDatabaseProtocol) { + self.transactionId = transactionId + self.crud = crud + self.db = db + } /// Call to remove the changes from the local queue, once successfully uploaded. /// /// `writeCheckpoint` is optional. - func complete(writeCheckpoint: String?) async throws -} - -public extension CrudTransaction { + func complete(writeCheckpoint: String?) async throws { + let id = self.crud.last!.clientId + try await completeCrudItems(db, id, writeCheckpoint: writeCheckpoint) + } + /// Call to remove the changes from the local queue, once successfully uploaded. func complete() async throws { try await self.complete( @@ -24,11 +34,64 @@ public extension CrudTransaction { ) } } - /// A sequence of crud transactions in a PowerSync database. /// /// For details, see ``PowerSyncDatabaseProtocol/getCrudTransactions()``. -public protocol CrudTransactions: AsyncSequence where Element: CrudTransaction, AsyncIterator: CrudTransactionsIterator {} +public struct CrudTransactions: AsyncSequence { + public typealias Element = CrudTransaction + public typealias AsyncIterator = CrudTransactionsIterator + + private let db: any PowerSyncDatabaseProtocol + + internal init(db: any PowerSyncDatabaseProtocol) { + self.db = db + } + + public func makeAsyncIterator() -> CrudTransactionsIterator { + CrudTransactionsIterator(db: db) + } +} /// The iterator returned by ``CrudTransactions``. -public protocol CrudTransactionsIterator: AsyncIteratorProtocol where Element: CrudTransaction {} +public struct CrudTransactionsIterator: AsyncIteratorProtocol { + public typealias Element = CrudTransaction + + private var lastItemId: Int64 = -1 + private let db: any PowerSyncDatabaseProtocol + + internal init(db: any PowerSyncDatabaseProtocol) { + self.db = db + } + + public mutating func next() async throws -> CrudTransaction? { + // Note: We try to avoid filtering on tx_id here because there's no index on that column. + // Starting at the first entry we want and then joining by rowid is more efficient. This is + // sound because there can't be concurrent write transactions, so transaction ids are + // increasing when we iterate over rowids. + let query = """ +WITH RECURSIVE crud_entries AS ( + SELECT id, tx_id, data FROM ps_crud WHERE id = (SELECT min(id) FROM ps_crud WHERE id > ?) + UNION ALL + SELECT ps_crud.id, ps_crud.tx_id, ps_crud.data FROM ps_crud + INNER JOIN crud_entries ON crud_entries.id + 1 = rowid + WHERE crud_entries.tx_id = ps_crud.tx_id +) +SELECT * FROM crud_entries; +""" + + let items = try await db.getAll(sql: query, parameters: [lastItemId], mapper: CrudEntry.fromCursor) + if items.isEmpty { + return nil + } + + let txId = items.first!.transactionId + let lastId = items.last!.clientId + + lastItemId = lastId + return CrudTransaction( + transactionId: txId!, + crud: items, + db: db + ) + } +} diff --git a/Sources/PowerSync/Protocol/db/JsonParam.swift b/Sources/PowerSync/Protocol/db/JsonParam.swift index 85775ae..87ea5a6 100644 --- a/Sources/PowerSync/Protocol/db/JsonParam.swift +++ b/Sources/PowerSync/Protocol/db/JsonParam.swift @@ -24,6 +24,23 @@ public enum JsonValue: Codable, Sendable, Equatable { /// A JSON object containing key-value pairs where values are `JSONValue` instances. case object([String: JsonValue]) + public init(from decoder: any Decoder) throws { + let c = try decoder.singleValueContainer() + if c.decodeNil() { self = .null } + else if let b = try? c.decode(Bool.self) { self = .bool(b) } + else if let i = try? c.decode(Int.self) { self = .int(i) } + else if let d = try? c.decode(Double.self) { self = .double(d) } + else if let s = try? c.decode(String.self) { self = .string(s) } + else if let a = try? c.decode([JsonValue].self) { self = .array(a) } + else if let o = try? c.decode([String: JsonValue].self) { self = .object(o) } + else { + throw DecodingError.typeMismatch( + JsonValue.self, + .init(codingPath: decoder.codingPath, + debugDescription: "Expected any JSON value")) + } + } + /// Converts the `JSONValue` into a native Swift representation. /// /// - Returns: A corresponding Swift type (`String`, `Int`, `Double`, `Bool`, `nil`, `[Any]`, or `[String: Any]`), @@ -50,6 +67,26 @@ public enum JsonValue: Codable, Sendable, Equatable { return anyDict } } + + public func encode(to encoder: any Encoder) throws { + var c = encoder.singleValueContainer() + switch self { + case .string(let value): + try c.encode(value) + case .int(let value): + try c.encode(value) + case .double(let value): + try c.encode(value) + case .bool(let value): + try c.encode(value) + case .null: + try c.encodeNil() + case .array(let values): + try c.encode(values) + case .object(let object): + try c.encode(object) + } + } } /// A typealias representing a top-level JSON object with string keys and `JSONValue` values. diff --git a/Tests/PowerSyncTests/JsonParamTests.swift b/Tests/PowerSyncTests/JsonParamTests.swift new file mode 100644 index 0000000..ed9c975 --- /dev/null +++ b/Tests/PowerSyncTests/JsonParamTests.swift @@ -0,0 +1,28 @@ +import Foundation +import Testing +import PowerSync + +@Suite() +struct JsonValueTests { + @Test func canDecode() throws { + let decoder = JSONDecoder() + + try #require(try decoder.decode(JsonValue.self, from: "null".data(using: .utf8)!) == .null) + try #require(try decoder.decode(JsonValue.self, from: "123".data(using: .utf8)!) == .int(123)) + try #require(try decoder.decode(JsonValue.self, from: "123.45".data(using: .utf8)!) == .double(123.45)) + try #require(try decoder.decode(JsonValue.self, from: "\"123\"".data(using: .utf8)!) == .string("123")) + try #require(try decoder.decode(JsonValue.self, from: "[1,2,3]".data(using: .utf8)!) == .array([.int(1), .int(2), .int(3)])) + try #require(try decoder.decode(JsonValue.self, from: "{\"foo\": \"bar\"}".data(using: .utf8)!) == .object(["foo": .string("bar")])) + } + + @Test func canEncode() throws { + let encoder = JSONEncoder() + + try #require(String(data: try encoder.encode(JsonValue.null), encoding: .utf8) == "null") + try #require(String(data: try encoder.encode(JsonValue.int(123)), encoding: .utf8) == "123") + try #require(String(data: try encoder.encode(JsonValue.double(123.45)), encoding: .utf8) == "123.45") + try #require(String(data: try encoder.encode(JsonValue.string("123")), encoding: .utf8) == "\"123\"") + try #require(String(data: try encoder.encode(JsonValue.array([.int(1), .int(2), .int(3)])), encoding: .utf8) == "[1,2,3]") + try #require(String(data: try encoder.encode(JsonValue.object(["foo": .string("bar")])), encoding: .utf8) == "{\"foo\":\"bar\"}") + } +} From a2d0439e16108a51887e7f591c96d229c9c2cdf4 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 15 Apr 2026 10:18:33 +0200 Subject: [PATCH 2/7] Changelog, format --- CHANGELOG.md | 2 +- Sources/PowerSync/Protocol/db/CrudBatch.swift | 14 +++---- Sources/PowerSync/Protocol/db/CrudEntry.swift | 40 +++++++++---------- .../Protocol/db/CrudTransaction.swift | 23 ++++++----- 4 files changed, 40 insertions(+), 39 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6eb4d7e..60f0cbc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,7 @@ ## 1.14.0 (unreleased) -* +* Add `opDataTyped` and `previousValuesTyped` to `CrudEntry`, providing typed values instead of strings. ## 1.13.1 diff --git a/Sources/PowerSync/Protocol/db/CrudBatch.swift b/Sources/PowerSync/Protocol/db/CrudBatch.swift index 1376b02..ff03e01 100644 --- a/Sources/PowerSync/Protocol/db/CrudBatch.swift +++ b/Sources/PowerSync/Protocol/db/CrudBatch.swift @@ -3,13 +3,13 @@ import Foundation /// A collection of client-side changes. public struct CrudBatch: Sendable { /// Indicates if there are additional Crud items in the queue which are not included in this batch - let hasMore: Bool + public let hasMore: Bool /// List of client-side changes. - let crud: [CrudEntry] - + public let crud: [CrudEntry] + private let db: PowerSyncDatabaseProtocol - + internal init(hasMore: Bool, crud: [CrudEntry], db: PowerSyncDatabaseProtocol) { self.hasMore = hasMore self.crud = crud @@ -19,13 +19,13 @@ public struct CrudBatch: Sendable { /// Call to remove the changes from the local queue, once successfully uploaded. /// /// `writeCheckpoint` is optional. - func complete(writeCheckpoint: String?) async throws { + public func complete(writeCheckpoint: String?) async throws { let lastId = crud.last!.clientId try await completeCrudItems(self.db, lastId) } - + /// Call to remove the changes from the local queue, once successfully uploaded. - func complete() async throws { + public func complete() async throws { try await self.complete( writeCheckpoint: nil ) diff --git a/Sources/PowerSync/Protocol/db/CrudEntry.swift b/Sources/PowerSync/Protocol/db/CrudEntry.swift index 32330d2..2499fac 100644 --- a/Sources/PowerSync/Protocol/db/CrudEntry.swift +++ b/Sources/PowerSync/Protocol/db/CrudEntry.swift @@ -32,20 +32,20 @@ public enum UpdateType: String, Codable, Sendable { /// Represents a CRUD (Create, Read, Update, Delete) entry in the system. public struct CrudEntry: Sendable { /// The unique identifier of the entry. - let id: String - + public let id: String + /// The client ID associated with the entry. - let clientId: Int64 - + public let clientId: Int64 + /// The type of update operation performed on the entry. - let op: UpdateType + public let op: UpdateType /// The name of the table where the entry resides. - let table: String - + public let table: String + /// The transaction ID associated with the entry, if any. - let transactionId: Int64? - + public let transactionId: Int64? + /// User-defined metadata that can be attached to writes. /// /// This is the value the `_metadata` column had when the write to the database was made, @@ -53,15 +53,15 @@ public struct CrudEntry: Sendable { /// /// Note that the `_metadata` column and this field are only available when ``Table/trackMetadata`` /// is enabled. - let metadata: String? - + public let metadata: String? + /// The operation data associated with the entry, represented as a dictionary of column names to their values. - let opDataTyped: JsonParam? - + public let opDataTyped: JsonParam? + /// The operation data associated with the entry, represented as a dictionary of column names to their values. /// /// Consider using ``CrudEntry/opDataTyped`` instead, which provides values as typed JSON. - var opData: [String: String?]? { + public var opData: [String: String?]? { get { opDataTyped?.mapValues { value in do { @@ -76,14 +76,14 @@ public struct CrudEntry: Sendable { /// Previous values before this change. /// /// These values can be tracked for `UPDATE` statements when ``Table/trackPreviousValues`` is enabled. - let previousValuesTyped: JsonParam? - + public let previousValuesTyped: JsonParam? + /// Previous values before this change. /// /// These values can be tracked for `UPDATE` statements when ``Table/trackPreviousValues`` is enabled. /// /// Consider using ``CrudEntry/previousValuesTyped`` instead, which provides values as typed JSON. - var previousValues: [String: String?]? { + public var previousValues: [String: String?]? { get { previousValuesTyped?.mapValues { value in do { @@ -101,7 +101,7 @@ public struct CrudEntry: Sendable { let id = try cursor.getInt64(index: 0) let txId = cursor.getInt64Optional(index: 1) let data = try cursor.getString(index: 2) - + struct CrudJsonEntry: Decodable { let id: String let op: UpdateType @@ -110,7 +110,7 @@ public struct CrudEntry: Sendable { let metadata: String? let old: JsonParam? } - + let decoder = JSONDecoder() var entry: CrudJsonEntry do { @@ -131,7 +131,7 @@ public struct CrudEntry: Sendable { nonExhaustive: () ) } - + private static func jsonValueToString(_ value: JsonValue?) throws -> String? { try value.map { value in switch (value) { diff --git a/Sources/PowerSync/Protocol/db/CrudTransaction.swift b/Sources/PowerSync/Protocol/db/CrudTransaction.swift index 4ce630e..4059e02 100644 --- a/Sources/PowerSync/Protocol/db/CrudTransaction.swift +++ b/Sources/PowerSync/Protocol/db/CrudTransaction.swift @@ -6,13 +6,13 @@ public struct CrudTransaction: Sendable { /// Unique transaction id. /// /// If nil, this contains a list of changes recorded without an explicit transaction associated. - let transactionId: Int64 + public let transactionId: Int64 /// List of client-side changes. - let crud: [CrudEntry] - + public let crud: [CrudEntry] + private let db: any PowerSyncDatabaseProtocol - + internal init(transactionId: Int64, crud: [CrudEntry], db: any PowerSyncDatabaseProtocol) { self.transactionId = transactionId self.crud = crud @@ -22,25 +22,26 @@ public struct CrudTransaction: Sendable { /// Call to remove the changes from the local queue, once successfully uploaded. /// /// `writeCheckpoint` is optional. - func complete(writeCheckpoint: String?) async throws { + public func complete(writeCheckpoint: String?) async throws { let id = self.crud.last!.clientId try await completeCrudItems(db, id, writeCheckpoint: writeCheckpoint) } - + /// Call to remove the changes from the local queue, once successfully uploaded. - func complete() async throws { + public func complete() async throws { try await self.complete( writeCheckpoint: nil ) } } + /// A sequence of crud transactions in a PowerSync database. /// /// For details, see ``PowerSyncDatabaseProtocol/getCrudTransactions()``. public struct CrudTransactions: AsyncSequence { public typealias Element = CrudTransaction public typealias AsyncIterator = CrudTransactionsIterator - + private let db: any PowerSyncDatabaseProtocol internal init(db: any PowerSyncDatabaseProtocol) { @@ -78,15 +79,15 @@ WITH RECURSIVE crud_entries AS ( ) SELECT * FROM crud_entries; """ - + let items = try await db.getAll(sql: query, parameters: [lastItemId], mapper: CrudEntry.fromCursor) if items.isEmpty { return nil } - + let txId = items.first!.transactionId let lastId = items.last!.clientId - + lastItemId = lastId return CrudTransaction( transactionId: txId!, From 7a093246375bc79a5b861d490de9537862750dc8 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 15 Apr 2026 10:38:01 +0200 Subject: [PATCH 3/7] AI feedback --- Sources/PowerSync/Protocol/db/CrudEntry.swift | 7 +------ Sources/PowerSync/Protocol/db/CrudTransaction.swift | 4 ++-- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/Sources/PowerSync/Protocol/db/CrudEntry.swift b/Sources/PowerSync/Protocol/db/CrudEntry.swift index 2499fac..51dac38 100644 --- a/Sources/PowerSync/Protocol/db/CrudEntry.swift +++ b/Sources/PowerSync/Protocol/db/CrudEntry.swift @@ -112,12 +112,7 @@ public struct CrudEntry: Sendable { } let decoder = JSONDecoder() - var entry: CrudJsonEntry - do { - entry = try decoder.decode(CrudJsonEntry.self, from: data.data(using: .utf8)!) - } catch { - throw error - } + let entry = try decoder.decode(CrudJsonEntry.self, from: data.data(using: .utf8)!) return CrudEntry( id: entry.id, diff --git a/Sources/PowerSync/Protocol/db/CrudTransaction.swift b/Sources/PowerSync/Protocol/db/CrudTransaction.swift index 4059e02..243b690 100644 --- a/Sources/PowerSync/Protocol/db/CrudTransaction.swift +++ b/Sources/PowerSync/Protocol/db/CrudTransaction.swift @@ -6,7 +6,7 @@ public struct CrudTransaction: Sendable { /// Unique transaction id. /// /// If nil, this contains a list of changes recorded without an explicit transaction associated. - public let transactionId: Int64 + public let transactionId: Int64? /// List of client-side changes. public let crud: [CrudEntry] @@ -90,7 +90,7 @@ SELECT * FROM crud_entries; lastItemId = lastId return CrudTransaction( - transactionId: txId!, + transactionId: txId, crud: items, db: db ) From d0cecacb33c35f1d5bc56450d30f48c64e57f54a Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 15 Apr 2026 10:40:16 +0200 Subject: [PATCH 4/7] typo --- Sources/PowerSync/Protocol/db/CrudTransaction.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/PowerSync/Protocol/db/CrudTransaction.swift b/Sources/PowerSync/Protocol/db/CrudTransaction.swift index 243b690..15285b4 100644 --- a/Sources/PowerSync/Protocol/db/CrudTransaction.swift +++ b/Sources/PowerSync/Protocol/db/CrudTransaction.swift @@ -13,7 +13,7 @@ public struct CrudTransaction: Sendable { private let db: any PowerSyncDatabaseProtocol - internal init(transactionId: Int64, crud: [CrudEntry], db: any PowerSyncDatabaseProtocol) { + internal init(transactionId: Int64?, crud: [CrudEntry], db: any PowerSyncDatabaseProtocol) { self.transactionId = transactionId self.crud = crud self.db = db From 72e99f0d3fc7ff66fdbd432b4e072b03da9d1bba Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 22 Apr 2026 17:24:11 +0200 Subject: [PATCH 5/7] Move to protocol extension --- .../Kotlin/KotlinPowerSyncDatabaseImpl.swift | 27 ------ .../Protocol/PowerSyncDatabaseProtocol.swift | 89 +++++++++++-------- 2 files changed, 53 insertions(+), 63 deletions(-) diff --git a/Sources/PowerSync/Kotlin/KotlinPowerSyncDatabaseImpl.swift b/Sources/PowerSync/Kotlin/KotlinPowerSyncDatabaseImpl.swift index 5a729ce..1fc57ac 100644 --- a/Sources/PowerSync/Kotlin/KotlinPowerSyncDatabaseImpl.swift +++ b/Sources/PowerSync/Kotlin/KotlinPowerSyncDatabaseImpl.swift @@ -72,33 +72,6 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol, ) } - func getCrudBatch(limit: Int32 = 100) async throws -> CrudBatch? { - var entries = try await getAll( - sql: "SELECT id, tx_id, data FROM ps_crud ORDER BY id ASC LIMIT ?", - parameters: [Int64(limit + 1)], - mapper: CrudEntry.fromCursor - ) - - if entries.isEmpty { - return nil - } - - let hasMore = entries.count > limit - if hasMore { - entries.removeLast() - } - - return CrudBatch( - hasMore: hasMore, - crud: entries, - db: self - ) - } - - func getCrudTransactions() -> CrudTransactions { - return CrudTransactions(db: self) - } - func getPowerSyncVersion() async throws -> String { try await kotlinDatabase.getPowerSyncVersion() } diff --git a/Sources/PowerSync/Protocol/PowerSyncDatabaseProtocol.swift b/Sources/PowerSync/Protocol/PowerSyncDatabaseProtocol.swift index a88020e..392cf7a 100644 --- a/Sources/PowerSync/Protocol/PowerSyncDatabaseProtocol.swift +++ b/Sources/PowerSync/Protocol/PowerSyncDatabaseProtocol.swift @@ -191,40 +191,6 @@ public protocol PowerSyncDatabaseProtocol: Queries, Sendable { options: ConnectOptions? ) async throws - /// Get a batch of crud data to upload. - /// - /// Returns nil if there is no data to upload. - /// - /// Use this from the `PowerSyncBackendConnector.uploadData` callback. - /// - /// Once the data have been successfully uploaded, call `CrudBatch.complete` before - /// requesting the next batch. - /// - /// - Parameter limit: Maximum number of updates to return in a single batch. Default is 100. - /// - /// This method does include transaction ids in the result, but does not group - /// data by transaction. One batch may contain data from multiple transactions, - /// and a single transaction may be split over multiple batches. - func getCrudBatch(limit: Int32) async throws -> CrudBatch? - - /// Obtains an async iterator of completed transactions with local writes against the database. - /// - /// This is typically used from the ``PowerSyncBackendConnectorProtocol/uploadData(database:)`` callback. - /// Each entry emitted by teh returned flow is a full transaction containing all local writes made while that transaction was - /// active. - /// - /// Unlike ``getNextCrudTransaction()``, which always returns the oldest transaction that hasn't been - /// ``CrudTransaction/complete()``d yet, this iterator can be used to upload multiple transactions. - /// Calling ``CrudTransaction/complete()`` will mark that and all prior transactions returned by this iterator as - /// completed. - /// - /// This can be used to upload multiple transactions in a single batch, e.g. with - /// - /// ```Swift - /// - /// ``` - func getCrudTransactions() -> CrudTransactions - /// Convenience method to get the current version of PowerSync. func getPowerSyncVersion() async throws -> String @@ -343,9 +309,60 @@ public extension PowerSyncDatabaseProtocol { try await disconnectAndClear(clearLocal: true, soft: soft) } + /// Get a batch of crud data to upload. + /// + /// Returns nil if there is no data to upload. + /// + /// Use this from the `PowerSyncBackendConnector.uploadData` callback. + /// + /// Once the data have been successfully uploaded, call `CrudBatch.complete` before + /// requesting the next batch. + /// + /// - Parameter limit: Maximum number of updates to return in a single batch. Default is 100. + /// + /// This method does include transaction ids in the result, but does not group + /// data by transaction. One batch may contain data from multiple transactions, + /// and a single transaction may be split over multiple batches. func getCrudBatch(limit: Int32 = 100) async throws -> CrudBatch? { - try await getCrudBatch( - limit: limit + var entries = try await getAll( + sql: "SELECT id, tx_id, data FROM ps_crud ORDER BY id ASC LIMIT ?", + parameters: [Int64(limit + 1)], + mapper: CrudEntry.fromCursor ) + + if entries.isEmpty { + return nil + } + + let hasMore = entries.count > limit + if hasMore { + entries.removeLast() + } + + return CrudBatch( + hasMore: hasMore, + crud: entries, + db: self + ) + } + + /// Obtains an async iterator of completed transactions with local writes against the database. + /// + /// This is typically used from the ``PowerSyncBackendConnectorProtocol/uploadData(database:)`` callback. + /// Each entry emitted by teh returned flow is a full transaction containing all local writes made while that transaction was + /// active. + /// + /// Unlike ``getNextCrudTransaction()``, which always returns the oldest transaction that hasn't been + /// ``CrudTransaction/complete()``d yet, this iterator can be used to upload multiple transactions. + /// Calling ``CrudTransaction/complete()`` will mark that and all prior transactions returned by this iterator as + /// completed. + /// + /// This can be used to upload multiple transactions in a single batch, e.g. with + /// + /// ```Swift + /// + /// ``` + func getCrudTransactions() -> CrudTransactions { + CrudTransactions(db: self) } } From 5dd456ee45629309faaf80b8863b262d401b8cb3 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 23 Apr 2026 13:39:36 +0200 Subject: [PATCH 6/7] Review feedback --- CHANGELOG.md | 1 + Sources/PowerSync/Protocol/db/CrudBatch.swift | 11 ++--------- Sources/PowerSync/Protocol/db/CrudEntry.swift | 11 ++++++++--- Tests/PowerSyncTests/CrudTests.swift | 15 +++++++++++++++ 4 files changed, 26 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 60f0cbc..4244c41 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## 1.14.0 (unreleased) * Add `opDataTyped` and `previousValuesTyped` to `CrudEntry`, providing typed values instead of strings. +* Make `CrudBatch`, `CrudEntry` and `CrudTransaction` a concrete struct. Note that these can no longer be created in user code. ## 1.13.1 diff --git a/Sources/PowerSync/Protocol/db/CrudBatch.swift b/Sources/PowerSync/Protocol/db/CrudBatch.swift index ff03e01..1147d58 100644 --- a/Sources/PowerSync/Protocol/db/CrudBatch.swift +++ b/Sources/PowerSync/Protocol/db/CrudBatch.swift @@ -19,16 +19,9 @@ public struct CrudBatch: Sendable { /// Call to remove the changes from the local queue, once successfully uploaded. /// /// `writeCheckpoint` is optional. - public func complete(writeCheckpoint: String?) async throws { + public func complete(writeCheckpoint: String? = nil) async throws { let lastId = crud.last!.clientId - try await completeCrudItems(self.db, lastId) - } - - /// Call to remove the changes from the local queue, once successfully uploaded. - public func complete() async throws { - try await self.complete( - writeCheckpoint: nil - ) + try await completeCrudItems(self.db, lastId, writeCheckpoint: writeCheckpoint) } } diff --git a/Sources/PowerSync/Protocol/db/CrudEntry.swift b/Sources/PowerSync/Protocol/db/CrudEntry.swift index 51dac38..4827468 100644 --- a/Sources/PowerSync/Protocol/db/CrudEntry.swift +++ b/Sources/PowerSync/Protocol/db/CrudEntry.swift @@ -112,7 +112,10 @@ public struct CrudEntry: Sendable { } let decoder = JSONDecoder() - let entry = try decoder.decode(CrudJsonEntry.self, from: data.data(using: .utf8)!) + guard let jsonData = data.data(using: .utf8) else { + throw PowerSyncError.operationFailed(message: "Invalid UTF-8 in CRUD entry") + } + let entry = try decoder.decode(CrudJsonEntry.self, from: jsonData) return CrudEntry( id: entry.id, @@ -128,7 +131,9 @@ public struct CrudEntry: Sendable { } private static func jsonValueToString(_ value: JsonValue?) throws -> String? { - try value.map { value in + // Older versions of the SDK were only able to export these values as strings, so we convert for + // backwards compatibility. + try value.flatMap { value in switch (value) { case .string(let value): return value @@ -139,7 +144,7 @@ public struct CrudEntry: Sendable { case .bool(let value): return String(value) case .null: - return "null" + return nil case .array(_), .object(_): throw PowerSyncError.operationFailed(message: "Invalid array/object in CRUD data, should be string") } diff --git a/Tests/PowerSyncTests/CrudTests.swift b/Tests/PowerSyncTests/CrudTests.swift index 5eb2872..b84e694 100644 --- a/Tests/PowerSyncTests/CrudTests.swift +++ b/Tests/PowerSyncTests/CrudTests.swift @@ -318,4 +318,19 @@ final class CrudTests: XCTestCase { XCTAssertEqual(write.opData?["name"], "updated_name") XCTAssertEqual(write.previousValues?["name"], "user") } + + func testCustomWriteCheckpoints() async throws { + try await database.execute( + sql: "INSERT INTO users (id, name) VALUES (uuid(), 'a')", + parameters: [] + ) + + let tx = try await database.getNextCrudTransaction()! + try await tx.complete(writeCheckpoint: "123") + + let targetOp = try await database.get("SELECT target_op FROM ps_buckets WHERE name = '$local'") { + try $0.getInt(index: 0) + } + XCTAssertEqual(targetOp, 123) + } } From ddfc5175d36a38ab997d3d15439b139a93f78700 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 23 Apr 2026 13:40:57 +0200 Subject: [PATCH 7/7] Also test crud batch --- Tests/PowerSyncTests/CrudTests.swift | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/Tests/PowerSyncTests/CrudTests.swift b/Tests/PowerSyncTests/CrudTests.swift index b84e694..47ac61c 100644 --- a/Tests/PowerSyncTests/CrudTests.swift +++ b/Tests/PowerSyncTests/CrudTests.swift @@ -332,5 +332,16 @@ final class CrudTests: XCTestCase { try $0.getInt(index: 0) } XCTAssertEqual(targetOp, 123) + + try await database.execute( + sql: "INSERT INTO users (id, name) VALUES (uuid(), 'a')", + parameters: [] + ) + let batch = try await database.getCrudBatch()! + try await batch.complete(writeCheckpoint: "124") + let newTargetOp = try await database.get("SELECT target_op FROM ps_buckets WHERE name = '$local'") { + try $0.getInt(index: 0) + } + XCTAssertEqual(newTargetOp, 124) } }