88
99import collections .abc as cabc
1010import contextlib
11+ import io
1112import math
1213import os
1314import shlex
2324from ._compat import CYGWIN
2425from ._compat import get_best_encoding
2526from ._compat import isatty
26- from ._compat import open_stream
2727from ._compat import strip_ansi
2828from ._compat import term_len
2929from ._compat import WIN
@@ -366,7 +366,20 @@ def generator(self) -> cabc.Iterator[V]:
366366 self .render_progress ()
367367
368368
369- def pager (generator : cabc .Iterable [str ], color : bool | None = None ) -> None :
369+ class MaybeStripAnsi (io .TextIOWrapper ):
370+ def __init__ (self , stream : t .IO [bytes ], * , color : bool , ** kwargs : t .Any ):
371+ super ().__init__ (stream , ** kwargs )
372+ self .color = color
373+
374+ def write (self , text : str ) -> int :
375+ if not self .color :
376+ text = strip_ansi (text )
377+ return super ().write (text )
378+
379+
380+ def _pager_contextmanager (
381+ color : bool | None = None ,
382+ ) -> t .ContextManager [tuple [t .BinaryIO | t .TextIO , str , bool ]]:
370383 """Decide what method to use for paging through text."""
371384 stdout = _default_text_stdout ()
372385
@@ -376,7 +389,7 @@ def pager(generator: cabc.Iterable[str], color: bool | None = None) -> None:
376389 stdout = StringIO ()
377390
378391 if not isatty (sys .stdin ) or not isatty (stdout ):
379- return _nullpager (stdout , generator , color )
392+ return _nullpager (stdout , color )
380393
381394 # Split using POSIX mode (the default) so that quote characters are
382395 # stripped from tokens and quoted Windows paths are preserved.
@@ -385,35 +398,45 @@ def pager(generator: cabc.Iterable[str], color: bool | None = None) -> None:
385398 pager_cmd_parts = shlex .split (os .environ .get ("PAGER" , "" ))
386399 if pager_cmd_parts :
387400 if WIN :
388- if _tempfilepager (generator , pager_cmd_parts , color ):
389- return
390- elif _pipepager (generator , pager_cmd_parts , color ):
391- return
401+ return _tempfilepager (pager_cmd_parts , color )
402+ return _pipepager (pager_cmd_parts , color )
392403
393404 if os .environ .get ("TERM" ) in ("dumb" , "emacs" ):
394- return _nullpager (stdout , generator , color )
395- if (WIN or sys .platform .startswith ("os2" )) and _tempfilepager (
396- generator , ["more" ], color
397- ):
398- return
399- if _pipepager (generator , ["less" ], color ):
400- return
401-
402- import tempfile
403-
404- fd , filename = tempfile .mkstemp ()
405- os .close (fd )
406- try :
407- if _pipepager (generator , ["more" ], color ):
408- return
409- return _nullpager (stdout , generator , color )
410- finally :
411- os .unlink (filename )
405+ return _nullpager (stdout , color )
406+ if WIN or sys .platform .startswith ("os2" ):
407+ return _tempfilepager (["more" ], color )
408+ return _pipepager (["less" ], color )
409+
410+
411+ @contextlib .contextmanager
412+ def get_pager_file (color : bool | None = None ) -> t .Generator [t .TextIO , None , None ]:
413+ """Context manager.
414+ Yields a writable file-like object which can be used as an output pager.
415+ .. versionadded:: 8.2
416+ :param color: controls if the pager supports ANSI colors or not. The
417+ default is autodetection.
418+ """
419+ with _pager_contextmanager (color = color ) as (stream , encoding , color ):
420+ if not isinstance (stream , MaybeStripAnsi ):
421+ if hasattr (stream , "buffer" ):
422+ # Real TextIO with buffer - unwrap and wrap in MaybeStripAnsi
423+ stream = MaybeStripAnsi (stream .buffer , color = color , encoding = encoding )
424+ elif not getattr (stream , "encoding" , None ):
425+ # BinaryIO - wrap directly in MaybeStripAnsi
426+ stream = MaybeStripAnsi (stream , color = color , encoding = encoding )
427+ else :
428+ # StringIO - add .color attribute only, no ANSI stripping
429+ stream .color = color # type: ignore[attr-defined]
430+ try :
431+ yield t .cast (t .TextIO , stream )
432+ finally :
433+ stream .flush ()
412434
413435
436+ @contextlib .contextmanager
414437def _pipepager (
415- generator : cabc . Iterable [ str ], cmd_parts : list [str ], color : bool | None
416- ) -> bool :
438+ cmd_parts : list [str ], color : bool | None = None
439+ ) -> t . Iterator [ tuple [ t . BinaryIO | t . TextIO , str , bool ]] :
417440 """Page through text by feeding it to another program.
418441
419442 Invokes the pager via :class:`subprocess.Popen` with an ``argv`` list
@@ -424,13 +447,12 @@ def _pipepager(
424447 Invoking a pager through this might support colors: if piping to
425448 ``less`` and the user hasn't decided on colors, ``LESS=-R`` is set
426449 automatically.
427-
428- Returns ``True`` if the command was found and executed, ``False``
429- otherwise so another pager can be attempted.
430450 """
431451 # Split the command into the invoked CLI and its parameters.
432452 if not cmd_parts :
433- return False
453+ stdout = _default_text_stdout () or StringIO ()
454+ yield stdout , "utf-8" , False
455+ return
434456
435457 import shutil
436458
@@ -439,7 +461,9 @@ def _pipepager(
439461
440462 cmd_filepath = shutil .which (cmd )
441463 if not cmd_filepath :
442- return False
464+ stdout = _default_text_stdout () or StringIO ()
465+ yield stdout , "utf-8" , False
466+ return
443467
444468 # Produces a normalized absolute path string.
445469 # multi-call binaries such as busybox derive their identity from the symlink
@@ -462,6 +486,9 @@ def _pipepager(
462486 elif "r" in less_flags or "R" in less_flags :
463487 color = True
464488
489+ if color is None :
490+ color = False
491+
465492 c = subprocess .Popen (
466493 [str (cmd_path )] + cmd_params ,
467494 shell = False ,
@@ -470,13 +497,10 @@ def _pipepager(
470497 errors = "replace" ,
471498 text = True ,
472499 )
473- assert c .stdin is not None
500+ stdin = t .cast (t .BinaryIO , c .stdin )
501+ encoding = get_best_encoding (stdin )
474502 try :
475- for text in generator :
476- if not color :
477- text = strip_ansi (text )
478-
479- c .stdin .write (text )
503+ yield stdin , encoding , color
480504 except BrokenPipeError :
481505 # In case the pager exited unexpectedly, ignore the broken pipe error.
482506 pass
@@ -490,7 +514,7 @@ def _pipepager(
490514 finally :
491515 # We must close stdin and wait for the pager to exit before we continue
492516 try :
493- c . stdin .close ()
517+ stdin .close ()
494518 # Close implies flush, so it might throw a BrokenPipeError if the pager
495519 # process exited already.
496520 except BrokenPipeError :
@@ -512,69 +536,67 @@ def _pipepager(
512536 else :
513537 break
514538
515- return True
516-
517539
540+ @contextlib .contextmanager
518541def _tempfilepager (
519- generator : cabc . Iterable [ str ], cmd_parts : list [str ], color : bool | None
520- ) -> bool :
542+ cmd_parts : list [str ], color : bool | None = None
543+ ) -> t . Iterator [ tuple [ t . BinaryIO | t . TextIO , str , bool ]] :
521544 """Page through text by invoking a program on a temporary file.
522545
523546 Used as the primary pager strategy on Windows (where piping to
524547 ``more`` adds spurious ``\\ r\\ n``), and as a fallback on other
525548 platforms. The command is resolved to an absolute path with
526549 :func:`shutil.which`.
527-
528- Returns ``True`` if the command was found and executed, ``False``
529- otherwise so another pager can be attempted.
530550 """
531551 # Split the command into the invoked CLI and its parameters.
532552 if not cmd_parts :
533- return False
553+ stdout = _default_text_stdout () or StringIO ()
554+ yield stdout , "utf-8" , False
555+ return
534556
535557 import shutil
558+ import subprocess
536559
537560 cmd = cmd_parts [0 ]
538561
539562 cmd_filepath = shutil .which (cmd )
540563 if not cmd_filepath :
541- return False
564+ stdout = _default_text_stdout () or StringIO ()
565+ yield stdout , "utf-8" , False
566+ return
567+
542568 # Produces a normalized absolute path string.
543569 # multi-call binaries such as busybox derive their identity from the symlink
544570 # less -> busybox. resolve() causes them to misbehave. (eg. less becomes busybox)
545571 cmd_path = Path (cmd_filepath ).absolute ()
546572
547- import subprocess
548573 import tempfile
549574
550- fd , filename = tempfile .mkstemp ()
551- # TODO: This never terminates if the passed generator never terminates.
552- text = "" .join (generator )
553- if not color :
554- text = strip_ansi (text )
555575 encoding = get_best_encoding (sys .stdout )
556- with open_stream (filename , "wb" )[0 ] as f :
557- f .write (text .encode (encoding ))
576+ if color is None :
577+ color = False
578+ # On Windows, NamedTemporaryFile cannot be opened by another process
579+ # while Python still has it open, so we use delete=False and clean up manually
580+ # rather than using a contextmanager here.
581+ f = tempfile .NamedTemporaryFile (mode = "wb" , delete = False )
558582 try :
559- subprocess . call ([ str ( cmd_path ), filename ])
560- except OSError :
561- # Command not found
562- pass
583+ yield t . cast ( t . BinaryIO , f ), encoding , color
584+ f . flush ()
585+ f . close ()
586+ subprocess . call ([ str ( cmd_path ), f . name ])
563587 finally :
564- os .close (fd )
565- os .unlink (filename )
566-
567- return True
588+ os .unlink (f .name )
568589
569590
591+ @contextlib .contextmanager
570592def _nullpager (
571- stream : t .TextIO , generator : cabc . Iterable [ str ], color : bool | None
572- ) -> None :
593+ stream : t .TextIO , color : bool | None = None
594+ ) -> t . Iterator [ tuple [ t . TextIO , str , bool ]] :
573595 """Simply print unformatted text. This is the ultimate fallback."""
574- for text in generator :
575- if not color :
576- text = strip_ansi ( text )
577- stream . write ( text )
596+ encoding = get_best_encoding ( stream )
597+ if color is None :
598+ color = False
599+ yield stream , encoding , color
578600
579601
580602class Editor :
0 commit comments