|
4 | 4 |
|
5 | 5 | import argparse |
6 | 6 | import asyncio |
| 7 | +import concurrent.futures |
7 | 8 | import hashlib |
8 | 9 | import json |
9 | 10 | import os |
|
29 | 30 |
|
30 | 31 | REQUIRED_METADATA_FIELDS = ("name", "desc", "version", "author") |
31 | 32 | DEFAULT_CLONE_TIMEOUT = 120 |
| 33 | +DEFAULT_MAX_WORKERS = 8 |
32 | 34 | CONFLICT_MARKERS = ("<<<<<<<", "=======", ">>>>>>>") |
33 | 35 |
|
34 | 36 |
|
@@ -489,29 +491,60 @@ def validate_selected_plugins( |
489 | 491 | work_dir: Path, |
490 | 492 | clone_timeout: int, |
491 | 493 | load_timeout: int, |
| 494 | + max_workers: int, |
492 | 495 | ) -> list[dict]: |
493 | | - results = [] |
494 | 496 | total = len(selected) |
495 | | - |
496 | | - for index, (plugin, plugin_data) in enumerate(selected, start=1): |
497 | | - print(f"[{index}/{total}] Validating {plugin}", flush=True) |
498 | | - result = validate_plugin( |
499 | | - plugin=plugin, |
500 | | - plugin_data=plugin_data, |
501 | | - astrbot_path=astrbot_path, |
502 | | - script_path=script_path, |
503 | | - work_dir=work_dir, |
504 | | - clone_timeout=clone_timeout, |
505 | | - load_timeout=load_timeout, |
| 497 | + results: list[dict | None] = [None] * total |
| 498 | + |
| 499 | + def task(index: int, plugin: str, plugin_data: dict) -> tuple[int, dict]: |
| 500 | + return ( |
| 501 | + index, |
| 502 | + validate_plugin( |
| 503 | + plugin=plugin, |
| 504 | + plugin_data=plugin_data, |
| 505 | + astrbot_path=astrbot_path, |
| 506 | + script_path=script_path, |
| 507 | + work_dir=work_dir, |
| 508 | + clone_timeout=clone_timeout, |
| 509 | + load_timeout=load_timeout, |
| 510 | + ), |
506 | 511 | ) |
507 | | - results.append(result) |
508 | 512 |
|
509 | | - status = "PASS" if result.get("ok") else "FAIL" |
510 | | - stage = result.get("stage", "unknown") |
511 | | - message = result.get("message", "") |
512 | | - print(f"[{index}/{total}] {status} {plugin} [{stage}] {message}", flush=True) |
| 513 | + with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: |
| 514 | + future_to_context: dict[concurrent.futures.Future, tuple[int, str]] = {} |
513 | 515 |
|
514 | | - return results |
| 516 | + for index, (plugin, plugin_data) in enumerate(selected, start=1): |
| 517 | + print(f"[{index}/{total}] Queued {plugin}", flush=True) |
| 518 | + future = executor.submit(task, index, plugin, plugin_data) |
| 519 | + future_to_context[future] = (index, plugin) |
| 520 | + |
| 521 | + for future in concurrent.futures.as_completed(future_to_context): |
| 522 | + index, plugin = future_to_context[future] |
| 523 | + try: |
| 524 | + original_index, result = future.result() |
| 525 | + except Exception as exc: |
| 526 | + original_index = index |
| 527 | + result = build_result( |
| 528 | + plugin=plugin, |
| 529 | + repo="", |
| 530 | + normalized_repo_url=None, |
| 531 | + ok=False, |
| 532 | + stage="threadpool", |
| 533 | + message=str(exc), |
| 534 | + details=traceback.format_exc(), |
| 535 | + ) |
| 536 | + |
| 537 | + results[original_index - 1] = result |
| 538 | + status = "PASS" if result.get("ok") else "FAIL" |
| 539 | + stage = result.get("stage", "unknown") |
| 540 | + message = result.get("message", "") |
| 541 | + print(f"[{original_index}/{total}] {status} {plugin} [{stage}] {message}", flush=True) |
| 542 | + |
| 543 | + finalized = [result for result in results if result is not None] |
| 544 | + if len(finalized) != total: |
| 545 | + raise RuntimeError("parallel validation finished with missing results") |
| 546 | + |
| 547 | + return finalized |
515 | 548 |
|
516 | 549 |
|
517 | 550 | class NullStub: |
@@ -684,6 +717,7 @@ def build_parser() -> argparse.ArgumentParser: |
684 | 717 | parser.add_argument("--work-dir") |
685 | 718 | parser.add_argument("--clone-timeout", type=int, default=DEFAULT_CLONE_TIMEOUT) |
686 | 719 | parser.add_argument("--load-timeout", type=int, default=300) |
| 720 | + parser.add_argument("--max-workers", type=int, default=DEFAULT_MAX_WORKERS) |
687 | 721 | parser.add_argument("--worker", action="store_true") |
688 | 722 | parser.add_argument("--plugin-source-dir") |
689 | 723 | parser.add_argument("--plugin-dir-name") |
@@ -736,6 +770,7 @@ def main() -> int: |
736 | 770 | work_dir=work_dir, |
737 | 771 | clone_timeout=args.clone_timeout, |
738 | 772 | load_timeout=args.load_timeout, |
| 773 | + max_workers=args.max_workers, |
739 | 774 | ) |
740 | 775 | finally: |
741 | 776 | if temp_dir is not None: |
|
0 commit comments