Add per-PTY dispatch sources and TaskNotifier bypass #580
Open
chall37 wants to merge 38 commits into
- Add BackpressureLevel enum (none, light, moderate, heavy, blocked)
- Track available slots with atomic counter alongside DispatchSemaphore
- Add onSemaphoreSignaled callback to TokenArray for slot release notification
- Expose backpressureLevel property on TokenExecutor (callable from any queue)

This enables adaptive behavior based on queue load (e.g., join timeouts).

…pass
- Initialize availableSlots with correct value in property declaration using immediately-executed closure, avoiding race where backpressureLevel could return .blocked before init completes
- Document that high-priority tokens intentionally bypass backpressure, so metric only reflects normal PTY token load

- Add Comparable conformance to BackpressureLevel for threshold comparisons
- Add backpressureReleaseHandler callback (stub for future PTYTask integration)
- Add TwoTierTokenQueue.discardAllAndReturnCount() for cleanup accounting
- Call backpressureReleaseHandler when crossing out of heavy backpressure
- Fix backpressureLevel to handle negative availableSlots

These additions enable future fairness scheduler integration without changing current behavior; the handler starts as nil.
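
To illustrate the commits above, here is a minimal sketch of a comparable backpressure enum and a slots-to-level mapping that tolerates a negative slot count. The case names come from the commit message; the function `backpressureLevel(available:total:)` and the 75%/50%/25% thresholds are assumptions made for this example, not the values used in the PR.

```swift
// Hypothetical sketch: the thresholds and this free function are assumptions.
enum BackpressureLevel: Int, Comparable {
    case none, light, moderate, heavy, blocked

    // Enums with raw values do not get Comparable synthesized, so order by raw value.
    static func < (lhs: BackpressureLevel, rhs: BackpressureLevel) -> Bool {
        lhs.rawValue < rhs.rawValue
    }
}

/// Maps a slot count to a level. `available` is clamped into 0...total first,
/// so a negative count reports `.blocked` instead of producing a bogus ratio.
func backpressureLevel(available: Int, total: Int) -> BackpressureLevel {
    guard total > 0 else { return .blocked }
    let clamped = max(0, min(available, total))
    if clamped == 0 { return .blocked }
    let fractionFree = Double(clamped) / Double(total)
    if fractionFree >= 0.75 { return .none }
    if fractionFree >= 0.5 { return .light }
    if fractionFree >= 0.25 { return .moderate }
    return .heavy
}
```

With Comparable in place, a threshold check such as `level < .heavy` reads directly, which is the form later commits in this PR use for `shouldRead`.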
- Add FairnessScheduler.swift with round-robin busy list scheduling
- Add useFairnessScheduler feature flag (default OFF)
- Add FairnessSchedulerExecutor protocol conformance to TokenExecutor
- Add executeTurn() with budget enforcement for scheduler
- Add cleanupForUnregistration() for proper token cleanup
- Add fairnessSessionId property for scheduler registration
- Add notifyScheduler() with conditional dispatch based on flag
- When flag OFF: legacy execute() behavior preserved
- When flag ON: FairnessScheduler coordinates round-robin execution

Tested with multi-tab stress test:
- Flag ON: 18k-20k iterations/sec across 1-5 tabs
- Flag OFF: 20k iterations/sec (legacy mode)

- Start nextSessionId at 1 so 0 can be "not registered" sentinel
- Add setUseFairnessSchedulerForTesting: for unit test control
- Add VT100ScreenMutableState registration with FairnessScheduler
- Add feature flag gating tests proving flag controls code path
- Add FairnessScheduler and TokenExecutor test suites
- Add run_fairness_tests.sh test runner

- Added `private let useFairnessScheduler: Bool` to TokenExecutorImpl
- notifyScheduler() now uses cached flag as sole decision point
- Added assertion for programmer error: flag ON but not registered
- Added #if ITERM_DEBUG test hook `testSkipNotifyScheduler`

Test updates:
- Fixed test classes with proper flag settings in setUp
- Added cleanupForUnregistrationOnMutationQueue helper
- Skipped tests requiring milestone 3 non-blocking model
- Updated run_fairness_tests.sh with runtime warning

Use IdempotentOperationJoiner to coalesce multiple ensureExecutionScheduled() calls into a single async dispatch. The joiner handles the scheduling state atomically via setNeedsUpdate/updateIfNeeded pattern:
- Multiple calls before dispatch coalesce into one
- Calls during execution schedule a new dispatch (closure cleared before exec)
- No manual flag management needed

Removed executionScheduled from testReset() since the empty busyList guard in executeNextTurn() handles any stale dispatches.

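
The coalescing behavior described in this commit can be sketched roughly as follows. `CoalescingJoiner` is a hypothetical stand-in written for this example, not the project's IdempotentOperationJoiner; in particular, the Bool returned by `setNeedsUpdate` (telling the caller whether to schedule a dispatch) is an assumption.

```swift
import Foundation

/// Hypothetical stand-in for the setNeedsUpdate/updateIfNeeded pattern.
final class CoalescingJoiner {
    private let lock = NSLock()
    private var pending: (() -> Void)?

    /// Stores the closure. Returns true only if nothing was pending,
    /// meaning the caller should schedule exactly one dispatch.
    func setNeedsUpdate(_ work: @escaping () -> Void) -> Bool {
        lock.lock()
        defer { lock.unlock() }
        let wasIdle = (pending == nil)
        pending = work
        return wasIdle
    }

    /// Takes and clears the closure before running it, so a request made
    /// while the closure executes is treated as a fresh one.
    func updateIfNeeded() {
        lock.lock()
        let work = pending
        pending = nil
        lock.unlock()
        work?()
    }
}

// Three requests before the dispatch runs collapse into one execution.
let joiner = CoalescingJoiner()
var runs = 0
var dispatches = 0
for _ in 0..<3 {
    if joiner.setNeedsUpdate({ runs += 1 }) { dispatches += 1 }
}
joiner.updateIfNeeded()
```

Clearing the stored closure before running it is what makes the second bullet hold: a request arriving mid-execution finds nothing pending and schedules a new dispatch.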
Addresses PR comments 11-14:
1. Move tokenExecutorShouldQueueTokens() check to after high-priority tasks
and nil-delegate guard, matching execute() ordering
2. Wrap token enumeration in slownessDetector.measure() for instrumentation
3. Add DLog statements matching execute():
- "Will enumerate token arrays" before enumeration
- "Begin/Done executing a batch of tokens..." inside loop
- "Finished enumerating token arrays..." after enumeration
- Active session drain logging
4. Use labeled do{} block so defer { executeHighPriorityTasks() } fires
before completion() is called. Store result in local variable and use
break instead of return for early exits.
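
Item 4 relies on a Swift detail: a `defer` inside a labeled `do` block fires when that block exits, including via `break`, so it runs before any code placed after the block. A minimal sketch of the shape, with hypothetical names (`TurnResult`, the `highPriority` parameter) that are not taken from the PR:

```swift
// Hypothetical names throughout; only the control-flow shape is the point.
enum TurnResult { case completed, yielded, blocked }

func executeTurn(hasDelegate: Bool,
                 highPriority: () -> Void,
                 completion: (TurnResult) -> Void) {
    var result = TurnResult.completed
    turn: do {
        // Runs when the do-block's scope exits, including via `break turn`.
        defer { highPriority() }
        if !hasDelegate {
            result = .blocked
            break turn  // early exit without `return`, so completion still runs below
        }
        result = .yielded
    }
    // The defer above has already fired by the time control reaches this line.
    completion(result)
}

var order: [String] = []
executeTurn(hasDelegate: false,
            highPriority: { order.append("highPriority") },
            completion: { _ in order.append("completion") })
```

Had the early exit used `return` with the `defer` at function scope, the high-priority work would run only after `completion` had already been called.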
…ordering. Tests validate that high-priority tasks scheduled during token execution run before the completion callback. Comments are precise about scope:
- testDeferFiresBeforeCompletionCallback: Verifies HP tasks run before completion, but does not isolate outer vs inner defer (both satisfy).
- testHighPriorityTaskInDeferAddingTokensTriggersNotifyScheduler: Tests safety net ensuring tokens added by HP tasks get processed. Documents that sessionDidEnqueueWork uses queue.async so can race with sessionFinishedTurn (either path works).
- testTurnResultReflectsTokenQueueStateAfterDefer: Documents (non-asserting) that turnResult is calculated before outer defer fires.

…bled
- Move FairnessScheduler registration from init to setTerminalEnabled:YES for symmetry with unregistration in setTerminalEnabled:NO
- cleanupForUnregistration() no longer discards tokens; they are preserved to support session revive and drain naturally when re-enabled
- On revive, re-registration gets a fresh sessionId
- Add TODO comments to tests that need adjustment for new behavior

…utation queue
FairnessScheduler.register() was using .sync to the mutation queue to protect its internal state. This caused a deadlock when called from a "joined block" context (where the mutation queue is deliberately blocked waiting for the main thread).

Solution: Use a private Mutex lock for scheduler bookkeeping instead of the mutation queue. This completely decouples scheduler synchronization from the join protocol. Cleanup still dispatches to mutation queue as required by TokenExecutor.cleanupForUnregistration().

Also includes related changes:
- TokenExecutor: Add isRegistered flag to explicitly track registration state (separate from sessionId). Change notifyScheduler() to guard on isRegistered instead of asserting on sessionId, allowing tokens to accumulate before registration completes.
- TokenExecutor: Add test hooks (testQueuedTokenCount, testTotalSlots, testAvailableSlots) for verifying token preservation in tests.
- TwoTierTokenQueue: Add count property to support testQueuedTokenCount.

…ation
- Add FairnessSchedulerSessionRestorationTests (4 tests) verifying re-registration after unregister, preserved token processing, and double-unregister safety
- Add TokenExecutorSessionReviveTests (5 tests) covering full disable→preserve→revive→drain cycle
- Update TokenExecutorCleanupTests to verify tokens are preserved (not discarded) after cleanupForUnregistration(); reduce token count to 30 (within 40-slot limit)
- Fix TokenExecutorAvailableSlotsBoundaryTests accounting assertions to match new preservation behavior
- Add missing isRegistered=true after registration in multiple test classes
- Add testFinished guard to prevent async polling crash after test timeout
- Enable fairness scheduler flag in TokenExecutorLegacyRemovalTests
- Register new test classes in run_fairness_tests.sh

- Add timeouts and state assertions to concurrency tests (testConcurrentRegisterAndUnregister, testConcurrentEnqueueAndUnregister)
- Add cleanupCallCount tracking to verify double-unregister does not call cleanup twice (testDoubleUnregister)
- Replace timing-dependent inverted expectations with deterministic mutation queue syncs for no-execution tests
- Rename testBackgroundSessionGetsEqualTurns to match actual behavior (testBackgroundSessionCanProcessTokens)
- Rename testNoDuplicateNotificationsForBusySession to testRapidAddTokensAllProcessed with direct byte count verification
- Delete non-asserting testTurnResultReflectsTokenQueueStateAfterDefer
- Delete obsolete skipped tests (testLegacyPathWhenSessionIdIsZero, testCodePathDiffersBetweenRegisteredAndUnregistered)
- Delete TwoTierTokenQueueTests class (tests dead discardAllAndReturnCount)

These methods were added but never called. TokenExecutor uses removeAll() in invalidate() and preserves tokens in cleanupForUnregistration().

…nce regression
The previous deadlock fix (be3df18) used a Mutex for all state access, but this caused ~13% throughput regression on the hot path (sessionDidEnqueueWork). The deadlock only affected register() which used .sync dispatch to the mutation queue. The hot path was always called from the mutation queue via async dispatch, so it never could have deadlocked.

This change:
- Uses Mutex only for ID allocation in register() (instant, no queue dispatch)
- Dispatches session creation async to mutation queue (avoids joined block deadlock)
- Restores async dispatch for sessionDidEnqueueWork() and all scheduling state
- Manages busyList/busySet directly on mutation queue (no lock needed)

Performance improvement: ~6-9% throughput increase, bringing the fairness scheduler within 4-8% of production baseline.

Future optimization: Consider an atomic 'hasBusyWork' flag on enqueue, only doing lock work when transitioning 0→1. This might reduce contention enough to make the synchronous path viable again.

Doubles the per-turn token budget to reduce turn overhead. With the previous 500 token budget, throughput was ~7% below production in low-contention scenarios (2 tabs). Doubling to 1000 brings performance within 1% of production. TODO: This fixed budget may not translate well to slower CPUs. Should be tested on minimum supported hardware for validation. Consider investigating adaptive token budgets based on throughput to auto-tune for different hardware capabilities.

…king
- Use synchronous completion callback in FairnessScheduler to eliminate unnecessary async dispatch per turn (completion is already on mutationQueue)
- Add dispatchPrecondition to verify threading contract in DEBUG builds
- Skip legacy activeSessionsWithTokens tracking when using FairnessScheduler (round-robin scheduling already handles prioritization)
- Document threading contract for executeTurn protocol method
- Update tests to call completion on mutation queue per new contract

…hods
Document that cleanupForUnregistration() is called on mutationQueue in the protocol definition and implementations.

Without this, notifyScheduler() early-returns and tokens are never scheduled for execution, resulting in no terminal output.

Documents thread safety requirements for all member variables in TokenExecutorImpl to match the documentation standard in FairnessScheduler.
- taskQueue, sideEffects: Thread-safe via iTermTaskQueue internal locking
- tokenQueue, executingCount, commit: Access on mutation queue only
- pauseCount: Thread-safe via atomic operations
- executingSideEffects: Thread-safe via MutableAtomicObject
- sideEffectScheduler: Period modified on mutation queue
- throughputEstimator: addByteCount from any queue
- isRegistered: Access on mutation queue only

…path
Replace TaskNotifier's single-threaded select() loop with per-PTY dispatch sources when FairnessScheduler is enabled. Each PTY now has dedicated read/write dispatch sources that feed directly into the token processing pipeline.

Key changes:
- Per-PTY dispatch sources with backpressure-aware suspend/resume
- Tasks using dispatch sources skip TaskNotifier registration
- Deadpool handling preserved for process cleanup
- Synchronous completion callback in FairnessScheduler (eliminates unnecessary async dispatch per turn)
- Skip legacy activeSessionsWithTokens tracking when using FairnessScheduler
- Document threading contract for executeTurn protocol method

This eliminates the TaskNotifier bottleneck and enables parallel reads across PTYs while maintaining serialized token processing.

Upstream added two new delegate methods that FakeSession needs to implement:
- screenOffscreenCommandLineShouldBeVisibleForCurrentCommand
- screenUpdateBlock:action:

The threading contract verification for the completion callback should only run during tests, not in regular debug builds. ITERM_DEBUG ensures the check runs when tests execute but avoids runtime overhead in both development and release builds.

1. Replace Mutex with iTermAtomicInt64 for session ID allocation
   - Lock-free atomic increment instead of mutex sync
   - Simpler and more appropriate for single counter
2. Extract BusyQueue type to encapsulate busyList/busySet
   - Enforces invariant that set and list stay in sync
   - Uses it_fatalError to catch duplicate enqueue attempts
   - Cleaner API: enqueue/dequeue/contains/removeFromSet
3. Remove cleanupForUnregistration from protocol (YAGNI)
   - Method existed but intentionally did nothing
   - Tokens are preserved on unregister for session revival
   - No cleanup hook needed if there is nothing to clean up
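
As a rough illustration of item 2, a queue that pairs an ordered list with a membership set might look like the sketch below. This is an assumption-laden sketch, not the PR's BusyQueue: the `UInt64` session ID type and the field names are invented, `precondition` stands in for it_fatalError, and `removeFromSet` is omitted because the commit message does not describe its semantics.

```swift
/// Hypothetical sketch of a FIFO queue whose list and set always agree.
struct BusyQueue {
    private var order: [UInt64] = []      // round-robin order
    private var members: Set<UInt64> = [] // O(1) membership checks

    var isEmpty: Bool { order.isEmpty }

    func contains(_ id: UInt64) -> Bool { members.contains(id) }

    mutating func enqueue(_ id: UInt64) {
        // A duplicate would put the list and set out of sync, so treat it as a bug.
        let inserted = members.insert(id).inserted
        precondition(inserted, "duplicate enqueue of session \(id)")
        order.append(id)
    }

    mutating func dequeue() -> UInt64? {
        guard !order.isEmpty else { return nil }
        let id = order.removeFirst()
        members.remove(id)
        return id
    }
}
```

Because both collections are private and every mutation goes through `enqueue` or `dequeue`, callers cannot update one without the other, which is the invariant the commit describes.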

…Queue
Three changes addressing fairness scheduler gaps:

1. Coprocess dispatch sources (PTYTask.m): When useFairnessScheduler is active, tasks aren't in TaskNotifier's _tasks array, so the select() loop never monitors coprocess FDs. Add per-PTY dispatch sources for coprocess read/write on _ioQueue, with lifecycle management tied to setCoprocess:/stopCoprocess. Includes writeBufferHasRoom re-evaluation hook in handleWriteEvent to prevent coprocess read stalls.

2. Non-blocking addTokens (TokenExecutor.swift, TokenArray.swift): The semaphore in addTokens() blocks _ioQueue when the fairness scheduler is active, stalling writes and coprocess I/O for the affected PTY. PTYTask already handles backpressure via dispatch source suspend/resume (shouldRead checks backpressureLevel < .heavy), making the semaphore redundant. Skip semaphore.wait() in the fairness path; decouple the slot-release callback from semaphore presence so availableSlots accounting works without a semaphore.

3. Fix race where executeTurn with nil delegate destroys preserved tokens (TokenExecutor.swift): When setTerminalEnabled:NO runs during a joined block, it sets delegate=nil and isRegistered=false. If executeTurn was already scheduled, it finds delegate==nil and calls removeAll(), destroying tokens that should survive for session revive. Guard removeAll() on isRegistered so tokens are preserved when disabled.

Enables 6 previously-skipped tests and adds 1 regression test (103 total).

When a session is unregistered while its executeTurn is still running, sessionFinishedTurn hits the guard on sessions[sessionId] and returns without calling ensureExecutionScheduled(). Other sessions waiting in busyQueue stall indefinitely until an external event triggers scheduling. Call ensureExecutionScheduled() in the early-return path so the scheduler continues pumping after mid-flight unregistration. Adds regression test that verifies session B gets its turn after session A is unregistered during execution.
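
The stall and its fix can be illustrated with a toy, synchronous model. `ToyScheduler` and all of its members are hypothetical; a Bool stands in for the pending async dispatch, and the real FairnessScheduler's queueing and threading are not modeled.

```swift
/// Hypothetical toy model of the early-return path in sessionFinishedTurn.
final class ToyScheduler {
    private var sessions: Set<Int> = []
    private var busy: [Int] = []
    private(set) var executionScheduled = false

    func register(_ id: Int) { sessions.insert(id) }

    func unregister(_ id: Int) {
        sessions.remove(id)
        busy.removeAll { $0 == id }
    }

    func enqueueWork(_ id: Int) {
        guard sessions.contains(id), !busy.contains(id) else { return }
        busy.append(id)
        ensureExecutionScheduled()
    }

    func ensureExecutionScheduled() {
        if !busy.isEmpty { executionScheduled = true }
    }

    /// Pops the next session for a turn (stands in for the dispatched block).
    func beginNextTurn() -> Int? {
        executionScheduled = false
        return busy.isEmpty ? nil : busy.removeFirst()
    }

    func sessionFinishedTurn(_ id: Int, hasMoreWork: Bool) {
        guard sessions.contains(id) else {
            // Session was unregistered mid-turn. Without this call, whatever
            // is still in `busy` would wait for an unrelated event.
            ensureExecutionScheduled()
            return
        }
        if hasMoreWork { busy.append(id) }
        ensureExecutionScheduled()
    }
}

let scheduler = ToyScheduler()
scheduler.register(1)
scheduler.register(2)
scheduler.enqueueWork(1)
scheduler.enqueueWork(2)
let running = scheduler.beginNextTurn()  // session 1 starts its turn
scheduler.unregister(1)                  // unregistered while still executing
scheduler.sessionFinishedTurn(1, hasMoreWork: true)
```

After the last call, `executionScheduled` is true again and session 2 is next in line; removing the `ensureExecutionScheduled()` call from the guard's body reproduces the stall in this model.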

…DEBUG guards, fix races
Tests that create TokenExecutor instances need useFairnessScheduler enabled BEFORE init (the flag is cached). Without it, addTokens takes the legacy semaphore-blocking path causing deadlocks after 40 tokens, and useDispatchSource returns NO preventing coprocess dispatch source setup.
- Add setUseFairnessSchedulerForTesting(true) setUp/tearDown to test classes that depend on fairness scheduler behavior
- Remove #if ITERM_DEBUG guard from effectiveIoAllowed in PTYTask.m (made testIoAllowedOverride dead code in test target)
- Remove all #if ITERM_DEBUG guards from FairnessSchedulerTests.swift and TokenExecutorFairnessTests.swift (silently skipping test code)
- Fix testReadSourceResumesWhenBackpressureDrops: add MockTokenExecutorDelegate so executeTurn actually processes tokens, use mutationQueue
- Fix testWriteTaskWithRealJobManager: move shouldWrite assertion before dispatch source setup to avoid race with write source draining buffer
- Fix executeTurn calls to run on mutationQueue (dispatchPrecondition)
- Add testSkipNotifyScheduler to prevent deadlocks during bulk token loading
- Use timestamped result bundle paths in run_fairness_tests.sh
- Add milestone 3-5 test files, mocks, and PTYSession wiring tests

Verifies that taskDidRegister: (which wires the tokenExecutor) runs before setupDispatchSources, so shouldRead sees backpressure at source creation time rather than falling through the executor==nil path.
Reorder didRegister to call taskDidRegister: before setupDispatchSources. taskDidRegister: wires task.tokenExecutor; without it, shouldRead sees executor==nil and skips backpressure, allowing unconditional reads.
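
Why the ordering matters can be seen in a small sketch of the read predicate. The conditions (not paused, I/O allowed, backpressure below heavy, unconditional reads when no executor is wired) come from this PR's description; the protocol `BackpressureReporting`, the `FixedExecutor` type, and the free-function form are assumptions for illustration.

```swift
// Hypothetical sketch; only the decision logic mirrors the PR description.
enum BackpressureLevel: Int { case none, light, moderate, heavy, blocked }

protocol BackpressureReporting {
    var backpressureLevel: BackpressureLevel { get }
}

struct FixedExecutor: BackpressureReporting {
    let backpressureLevel: BackpressureLevel
}

func shouldRead(paused: Bool, ioAllowed: Bool, executor: BackpressureReporting?) -> Bool {
    guard !paused, ioAllowed else { return false }
    // No executor wired yet: there is nothing to consult, so reads are unconditional.
    guard let executor = executor else { return true }
    return executor.backpressureLevel.rawValue < BackpressureLevel.heavy.rawValue
}
```

If the sources are set up before the executor is wired, the first evaluation takes the `nil` branch and allows reading even when the session is already under heavy backpressure; wiring the executor first makes the same call return false.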
Annotate all new methods and ivars with queue affinity comments following codebase conventions. Add ITDebugAssert queue checks to dispatch source event handlers on _ioQueue.

…essure
The existing didRegister ordering test only checks source suspension state. This new test preloads data on the pipe before registration and verifies that no threadedReadTask callback leaks through the brief dispatch_resume/dispatch_suspend window in setupDispatchSources when backpressure is heavy.
This was referenced Feb 12, 2026
Owner
This note is mostly to jog my own memory: we plan to write a new class instead of modifying TaskNotifier. It can be written in Swift and be nice and clean. We'll still need to switch between the two based on the advanced setting since all this stuff has to be enabled gradually. Also, let's make that PR stand alone; this has gotten big enough that my head hurts.
Summary
This PR replaces TaskNotifier's single-threaded `select()` loop with per-PTY GCD dispatch sources for sessions using the fairness scheduler. It completes the I/O pipeline started in PR #567 (backpressure) and PR #568 (scheduler + executor), closing the circuit so that backpressure actually controls reading.
Motivation
With PR #568, the scheduler controls execution order, but reading still goes through TaskNotifier's monolithic select() loop. Sessions accumulate tokens faster than the scheduler drains them. Per-PTY dispatch sources let each session's I/O be individually suspended/resumed based on backpressure level, so a session under heavy backpressure stops reading from its PTY until the scheduler catches up.
Changes
- `setupDispatchSources` creates per-fd read/write GCD sources on a private serial queue (`_ioQueue`). Read source starts suspended and is controlled by `updateReadSourceState` based on `shouldRead` (not paused, I/O allowed, backpressure < heavy). Write source activates when `writeBuffer` has data.
- `shouldRead` checks `tokenExecutor.backpressureLevel < .heavy`. When backpressure becomes heavy, the read source suspends; when the `backpressureReleaseHandler` fires (wired in PR #567, "Add backpressure infrastructure for fairness scheduler integration"), it resumes.
- `didRegister` calls `taskDidRegister:` (which wires `tokenExecutor`) before `setupDispatchSources`, ensuring `shouldRead` sees backpressure state from the first event.
- Tasks using dispatch sources (`useDispatchSource` protocol method) are excluded from `select()` fd sets. TaskNotifier still handles legacy sessions, process status monitoring, and the run loop.
- …`readTask:length:`).
- `teardownDispatchSources` cancels both sources with proper suspension-count handling (GCD requires sources to be resumed before cancel). Called from `deregisterFileDescriptor:` and `closeFileDescriptor`.
Benchmarks (5 tabs, 15s, Deployment build)
With interaction injection (keyboard + scroll + tab switch):
The fairness scheduler trades ~4-6% raw throughput for significantly higher frame rates, especially under interactive load.
Seam
All dispatch source code is gated by `useFairnessScheduler`:
Test Coverage
PR Sequence
Depends on #568.