Skip to content

Commit c4fa3e6

Browse files
committed
Fix remaining async vsock lifetime bugs
1 parent 633b8b2 commit c4fa3e6

11 files changed

Lines changed: 456 additions & 73 deletions

Sources/Containerization/LinuxContainer.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -969,7 +969,7 @@ extension LinuxContainer {
969969
}
970970

971971
/// Dial a vsock port in the container.
972-
public func dialVsock(port: UInt32) async throws -> FileHandle {
972+
public func dialVsock(port: UInt32) async throws -> VsockConnection {
973973
try await self.state.withLock {
974974
let state = try $0.startedState("dialVsock")
975975
return try await state.vm.dial(port)
@@ -1098,7 +1098,7 @@ extension LinuxContainer {
10981098
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, any Error>) in
10991099
self.copyQueue.async {
11001100
do {
1101-
defer { conn.closeFile() }
1101+
defer { try? conn.close() }
11021102

11031103
if isArchive {
11041104
let writer = try ArchiveWriter(configuration: .init(format: .pax, filter: .gzip))
@@ -1209,7 +1209,7 @@ extension LinuxContainer {
12091209
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, any Error>) in
12101210
self.copyQueue.async {
12111211
do {
1212-
defer { conn.closeFile() }
1212+
defer { try? conn.close() }
12131213

12141214
if metadata.isArchive {
12151215
try FileManager.default.createDirectory(at: destination, withIntermediateDirectories: true)

Sources/Containerization/LinuxPod.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -850,7 +850,7 @@ extension LinuxPod {
850850
}
851851

852852
/// Dial a vsock port in the pod's VM.
853-
public func dialVsock(port: UInt32) async throws -> FileHandle {
853+
public func dialVsock(port: UInt32) async throws -> VsockConnection {
854854
try await self.state.withLock { state in
855855
let createdState = try state.phase.createdState("dialVsock")
856856
return try await createdState.vm.dial(port)

Sources/Containerization/LinuxProcess.swift

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -48,26 +48,44 @@ public final class LinuxProcess: Sendable {
4848
}
4949

5050
private struct StdioHandles: Sendable {
51-
var stdin: FileHandle?
52-
var stdout: FileHandle?
53-
var stderr: FileHandle?
51+
var stdin: VsockConnection?
52+
var stdout: VsockConnection?
53+
var stderr: VsockConnection?
5454

5555
mutating func close() throws {
56+
var firstError: Error?
57+
5658
if let stdin {
57-
try stdin.close()
5859
stdin.readabilityHandler = nil
60+
do {
61+
try stdin.close()
62+
} catch {
63+
firstError = firstError ?? error
64+
}
5965
self.stdin = nil
6066
}
6167
if let stdout {
62-
try stdout.close()
6368
stdout.readabilityHandler = nil
69+
do {
70+
try stdout.close()
71+
} catch {
72+
firstError = firstError ?? error
73+
}
6474
self.stdout = nil
6575
}
6676
if let stderr {
67-
try stderr.close()
6877
stderr.readabilityHandler = nil
78+
do {
79+
try stderr.close()
80+
} catch {
81+
firstError = firstError ?? error
82+
}
6983
self.stderr = nil
7084
}
85+
86+
if let firstError {
87+
throw firstError
88+
}
7189
}
7290
}
7391

@@ -124,10 +142,10 @@ public final class LinuxProcess: Sendable {
124142
}
125143

126144
extension LinuxProcess {
127-
func setupIO(listeners: [VsockListener?]) async throws -> [FileHandle?] {
145+
func setupIO(listeners: [VsockListener?]) async throws -> [VsockConnection?] {
128146
let handles = try await Timeout.run(seconds: 3) {
129-
try await withThrowingTaskGroup(of: (Int, FileHandle?).self) { group in
130-
var results = [FileHandle?](repeating: nil, count: 3)
147+
try await withThrowingTaskGroup(of: (Int, VsockConnection?).self) { group in
148+
var results = [VsockConnection?](repeating: nil, count: 3)
131149

132150
for (index, listener) in listeners.enumerated() {
133151
guard let listener else { continue }
@@ -196,7 +214,7 @@ extension LinuxProcess {
196214
return handles
197215
}
198216

199-
func startStdinRelay(handle: FileHandle) {
217+
func startStdinRelay(handle: VsockConnection) {
200218
guard let stdin = self.ioSetup.stdin else { return }
201219

202220
self.state.withLock {

Sources/Containerization/UnixSocketRelay.swift

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,13 @@ package final class UnixSocketRelay: Sendable {
2929
private let log: Logger?
3030
private let state: Mutex<State>
3131

32+
private struct ActiveRelay: Sendable {
33+
let relay: BidirectionalRelay
34+
let guestConnection: VsockConnection
35+
}
36+
3237
private struct State {
33-
var activeRelays: [String: BidirectionalRelay] = [:]
38+
var activeRelays: [String: ActiveRelay] = [:]
3439
var t: Task<(), Never>? = nil
3540
var listener: VsockListener? = nil
3641
}
@@ -75,10 +80,9 @@ extension UnixSocketRelay {
7580
}
7681
t.cancel()
7782
$0.t = nil
78-
for (_, relay) in $0.activeRelays {
79-
relay.stop()
83+
for (_, activeRelay) in $0.activeRelays {
84+
activeRelay.relay.stop()
8085
}
81-
$0.activeRelays.removeAll()
8286

8387
switch configuration.direction {
8488
case .outOf:
@@ -170,12 +174,12 @@ extension UnixSocketRelay {
170174
"initiating connection from host to guest",
171175
metadata: [
172176
"vport": "\(port)",
173-
"hostFd": "\(guestConn.fileDescriptor)",
174-
"guestFd": "\(hostConn.fileDescriptor)",
177+
"hostFd": "\(hostConn.fileDescriptor)",
178+
"guestFd": "\(guestConn.fileDescriptor)",
175179
])
176180
try await self.relay(
177181
hostConn: hostConn,
178-
guestFd: guestConn.fileDescriptor
182+
guestConn: guestConn
179183
)
180184
} catch {
181185
log?.error("failed to relay between vsock \(port) and \(hostConn)")
@@ -184,7 +188,7 @@ extension UnixSocketRelay {
184188
}
185189

186190
private func handleGuestVsockConn(
187-
vsockConn: FileHandle,
191+
vsockConn: VsockConnection,
188192
hostConnectionPath: URL,
189193
port: UInt32,
190194
log: Logger?
@@ -207,7 +211,7 @@ extension UnixSocketRelay {
207211
do {
208212
try await self.relay(
209213
hostConn: hostSocket,
210-
guestFd: vsockConn.fileDescriptor
214+
guestConn: vsockConn
211215
)
212216
} catch {
213217
log?.error("failed to relay between vsock \(port) and \(hostPath)")
@@ -216,9 +220,13 @@ extension UnixSocketRelay {
216220

217221
private func relay(
218222
hostConn: Socket,
219-
guestFd: Int32
223+
guestConn: VsockConnection
220224
) async throws {
221225
let hostFd = hostConn.fileDescriptor
226+
let guestFd = dup(guestConn.fileDescriptor)
227+
if guestFd == -1 {
228+
throw POSIXError.fromErrno()
229+
}
222230

223231
let relayID = UUID().uuidString
224232
let relay = BidirectionalRelay(
@@ -229,9 +237,21 @@ extension UnixSocketRelay {
229237
)
230238

231239
state.withLock {
232-
$0.activeRelays[relayID] = relay
240+
// Retain the original connection until the relay has fully completed.
241+
// The relay owns its duplicated fd and will close it itself.
242+
$0.activeRelays[relayID] = ActiveRelay(
243+
relay: relay,
244+
guestConnection: guestConn
245+
)
233246
}
234247

235248
relay.start()
249+
250+
Task {
251+
await relay.waitForCompletion()
252+
let _ = self.state.withLock {
253+
$0.activeRelays.removeValue(forKey: relayID)
254+
}
255+
}
236256
}
237257
}

Sources/Containerization/VZVirtualMachine+Helpers.swift

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -141,27 +141,22 @@ extension VZVirtualMachine {
141141
}
142142

143143
extension VZVirtioSocketConnection {
144-
/// Duplicates the file descriptor and immediately closes the connection.
144+
/// Duplicates the file descriptor and retains the originating vsock connection
145+
/// until the returned connection is closed or deallocated.
145146
///
146-
/// Only safe when the returned fd is used synchronously before any
147-
/// suspension point. For deferred use (e.g., gRPC/NIO), use
148-
/// ``dupFileDescriptor()`` and keep the connection alive via
149-
/// ``VsockTransport``.
150-
func dupHandle() throws -> FileHandle {
151-
let fd = dup(self.fileDescriptor)
152-
if fd == -1 {
153-
throw POSIXError.fromErrno()
154-
}
155-
self.close()
156-
return FileHandle(fileDescriptor: fd, closeOnDealloc: false)
147+
/// Use this for file descriptors which cross an async boundary or may not be
148+
/// consumed immediately by the caller.
149+
func retainedConnection() throws -> VsockConnection {
150+
try VsockConnection(connection: self)
157151
}
158152

159153
/// Duplicates the connection's file descriptor without closing the connection.
160154
///
161155
/// The caller must keep the `VZVirtioSocketConnection` alive until the dup'd
162156
/// descriptor is no longer needed. The Virtualization framework tears down the
163157
/// vsock endpoint when the connection is closed, which invalidates dup'd
164-
/// descriptors.
158+
/// descriptors. This is intended for callers which manage lifetime separately,
159+
/// such as gRPC transports stored on `Vminitd`.
165160
func dupFileDescriptor() throws -> FileHandle {
166161
let fd = dup(self.fileDescriptor)
167162
if fd == -1 {

Sources/Containerization/VZVirtualMachineInstance.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -202,14 +202,14 @@ extension VZVirtualMachineInstance: VirtualMachineInstance {
202202
}
203203
}
204204

205-
func dial(_ port: UInt32) async throws -> FileHandle {
205+
func dial(_ port: UInt32) async throws -> VsockConnection {
206206
try await lock.withLock { _ in
207207
do {
208208
let conn = try await vm.connect(
209209
queue: queue,
210210
port: port
211211
)
212-
return try conn.dupHandle()
212+
return try conn.retainedConnection()
213213
} catch {
214214
if let err = error as? ContainerizationError {
215215
throw err

Sources/Containerization/VirtualMachineInstance.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public protocol VirtualMachineInstance: Sendable {
3838
/// what port the agent is listening on.
3939
func dialAgent() async throws -> Agent
4040
/// Dial a vsock port in the guest.
41-
func dial(_ port: UInt32) async throws -> FileHandle
41+
func dial(_ port: UInt32) async throws -> VsockConnection
4242
/// Listen on a host vsock port.
4343
func listen(_ port: UInt32) throws -> VsockListener
4444
/// Start the virtual machine.
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
//===----------------------------------------------------------------------===//
2+
// Copyright © 2025-2026 Apple Inc. and the Containerization project authors.
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// https://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//===----------------------------------------------------------------------===//
16+
17+
import Foundation
18+
19+
#if os(macOS)
20+
import Virtualization
21+
#endif
22+
23+
/// A vsock connection whose duplicated file descriptor keeps the originating
24+
/// transport alive until the connection is closed.
25+
///
26+
/// Uses `@unchecked Sendable` because the mutable close state is protected by
27+
/// `NSLock`, while the underlying `FileHandle` and `VsockTransport` are shared
28+
/// across tasks.
29+
public final class VsockConnection: @unchecked Sendable {
30+
private let fileHandle: FileHandle
31+
private let transport: VsockTransport
32+
private let lock = NSLock()
33+
private var isClosed = false
34+
35+
#if os(macOS)
36+
init(connection: VZVirtioSocketConnection) throws {
37+
let fd = dup(connection.fileDescriptor)
38+
if fd == -1 {
39+
throw POSIXError.fromErrno()
40+
}
41+
self.fileHandle = FileHandle(fileDescriptor: fd, closeOnDealloc: false)
42+
self.transport = VsockTransport(connection)
43+
}
44+
#endif
45+
46+
init(fileDescriptor: Int32, transport: VsockTransport) {
47+
self.fileHandle = FileHandle(fileDescriptor: fileDescriptor, closeOnDealloc: false)
48+
self.transport = transport
49+
}
50+
51+
public var fileDescriptor: Int32 {
52+
fileHandle.fileDescriptor
53+
}
54+
55+
public var readabilityHandler: (@Sendable (FileHandle) -> Void)? {
56+
get { fileHandle.readabilityHandler }
57+
set { fileHandle.readabilityHandler = newValue }
58+
}
59+
60+
public var availableData: Data {
61+
fileHandle.availableData
62+
}
63+
64+
public func write(contentsOf data: some DataProtocol) throws {
65+
try fileHandle.write(contentsOf: data)
66+
}
67+
68+
public func close() throws {
69+
try closeIfNeeded {
70+
try fileHandle.close()
71+
}
72+
}
73+
74+
private func closeIfNeeded(_ closeUnderlying: () throws -> Void) throws {
75+
lock.lock()
76+
guard !isClosed else {
77+
lock.unlock()
78+
return
79+
}
80+
isClosed = true
81+
lock.unlock()
82+
83+
defer { transport.close() }
84+
try closeUnderlying()
85+
}
86+
87+
deinit {
88+
try? closeIfNeeded {
89+
try fileHandle.close()
90+
}
91+
}
92+
}

0 commit comments

Comments
 (0)