Skip to content

Commit 8c2d390

Browse files
committed
Epoll: Rework epoll type
I spent a little trying to fix all of the various issues with the epoll wrapper we had, before realizing most of them would be completely gone if we just reworked the type itself :). The callback nature, all of the handler/state tracking internally all (to me) has no purpose being in the type itself. All of this logic can live outside and the wrapper should just be a typesafe abstraction around it that you can build on. This is what this change aims to do. There should be zero behavioral difference here.
1 parent 997d5a4 commit 8c2d390

File tree

4 files changed

+149
-121
lines changed

4 files changed

+149
-121
lines changed

Sources/ContainerizationOS/Linux/Epoll.swift

Lines changed: 102 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,17 @@
1515
//===----------------------------------------------------------------------===//
1616

1717
#if os(Linux)
18+
import Foundation
1819

1920
#if canImport(Musl)
2021
import Musl
22+
private let _write = Musl.write
2123
#elseif canImport(Glibc)
2224
import Glibc
25+
private let _write = Glibc.write
2326
#endif
2427

2528
import CShim
26-
import Foundation
27-
import Synchronization
2829

2930
// On glibc, epoll constants are EPOLL_EVENTS enum values. On musl they're
3031
// plain UInt32. These helpers normalize them to UInt32/Int32.
@@ -35,166 +36,157 @@ private func epollMask(_ value: EPOLL_EVENTS) -> UInt32 { value.rawValue }
3536
private func epollFlag(_ value: EPOLL_EVENTS) -> Int32 { Int32(bitPattern: value.rawValue) }
3637
#endif
3738

38-
/// Register file descriptors to receive events via Linux's
39-
/// epoll syscall surface.
39+
/// A thin wrapper around the Linux epoll syscall surface.
4040
public final class Epoll: Sendable {
41-
public typealias Mask = Int32
42-
public typealias Handler = (@Sendable (Mask) -> Void)
41+
/// A set of epoll event flags.
42+
public struct Mask: OptionSet, Sendable {
43+
public let rawValue: UInt32
44+
45+
public init(rawValue: UInt32) {
46+
self.rawValue = rawValue
47+
}
48+
49+
public static let input = Mask(rawValue: epollMask(EPOLLIN))
50+
public static let output = Mask(rawValue: epollMask(EPOLLOUT))
4351

44-
public static let maskIn: Int32 = Int32(bitPattern: epollMask(EPOLLIN))
45-
public static let maskOut: Int32 = Int32(bitPattern: epollMask(EPOLLOUT))
46-
public static let defaultMask: Int32 = maskIn | maskOut
52+
public var isHangup: Bool {
53+
!self.isDisjoint(with: Mask(rawValue: epollMask(EPOLLHUP) | epollMask(EPOLLERR)))
54+
}
55+
56+
public var isRemoteHangup: Bool {
57+
!self.isDisjoint(with: Mask(rawValue: epollMask(EPOLLRDHUP)))
58+
}
59+
60+
public var readyToRead: Bool {
61+
self.contains(.input)
62+
}
63+
64+
public var readyToWrite: Bool {
65+
self.contains(.output)
66+
}
67+
}
68+
69+
/// An event returned by `wait()`.
70+
public struct Event: Sendable {
71+
public let fd: Int32
72+
public let mask: Mask
73+
}
4774

4875
private let epollFD: Int32
49-
private let handlers = SafeMap<Int32, Handler>()
50-
private let pipe = Pipe() // to wake up a waiting epoll_wait
76+
private let eventFD: Int32
5177

5278
public init() throws {
5379
let efd = epoll_create1(Int32(EPOLL_CLOEXEC))
54-
guard efd > 0 else {
80+
guard efd >= 0 else {
5581
throw POSIXError.fromErrno()
5682
}
83+
84+
let evfd = eventfd(0, Int32(EFD_CLOEXEC | EFD_NONBLOCK))
85+
guard evfd >= 0 else {
86+
let evfdErrno = POSIXError.fromErrno()
87+
close(efd)
88+
throw evfdErrno
89+
}
90+
5791
self.epollFD = efd
58-
try self.add(pipe.fileHandleForReading.fileDescriptor) { _ in }
92+
self.eventFD = evfd
93+
94+
// Register the eventfd with epoll for shutdown signaling.
95+
var event = epoll_event()
96+
event.events = epollMask(EPOLLIN)
97+
event.data.fd = self.eventFD
98+
let ctlResult = withUnsafeMutablePointer(to: &event) { ptr in
99+
epoll_ctl(efd, EPOLL_CTL_ADD, self.eventFD, ptr)
100+
}
101+
guard ctlResult == 0 else {
102+
let ctlErrno = POSIXError.fromErrno()
103+
close(evfd)
104+
close(efd)
105+
throw ctlErrno
106+
}
107+
}
108+
109+
deinit {
110+
close(epollFD)
111+
close(eventFD)
59112
}
60113

61-
public func add(
62-
_ fd: Int32,
63-
mask: Int32 = Epoll.defaultMask,
64-
handler: @escaping Handler
65-
) throws {
114+
/// Register a file descriptor for edge-triggered monitoring.
115+
public func add(_ fd: Int32, mask: Mask) throws {
66116
guard fcntl(fd, F_SETFL, O_NONBLOCK) == 0 else {
67117
throw POSIXError.fromErrno()
68118
}
69119

70-
let events = epollMask(EPOLLET) | UInt32(bitPattern: mask)
120+
let events = epollMask(EPOLLET) | mask.rawValue
71121

72122
var event = epoll_event()
73123
event.events = events
74124
event.data.fd = fd
75125

76126
try withUnsafeMutablePointer(to: &event) { ptr in
77-
while true {
78-
if epoll_ctl(self.epollFD, EPOLL_CTL_ADD, fd, ptr) == -1 {
79-
if errno == EAGAIN || errno == EINTR {
80-
continue
81-
}
82-
throw POSIXError.fromErrno()
83-
}
84-
break
127+
if epoll_ctl(self.epollFD, EPOLL_CTL_ADD, fd, ptr) == -1 {
128+
throw POSIXError.fromErrno()
85129
}
86130
}
131+
}
87132

88-
self.handlers.set(fd, handler)
133+
/// Remove a file descriptor from the monitored collection.
134+
public func delete(_ fd: Int32) throws {
135+
var event = epoll_event()
136+
let result = withUnsafeMutablePointer(to: &event) { ptr in
137+
epoll_ctl(self.epollFD, EPOLL_CTL_DEL, fd, ptr) as Int32
138+
}
139+
if result != 0 {
140+
if !acceptableDeletionErrno() {
141+
throw POSIXError.fromErrno()
142+
}
143+
}
89144
}
90145

91-
/// Run the main epoll loop.
146+
/// Wait for events.
92147
///
93-
/// max events to return in a single wait
94-
/// timeout in ms.
95-
/// -1 means block forever.
96-
/// 0 means return immediately if no events.
97-
public func run(maxEvents: Int = 128, timeout: Int32 = -1) throws {
98-
var events: [epoll_event] = .init(
99-
repeating: epoll_event(),
100-
count: maxEvents
101-
)
148+
/// Returns ready events, an empty array on timeout, or `nil` on shutdown.
149+
public func wait(maxEvents: Int = 128, timeout: Int32 = -1) -> [Event]? {
150+
var events: [epoll_event] = .init(repeating: epoll_event(), count: maxEvents)
102151

103152
while true {
104153
let n = epoll_wait(self.epollFD, &events, Int32(events.count), timeout)
105-
guard n >= 0 else {
154+
if n < 0 {
106155
if errno == EINTR || errno == EAGAIN {
107-
continue // go back to epoll_wait
156+
continue
108157
}
109-
throw POSIXError.fromErrno()
158+
preconditionFailure("epoll_wait failed unexpectedly: \(POSIXError.fromErrno())")
110159
}
111160

112161
if n == 0 {
113-
return // if epoll wait times out, then n will be 0
162+
return []
114163
}
115164

165+
var result: [Event] = []
166+
result.reserveCapacity(Int(n))
116167
for i in 0..<Int(n) {
117168
let fd = events[i].data.fd
118-
let mask = events[i].events
119-
120-
if fd == self.pipe.fileHandleForReading.fileDescriptor {
121-
close(self.epollFD)
122-
return // this is a shutdown message
123-
}
124-
125-
guard let handler = handlers.get(fd) else {
126-
continue
169+
if fd == self.eventFD {
170+
return nil
127171
}
128-
handler(Int32(bitPattern: mask))
172+
result.append(Event(fd: fd, mask: Mask(rawValue: events[i].events)))
129173
}
174+
return result
130175
}
131176
}
132177

133-
/// Remove the provided fd from the monitored collection.
134-
public func delete(_ fd: Int32) throws {
135-
var event = epoll_event()
136-
let result = withUnsafeMutablePointer(to: &event) { ptr in
137-
epoll_ctl(self.epollFD, EPOLL_CTL_DEL, fd, ptr) as Int32
138-
}
139-
if result != 0 {
140-
if !acceptableDeletionErrno() {
141-
throw POSIXError.fromErrno()
142-
}
143-
}
144-
self.handlers.del(fd)
178+
/// Signal the epoll loop to stop waiting.
179+
public func shutdown() {
180+
var val: UInt64 = 1
181+
let n = _write(eventFD, &val, MemoryLayout<UInt64>.size)
182+
precondition(n == MemoryLayout<UInt64>.size, "eventfd write failed: \(POSIXError.fromErrno())")
145183
}
146184

147185
// The errno's here are acceptable and can happen if the caller
148186
// closed the underlying fd before calling delete().
149187
private func acceptableDeletionErrno() -> Bool {
150188
errno == ENOENT || errno == EBADF || errno == EPERM
151189
}
152-
153-
/// Shutdown the epoll handler.
154-
public func shutdown() throws {
155-
// wakes up epoll_wait and triggers a shutdown
156-
try self.pipe.fileHandleForWriting.close()
157-
}
158-
159-
private final class SafeMap<Key: Hashable & Sendable, Value: Sendable>: Sendable {
160-
let dict = Mutex<[Key: Value]>([:])
161-
162-
func set(_ key: Key, _ value: Value) {
163-
dict.withLock { @Sendable in
164-
$0[key] = value
165-
}
166-
}
167-
168-
func get(_ key: Key) -> Value? {
169-
dict.withLock { @Sendable in
170-
$0[key]
171-
}
172-
}
173-
174-
func del(_ key: Key) {
175-
dict.withLock { @Sendable in
176-
_ = $0.removeValue(forKey: key)
177-
}
178-
}
179-
}
180-
}
181-
182-
extension Epoll.Mask {
183-
public var isHangup: Bool {
184-
(self & Int32(bitPattern: epollMask(EPOLLHUP) | epollMask(EPOLLERR))) != 0
185-
}
186-
187-
public var isRhangup: Bool {
188-
(self & Int32(bitPattern: epollMask(EPOLLRDHUP))) != 0
189-
}
190-
191-
public var readyToRead: Bool {
192-
(self & Int32(bitPattern: epollMask(EPOLLIN))) != 0
193-
}
194-
195-
public var readyToWrite: Bool {
196-
(self & Int32(bitPattern: epollMask(EPOLLOUT))) != 0
197-
}
198190
}
199191

200192
#endif // os(Linux)

vminitd/Sources/vminitd/IOPair.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ final class IOPair: Sendable {
6969
// Remove the fd from our global epoll instance first.
7070
let readFromFd = self.from.fileDescriptor
7171
do {
72-
try ProcessSupervisor.default.poller.delete(readFromFd)
72+
try ProcessSupervisor.default.unregisterFd(readFromFd)
7373
} catch {
7474
logger?.error("failed to delete fd from epoll \(readFromFd): \(error)")
7575
}
@@ -118,7 +118,7 @@ final class IOPair: Sendable {
118118
let readFrom = OSFile(fd: readFromFd)
119119
let writeTo = OSFile(fd: writeToFd)
120120

121-
try ProcessSupervisor.default.poller.add(readFromFd, mask: Epoll.maskIn) { mask in
121+
try ProcessSupervisor.default.registerFd(readFromFd, mask: .input) { mask in
122122
self.io.withLock { io in
123123
if io.closed {
124124
return

vminitd/Sources/vminitd/ProcessSupervisor.swift

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ import Logging
2020
import Synchronization
2121

2222
final class ProcessSupervisor: Sendable {
23-
let poller: Epoll
23+
private let poller: Epoll
24+
private let handlers = Mutex<[Int32: @Sendable (Epoll.Mask) -> Void]>([:])
2425

2526
private let queue: DispatchQueue
2627
// `DispatchSourceSignal` is thread-safe.
@@ -47,11 +48,46 @@ final class ProcessSupervisor: Sendable {
4748
self.poller = try! Epoll()
4849
self.state = Mutex(State())
4950
let t = Thread {
50-
try! self.poller.run()
51+
while true {
52+
guard let events = self.poller.wait() else {
53+
return
54+
}
55+
if events.isEmpty {
56+
return
57+
}
58+
for event in events {
59+
let handler = self.handlers.withLock { $0[event.fd] }
60+
handler?(event.mask)
61+
}
62+
}
5163
}
5264
t.start()
5365
}
5466

67+
/// Register a file descriptor for epoll monitoring with a handler.
68+
///
69+
/// The handler is stored before the fd is added to epoll, ensuring no
70+
/// events are missed.
71+
func registerFd(
72+
_ fd: Int32,
73+
mask: Epoll.Mask = [.input, .output],
74+
handler: @escaping @Sendable (Epoll.Mask) -> Void
75+
) throws {
76+
self.handlers.withLock { $0[fd] = handler }
77+
do {
78+
try self.poller.add(fd, mask: mask)
79+
} catch {
80+
self.handlers.withLock { _ = $0.removeValue(forKey: fd) }
81+
throw error
82+
}
83+
}
84+
85+
/// Remove a file descriptor from epoll monitoring and discard its handler.
86+
func unregisterFd(_ fd: Int32) throws {
87+
self.handlers.withLock { _ = $0.removeValue(forKey: fd) }
88+
try self.poller.delete(fd)
89+
}
90+
5591
func ready() {
5692
self.source.setEventHandler {
5793
self.handleSignal()
@@ -123,6 +159,6 @@ final class ProcessSupervisor: Sendable {
123159

124160
deinit {
125161
source.cancel()
126-
try? poller.shutdown()
162+
poller.shutdown()
127163
}
128164
}

0 commit comments

Comments
 (0)