Skip to content

Commit a39dab3

Browse files
authored
Merge pull request #38 from zhujian0805/main
feat: implement unified fetching framework and eliminate code duplication
2 parents f46cb33 + 4740d6e commit a39dab3

File tree

16 files changed

+2792
-258
lines changed

16 files changed

+2792
-258
lines changed

REFACTOR_FETCH_DUPLICATION.md

Lines changed: 1152 additions & 0 deletions
Large diffs are not rendered by default.

code_assistant_manager/agents/manager.py

Lines changed: 28 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,13 @@
44
across different AI tool handlers.
55
"""
66

7-
import concurrent.futures
87
import json
98
import logging
10-
import shutil
11-
import threading
129
from pathlib import Path
1310
from typing import Dict, List, Optional, Type
1411

1512
from ..repo_loader import RepoConfigLoader
13+
from ..fetching.base import BaseEntityFetcher, RepoConfig
1614
from .base import BaseAgentHandler
1715
from .claude import ClaudeAgentHandler
1816
from .codebuddy import CodebuddyAgentHandler
@@ -116,6 +114,10 @@ def __init__(self, config_dir: Optional[Path] = None):
116114
self.repos_file = self.config_dir / "agent_repos.json"
117115
self.config_dir.mkdir(parents=True, exist_ok=True)
118116

117+
# Initialize fetcher with agent parser
118+
from ..fetching.parsers import AgentParser
119+
self.fetcher = BaseEntityFetcher(parser=AgentParser())
120+
119121
# Initialize handlers
120122
self._handlers: Dict[str, BaseAgentHandler] = {}
121123
for app_name, handler_class in AGENT_HANDLERS.items():
@@ -311,120 +313,35 @@ def fetch_agents_from_repos(self, max_workers: int = 8) -> List[Agent]:
311313
self._init_default_repos_file()
312314
repos = self._load_repos()
313315

314-
# Filter enabled repos
315-
enabled_repos = {
316-
repo_id: repo for repo_id, repo in repos.items() if repo.enabled
317-
}
318-
319-
if not enabled_repos:
320-
logger.warning("No enabled repositories found")
321-
return []
316+
# Convert AgentRepo objects to RepoConfig objects for the fetcher
317+
repo_configs = [
318+
RepoConfig(
319+
owner=repo.owner,
320+
name=repo.name,
321+
branch=repo.branch,
322+
path=repo.agents_path,
323+
enabled=repo.enabled
324+
)
325+
for repo in repos.values()
326+
]
322327

323-
logger.info(f"Fetching agents from {len(enabled_repos)} repositories in parallel")
328+
# Fetch using unified fetcher
329+
agents = self.fetcher.fetch_from_repos(
330+
repos=repo_configs,
331+
max_workers=max_workers
332+
)
324333

325-
all_agents = []
334+
# Update installed status from existing agents
326335
existing_agents = self._load_agents()
327-
328-
# Thread-safe storage for results
329-
agents_results = []
330-
lock = threading.Lock()
331-
332-
# Use claude handler for fetching (all repos use same format)
333-
handler = self.get_handler("claude")
334-
335-
def process_repository(repo_id: str, repo: AgentRepo):
336-
"""Process a single repository to extract agents."""
337-
try:
338-
agents = self._fetch_agents_from_repo(repo, handler)
339-
for agent in agents:
340-
if agent.key in existing_agents:
341-
agent.installed = existing_agents[agent.key].installed
342-
343-
with lock:
344-
agents_results.extend(agents)
345-
346-
logger.info(f"Found {len(agents)} agents in {repo_id}")
347-
return len(agents)
348-
except Exception as e:
349-
logger.warning(f"Failed to fetch agents from {repo_id}: {e}")
350-
return 0
351-
352-
# Use ThreadPoolExecutor for parallel processing
353-
actual_workers = min(max_workers, len(enabled_repos))
354-
logger.debug(f"Using {actual_workers} concurrent workers")
355-
356-
with concurrent.futures.ThreadPoolExecutor(max_workers=actual_workers) as executor:
357-
# Submit all tasks
358-
future_to_repo = {
359-
executor.submit(process_repository, repo_id, repo): repo_id
360-
for repo_id, repo in enabled_repos.items()
361-
}
362-
363-
# Wait for all tasks to complete
364-
for future in concurrent.futures.as_completed(future_to_repo):
365-
repo_id = future_to_repo[future]
366-
try:
367-
agent_count = future.result()
368-
logger.debug(f"Completed processing {repo_id}: {agent_count} agents")
369-
except Exception as e:
370-
logger.error(f"Exception processing {repo_id}: {e}")
371-
372-
all_agents = agents_results
373-
374-
# Merge and save
375-
for agent in all_agents:
336+
for agent in agents:
337+
if agent.key in existing_agents:
338+
agent.installed = existing_agents[agent.key].installed
376339
existing_agents[agent.key] = agent
377-
self._save_agents(existing_agents)
378-
379-
logger.info(f"Total agents fetched: {len(all_agents)}")
380-
return all_agents
381-
382-
def _fetch_agents_from_repo(
383-
self, repo: AgentRepo, handler: BaseAgentHandler
384-
) -> List[Agent]:
385-
"""Fetch agents from a single repository.
386-
387-
Args:
388-
repo: The repository to fetch from
389-
handler: The handler to use for parsing
390340

391-
Returns:
392-
List of agents found
393-
"""
394-
# Use the new Fetcher class similar to awesome-claude-agents
395-
fetcher = Fetcher()
396-
397-
repo_data = {
398-
"owner": repo.owner,
399-
"name": repo.name,
400-
"branch": repo.branch,
401-
"agentsPath": repo.agents_path or "agents"
402-
}
403-
404-
agents_data = fetcher.fetch_agents_from_repo(repo_data)
405-
agents = []
406-
407-
for agent_data in agents_data:
408-
# Convert to Agent model
409-
filename = agent_data["file_path"].split("/")[-1] if "/" in agent_data["file_path"] else agent_data["file_path"]
410-
411-
agent = Agent(
412-
key=f"{repo.owner}/{repo.name}:{agent_data['name']}",
413-
name=agent_data["name"],
414-
description=agent_data["description"],
415-
filename=filename,
416-
installed=False,
417-
repo_owner=repo.owner,
418-
repo_name=repo.name,
419-
repo_branch=repo.branch,
420-
agents_path=repo.agents_path,
421-
readme_url=f"https://github.com/{repo.owner}/{repo.name}/blob/{repo.branch}/{agent_data['file_path']}",
422-
tools=agent_data.get("source_data", {}).get("tools", []),
423-
color=agent_data.get("source_data", {}).get("color"),
424-
)
425-
agents.append(agent)
426-
logger.debug(f"Found agent: {agent.key}")
341+
# Save updated agents
342+
self._save_agents(existing_agents)
427343

344+
logger.info(f"Total agents fetched: {len(agents)}")
428345
return agents
429346

430347
def fetch_agents_from_external_sources(self) -> List[Agent]:

code_assistant_manager/cli/plugins/plugin_install_commands.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -390,12 +390,19 @@ def install_plugin(
390390
marketplace = _resolve_plugin_conflict(plugin, app)
391391

392392
# Use @ syntax for Claude CLI compatibility, but show : syntax in output
393-
plugin_ref = f"{plugin}@{marketplace}" if marketplace else plugin
393+
# For Claude CLI, extract just the repo name from owner/repo format
394+
claude_marketplace = marketplace
395+
if getattr(handler, "uses_cli_plugin_commands", False) and marketplace and "/" in marketplace:
396+
# Claude CLI uses just repo name, not owner/repo
397+
claude_marketplace = marketplace.split("/")[-1] # Extract "claude-plugins-official" from "anthropics/claude-plugins-official"
398+
plugin_ref = f"{plugin}@{claude_marketplace}"
399+
else:
400+
plugin_ref = f"{plugin}@{marketplace}" if marketplace else plugin
394401
display_ref = f"{marketplace}:{plugin}" if marketplace else plugin
395402
typer.echo(f"{Colors.CYAN}Installing plugin: {display_ref}...{Colors.RESET}")
396403

397404
if getattr(handler, "uses_cli_plugin_commands", False):
398-
success, msg = handler.install_plugin(plugin, marketplace)
405+
success, msg = handler.install_plugin(plugin, claude_marketplace)
399406
else:
400407
# Install directly from CAM-configured marketplace (no app CLI required)
401408
from code_assistant_manager.plugins import PluginManager

code_assistant_manager/cli/plugins/plugin_management_commands.py

Lines changed: 45 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,8 @@ def _show_available_plugins(manager: PluginManager, query: Optional[str] = None,
224224
"""Show available plugins from all configured marketplaces."""
225225
from code_assistant_manager.plugins.fetch import fetch_repo_info
226226
from code_assistant_manager.cli.plugins.plugin_discovery_commands import _filter_plugins, _display_plugin
227+
import concurrent.futures
228+
import threading
227229

228230
all_repos = manager.get_all_repos()
229231
if not all_repos:
@@ -234,16 +236,50 @@ def _show_available_plugins(manager: PluginManager, query: Optional[str] = None,
234236
all_plugins = []
235237
repo_sources = {} # Track which repo each plugin comes from
236238

237-
for repo_name, repo in all_repos.items():
238-
if not repo.repo_owner or not repo.repo_name:
239-
continue
239+
# Thread-safe storage for results
240+
results_lock = threading.Lock()
241+
fetch_results = []
240242

241-
# Fetch repo info
242-
info = fetch_repo_info(
243-
repo.repo_owner, repo.repo_name, repo.repo_branch or "main"
244-
)
245-
if not info:
246-
continue
243+
def fetch_single_repo(repo_name: str, repo):
244+
"""Fetch plugins from a single repository."""
245+
try:
246+
# Fetch repo info
247+
info = fetch_repo_info(
248+
repo.repo_owner, repo.repo_name, repo.repo_branch or "main"
249+
)
250+
if not info:
251+
return None
252+
253+
result_data = {"repo_name": repo_name, "repo": repo, "info": info}
254+
return result_data
255+
except Exception as e:
256+
logger.warning(f"Failed to fetch from {repo_name}: {e}")
257+
return None
258+
259+
# Use ThreadPoolExecutor for parallel fetching (5-10x speedup!)
260+
actual_workers = min(8, len(all_repos)) # Max 8 concurrent requests
261+
logger.debug(f"Fetching from {len(all_repos)} repositories with {actual_workers} workers")
262+
263+
with concurrent.futures.ThreadPoolExecutor(max_workers=actual_workers) as executor:
264+
# Submit all fetch tasks
265+
future_to_repo = {
266+
executor.submit(fetch_single_repo, repo_name, repo): repo_name
267+
for repo_name, repo in all_repos.items()
268+
if repo.repo_owner and repo.repo_name
269+
}
270+
271+
# Collect results as they complete
272+
for future in concurrent.futures.as_completed(future_to_repo):
273+
result = future.result()
274+
if result:
275+
with results_lock:
276+
fetch_results.append(result)
277+
278+
# Process the results
279+
for result_data in fetch_results:
280+
repo_name = result_data["repo_name"]
281+
repo = result_data["repo"]
282+
info = result_data["info"]
247283

248284
if info.type == "marketplace":
249285
# Add plugins from marketplace with their source
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
"""Unified fetching framework for skills, agents, and plugins."""
2+
3+
from .base import BaseEntityFetcher, EntityParser, RepoConfig
4+
from .parallel import ParallelFetcher
5+
from .repository import GitRepository
6+
from .cache import FetchCache
7+
8+
__all__ = [
9+
"BaseEntityFetcher",
10+
"EntityParser",
11+
"RepoConfig",
12+
"ParallelFetcher",
13+
"GitRepository",
14+
"FetchCache",
15+
]

0 commit comments

Comments
 (0)