Skip to content

Commit 8a7dd58

Browse files
committed
Moved resumption of pending continuations to outside the mutex.
1 parent 0e608b5 commit 8a7dd58

1 file changed

Lines changed: 31 additions & 19 deletions

File tree

Sources/Vexil/Utilities/AsyncCurrentValue.swift

Lines changed: 31 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,7 @@ struct AsyncCurrentValue<Wrapped: Sendable> {
1919
// iterators start with generation = 0, so our initial value
2020
// has generation 1, so even that will be delivered.
2121
var generation = 1
22-
var wrappedValue: Wrapped {
23-
didSet {
24-
generation += 1
25-
for (_, continuation) in pendingContinuations {
26-
continuation.resume(returning: (generation, wrappedValue))
27-
}
28-
pendingContinuations = []
29-
}
30-
}
31-
22+
var wrappedValue: Wrapped
3223
var pendingContinuations = [(UUID, CheckedContinuation<(Int, Wrapped)?, Never>)]()
3324
}
3425

@@ -68,18 +59,39 @@ struct AsyncCurrentValue<Wrapped: Sendable> {
6859
/// - body: A closure that passes the current value as an in-out parameter that you can mutate.
6960
/// When the closure returns the mutated value is saved as the current value and is sent to all subscribers.
7061
///
71-
func update<R: Sendable>(_ body: (inout sending Wrapped) throws -> R) rethrows -> R {
72-
try allocation.mutex.withLock { state in
62+
func update<R: Sendable, E: Error>(_ body: (inout sending Wrapped) throws(E) -> R) throws(E) -> R {
63+
let result: Result<R, E>
64+
let generation: Int
65+
let pendingContinuations: [CheckedContinuation<(Int, Wrapped)?, Never>]
66+
let updatedValue: Wrapped
67+
68+
// If we resume continuations within the context of this lock we risk a deadlock
69+
// as they attempt to access the next value. So we do the update and return
70+
// pending continuations to be resumed outside the lock. It should be impossible
71+
// for new continuations to miss this generation as they're accessed and added
72+
// within the same lock closure.
73+
74+
(result, updatedValue, generation, pendingContinuations) = allocation.mutex.withLock { state in
75+
76+
// The closure mutates a copy, then we save that back to our state
7377
var wrappedValue = state.wrappedValue
74-
do {
75-
let result = try body(&wrappedValue)
76-
state.wrappedValue = wrappedValue
77-
return result
78-
} catch {
79-
state.wrappedValue = wrappedValue
80-
throw error
78+
let result = Result { () throws(E) -> R in
79+
try body(&wrappedValue)
8180
}
81+
state.wrappedValue = wrappedValue
82+
83+
// Bump generation and grab pending continuations
84+
state.generation += 1
85+
let toResume = state.pendingContinuations.map(\.1)
86+
state.pendingContinuations = []
87+
return (result, wrappedValue, state.generation, toResume)
88+
}
89+
90+
// Resume our pending continuations
91+
for continuation in pendingContinuations {
92+
continuation.resume(returning: (generation, updatedValue))
8293
}
94+
return try result.get()
8395
}
8496

8597
}

0 commit comments

Comments
 (0)