Skip to content

Add per-PTY dispatch sources and TaskNotifier bypass#580

Open
chall37 wants to merge 38 commits into
gnachman:masterfrom
chall37:feat/fairness-scheduler-v3
Open

Add per-PTY dispatch sources and TaskNotifier bypass#580
chall37 wants to merge 38 commits into
gnachman:masterfrom
chall37:feat/fairness-scheduler-v3

Conversation

@chall37
Copy link
Copy Markdown
Contributor

@chall37 chall37 commented Feb 12, 2026

Summary

This PR replaces TaskNotifier's single-threaded select() loop with per-PTY GCD dispatch sources for sessions using the fairness scheduler. It completes the I/O pipeline started in PR #567 (backpressure) and PR #568 (scheduler + executor), closing the circuit so that backpressure actually controls reading.

Motivation

With PR #568, the scheduler controls execution order, but reading still goes through TaskNotifier's monolithic select() loop. Sessions accumulate tokens faster than the scheduler drains them. Per-PTY dispatch sources let each session's I/O be individually suspended/resumed based on backpressure level, so a session under heavy backpressure stops reading from its PTY until the scheduler catches up.

Changes

  • PTYTask dispatch sourcessetupDispatchSources creates per-fd read/write GCD sources on a private serial queue (_ioQueue). Read source starts suspended and is controlled by updateReadSourceState based on shouldRead (not paused, I/O allowed, backpressure < heavy). Write source activates when writeBuffer has data.
  • Backpressure-gated readingshouldRead checks tokenExecutor.backpressureLevel < .heavy. When backpressure becomes heavy, the read source suspends; when the backpressureReleaseHandler fires (wired in PR Add backpressure infrastructure for fairness scheduler integration #567), it resumes.
  • Registration orderingdidRegister calls taskDidRegister: (which wires tokenExecutor) before setupDispatchSources, ensuring shouldRead sees backpressure state from the first event.
  • TaskNotifier bypass — sessions using dispatch sources (useDispatchSource protocol method) are excluded from select() fd sets. TaskNotifier still handles legacy sessions, process status monitoring, and the run loop.
  • Coprocess I/O — dispatch source read handler routes PTY output to coprocess (same as legacy readTask:length:).
  • LifecycleteardownDispatchSources cancels both sources with proper suspension-count handling (GCD requires sources to be resumed before cancel). Called from deregisterFileDescriptor: and closeFileDescriptor.

Benchmarks (5 tabs, 15s, Deployment build)

Metric Legacy Fairness Delta
Iteration rate 18,392/s 17,738/s -3.6%
Metal FPS 23 37 +61%
Refresh FPS 24 28 +17%
Content frames 43,443 50,335 +16%

With interaction injection (keyboard + scroll + tab switch):

Metric Legacy Fairness Delta
Iteration rate 20,145/s 18,893/s -6.2%
Metal FPS 10 36 +260%
Refresh FPS 17 29 +71%

The fairness scheduler trades ~4-6% raw throughput for significantly higher frame rates, especially under interactive load.

Seam

All dispatch source code is gated by useFairnessScheduler:

Test Coverage

  • PTYTaskDispatchSourceLifecycleTests — source creation, teardown, double-teardown safety
  • PTYTaskReadStateTests — shouldRead predicate under all backpressure levels
  • PTYTaskWriteStateTests — write source activation with buffer data
  • PTYTaskEventHandlerTests — read handler pipeline, data delivery to delegate
  • PTYTaskBackpressureIntegrationTests — backpressure suspend/resume, registration ordering, preloaded-data race test
  • PTYTaskEOFTests — broken pipe detection, EOF with buffered data
  • TaskNotifierDispatchSourceProtocolTests — fd exclusion from select()
  • IntegrationTests — end-to-end registration through TaskNotifier + real session wiring

PR Sequence

PR Content Status
#567 Backpressure infrastructure Closed
#568 FairnessScheduler + TokenExecutor integration Closed
This Per-PTY dispatch sources + TaskNotifier bypass Open

Depends on #568.

- Add BackpressureLevel enum (none, light, moderate, heavy, blocked)
- Track available slots with atomic counter alongside DispatchSemaphore
- Add onSemaphoreSignaled callback to TokenArray for slot release notification
- Expose backpressureLevel property on TokenExecutor (callable from any queue)

This enables adaptive behavior based on queue load (e.g., join timeouts).
…pass

- Initialize availableSlots with correct value in property declaration using
  immediately-executed closure, avoiding race where backpressureLevel could
  return .blocked before init completes
- Document that high-priority tokens intentionally bypass backpressure, so
  metric only reflects normal PTY token load
- Add Comparable conformance to BackpressureLevel for threshold comparisons
- Add backpressureReleaseHandler callback (stub for future PTYTask integration)
- Add TwoTierTokenQueue.discardAllAndReturnCount() for cleanup accounting
- Call backpressureReleaseHandler when crossing out of heavy backpressure
- Fix backpressureLevel to handle negative availableSlots

These additions enable future fairness scheduler integration without
changing current behavior - the handler starts as nil.
- Add FairnessScheduler.swift with round-robin busy list scheduling
- Add useFairnessScheduler feature flag (default OFF)
- Add FairnessSchedulerExecutor protocol conformance to TokenExecutor
- Add executeTurn() with budget enforcement for scheduler
- Add cleanupForUnregistration() for proper token cleanup
- Add fairnessSessionId property for scheduler registration
- Add notifyScheduler() with conditional dispatch based on flag
- When flag OFF: legacy execute() behavior preserved
- When flag ON: FairnessScheduler coordinates round-robin execution

Tested with multi-tab stress test:
- Flag ON:  18k-20k iterations/sec across 1-5 tabs
- Flag OFF: 20k iterations/sec (legacy mode)
- Start nextSessionId at 1 so 0 can be "not registered" sentinel
- Add setUseFairnessSchedulerForTesting: for unit test control
- Add VT100ScreenMutableState registration with FairnessScheduler
- Add feature flag gating tests proving flag controls code path
- Add FairnessScheduler and TokenExecutor test suites
- Add run_fairness_tests.sh test runner
- Added `private let useFairnessScheduler: Bool` to TokenExecutorImpl
- notifyScheduler() now uses cached flag as sole decision point
- Added assertion for programmer error: flag ON but not registered
- Added #if ITERM_DEBUG test hook `testSkipNotifyScheduler`

Test updates:
- Fixed test classes with proper flag settings in setUp
- Added cleanupForUnregistrationOnMutationQueue helper
- Skipped tests requiring milestone 3 non-blocking model
- Updated run_fairness_tests.sh with runtime warning
Use IdempotentOperationJoiner to coalesce multiple ensureExecutionScheduled()
calls into a single async dispatch. The joiner handles the scheduling state
atomically via setNeedsUpdate/updateIfNeeded pattern:

- Multiple calls before dispatch coalesce into one
- Calls during execution schedule a new dispatch (closure cleared before exec)
- No manual flag management needed

Removed executionScheduled from testReset() since the empty busyList guard
in executeNextTurn() handles any stale dispatches.
Addresses PR comments 11-14:

1. Move tokenExecutorShouldQueueTokens() check to after high-priority tasks
   and nil-delegate guard, matching execute() ordering

2. Wrap token enumeration in slownessDetector.measure() for instrumentation

3. Add DLog statements matching execute():
   - "Will enumerate token arrays" before enumeration
   - "Begin/Done executing a batch of tokens..." inside loop
   - "Finished enumerating token arrays..." after enumeration
   - Active session drain logging

4. Use labeled do{} block so defer { executeHighPriorityTasks() } fires
   before completion() is called. Store result in local variable and use
   break instead of return for early exits.
…ordering.

Tests validate that high-priority tasks scheduled during token execution
run before the completion callback. Comments are precise about scope:

- testDeferFiresBeforeCompletionCallback: Verifies HP tasks run before
  completion, but does not isolate outer vs inner defer (both satisfy).

- testHighPriorityTaskInDeferAddingTokensTriggersNotifyScheduler: Tests
  safety net ensuring tokens added by HP tasks get processed. Documents
  that sessionDidEnqueueWork uses queue.async so can race with
  sessionFinishedTurn (either path works).

- testTurnResultReflectsTokenQueueStateAfterDefer: Documents (non-asserting)
  that turnResult is calculated before outer defer fires.
…bled

- Move FairnessScheduler registration from init to setTerminalEnabled:YES
  for symmetry with unregistration in setTerminalEnabled:NO
- cleanupForUnregistration() no longer discards tokens; they are preserved
  to support session revive and drain naturally when re-enabled
- On revive, re-registration gets a fresh sessionId
- Add TODO comments to tests that need adjustment for new behavior
…utation queue

FairnessScheduler.register() was using .sync to the mutation queue to protect
its internal state. This caused a deadlock when called from a "joined block"
context (where the mutation queue is deliberately blocked waiting for the
main thread).

Solution: Use a private Mutex lock for scheduler bookkeeping instead of the
mutation queue. This completely decouples scheduler synchronization from the
join protocol. Cleanup still dispatches to mutation queue as required by
TokenExecutor.cleanupForUnregistration().

Also includes related changes:
- TokenExecutor: Add isRegistered flag to explicitly track registration state
  (separate from sessionId). Change notifyScheduler() to guard on isRegistered
  instead of asserting on sessionId, allowing tokens to accumulate before
  registration completes.
- TokenExecutor: Add test hooks (testQueuedTokenCount, testTotalSlots,
  testAvailableSlots) for verifying token preservation in tests.
- TwoTierTokenQueue: Add count property to support testQueuedTokenCount.
…ation

- Add FairnessSchedulerSessionRestorationTests (4 tests) verifying re-registration
  after unregister, preserved token processing, and double-unregister safety
- Add TokenExecutorSessionReviveTests (5 tests) covering full disable→preserve→
  revive→drain cycle
- Update TokenExecutorCleanupTests to verify tokens are preserved (not discarded)
  after cleanupForUnregistration(); reduce token count to 30 (within 40-slot limit)
- Fix TokenExecutorAvailableSlotsBoundaryTests accounting assertions to match
  new preservation behavior
- Add missing isRegistered=true after registration in multiple test classes
- Add testFinished guard to prevent async polling crash after test timeout
- Enable fairness scheduler flag in TokenExecutorLegacyRemovalTests
- Register new test classes in run_fairness_tests.sh
- Add timeouts and state assertions to concurrency tests
  (testConcurrentRegisterAndUnregister, testConcurrentEnqueueAndUnregister)
- Add cleanupCallCount tracking to verify double-unregister does not call
  cleanup twice (testDoubleUnregister)
- Replace timing-dependent inverted expectations with deterministic
  mutation queue syncs for no-execution tests
- Rename testBackgroundSessionGetsEqualTurns to match actual behavior
  (testBackgroundSessionCanProcessTokens)
- Rename testNoDuplicateNotificationsForBusySession to
  testRapidAddTokensAllProcessed with direct byte count verification
- Delete non-asserting testTurnResultReflectsTokenQueueStateAfterDefer
- Delete obsolete skipped tests (testLegacyPathWhenSessionIdIsZero,
  testCodePathDiffersBetweenRegisteredAndUnregistered)
- Delete TwoTierTokenQueueTests class (tests dead discardAllAndReturnCount)
These methods were added but never called. TokenExecutor uses removeAll()
in invalidate() and preserves tokens in cleanupForUnregistration().
…nce regression

The previous deadlock fix (be3df18) used a Mutex for all state access, but
this caused ~13% throughput regression on the hot path (sessionDidEnqueueWork).

The deadlock only affected register() which used .sync dispatch to the mutation
queue. The hot path was always called from the mutation queue via async dispatch,
so it never could have deadlocked.

This change:
- Uses Mutex only for ID allocation in register() (instant, no queue dispatch)
- Dispatches session creation async to mutation queue (avoids joined block deadlock)
- Restores async dispatch for sessionDidEnqueueWork() and all scheduling state
- Manages busyList/busySet directly on mutation queue (no lock needed)

Performance improvement: ~6-9% throughput increase, bringing the fairness
scheduler within 4-8% of production baseline.

Future optimization: Consider an atomic 'hasBusyWork' flag on enqueue, only
doing lock work when transitioning 0→1. This might reduce contention enough
to make the synchronous path viable again.
Doubles the per-turn token budget to reduce turn overhead. With the previous
500 token budget, throughput was ~7% below production in low-contention
scenarios (2 tabs). Doubling to 1000 brings performance within 1% of production.

TODO: This fixed budget may not translate well to slower CPUs. Should be
tested on minimum supported hardware for validation. Consider investigating
adaptive token budgets based on throughput to auto-tune for different
hardware capabilities.
…king

- Use synchronous completion callback in FairnessScheduler to eliminate
  unnecessary async dispatch per turn (completion is already on mutationQueue)
- Add dispatchPrecondition to verify threading contract in DEBUG builds
- Skip legacy activeSessionsWithTokens tracking when using FairnessScheduler
  (round-robin scheduling already handles prioritization)
- Document threading contract for executeTurn protocol method
- Update tests to call completion on mutation queue per new contract
…hods

Document that cleanupForUnregistration() is called on mutationQueue in the
protocol definition and implementations.
Without this, notifyScheduler() early-returns and tokens are never
scheduled for execution, resulting in no terminal output.
Documents thread safety requirements for all member variables in
TokenExecutorImpl to match the documentation standard in FairnessScheduler.

- taskQueue, sideEffects: Thread-safe via iTermTaskQueue internal locking
- tokenQueue, executingCount, commit: Access on mutation queue only
- pauseCount: Thread-safe via atomic operations
- executingSideEffects: Thread-safe via MutableAtomicObject
- sideEffectScheduler: Period modified on mutation queue
- throughputEstimator: addByteCount from any queue
- isRegistered: Access on mutation queue only
…path

Replace TaskNotifier's single-threaded select() loop with per-PTY
dispatch sources when FairnessScheduler is enabled. Each PTY now has
dedicated read/write dispatch sources that feed directly into the
token processing pipeline.

Key changes:
- Per-PTY dispatch sources with backpressure-aware suspend/resume
- Tasks using dispatch sources skip TaskNotifier registration
- Deadpool handling preserved for process cleanup
- Synchronous completion callback in FairnessScheduler (eliminates
  unnecessary async dispatch per turn)
- Skip legacy activeSessionsWithTokens tracking when using FairnessScheduler
- Document threading contract for executeTurn protocol method

This eliminates the TaskNotifier bottleneck and enables parallel
reads across PTYs while maintaining serialized token processing.
Upstream added two new delegate methods that FakeSession needs to implement:
- screenOffscreenCommandLineShouldBeVisibleForCurrentCommand
- screenUpdateBlock:action:
The threading contract verification for the completion callback should
only run during tests, not in regular debug builds. ITERM_DEBUG ensures
the check runs when tests execute but avoids runtime overhead in both
development and release builds.
1. Replace Mutex with iTermAtomicInt64 for session ID allocation
   - Lock-free atomic increment instead of mutex sync
   - Simpler and more appropriate for single counter

2. Extract BusyQueue type to encapsulate busyList/busySet
   - Enforces invariant that set and list stay in sync
   - Uses it_fatalError to catch duplicate enqueue attempts
   - Cleaner API: enqueue/dequeue/contains/removeFromSet

3. Remove cleanupForUnregistration from protocol (YAGNI)
   - Method existed but intentionally did nothing
   - Tokens are preserved on unregister for session revival
   - No cleanup hook needed if there is nothing to clean up
…Queue

Three changes addressing fairness scheduler gaps:

1. Coprocess dispatch sources (PTYTask.m): When useFairnessScheduler is
   active, tasks aren't in TaskNotifier's _tasks array, so the select()
   loop never monitors coprocess FDs. Add per-PTY dispatch sources for
   coprocess read/write on _ioQueue, with lifecycle management tied to
   setCoprocess:/stopCoprocess. Includes writeBufferHasRoom re-evaluation
   hook in handleWriteEvent to prevent coprocess read stalls.

2. Non-blocking addTokens (TokenExecutor.swift, TokenArray.swift): The
   semaphore in addTokens() blocks _ioQueue when the fairness scheduler
   is active, stalling writes and coprocess I/O for the affected PTY.
   PTYTask already handles backpressure via dispatch source suspend/resume
   (shouldRead checks backpressureLevel < .heavy), making the semaphore
   redundant. Skip semaphore.wait() in the fairness path; decouple the
   slot-release callback from semaphore presence so availableSlots
   accounting works without a semaphore.

3. Fix race where executeTurn with nil delegate destroys preserved tokens
   (TokenExecutor.swift): When setTerminalEnabled:NO runs during a joined
   block, it sets delegate=nil and isRegistered=false. If executeTurn was
   already scheduled, it finds delegate==nil and called removeAll(),
   destroying tokens that should survive for session revive. Guard
   removeAll() on isRegistered so tokens are preserved when disabled.

Enables 6 previously-skipped tests and adds 1 regression test (103 total).
When a session is unregistered while its executeTurn is still running,
sessionFinishedTurn hits the guard on sessions[sessionId] and returns
without calling ensureExecutionScheduled(). Other sessions waiting in
busyQueue stall indefinitely until an external event triggers scheduling.

Call ensureExecutionScheduled() in the early-return path so the scheduler
continues pumping after mid-flight unregistration.

Adds regression test that verifies session B gets its turn after session A
is unregistered during execution.
…DEBUG guards, fix races

Tests that create TokenExecutor instances need useFairnessScheduler
enabled BEFORE init (the flag is cached). Without it, addTokens takes the
legacy semaphore-blocking path causing deadlocks after 40 tokens, and
useDispatchSource returns NO preventing coprocess dispatch source setup.

- Add setUseFairnessSchedulerForTesting(true) setUp/tearDown to test
  classes that depend on fairness scheduler behavior
- Remove #if ITERM_DEBUG guard from effectiveIoAllowed in PTYTask.m
  (made testIoAllowedOverride dead code in test target)
- Remove all #if ITERM_DEBUG guards from FairnessSchedulerTests.swift
  and TokenExecutorFairnessTests.swift (silently skipping test code)
- Fix testReadSourceResumesWhenBackpressureDrops: add MockTokenExecutorDelegate
  so executeTurn actually processes tokens, use mutationQueue
- Fix testWriteTaskWithRealJobManager: move shouldWrite assertion before
  dispatch source setup to avoid race with write source draining buffer
- Fix executeTurn calls to run on mutationQueue (dispatchPrecondition)
- Add testSkipNotifyScheduler to prevent deadlocks during bulk token loading
- Use timestamped result bundle paths in run_fairness_tests.sh
- Add milestone 3-5 test files, mocks, and PTYSession wiring tests
Verifies that taskDidRegister: (which wires the tokenExecutor) runs
before setupDispatchSources, so shouldRead sees backpressure at
source creation time rather than falling through the executor==nil path.
Reorder didRegister to call taskDidRegister: before setupDispatchSources.
taskDidRegister: wires task.tokenExecutor; without it, shouldRead sees
executor==nil and skips backpressure, allowing unconditional reads.
Annotate all new methods and ivars with queue affinity comments
following codebase conventions. Add ITDebugAssert queue checks to
dispatch source event handlers on _ioQueue.
…essure

The existing didRegister ordering test only checks source suspension state.
This new test preloads data on the pipe before registration and verifies
that no threadedReadTask callback leaks through the brief dispatch_resume/
dispatch_suspend window in setupDispatchSources when backpressure is heavy.
@gnachman
Copy link
Copy Markdown
Owner

This note is mostly to jog my own memory: we plan to a new class instead of modifying TaskNotifier. It can be written in Swift and be nice and clean. We'll still need to switch between the two based on the advanced setting since all this stuff has to be enabled gradually. Also, let's make that PR stand alone — this has gotten big enough that my head hurts.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants