diff --git a/charts/oz-agent-worker/templates/configmap.yaml b/charts/oz-agent-worker/templates/configmap.yaml index 994ff52..4f27bfb 100644 --- a/charts/oz-agent-worker/templates/configmap.yaml +++ b/charts/oz-agent-worker/templates/configmap.yaml @@ -11,6 +11,10 @@ data: max_concurrent_tasks: {{ .Values.worker.maxConcurrentTasks }} {{- if .Values.worker.idleOnComplete }} idle_on_complete: {{ .Values.worker.idleOnComplete | quote }} + {{- end }} + {{- with .Values.worker.skillsDirs }} + skills_dirs: +{{ toYaml . | indent 6 }} {{- end }} backend: kubernetes: diff --git a/charts/oz-agent-worker/values.yaml b/charts/oz-agent-worker/values.yaml index d5b6eb4..ce766fe 100644 --- a/charts/oz-agent-worker/values.yaml +++ b/charts/oz-agent-worker/values.yaml @@ -31,6 +31,10 @@ worker: cleanup: true maxConcurrentTasks: 0 idleOnComplete: "" + # skillsDirs is a list of local filesystem directories containing skill folders. + # Each directory should contain subdirectories with a SKILL.md file. + # Skills discovered here appear in the webapp's agent selector. + skillsDirs: [] extraArgs: [] deploymentAnnotations: {} podAnnotations: {} diff --git a/internal/config/config.go b/internal/config/config.go index e3a13da..7db51ea 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -26,6 +26,12 @@ type FileConfig struct { // overrides are no longer needed. IdleOnComplete *string `yaml:"idle_on_complete"` Backend BackendConfig `yaml:"backend"` + // SkillsDirs is a list of local filesystem directories containing skill folders. + // Each directory should contain subdirectories with a SKILL.md file: + // /skill-name/SKILL.md + // Skills discovered here are reported to the server on connect so they appear + // in the webapp's agent/skill selector for users with access to this worker. + SkillsDirs []string `yaml:"skills_dirs"` } // BackendConfig contains the backend selection. diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 8c43506..33f81dd 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -408,6 +408,43 @@ worker_id: "test" }) } +func TestLoadSkillsDirs(t *testing.T) { + t.Run("parses skills_dirs when set", func(t *testing.T) { + path := writeTestConfig(t, ` +worker_id: "test" +skills_dirs: + - /opt/skills + - /home/user/my-skills +`) + cfg, err := Load(path) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(cfg.SkillsDirs) != 2 { + t.Fatalf("skills_dirs count = %d, want 2", len(cfg.SkillsDirs)) + } + if cfg.SkillsDirs[0] != "/opt/skills" { + t.Errorf("skills_dirs[0] = %q, want %q", cfg.SkillsDirs[0], "/opt/skills") + } + if cfg.SkillsDirs[1] != "/home/user/my-skills" { + t.Errorf("skills_dirs[1] = %q, want %q", cfg.SkillsDirs[1], "/home/user/my-skills") + } + }) + + t.Run("skills_dirs is nil when not set", func(t *testing.T) { + path := writeTestConfig(t, ` +worker_id: "test" +`) + cfg, err := Load(path) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if cfg.SkillsDirs != nil { + t.Errorf("expected skills_dirs to be nil, got %v", cfg.SkillsDirs) + } + }) +} + func TestLoadValidKubernetesPodTemplateConfig(t *testing.T) { path := writeTestConfig(t, ` worker_id: "k8s-worker" diff --git a/internal/types/messages.go b/internal/types/messages.go index 3763e05..eb98dcb 100644 --- a/internal/types/messages.go +++ b/internal/types/messages.go @@ -14,6 +14,7 @@ const ( MessageTypeTaskFailed MessageType = "task_failed" MessageTypeTaskRejected MessageType = "task_rejected" MessageTypeHeartbeat MessageType = "heartbeat" + MessageTypeWorkerSkills MessageType = "worker_skills" ) // WebSocketMessage is the base structure for all WebSocket messages @@ -65,6 +66,22 @@ type TaskDefinition struct { Prompt string `json:"prompt"` } +// WorkerSkill represents a single skill discovered from a local skills directory. +type WorkerSkill struct { + Name string `json:"name"` + Description string `json:"description"` + // Path is the absolute filesystem path to the SKILL.md file. + Path string `json:"path"` + // BasePrompt is the SKILL.md body content (everything after the frontmatter). + BasePrompt string `json:"base_prompt,omitempty"` +} + +// WorkerSkillsMessage is sent from worker to server after connecting, +// reporting the skills available in the worker's configured skills_dirs. +type WorkerSkillsMessage struct { + Skills []WorkerSkill `json:"skills"` +} + // Harness defines a third-party harness to run a cloud agent with. type Harness struct { // Type is the name of the harness, e.g. "claude". diff --git a/internal/worker/skills.go b/internal/worker/skills.go new file mode 100644 index 0000000..9cc63ec --- /dev/null +++ b/internal/worker/skills.go @@ -0,0 +1,122 @@ +package worker + +import ( + "context" + "os" + "path/filepath" + "regexp" + "strings" + + "github.com/warpdotdev/oz-agent-worker/internal/log" + "github.com/warpdotdev/oz-agent-worker/internal/types" + "gopkg.in/yaml.v3" +) + +var skillFrontMatterRegex = regexp.MustCompile(`(?ms)\A\s*---[ \t]*\r?\n(.*?)\r?\n---[ \t]*\r?\n?`) + +type skillFrontMatter struct { + Name string `yaml:"name"` + Description string `yaml:"description"` +} + +// scanSkillsDirs walks the configured skills directories and returns all valid skills found. +// Each directory should contain subdirectories with a SKILL.md file: +// +// /skill-name/SKILL.md +// +// Skills with missing or unparseable frontmatter are skipped with a warning. +func scanSkillsDirs(ctx context.Context, dirs []string) []types.WorkerSkill { + var skills []types.WorkerSkill + seen := make(map[string]struct{}) // deduplicate by absolute path + + for _, dir := range dirs { + absDir, err := filepath.Abs(dir) + if err != nil { + log.Warnf(ctx, "Skipping skills dir %q: failed to resolve absolute path: %v", dir, err) + continue + } + + entries, err := os.ReadDir(absDir) + if err != nil { + log.Warnf(ctx, "Skipping skills dir %q: %v", absDir, err) + continue + } + + for _, entry := range entries { + if !entry.IsDir() { + continue + } + + skillFile := filepath.Join(absDir, entry.Name(), "SKILL.md") + skill, err := parseSkillFile(skillFile) + if err != nil { + log.Warnf(ctx, "Skipping skill at %q: %v", skillFile, err) + continue + } + + if _, exists := seen[skill.Path]; exists { + continue + } + seen[skill.Path] = struct{}{} + skills = append(skills, *skill) + } + } + + return skills +} + +// parseSkillFile reads a SKILL.md file and extracts the skill metadata from its YAML frontmatter. +func parseSkillFile(path string) (*types.WorkerSkill, error) { + data, err := os.ReadFile(path) + if err != nil { + return nil, err + } + + name, description, basePrompt, err := parseSkillMarkdown(string(data)) + if err != nil { + return nil, err + } + + absPath, err := filepath.Abs(path) + if err != nil { + return nil, err + } + + return &types.WorkerSkill{ + Name: name, + Description: description, + Path: absPath, + BasePrompt: basePrompt, + }, nil +} + +// parseSkillMarkdown extracts the name, description, and base prompt from a SKILL.md file. +func parseSkillMarkdown(content string) (string, string, string, error) { + loc := skillFrontMatterRegex.FindStringSubmatchIndex(content) + if len(loc) < 4 { + return "", "", "", errMissingFrontMatter + } + + frontMatterYaml := content[loc[2]:loc[3]] + var fm skillFrontMatter + if err := yaml.Unmarshal([]byte(frontMatterYaml), &fm); err != nil { + return "", "", "", err + } + + name := strings.TrimSpace(fm.Name) + if name == "" { + return "", "", "", errMissingSkillName + } + + basePrompt := strings.TrimSpace(content[loc[1]:]) + return name, strings.TrimSpace(fm.Description), basePrompt, nil +} + +type skillError string + +func (e skillError) Error() string { return string(e) } + +const ( + errMissingFrontMatter skillError = "missing YAML front matter" + errMissingSkillName skillError = "missing skill name in front matter" +) diff --git a/internal/worker/skills_test.go b/internal/worker/skills_test.go new file mode 100644 index 0000000..d64521d --- /dev/null +++ b/internal/worker/skills_test.go @@ -0,0 +1,206 @@ +package worker + +import ( + "context" + "os" + "path/filepath" + "testing" +) + +func TestParseSkillMarkdown(t *testing.T) { + tests := []struct { + name string + content string + wantName string + wantDesc string + wantBasePrompt string + wantErr bool + wantErrText string + }{ + { + name: "valid frontmatter with name and description", + content: `--- +name: deploy +description: Deploy to production +--- +Do the deployment. +`, + wantName: "deploy", + wantDesc: "Deploy to production", + wantBasePrompt: "Do the deployment.", + }, + { + name: "valid frontmatter name only", + content: `--- +name: my-skill +--- +Some instructions. +`, + wantName: "my-skill", + wantDesc: "", + wantBasePrompt: "Some instructions.", + }, + { + name: "missing frontmatter", + content: "Just some markdown without frontmatter.", + wantErr: true, + wantErrText: "missing YAML front matter", + }, + { + name: "empty name", + content: `--- +description: something +--- +body +`, + wantErr: true, + wantErrText: "missing skill name in front matter", + }, + { + name: "whitespace-only name", + content: `--- +name: " " +--- +body +`, + wantErr: true, + wantErrText: "missing skill name in front matter", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + name, desc, basePrompt, err := parseSkillMarkdown(tt.content) + if tt.wantErr { + if err == nil { + t.Fatal("expected error, got nil") + } + if tt.wantErrText != "" && err.Error() != tt.wantErrText { + t.Errorf("error = %q, want %q", err.Error(), tt.wantErrText) + } + return + } + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if name != tt.wantName { + t.Errorf("name = %q, want %q", name, tt.wantName) + } + if desc != tt.wantDesc { + t.Errorf("description = %q, want %q", desc, tt.wantDesc) + } + if basePrompt != tt.wantBasePrompt { + t.Errorf("basePrompt = %q, want %q", basePrompt, tt.wantBasePrompt) + } + }) + } +} + +func TestScanSkillsDirs(t *testing.T) { + ctx := context.Background() + + // Create a temp directory structure: + // dir/ + // skill-a/SKILL.md (valid) + // skill-b/SKILL.md (valid) + // not-a-dir.txt (file, should be skipped) + // skill-c/ (no SKILL.md, should be skipped) + // skill-d/SKILL.md (invalid frontmatter, should be skipped) + dir := t.TempDir() + + writeSkill(t, dir, "skill-a", `--- +name: skill-a +description: First skill +--- +Do something. +`) + writeSkill(t, dir, "skill-b", `--- +name: skill-b +description: Second skill +--- +Do something else. +`) + + // File at top level (not a directory) + if err := os.WriteFile(filepath.Join(dir, "not-a-dir.txt"), []byte("hello"), 0644); err != nil { + t.Fatal(err) + } + + // Directory without SKILL.md + if err := os.MkdirAll(filepath.Join(dir, "skill-c"), 0755); err != nil { + t.Fatal(err) + } + + // Directory with invalid SKILL.md + writeSkill(t, dir, "skill-d", "no frontmatter here") + + skills := scanSkillsDirs(ctx, []string{dir}) + + if len(skills) != 2 { + t.Fatalf("got %d skills, want 2", len(skills)) + } + + // Sort isn't guaranteed, so check by name + nameSet := map[string]bool{} + for _, s := range skills { + nameSet[s.Name] = true + if s.Path == "" { + t.Errorf("skill %q has empty path", s.Name) + } + } + if !nameSet["skill-a"] { + t.Error("missing skill-a") + } + if !nameSet["skill-b"] { + t.Error("missing skill-b") + } +} + +func TestScanSkillsDirsNonexistentDir(t *testing.T) { + ctx := context.Background() + skills := scanSkillsDirs(ctx, []string{"/nonexistent/path/12345"}) + if len(skills) != 0 { + t.Errorf("expected 0 skills for nonexistent dir, got %d", len(skills)) + } +} + +func TestScanSkillsDirsDedup(t *testing.T) { + ctx := context.Background() + dir := t.TempDir() + + writeSkill(t, dir, "my-skill", `--- +name: my-skill +description: A skill +--- +Body. +`) + + // Pass same directory twice + skills := scanSkillsDirs(ctx, []string{dir, dir}) + if len(skills) != 1 { + t.Errorf("expected 1 skill after dedup, got %d", len(skills)) + } +} + +func TestScanSkillsDirsEmpty(t *testing.T) { + ctx := context.Background() + skills := scanSkillsDirs(ctx, nil) + if len(skills) != 0 { + t.Errorf("expected 0 skills for nil dirs, got %d", len(skills)) + } + skills = scanSkillsDirs(ctx, []string{}) + if len(skills) != 0 { + t.Errorf("expected 0 skills for empty dirs, got %d", len(skills)) + } +} + +func writeSkill(t *testing.T, baseDir, skillName, content string) { + t.Helper() + skillDir := filepath.Join(baseDir, skillName) + if err := os.MkdirAll(skillDir, 0755); err != nil { + t.Fatal(err) + } + if err := os.WriteFile(filepath.Join(skillDir, "SKILL.md"), []byte(content), 0644); err != nil { + t.Fatal(err) + } +} diff --git a/internal/worker/worker.go b/internal/worker/worker.go index c8e789e..ccf3282 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -21,6 +21,7 @@ const ( ReconnectBackoffRate = 2.0 HeartbeatInterval = 30 * time.Second + SkillsRefreshInterval = 60 * time.Second PongWait = 60 * time.Second WriteWait = 10 * time.Second BackendShutdownTimeout = 10 * time.Second @@ -39,6 +40,9 @@ type Config struct { IdleOnComplete string // SessionSharingServerURL, when non-empty, is forwarded to the oz CLI via --session-sharing-server-url. SessionSharingServerURL string + // SkillsDirs is a list of local directories to scan for SKILL.md files. + // Discovered skills are reported to the server on each WebSocket connect. + SkillsDirs []string // Backend-specific configs. Only the one matching BackendType should be set. Docker *DockerBackendConfig @@ -130,6 +134,7 @@ func (w *Worker) Start() error { w.reconnectDelay = InitialReconnectDelay + w.reportSkills() w.run() } } @@ -180,6 +185,7 @@ func (w *Worker) run() { go w.readLoop(done) go w.writeLoop(done) go w.heartbeatLoop(done) + go w.skillsRefreshLoop(done) <-done @@ -292,6 +298,28 @@ func (w *Worker) heartbeatLoop(done chan struct{}) { } } +// skillsRefreshLoop periodically re-sends the worker_skills message to the server +// so the Redis TTL is refreshed before expiry. +func (w *Worker) skillsRefreshLoop(done chan struct{}) { + if len(w.config.SkillsDirs) == 0 { + return + } + + ticker := time.NewTicker(SkillsRefreshInterval) + defer ticker.Stop() + + for { + select { + case <-w.ctx.Done(): + return + case <-done: + return + case <-ticker.C: + w.reportSkills() + } + } +} + func (w *Worker) handleMessage(message []byte) { log.Debugf(w.ctx, "Received message: %s", string(message)) @@ -537,6 +565,39 @@ func (w *Worker) sendMessage(message []byte) error { } } +// reportSkills scans the configured skills directories and sends a worker_skills +// message to the server so they appear in the webapp's agent selector. +func (w *Worker) reportSkills() { + if len(w.config.SkillsDirs) == 0 { + return + } + + skills := scanSkillsDirs(w.ctx, w.config.SkillsDirs) + log.Infof(w.ctx, "Discovered %d skills from %d configured directories", len(skills), len(w.config.SkillsDirs)) + + msg := types.WorkerSkillsMessage{Skills: skills} + data, err := json.Marshal(msg) + if err != nil { + log.Errorf(w.ctx, "Failed to marshal worker skills message: %v", err) + return + } + + wsMsg := types.WebSocketMessage{ + Type: types.MessageTypeWorkerSkills, + Data: data, + } + + msgBytes, err := json.Marshal(wsMsg) + if err != nil { + log.Errorf(w.ctx, "Failed to marshal websocket message: %v", err) + return + } + + if err := w.sendMessage(msgBytes); err != nil { + log.Errorf(w.ctx, "Failed to send worker skills message: %v", err) + } +} + func (w *Worker) Shutdown() { log.Infof(w.ctx, "Shutting down worker...") diff --git a/main.go b/main.go index 6f17bf2..9163504 100644 --- a/main.go +++ b/main.go @@ -139,6 +139,12 @@ func mergeConfig(fileConfig *config.FileConfig) (worker.Config, error) { idleOnComplete = *fileConfig.IdleOnComplete } + // Resolve skills_dirs from config file. + var skillsDirs []string + if fileConfig != nil { + skillsDirs = fileConfig.SkillsDirs + } + wc := worker.Config{ APIKey: CLI.APIKey, WorkerID: workerID, @@ -149,6 +155,7 @@ func mergeConfig(fileConfig *config.FileConfig) (worker.Config, error) { MaxConcurrentTasks: maxConcurrentTasks, IdleOnComplete: idleOnComplete, SessionSharingServerURL: CLI.SessionSharingServerURL, + SkillsDirs: skillsDirs, } switch backendType {