Skip to content

Commit 3cfc3a8

Browse files
authored
Merge pull request #154 from unsignedapps/fix/asynccurrentvalue-deadlock
Fix lock-order inversion deadlock in `AsyncCurrentValue`
2 parents c50af49 + 8a7dd58 commit 3cfc3a8

2 files changed

Lines changed: 102 additions & 19 deletions

File tree

Sources/Vexil/Utilities/AsyncCurrentValue.swift

Lines changed: 31 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,7 @@ struct AsyncCurrentValue<Wrapped: Sendable> {
1919
// iterators start with generation = 0, so our initial value
2020
// has generation 1, so even that will be delivered.
2121
var generation = 1
22-
var wrappedValue: Wrapped {
23-
didSet {
24-
generation += 1
25-
for (_, continuation) in pendingContinuations {
26-
continuation.resume(returning: (generation, wrappedValue))
27-
}
28-
pendingContinuations = []
29-
}
30-
}
31-
22+
var wrappedValue: Wrapped
3223
var pendingContinuations = [(UUID, CheckedContinuation<(Int, Wrapped)?, Never>)]()
3324
}
3425

@@ -68,18 +59,39 @@ struct AsyncCurrentValue<Wrapped: Sendable> {
6859
/// - body: A closure that passes the current value as an in-out parameter that you can mutate.
6960
/// When the closure returns the mutated value is saved as the current value and is sent to all subscribers.
7061
///
71-
func update<R: Sendable>(_ body: (inout sending Wrapped) throws -> R) rethrows -> R {
72-
try allocation.mutex.withLock { state in
62+
func update<R: Sendable, E: Error>(_ body: (inout sending Wrapped) throws(E) -> R) throws(E) -> R {
63+
let result: Result<R, E>
64+
let generation: Int
65+
let pendingContinuations: [CheckedContinuation<(Int, Wrapped)?, Never>]
66+
let updatedValue: Wrapped
67+
68+
// If we resume continuations within the context of this lock we risk a deadlock
69+
// as they attempt to access the next value. So we do the update and return
70+
// pending continuations to be resumed outside the lock. It should be impossible
71+
// for new continuations to miss this generation as they're accessed and added
72+
// within the same lock closure.
73+
74+
(result, updatedValue, generation, pendingContinuations) = allocation.mutex.withLock { state in
75+
76+
// The closure mutates a copy, then we save that back to our state
7377
var wrappedValue = state.wrappedValue
74-
do {
75-
let result = try body(&wrappedValue)
76-
state.wrappedValue = wrappedValue
77-
return result
78-
} catch {
79-
state.wrappedValue = wrappedValue
80-
throw error
78+
let result = Result { () throws(E) -> R in
79+
try body(&wrappedValue)
8180
}
81+
state.wrappedValue = wrappedValue
82+
83+
// Bump generation and grab pending continuations
84+
state.generation += 1
85+
let toResume = state.pendingContinuations.map(\.1)
86+
state.pendingContinuations = []
87+
return (result, wrappedValue, state.generation, toResume)
88+
}
89+
90+
// Resume our pending continuations
91+
for continuation in pendingContinuations {
92+
continuation.resume(returning: (generation, updatedValue))
8293
}
94+
return try result.get()
8395
}
8496

8597
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the Vexil open source project
4+
//
5+
// Copyright (c) 2026 Unsigned Apps and the open source contributors.
6+
// Licensed under the MIT license
7+
//
8+
// See LICENSE for license information
9+
//
10+
// SPDX-License-Identifier: MIT
11+
//
12+
//===----------------------------------------------------------------------===//
13+
14+
import Foundation
15+
import Testing
16+
@testable import Vexil
17+
18+
#if os(macOS)
19+
20+
struct AsyncCurrentValueTests {
21+
/// Regression test for a lock-order inversion deadlock between two threads:
22+
///
23+
/// Thread A (update): holds allocation.mutex → calls continuation.resume() inside didSet
24+
/// → resume() internally acquires the Swift task status lock
25+
///
26+
/// Thread B (cancel): acquires the Swift task status lock → fires onCancel: handler
27+
/// → onCancel: calls allocation.mutex.withLock → waits for mutex
28+
///
29+
/// Thread A holds mutex, wants task-lock.
30+
/// Thread B holds task-lock, wants mutex.
31+
/// → deadlock.
32+
///
33+
/// If the bug is present, the test will time out after 1 minute.
34+
@Test(.timeLimit(.minutes(1)))
35+
func `AsyncCurrentValue does not deadlock when cancellation races a concurrent update`() async {
36+
for _ in 0 ..< 100_000 {
37+
let currentValue = AsyncCurrentValue<FlagChange>(.all)
38+
39+
// This task will:
40+
// 1. Call next() once — returns the initial value immediately because
41+
// iterator.generation (0) < state.generation (1).
42+
// 2. Call next() again — suspends, storing its continuation in
43+
// pendingContinuations. This is the continuation that gets raced.
44+
let consumingTask = Task {
45+
var iterator = currentValue.stream.makeAsyncIterator()
46+
_ = await iterator.next(isolation: nil)
47+
_ = await iterator.next(isolation: nil)
48+
}
49+
50+
// Yield to give the consuming task time to advance past the first next()
51+
// and park its continuation inside pendingContinuations on the second call.
52+
await Task.yield()
53+
await Task.yield()
54+
await Task.yield()
55+
56+
// Fire the two racing operations:
57+
// updateTask — acquires allocation.mutex, sets wrappedValue, didSet calls
58+
// continuation.resume() while still inside withLock.
59+
// cancel — acquires the task status lock, fires onCancel:, which calls
60+
// allocation.mutex.withLock.
61+
let updateTask = Task.detached { currentValue.update { _ in } }
62+
consumingTask.cancel()
63+
64+
await updateTask.value
65+
await consumingTask.value
66+
}
67+
}
68+
69+
}
70+
71+
#endif

0 commit comments

Comments
 (0)