|
5 | 5 | For more detailed information about formatting codes, see the the `format_codes` module documentation. |
6 | 6 | """ |
7 | 7 |
|
8 | | -from .base.consts import COLOR, CHARS |
9 | | -from .format_codes import FormatCodes, _COMPILED |
| 8 | +from .base.consts import COLOR, CHARS, ANSI |
| 9 | +from .format_codes import FormatCodes, _COMPILED as _FC_COMPILED |
10 | 10 | from .string import String |
11 | 11 | from .color import Color, Rgba, Hexa |
12 | 12 |
|
13 | | -from typing import Callable, Optional, Literal, Mapping, TypeVar, Any, cast |
| 13 | +from typing import Generator, Callable, Optional, Literal, Mapping, Pattern, TypeVar, TextIO, Any, cast |
14 | 14 | from prompt_toolkit.key_binding import KeyPressEvent, KeyBindings |
15 | 15 | from prompt_toolkit.validation import ValidationError, Validator |
16 | 16 | from prompt_toolkit.styles import Style |
| 17 | +from contextlib import contextmanager |
17 | 18 | from prompt_toolkit.keys import Keys |
18 | 19 | import prompt_toolkit as _pt |
19 | 20 | import keyboard as _keyboard |
20 | 21 | import getpass as _getpass |
21 | 22 | import shutil as _shutil |
22 | 23 | import sys as _sys |
23 | 24 | import os as _os |
| 25 | +import re as _re |
| 26 | +import io as _io |
| 27 | + |
| 28 | + |
| 29 | +_COMPILED: dict[str, Pattern] = { # PRECOMPILE REGULAR EXPRESSIONS |
| 30 | + "label": _re.compile(r"(?i)\{(?:label|l)\}"), |
| 31 | + "bar": _re.compile(r"(?i)\{(?:bar|b)\}"), |
| 32 | + "current": _re.compile(r"(?i)\{(?:current|c)\}"), |
| 33 | + "total": _re.compile(r"(?i)\{(?:total|t)\}"), |
| 34 | + "percentage": _re.compile(r"(?i)\{(?:percentage|percent|p)\}"), |
| 35 | +} |
24 | 36 |
|
25 | 37 |
|
26 | 38 | class _ConsoleWidth: |
@@ -574,7 +586,7 @@ def log_box_filled( |
574 | 586 | spaces_l = " " * indent |
575 | 587 | lines = [ |
576 | 588 | f"{spaces_l}[bg:{box_bg_color}]{' ' * w_padding}" |
577 | | - + _COMPILED["formatting"].sub(lambda m: f"{m.group(0)}[bg:{box_bg_color}]", line) + |
| 589 | + + _FC_COMPILED["formatting"].sub(lambda m: f"{m.group(0)}[bg:{box_bg_color}]", line) + |
578 | 590 | (" " * ((w_padding + max_line_len - len(unfmt)) + pad_w_full)) + "[*]" for line, unfmt in zip(lines, unfmt_lines) |
579 | 591 | ] |
580 | 592 | pady = " " * (Console.w if w_full else max_line_len + (2 * w_padding)) |
@@ -925,3 +937,249 @@ def _(event: KeyPressEvent) -> None: |
925 | 937 | except (ValueError, TypeError): |
926 | 938 | if has_default: return default_val |
927 | 939 | raise |
| 940 | + |
| 941 | + |
| 942 | +class ProgressBar: |
| 943 | + """A console progress bar with smooth transitions and customizable appearance.\n |
| 944 | + ------------------------------------------------------------------------------------------------- |
| 945 | + - `min_width` -⠀the min width of the progress bar in chars |
| 946 | + - `max_width` -⠀the max width of the progress bar in chars |
| 947 | + - `bar_format` -⠀the format string used to render the progress bar, containing placeholders: |
| 948 | + * `{label}` `{l}` |
| 949 | + * `{bar}` `{b}` |
| 950 | + * `{current}` `{c}` |
| 951 | + * `{total}` `{t}` |
| 952 | + * `{percentage}` `{percent}` `{p}` |
| 953 | + - `limited_bar_format` -⠀a simplified format string used when the console width is too small |
| 954 | + - `chars` -⠀a tuple of characters ordered from full to empty progress<br> |
| 955 | + The first character represents completely filled sections, intermediate |
| 956 | + characters create smooth transitions, and the last character represents |
| 957 | + empty sections. Default is a set of Unicode block characters. |
| 958 | + -------------------------------------------------------------------------------------------------- |
| 959 | + The bar format (also limited) can additionally be formatted with special formatting codes. For |
| 960 | + more detailed information about formatting codes, see the `format_codes` module documentation.""" |
| 961 | + |
| 962 | + def __init__( |
| 963 | + self, |
| 964 | + min_width: int = 10, |
| 965 | + max_width: int = 50, |
| 966 | + bar_format: str = "{l} |{b}| [b]({c})/{t} [dim](([i]({p}%)))", |
| 967 | + limited_bar_format: str = "|{b}|", |
| 968 | + chars: tuple[str, ...] = ("█", "▉", "▊", "▋", "▌", "▍", "▎", "▏", " "), |
| 969 | + ): |
| 970 | + self.active: bool = False |
| 971 | + """Whether the progress bar is currently active (intercepting stdout) or not.""" |
| 972 | + self.min_width: int = max(1, min_width) |
| 973 | + """The min width of the progress bar in chars.""" |
| 974 | + self.max_width: int = max(self.min_width, max_width) |
| 975 | + """The max width of the progress bar in chars.""" |
| 976 | + self.bar_format: str = bar_format |
| 977 | + """The format string used to render the progress bar.""" |
| 978 | + self.limited_bar_format: str = limited_bar_format |
| 979 | + """The simplified format string used when the console width is too small.""" |
| 980 | + self.chars: tuple[str, ...] = chars |
| 981 | + """A tuple of characters ordered from full to empty progress.""" |
| 982 | + |
| 983 | + self._buffer: list[str] = [] |
| 984 | + self._original_stdout: Optional[TextIO] = None |
| 985 | + self._current_progress_str: str = "" |
| 986 | + self._last_line_len: int = 0 |
| 987 | + |
| 988 | + def show_progress(self, current: int, total: int, label: Optional[str] = None) -> None: |
| 989 | + """Show or update the progress bar.\n |
| 990 | + ----------------------------------------------------------------------------------------- |
| 991 | + - `current` -⠀the current progress value (below 0 or greater than `total` hides the bar) |
| 992 | + - `total` -⠀the total value representing 100% progress (must be greater than 0) |
| 993 | + - `label` -⠀an optional label to display alongside the progress bar""" |
| 994 | + if total <= 0: |
| 995 | + raise ValueError("Total must be greater than 0.") |
| 996 | + |
| 997 | + try: |
| 998 | + if not self.active: |
| 999 | + self._start_intercepting() |
| 1000 | + self._flush_buffer() |
| 1001 | + self._draw_progress_bar(current, total, label or "") |
| 1002 | + if current < 0 or current > total: |
| 1003 | + self.hide_progress() |
| 1004 | + except Exception: |
| 1005 | + self._emergency_cleanup() |
| 1006 | + raise |
| 1007 | + |
| 1008 | + def hide_progress(self) -> None: |
| 1009 | + """Hide the progress bar and restore normal console output.""" |
| 1010 | + if self.active: |
| 1011 | + self._clear_progress_line() |
| 1012 | + self._stop_intercepting() |
| 1013 | + |
| 1014 | + def set_width(self, min_width: Optional[int] = None, max_width: Optional[int] = None) -> None: |
| 1015 | + """Set the width of the progress bar.\n |
| 1016 | + -------------------------------------------------------------- |
| 1017 | + - `min_width` -⠀the min width of the progress bar in chars |
| 1018 | + - `max_width` -⠀the max width of the progress bar in chars""" |
| 1019 | + self.min_width = self.min_width if min_width is None else max(1, min_width) |
| 1020 | + self.max_width = self.max_width if max_width is None else max(self.min_width, max_width) |
| 1021 | + |
| 1022 | + def set_bar_format(self, bar_format: Optional[str] = None, limited_bar_format: Optional[str] = None) -> None: |
| 1023 | + """Set the format string used to render the progress bar.\n |
| 1024 | + -------------------------------------------------------------------------------------------------- |
| 1025 | + - `bar_format` -⠀the format string used to render the progress bar, containing placeholders: |
| 1026 | + * `{label}` `{l}` |
| 1027 | + * `{bar}` `{b}` |
| 1028 | + * `{current}` `{c}` |
| 1029 | + * `{total}` `{t}` |
| 1030 | + * `{percentage}` `{percent}` `{p}` |
| 1031 | + - `limited_bar_format` -⠀a simplified format string used when the console width is too small |
| 1032 | + -------------------------------------------------------------------------------------------------- |
| 1033 | + The bar format (also limited) can additionally be formatted with special formatting codes. For |
| 1034 | + more detailed information about formatting codes, see the `format_codes` module documentation.""" |
| 1035 | + self.bar_format = "{l} |{b}| [b]({c})/{t} [dim](([i]({p}%)))" if bar_format is None else bar_format |
| 1036 | + self.limited_bar_format = "|{b}|" if limited_bar_format is None else limited_bar_format |
| 1037 | + |
| 1038 | + def set_chars(self, chars: Optional[tuple[str, ...]] = None) -> None: |
| 1039 | + """Set the characters used to render the progress bar.\n |
| 1040 | + -------------------------------------------------------------------------- |
| 1041 | + - `chars` -⠀a tuple of characters ordered from full to empty progress<br> |
| 1042 | + The first character represents completely filled sections, intermediate |
| 1043 | + characters create smooth transitions, and the last character represents |
| 1044 | + empty sections. If None, uses default Unicode block characters.""" |
| 1045 | + self.chars = ("█", "▉", "▊", "▋", "▌", "▍", "▎", "▏", " ") if chars is None else chars |
| 1046 | + |
| 1047 | + def _start_intercepting(self) -> None: |
| 1048 | + self.active = True |
| 1049 | + self._original_stdout = _sys.stdout |
| 1050 | + _sys.stdout = _InterceptedOutput(self) |
| 1051 | + |
| 1052 | + def _stop_intercepting(self) -> None: |
| 1053 | + if self._original_stdout: |
| 1054 | + _sys.stdout = self._original_stdout |
| 1055 | + self._original_stdout = None |
| 1056 | + self.active = False |
| 1057 | + self._buffer.clear() |
| 1058 | + self._last_line_len = 0 |
| 1059 | + self._current_progress_str = "" |
| 1060 | + |
| 1061 | + def _emergency_cleanup(self) -> None: |
| 1062 | + """Emergency cleanup to restore stdout in case of exceptions.""" |
| 1063 | + try: |
| 1064 | + self._stop_intercepting() |
| 1065 | + except Exception: |
| 1066 | + pass |
| 1067 | + |
| 1068 | + def _flush_buffer(self) -> None: |
| 1069 | + if self._buffer and self._original_stdout: |
| 1070 | + self._clear_progress_line() |
| 1071 | + for content in self._buffer: |
| 1072 | + self._original_stdout.write(content) |
| 1073 | + self._original_stdout.flush() |
| 1074 | + self._buffer.clear() |
| 1075 | + |
| 1076 | + def _draw_progress_bar(self, current: int, total: int, label: Optional[str] = None) -> None: |
| 1077 | + if total <= 0 or not self._original_stdout: |
| 1078 | + return |
| 1079 | + percentage = min(100, (current / total) * 100) |
| 1080 | + formatted, bar_width = self._get_formatted_info_and_bar_width(self.bar_format, current, total, percentage, label) |
| 1081 | + if bar_width < self.min_width: |
| 1082 | + formatted, bar_width = self._get_formatted_info_and_bar_width( |
| 1083 | + self.limited_bar_format, current, total, percentage, label |
| 1084 | + ) |
| 1085 | + bar = self._create_bar(current, total, max(1, bar_width)) + "[*]" |
| 1086 | + progress_text = _COMPILED["bar"].sub(FormatCodes.to_ansi(bar), formatted) |
| 1087 | + self._current_progress_str = progress_text |
| 1088 | + self._last_line_len = len(progress_text) |
| 1089 | + self._original_stdout.write(f"\r{progress_text}") |
| 1090 | + self._original_stdout.flush() |
| 1091 | + |
| 1092 | + def _get_formatted_info_and_bar_width( |
| 1093 | + self, |
| 1094 | + bar_format: str, |
| 1095 | + current: int, |
| 1096 | + total: int, |
| 1097 | + percentage: float, |
| 1098 | + label: Optional[str] = None, |
| 1099 | + ) -> tuple[str, int]: |
| 1100 | + formatted = _COMPILED["label"].sub(label or "", bar_format) |
| 1101 | + formatted = _COMPILED["current"].sub(str(current), formatted) |
| 1102 | + formatted = _COMPILED["total"].sub(str(total), formatted) |
| 1103 | + formatted = _COMPILED["percentage"].sub(f"{percentage:.1f}", formatted) |
| 1104 | + formatted = FormatCodes.to_ansi(formatted) |
| 1105 | + bar_space = Console.w - len(FormatCodes.remove_ansi(_COMPILED["bar"].sub("", formatted))) |
| 1106 | + bar_width = min(bar_space, self.max_width) if bar_space > 0 else 0 |
| 1107 | + return formatted, bar_width |
| 1108 | + |
| 1109 | + def _create_bar(self, current: int, total: int, bar_width: int) -> str: |
| 1110 | + progress = current / total if total > 0 else 0 |
| 1111 | + bar = [] |
| 1112 | + |
| 1113 | + for i in range(bar_width): |
| 1114 | + pos_progress = (i + 1) / bar_width |
| 1115 | + if progress >= pos_progress: |
| 1116 | + bar.append(self.chars[0]) |
| 1117 | + elif progress >= pos_progress - (1 / bar_width): |
| 1118 | + remainder = (progress - (pos_progress - (1 / bar_width))) * bar_width |
| 1119 | + char_idx = len(self.chars) - 1 - min(int(remainder * len(self.chars)), len(self.chars) - 1) |
| 1120 | + bar.append(self.chars[char_idx]) |
| 1121 | + else: |
| 1122 | + bar.append(self.chars[-1]) |
| 1123 | + return "".join(bar) |
| 1124 | + |
| 1125 | + def _clear_progress_line(self) -> None: |
| 1126 | + if self._last_line_len > 0 and self._original_stdout: |
| 1127 | + self._original_stdout.write(f"{ANSI.CHAR}[2K\r") |
| 1128 | + self._original_stdout.flush() |
| 1129 | + |
| 1130 | + def _redraw_progress_bar(self) -> None: |
| 1131 | + if self._current_progress_str and self._original_stdout: |
| 1132 | + self._original_stdout.write(f"{self._current_progress_str}") |
| 1133 | + self._original_stdout.flush() |
| 1134 | + |
| 1135 | + @contextmanager |
| 1136 | + def progress_context(self, total: int, label: Optional[str] = None) -> Generator[Callable[[int], None], None, None]: |
| 1137 | + """Context manager for automatic cleanup. Returns a function to update progress.\n |
| 1138 | + ----------------------------------------------------------------------------------- |
| 1139 | + - `total` -⠀the total value representing 100% progress |
| 1140 | + - `label` -⠀an optional label to display alongside the progress bar |
| 1141 | + ----------------------------------------------------------------------------------- |
| 1142 | + Example usage: |
| 1143 | + ```python |
| 1144 | + with pb2.progress_context(500, "Loading") as update_progress: |
| 1145 | + for i in range(500): |
| 1146 | + # Do some work... |
| 1147 | + update_progress(i + 1) # Update progress |
| 1148 | + ```""" |
| 1149 | + try: |
| 1150 | + |
| 1151 | + def update_progress(current: int) -> None: |
| 1152 | + self.show_progress(current, total, label) |
| 1153 | + |
| 1154 | + yield update_progress |
| 1155 | + except Exception: |
| 1156 | + self._emergency_cleanup() |
| 1157 | + raise |
| 1158 | + finally: |
| 1159 | + self.hide_progress() |
| 1160 | + |
| 1161 | + |
| 1162 | +class _InterceptedOutput(_io.StringIO): |
| 1163 | + """Custom StringIO that captures output and stores it in the progress bar buffer.""" |
| 1164 | + |
| 1165 | + def __init__(self, progress_bar: ProgressBar): |
| 1166 | + super().__init__() |
| 1167 | + self.progress_bar = progress_bar |
| 1168 | + |
| 1169 | + def write(self, content: str) -> int: |
| 1170 | + try: |
| 1171 | + if content and content != "\r": |
| 1172 | + self.progress_bar._buffer.append(content) |
| 1173 | + return len(content) |
| 1174 | + except Exception: |
| 1175 | + self.progress_bar._emergency_cleanup() |
| 1176 | + raise |
| 1177 | + |
| 1178 | + def flush(self) -> None: |
| 1179 | + try: |
| 1180 | + if self.progress_bar.active and self.progress_bar._buffer: |
| 1181 | + self.progress_bar._flush_buffer() |
| 1182 | + self.progress_bar._redraw_progress_bar() |
| 1183 | + except Exception: |
| 1184 | + self.progress_bar._emergency_cleanup() |
| 1185 | + raise |
0 commit comments