Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions sources/PTYSession.m
Original file line number Diff line number Diff line change
Expand Up @@ -3857,9 +3857,9 @@ - (void)writeTask:(NSString *)string
}

// This is run in PTYTask's thread. It parses the input here and then queues an async task to run
// in the main thread to execute the parsed tokens. This blocks when the queue of tokens gets too large.
- (void)threadedReadTask:(char *)buffer length:(int)length {
[_screen threadedReadTask:buffer length:length];
// in the main thread to execute the parsed tokens.
- (void)threadedReadTask:(char *)buffer length:(int)length semaphore:(dispatch_semaphore_t)semaphore {
[_screen threadedReadTask:buffer length:length semaphore:semaphore];
}

- (BOOL)haveResizedRecently {
Expand Down
7 changes: 5 additions & 2 deletions sources/PTYSession.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1013,12 +1013,15 @@ extension PTYSession {

channelClients.add(channelClient)
let session = newSession(forChannelID: channelClient.uid, command: command)!
let channelSemaphore = DispatchSemaphore(value: Int(iTermAdvancedSettingsModel.bufferDepth()))
let iobuffer = IOBuffer(fileDescriptor: channelClient.fd,
operationQueue: FileDescriptorMonitor.queue) { [weak session] in
operationQueue: FileDescriptorMonitor.queue) { [weak session, channelSemaphore] in
guard let session else { return }
// receive returns nil if only part of a segmented message is received.
if var data = channelClient.mux.receive() {
channelSemaphore.wait()
data.withUnsafeMutableBytes { (ptr: UnsafeMutableRawBufferPointer) in
session?.threadedReadTask(ptr.baseAddress!, length: Int32(ptr.count))
session.threadedReadTask(ptr.baseAddress!, length: Int32(ptr.count), semaphore: channelSemaphore)
}
}
} writeClosure: { data in
Expand Down
4 changes: 2 additions & 2 deletions sources/PTYTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
@protocol PTYTaskDelegate <NSObject>
// Runs in a background thread. Should do as much work as possible in this
// thread before kicking off a possibly async task in the main thread.
- (void)threadedReadTask:(char *)buffer length:(int)length;
- (void)threadedReadTask:(char *)buffer length:(int)length semaphore:(nullable dispatch_semaphore_t)semaphore;

// Runs in the same background task as -threadedReadTask:length:.
// Runs in the same background task as -threadedReadTask:length:semaphore:.
- (void)threadedTaskBrokenPipe;
- (void)brokenPipe; // Called in main thread
- (void)tmuxClientWrite:(NSData *)data;
Expand Down
9 changes: 7 additions & 2 deletions sources/PTYTask.m
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ @implementation PTYTask {
BOOL _isTmuxTask;
}

@synthesize currentReadSemaphore = _currentReadSemaphore;

- (instancetype)init {
self = [super init];
if (self) {
Expand Down Expand Up @@ -890,8 +892,11 @@ - (void)readTask:(char *)buffer length:(int)length {
}

// The delegate is responsible for parsing VT100 tokens here and sending them off to the
// main thread for execution. If its queues get too large, it can block.
[self.delegate threadedReadTask:buffer length:length];
// main thread for execution. The semaphore is passed downstream to TokenArray, which
// signals it when the batch is consumed.
dispatch_semaphore_t semaphore = self.currentReadSemaphore;
self.currentReadSemaphore = nil;
[self.delegate threadedReadTask:buffer length:length semaphore:semaphore];

@synchronized (self) {
if (coprocess_ && !self.sshIntegrationActive) {
Expand Down
3 changes: 3 additions & 0 deletions sources/TaskNotifier.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ extern NSString *const kCoprocessStatusChangeNotification;
@property (nonatomic, readonly) BOOL hasBrokenPipe;
@property (atomic, readonly) BOOL sshIntegrationActive;

// Set by TaskNotifier before processRead; consumed by the read path and passed to TokenArray.
@property (atomic, strong, nullable) dispatch_semaphore_t currentReadSemaphore;

- (void)processRead;
- (void)processWrite;
// Called on any thread
Expand Down
24 changes: 24 additions & 0 deletions sources/TaskNotifier.m
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#import "TaskNotifier.h"
#import "Coprocess.h"
#import "DebugLogging.h"
#import "iTermAdvancedSettingsModel.h"

#include <sys/time.h>
#include <sys/select.h>
Expand All @@ -30,6 +31,10 @@ @implementation TaskNotifier

// A set of NSNumber*s holding pids of tasks that need to be wait()ed on
NSMutableSet* deadpool;

// Per-task backpressure semaphores, keyed by task identity.
// Created on register, removed on deregister.
NSMapTable<id<iTermTask>, dispatch_semaphore_t> *_backpressureSemaphores;
}


Expand All @@ -52,6 +57,7 @@ - (instancetype)init {
_tasks = [[NSMutableArray alloc] init];
tasksLock = [[NSRecursiveLock alloc] init];
tasksChanged = NO;
_backpressureSemaphores = [[NSMapTable strongToStrongObjectsMapTable] retain];

int unblockPipe[2];
if (pipe(unblockPipe) != 0) {
Expand All @@ -73,6 +79,7 @@ - (instancetype)init {

- (void)dealloc {
[_tasks release];
[_backpressureSemaphores release];
[tasksLock release];
[deadpool release];
close(unblockPipeR);
Expand All @@ -85,6 +92,8 @@ - (void)registerTask:(id<iTermTask>)task {
[tasksLock lock];
PtyTaskDebugLog(@"Add task at %p\n", (void*)task);
[_tasks addObject:task];
[_backpressureSemaphores setObject:dispatch_semaphore_create([iTermAdvancedSettingsModel bufferDepth])
forKey:task];
PtyTaskDebugLog(@"There are now %lu tasks\n", (unsigned long)_tasks.count);
tasksChanged = YES;
PtyTaskDebugLog(@"registerTask: unlock\n");
Expand All @@ -110,6 +119,7 @@ - (void)deregisterTask:(id<iTermTask>)task {
[deadpool addObject:@([[task coprocess] pid])];
}
[_tasks removeObject:task];
[_backpressureSemaphores removeObjectForKey:task];
tasksChanged = YES;
PtyTaskDebugLog(@"End remove task %p. There are now %lu tasks.\n",
(void *)task,
Expand Down Expand Up @@ -141,10 +151,24 @@ void UnblockTaskNotifier(void) {
- (BOOL)handleReadOnFileDescriptor:(int)fd task:(id<iTermTask>)task fdSet:(fd_set *)fdSet {
if (FD_ISSET(fd, fdSet)) {
PtyTaskDebugLog(@"run/processRead: unlock");
// Look up the per-task semaphore while holding the lock. Retain both the task and
// semaphore across the unlocked region since this is MRR code and deregisterTask:
// could remove them from the map while we're blocked.
dispatch_semaphore_t sem = [[_backpressureSemaphores objectForKey:task] retain];
[task retain];
[tasksLock unlock];
dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
task.currentReadSemaphore = sem;
[task processRead];
// Signal if the semaphore was not consumed by the read path (e.g., brokenPipe or empty read).
if (task.currentReadSemaphore) {
dispatch_semaphore_signal(task.currentReadSemaphore);
task.currentReadSemaphore = nil;
}
PtyTaskDebugLog(@"run/processRead: lock");
[tasksLock lock];
[sem release];
[task release];
if (tasksChanged) {
PtyTaskDebugLog(@"Restart iteration\n");
tasksChanged = NO;
Expand Down
41 changes: 5 additions & 36 deletions sources/TokenExecutor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ class TokenExecutor: NSObject {
impl.delegate = delegate
}
}
private let semaphore = DispatchSemaphore(value: Int(iTermAdvancedSettingsModel.bufferDepth()))
private let impl: TokenExecutorImpl
private let queue: DispatchQueue
private static let isTokenExecutorSpecificKey = DispatchSpecificKey<Bool>()
Expand Down Expand Up @@ -134,38 +133,18 @@ class TokenExecutor: NSObject {
queue.setSpecific(key: Self.isTokenExecutorSpecificKey, value: true)
impl = TokenExecutorImpl(terminal,
slownessDetector: slownessDetector,
semaphore: semaphore,
queue: queue)
}

// This takes ownership of vector.
// You can call this on any queue.
@objc
func addTokens(_ vector: CVector,
lengthTotal: Int,
lengthExcludingInBandSignaling: Int) {
addTokens(vector,
lengthTotal: lengthTotal,
lengthExcludingInBandSignaling: lengthExcludingInBandSignaling,
highPriority: false)
}

private static let addTokensTimingStats: TimingStats = {
TimingStats(name: "TokenExecutor")
}()
// Flip this to true to measure how much time the TaskNotifier thread spends busy (reading,
// parsing, and in select()) vs idle (blocked on TokenExecutor's semaphore).
private let enableTimingStats = false

// This takes ownership of vector.
// You can call this on any queue when not high priority.
// If high priority, then you must be on the main queue or have joined the main & mutation queue.
// This blocks when the queue of tokens gets too large.
@objc
func addTokens(_ vector: CVector,
lengthTotal: Int,
lengthExcludingInBandSignaling: Int,
highPriority: Bool) {
highPriority: Bool,
semaphore: DispatchSemaphore?) {
if gDebugLogging.boolValue { DLog("Add tokens with length \(lengthTotal) (excluding OOB: \(lengthExcludingInBandSignaling)), highpri=\(highPriority)") }
if lengthTotal == 0 {
return
Expand All @@ -180,18 +159,11 @@ class TokenExecutor: NSObject {
lengthTotal: lengthTotal,
lengthExcludingInBandSignaling: lengthExcludingInBandSignaling,
highPriority: highPriority,
semaphore: nil as DispatchSemaphore?)
semaphore: nil)
return
}
// Normal code path for tokens from PTY. Use the semaphore to give backpressure to reading.
let semaphore = self.semaphore
if enableTimingStats {
TokenExecutor.addTokensTimingStats.recordEnd()
}
_ = semaphore.wait(timeout: .distantFuture)
if enableTimingStats {
TokenExecutor.addTokensTimingStats.recordStart()
}
// Normal code path for tokens from PTY. The semaphore is provided by the caller:
// TaskNotifier for the PTY read path, PTYSession for the channel/mux path.
reallyAddTokens(vector,
lengthTotal: lengthTotal,
lengthExcludingInBandSignaling: lengthExcludingInBandSignaling,
Expand Down Expand Up @@ -314,7 +286,6 @@ private class TokenExecutorImpl {
private let terminal: VT100Terminal
private let queue: DispatchQueue
private let slownessDetector: SlownessDetector
private let semaphore: DispatchSemaphore
private var taskQueue = iTermTaskQueue()
private var sideEffects = iTermTaskQueue()
private let tokenQueue = TwoTierTokenQueue()
Expand Down Expand Up @@ -350,12 +321,10 @@ private class TokenExecutorImpl {

init(_ terminal: VT100Terminal,
slownessDetector: SlownessDetector,
semaphore: DispatchSemaphore,
queue: DispatchQueue) {
self.terminal = terminal
self.queue = queue
self.slownessDetector = slownessDetector
self.semaphore = semaphore
sideEffectScheduler = PeriodicScheduler(DispatchQueue.main, period: 1 / 30.0, action: { [weak self] in
guard let self = self else {
return
Expand Down
2 changes: 1 addition & 1 deletion sources/VT100Screen.h
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ typedef NS_ENUM(NSUInteger, VT100ScreenTriggerCheckType) {
- (void)sendPasswordInEchoProbe;
- (void)setEchoProbeDelegate:(id<iTermEchoProbeDelegate>)echoProbeDelegate;
- (void)resetEchoProbe;
- (void)threadedReadTask:(char *)buffer length:(int)length;
- (void)threadedReadTask:(char *)buffer length:(int)length semaphore:(nullable dispatch_semaphore_t)semaphore;

- (void)destructivelySetScreenWidth:(int)width
height:(int)height
Expand Down
4 changes: 2 additions & 2 deletions sources/VT100Screen.m
Original file line number Diff line number Diff line change
Expand Up @@ -1714,8 +1714,8 @@ - (void)injectData:(NSData *)data {
}

// Warning: this is called on PTYTask's thread.
- (void)threadedReadTask:(char *)buffer length:(int)length {
[_mutableState threadedReadTask:buffer length:length];
- (void)threadedReadTask:(char *)buffer length:(int)length semaphore:(dispatch_semaphore_t)semaphore {
[_mutableState threadedReadTask:buffer length:length semaphore:semaphore];
}

- (long long)lastPromptLine {
Expand Down
5 changes: 3 additions & 2 deletions sources/VT100ScreenMutableState.h
Original file line number Diff line number Diff line change
Expand Up @@ -363,11 +363,12 @@ basedAtAbsoluteLineNumber:(long long)absoluteLineNumber
@property (nonatomic) BOOL hasMuteCoprocess;
@property (nonatomic) BOOL suppressAllOutput;

- (void)threadedReadTask:(char *)buffer length:(int)length;
- (void)threadedReadTask:(char *)buffer length:(int)length semaphore:(nullable dispatch_semaphore_t)semaphore;
- (void)addTokens:(CVector)vector
lengthTotal:(int)lengthTotal
lengthExcludingInBandSignaling:(int)lengthExcludingInBandSignaling
highPriority:(BOOL)highPriority;
highPriority:(BOOL)highPriority
semaphore:(nullable dispatch_semaphore_t)semaphore;

- (void)scheduleTokenExecution;
- (void)injectData:(NSData *)data;
Expand Down
18 changes: 12 additions & 6 deletions sources/VT100ScreenMutableState.m
Original file line number Diff line number Diff line change
Expand Up @@ -4560,7 +4560,7 @@ - (void)highlightRun:(VT100GridRun)run
#pragma mark - Token Execution

// WARNING: This is called on PTYTask's thread.
- (void)threadedReadTask:(char *)buffer length:(int)length {
- (void)threadedReadTask:(char *)buffer length:(int)length semaphore:(dispatch_semaphore_t)semaphore {
// Pass the input stream to the parser.
[self.terminal.parser putStreamData:buffer length:length];

Expand All @@ -4571,26 +4571,31 @@ - (void)threadedReadTask:(char *)buffer length:(int)length {

if (CVectorCount(&vector) == 0) {
CVectorDestroy(&vector);
if (semaphore) {
dispatch_semaphore_signal(semaphore);
}
return;
}

[self addTokens:vector
lengthTotal:length
lengthExcludingInBandSignaling:nonSignalingLength
highPriority:NO];
highPriority:NO
semaphore:semaphore];
}

// WARNING: This is called on PTYTask's thread.
// This blocks when the queue of tokens gets too large.
- (void)addTokens:(CVector)vector
lengthTotal:(int)lengthTotal
lengthExcludingInBandSignaling:(int)lengthExcludingInBandSignaling
highPriority:(BOOL)highPriority {
highPriority:(BOOL)highPriority
semaphore:(dispatch_semaphore_t)semaphore {
[_echoProbe updateEchoProbeStateWithTokenCVector:&vector];
[_tokenExecutor addTokens:vector
lengthTotal:lengthTotal
lengthExcludingInBandSignaling:lengthExcludingInBandSignaling
highPriority:highPriority];
highPriority:highPriority
semaphore:semaphore];
}

- (void)scheduleTokenExecution {
Expand All @@ -4611,7 +4616,8 @@ - (void)injectData:(NSData *)data {
[self addTokens:vector
lengthTotal:data.length
lengthExcludingInBandSignaling:data.length
highPriority:YES];
highPriority:YES
semaphore:nil];
[self scheduleTokenExecution];
}

Expand Down
Loading