Skip to content

Commit 81c7121

Browse files
committed
Properly use structured concurrency
1 parent f67be2a commit 81c7121

1 file changed

Lines changed: 13 additions & 11 deletions

File tree

Sources/PowerSync/Implementation/sync/StreamingSyncClient.swift

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -156,11 +156,11 @@ The next upload iteration will be delayed.
156156

157157
while (!Task.isCancelled) {
158158
do {
159-
// This async let ensures each iteration is a task scoped to this block. This allows us to spawn
160-
// additional tasks in run() that would get cancelled when the main iteration is complete.
161-
async let iteration = ActiveSyncIteration(syncClient: self, signals: signals).run()
162-
163-
result = try await iteration
159+
try await withThrowingTaskGroup(of: Void.self) { group in
160+
let iteration = ActiveSyncIteration(syncClient: self, signals: signals)
161+
var group: ThrowingTaskGroup<Void, any Error>? = group
162+
result = try await iteration.run(group: &group)
163+
}
164164
} catch {
165165
result = SyncIterationResult()
166166

@@ -234,7 +234,7 @@ private struct ActiveSyncIteration: Sendable {
234234
self.signals = signals
235235
}
236236

237-
func run() async throws -> SyncIterationResult {
237+
func run(group: inout ThrowingTaskGroup<Void, any Error>?) async throws -> SyncIterationResult {
238238
// Notify the core extension for changed Sync Stream subscriptions, as we might have to reconnect.
239239
async let _ = watchSyncStreams()
240240
// Notify the core extension for completed crud uploads, as we might want to retry applying a
@@ -256,7 +256,7 @@ private struct ActiveSyncIteration: Sendable {
256256
let serviceEvents = try await syncClient.fetchSyncLines(request: request)
257257
controlArgs = AsyncAlgorithms.merge(serviceEvents, localEvents.subscribe())
258258
} else {
259-
try await self.execute(instr: instruction)
259+
try await self.execute(instr: instruction, group: &group)
260260
}
261261
}
262262

@@ -273,7 +273,7 @@ private struct ActiveSyncIteration: Sendable {
273273
return SyncIterationResult(hideDisconnect: hideDisconnect)
274274
}
275275

276-
try await execute(instr: instr)
276+
try await execute(instr: instr, group: &group)
277277
}
278278

279279
if !hadSyncLine && arg.isSyncLine() {
@@ -296,7 +296,9 @@ private struct ActiveSyncIteration: Sendable {
296296
return SyncIterationResult(hideDisconnect: hideDisconnect)
297297
}
298298

299-
try await execute(instr: instr)
299+
// Don't pass the task group here, stop instructions shouldn't spawn further async work.
300+
var group: ThrowingTaskGroup<Void, any Error>? = nil
301+
try await execute(instr: instr, group: &group)
300302
}
301303

302304
return SyncIterationResult()
@@ -311,7 +313,7 @@ private struct ActiveSyncIteration: Sendable {
311313
return try StreamingSyncClient.jsonDecoder.decode([Instruction].self, from: data)
312314
}
313315

314-
private func execute(instr: consuming Instruction) async throws {
316+
private func execute(instr: consuming Instruction, group: inout ThrowingTaskGroup<Void, any Error>?) async throws {
315317
switch (instr) {
316318
case .logLine(severity: let severity, line: let line):
317319
let logger = syncClient.db.logger
@@ -336,7 +338,7 @@ private struct ActiveSyncIteration: Sendable {
336338
if didExpire {
337339
await syncClient.invalidateCredentials()
338340
} else {
339-
Task {
341+
group?.addTask {
340342
do {
341343
let _ = try await syncClient.connector.fetchCredentials(allowCached: false)
342344
syncClient.db.logger.debug("Stopping because new credentials are available", tag: tag)

0 commit comments

Comments
 (0)