-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathDaemonMode.swift
More file actions
1081 lines (926 loc) · 42.9 KB
/
Copy pathDaemonMode.swift
File metadata and controls
1081 lines (926 loc) · 42.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// DaemonMode.swift
// Headless daemon entry point with PID lock, sampling loop, socket server,
// signal handling, self-monitoring, autopilot policy, and webhook alerting.
import CacheoutShared
import Darwin
import Foundation
import os
/// Headless daemon mode for running CacheOut on servers without a GUI.
///
/// ## Entry points
/// Full daemon with autopilot + webhooks:
/// ```swift
/// await DaemonMode.runWithAutopilot(config: config)
/// ```
/// Minimal (no autopilot/webhooks):
/// ```swift
/// await DaemonMode.run(config: config)
/// ```
///
/// ## Lifecycle
/// `create(config:hooks:)` wires hooks first, then performs all externally-visible
/// setup (state dir, PID lock, socket, signal handlers). `startSamplingLoop()` begins
/// periodic sampling and self-monitoring. This ordering guarantees that `onShutdown`
/// is available even for signals received during early startup.
///
/// ## Features
/// - PID lock file at `<state-dir>/daemon.pid`
/// - Unix domain socket at `<state-dir>/status.sock`
/// - 1Hz memory sampling loop
/// - Alert evaluation from sample history
/// - Autopilot policy engine (T1 interventions from `autopilot.json`)
/// - Webhook alerting with per-code coalescing + cooldown
/// - Self-monitoring (RSS > 50MB for 30s -> restart)
/// - Signal handling (SIGTERM/SIGINT -> graceful shutdown, SIGHUP -> config reload)
/// - HELPER_UNAVAILABLE daemon-owned alert when autopilot enabled + helper not registered
///
/// ## Helper prerequisite
/// The privileged XPC helper must be pre-installed via the GUI app before running
/// `--daemon`. The helper is registered through `SMAppService` which requires the
/// app bundle context. Without it, autopilot actions return `xpc_not_available` errors
/// and a `HELPER_UNAVAILABLE` warning alert is set.
///
/// ## Hooks
/// - `onSnapshot`: called after each sample cycle with alerts
/// - `onShutdown`: called during graceful shutdown for flush/cleanup
/// - `onRestartNeeded`: called when self-monitor triggers restart
/// - `onSIGHUP`: called on SIGHUP for config reload
public actor DaemonMode: StatusSocket.DataSource {
// MARK: - Constants
/// Resident-set-size ceiling of 50 MiB; staying above it long enough triggers a restart.
private static let maxRSSBytes: Int = 50 * 1_048_576
/// Number of seconds RSS must remain above `maxRSSBytes` before a restart is requested.
private static let rssExceedDuration: TimeInterval = 30
/// Seconds between consecutive self-monitor RSS checks.
private static let selfMonitorInterval: TimeInterval = 10
/// Process exit status used for a self-initiated restart (75, i.e. EX_TEMPFAIL).
private static let restartExitCode: Int32 = 75
// MARK: - State
private let config: DaemonConfig
private let logger = Logger(subsystem: "com.cacheout", category: "DaemonMode")
/// Most recent sample, or nil until the first sample cycle succeeds.
private var _currentSnapshot: DaemonSnapshot?
/// Rolling sample history, oldest first, capped at `maxHistorySize` entries.
private var _sampleHistory: [DaemonSnapshot] = []
/// Sample-derived alerts followed by daemon-owned alerts, as served to socket clients.
private var _activeAlerts: [DaemonAlert] = []
/// Alerts raised by the daemon itself (e.g. HELPER_UNAVAILABLE) rather than derived from samples.
private var _daemonOwnedAlerts: [DaemonAlert] = []
private var _configStatus = ConfigStatus()
/// Whether the privileged helper reported `.enabled` at the last check.
private var _helperAvailable = false
/// Reload barrier. When true, `sampleOnce()` skips its ENTIRE cycle: no stats
/// sample, no history append, no alert evaluation, and no onSnapshot hook.
/// This keeps the sample loop from observing a mid-reload config state.
/// Set by `loadConfig()` (via `setReloadInProgress(_:)`) around the
/// multi-step apply sequence.
private var _reloadInProgress = false
/// When true, `sampleOnce()` is a no-op. Set during shutdown to
/// quiesce alert production before the final webhook flush.
private var _shuttingDown = false
/// Whether the initial config load has completed. Used by `loadConfig`
/// to determine generation numbering (initial = gen 0 or 1, reload increments).
private var _hasCompletedInitialLoad = false
private var samplingTask: Task<Void, Never>?
private var selfMonitorTask: Task<Void, Never>?
private var statusSocket: StatusSocket?
/// Tracks the currently running onSnapshot hook task so that reload
/// and shutdown can await its completion before mutating config state.
private var snapshotHookTask: Task<Void, Never>?
private let alertEvaluator = AlertEvaluator()
/// PredictiveEngine for time-to-exhaustion and growth detection.
/// Owned by DaemonMode; fed availableMB on each sampling tick.
private let predictiveEngine = PredictiveEngine()
/// CompressorTracker for compression ratio trend detection.
/// Owned by DaemonMode; fed directly via `record(_:)` on each sampling tick.
private let compressorTracker = CompressorTracker()
/// Maximum number of retained samples (1800). This covers 30 minutes when
/// polling at 1 Hz; in general the window is 1800 × `pollIntervalSeconds`.
private static let maxHistorySize = 1800
// MARK: - Hooks
// All four hooks are assigned once by `create(config:hooks:)`, before
// `setup()` installs signal handlers or any task is started. They must not
// be reassigned afterwards. That write-once-before-concurrency discipline is
// what justifies `nonisolated(unsafe)`. `run(config:)` wires no hooks.
/// Called after each sample cycle with the current merged alerts.
/// Wired by `create(config:hooks:)` before startup; do not reassign afterwards.
nonisolated(unsafe) public var onSnapshot: (@Sendable ([DaemonAlert]) async -> Void)?
/// Called during graceful shutdown for cleanup (e.g., webhook flush).
/// Wired by `create(config:hooks:)` before startup; do not reassign afterwards.
nonisolated(unsafe) public var onShutdown: (@Sendable () async -> Void)?
/// Called when self-monitor triggers a restart.
/// Wired by `create(config:hooks:)` before startup; do not reassign afterwards.
nonisolated(unsafe) public var onRestartNeeded: (@Sendable () async -> Void)?
/// Called on SIGHUP (and for the initial load scheduled through `scheduleReload()`)
/// to reload config. Wired by `create(config:hooks:)` before startup; do not reassign afterwards.
nonisolated(unsafe) public var onSIGHUP: (@Sendable () async -> Void)?
// MARK: - Init
/// Creates an idle daemon holding `config`. No PID lock, socket, or signal
/// handlers are set up here; use `create(config:hooks:)` for that.
public init(config: DaemonConfig) {
    self.config = config
}
// MARK: - DataSource (StatusSocket.DataSource)
/// Most recent sample, or nil until the first sample cycle completes.
public func currentSnapshot() -> DaemonSnapshot? {
    return _currentSnapshot
}
/// Retained sample history, oldest first (at most `maxHistorySize` entries).
public func sampleHistory() -> [DaemonSnapshot] {
    return _sampleHistory
}
/// Merged sample-derived and daemon-owned alerts from the latest recompute.
public func activeAlerts() -> [DaemonAlert] {
    return _activeAlerts
}
/// Config status as last stored via `setConfigStatus(_:)`.
public func configStatus() -> ConfigStatus {
    return _configStatus
}
/// Whether the privileged helper was `.enabled` at the last check.
public func helperAvailable() -> Bool {
    return _helperAvailable
}
/// Produces daemon-mode recommendations from the predictive engine, the
/// compressor tracker, and the current snapshot's stats (nil before the first sample).
public func recommendations() async -> RecommendationResult? {
    let latestStats = _currentSnapshot?.stats
    return await RecommendationEngine.generateRecommendations(
        mode: .daemon,
        predictiveEngine: predictiveEngine,
        compressorTracker: compressorTracker,
        systemStats: latestStats
    )
}
/// Returns the daemon-owned predictive engine (used by recommendations and socket handlers).
func getPredictiveEngine() -> PredictiveEngine {
    return predictiveEngine
}
// MARK: - Predictive Access (for StatusSocket handlers in task .2)
/// Forecast seconds until memory exhaustion, as reported by the predictive engine.
/// Nil when the engine cannot produce a forecast.
func timeToExhaustion() async -> TimeInterval? {
    let forecast = await predictiveEngine.predictTimeToExhaustion()
    return forecast
}
/// Process scan result from the predictive engine, which may reuse a cached scan
/// or refresh it. Used by recommendations and socket commands.
func processScanResult() async -> ProcessMemoryScanner.ScanResult {
    let scan = await predictiveEngine.getOrRefreshScanResult()
    return scan
}
// MARK: - Config Status Mutation (for task .2)
/// Replaces the config status reported over the status socket.
/// Intended callers: the startup config load and the SIGHUP reload path.
public func setConfigStatus(_ status: ConfigStatus) {
    self._configStatus = status
}
/// Replaces the daemon-owned alert list wholesale. `_activeAlerts` is not
/// recomputed here; the new list is merged in on the next sample cycle.
public func setDaemonOwnedAlerts(_ alerts: [DaemonAlert]) {
    self._daemonOwnedAlerts = alerts
}
// MARK: - Run
/// Lifecycle callbacks passed to `create(config:hooks:)`.
///
/// They are supplied up front so they are already in place before any
/// externally visible startup work (PID lock, socket, signal handlers).
/// Every callback is optional; a nil callback is simply not invoked.
public struct Hooks: Sendable {
    /// Invoked after every sample cycle with the merged alert list.
    public var onSnapshot: (@Sendable ([DaemonAlert]) async -> Void)?
    /// Invoked during graceful shutdown to flush or clean up (e.g. pending webhooks).
    public var onShutdown: (@Sendable () async -> Void)?
    /// Invoked when the RSS self-monitor decides to restart the daemon.
    public var onRestartNeeded: (@Sendable () async -> Void)?
    /// Invoked on SIGHUP to reload configuration.
    public var onSIGHUP: (@Sendable () async -> Void)?

    public init(
        onSnapshot: (@Sendable ([DaemonAlert]) async -> Void)? = nil,
        onShutdown: (@Sendable () async -> Void)? = nil,
        onRestartNeeded: (@Sendable () async -> Void)? = nil,
        onSIGHUP: (@Sendable () async -> Void)? = nil
    ) {
        self.onSIGHUP = onSIGHUP
        self.onRestartNeeded = onRestartNeeded
        self.onShutdown = onShutdown
        self.onSnapshot = onSnapshot
    }
}
/// Builds a daemon, wires `hooks`, and brings up its infrastructure
/// (state directory, PID lock, status socket, signal handlers).
/// Sampling does not begin until `startSamplingLoop()` is called.
///
/// Hooks are assigned before any externally visible work, so a signal that
/// arrives during early startup already sees `onShutdown`/`onRestartNeeded`.
///
/// Usage (task .2):
/// ```swift
/// let hooks = DaemonMode.Hooks(
///     onSnapshot: { alerts in ... },
///     onShutdown: { ... },
///     onRestartNeeded: { ... }
/// )
/// let daemon = await DaemonMode.create(config: config, hooks: hooks)
/// await daemon.startSamplingLoop()
/// ```
///
/// - Parameters:
///   - config: Daemon configuration.
///   - hooks: Lifecycle hooks; defaults to none.
/// - Returns: A daemon whose socket and signal handlers are live but which is
///   not yet sampling. If setup fails, the process exits instead of returning.
public static func create(config: DaemonConfig, hooks: Hooks = Hooks()) async -> DaemonMode {
    let instance = DaemonMode(config: config)
    // Hooks first: setup() installs signal handlers that may fire at once.
    instance.onSIGHUP = hooks.onSIGHUP
    instance.onRestartNeeded = hooks.onRestartNeeded
    instance.onShutdown = hooks.onShutdown
    instance.onSnapshot = hooks.onSnapshot
    await instance.setup()
    return instance
}
/// Creates a daemon with no hooks and immediately starts sampling.
public static func run(config: DaemonConfig) async {
    await create(config: config).startSamplingLoop()
}
/// Set once sampling has started; guards against a second start.
private var hasStartedSampling = false
/// Begins periodic sampling and RSS self-monitoring.
///
/// The socket and signal handlers are already live (see `setup()`); this only
/// launches the sampling and self-monitor tasks.
///
/// - Precondition: Call once. Repeat calls log a warning and do nothing.
public func startSamplingLoop() async {
    if hasStartedSampling {
        logger.warning("DaemonMode.startSamplingLoop() called more than once — ignoring")
        return
    }
    hasStartedSampling = true
    startSampling()
    startSelfMonitor()
    let pid = ProcessInfo.processInfo.processIdentifier
    logger.info("Daemon started successfully (PID: \(pid))")
}
/// Brings up daemon infrastructure in order: hardened state directory, PID
/// lock, helper probe, status socket, then signal handlers. Any fatal failure
/// terminates the process. Hooks must already be wired.
private func setup() async {
    logger.info("Daemon starting with state directory: \(self.config.stateDir.path, privacy: .public)")

    // Ensure the state directory exists with mode 0700. createDirectory only
    // applies attributes to directories it creates, so pre-existing ones get an
    // explicit fchmod. The chmod goes through an fd opened with O_NOFOLLOW |
    // O_DIRECTORY, so a concurrent symlink swap at `stateDir` can't redirect it.
    func secureStateDirectory() throws {
        try FileManager.default.createDirectory(
            at: config.stateDir,
            withIntermediateDirectories: true,
            attributes: [.posixPermissions: 0o700]
        )
        let fd: Int32 = config.stateDir.withUnsafeFileSystemRepresentation { path in
            guard let path else { return -1 }
            return open(path, O_RDONLY | O_NOFOLLOW | O_DIRECTORY | O_CLOEXEC)
        }
        if fd < 0 {
            throw NSError(domain: NSPOSIXErrorDomain, code: Int(errno), userInfo: [NSLocalizedDescriptionKey: "open failed"])
        }
        defer { close(fd) }
        if fchmod(fd, 0o700) != 0 {
            throw NSError(domain: NSPOSIXErrorDomain, code: Int(errno), userInfo: [NSLocalizedDescriptionKey: "fchmod failed"])
        }
    }

    do {
        try secureStateDirectory()
    } catch {
        logger.error("Failed to create/secure state directory: \(error.localizedDescription, privacy: .public)")
        Foundation.exit(1)
    }

    if !acquirePIDLock() {
        logger.error("Another daemon instance is already running")
        Foundation.exit(1)
    }

    _helperAvailable = HelperInstaller().status == .enabled

    let socketPath = config.stateDir.appendingPathComponent("status.sock").path
    let server = StatusSocket(socketPath: socketPath, dataSource: self)
    do {
        try server.start()
    } catch {
        logger.error("Failed to start status socket: \(error.localizedDescription, privacy: .public)")
        cleanupAndExit(code: 1)
    }
    statusSocket = server

    // Safe to install now: create() wired the hooks before calling setup(),
    // so early signals already see onShutdown/onRestartNeeded.
    installSignalHandlers()
    // From here the process is kept alive by dispatchMain() in main.swift; it
    // ends through gracefulShutdown() (exit 0) or triggerRestart() (exit 75).
}
// MARK: - PID Lock
/// Path of the PID lock file, `<state-dir>/daemon.pid`.
private var pidFilePath: String {
    return config.stateDir.appendingPathComponent("daemon.pid").path
}
/// Descriptor of the open PID file, or -1 when the lock is not held. Kept
/// open for the process lifetime; the flock is released when it is closed or
/// when the process exits.
private var pidLockFd: Int32 = -1
/// Acquires the single-instance lock with `flock(LOCK_EX | LOCK_NB)` on the PID file.
///
/// The lock is atomic, so two concurrent launches cannot both hold it. It is
/// held for as long as the descriptor stored in `pidLockFd` stays open, and
/// the kernel drops it automatically if the process dies.
///
/// On success the file is truncated and rewritten with this process's PID.
/// Failures while rewriting the PID are logged but do not fail acquisition:
/// the flock, not the file contents, is the source of truth.
///
/// - Returns: `true` when the lock is held. Returns `false` when the file
///   cannot be opened, another process holds the lock, or `flock` fails for
///   another reason (each case is logged distinctly).
private func acquirePIDLock() -> Bool {
    let pidPath = pidFilePath
    // Open (or create) the PID file with 0600 permissions.
    let fd = URL(fileURLWithPath: pidPath).withUnsafeFileSystemRepresentation { pathPtr -> Int32 in
        guard let pathPtr else { return -1 }
        return open(pathPtr, O_WRONLY | O_CREAT | O_CLOEXEC, S_IRUSR | S_IWUSR)
    }
    guard fd >= 0 else {
        let openErrno = errno
        logger.error("Failed to open PID file: errno \(openErrno)")
        return false
    }
    // Try a non-blocking exclusive lock.
    guard flock(fd, LOCK_EX | LOCK_NB) == 0 else {
        // Capture errno before close() has a chance to overwrite it.
        let lockErrno = errno
        close(fd)
        guard lockErrno == EWOULDBLOCK else {
            // Not contention: report the real failure rather than claiming
            // that another instance is running.
            logger.error("Failed to lock PID file: errno \(lockErrno)")
            return false
        }
        // Read the existing PID for the error message.
        if let existingPid = readPIDFile(pidPath) {
            logger.error("Daemon already running with PID \(existingPid) (lock held)")
        } else {
            logger.error("Another daemon instance holds the PID lock")
        }
        return false
    }
    // Replace any stale contents with our PID. The file offset is still 0
    // (no O_APPEND, nothing written yet), so truncate-then-write leaves
    // exactly "<pid>\n" in the file.
    if ftruncate(fd, 0) != 0 {
        let truncateErrno = errno
        logger.warning("Failed to truncate PID file: errno \(truncateErrno)")
    }
    let pidLine = "\(ProcessInfo.processInfo.processIdentifier)\n"
    let expectedBytes = pidLine.utf8.count
    let written = pidLine.withCString { ptr in
        Darwin.write(fd, ptr, strlen(ptr))
    }
    if written != expectedBytes {
        let writeErrno = errno
        logger.warning("Failed to write PID file (\(written) of \(expectedBytes) bytes): errno \(writeErrno)")
    }
    // Keep fd open: the flock is held for the process lifetime.
    pidLockFd = fd
    return true
}
/// Parses the PID stored at `path`, ignoring surrounding whitespace and newlines.
/// Returns nil if the file is unreadable or does not hold an integer.
private func readPIDFile(_ path: String) -> pid_t? {
    let raw = try? String(contentsOfFile: path, encoding: .utf8)
    return raw.flatMap { pid_t($0.trimmingCharacters(in: .whitespacesAndNewlines)) }
}
/// Drops the PID lock by closing its descriptor; a no-op when the lock isn't held.
///
/// The PID file is deliberately NOT unlinked. Unlinking would race with a
/// successor instance:
/// 1. This daemon closes its fd, releasing the flock.
/// 2. A new daemon acquires the flock and writes its PID.
/// 3. This daemon unlinks the file, erasing the new daemon's identity.
///
/// flock is therefore the only single-instance authority. The next startup
/// overwrites any stale PID once it holds the lock.
private func releasePIDLock() {
    guard pidLockFd >= 0 else { return }
    close(pidLockFd)
    pidLockFd = -1
}
// MARK: - Signal Handling
/// Private serial queue on which the signal dispatch sources deliver events.
/// Using it instead of `.main` keeps signal handling independent of the main
/// thread's run-loop state (the main thread sits in `dispatchMain()`).
private let signalQueue = DispatchQueue(label: "com.cacheout.daemon-signals")
/// Routes SIGTERM and SIGINT to graceful shutdown, and SIGHUP to a config reload.
///
/// The default action of each signal is first replaced with SIG_IGN using
/// `signal()`. That setting is process-wide (unlike the per-thread
/// `sigprocmask`), so delivery is handled only by the dispatch sources.
private func installSignalHandlers() {
    let watched: [Int32] = [SIGTERM, SIGINT, SIGHUP]
    for sig in watched {
        signal(sig, SIG_IGN)
    }
    var sources: [Any] = []
    for sig in watched {
        let source = DispatchSource.makeSignalSource(signal: sig, queue: signalQueue)
        if sig == SIGHUP {
            // Config reload (task .2 wires the reload hook).
            source.setEventHandler { [weak self] in
                guard let self else { return }
                Task { await self.handleSIGHUP() }
            }
        } else {
            source.setEventHandler { [weak self] in
                guard let self else { return }
                Task { await self.handleShutdownSignal() }
            }
        }
        source.resume()
        sources.append(source)
    }
    // Keep strong references so the sources are not deallocated.
    _signalSources = sources
}
// Process-wide strong references to the installed signal sources, so they
// are never deallocated. Accessed through `_signalSources`.
nonisolated(unsafe) private static var _signalSourcesStorage: [Any] = []
private var _signalSources: [Any] {
    get { return Self._signalSourcesStorage }
    set(sources) { Self._signalSourcesStorage = sources }
}
/// Responds to SIGTERM/SIGINT by running the graceful shutdown sequence.
private func handleShutdownSignal() async {
    logger.info("Shutdown signal received")
    await self.gracefulShutdown()
}
/// Tail of the serial reload chain. Each newly scheduled reload awaits this
/// one first, so the initial load and every SIGHUP reload run strictly in
/// order, with ordered generation increments and no overlapping applies.
private var reloadTask: Task<Void, Never>?
/// Enqueues a config reload behind whatever reload is already scheduled.
///
/// The startup load and SIGHUP both come through here, so a SIGHUP that
/// arrives during the initial load waits for it to finish.
///
/// - Returns: A task that completes once this particular reload has finished.
@discardableResult
public func scheduleReload() -> Task<Void, Never> {
    let predecessor = reloadTask
    let next = Task { [weak self] in
        // FIFO: do not start until the previous reload has finished.
        await predecessor?.value
        guard let self else { return }
        guard let reloadHook = self.onSIGHUP else {
            self.logger.info("Config reload: no reload hook wired")
            return
        }
        await reloadHook()
    }
    reloadTask = next
    return next
}
/// SIGHUP entry point: logs the signal and queues a reload.
private func handleSIGHUP() {
    logger.info("SIGHUP received — scheduling config reload")
    scheduleReload()
}
// MARK: - Sampling Loop
/// Launches the background task that runs `sampleOnce()` and then sleeps for
/// the configured poll interval, repeating until the task is cancelled.
private func startSampling() {
    samplingTask = Task { [weak self] in
        guard let daemon = self else { return }
        let interval = daemon.config.pollIntervalSeconds
        while !Task.isCancelled {
            await daemon.sampleOnce()
            try? await Task.sleep(for: .seconds(interval))
        }
    }
}
/// Runs one sample cycle. It reads system stats, updates the current snapshot
/// and capped history, feeds the predictive engine and compressor tracker,
/// recomputes the merged alerts, and finally invokes the `onSnapshot` hook.
///
/// The whole cycle is skipped while shutting down or while a config reload
/// is in progress, so no hook ever observes a mixed config epoch.
private func sampleOnce() async {
    // Skip if shutting down or mid-reload to avoid mixed config epochs.
    guard !_shuttingDown, !_reloadInProgress else { return }
    // Lightweight local sample (same approach as MemoryMonitor.sample).
    guard let stats = sampleSystemStats() else {
        return
    }
    let snapshot = DaemonSnapshot(stats: stats)
    _currentSnapshot = snapshot
    // Feed available memory, (free + inactive pages) in MiB, into the
    // PredictiveEngine's sliding window.
    let availableMB = Double(stats.freePages + stats.inactivePages) * Double(stats.pageSize) / 1048576.0
    await predictiveEngine.recordAvailableMB(availableMB, at: stats.timestamp)
    // Feed CompressorTracker for compression ratio trend detection.
    await compressorTracker.record(stats)
    // Append to history, capping size.
    _sampleHistory.append(snapshot)
    if _sampleHistory.count > Self.maxHistorySize {
        _sampleHistory.removeFirst(_sampleHistory.count - Self.maxHistorySize)
    }
    // Evaluate sample-derived alerts.
    let sampleAlerts = alertEvaluator.evaluate(
        samples: _sampleHistory,
        currentSnapshot: snapshot
    )
    // Merge with daemon-owned alerts.
    _activeAlerts = sampleAlerts + _daemonOwnedAlerts
    // Re-check the barrier flags. This guard is REQUIRED, not merely defensive.
    // The two awaits above are suspension points where the actor can be
    // re-entered, so gracefulShutdown() or a config reload may have started
    // since the first guard. The hook must not fire in either case.
    guard !_shuttingDown, !_reloadInProgress else { return }
    // Fire the onSnapshot hook, tracking its task so reload/shutdown can await it.
    if let hook = onSnapshot {
        let alerts = _activeAlerts
        let task = Task { await hook(alerts) }
        snapshotHookTask = task
        await task.value
        snapshotHookTask = nil
    }
}
/// Takes a point-in-time reading of system memory statistics.
///
/// It mirrors `MemoryMonitor.sample`, but is `nonisolated` so no actor hop is
/// needed. Returns nil if the Mach VM statistics call or any of the
/// compressor/swap sysctls fail. A failed pressure-level read falls back to 0,
/// and a failed `hw.memsize` read leaves the physical-memory total at 0.
nonisolated private func sampleSystemStats() -> SystemStatsDTO? {
    // Reads a fixed-size sysctl value into `value`; returns true on success.
    func readSysctl<T>(_ name: String, into value: inout T) -> Bool {
        var size = MemoryLayout<T>.size
        return sysctlbyname(name, &value, &size, nil, 0) == 0
    }

    let host = mach_host_self()
    defer { mach_port_deallocate(mach_task_self_, host) }

    var vm = vm_statistics64_data_t()
    var vmCount = mach_msg_type_number_t(
        MemoryLayout<vm_statistics64_data_t>.stride / MemoryLayout<integer_t>.stride
    )
    let kr = withUnsafeMutablePointer(to: &vm) { statsPtr in
        statsPtr.withMemoryRebound(to: integer_t.self, capacity: Int(vmCount)) { words in
            host_statistics64(host, HOST_VM_INFO64, words, &vmCount)
        }
    }
    guard kr == KERN_SUCCESS else { return nil }

    var compressedBytes: UInt64 = 0
    var compressorBytesUsed: UInt64 = 0
    var swap = xsw_usage()
    guard readSysctl("vm.compressor_compressed_bytes", into: &compressedBytes),
          readSysctl("vm.compressor_bytes_used", into: &compressorBytesUsed),
          readSysctl("vm.swapusage", into: &swap) else {
        return nil
    }
    let compressionRatio: Double = compressorBytesUsed > 0
        ? Double(compressedBytes) / Double(compressorBytesUsed)
        : 0.0

    // Optional: fall back to 0 when the pressure level is unavailable.
    var pressureLevel: Int32 = 0
    if !readSysctl("kern.memorystatus_vm_pressure_level", into: &pressureLevel) {
        pressureLevel = 0
    }

    let pageSize = UInt64(vm_kernel_page_size)
    var totalMemory: UInt64 = 0
    _ = readSysctl("hw.memsize", into: &totalMemory)
    let tier = MemoryTier.detect()

    return SystemStatsDTO(
        timestamp: Date(),
        freePages: UInt64(vm.free_count),
        activePages: UInt64(vm.active_count),
        inactivePages: UInt64(vm.inactive_count),
        wiredPages: UInt64(vm.wire_count),
        compressorPageCount: UInt64(vm.compressor_page_count),
        compressedBytes: compressedBytes,
        compressorBytesUsed: compressorBytesUsed,
        compressionRatio: compressionRatio,
        pageSize: pageSize,
        purgeableCount: UInt64(vm.purgeable_count),
        externalPages: UInt64(vm.external_page_count),
        internalPages: UInt64(vm.internal_page_count),
        compressions: UInt64(vm.compressions),
        decompressions: UInt64(vm.decompressions),
        pageins: UInt64(vm.pageins),
        pageouts: UInt64(vm.pageouts),
        swapUsedBytes: UInt64(swap.xsu_used),
        swapTotalBytes: UInt64(swap.xsu_total),
        pressureLevel: pressureLevel,
        memoryTier: tier.rawValue,
        totalPhysicalMemory: totalMemory
    )
}
// MARK: - Self-Monitoring
/// Starts a background loop that checks this process's RSS every
/// `selfMonitorInterval` seconds. Once RSS has stayed above `maxRSSBytes`
/// for at least `rssExceedDuration` seconds, it requests a restart.
/// Dropping back under the limit resets the streak.
private func startSelfMonitor() {
    selfMonitorTask = Task { [weak self] in
        guard let self else { return }
        // Uptime at which RSS first crossed the limit in the current streak.
        var overLimitSince: TimeInterval? = nil
        while !Task.isCancelled {
            try? await Task.sleep(for: .seconds(Self.selfMonitorInterval))
            guard self.currentRSSBytes() > Self.maxRSSBytes else {
                overLimitSince = nil
                continue
            }
            let now = ProcessInfo.processInfo.systemUptime
            let streakStart = overLimitSince ?? now
            overLimitSince = streakStart
            if now - streakStart >= Self.rssExceedDuration {
                await self.triggerRestart()
                return
            }
        }
    }
}
/// Returns this process's resident set size in bytes, or 0 if `task_info` fails.
nonisolated private func currentRSSBytes() -> Int {
    var basicInfo = mach_task_basic_info()
    var infoCount = mach_msg_type_number_t(
        MemoryLayout<mach_task_basic_info>.size / MemoryLayout<natural_t>.size
    )
    let status: kern_return_t = withUnsafeMutablePointer(to: &basicInfo) { infoPtr in
        infoPtr.withMemoryRebound(to: integer_t.self, capacity: Int(infoCount)) { words in
            task_info(mach_task_self_, task_flavor_t(MACH_TASK_BASIC_INFO), words, &infoCount)
        }
    }
    return status == KERN_SUCCESS ? Int(basicInfo.resident_size) : 0
}
/// Requests a supervised restart after the RSS self-monitor trips.
///
/// Order matters. The `onRestartNeeded` hook (urgent webhook delivery) runs
/// before the restart marker is written, so a watchdog reacting to the marker
/// cannot kill the process mid-delivery. The process then exits with
/// `restartExitCode`.
private func triggerRestart() async {
    logger.warning("Self-monitor: RSS exceeded \(Self.maxRSSBytes / 1024 / 1024)MB for \(Int(Self.rssExceedDuration))s — requesting restart")
    await onRestartNeeded?()
    // Best-effort marker write; a failure here must not block the restart.
    let marker = config.stateDir.appendingPathComponent("restart.marker")
    try? Date().description.write(toFile: marker.path, atomically: true, encoding: .utf8)
    cleanupAndExit(code: Self.restartExitCode)
}
// MARK: - Shutdown
/// Performs an orderly shutdown, then exits the process with status 0.
///
/// Steps:
/// 1. Quiesce sampling and self-monitoring.
/// 2. Wait for any in-flight `onSnapshot` hook.
/// 3. Stop the status socket.
/// 4. Run `onShutdown` (e.g. webhook flush).
/// 5. Release the PID lock.
/// 6. Exit.
///
/// Idempotent: only the first request proceeds. SIGTERM and SIGINT each
/// schedule a call, and the actor can be re-entered at every `await` below.
/// Without the guard, a second signal arriving mid-shutdown would run the
/// flush and teardown a second time, concurrently with the first.
private func gracefulShutdown() async {
    guard !_shuttingDown else {
        logger.info("Graceful shutdown already in progress — ignoring duplicate request")
        return
    }
    logger.info("Graceful shutdown starting")
    // 1. Quiesce alert production: set the shutdown flag and cancel sampling
    //    BEFORE flushing, so no new alerts can be enqueued during the flush.
    _shuttingDown = true
    samplingTask?.cancel()
    samplingTask = nil
    selfMonitorTask?.cancel()
    selfMonitorTask = nil
    // 2. Await any in-flight snapshot hook so webhook deliveries from the
    //    last sample cycle complete before we flush.
    await snapshotHookTask?.value
    snapshotHookTask = nil
    // 3. Stop accepting new connections.
    statusSocket?.stop()
    // 4. Fire the onShutdown hook (e.g. webhook flush with its 3s budget).
    //    Safe now: sampling is stopped and the in-flight hook has completed.
    if let hook = onShutdown {
        await hook()
    }
    // 5. Release the PID lock (the file remains for next-start housekeeping).
    releasePIDLock()
    // 6. Exit cleanly.
    logger.info("Graceful shutdown complete")
    Foundation.exit(0)
}
/// Non-graceful teardown. It unlinks the status socket file so the next start
/// can bind cleanly, then exits with `code`. The PID file is left in place:
/// exiting closes `pidLockFd`, and that alone releases the flock.
private func cleanupAndExit(code: Int32) -> Never {
    let socketURL = config.stateDir.appendingPathComponent("status.sock")
    unlink(socketURL.path)
    Foundation.exit(code)
}
// MARK: - Reload Barrier
/// Sets or clears the reload barrier.
///
/// While it is set, `sampleOnce()` skips its ENTIRE cycle: no stats sample,
/// history append, alert evaluation, or `onSnapshot` hook. It does more than
/// suppress the hook, so nothing observes a half-applied config.
/// Callers must clear it once the apply sequence finishes, or sampling stays paused.
public func setReloadInProgress(_ inProgress: Bool) {
    _reloadInProgress = inProgress
}
/// Waits for the in-flight `onSnapshot` hook task, if any, to finish, and
/// returns immediately when none is running. Intended for reload paths that
/// must not mutate config state or flush while a hook is still executing.
public func awaitSnapshotHookCompletion() async {
    await snapshotHookTask?.value
}
/// Whether the initial config load has completed.
public var hasCompletedInitialLoad: Bool {
    _hasCompletedInitialLoad
}
/// Records that the initial config load has finished; later loads count as reloads.
public func markInitialLoadComplete() {
    _hasCompletedInitialLoad = true
}
// MARK: - Helper Availability Update
/// Re-probes the privileged helper and keeps the HELPER_UNAVAILABLE alert in sync.
///
/// If autopilot is enabled and the helper is not `.enabled`, the daemon-owned
/// alert list becomes a single HELPER_UNAVAILABLE warning; otherwise the list
/// is cleared. Either way the whole daemon-owned list is replaced.
/// `_activeAlerts` is then recomputed immediately, so the `health` socket
/// command reflects the change without waiting for the next sample tick.
public func updateHelperAvailability(autopilotEnabled: Bool) {
    let helperEnabled = HelperInstaller().status == .enabled
    _helperAvailable = helperEnabled
    let needsAlert = autopilotEnabled && !helperEnabled
    if needsAlert {
        _daemonOwnedAlerts = [
            DaemonAlert(
                code: .helperUnavailable,
                severity: .warning,
                message: "Autopilot is enabled but the privileged helper is not registered. "
                    + "Install the helper via the GUI app before running --daemon."
            )
        ]
        logger.warning("Helper not registered — HELPER_UNAVAILABLE alert set")
    } else {
        // Trigger condition is false: no daemon-owned alerts remain.
        _daemonOwnedAlerts = []
    }
    recomputeActiveAlerts()
}
/// Rebuilds `_activeAlerts` as the sample-derived alerts followed by the
/// daemon-owned alerts. Sample-derived alerts are re-evaluated from the
/// current history; there are none before the first sample.
private func recomputeActiveAlerts() {
    let derived = _currentSnapshot.map { snapshot in
        alertEvaluator.evaluate(samples: _sampleHistory, currentSnapshot: snapshot)
    } ?? []
    _activeAlerts = derived + _daemonOwnedAlerts
}
// MARK: - Full Daemon with Autopilot + Webhooks
/// Main entry point that wires autopilot policy, webhook alerting,
/// startup config loading, and SIGHUP reload.
///
/// This is the production entry point used by `main.swift`.
///
/// Sequence:
/// 1. Builds one `AutopilotPolicy`, one `WebhookAlerter` and a `Logger`, and
///    resolves the config path as `<stateDir>/autopilot.json`.
/// 2. Builds `Hooks` whose closures share those objects.
/// 3. Creates the daemon and publishes it to the hooks via `daemonRef`.
///    Then it runs the startup config load through `scheduleReload()` and
///    waits for it to finish.
/// 4. Starts the sampling loop. NOTE(review): this function returns only when
///    `startSamplingLoop()` returns, presumably at shutdown. Confirm.
///
/// - Parameter config: Daemon configuration. Its `stateDir` locates
///   `autopilot.json`, and the whole value is passed to `DaemonMode.create`.
public static func runWithAutopilot(config: DaemonConfig) async {
let autopilot = AutopilotPolicy()
let webhookAlerter = WebhookAlerter()
let configPath = config.stateDir.appendingPathComponent("autopilot.json").path
let logger = Logger(subsystem: "com.cacheout", category: "DaemonMode")
// Create daemon with hooks wired
// We need to capture daemon as a variable for SIGHUP to reference it,
// but hooks need to be defined first. Use the create/startSamplingLoop pattern.
// `daemonRef` is a mutable local captured *by reference* by the @Sendable hook
// closures. That capture is strong, not weak. `nonisolated(unsafe)` only
// suppresses the Sendable check on this shared variable.
// It is written exactly once, right after `create` and before any reload is
// scheduled. Hooks that fire before then see nil and skip daemon-dependent work.
// NOTE(review): if `create` retains `hooks`, this forms a daemon -> hooks ->
// daemonRef -> daemon cycle. That is acceptable only if the daemon lives for
// the rest of the process. Confirm.
nonisolated(unsafe) var daemonRef: DaemonMode?
let hooks = DaemonMode.Hooks(
onSnapshot: { @Sendable alerts in
// Evaluate autopilot rules
// (skipped until `daemonRef` is set and a current snapshot exists;
// webhook processing below runs either way)
if let daemon = daemonRef {
let samples = await daemon.sampleHistory()
if let current = await daemon.currentSnapshot() {
await autopilot.evaluate(samples: samples, currentSnapshot: current)
}
}
// Process alerts through webhook alerter
await webhookAlerter.processAlerts(alerts)
},
onShutdown: { @Sendable in
// Flush webhook deliveries (3s budget)
await webhookAlerter.flush()
// Invalidate current XPC connection
await autopilot.invalidateXPC()
},
onRestartNeeded: { @Sendable in
// Deliver urgent DAEMON_RESTART alert
let restartAlert = DaemonAlert(
code: .daemonRestart,
severity: .emergency,
message: "Daemon self-monitor triggered restart (RSS exceeded threshold)"
)
await webhookAlerter.deliverUrgent(alert: restartAlert)
},
onSIGHUP: { @Sendable in
// This hook also serves the startup load: `isInitial` stays true until
// `markInitialLoadComplete()` runs. That happens after the first load
// whatever its outcome, since `loadConfig` returns no result here.
// NOTE(review): presumably `scheduleReload()` invokes this hook. Confirm.
guard let daemon = daemonRef else { return }
let isInitial = !(await daemon.hasCompletedInitialLoad)
await loadConfig(
path: configPath,
daemon: daemon,
autopilot: autopilot,
webhookAlerter: webhookAlerter,
isInitialLoad: isInitial,
logger: logger
)
if isInitial {
await daemon.markInitialLoadComplete()
}
}
)
let daemon = await DaemonMode.create(config: config, hooks: hooks)
// Publish the daemon to the hooks before scheduling any reload.
daemonRef = daemon
// Startup config load — goes through the same serial reload pipeline
// as SIGHUP, so a SIGHUP arriving during startup queues behind it.
let initialLoadTask = await daemon.scheduleReload()
await initialLoadTask.value
// Start sampling
await daemon.startSamplingLoop()
}
// MARK: - XPC Connection Management
/// Creates and resumes a privileged XPC connection to the
/// `com.cacheout.memhelper` mach service, or returns `nil` when the helper is
/// not registered (`HelperInstaller().status` is not `.enabled`).
///
/// The remote interface is `MemoryHelperProtocol`. This function keeps no
/// reference to the returned connection, so the caller owns it and is
/// responsible for invalidating it.
private static func openHelperConnection() -> NSXPCConnection? {
    let helperRegistered = HelperInstaller().status == .enabled
    if !helperRegistered {
        return nil
    }

    let connection = NSXPCConnection(
        machServiceName: "com.cacheout.memhelper",
        options: .privileged
    )
    connection.remoteObjectInterface = NSXPCInterface(with: MemoryHelperProtocol.self)
    connection.resume()
    return connection
}
// MARK: - Config Loading
/// Load and apply autopilot configuration from the given path.
///
/// Used by both startup and SIGHUP reload. On reload (non-initial), every
/// call increments the config generation counter. Helper availability and
/// XPC connection are refreshed on every attempt, even when the config is
/// rejected, so the daemon never holds stale helper state.
///
/// - Parameters:
/// - path: Path to autopilot.json
/// - daemon: The daemon instance to update
/// - autopilot: The autopilot policy to configure
/// - webhookAlerter: The webhook alerter to configure
/// - isInitialLoad: True for startup, false for SIGHUP reload
/// - logger: Logger for status messages
private static func loadConfig(
path: String,
daemon: DaemonMode,
autopilot: AutopilotPolicy,
webhookAlerter: WebhookAlerter,
isInitialLoad: Bool,
logger: Logger
) async {
let currentStatus = await daemon.configStatus()
let nextGeneration = isInitialLoad ? 1 : currentStatus.generation + 1
// Refresh XPC connection on every load attempt (helper may have been
// installed or removed since last check).
let xpcConnection = openHelperConnection()
await autopilot.setXPCConnection(xpcConnection)
// Helper function: refresh helper availability using the currently
// applied autopilot-enabled state (not the candidate config).
// Called on every exit path so helper state is never stale.
func refreshHelperState(autopilotEnabled: Bool?) async {
let enabled: Bool
if let explicit = autopilotEnabled {
enabled = explicit
} else {
enabled = await autopilot.isEnabled
}
await daemon.updateHelperAvailability(autopilotEnabled: enabled)
}
// Check if file exists
guard FileManager.default.fileExists(atPath: path) else {
if isInitialLoad {
// First load, file missing → gen 0, no_config
let status = ConfigStatus(generation: 0, status: .noConfig)
await daemon.setConfigStatus(status)
await refreshHelperState(autopilotEnabled: false)
logger.info("No autopilot config file found at \(path, privacy: .public)")
} else {
// SIGHUP but file removed — increment gen, set no_config
let status = ConfigStatus(
generation: nextGeneration,
lastReload: Date(),
status: .noConfig
)
await daemon.setConfigStatus(status)
// Disable autopilot + webhooks
await autopilot.applyConfig(.empty)
await webhookAlerter.applyConfig(webhook: nil)
await refreshHelperState(autopilotEnabled: false)
logger.info("Autopilot config file removed — disabled (gen \(nextGeneration))")
}
return
}
// Enforce 0600 permissions and read file securely avoiding TOCTOU
let data: Data? = URL(fileURLWithPath: path).withUnsafeFileSystemRepresentation { pathPtr in
guard let pathPtr = pathPtr else { return nil }
let fd = open(pathPtr, O_RDONLY | O_NOFOLLOW | O_CLOEXEC)
guard fd >= 0 else { return nil }
fchmod(fd, 0o600)
let handle = FileHandle(fileDescriptor: fd, closeOnDealloc: true)
return try? handle.readToEnd()
}
guard let data = data else {
let status = ConfigStatus(
generation: nextGeneration,
lastReload: Date(),
status: .error,
error: "Failed to read config file"
)
await daemon.setConfigStatus(status)
await refreshHelperState(autopilotEnabled: nil)
logger.error("Failed to read autopilot config at \(path, privacy: .public)")
return
}