From 4152be26c8961b403ee437cc432ea2aba168feda Mon Sep 17 00:00:00 2001 From: chall37 Date: Mon, 9 Mar 2026 10:22:50 -0700 Subject: [PATCH] Hoist backpressure semaphore ownership out of TokenExecutor Move creation and wait-site of the backpressure semaphore from TokenExecutor into its two callers: TaskNotifier (PTY read path) and PTYSession (channel/mux path). TokenExecutor no longer owns any semaphore; it receives one via the semaphore: parameter and passes it through to TokenArray, which signals it on consumption exactly as before. TaskNotifier creates a per-task semaphore on registration and looks it up (under tasksLock) before each read. PTYSession creates one per channel session. No semantic change intended. --- sources/PTYSession.m | 6 ++--- sources/PTYSession.swift | 7 ++++-- sources/PTYTask.h | 4 +-- sources/PTYTask.m | 9 +++++-- sources/TaskNotifier.h | 3 +++ sources/TaskNotifier.m | 24 ++++++++++++++++++ sources/TokenExecutor.swift | 41 ++++--------------------------- sources/VT100Screen.h | 2 +- sources/VT100Screen.m | 4 +-- sources/VT100ScreenMutableState.h | 5 ++-- sources/VT100ScreenMutableState.m | 18 +++++++++----- 11 files changed, 67 insertions(+), 56 deletions(-) diff --git a/sources/PTYSession.m b/sources/PTYSession.m index bd898b1f5b..bae5256c1a 100644 --- a/sources/PTYSession.m +++ b/sources/PTYSession.m @@ -3857,9 +3857,9 @@ - (void)writeTask:(NSString *)string } // This is run in PTYTask's thread. It parses the input here and then queues an async task to run -// in the main thread to execute the parsed tokens. This blocks when the queue of tokens gets too large. -- (void)threadedReadTask:(char *)buffer length:(int)length { - [_screen threadedReadTask:buffer length:length]; +// in the main thread to execute the parsed tokens. +- (void)threadedReadTask:(char *)buffer length:(int)length semaphore:(dispatch_semaphore_t)semaphore { + [_screen threadedReadTask:buffer length:length semaphore:semaphore]; } - (BOOL)haveResizedRecently { diff --git a/sources/PTYSession.swift b/sources/PTYSession.swift index 8a5a3cd80f..1b6222a54a 100644 --- a/sources/PTYSession.swift +++ b/sources/PTYSession.swift @@ -1013,12 +1013,15 @@ extension PTYSession { channelClients.add(channelClient) let session = newSession(forChannelID: channelClient.uid, command: command)! + let channelSemaphore = DispatchSemaphore(value: Int(iTermAdvancedSettingsModel.bufferDepth())) let iobuffer = IOBuffer(fileDescriptor: channelClient.fd, - operationQueue: FileDescriptorMonitor.queue) { [weak session] in + operationQueue: FileDescriptorMonitor.queue) { [weak session, channelSemaphore] in + guard let session else { return } // receive returns nil if only part of a segmented message is received. if var data = channelClient.mux.receive() { + channelSemaphore.wait() data.withUnsafeMutableBytes { (ptr: UnsafeMutableRawBufferPointer) in - session?.threadedReadTask(ptr.baseAddress!, length: Int32(ptr.count)) + session.threadedReadTask(ptr.baseAddress!, length: Int32(ptr.count), semaphore: channelSemaphore) } } } writeClosure: { data in diff --git a/sources/PTYTask.h b/sources/PTYTask.h index 1137f075a2..b8508d397e 100644 --- a/sources/PTYTask.h +++ b/sources/PTYTask.h @@ -21,9 +21,9 @@ @protocol PTYTaskDelegate // Runs in a background thread. Should do as much work as possible in this // thread before kicking off a possibly async task in the main thread. -- (void)threadedReadTask:(char *)buffer length:(int)length; +- (void)threadedReadTask:(char *)buffer length:(int)length semaphore:(nullable dispatch_semaphore_t)semaphore; -// Runs in the same background task as -threadedReadTask:length:. +// Runs in the same background task as -threadedReadTask:length:semaphore:. - (void)threadedTaskBrokenPipe; - (void)brokenPipe; // Called in main thread - (void)tmuxClientWrite:(NSData *)data; diff --git a/sources/PTYTask.m b/sources/PTYTask.m index 0a1719c39d..1187050a56 100644 --- a/sources/PTYTask.m +++ b/sources/PTYTask.m @@ -74,6 +74,8 @@ @implementation PTYTask { BOOL _isTmuxTask; } +@synthesize currentReadSemaphore = _currentReadSemaphore; + - (instancetype)init { self = [super init]; if (self) { @@ -890,8 +892,11 @@ - (void)readTask:(char *)buffer length:(int)length { } // The delegate is responsible for parsing VT100 tokens here and sending them off to the - // main thread for execution. If its queues get too large, it can block. - [self.delegate threadedReadTask:buffer length:length]; + // main thread for execution. The semaphore is passed downstream to TokenArray, which + // signals it when the batch is consumed. + dispatch_semaphore_t semaphore = self.currentReadSemaphore; + self.currentReadSemaphore = nil; + [self.delegate threadedReadTask:buffer length:length semaphore:semaphore]; @synchronized (self) { if (coprocess_ && !self.sshIntegrationActive) { diff --git a/sources/TaskNotifier.h b/sources/TaskNotifier.h index 5ff9f1e6b2..e3eec995f0 100644 --- a/sources/TaskNotifier.h +++ b/sources/TaskNotifier.h @@ -23,6 +23,9 @@ extern NSString *const kCoprocessStatusChangeNotification; @property (nonatomic, readonly) BOOL hasBrokenPipe; @property (atomic, readonly) BOOL sshIntegrationActive; +// Set by TaskNotifier before processRead; consumed by the read path and passed to TokenArray. +@property (atomic, strong, nullable) dispatch_semaphore_t currentReadSemaphore; + - (void)processRead; - (void)processWrite; // Called on any thread diff --git a/sources/TaskNotifier.m b/sources/TaskNotifier.m index 11bc3765b1..7a4b22fe51 100644 --- a/sources/TaskNotifier.m +++ b/sources/TaskNotifier.m @@ -9,6 +9,7 @@ #import "TaskNotifier.h" #import "Coprocess.h" #import "DebugLogging.h" +#import "iTermAdvancedSettingsModel.h" #include #include @@ -30,6 +31,10 @@ @implementation TaskNotifier // A set of NSNumber*s holding pids of tasks that need to be wait()ed on NSMutableSet* deadpool; + + // Per-task backpressure semaphores, keyed by task identity. + // Created on register, removed on deregister. + NSMapTable, dispatch_semaphore_t> *_backpressureSemaphores; } @@ -52,6 +57,7 @@ - (instancetype)init { _tasks = [[NSMutableArray alloc] init]; tasksLock = [[NSRecursiveLock alloc] init]; tasksChanged = NO; + _backpressureSemaphores = [[NSMapTable strongToStrongObjectsMapTable] retain]; int unblockPipe[2]; if (pipe(unblockPipe) != 0) { @@ -73,6 +79,7 @@ - (instancetype)init { - (void)dealloc { [_tasks release]; + [_backpressureSemaphores release]; [tasksLock release]; [deadpool release]; close(unblockPipeR); @@ -85,6 +92,8 @@ - (void)registerTask:(id)task { [tasksLock lock]; PtyTaskDebugLog(@"Add task at %p\n", (void*)task); [_tasks addObject:task]; + [_backpressureSemaphores setObject:dispatch_semaphore_create([iTermAdvancedSettingsModel bufferDepth]) + forKey:task]; PtyTaskDebugLog(@"There are now %lu tasks\n", (unsigned long)_tasks.count); tasksChanged = YES; PtyTaskDebugLog(@"registerTask: unlock\n"); @@ -110,6 +119,7 @@ - (void)deregisterTask:(id)task { [deadpool addObject:@([[task coprocess] pid])]; } [_tasks removeObject:task]; + [_backpressureSemaphores removeObjectForKey:task]; tasksChanged = YES; PtyTaskDebugLog(@"End remove task %p. There are now %lu tasks.\n", (void *)task, @@ -141,10 +151,24 @@ void UnblockTaskNotifier(void) { - (BOOL)handleReadOnFileDescriptor:(int)fd task:(id)task fdSet:(fd_set *)fdSet { if (FD_ISSET(fd, fdSet)) { PtyTaskDebugLog(@"run/processRead: unlock"); + // Look up the per-task semaphore while holding the lock. Retain both the task and + // semaphore across the unlocked region since this is MRR code and deregisterTask: + // could remove them from the map while we're blocked. + dispatch_semaphore_t sem = [[_backpressureSemaphores objectForKey:task] retain]; + [task retain]; [tasksLock unlock]; + dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER); + task.currentReadSemaphore = sem; [task processRead]; + // Signal if the semaphore was not consumed by the read path (e.g., brokenPipe or empty read). + if (task.currentReadSemaphore) { + dispatch_semaphore_signal(task.currentReadSemaphore); + task.currentReadSemaphore = nil; + } PtyTaskDebugLog(@"run/processRead: lock"); [tasksLock lock]; + [sem release]; + [task release]; if (tasksChanged) { PtyTaskDebugLog(@"Restart iteration\n"); tasksChanged = NO; diff --git a/sources/TokenExecutor.swift b/sources/TokenExecutor.swift index ea2423f9fc..8c91b2a071 100644 --- a/sources/TokenExecutor.swift +++ b/sources/TokenExecutor.swift @@ -101,7 +101,6 @@ class TokenExecutor: NSObject { impl.delegate = delegate } } - private let semaphore = DispatchSemaphore(value: Int(iTermAdvancedSettingsModel.bufferDepth())) private let impl: TokenExecutorImpl private let queue: DispatchQueue private static let isTokenExecutorSpecificKey = DispatchSpecificKey() @@ -134,38 +133,18 @@ class TokenExecutor: NSObject { queue.setSpecific(key: Self.isTokenExecutorSpecificKey, value: true) impl = TokenExecutorImpl(terminal, slownessDetector: slownessDetector, - semaphore: semaphore, queue: queue) } - // This takes ownership of vector. - // You can call this on any queue. - @objc - func addTokens(_ vector: CVector, - lengthTotal: Int, - lengthExcludingInBandSignaling: Int) { - addTokens(vector, - lengthTotal: lengthTotal, - lengthExcludingInBandSignaling: lengthExcludingInBandSignaling, - highPriority: false) - } - - private static let addTokensTimingStats: TimingStats = { - TimingStats(name: "TokenExecutor") - }() - // Flip this to true to measure how much time the TaskNotifier thread spends busy (reading, - // parsing, and in select()) vs idle (blocked on TokenExecutor's semaphore). - private let enableTimingStats = false - // This takes ownership of vector. // You can call this on any queue when not high priority. // If high priority, then you must be on the main queue or have joined the main & mutation queue. - // This blocks when the queue of tokens gets too large. @objc func addTokens(_ vector: CVector, lengthTotal: Int, lengthExcludingInBandSignaling: Int, - highPriority: Bool) { + highPriority: Bool, + semaphore: DispatchSemaphore?) { if gDebugLogging.boolValue { DLog("Add tokens with length \(lengthTotal) (excluding OOB: \(lengthExcludingInBandSignaling)), highpri=\(highPriority)") } if lengthTotal == 0 { return @@ -180,18 +159,11 @@ class TokenExecutor: NSObject { lengthTotal: lengthTotal, lengthExcludingInBandSignaling: lengthExcludingInBandSignaling, highPriority: highPriority, - semaphore: nil as DispatchSemaphore?) + semaphore: nil) return } - // Normal code path for tokens from PTY. Use the semaphore to give backpressure to reading. - let semaphore = self.semaphore - if enableTimingStats { - TokenExecutor.addTokensTimingStats.recordEnd() - } - _ = semaphore.wait(timeout: .distantFuture) - if enableTimingStats { - TokenExecutor.addTokensTimingStats.recordStart() - } + // Normal code path for tokens from PTY. The semaphore is provided by the caller: + // TaskNotifier for the PTY read path, PTYSession for the channel/mux path. reallyAddTokens(vector, lengthTotal: lengthTotal, lengthExcludingInBandSignaling: lengthExcludingInBandSignaling, @@ -314,7 +286,6 @@ private class TokenExecutorImpl { private let terminal: VT100Terminal private let queue: DispatchQueue private let slownessDetector: SlownessDetector - private let semaphore: DispatchSemaphore private var taskQueue = iTermTaskQueue() private var sideEffects = iTermTaskQueue() private let tokenQueue = TwoTierTokenQueue() @@ -350,12 +321,10 @@ private class TokenExecutorImpl { init(_ terminal: VT100Terminal, slownessDetector: SlownessDetector, - semaphore: DispatchSemaphore, queue: DispatchQueue) { self.terminal = terminal self.queue = queue self.slownessDetector = slownessDetector - self.semaphore = semaphore sideEffectScheduler = PeriodicScheduler(DispatchQueue.main, period: 1 / 30.0, action: { [weak self] in guard let self = self else { return diff --git a/sources/VT100Screen.h b/sources/VT100Screen.h index 876219c75d..17c1fe66a8 100644 --- a/sources/VT100Screen.h +++ b/sources/VT100Screen.h @@ -262,7 +262,7 @@ typedef NS_ENUM(NSUInteger, VT100ScreenTriggerCheckType) { - (void)sendPasswordInEchoProbe; - (void)setEchoProbeDelegate:(id)echoProbeDelegate; - (void)resetEchoProbe; -- (void)threadedReadTask:(char *)buffer length:(int)length; +- (void)threadedReadTask:(char *)buffer length:(int)length semaphore:(nullable dispatch_semaphore_t)semaphore; - (void)destructivelySetScreenWidth:(int)width height:(int)height diff --git a/sources/VT100Screen.m b/sources/VT100Screen.m index ca40e61fd9..09f745aeea 100644 --- a/sources/VT100Screen.m +++ b/sources/VT100Screen.m @@ -1714,8 +1714,8 @@ - (void)injectData:(NSData *)data { } // Warning: this is called on PTYTask's thread. -- (void)threadedReadTask:(char *)buffer length:(int)length { - [_mutableState threadedReadTask:buffer length:length]; +- (void)threadedReadTask:(char *)buffer length:(int)length semaphore:(dispatch_semaphore_t)semaphore { + [_mutableState threadedReadTask:buffer length:length semaphore:semaphore]; } - (long long)lastPromptLine { diff --git a/sources/VT100ScreenMutableState.h b/sources/VT100ScreenMutableState.h index 7becbca652..43777c90fc 100644 --- a/sources/VT100ScreenMutableState.h +++ b/sources/VT100ScreenMutableState.h @@ -363,11 +363,12 @@ basedAtAbsoluteLineNumber:(long long)absoluteLineNumber @property (nonatomic) BOOL hasMuteCoprocess; @property (nonatomic) BOOL suppressAllOutput; -- (void)threadedReadTask:(char *)buffer length:(int)length; +- (void)threadedReadTask:(char *)buffer length:(int)length semaphore:(nullable dispatch_semaphore_t)semaphore; - (void)addTokens:(CVector)vector lengthTotal:(int)lengthTotal lengthExcludingInBandSignaling:(int)lengthExcludingInBandSignaling - highPriority:(BOOL)highPriority; + highPriority:(BOOL)highPriority + semaphore:(nullable dispatch_semaphore_t)semaphore; - (void)scheduleTokenExecution; - (void)injectData:(NSData *)data; diff --git a/sources/VT100ScreenMutableState.m b/sources/VT100ScreenMutableState.m index f6797da1c4..41127afb2a 100644 --- a/sources/VT100ScreenMutableState.m +++ b/sources/VT100ScreenMutableState.m @@ -4560,7 +4560,7 @@ - (void)highlightRun:(VT100GridRun)run #pragma mark - Token Execution // WARNING: This is called on PTYTask's thread. -- (void)threadedReadTask:(char *)buffer length:(int)length { +- (void)threadedReadTask:(char *)buffer length:(int)length semaphore:(dispatch_semaphore_t)semaphore { // Pass the input stream to the parser. [self.terminal.parser putStreamData:buffer length:length]; @@ -4571,26 +4571,31 @@ - (void)threadedReadTask:(char *)buffer length:(int)length { if (CVectorCount(&vector) == 0) { CVectorDestroy(&vector); + if (semaphore) { + dispatch_semaphore_signal(semaphore); + } return; } [self addTokens:vector lengthTotal:length lengthExcludingInBandSignaling:nonSignalingLength - highPriority:NO]; + highPriority:NO + semaphore:semaphore]; } // WARNING: This is called on PTYTask's thread. -// This blocks when the queue of tokens gets too large. - (void)addTokens:(CVector)vector lengthTotal:(int)lengthTotal lengthExcludingInBandSignaling:(int)lengthExcludingInBandSignaling - highPriority:(BOOL)highPriority { + highPriority:(BOOL)highPriority + semaphore:(dispatch_semaphore_t)semaphore { [_echoProbe updateEchoProbeStateWithTokenCVector:&vector]; [_tokenExecutor addTokens:vector lengthTotal:lengthTotal lengthExcludingInBandSignaling:lengthExcludingInBandSignaling - highPriority:highPriority]; + highPriority:highPriority + semaphore:semaphore]; } - (void)scheduleTokenExecution { @@ -4611,7 +4616,8 @@ - (void)injectData:(NSData *)data { [self addTokens:vector lengthTotal:data.length lengthExcludingInBandSignaling:data.length - highPriority:YES]; + highPriority:YES + semaphore:nil]; [self scheduleTokenExecution]; }