forked from HKUDS/OpenSpace
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathregistry.py
More file actions
753 lines (631 loc) · 29.5 KB
/
Copy pathregistry.py
File metadata and controls
753 lines (631 loc) · 29.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
"""SkillRegistry — discover, load, match, and inject skills.
Skills follow the official SKILL.md format:
- YAML frontmatter with only ``name`` and ``description``
- Markdown body with instructions (loaded only after selection)
Skills are discovered from user-configured directories and matched to
tasks via LLM-based selection (with keyword fallback).
Skill identity:
Every skill directory may contain a ``.skill_id`` sidecar file that
stores the persistent unique identifier. On **first discovery**
(no ``.skill_id`` file present), an ID is generated and written to
the file. On subsequent runs the ID is **read** from the file —
this makes the ID portable (survives directory moves, machine changes)
and deterministic (never regenerated).
Imported skills: ``{name}__imp_{uuid_hex[:8]}``
Evolved skills: ``{name}__v{gen}_{uuid_hex[:8]}`` (written by evolver)
"""
from __future__ import annotations
import json
import re
import uuid
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, TYPE_CHECKING
from openspace.utils.logging import Logger
from .skill_utils import parse_frontmatter, strip_frontmatter, check_skill_safety, is_skill_safe
from .skill_ranker import SkillRanker, SkillCandidate, PREFILTER_THRESHOLD
if TYPE_CHECKING:
from openspace.llm import LLMClient
logger = Logger.get_logger(__name__)
# Sidecar filename that stores the persistent skill_id
SKILL_ID_FILENAME = ".skill_id"
def _read_or_create_skill_id(name: str, skill_dir: Path) -> str:
"""Read ``skill_id`` from ``.skill_id`` sidecar, or create one.
The sidecar file is a single-line plain-text file containing only
the ``skill_id`` string. It lives alongside ``SKILL.md`` inside
the skill directory.
First call (no file): generates ``{name}__imp_{uuid8}`` and writes it.
Subsequent calls: reads and returns the existing ID.
"""
id_file = skill_dir / SKILL_ID_FILENAME
if id_file.exists():
try:
existing = id_file.read_text(encoding="utf-8").strip()
if existing:
return existing
except OSError:
pass # fall through to generate
# Generate a new ID and persist
new_id = f"{name}__imp_{uuid.uuid4().hex[:8]}"
try:
id_file.write_text(new_id + "\n", encoding="utf-8")
logger.debug(f"Created .skill_id for '{name}': {new_id}")
except OSError as e:
logger.warning(f"Cannot write {id_file}: {e} — ID will not persist across restarts")
return new_id
def write_skill_id(skill_dir: Path, skill_id: str) -> None:
"""Write (or overwrite) the ``.skill_id`` sidecar in *skill_dir*.
Called by ``SkillEvolver`` after FIX / DERIVED / CAPTURED to stamp
the new ``skill_id`` into the skill directory so that the next
``discover()`` picks it up correctly.
"""
id_file = skill_dir / SKILL_ID_FILENAME
try:
id_file.write_text(skill_id + "\n", encoding="utf-8")
except OSError as e:
logger.warning(f"Cannot write {id_file}: {e}")
@dataclass
class SkillMeta:
"""Metadata for a discovered skill.
``skill_id`` is the globally unique identifier used throughout the
system — LLM prompts, database, evolution, and selection all
reference this field.
"""
skill_id: str # Unique — persisted in .skill_id sidecar
name: str # Human-readable name (from frontmatter or dirname)
description: str
path: Path # Absolute path to SKILL.md
class SkillRegistry:
"""Discover, load, select, and inject skills into agent context.
Args:
skill_dirs: Ordered list of directories to scan. Earlier entries have higher
priority — a skill in the first dir shadows one with the same name
in later dirs.
All internal maps are keyed by ``skill_id``, not ``name``.
"""
def __init__(self, skill_dirs: Optional[List[Path]] = None) -> None:
self._skill_dirs: List[Path] = skill_dirs or []
self._skills: Dict[str, SkillMeta] = {} # skill_id -> SkillMeta
self._content_cache: Dict[str, str] = {} # skill_id -> raw SKILL.md content
self._discovered = False
self._ranker: Optional[SkillRanker] = None # lazy-init on first use
def discover(self) -> List[SkillMeta]:
"""Scan all skill_dirs and populate the registry.
Each skill is a sub-directory containing a ``SKILL.md`` file.
The ``skill_id`` is read from the ``.skill_id`` sidecar (created
automatically on first discovery). Two skills with the same
``name`` in different directories get different IDs and can
coexist in the registry and database.
"""
self._skills.clear()
self._content_cache.clear()
for skill_dir in self._skill_dirs:
if not skill_dir.exists():
logger.debug(f"Skill dir does not exist, skipping: {skill_dir}")
continue
for entry in sorted(skill_dir.iterdir()):
if not entry.is_dir():
continue
skill_file = entry / "SKILL.md"
if not skill_file.exists():
continue
try:
content = skill_file.read_text(encoding="utf-8")
# Safety check on skill content
safety_flags = check_skill_safety(content)
if not is_skill_safe(safety_flags):
logger.warning(
f"BLOCKED skill {entry.name}: "
f"safety flags {safety_flags}"
)
continue
meta = self._parse_skill(entry.name, entry, skill_file, content)
sid = meta.skill_id
if sid in self._skills:
logger.debug(f"Skill '{sid}' already discovered, skipping {skill_file}")
continue
self._skills[sid] = meta
self._content_cache[sid] = content
if safety_flags:
logger.debug(f"Discovered skill: {sid} (safety: {safety_flags})")
else:
logger.debug(f"Discovered skill: {sid} — {meta.description[:60]}")
except Exception as e:
logger.warning(f"Failed to parse skill {skill_file}: {e}")
self._discovered = True
logger.info(
f"Skill discovery complete: {len(self._skills)} skill(s) "
f"from {len(self._skill_dirs)} dir(s)"
)
return list(self._skills.values())
def list_skills(self) -> List[SkillMeta]:
"""List all discovered skills."""
self._ensure_discovered()
return list(self._skills.values())
def get_skill(self, skill_id: str) -> Optional[SkillMeta]:
"""Get a skill by ``skill_id``."""
self._ensure_discovered()
return self._skills.get(skill_id)
def get_skill_by_name(self, name: str) -> Optional[SkillMeta]:
"""Get a skill by ``name`` (first match). Use ``get_skill`` when possible."""
self._ensure_discovered()
for meta in self._skills.values():
if meta.name == name:
return meta
return None
def update_skill(self, old_skill_id: str, new_meta: SkillMeta) -> None:
"""Replace a skill entry after FIX evolution.
Removes *old_skill_id* from the registry and inserts *new_meta*
under its (new) ``skill_id``. Content cache is refreshed from
the filesystem.
"""
self._skills.pop(old_skill_id, None)
self._content_cache.pop(old_skill_id, None)
self._skills[new_meta.skill_id] = new_meta
if new_meta.path.exists():
try:
self._content_cache[new_meta.skill_id] = (
new_meta.path.read_text(encoding="utf-8")
)
except Exception:
pass
logger.debug(
f"Registry.update_skill: {old_skill_id} → {new_meta.skill_id}"
)
def add_skill(self, meta: SkillMeta) -> None:
"""Register a newly-created skill (DERIVED / CAPTURED).
Does NOT overwrite an existing entry with the same ``skill_id``.
"""
if meta.skill_id in self._skills:
logger.debug(
f"Registry.add_skill: {meta.skill_id} already exists, skipping"
)
return
self._skills[meta.skill_id] = meta
if meta.path.exists():
try:
self._content_cache[meta.skill_id] = (
meta.path.read_text(encoding="utf-8")
)
except Exception:
pass
logger.debug(f"Registry.add_skill: {meta.skill_id}")
# Hot-reload API (add external skills at runtime)
def discover_from_dirs(self, extra_dirs: List[Path]) -> List[SkillMeta]:
"""Discover skills from additional directories and add to the registry.
Unlike :meth:`discover`, this does **NOT** clear existing skills — it
only adds new ones from the given directories. Useful for hot-loading
external skills (e.g. host-agent skills, newly downloaded cloud skills).
Safety: applies the same ``check_skill_safety`` / ``is_skill_safe``
filtering as :meth:`discover` to prevent malicious external skills.
Args:
extra_dirs: Additional directories to scan.
"""
added: List[SkillMeta] = []
for skill_dir in extra_dirs:
if not skill_dir.exists() or not skill_dir.is_dir():
logger.debug(f"discover_from_dirs: skipping {skill_dir}")
continue
for entry in sorted(skill_dir.iterdir()):
if not entry.is_dir():
continue
skill_file = entry / "SKILL.md"
if not skill_file.exists():
continue
try:
content = skill_file.read_text(encoding="utf-8")
# Safety check (same as discover())
safety_flags = check_skill_safety(content)
if not is_skill_safe(safety_flags):
logger.warning(
f"BLOCKED external skill {entry.name}: "
f"safety flags {safety_flags}"
)
continue
meta = self._parse_skill(entry.name, entry, skill_file, content)
if meta.skill_id in self._skills:
continue
self._skills[meta.skill_id] = meta
self._content_cache[meta.skill_id] = content
added.append(meta)
logger.debug(f"Hot-registered: {meta.skill_id} — {meta.description[:60]}")
except Exception as e:
logger.warning(f"Failed to parse skill {skill_file}: {e}")
if added:
logger.info(
f"discover_from_dirs: {len(added)} new skill(s) from "
f"{len(extra_dirs)} dir(s)"
)
return added
def register_skill_dir(self, skill_dir: Path) -> Optional[SkillMeta]:
"""Register a single skill directory (hot-reload).
Safety: applies ``check_skill_safety`` / ``is_skill_safe`` filtering.
Args:
skill_dir: Path to a directory containing ``SKILL.md``.
Returns:
:class:`SkillMeta` if newly registered or already present,
``None`` if the directory is invalid or the skill fails safety checks.
"""
skill_file = skill_dir / "SKILL.md"
if not skill_file.exists():
logger.debug(f"register_skill_dir: no SKILL.md in {skill_dir}")
return None
try:
content = skill_file.read_text(encoding="utf-8")
# Safety check (same as discover())
safety_flags = check_skill_safety(content)
if not is_skill_safe(safety_flags):
logger.warning(
f"BLOCKED skill {skill_dir.name}: "
f"safety flags {safety_flags}"
)
return None
meta = self._parse_skill(skill_dir.name, skill_dir, skill_file, content)
if meta.skill_id in self._skills:
logger.debug(f"register_skill_dir: {meta.skill_id} already exists")
return self._skills[meta.skill_id]
self._skills[meta.skill_id] = meta
self._content_cache[meta.skill_id] = content
logger.info(f"Hot-registered skill: {meta.skill_id}")
return meta
except Exception as e:
logger.warning(f"Failed to register skill {skill_dir}: {e}")
return None
@property
def ranker(self) -> SkillRanker:
"""Lazy-initialised :class:`SkillRanker` for hybrid pre-filtering."""
if self._ranker is None:
self._ranker = SkillRanker()
return self._ranker
async def select_skills_with_llm(
self,
task_description: str,
llm_client: "LLMClient",
max_skills: int = 2,
model: Optional[str] = None,
skill_quality: Optional[Dict[str, Dict[str, Any]]] = None,
cross_session_hints: Optional[str] = None,
) -> tuple[List[SkillMeta], Optional[Dict[str, Any]]]:
"""Use an LLM to select the most relevant skills.
When the local registry has more than ``PREFILTER_THRESHOLD`` skills,
a **BM25 → embedding** pre-filter narrows the candidate set before
sending to the LLM. This avoids stuffing an overly long catalog
into the prompt.
Progressive disclosure: the LLM only sees skill *headers*
(skill_id + description + quality stats), not the full SKILL.md
content. Full content is loaded only after selection.
Args:
task_description: The user's task instruction.
llm_client: An initialised LLMClient used for the selection call.
max_skills: Maximum number of skills to inject.
model: Override model for this selection call.
If None, falls back to ``llm_client``'s default model.
skill_quality: Optional mapping ``{skill_id: {total_applied, total_completions, total_fallbacks}}``
from :class:`SkillStore`. When provided, skills with high
fallback rates are filtered out and quality signals are
included in the LLM selection prompt.
Returns:
tuple[list[SkillMeta], dict | None]: (selected_skills, selection_record).
selection_record contains the LLM conversation for logging.
"""
self._ensure_discovered()
if not task_description:
return [], None
available = list(self._skills.values())
if not available:
return [], None
# Quality-based filtering: remove skills that consistently fail
filtered_out: List[str] = []
if skill_quality:
kept: List[SkillMeta] = []
for s in available:
q = skill_quality.get(s.skill_id)
if q:
selections = q.get("total_selections", 0)
applied = q.get("total_applied", 0)
completions = q.get("total_completions", 0)
fallbacks = q.get("total_fallbacks", 0)
# Filter 1: selected multiple times but never completed
if selections >= 2 and completions == 0:
filtered_out.append(s.skill_id)
continue
# Filter 2: high fallback rate when applied
if applied >= 2 and fallbacks / applied > 0.5:
filtered_out.append(s.skill_id)
continue
kept.append(s)
if filtered_out:
logger.info(
f"Skill quality filter: removed {len(filtered_out)} "
f"high-fallback skill(s): {filtered_out}"
)
available = kept
if not available:
return [], None
# Pre-filter when skill count exceeds threshold
prefilter_used = False
if len(available) > PREFILTER_THRESHOLD:
available = self._prefilter_skills(task_description, available, max_skills)
prefilter_used = True
# Build a concise skills catalogue for the LLM (skill_id + description + quality)
catalog_lines: List[str] = []
for s in available:
q = skill_quality.get(s.skill_id) if skill_quality else None
if q:
selections = q.get("total_selections", 0)
applied = q.get("total_applied", 0)
completions = q.get("total_completions", 0)
if applied > 0:
rate = completions / applied
catalog_lines.append(
f"- **{s.skill_id}**: {s.description} "
f"(success {completions}/{applied} = {rate:.0%})"
)
elif selections > 0:
catalog_lines.append(
f"- **{s.skill_id}**: {s.description} "
f"(selected {selections}x, never succeeded)"
)
else:
catalog_lines.append(f"- **{s.skill_id}**: {s.description} (new)")
else:
catalog_lines.append(f"- **{s.skill_id}**: {s.description}")
skills_catalog = "\n".join(catalog_lines)
prompt = self._build_skill_selection_prompt(
task_description,
skills_catalog,
max_skills,
cross_session_hints=cross_session_hints,
)
selection_record: Dict[str, Any] = {
"method": "llm",
"task": task_description[:500],
"available_skills": [s.skill_id for s in available],
"filtered_out": filtered_out,
"prefilter_used": prefilter_used,
"prompt": prompt,
}
try:
from gdpval_bench.token_tracker import set_call_source, reset_call_source
_src_tok = set_call_source("skill_select")
except ImportError:
_src_tok = None
try:
llm_kwargs = {}
if model:
llm_kwargs["model"] = model
resp = await llm_client.complete(prompt, **llm_kwargs)
content = resp["message"]["content"].strip()
selected_ids, brief_plan = self._parse_skill_selection_response(content)
selection_record["llm_response"] = content
selection_record["parsed_ids"] = selected_ids
selection_record["brief_plan"] = brief_plan
# Validate ids against registry & cap
result: List[SkillMeta] = []
for sid in selected_ids:
if len(result) >= max_skills:
break
meta = self._skills.get(sid)
if meta:
result.append(meta)
else:
logger.debug(f"LLM selected unknown skill_id: {sid}")
selection_record["selected"] = [s.skill_id for s in result]
if result:
ids = ", ".join(s.skill_id for s in result)
logger.info(f"LLM skill selection: [{ids}]")
else:
logger.info("LLM decided no skills are relevant for this task")
return result, selection_record
except Exception as e:
logger.warning(f"LLM skill selection failed: {e} — proceeding without skills")
selection_record["error"] = str(e)
selection_record["method"] = "llm_failed"
selection_record["selected"] = []
return [], selection_record
finally:
if _src_tok is not None:
reset_call_source(_src_tok)
def _prefilter_skills(
self,
task: str,
available: List[SkillMeta],
max_skills: int,
) -> List[SkillMeta]:
"""Narrow the candidate set using BM25 + embedding hybrid ranking.
Keeps at most ``max(15, max_skills * 5)`` candidates for the LLM
selection prompt.
"""
prefilter_top_k = max(15, max_skills * 5)
# Build SkillCandidate list
candidates: List[SkillCandidate] = []
for s in available:
body = ""
raw = self._content_cache.get(s.skill_id, "")
if raw:
body = strip_frontmatter(raw)
candidates.append(SkillCandidate(
skill_id=s.skill_id,
name=s.name,
description=s.description,
body=body,
))
ranked = self.ranker.hybrid_rank(task, candidates, top_k=prefilter_top_k)
# Map back to SkillMeta
ranked_ids = {c.skill_id for c in ranked}
result = [s for s in available if s.skill_id in ranked_ids]
if len(result) < len(available):
logger.info(
f"Skill pre-filter: {len(available)} → {len(result)} candidates "
f"(BM25+embedding, threshold={PREFILTER_THRESHOLD})"
)
return result
def load_skill_content(self, skill_id: str) -> Optional[str]:
"""Return the SKILL.md content (with frontmatter stripped) for *skill_id*."""
self._ensure_discovered()
raw = self._content_cache.get(skill_id)
if raw is None:
return None
return self._strip_frontmatter(raw)
def build_context_injection(
self,
skills: List[SkillMeta],
backends: Optional[List[str]] = None,
) -> str:
"""Build a prompt fragment with the full content of *skills*.
Injected as a system message into the agent's messages before the
user instruction so the LLM reads skill guidance first.
Args:
skills: Skills to inject.
backends: Active backend names (e.g. ``["shell", "mcp"]``). Used to
tailor the guidance so only actually available backends are
mentioned. ``None`` falls back to mentioning all backends.
Key features:
- Includes the skill directory path so the agent can resolve
relative references to ``scripts/``, ``references/``, ``assets/``.
- Replaces ``{baseDir}`` placeholders with the actual skill
directory path (a convention used in some SKILL.md files).
"""
parts: List[str] = []
for skill in skills:
content = self.load_skill_content(skill.skill_id)
if content:
# Resolve {baseDir} placeholder to the skill directory
skill_dir = str(skill.path.parent)
content = content.replace("{baseDir}", skill_dir)
part = (
f"### Skill: {skill.skill_id}\n"
f"**Skill directory**: `{skill_dir}`\n\n"
f"{content}"
)
parts.append(part)
if not parts:
return ""
# Build a backend hint that only mentions registered backends
scope = set(backends) if backends else {"gui", "shell", "mcp", "web", "system"}
backend_names: List[str] = []
if "mcp" in scope:
backend_names.append("MCP")
if "shell" in scope:
backend_names.append("shell")
if "gui" in scope:
backend_names.append("GUI")
tool_hint = ", ".join(backend_names) if backend_names else "available"
# Resource access tips — mention shell_agent only when shell is available
has_shell = "shell" in scope
resource_tip = (
"Use `read_file` / `list_dir` / `write_file` for file operations"
+ (" and `shell_agent` for running scripts" if has_shell else "")
+ ". Paths in skill instructions are relative to the skill "
"directory listed under each skill heading.\n\n"
)
header = (
"# Active Skills\n\n"
"The following skills provide **domain knowledge and tested procedures** "
"relevant to this task.\n\n"
"**How to use skills:**\n"
"- If a skill contains **step-by-step procedures or commands**, follow them — "
"they are verified workflows.\n"
"- If a skill provides **reference information, best practices, or tool guides**, "
"use it as context to inform your decisions.\n"
f"- Skills supplement your available tools — you may use **any** tool "
f"({tool_hint}) alongside skill guidance. "
"Choose the best tool for each sub-step.\n\n"
"**Resource access**: Each skill may include bundled resources "
"(scripts, references, assets) in its skill directory. "
+ resource_tip
)
return header + "\n\n---\n\n".join(parts)
def _ensure_discovered(self) -> None:
if not self._discovered:
self.discover()
@staticmethod
def _parse_skill(
dir_name: str,
skill_dir: Path,
skill_file: Path,
content: str,
) -> SkillMeta:
"""Parse a SKILL.md file into a SkillMeta.
Only ``name`` and ``description`` are read from frontmatter
(per the official skill format). ``skill_id`` is read from
the ``.skill_id`` sidecar (created if absent).
"""
frontmatter = parse_frontmatter(content)
name = frontmatter.get("name", dir_name)
description = frontmatter.get("description", name)
skill_id = _read_or_create_skill_id(name, skill_dir)
return SkillMeta(
skill_id=skill_id,
name=name,
description=description,
path=skill_file,
)
# Frontmatter parsing is delegated to skill_utils (single source of truth).
_extract_frontmatter = staticmethod(parse_frontmatter)
_strip_frontmatter = staticmethod(strip_frontmatter)
@staticmethod
def _build_skill_selection_prompt(
task: str,
skills_catalog: str,
max_skills: int,
cross_session_hints: Optional[str] = None,
) -> str:
"""Build the prompt for LLM skill selection.
Uses a plan-then-select pattern: the LLM first writes a brief
execution plan, then selects skills that match the plan.
``cross_session_hints`` is an optional block of cross-session
intelligence (from OpenViking or similar) containing user
preferences, known useful skills, and similar past cases. When
present it is injected as non-authoritative guidance — the
selector must still validate against the concrete skill list.
"""
hints_section = ""
if cross_session_hints and cross_session_hints.strip():
hints_section = (
"# Cross-Session Hints (non-authoritative, use as guidance only)\n\n"
f"{cross_session_hints.strip()}\n\n"
)
return f"""You are a skill selector for an autonomous agent.
# Task
{task}
{hints_section}# Available Skills
{skills_catalog}
# Instructions
Follow these steps:
**Step 1 — Plan**: Think about how you would accomplish this task. What are the key deliverables? What file formats are needed (PDF, DOCX, XLSX, etc.)? What tools or libraries would you use?
**Step 2 — Match**: Check which skills directly teach workflows for the deliverables or file formats identified in your plan. A skill is relevant ONLY if it provides a tested procedure for a core part of your plan. Skills that only share vague topical overlap (e.g. a "PDF checklist" skill for a task that just happens to involve PDFs) add noise and should be excluded.
**Step 3 — Quality check**: Among matching skills, prefer ones with higher success rates. Avoid skills marked as "never succeeded" or with very low success rates — they waste iterations and actively hurt performance.
**Step 4 — Decide**: Select at most {max_skills} skill(s). If no skill closely matches your plan, you MUST return an empty list. Selecting an irrelevant or low-quality skill is **worse than selecting none** — it forces the agent down an unproductive path and wastes the entire iteration budget. When in doubt, leave it out.
Return a JSON object:
{{"brief_plan": "1-2 sentence plan for this task", "skills": ["skill_id_1", "skill_id_2"]}}
If no skill applies:
{{"brief_plan": "1-2 sentence plan", "skills": []}}
IMPORTANT: Use the **exact skill_id** from the list above."""
@staticmethod
def _parse_skill_selection_response(content: str) -> tuple[List[str], str]:
"""Parse the LLM response and extract selected skill IDs + plan.
Returns:
(skill_ids, brief_plan)
"""
# Handle markdown code blocks
code_block = re.search(r"```(?:json)?\s*\n?(.*?)\n?```", content, re.DOTALL)
if code_block:
content = code_block.group(1).strip()
else:
# Try to find a raw JSON object
json_match = re.search(r"\{.*\}", content, re.DOTALL)
if json_match:
content = json_match.group()
try:
data = json.loads(content)
except json.JSONDecodeError:
logger.warning(f"Failed to parse LLM skill selection JSON: {content[:200]}")
return [], ""
brief_plan = data.get("brief_plan", "")
if brief_plan:
logger.info(f"Skill selection plan: {brief_plan}")
ids = data.get("skills", [])
if not isinstance(ids, list):
return [], brief_plan
return [str(n).strip() for n in ids if n], brief_plan