|
4 | 4 | __license__ = "MIT" |
5 | 5 |
|
6 | 6 | import asyncio |
| 7 | +import base64 |
7 | 8 | from collections import UserDict |
8 | 9 | from pathlib import Path |
9 | 10 | import re |
|
22 | 23 | TargetSpec = namedtuple("TargetSpec", ["rulename", "wildcards_dict"]) |
23 | 24 |
|
24 | 25 |
|
25 | | -def format_cli_arg(flag, value, quote=True, skip=False): |
| 26 | +def format_cli_arg(flag, value, quote=True, skip=False, base64_encode: bool = False): |
26 | 27 | if not skip and value: |
27 | 28 | if isinstance(value, bool): |
28 | 29 | value = "" |
29 | 30 | else: |
30 | | - value = format_cli_pos_arg(value, quote=quote) |
| 31 | + value = format_cli_pos_arg(value, quote=quote, base64_encode=base64_encode) |
31 | 32 | return f"{flag} {value}" |
32 | 33 | return "" |
33 | 34 |
|
34 | 35 |
|
35 | | -def format_cli_pos_arg(value, quote=True): |
| 36 | +def format_cli_pos_arg(value, quote=True, base64_encode: bool = False): |
36 | 37 | if isinstance(value, (dict, UserDict)): |
37 | | - return join_cli_args( |
38 | | - repr(f"{key}={format_cli_value(val)}") for key, val in value.items() |
39 | | - ) |
| 38 | + |
| 39 | + def fmt_item(key, value): |
| 40 | + expr = f"{key}={format_cli_value(value)}" |
| 41 | + return encode_as_base64(expr) if base64_encode else repr(expr) |
| 42 | + |
| 43 | + return join_cli_args(fmt_item(key, val) for key, val in value.items()) |
40 | 44 | elif not_iterable(value): |
41 | | - return format_cli_value(value) |
| 45 | + return format_cli_value(value, base64_encode=base64_encode) |
42 | 46 | else: |
43 | | - return join_cli_args(format_cli_value(v, quote=True) for v in value) |
| 47 | + return join_cli_args( |
| 48 | + format_cli_value(v, quote=True, base64_encode=base64_encode) for v in value |
| 49 | + ) |
44 | 50 |
|
45 | 51 |
|
46 | | -def format_cli_value(value: Any, quote: bool = False) -> str: |
| 52 | +def format_cli_value( |
| 53 | + value: Any, quote: bool = False, base64_encode: bool = False |
| 54 | +) -> str: |
| 55 | + """Format a given value for passing it to CLI. |
| 56 | +
|
| 57 | + If base64_encode is True, str values are encoded and flagged as being base64 encoded. |
| 58 | + """ |
| 59 | + |
| 60 | + def maybe_encode(value): |
| 61 | + return encode_as_base64(value) if base64_encode else value |
| 62 | + |
47 | 63 | if isinstance(value, SettingsEnumBase): |
48 | 64 | return value.item_to_choice() |
49 | 65 | elif isinstance(value, Path): |
50 | 66 | return shlex.quote(str(value)) |
51 | 67 | elif isinstance(value, str): |
52 | | - if is_quoted(value): |
| 68 | + if is_quoted(value) and not base64_encode: |
53 | 69 | # the value is already quoted, do not quote again |
54 | | - return value |
55 | | - elif quote: |
56 | | - return repr(value) |
| 70 | + return maybe_encode(value) |
| 71 | + elif quote and not base64_encode: |
| 72 | + return maybe_encode(repr(value)) |
57 | 73 | else: |
58 | | - return value |
| 74 | + return maybe_encode(value) |
59 | 75 | else: |
60 | 76 | return repr(value) |
61 | 77 |
|
62 | 78 |
|
63 | 79 | def join_cli_args(args): |
64 | 80 | try: |
65 | 81 | return " ".join(arg for arg in args if arg) |
66 | | - except TypeError: |
| 82 | + except TypeError as e: |
67 | 83 | raise TypeError( |
68 | 84 | f"bug: join_cli_args expects iterable of strings. Given: {args}" |
69 | | - ) |
| 85 | + ) from e |
70 | 86 |
|
71 | 87 |
|
72 | 88 | def url_can_parse(url: str) -> bool: |
@@ -113,3 +129,40 @@ async def async_lock(_lock: threading.Lock): |
113 | 129 |
|
114 | 130 | def is_quoted(value: str) -> bool: |
115 | 131 | return _is_quoted_re.match(value) is not None |
| 132 | + |
| 133 | + |
| 134 | +base64_prefix = "base64//" |
| 135 | + |
| 136 | + |
| 137 | +def maybe_base64(parser_func): |
| 138 | + """Parse optionally base64 encoded CLI args, applying parser_func if not None.""" |
| 139 | + |
| 140 | + def inner(args): |
| 141 | + def is_base64(arg): |
| 142 | + return arg.startswith(base64_prefix) |
| 143 | + |
| 144 | + def decode(arg): |
| 145 | + if is_base64(arg): |
| 146 | + return base64.b64decode(arg[len(base64_prefix) :]).decode() |
| 147 | + else: |
| 148 | + return arg |
| 149 | + |
| 150 | + def apply_parser(args): |
| 151 | + if parser_func is not None: |
| 152 | + return parser_func(args) |
| 153 | + else: |
| 154 | + return args |
| 155 | + |
| 156 | + if isinstance(args, str): |
| 157 | + return apply_parser(decode(args)) |
| 158 | + elif isinstance(args, list): |
| 159 | + decoded = [decode(arg) for arg in args] |
| 160 | + return apply_parser(decoded) |
| 161 | + else: |
| 162 | + raise NotImplementedError() |
| 163 | + |
| 164 | + return inner |
| 165 | + |
| 166 | + |
| 167 | +def encode_as_base64(arg: str): |
| 168 | + return f"{base64_prefix}{base64.b64encode(arg.encode()).decode()}" |
0 commit comments