|
1 | 1 | from __future__ import annotations |
2 | 2 |
|
3 | 3 | import argparse |
| 4 | +import datetime |
4 | 5 | import os.path |
5 | 6 | from argparse import ArgumentParser, Namespace |
6 | | -from typing import Optional, Callable |
| 7 | +from typing import Optional, Callable, Dict, List, Any, Tuple, Set, TextIO |
7 | 8 |
|
8 | 9 | import fs.copy |
9 | 10 | from fs.base import FS |
| 11 | +from fs.multifs import MultiFS |
10 | 12 | from relic.core.cli import CliPluginGroup, _SubParsersAction, CliPlugin |
11 | 13 |
|
| 14 | +from relic.sga.core.definitions import StorageType |
| 15 | +from relic.sga.core.filesystem import EssenceFS, _EssenceDriveFS |
| 16 | + |
12 | 17 |
|
13 | 18 | class RelicSgaCli(CliPluginGroup): |
14 | 19 | GROUP = "relic.cli.sga" |
@@ -130,3 +135,144 @@ def _create_parser( |
130 | 135 | # pack further delegates to version plugins |
131 | 136 |
|
132 | 137 | return parser |
| 138 | + |
| 139 | + |
| 140 | +class RelicSgaInfoCli(CliPlugin): |
| 141 | + def _create_parser( |
| 142 | + self, command_group: Optional[_SubParsersAction] = None |
| 143 | + ) -> ArgumentParser: |
| 144 | + parser: ArgumentParser |
| 145 | + description = "Dumps metadata packed into an SGA file." |
| 146 | + if command_group is None: |
| 147 | + parser = ArgumentParser("info", description=description) |
| 148 | + else: |
| 149 | + parser = command_group.add_parser("info", description=description) |
| 150 | + |
| 151 | + parser.add_argument( |
| 152 | + "sga", |
| 153 | + type=_get_file_type_validator(exists=True), |
| 154 | + help="SGA File to inspect", |
| 155 | + ) |
| 156 | + parser.add_argument( |
| 157 | + "log_file", |
| 158 | + nargs="?", |
| 159 | + type=_get_file_type_validator(exists=False), |
| 160 | + help="Optional file to write messages to, required if `-q/--quiet` is used", |
| 161 | + default=None, |
| 162 | + ) |
| 163 | + parser.add_argument( |
| 164 | + "-q", |
| 165 | + "--quiet", |
| 166 | + action="store_true", |
| 167 | + default=False, |
| 168 | + help="When specified, SGA info is not printed to the console", |
| 169 | + ) |
| 170 | + return parser |
| 171 | + |
| 172 | + def command(self, ns: Namespace) -> Optional[int]: |
| 173 | + sga: str = ns.sga |
| 174 | + log_file: str = ns.log_file |
| 175 | + quiet: bool = ns.quiet |
| 176 | + |
| 177 | + logger: Optional[TextIO] = None |
| 178 | + try: |
| 179 | + if log_file is not None: |
| 180 | + logger = open(log_file, "w") |
| 181 | + |
| 182 | + outputs: List[Optional[TextIO]] = [] |
| 183 | + if quiet is False: |
| 184 | + outputs.append(None) # None is a sentinel for stdout |
| 185 | + if logger is not None: |
| 186 | + outputs.append(logger) |
| 187 | + |
| 188 | + if len(outputs) == 0: |
| 189 | + print( |
| 190 | + "Please specify a `log_file` if using the `-q` or `--quiet` command" |
| 191 | + ) |
| 192 | + return 1 |
| 193 | + |
| 194 | + def _print( |
| 195 | + *msg: str, sep: Optional[str] = None, end: Optional[str] = None |
| 196 | + ) -> None: |
| 197 | + for output in outputs: |
| 198 | + print(*msg, sep=sep, end=end, file=output) |
| 199 | + |
| 200 | + def _is_container(d: Any) -> bool: |
| 201 | + return isinstance(d, (Dict, List, Tuple, Set)) # type: ignore |
| 202 | + |
| 203 | + def _stringify(d: Any, indent: int = 0) -> None: |
| 204 | + _TAB = "\t" |
| 205 | + if isinstance(d, Dict): |
| 206 | + for k, v in d.items(): |
| 207 | + if _is_container(v): |
| 208 | + _print(f"{_TAB * indent}{k}:") |
| 209 | + _stringify(v, indent + 1) |
| 210 | + else: |
| 211 | + _print(f"{_TAB * indent}{k}: {v}") |
| 212 | + elif isinstance(d, (List, Tuple, Set)): # type: ignore |
| 213 | + _print(f"{_TAB * indent}{', '.join(*d)}") |
| 214 | + else: |
| 215 | + _print(f"{_TAB * indent}{d}") |
| 216 | + |
| 217 | + def _getessence(fs: FS, path: str = "/") -> Dict[str, Any]: |
| 218 | + return fs.getinfo(path, "essence").raw.get("essence", {}) # type: ignore |
| 219 | + |
| 220 | + _print(f"File: `{sga}`") |
| 221 | + sgafs: EssenceFS |
| 222 | + with fs.open_fs(f"sga://{sga}") as sgafs: # type: ignore |
| 223 | + _print("Archive Metadata:") |
| 224 | + _stringify(sgafs.getmeta("essence"), indent=1) |
| 225 | + |
| 226 | + drive: _EssenceDriveFS |
| 227 | + for alias, drive in sgafs.iterate_fs(): # type: ignore |
| 228 | + _print(f"Drive: `{drive.name}` (`{drive.alias}`)") |
| 229 | + _print("\tDrive Metadata:") |
| 230 | + info = _getessence(drive) |
| 231 | + if len(info) > 0: |
| 232 | + _stringify(info, indent=2) |
| 233 | + else: |
| 234 | + _print(f"\t\tNo Metadata") |
| 235 | + |
| 236 | + _print("\tDrive Files Metadata:") |
| 237 | + for f in drive.walk.files(): |
| 238 | + _print(f"\t\t`{f}`:") |
| 239 | + finfo: Dict[str, Any] = _getessence(drive, f) |
| 240 | + finfo = finfo.copy() |
| 241 | + # We alter storage_type cause it *should* always be present, if its not, we dont do anything |
| 242 | + key = "storage_type" |
| 243 | + if key in finfo: |
| 244 | + stv: int = finfo[key] |
| 245 | + st: StorageType = StorageType(stv) |
| 246 | + finfo[key] = f"{stv} ({st.name})" |
| 247 | + |
| 248 | + # We alter modified too, cause when it is present, its garbage |
| 249 | + key = "modified" |
| 250 | + if key in finfo: |
| 251 | + mtv: int = finfo[key] |
| 252 | + mt = datetime.datetime.fromtimestamp( |
| 253 | + mtv, datetime.timezone.utc |
| 254 | + ) |
| 255 | + finfo[key] = str(mt) |
| 256 | + |
| 257 | + # And CRC32 if it's in bytes; this should be removed ASAP tho # I only put this in because its such a minor patch to V2 |
| 258 | + key = "crc32" |
| 259 | + if key in finfo: |
| 260 | + crcv: bytes = finfo[key] |
| 261 | + if isinstance(crcv, bytes): |
| 262 | + crc32 = int.from_bytes(crcv, "little", signed=False) |
| 263 | + finfo[key] = crc32 |
| 264 | + |
| 265 | + if len(finfo) > 0: |
| 266 | + _stringify(finfo, indent=3) |
| 267 | + else: |
| 268 | + _print(f"\t\t\tNo Metadata") |
| 269 | + |
| 270 | + finally: |
| 271 | + if logger is not None: |
| 272 | + logger.close() |
| 273 | + |
| 274 | + if log_file is not None: |
| 275 | + print( |
| 276 | + f"Saved to `{os.path.join(os.getcwd(), log_file)}`" |
| 277 | + ) # DO NOT USE _PRINT |
| 278 | + return None |
0 commit comments