Skip to content

Commit c3f1b60

Browse files
Merge pull request #174 from hdresearch/ty/exec-api
feat: switch execute to orchestrator API
2 parents 0889bac + 3718fe8 commit c3f1b60

4 files changed

Lines changed: 272 additions & 12 deletions

File tree

cmd/execute.go

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package cmd
22

33
import (
44
"context"
5+
"os"
56
"time"
67

78
"github.com/hdresearch/vers-cli/internal/handlers"
@@ -10,13 +11,22 @@ import (
1011
)
1112

1213
var executeTimeout int
14+
var executeSSH bool
15+
var executeWorkDir string
1316

1417
// executeCmd represents the execute command
1518
var executeCmd = &cobra.Command{
16-
Use: "execute [vm-id|alias] <command> [args...]",
17-
Short: "Run a command on a specific VM",
18-
Long: `Execute a command within the Vers environment on the specified VM.
19-
If no VM is specified, the current HEAD VM is used.`,
19+
Use: "exec [vm-id|alias] <command> [args...]",
20+
Aliases: []string{"execute"},
21+
Short: "Run a command on a specific VM",
22+
Long: `Execute a command on the specified VM via the orchestrator API.
23+
24+
The command runs through the in-VM agent, which means it automatically
25+
inherits environment variables configured for your account.
26+
27+
If no VM is specified, the current HEAD VM is used.
28+
29+
Use --ssh to bypass the API and connect directly via SSH (legacy behavior).`,
2030
Args: cobra.MinimumNArgs(1),
2131
RunE: func(cmd *cobra.Command, args []string) error {
2232
// Use custom timeout if specified, otherwise use default APIMedium
@@ -44,17 +54,35 @@ If no VM is specified, the current HEAD VM is used.`,
4454
command = args[1:]
4555
}
4656

47-
view, err := handlers.HandleExecute(apiCtx, application, handlers.ExecuteReq{Target: target, Command: command})
57+
var timeoutSec uint64
58+
if executeTimeout > 0 {
59+
timeoutSec = uint64(executeTimeout)
60+
}
61+
62+
view, err := handlers.HandleExecute(apiCtx, application, handlers.ExecuteReq{
63+
Target: target,
64+
Command: command,
65+
WorkingDir: executeWorkDir,
66+
TimeoutSec: timeoutSec,
67+
UseSSH: executeSSH,
68+
})
4869
if err != nil {
4970
return err
5071
}
5172
pres.RenderExecute(application, view)
73+
74+
// Exit with the command's exit code
75+
if view.ExitCode != 0 {
76+
os.Exit(view.ExitCode)
77+
}
5278
return nil
5379
},
5480
}
5581

5682
func init() {
5783
rootCmd.AddCommand(executeCmd)
58-
executeCmd.Flags().String("host", "", "Specify the host IP to connect to (overrides default)")
84+
executeCmd.Flags().SetInterspersed(false) // stop flag parsing after first positional arg
5985
executeCmd.Flags().IntVarP(&executeTimeout, "timeout", "t", 0, "Timeout in seconds (default: 30s, use 0 for no limit)")
86+
executeCmd.Flags().BoolVar(&executeSSH, "ssh", false, "Use direct SSH instead of the VERS API")
87+
executeCmd.Flags().StringVarP(&executeWorkDir, "workdir", "w", "", "Working directory for the command")
6088
}

internal/handlers/execute.go

Lines changed: 115 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
package handlers
22

33
import (
4+
"bufio"
45
"context"
6+
"encoding/base64"
7+
"encoding/json"
58
"fmt"
9+
"io"
610

711
"github.com/hdresearch/vers-cli/internal/app"
812
"github.com/hdresearch/vers-cli/internal/presenters"
@@ -13,8 +17,28 @@ import (
1317
)
1418

1519
type ExecuteReq struct {
16-
Target string
17-
Command []string
20+
Target string
21+
Command []string
22+
WorkingDir string
23+
Env map[string]string
24+
TimeoutSec uint64
25+
UseSSH bool
26+
}
27+
28+
// streamResponse represents a single NDJSON line from the exec stream.
29+
// The orchestrator flattens the agent protocol into:
30+
//
31+
// {"type":"chunk","stream":"stdout","data_b64":"...","cursor":N,"exec_id":"..."}
32+
// {"type":"exit","exit_code":0,"cursor":N,"exec_id":"..."}
33+
// {"type":"error","code":"...","message":"..."}
34+
type streamResponse struct {
35+
Type string `json:"type"`
36+
Stream string `json:"stream,omitempty"`
37+
DataB64 string `json:"data_b64,omitempty"`
38+
ExitCode *int `json:"exit_code,omitempty"`
39+
Cursor uint64 `json:"cursor,omitempty"`
40+
Code string `json:"code,omitempty"`
41+
Message string `json:"message,omitempty"`
1842
}
1943

2044
func HandleExecute(ctx context.Context, a *app.App, r ExecuteReq) (presenters.ExecuteView, error) {
@@ -27,21 +51,106 @@ func HandleExecute(ctx context.Context, a *app.App, r ExecuteReq) (presenters.Ex
2751
v.UsedHEAD = t.UsedHEAD
2852
v.HeadID = t.HeadID
2953

54+
if r.UseSSH {
55+
return handleExecuteSSH(ctx, a, r, t, v)
56+
}
57+
58+
return handleExecuteAPI(ctx, a, r, t, v)
59+
}
60+
61+
// handleExecuteAPI runs the command via the orchestrator exec/stream API.
62+
func handleExecuteAPI(ctx context.Context, a *app.App, r ExecuteReq, t utils.TargetResult, v presenters.ExecuteView) (presenters.ExecuteView, error) {
63+
// Wrap the command in bash -c so shell features work
64+
command := []string{"bash", "-c", utils.ShellJoin(r.Command)}
65+
66+
body, err := vmSvc.ExecStream(ctx, t.Ident, vmSvc.ExecRequest{
67+
Command: command,
68+
Env: r.Env,
69+
WorkingDir: r.WorkingDir,
70+
TimeoutSec: r.TimeoutSec,
71+
})
72+
if err != nil {
73+
return v, fmt.Errorf("exec: %w", err)
74+
}
75+
defer body.Close()
76+
77+
exitCode, err := streamExecOutput(body, a.IO.Out, a.IO.Err)
78+
if err != nil {
79+
return v, fmt.Errorf("exec stream: %w", err)
80+
}
81+
82+
v.ExitCode = exitCode
83+
return v, nil
84+
}
85+
86+
// handleExecuteSSH runs the command via direct SSH (legacy fallback).
87+
func handleExecuteSSH(ctx context.Context, a *app.App, r ExecuteReq, t utils.TargetResult, v presenters.ExecuteView) (presenters.ExecuteView, error) {
3088
info, err := vmSvc.GetConnectInfo(ctx, a.Client, t.Ident)
3189
if err != nil {
3290
return v, fmt.Errorf("failed to get VM information: %w", err)
3391
}
3492

35-
sshHost := info.Host
3693
cmdStr := utils.ShellJoin(r.Command)
37-
38-
client := sshutil.NewClient(sshHost, info.KeyPath, info.VMDomain)
94+
client := sshutil.NewClient(info.Host, info.KeyPath, info.VMDomain)
3995
err = client.Execute(ctx, cmdStr, a.IO.Out, a.IO.Err)
4096
if err != nil {
4197
if exitErr, ok := err.(*ssh.ExitError); ok {
42-
return v, fmt.Errorf("command exited with code %d", exitErr.ExitStatus())
98+
v.ExitCode = exitErr.ExitStatus()
99+
return v, nil
43100
}
44101
return v, fmt.Errorf("failed to execute command: %w", err)
45102
}
46103
return v, nil
47104
}
105+
106+
// streamExecOutput reads NDJSON from the exec stream, writes stdout/stderr
107+
// to the provided writers, and returns the exit code.
108+
func streamExecOutput(body io.Reader, stdout, stderr io.Writer) (int, error) {
109+
scanner := bufio.NewScanner(body)
110+
// Allow large lines (agent can send up to 10MB of output)
111+
scanner.Buffer(make([]byte, 0, 64*1024), 16*1024*1024)
112+
113+
exitCode := 0
114+
115+
for scanner.Scan() {
116+
line := scanner.Bytes()
117+
if len(line) == 0 {
118+
continue
119+
}
120+
121+
var resp streamResponse
122+
if err := json.Unmarshal(line, &resp); err != nil {
123+
// Skip unparseable lines
124+
continue
125+
}
126+
127+
switch resp.Type {
128+
case "chunk":
129+
data, err := base64.StdEncoding.DecodeString(resp.DataB64)
130+
if err != nil {
131+
continue
132+
}
133+
switch resp.Stream {
134+
case "stdout":
135+
stdout.Write(data)
136+
case "stderr":
137+
stderr.Write(data)
138+
}
139+
140+
case "exit":
141+
if resp.ExitCode != nil {
142+
exitCode = *resp.ExitCode
143+
}
144+
return exitCode, nil
145+
146+
case "error":
147+
return 1, fmt.Errorf("exec error [%s]: %s", resp.Code, resp.Message)
148+
}
149+
}
150+
151+
if err := scanner.Err(); err != nil {
152+
return 1, fmt.Errorf("stream read error: %w", err)
153+
}
154+
155+
return exitCode, nil
156+
}

internal/presenters/execute_types.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,5 @@ package presenters
33
type ExecuteView struct {
44
UsedHEAD bool
55
HeadID string
6+
ExitCode int
67
}

internal/services/vm/exec.go

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
package vm
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"encoding/json"
7+
"fmt"
8+
"io"
9+
"net/http"
10+
11+
"github.com/hdresearch/vers-cli/internal/auth"
12+
)
13+
14+
// ExecRequest matches the orchestrator's VmExecRequest.
15+
type ExecRequest struct {
16+
Command []string `json:"command"`
17+
Env map[string]string `json:"env,omitempty"`
18+
WorkingDir string `json:"working_dir,omitempty"`
19+
Stdin string `json:"stdin,omitempty"`
20+
TimeoutSec uint64 `json:"timeout_secs,omitempty"`
21+
}
22+
23+
// ExecResponse matches the orchestrator's VmExecResponse.
24+
type ExecResponse struct {
25+
ExitCode int `json:"exit_code"`
26+
Stdout string `json:"stdout"`
27+
Stderr string `json:"stderr"`
28+
}
29+
30+
// ExecStreamChunk is a single line from the NDJSON exec stream.
31+
type ExecStreamChunk struct {
32+
Type string `json:"type"` // "chunk" or "exit"
33+
Stream string `json:"stream,omitempty"` // "stdout" or "stderr"
34+
Data string `json:"data,omitempty"` // base64-encoded bytes
35+
ExitCode *int `json:"exit_code,omitempty"` // only on type=="exit"
36+
}
37+
38+
// Exec runs a command on a VM via the orchestrator API (non-streaming).
39+
func Exec(ctx context.Context, vmID string, req ExecRequest) (*ExecResponse, error) {
40+
apiKey, err := auth.GetAPIKey()
41+
if err != nil {
42+
return nil, fmt.Errorf("failed to get API key: %w", err)
43+
}
44+
45+
baseURL, err := auth.GetVersUrl()
46+
if err != nil {
47+
return nil, fmt.Errorf("failed to get API URL: %w", err)
48+
}
49+
50+
body, err := json.Marshal(req)
51+
if err != nil {
52+
return nil, fmt.Errorf("failed to marshal request: %w", err)
53+
}
54+
55+
url := fmt.Sprintf("%s/api/v1/vm/%s/exec", baseURL.String(), vmID)
56+
httpReq, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body))
57+
if err != nil {
58+
return nil, fmt.Errorf("failed to create request: %w", err)
59+
}
60+
61+
httpReq.Header.Set("Authorization", "Bearer "+apiKey)
62+
httpReq.Header.Set("Content-Type", "application/json")
63+
64+
resp, err := http.DefaultClient.Do(httpReq)
65+
if err != nil {
66+
return nil, fmt.Errorf("request failed: %w", err)
67+
}
68+
defer resp.Body.Close()
69+
70+
if resp.StatusCode != http.StatusOK {
71+
errBody, _ := io.ReadAll(resp.Body)
72+
return nil, fmt.Errorf("API error %d: %s", resp.StatusCode, string(errBody))
73+
}
74+
75+
var result ExecResponse
76+
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
77+
return nil, fmt.Errorf("failed to parse response: %w", err)
78+
}
79+
80+
return &result, nil
81+
}
82+
83+
// ExecStream runs a command on a VM via the orchestrator streaming API.
84+
// It returns the response body for the caller to consume as NDJSON.
85+
func ExecStream(ctx context.Context, vmID string, req ExecRequest) (io.ReadCloser, error) {
86+
apiKey, err := auth.GetAPIKey()
87+
if err != nil {
88+
return nil, fmt.Errorf("failed to get API key: %w", err)
89+
}
90+
91+
baseURL, err := auth.GetVersUrl()
92+
if err != nil {
93+
return nil, fmt.Errorf("failed to get API URL: %w", err)
94+
}
95+
96+
body, err := json.Marshal(req)
97+
if err != nil {
98+
return nil, fmt.Errorf("failed to marshal request: %w", err)
99+
}
100+
101+
url := fmt.Sprintf("%s/api/v1/vm/%s/exec/stream", baseURL.String(), vmID)
102+
httpReq, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body))
103+
if err != nil {
104+
return nil, fmt.Errorf("failed to create request: %w", err)
105+
}
106+
107+
httpReq.Header.Set("Authorization", "Bearer "+apiKey)
108+
httpReq.Header.Set("Content-Type", "application/json")
109+
110+
resp, err := http.DefaultClient.Do(httpReq)
111+
if err != nil {
112+
return nil, fmt.Errorf("request failed: %w", err)
113+
}
114+
115+
if resp.StatusCode != http.StatusOK {
116+
defer resp.Body.Close()
117+
errBody, _ := io.ReadAll(resp.Body)
118+
return nil, fmt.Errorf("API error %d: %s", resp.StatusCode, string(errBody))
119+
}
120+
121+
return resp.Body, nil
122+
}

0 commit comments

Comments
 (0)