diff --git a/src/scripts/ossinsight_importer.py b/src/scripts/ossinsight_importer.py index adedc0f51..394657d51 100644 --- a/src/scripts/ossinsight_importer.py +++ b/src/scripts/ossinsight_importer.py @@ -106,37 +106,78 @@ def get_collection_repos(collection_id: str) -> List[Dict[str, str]]: def display_collections_table(collections: List[Dict[str, str]]) -> None: """ Display collections in a formatted table. - + Args: collections (List[Dict[str, str]]): List of collections with 'id' and 'name' keys """ if not collections: print("No collections found.") return - + table_data = [[col["id"], col["name"]] for col in collections] print("\n" + tabulate(table_data, headers=["ID", "Collection Name"], tablefmt="grid") + "\n") -def prompt_collection_id(collections: List[Dict[str, str]]) -> Optional[str]: +def filter_collections(collections: List[Dict[str, str]], keywords: List[str]) -> List[Dict[str, str]]: """ - Prompt user to select a collection ID and validate it. - + Filter collections by keywords (case-insensitive substring match on name). + Args: - collections (List[Dict[str, str]]): List of collections with 'id' and 'name' keys - + collections (List[Dict[str, str]]): All collections + keywords (List[str]): Keywords to filter by (OR logic — any match included) + Returns: - Optional[str]: The selected collection ID, or None if invalid + List[Dict[str, str]]: Filtered collections """ - collection_ids = {col["id"] for col in collections} - + if not keywords: + return collections + lower_keywords = [kw.lower() for kw in keywords] + return [ + col for col in collections + if any(kw in col["name"].lower() for kw in lower_keywords) + ] + + +def prompt_batch_collection_ids(collections: List[Dict[str, str]]) -> List[str]: + """ + Show filtered collections and let the user confirm or edit the selection. + + Args: + collections (List[Dict[str, str]]): Pre-filtered collections to present + + Returns: + List[str]: Confirmed collection IDs to process + """ + valid_ids = {col["id"] for col in collections} + id_to_col = {col["id"]: col for col in collections} + + display_collections_table(collections) + print(f"{len(collections)} collection(s) matched.") + while True: - collection_id = input("Enter the collection ID you want to index: ").strip() - - if collection_id in collection_ids: - return collection_id - else: - print(f"Invalid collection ID. Please enter one of: {', '.join(sorted(collection_ids))}") + print("\nOptions:") + print(" [Enter] — process all listed collections") + print(" comma-sep IDs — process only those IDs (e.g. 1,3,7)") + print(" q — quit") + raw = input("Your choice: ").strip() + + if raw.lower() == "q": + return [] + + if raw == "": + return [col["id"] for col in collections] + + chosen = [t.strip() for t in raw.split(",") if t.strip()] + invalid = [c for c in chosen if c not in valid_ids] + if invalid: + print(f"Unknown IDs: {', '.join(invalid)}. Valid IDs are: {', '.join(sorted(valid_ids))}") + continue + + print("\nYou selected:") + display_collections_table([id_to_col[c] for c in chosen]) + confirm = input("Confirm? (y/n): ").strip().lower() + if confirm == "y": + return chosen def get_github_entity_type(owner: str) -> Tuple[str, Optional[str]]: @@ -383,71 +424,54 @@ def create_collection_file(collection_name: str, collection_display_name: str, p return True -def main(): - """Main workflow function.""" - print("OSS Insight Collection Importer") - print("=" * 50) - - print("\nFetching collections from OSS Insight...") - collections = list_collections() - - if not collections: - print("No collections found. Exiting.") - return - - display_collections_table(collections) - - collection_id = prompt_collection_id(collections) - if not collection_id: - print("Invalid collection ID. Exiting.") - return - - selected_collection = next((c for c in collections if c["id"] == collection_id), None) - if not selected_collection: - print("Collection not found. Exiting.") - return - - collection_name = selected_collection["name"] - print(f"\nSelected collection: {collection_name}") - - print(f"\nFetching repositories for collection {collection_id}...") +def process_collection( + collection_id: str, + collection_name: str, + project_names: set, + repo_to_name_mapping: Dict[str, str], +) -> Tuple[List[str], List[str], List[str]]: + """ + Fetch repos for one collection, create missing projects, return project lists. + + Args: + collection_id (str): OSS Insight collection ID + collection_name (str): Human-readable collection name (for logging) + project_names (set): Mutable set of known project names (updated in-place) + repo_to_name_mapping (Dict[str, str]): Mutable URL→name mapping (updated in-place) + + Returns: + Tuple of (processed_projects, existing_projects_found, new_projects_created) + """ + print(f"\nFetching repositories for '{collection_name}' (id={collection_id})...") repos = get_collection_repos(collection_id) - + if not repos: - print("No repositories found in this collection. Exiting.") - return - - print(f"Found {len(repos)} repositories.") - - print("\nLoading existing OSO projects...") - yaml_data = get_yaml_data_from_path(path=LOCAL_PROJECTS_PATH) - repo_to_name_mapping = map_repos_to_names(yaml_data) - project_names = {data.get("name") for data in yaml_data if data.get("name")} - - print(f"Found {len(project_names)} existing projects.") - - print("\nProcessing repositories...") - processed_projects = [] - existing_projects_found = [] - new_projects_created = [] - processed_owners = set() - + print(" No repositories found — skipping.") + return [], [], [] + + print(f" Found {len(repos)} repositories.") + + processed_projects: List[str] = [] + existing_projects_found: List[str] = [] + new_projects_created: List[str] = [] + processed_owners: set = set() + for repo_info in repos: repo_name = repo_info.get("repo_name", "") if not repo_name or "/" not in repo_name: continue - + owner, repo = repo_name.split("/", 1) owner_lower = owner.lower() repo_url = f"https://github.com/{owner}/{repo}" - + if owner_lower in processed_owners: continue - + existing_project = find_existing_project(owner, repo_url, project_names, repo_to_name_mapping) - + if existing_project: - print(f"✓ Found existing project '{existing_project}' for {repo_name}") + print(f" ✓ Found existing project '{existing_project}' for {repo_name}") existing_projects_found.append(existing_project) if existing_project not in processed_projects: processed_projects.append(existing_project) @@ -455,8 +479,8 @@ def main(): else: try: entity_type, display_name = get_github_entity_type(owner) - - if entity_type == 'org': + + if entity_type == "org": if create_org_project(owner, display_name): processed_projects.append(owner_lower) new_projects_created.append(owner_lower) @@ -471,40 +495,112 @@ def main(): new_projects_created.append(owner_lower) project_names.add(owner_lower) for user_repo in user_repos: - repo_to_name_mapping[normalize_github_url(f"https://github.com/{owner}/{user_repo}")] = owner_lower + repo_to_name_mapping[ + normalize_github_url(f"https://github.com/{owner}/{user_repo}") + ] = owner_lower processed_owners.add(owner_lower) else: - print(f"⚠ No repositories found for user {owner}, skipping") + print(f" ⚠ No repositories found for user {owner}, skipping") processed_owners.add(owner_lower) - + except Exception as e: - print(f"⚠ Error processing {repo_name}: {e}") + print(f" ⚠ Error processing {repo_name}: {e}") processed_owners.add(owner_lower) continue - + + return processed_projects, existing_projects_found, new_projects_created + + +def main(): + """Main workflow function.""" + print("OSS Insight Collection Importer") + print("=" * 50) + + print("\nFetching collections from OSS Insight...") + collections = list_collections() + + if not collections: + print("No collections found. Exiting.") + return + + # --- keyword filter --- + raw_keywords = input( + "\nEnter keyword(s) to filter collections (comma-separated, or leave blank for all): " + ).strip() + keywords = [k.strip() for k in raw_keywords.split(",") if k.strip()] if raw_keywords else [] + + filtered = filter_collections(collections, keywords) + if not filtered: + print("No collections matched your keywords. Exiting.") + return + + # --- batch confirmation / selection --- + selected_ids = prompt_batch_collection_ids(filtered) + if not selected_ids: + print("No collections selected. Exiting.") + return + + id_to_col = {col["id"]: col for col in collections} + + # --- load existing projects once --- + print("\nLoading existing OSO projects...") + yaml_data = get_yaml_data_from_path(path=LOCAL_PROJECTS_PATH) + repo_to_name_mapping = map_repos_to_names(yaml_data) + project_names = {data.get("name") for data in yaml_data if data.get("name")} + print(f"Found {len(project_names)} existing projects.") + + # --- process each collection --- + batch_results: List[Dict] = [] + + for collection_id in selected_ids: + col = id_to_col[collection_id] + processed, existing, created = process_collection( + collection_id, col["name"], project_names, repo_to_name_mapping + ) + batch_results.append({ + "id": collection_id, + "name": col["name"], + "processed": processed, + "existing": existing, + "created": created, + }) + + # --- summary --- print("\n" + "=" * 50) - print("Processing Summary:") - print(f" Total repositories: {len(repos)}") - print(f" Existing projects found: {len(existing_projects_found)}") - print(f" New projects created: {len(new_projects_created)}") - print(f" Total projects for collection: {len(processed_projects)}") - - if processed_projects: - print("\n" + "=" * 50) - response = input("Do you want to create a new OSO collection file? (y/n): ").lower().strip() - - if response == 'y': - current_date = datetime.now() - month_year = current_date.strftime("%b %Y") - collection_display_name = f"{collection_name} (ossinsight, {month_year})" - unique_projects = sorted(list(set(processed_projects))) - - create_collection_file(collection_name, collection_display_name, unique_projects) - print("\n✓ Collection file created successfully!") + print("Batch Summary:") + summary_rows = [ + [r["name"], len(r["existing"]), len(r["created"]), len(r["processed"])] + for r in batch_results + ] + print( + tabulate( + summary_rows, + headers=["Collection", "Existing", "New", "Total"], + tablefmt="grid", + ) + ) + + # --- collection file creation --- + print("\n" + "=" * 50) + current_date = datetime.now() + month_year = current_date.strftime("%b %Y") + + for result in batch_results: + if not result["processed"]: + print(f"\nNo projects for '{result['name']}' — skipping collection file.") + continue + + response = input( + f"\nCreate OSO collection file for '{result['name']}'? (y/n): " + ).lower().strip() + + if response == "y": + collection_display_name = f"{result['name']} (ossinsight, {month_year})" + unique_projects = sorted(set(result["processed"])) + create_collection_file(result["name"], collection_display_name, unique_projects) + print(" ✓ Collection file created/updated.") else: - print("\nSkipping collection file creation.") - else: - print("\nNo projects to add to collection.") + print(" Skipped.") if __name__ == "__main__":