Skip to content

Commit 0cca086

Browse files
authored
refactor: rewrite export system with streaming and native macOS patterns (#782)
* refactor: rewrite export system with streaming and native macOS patterns * fix: export data correctness, atomic writes, and view layer cleanup * feat: native streaming for all built-in database drivers * fix: address review findings for export streaming rewrite * feat: native streaming for all registry database drivers * fix: progress reporting issues and registry driver streaming * fix: flush final row count before dismissing export progress dialog * fix: use unbounded buffer for AsyncThrowingStream to prevent row drops * fix: address all review findings, batch streaming for performance
1 parent c05d395 commit 0cca086

76 files changed

Lines changed: 3213 additions & 1043 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

CHANGELOG.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,22 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1111

1212
- Main editor window rewritten on AppKit (`NSWindowController` + `NSToolbar`) for faster tab opens and correct lifecycle
1313
- Toolbar layout follows Apple HIG (sidebar left, connection center, view actions right)
14+
- Export engine rewritten to use streaming row fetch instead of offset/limit pagination
15+
- Export progress integrates with macOS system progress
16+
- All export file writes use atomic operations for crash safety
1417

1518
### Fixed
1619

1720
- Cmd+W closing the connection window instead of clearing to empty state
1821
- ER Diagram and Server Dashboard replacing the current tab instead of opening a new one
1922
- Welcome window stealing focus on connect, disabling Cmd+T until manual click
2023
- Toolbar empty on second tab, menu shortcuts disabled after toolbar click
24+
- JSON export no longer coerces leading-zero strings to integers
25+
- XLSX export auto-splits tables exceeding 1,048,576 rows into multiple sheets
26+
- CSV formula injection guard corrected to OWASP-standard prefixes only
27+
- MQL export validates JSON values before passthrough
28+
- SQL export gzip compression is now async and cancellable
29+
- Export progress bar reliably reaches 100%
2130

2231
### Added
2332

Packages/TableProCore/Sources/TableProPluginKit/ExportFormatPlugin.swift

Lines changed: 45 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,35 @@ public struct PluginEnumTypeInfo: Sendable {
7777
}
7878
}
7979

80+
public typealias PluginRow = [String?]
81+
82+
public struct PluginStreamHeader: Sendable {
83+
public let columns: [String]
84+
public let columnTypeNames: [String]
85+
public let estimatedRowCount: Int?
86+
87+
public init(columns: [String], columnTypeNames: [String], estimatedRowCount: Int? = nil) {
88+
self.columns = columns
89+
self.columnTypeNames = columnTypeNames
90+
self.estimatedRowCount = estimatedRowCount
91+
}
92+
}
93+
94+
public enum PluginStreamElement: Sendable {
95+
case header(PluginStreamHeader)
96+
case rows([PluginRow])
97+
}
98+
99+
public struct ExportFormatResult: Sendable {
100+
public let warnings: [String]
101+
public init(warnings: [String] = []) {
102+
self.warnings = warnings
103+
}
104+
}
105+
80106
public protocol PluginExportDataSource: AnyObject, Sendable {
81107
var databaseTypeId: String { get }
82-
func fetchRows(table: String, databaseName: String, offset: Int, limit: Int) async throws -> PluginQueryResult
108+
func streamRows(table: String, databaseName: String) -> AsyncThrowingStream<PluginStreamElement, Error>
83109
func fetchTableDDL(table: String, databaseName: String) async throws -> String
84110
func execute(query: String) async throws -> PluginQueryResult
85111
func quoteIdentifier(_ identifier: String) -> String
@@ -95,100 +121,65 @@ public extension PluginExportDataSource {
95121
}
96122

97123
public final class PluginExportProgress: @unchecked Sendable {
98-
private let lock = NSLock()
99-
private var _currentTable: String = ""
100-
private var _currentTableIndex: Int = 0
101-
private var _processedRows: Int = 0
102-
private var _totalRows: Int = 0
103-
private var _statusMessage: String = ""
104-
private var _isCancelled: Bool = false
105-
124+
private let progress: Progress
106125
private let updateInterval: Int = 1_000
107126
private var internalRowCount: Int = 0
127+
private let lock = NSLock()
108128

109-
public var onUpdate: (@Sendable (String, Int, Int, Int, String) -> Void)?
110-
111-
public init() {}
129+
public init(progress: Progress) {
130+
self.progress = progress
131+
}
112132

113133
public func setCurrentTable(_ name: String, index: Int) {
114-
lock.lock()
115-
_currentTable = name
116-
_currentTableIndex = index
117-
lock.unlock()
118-
notifyUpdate()
134+
progress.localizedDescription = name
119135
}
120136

121137
public func incrementRow() {
122138
lock.lock()
123139
internalRowCount += 1
124-
_processedRows = internalRowCount
125-
let shouldNotify = internalRowCount % updateInterval == 0
140+
let count = internalRowCount
141+
let shouldNotify = count % updateInterval == 0
126142
lock.unlock()
127143
if shouldNotify {
128-
notifyUpdate()
144+
progress.completedUnitCount = Int64(count)
129145
}
130146
}
131147

132148
public func finalizeTable() {
133-
notifyUpdate()
134-
}
135-
136-
public func setTotalRows(_ count: Int) {
137149
lock.lock()
138-
_totalRows = count
150+
let count = internalRowCount
139151
lock.unlock()
152+
progress.completedUnitCount = Int64(count)
140153
}
141154

142155
public func setStatus(_ message: String) {
143-
lock.lock()
144-
_statusMessage = message
145-
lock.unlock()
146-
notifyUpdate()
156+
progress.localizedAdditionalDescription = message
147157
}
148158

149159
public func checkCancellation() throws {
150-
lock.lock()
151-
let cancelled = _isCancelled
152-
lock.unlock()
153-
if cancelled || Task.isCancelled {
160+
if progress.isCancelled || Task.isCancelled {
154161
throw PluginExportCancellationError()
155162
}
156163
}
157164

158165
public func cancel() {
159-
lock.lock()
160-
_isCancelled = true
161-
lock.unlock()
166+
progress.cancel()
162167
}
163168

164169
public var isCancelled: Bool {
165-
lock.lock()
166-
defer { lock.unlock() }
167-
return _isCancelled
170+
progress.isCancelled || Task.isCancelled
168171
}
169172

170173
public var processedRows: Int {
171174
lock.lock()
172175
defer { lock.unlock() }
173-
return _processedRows
176+
return internalRowCount
174177
}
175178

176179
public var totalRows: Int {
177-
lock.lock()
178-
defer { lock.unlock() }
179-
return _totalRows
180+
Int(progress.totalUnitCount)
180181
}
181182

182-
private func notifyUpdate() {
183-
lock.lock()
184-
let table = _currentTable
185-
let index = _currentTableIndex
186-
let rows = _processedRows
187-
let total = _totalRows
188-
let status = _statusMessage
189-
lock.unlock()
190-
onUpdate?(table, index, rows, total, status)
191-
}
192183
}
193184

194185
public protocol ExportFormatPlugin: TableProPlugin {
@@ -204,14 +195,13 @@ public protocol ExportFormatPlugin: TableProPlugin {
204195
func isTableExportable(optionValues: [Bool]) -> Bool
205196

206197
var currentFileExtension: String { get }
207-
var warnings: [String] { get }
208198

209199
func export(
210200
tables: [PluginExportTable],
211201
dataSource: any PluginExportDataSource,
212202
destination: URL,
213203
progress: PluginExportProgress
214-
) async throws
204+
) async throws -> ExportFormatResult
215205
}
216206

217207
public extension ExportFormatPlugin {
@@ -222,5 +212,4 @@ public extension ExportFormatPlugin {
222212
func defaultTableOptionValues() -> [Bool] { [] }
223213
func isTableExportable(optionValues: [Bool]) -> Bool { true }
224214
var currentFileExtension: String { Self.defaultFileExtension }
225-
var warnings: [String] { [] }
226215
}

Packages/TableProCore/Sources/TableProPluginKit/ImportFormatPlugin.swift

Lines changed: 17 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -66,87 +66,63 @@ public extension PluginImportDataSink {
6666
}
6767

6868
public final class PluginImportProgress: @unchecked Sendable {
69-
private let lock = NSLock()
70-
private var _processedStatements: Int = 0
71-
private var _estimatedTotalStatements: Int = 0
72-
private var _statusMessage: String = ""
73-
private var _isCancelled: Bool = false
74-
69+
private let progress: Progress
7570
private let updateInterval: Int = 500
7671
private var internalCount: Int = 0
72+
private let lock = NSLock()
7773

78-
public var onUpdate: (@Sendable (Int, Int, String) -> Void)?
79-
80-
public init() {}
74+
public init(progress: Progress) {
75+
self.progress = progress
76+
}
8177

8278
public func setEstimatedTotal(_ count: Int) {
83-
lock.lock()
84-
_estimatedTotalStatements = count
85-
lock.unlock()
79+
progress.totalUnitCount = Int64(count)
8680
}
8781

8882
public func incrementStatement() {
8983
lock.lock()
9084
internalCount += 1
91-
_processedStatements = internalCount
92-
let shouldNotify = internalCount % updateInterval == 0
85+
let count = internalCount
86+
let shouldNotify = count % updateInterval == 0
9387
lock.unlock()
9488
if shouldNotify {
95-
notifyUpdate()
89+
progress.completedUnitCount = Int64(count)
9690
}
9791
}
9892

9993
public func setStatus(_ message: String) {
100-
lock.lock()
101-
_statusMessage = message
102-
lock.unlock()
103-
notifyUpdate()
94+
progress.localizedAdditionalDescription = message
10495
}
10596

10697
public func checkCancellation() throws {
107-
lock.lock()
108-
let cancelled = _isCancelled
109-
lock.unlock()
110-
if cancelled || Task.isCancelled {
98+
if progress.isCancelled || Task.isCancelled {
11199
throw PluginImportCancellationError()
112100
}
113101
}
114102

115103
public func cancel() {
116-
lock.lock()
117-
_isCancelled = true
118-
lock.unlock()
104+
progress.cancel()
119105
}
120106

121107
public var isCancelled: Bool {
122-
lock.lock()
123-
defer { lock.unlock() }
124-
return _isCancelled
108+
progress.isCancelled || Task.isCancelled
125109
}
126110

127111
public var processedStatements: Int {
128112
lock.lock()
129113
defer { lock.unlock() }
130-
return _processedStatements
114+
return internalCount
131115
}
132116

133117
public var estimatedTotalStatements: Int {
134-
lock.lock()
135-
defer { lock.unlock() }
136-
return _estimatedTotalStatements
118+
Int(progress.totalUnitCount)
137119
}
138120

139121
public func finalize() {
140-
notifyUpdate()
141-
}
142-
143-
private func notifyUpdate() {
144122
lock.lock()
145-
let processed = _processedStatements
146-
let total = _estimatedTotalStatements
147-
let status = _statusMessage
123+
let count = internalCount
148124
lock.unlock()
149-
onUpdate?(processed, total, status)
125+
progress.completedUnitCount = Int64(count)
150126
}
151127
}
152128

Packages/TableProCore/Sources/TableProPluginKit/PluginDatabaseDriver.swift

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,9 @@ public protocol PluginDatabaseDriver: AnyObject, Sendable {
170170

171171
// Default export query (optional)
172172
func defaultExportQuery(table: String) -> String?
173+
174+
// Streaming row fetch for export
175+
func streamRows(query: String) -> AsyncThrowingStream<PluginStreamElement, Error>
173176
}
174177

175178
// MARK: - Default Implementations
@@ -530,4 +533,41 @@ public extension PluginDatabaseDriver {
530533
func fetchRows(query: String, offset: Int, limit: Int) async throws -> PluginQueryResult {
531534
try await execute(query: "\(query) LIMIT \(limit) OFFSET \(offset)")
532535
}
536+
537+
func streamRows(query: String) -> AsyncThrowingStream<PluginStreamElement, Error> {
538+
AsyncThrowingStream(bufferingPolicy: .unbounded) { continuation in
539+
Task {
540+
do {
541+
let batchSize = 1_000
542+
let firstPage = try await fetchRows(query: query, offset: 0, limit: batchSize)
543+
continuation.yield(.header(PluginStreamHeader(
544+
columns: firstPage.columns,
545+
columnTypeNames: firstPage.columnTypeNames,
546+
estimatedRowCount: nil
547+
)))
548+
if !firstPage.rows.isEmpty {
549+
continuation.yield(.rows(firstPage.rows))
550+
}
551+
if firstPage.rows.count < batchSize {
552+
continuation.finish()
553+
return
554+
}
555+
await Task.yield()
556+
var offset = firstPage.rows.count
557+
while true {
558+
try Task.checkCancellation()
559+
let page = try await fetchRows(query: query, offset: offset, limit: batchSize)
560+
if page.rows.isEmpty { break }
561+
continuation.yield(.rows(page.rows))
562+
offset += page.rows.count
563+
if page.rows.count < batchSize { break }
564+
await Task.yield()
565+
}
566+
continuation.finish()
567+
} catch {
568+
continuation.finish(throwing: error)
569+
}
570+
}
571+
}
572+
}
533573
}

0 commit comments

Comments
 (0)