-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathexec.go
More file actions
754 lines (651 loc) · 19 KB
/
exec.go
File metadata and controls
754 lines (651 loc) · 19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
package sprites
import (
"context"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"sync"
)
// ErrNotStarted is the sentinel error returned by Wait when it is called on a
// Cmd whose Start method has not been invoked yet.
var ErrNotStarted = errors.New("sprite: command not started")
// Cmd represents a command to be run on a sprite.
// It mirrors the API of exec.Cmd for compatibility.
//
// A Cmd is created with Sprite.Command or Sprite.CommandContext, configured
// through its exported fields and setters, and executed with Start and Wait
// (or Run, Output, CombinedOutput). Start refuses to run a Cmd twice.
type Cmd struct {
	// Path is the path of the command to run.
	Path string

	// Args holds command line arguments, including the command as Args[0].
	Args []string

	// Env specifies the environment of the process.
	// Each entry is of the form "key=value".
	// Every entry is forwarded to the server as an "env" query parameter.
	// NOTE(review): the exec.Cmd wording "a nil Env inherits the current
	// process's environment" was copied here, but nothing in this file sends
	// the local environment; what a nil Env means is decided by the server —
	// confirm.
	Env []string

	// Dir specifies the working directory of the command.
	// If Dir is the empty string, the command runs in the sprite's default directory.
	Dir string

	// Stdin specifies the process's standard input.
	// If Stdin is nil, the server is told not to expect input ("stdin=false").
	// Otherwise, during the execution of the command the data read from Stdin
	// is delivered to the command over the network. In this case, Wait does not
	// complete until copying stops, either because it has reached the end of
	// Stdin (EOF or a read error) or because writing to the network returned
	// an error.
	// NOTE(review): exec.Cmd special-cases *os.File; this file hands every
	// reader to the WebSocket command unchanged — confirm no such fast path
	// exists before documenting one.
	Stdin io.Reader

	// Stdout and Stderr specify the process's standard output and error.
	// If either is nil, nothing is captured for that stream.
	// Otherwise, during the execution of the command the data received from
	// the network is delivered to the corresponding Writer. In this case, Wait
	// does not complete until the stream reaches EOF or encounters an error.
	// NOTE(review): as with Stdin, writers are passed through unchanged; any
	// *os.File special-casing would live in the WebSocket command — confirm.
	Stdout io.Writer
	Stderr io.Writer

	// Process-specific state
	ctx context.Context // context handed to every network operation made by Start
	sprite *Sprite // sprite the command runs on
	wsCmd *wsCmd // underlying WebSocket command; nil until Start creates it

	// Synchronization
	mu sync.Mutex // guards started, finished, waitErr and the setters below
	started bool // set by Start, even when Start subsequently fails
	finished bool // set by Wait once the command has been reaped
	waitErr error // result of the first completed Wait, replayed on later calls
	exitCode int // exit code copied from wsCmd by Wait

	// Pipe management
	stdinPipe *writePipe // write end returned by StdinPipe, closed by Wait
	stdoutPipe *readPipe // read end returned by StdoutPipe
	stderrPipe *readPipe // read end returned by StderrPipe
	closers []io.Closer // pipe ends closed by Wait or by a failed Start
	goroutines []func() error // I/O helpers launched by Start and joined by Wait

	// TTY support
	tty bool // run the command with a pseudo-terminal
	ttySize *ttySize // initial terminal size, nil when unset

	// Session management
	sessionID string // non-empty when attaching to an existing session
	controlMode bool // true when Start obtained a pooled control connection

	// Control connection for cleanup
	controlConn *controlConn

	// TextMessageHandler is called when text messages are received from the server.
	// This is typically used for port notifications or other out-of-band messages.
	// The handler is called with the raw message data.
	//
	// Example usage for handling port notifications:
	//
	//	import "encoding/json"
	//
	//	cmd.TextMessageHandler = func(data []byte) {
	//		var notification sprites.PortNotificationMessage
	//		if err := json.Unmarshal(data, &notification); err != nil {
	//			log.Printf("Failed to parse notification: %v", err)
	//			return
	//		}
	//
	//		switch notification.Type {
	//		case "port_opened":
	//			fmt.Printf("Port %d opened by PID %d\n", notification.Port, notification.PID)
	//			// Start local proxy or take other action
	//		case "port_closed":
	//			fmt.Printf("Port %d closed by PID %d\n", notification.Port, notification.PID)
	//			// Stop local proxy or take other action
	//		}
	//	}
	TextMessageHandler func([]byte)
}
// ttySize holds the row and column counts of a terminal.
type ttySize struct {
	Rows, Cols uint16
}
// Command builds a Cmd that runs the program name on sprite s with the given
// arguments. Path is set to name, Args to name followed by arg, and the
// command is bound to context.Background().
func (s *Sprite) Command(name string, arg ...string) *Cmd {
	argv := append([]string{name}, arg...)
	return &Cmd{
		Path:   name,
		Args:   argv,
		ctx:    context.Background(),
		sprite: s,
	}
}
// CommandContext is like Command but binds the returned Cmd to ctx instead of
// context.Background(). The context is the one Start hands to the requests and
// connections it creates.
//
// CommandContext panics if ctx is nil.
func (s *Sprite) CommandContext(ctx context.Context, name string, arg ...string) *Cmd {
	if ctx != nil {
		cmd := s.Command(name, arg...)
		cmd.ctx = ctx
		return cmd
	}
	panic("sprite: CommandContext called with nil context")
}
// String returns a human-readable description of c.
// It is intended only for debugging.
//
// The result is the command path followed by the arguments after Args[0].
// A nil receiver yields "<nil>".
func (c *Cmd) String() string {
	if c == nil {
		return "<nil>"
	}
	// Args may be empty (zero-value Cmd, or a caller that cleared it);
	// slicing Args[1:] unconditionally would panic in that case.
	var args []string
	if len(c.Args) > 1 {
		args = c.Args[1:]
	}
	return fmt.Sprintf("%s %v", c.Path, args)
}
// ConnectionMode reports how this command is connected to the sprite:
// "control" when Start obtained a multiplexed control connection, "direct"
// for a direct WebSocket connection, and "" while Start has not been called.
func (c *Cmd) ConnectionMode() string {
	c.mu.Lock()
	defer c.mu.Unlock()

	switch {
	case !c.started:
		return ""
	case c.controlMode:
		return "control"
	default:
		return "direct"
	}
}
// Run starts the command and blocks until it has completed.
// It returns the error from Start if the command could not be started,
// otherwise the result of Wait.
func (c *Cmd) Run() error {
	err := c.Start()
	if err == nil {
		err = c.Wait()
	}
	return err
}
// Start starts the specified command but does not wait for it to complete.
//
// Start may be called only once per Cmd; a second call returns an error, even
// if the first call failed. On any failure Start closes the pipe ends
// registered by StdinPipe/StdoutPipe/StderrPipe and returns any control
// connection it checked out to the pool.
//
// When attaching to a session (sessionID set) and the server answers the new
// path-style endpoint with "HTTP 404", Start marks the sprite as requiring the
// legacy query-parameter endpoint, forces TTY mode and retries once over a
// direct connection.
func (c *Cmd) Start() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.started {
		return errors.New("sprite: already started")
	}
	// Mark the Cmd as used up front: it is single-use even when Start fails.
	c.started = true

	var (
		conn         *controlConn // control connection checked out from the pool, if any
		usingControl bool         // true while conn is the transport for this command
	)

	// releaseControl hands a checked-out control connection back to the pool.
	releaseControl := func() {
		if conn == nil {
			return
		}
		pool := c.sprite.client.getOrCreatePool(c.sprite.name)
		pool.checkin(conn)
		conn = nil
	}

	// fail performs the cleanup shared by every error path and returns err.
	fail := func(err error) error {
		releaseControl()
		for _, fd := range c.closers {
			fd.Close()
		}
		return err
	}

	// For attach operations, ensure we know the sprite's version to select the right endpoint
	if c.sessionID != "" && c.sprite.client.SpriteVersion() == "" {
		if err := c.sprite.client.FetchVersion(c.ctx, c.sprite.name); err != nil {
			return fail(fmt.Errorf("failed to fetch sprite version: %w", err))
		}
	}

	// Check if sprite supports control connections (lazy check on first use)
	c.sprite.ensureControlSupport(c.ctx)

	if c.sprite.supportsControl {
		// Try to use control connection; on failure fall through to a direct dial.
		pool := c.sprite.client.getOrCreatePool(c.sprite.name)
		var err error
		conn, err = pool.checkout(c.ctx)
		if err == nil && conn != nil {
			usingControl = true
			c.controlMode = true
			dbg("sprites: using control conn for exec", "sprite", c.sprite.name)
		}
	}

	// newRequest builds the authorized upgrade request for the current
	// endpoint format. It is created even in control mode, where it is not
	// dialed but still populates the command's Request.
	newRequest := func() (*http.Request, error) {
		wsURL, err := c.buildWebSocketURL()
		if err != nil {
			return nil, err
		}
		req, err := http.NewRequestWithContext(c.ctx, "GET", wsURL.String(), nil)
		if err != nil {
			return nil, err
		}
		req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", c.sprite.client.token))
		return req, nil
	}

	// Arguments after the program name.
	var args []string
	if len(c.Args) > 1 {
		args = c.Args[1:]
	}

	// configure (re)creates c.wsCmd for req and applies every setting, so the
	// first attempt and the legacy retry are configured identically.
	configure := func(req *http.Request) {
		c.wsCmd = newWSCmdContext(c.ctx, req, c.Path, args...)
		c.wsCmd.dialContext = c.sprite.client.netDialContext

		// If using control connection, provide the existing WebSocket
		if usingControl {
			c.wsCmd.existingConn = conn.ws
			c.wsCmd.usingControl = true
			c.wsCmd.controlConn = conn
		}

		c.setupIO()

		// TTY mode and attach flag
		c.wsCmd.Tty = c.tty
		c.wsCmd.IsAttach = c.sessionID != ""
		c.wsCmd.AttachSessionID = c.sessionID

		// Environment and directory
		c.wsCmd.Env = c.Env
		c.wsCmd.Dir = c.Dir

		if c.TextMessageHandler != nil {
			c.wsCmd.TextMessageHandler = c.TextMessageHandler
		}
	}

	req, err := newRequest()
	if err != nil {
		return fail(err)
	}
	configure(req)

	// Start goroutines for pipe handling
	for _, fn := range c.goroutines {
		go fn()
	}

	// Start the WebSocket command
	err = c.wsCmd.Start()
	if err == nil {
		// Store control connection for cleanup in Wait()
		if usingControl {
			c.controlConn = conn
		}
		return nil
	}

	// A 404 on attach means the sprite may need the legacy endpoint format.
	needsLegacy := c.sessionID != "" &&
		!c.sprite.useLegacyExecEndpoint &&
		strings.Contains(err.Error(), "HTTP 404")
	if !needsLegacy {
		return fail(fmt.Errorf("failed to start sprite command: %w", err))
	}

	// Mark sprite as requiring legacy format and force TTY mode
	c.sprite.useLegacyExecEndpoint = true
	c.tty = true

	// The retry dials a fresh direct connection, so the control connection
	// (if one was checked out) goes back to the pool now instead of leaking,
	// and the Cmd no longer claims control mode.
	releaseControl()
	usingControl = false
	c.controlMode = false

	// Rebuild the request with the legacy query parameter format and retry.
	req, err = newRequest()
	if err != nil {
		return fail(err)
	}
	configure(req)

	if err := c.wsCmd.Start(); err != nil {
		return fail(fmt.Errorf("failed to start sprite command: %w", err))
	}
	return nil
}
// Wait blocks until the command has exited and all copying to stdin or from
// stdout/stderr has finished, then releases the resources held for it.
//
// It returns ErrNotStarted if Start was never called. Once a Wait has
// completed, later calls return the same cached result. The reported error is,
// in order of precedence: the error from the underlying WebSocket command, a
// "connection closed" error when no exit code was ever received (exit code
// -1), an *ExitError for a non-zero exit code, or the first I/O copy error.
func (c *Cmd) Wait() error {
	c.mu.Lock()
	switch {
	case !c.started:
		c.mu.Unlock()
		return ErrNotStarted
	case c.finished:
		cached := c.waitErr
		c.mu.Unlock()
		return cached
	}
	c.mu.Unlock()

	// Start can fail before the WebSocket command exists.
	if c.wsCmd == nil {
		return errors.New("sprite: command not fully initialized")
	}

	// Block until the remote command is done, then record its exit code.
	runErr := c.wsCmd.Wait()
	c.exitCode = c.wsCmd.ExitCode()

	// Close the write end of the stdin pipe.
	if c.stdinPipe != nil {
		c.stdinPipe.Close()
	}

	// Collect the I/O helpers, remembering only the first failure.
	var copyErr error
	for _, join := range c.goroutines {
		if e := join(); e != nil && copyErr == nil {
			copyErr = e
		}
	}

	// Close all descriptors.
	for _, cl := range c.closers {
		cl.Close()
	}

	// Hand a control connection back to the pool if one was used.
	if cc := c.controlConn; cc != nil {
		cc.sendRelease()
		c.sprite.client.getOrCreatePool(c.sprite.name).checkin(cc)
		dbg("sprites: returned control conn after exec", "sprite", c.sprite.name)
		c.controlConn = nil
	}

	c.mu.Lock()
	defer c.mu.Unlock()
	c.finished = true

	// Determine the final error.
	switch {
	case runErr != nil:
		c.waitErr = runErr
	case c.exitCode == -1:
		// Exit code -1 means we never received an exit code - connection was lost
		c.waitErr = errors.New("connection closed")
	case c.exitCode != 0:
		// Server sent a non-zero exit code
		c.waitErr = &ExitError{Code: c.exitCode}
	case copyErr != nil:
		c.waitErr = copyErr
	}
	return c.waitErr
}
// Output runs the command to completion and returns everything it wrote to
// standard output, together with the error from Run.
// It fails without running the command if Stdout has already been set.
func (c *Cmd) Output() ([]byte, error) {
	if c.Stdout != nil {
		return nil, errors.New("sprite: Stdout already set")
	}

	var captured []byte
	c.Stdout = &outputBuffer{bytes: &captured}

	runErr := c.Run()
	return captured, runErr
}
// CombinedOutput runs the command to completion and returns its standard
// output and standard error interleaved in a single buffer, together with the
// error from Run.
// It fails without running the command if Stdout or Stderr has already been set.
func (c *Cmd) CombinedOutput() ([]byte, error) {
	switch {
	case c.Stdout != nil:
		return nil, errors.New("sprite: Stdout already set")
	case c.Stderr != nil:
		return nil, errors.New("sprite: Stderr already set")
	}

	// Both streams share one mutex-protected sink.
	var combined []byte
	sink := &outputBuffer{bytes: &combined}
	c.Stdout, c.Stderr = sink, sink

	runErr := c.Run()
	return combined, runErr
}
// StdinPipe returns the write end of a pipe whose read end becomes the
// command's standard input when the command starts.
// The read end is registered for closing by Wait or by a failed Start; Wait
// also closes the returned write end.
// It is an error to call StdinPipe after Stdin has been set or after Start.
func (c *Cmd) StdinPipe() (io.WriteCloser, error) {
	switch {
	case c.Stdin != nil:
		return nil, errors.New("sprite: Stdin already set")
	case c.started:
		return nil, errors.New("sprite: StdinPipe after process started")
	}

	reader, writer := io.Pipe()
	pipe := &writePipe{WriteCloser: writer}

	c.Stdin = reader
	c.stdinPipe = pipe
	c.closers = append(c.closers, reader)
	return pipe, nil
}
// StdoutPipe returns the read end of a pipe whose write end becomes the
// command's standard output when the command starts.
// The write end is registered for closing by Wait or by a failed Start.
// It is an error to call StdoutPipe after Stdout has been set or after Start.
func (c *Cmd) StdoutPipe() (io.ReadCloser, error) {
	switch {
	case c.Stdout != nil:
		return nil, errors.New("sprite: Stdout already set")
	case c.started:
		return nil, errors.New("sprite: StdoutPipe after process started")
	}

	reader, writer := io.Pipe()
	pipe := &readPipe{ReadCloser: reader}

	c.Stdout = writer
	c.stdoutPipe = pipe
	c.closers = append(c.closers, writer)
	return pipe, nil
}
// StderrPipe returns the read end of a pipe whose write end becomes the
// command's standard error when the command starts.
// The write end is registered for closing by Wait or by a failed Start.
// It is an error to call StderrPipe after Stderr has been set or after Start.
func (c *Cmd) StderrPipe() (io.ReadCloser, error) {
	switch {
	case c.Stderr != nil:
		return nil, errors.New("sprite: Stderr already set")
	case c.started:
		return nil, errors.New("sprite: StderrPipe after process started")
	}

	reader, writer := io.Pipe()
	pipe := &readPipe{ReadCloser: reader}

	c.Stderr = writer
	c.stderrPipe = pipe
	c.closers = append(c.closers, writer)
	return pipe, nil
}
// SetTTY turns TTY mode on or off for the command.
// With TTY mode enabled, the command is requested to run with a
// pseudo-terminal.
//
// SetTTY panics if it is called after Start.
func (c *Cmd) SetTTY(enable bool) {
	c.mu.Lock()
	defer c.mu.Unlock()

	if !c.started {
		c.tty = enable
		return
	}
	panic("sprite: SetTTY after process started")
}
// SetTTYSize sets the terminal size used in TTY mode.
// While the command is running (started and not yet finished) it resizes the
// live terminal; otherwise it records the size to be sent when the command
// starts.
// It returns an error if TTY mode is not enabled, or if the command was
// started but its connection was never set up.
func (c *Cmd) SetTTYSize(rows, cols uint16) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if !c.tty {
		return errors.New("sprite: SetTTYSize called but TTY mode not enabled")
	}

	running := c.started && !c.finished
	if !running {
		// Not running: remember the size for the initial request.
		c.ttySize = &ttySize{Rows: rows, Cols: cols}
		return nil
	}

	if c.wsCmd == nil {
		return errors.New("sprite: command not fully initialized")
	}
	// The WebSocket command takes columns first.
	return c.wsCmd.Resize(cols, rows)
}
// Resize changes the terminal size of a running TTY command.
// It returns an error if the command has not been started, is not in TTY
// mode, has already finished, or never had its connection set up.
//
// Deprecated: Use SetTTYSize instead, which works both before and after Start().
func (c *Cmd) Resize(rows, cols uint16) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	switch {
	case !c.started:
		return errors.New("sprite: Resize before process started")
	case !c.tty:
		return errors.New("sprite: Resize called but TTY mode not enabled")
	case c.finished:
		return errors.New("sprite: Resize after process finished")
	case c.wsCmd == nil:
		return errors.New("sprite: command not fully initialized")
	}
	// The WebSocket command takes columns first.
	return c.wsCmd.Resize(cols, rows)
}
// Signal sends a signal to the remote process.
// If the server supports WebSocket signals (advertised via X-Sprite-Capabilities header),
// it sends the signal over the existing WebSocket connection. Otherwise, it falls back
// to an HTTP POST request to the kill endpoint, addressed by the session ID
// the command attached to or, failing that, the one reported by the server.
// Valid signal names: INT, TERM, HUP, KILL, QUIT, USR1, USR2
//
// Signal returns an error if the command has not been started, has already
// finished, never had its connection set up, or if the HTTP fallback is
// needed but no session ID is known.
func (c *Cmd) Signal(signal string) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if !c.started {
		return errors.New("sprite: Signal before process started")
	}
	if c.finished {
		return errors.New("sprite: Signal after process finished")
	}
	// Start marks the Cmd as started before the WebSocket command exists and
	// can fail in between; guard like Wait, Resize and SetTTYSize do.
	if c.wsCmd == nil {
		return errors.New("sprite: command not fully initialized")
	}

	// Use WebSocket if server supports it
	if c.wsCmd.HasCapability("signal") {
		return c.wsCmd.Signal(signal)
	}

	// Fall back to HTTP POST
	// Use session ID from attach or from session_info message
	sessID := c.sessionID
	if sessID == "" {
		sessID = c.wsCmd.SessionID()
	}
	if sessID == "" {
		return errors.New("sprite: no session ID for HTTP signal fallback")
	}
	return c.sprite.client.signalSession(c.ctx, c.sprite.name, sessID, signal)
}
// ExitCode returns the exit code recorded by Wait for the exited process.
// It returns -1 while Wait has not completed; the recorded code itself may
// also be -1 when no exit code was ever received.
func (c *Cmd) ExitCode() int {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.finished {
		return c.exitCode
	}
	return -1
}
// buildWebSocketURL constructs the WebSocket URL for the exec endpoint.
//
// The client's base URL is converted from http(s) to ws(s) and the path is set
// to the sprite's exec endpoint. When attaching to a session the session ID is
// sent either as a path segment (new format) or as the "id" query parameter
// (legacy format). The query carries the command and its arguments (new
// commands only), environment entries, working directory, TTY settings, the
// control-mode flag and whether stdin will be supplied.
//
// It returns an error if the base URL cannot be parsed.
func (c *Cmd) buildWebSocketURL() (*url.URL, error) {
	baseURL := c.sprite.client.baseURL

	// Convert HTTP(S) to WS(S). HasPrefix (rather than slicing the first four
	// bytes) keeps a short or empty base URL from panicking.
	if strings.HasPrefix(baseURL, "http") {
		baseURL = "ws" + baseURL[len("http"):]
	}

	// Parse base URL
	u, err := url.Parse(baseURL)
	if err != nil {
		return nil, fmt.Errorf("invalid base URL: %w", err)
	}

	// Build query parameters
	q := u.Query()

	// Build path - endpoint format depends on server version and cached preference
	if c.sessionID != "" {
		// Use legacy format if sprite is known to require it, or if version doesn't support path attach
		if c.sprite.useLegacyExecEndpoint || !c.sprite.client.supportsPathAttach() {
			// Legacy format: query parameter
			u.Path = fmt.Sprintf("/v1/sprites/%s/exec", c.sprite.name)
			q.Set("id", c.sessionID)
		} else {
			// New format: path parameter (try first)
			u.Path = fmt.Sprintf("/v1/sprites/%s/exec/%s", c.sprite.name, c.sessionID)
		}
	} else {
		u.Path = fmt.Sprintf("/v1/sprites/%s/exec", c.sprite.name)
	}

	// Add command arguments (only for new commands, not attach)
	if c.sessionID == "" {
		for i, arg := range c.Args {
			q.Add("cmd", arg)
			if i == 0 {
				q.Set("path", arg)
			}
		}
	}

	// Add environment variables
	for _, env := range c.Env {
		q.Add("env", env)
	}

	// Add working directory
	if c.Dir != "" {
		q.Set("dir", c.Dir)
	}

	// Add TTY settings
	if c.tty {
		q.Set("tty", "true")
		if c.ttySize != nil {
			q.Set("rows", fmt.Sprintf("%d", c.ttySize.Rows))
			q.Set("cols", fmt.Sprintf("%d", c.ttySize.Cols))
		}
	}

	// Add control mode flag
	if c.controlMode {
		q.Set("cc", "true")
	}

	// Add stdin parameter so the server knows whether to expect input
	if c.Stdin == nil {
		q.Set("stdin", "false")
	} else {
		q.Set("stdin", "true")
	}

	u.RawQuery = q.Encode()
	return u, nil
}
// setupIO wires the Cmd's Stdin, Stdout and Stderr into the WebSocket command.
// Unset (nil) streams are passed through as nil.
func (c *Cmd) setupIO() {
	ws := c.wsCmd
	ws.Stdin = c.Stdin
	ws.Stdout = c.Stdout
	ws.Stderr = c.Stderr
}
// ExitError is the error Wait returns when the command ran but finished with
// a non-zero exit code.
type ExitError struct {
	// Code is the exit code reported for the command.
	Code int
}

// ExitCode returns the exit code of the exited process.
func (e *ExitError) ExitCode() int {
	return e.Code
}

// Error implements the error interface, formatting the exit code in the same
// "exit status N" shape used by os/exec.
func (e *ExitError) Error() string {
	return fmt.Sprint("exit status ", e.Code)
}
// writePipe wraps an io.WriteCloser so that the underlying Close is invoked
// at most once, no matter how often Close is called on the wrapper.
type writePipe struct {
	io.WriteCloser
	mu     sync.Mutex // guards closed
	closed bool       // true once the wrapped writer has been closed
}

// Close closes the wrapped writer on the first call and returns its error.
// Later calls, and calls on a wrapper without a writer, return nil.
func (p *writePipe) Close() error {
	p.mu.Lock()
	defer p.mu.Unlock()

	if p.closed || p.WriteCloser == nil {
		return nil
	}
	p.closed = true
	return p.WriteCloser.Close()
}
// readPipe wraps an io.ReadCloser so that the underlying Close is invoked at
// most once, no matter how often Close is called on the wrapper.
type readPipe struct {
	io.ReadCloser
	mu     sync.Mutex // guards closed
	closed bool       // true once the wrapped reader has been closed
}

// Close closes the wrapped reader on the first call and returns its error.
// Later calls, and calls on a wrapper without a reader, return nil.
func (p *readPipe) Close() error {
	p.mu.Lock()
	defer p.mu.Unlock()

	if p.closed || p.ReadCloser == nil {
		return nil
	}
	p.closed = true
	return p.ReadCloser.Close()
}
// outputBuffer is an io.Writer that appends everything written to it to a
// caller-owned byte slice. Writes are serialized by a mutex so one buffer can
// be shared between stdout and stderr.
type outputBuffer struct {
	bytes *[]byte    // destination slice, owned by the caller
	mu    sync.Mutex // serializes appends to *bytes
}

// Write appends p to the destination slice. It always reports len(p) bytes
// written and a nil error.
func (b *outputBuffer) Write(p []byte) (n int, err error) {
	b.mu.Lock()
	defer b.mu.Unlock()

	dst := b.bytes
	*dst = append(*dst, p...)
	return len(p), nil
}