Skip to content

Commit 99cce2b

Browse files
committed
mcp
1 parent cd6b737 commit 99cce2b

1 file changed

Lines changed: 225 additions & 0 deletions

File tree

internal/worker/direct.go

Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
package worker
2+
3+
import (
4+
"bufio"
5+
"context"
6+
"fmt"
7+
"os"
8+
"os/exec"
9+
"path/filepath"
10+
"strings"
11+
12+
"github.com/warpdotdev/oz-agent-worker/internal/log"
13+
)
14+
15+
const defaultWorkspaceRoot = "/var/lib/oz/workspaces"
16+
17+
// DirectBackendConfig holds configuration specific to the direct (non-containerized) backend.
18+
type DirectBackendConfig struct {
19+
WorkspaceRoot string
20+
OzPath string // Path to the oz CLI binary. If empty, looks up "oz" in PATH.
21+
SetupCommand string
22+
TeardownCommand string
23+
NoCleanup bool
24+
Env map[string]string
25+
}
26+
27+
// DirectBackend executes tasks directly on the host without Docker.
28+
type DirectBackend struct {
29+
config DirectBackendConfig
30+
ozPath string // resolved path to the oz CLI
31+
}
32+
33+
// NewDirectBackend creates a new direct backend, verifying the oz CLI is available.
34+
func NewDirectBackend(ctx context.Context, config DirectBackendConfig) (*DirectBackend, error) {
35+
ozPath := config.OzPath
36+
if ozPath == "" {
37+
var err error
38+
ozPath, err = exec.LookPath("oz")
39+
if err != nil {
40+
return nil, fmt.Errorf("oz CLI not found in PATH: %w", err)
41+
}
42+
}
43+
log.Infof(ctx, "Using oz CLI at: %s", ozPath)
44+
45+
if config.WorkspaceRoot == "" {
46+
config.WorkspaceRoot = defaultWorkspaceRoot
47+
}
48+
49+
// Ensure workspace root exists.
50+
if err := os.MkdirAll(config.WorkspaceRoot, 0755); err != nil {
51+
return nil, fmt.Errorf("failed to create workspace root %s: %w", config.WorkspaceRoot, err)
52+
}
53+
54+
return &DirectBackend{
55+
config: config,
56+
ozPath: ozPath,
57+
}, nil
58+
}
59+
60+
// ExecuteTask runs the agent directly on the host.
61+
func (b *DirectBackend) ExecuteTask(ctx context.Context, params *TaskParams) error {
62+
taskID := params.TaskID
63+
64+
// 1. Create per-task workspace directory.
65+
workspaceDir := filepath.Join(b.config.WorkspaceRoot, taskID)
66+
if err := os.MkdirAll(workspaceDir, 0755); err != nil {
67+
return fmt.Errorf("failed to create workspace directory: %w", err)
68+
}
69+
log.Infof(ctx, "Created workspace: %s", workspaceDir)
70+
71+
defer func() {
72+
if b.config.NoCleanup {
73+
log.Infof(ctx, "Skipping cleanup for workspace: %s", workspaceDir)
74+
return
75+
}
76+
b.cleanup(ctx, taskID, workspaceDir)
77+
}()
78+
79+
// 2. Create temp environment file for setup script to write to.
80+
envFile, err := os.CreateTemp(workspaceDir, "oz-env-*")
81+
if err != nil {
82+
return fmt.Errorf("failed to create environment file: %w", err)
83+
}
84+
envFilePath := envFile.Name()
85+
if err := envFile.Close(); err != nil {
86+
return fmt.Errorf("failed to close environment file: %w", err)
87+
}
88+
89+
// 3. Build environment variables: common + config-level.
90+
envVars := make([]string, len(params.EnvVars))
91+
copy(envVars, params.EnvVars)
92+
for key, value := range b.config.Env {
93+
envVars = append(envVars, fmt.Sprintf("%s=%s", key, value))
94+
}
95+
96+
// 4. Run setup command if configured.
97+
if b.config.SetupCommand != "" {
98+
setupEnv := append(envVars,
99+
fmt.Sprintf("OZ_WORKSPACE_ROOT=%s", workspaceDir),
100+
"OZ_WORKER_BACKEND=direct",
101+
fmt.Sprintf("OZ_RUN_ID=%s", taskID),
102+
fmt.Sprintf("OZ_ENVIRONMENT_FILE=%s", envFilePath),
103+
)
104+
105+
log.Infof(ctx, "Running setup command: %s", b.config.SetupCommand)
106+
if err := b.runCommand(ctx, b.config.SetupCommand, workspaceDir, setupEnv); err != nil {
107+
return fmt.Errorf("setup command failed: %w", err)
108+
}
109+
}
110+
111+
// 5. Parse environment file for KEY=VALUE pairs written by setup script.
112+
setupScriptEnv, err := parseEnvFile(envFilePath)
113+
if err != nil {
114+
log.Warnf(ctx, "Failed to parse environment file: %v", err)
115+
}
116+
for key, value := range setupScriptEnv {
117+
envVars = append(envVars, fmt.Sprintf("%s=%s", key, value))
118+
}
119+
120+
// 6. Invoke oz CLI with base args.
121+
// Task env vars override host env vars (e.g. WARP_API_KEY from the assignment
122+
// must take precedence over the worker's own WARP_API_KEY).
123+
cmd := exec.CommandContext(ctx, b.ozPath, params.BaseArgs...)
124+
cmd.Dir = workspaceDir
125+
cmd.Env = mergeEnvVars(os.Environ(), envVars)
126+
cmd.Stdout = os.Stdout
127+
cmd.Stderr = os.Stderr
128+
129+
log.Infof(ctx, "Running oz agent in workspace %s", workspaceDir)
130+
log.Debugf(ctx, "Command: %s %s", b.ozPath, strings.Join(params.BaseArgs, " "))
131+
132+
if err := cmd.Run(); err != nil {
133+
return fmt.Errorf("oz agent exited with error: %w", err)
134+
}
135+
136+
log.Infof(ctx, "Task %s execution completed successfully", taskID)
137+
return nil
138+
}
139+
140+
// Shutdown is a no-op for the direct backend.
141+
func (b *DirectBackend) Shutdown(ctx context.Context) {}
142+
143+
// cleanup runs the teardown command (if configured) and removes the workspace directory.
144+
func (b *DirectBackend) cleanup(ctx context.Context, taskID, workspaceDir string) {
145+
if b.config.TeardownCommand != "" {
146+
teardownEnv := []string{
147+
fmt.Sprintf("OZ_WORKSPACE_ROOT=%s", workspaceDir),
148+
"OZ_WORKER_BACKEND=direct",
149+
fmt.Sprintf("OZ_RUN_ID=%s", taskID),
150+
}
151+
152+
log.Infof(ctx, "Running teardown command: %s", b.config.TeardownCommand)
153+
if err := b.runCommand(ctx, b.config.TeardownCommand, workspaceDir, teardownEnv); err != nil {
154+
log.Warnf(ctx, "Teardown command failed: %v", err)
155+
}
156+
}
157+
158+
log.Infof(ctx, "Removing workspace: %s", workspaceDir)
159+
if err := os.RemoveAll(workspaceDir); err != nil {
160+
log.Warnf(ctx, "Failed to remove workspace %s: %v", workspaceDir, err)
161+
}
162+
}
163+
164+
// runCommand executes a shell command with the given working directory and environment.
165+
func (b *DirectBackend) runCommand(ctx context.Context, command, dir string, env []string) error {
166+
cmd := exec.CommandContext(ctx, "/bin/sh", "-c", command)
167+
cmd.Dir = dir
168+
cmd.Env = mergeEnvVars(os.Environ(), env)
169+
cmd.Stdout = os.Stdout
170+
cmd.Stderr = os.Stderr
171+
return cmd.Run()
172+
}
173+
174+
// mergeEnvVars merges base and override env var slices (KEY=VALUE format).
175+
// Override entries take precedence over base entries with the same key.
176+
func mergeEnvVars(base, override []string) []string {
177+
envMap := make(map[string]string, len(base)+len(override))
178+
var keys []string
179+
180+
for _, entry := range base {
181+
key, _, _ := strings.Cut(entry, "=")
182+
if _, exists := envMap[key]; !exists {
183+
keys = append(keys, key)
184+
}
185+
envMap[key] = entry
186+
}
187+
188+
for _, entry := range override {
189+
key, _, _ := strings.Cut(entry, "=")
190+
if _, exists := envMap[key]; !exists {
191+
keys = append(keys, key)
192+
}
193+
envMap[key] = entry
194+
}
195+
196+
result := make([]string, 0, len(keys))
197+
for _, key := range keys {
198+
result = append(result, envMap[key])
199+
}
200+
return result
201+
}
202+
203+
// parseEnvFile reads a file of KEY=VALUE lines, skipping empty lines and comments.
204+
func parseEnvFile(path string) (map[string]string, error) {
205+
f, err := os.Open(path)
206+
if err != nil {
207+
return nil, err
208+
}
209+
defer f.Close()
210+
211+
result := make(map[string]string)
212+
scanner := bufio.NewScanner(f)
213+
for scanner.Scan() {
214+
line := strings.TrimSpace(scanner.Text())
215+
if line == "" || strings.HasPrefix(line, "#") {
216+
continue
217+
}
218+
key, value, ok := strings.Cut(line, "=")
219+
if !ok {
220+
continue
221+
}
222+
result[strings.TrimSpace(key)] = strings.TrimSpace(value)
223+
}
224+
return result, scanner.Err()
225+
}

0 commit comments

Comments
 (0)