diff --git a/.gitignore b/.gitignore index ca2b2686c..92fafecbd 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,7 @@ *.so *.dylib /mcpproxy +/test/launcher-server/launcher-server __debug_bin* # Playwright MCP artifacts diff --git a/docs/cli-management-commands.md b/docs/cli-management-commands.md index d8135d8b6..0e60178e0 100644 --- a/docs/cli-management-commands.md +++ b/docs/cli-management-commands.md @@ -352,6 +352,14 @@ mcpproxy upstream restart --all **Note:** Restart does not require confirmation as it's non-destructive. +**Locally-launched HTTP/SSE upstreams:** when a server is configured with both +`command` and an HTTP/SSE `url` (see [docs/configuration.md](configuration.md#locally-launched-http--sse-servers)), +`restart` stops the spawned child (`SIGTERM` → grace → `SIGKILL`) before +re-running Connect. The grace timeout is fixed at 5s today; the next start +won't begin until the previous child is fully reaped, so you can rely on the +port being free after the command returns. Stop ordering is: close MCP client +→ stop launched child → release per-server state. + --- ### `mcpproxy doctor` diff --git a/docs/configuration.md b/docs/configuration.md index 00f1ab73a..50f375437 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -147,8 +147,9 @@ MCPProxy looks for configuration in these locations (in order): | `args` | array | No | Command arguments | | `url` | string | Yes* | Server URL (required for `http`/`sse`/`streamable-http` protocols) | | `headers` | object | No | HTTP headers for HTTP-based protocols | -| `working_dir` | string | No | Working directory for stdio servers (default: current directory) | -| `env` | object | No | Environment variables for stdio servers | +| `working_dir` | string | No | Working directory for stdio servers, or for the locally-launched child of an HTTP/SSE server (default: current directory) | +| `env` | object | No | Environment variables for stdio servers, or for the locally-launched child of an HTTP/SSE server | +| `launcher_wait_timeout` | duration | No | When `command` is set together with an HTTP/SSE `url`, how long mcpproxy waits for that URL to become reachable after spawning the child (e.g. `"15s"`, default `"30s"`) | | `oauth` | object | No | OAuth configuration (see [OAuth Configuration](#oauth-configuration)) | | `isolation` | object | No | Per-server Docker isolation settings (see [Docker Isolation](#docker-isolation)) | | `enabled` | boolean | No | Enable/disable server (default: `true`) | @@ -210,6 +211,54 @@ MCPProxy looks for configuration in these locations (in order): } ``` +### Locally-launched HTTP / SSE servers + +By default `command` is only used for `stdio` servers. When you set `command` +together with an HTTP/SSE `url` and an explicit `protocol` of `http`, `sse`, +or `streamable-http`, mcpproxy will: + +1. Spawn the command (with `args`, `env`, `working_dir`, and Docker isolation + exactly like a stdio server). +2. Wait up to `launcher_wait_timeout` (default 30s) for `url` to accept a TCP + connection. +3. Connect via the configured HTTP/SSE transport. +4. Own the child's lifecycle — the process is stopped (`SIGTERM`, then + `SIGKILL` after a grace period) on disconnect, restart, server-disable, or + mcpproxy shutdown. Unexpected exits trigger an automatic disconnect, which + the existing reconnect path picks up. + +```json +{ + "name": "local-http-mcp", + "protocol": "http", + "url": "http://127.0.0.1:9999/mcp", + "command": "node", + "args": ["./examples/echo-http-server.js", "--port", "9999"], + "working_dir": "/path/to/repo", + "launcher_wait_timeout": "15s", + "enabled": true +} +``` + +`stdout` and `stderr` of the child are routed to the per-server log, so +`mcpproxy upstream logs ` continues to work the same way it does for +stdio servers. + +#### Behaviour matrix when both `command` and `url` are set + +| `protocol` | `command` | `url` | Behaviour | +|---|---|---|---| +| `stdio` (explicit) | set | any | Stdio transport, child via stdin/stdout — `url` ignored. | +| `http` / `sse` / `streamable-http` (explicit) | set | set | **Locally-launched HTTP/SSE** — spawn child, wait for URL, connect via network. | +| `http` / `sse` / `streamable-http` (explicit) | unset | set | Connect to remote URL — no spawn. | +| `auto` or unset | set | any | Stdio (`command` wins over `url` for back-compat — set `protocol` explicitly to opt into the launcher). | +| `auto` or unset | unset | set | HTTP/SSE remote — no spawn. | + +The "command wins" rule under `auto` is intentional: it preserves backwards +compatibility with configurations written before the launcher feature +existed. To launch a local HTTP/SSE server you **must** set `protocol` +explicitly to one of `http`, `sse`, or `streamable-http`. + ### OAuth Configuration ```json diff --git a/internal/config/config.go b/internal/config/config.go index 7985cb77c..fe7604433 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -224,6 +224,13 @@ type ServerConfig struct { Updated time.Time `json:"updated,omitempty" mapstructure:"updated"` Isolation *IsolationConfig `json:"isolation,omitempty" mapstructure:"isolation"` // Per-server isolation settings ReconnectOnUse bool `json:"reconnect_on_use,omitempty" mapstructure:"reconnect-on-use"` // Attempt reconnection when a tool call targets a disconnected server + + // LauncherWaitTimeout caps how long mcpproxy will wait for a locally-launched + // HTTP/SSE upstream's URL to become reachable after Spawn(). Only consulted + // when the server is configured with both Command and an HTTP/SSE URL — i.e., + // mcpproxy starts the process AND connects via network. Stdio servers ignore + // this field. Zero or unset → 30s default. + LauncherWaitTimeout Duration `json:"launcher_wait_timeout,omitempty" mapstructure:"launcher_wait_timeout" swaggertype:"string"` } // OAuthConfig represents OAuth configuration for a server diff --git a/internal/config/merge.go b/internal/config/merge.go index 58f0b5acf..c09f81f0e 100644 --- a/internal/config/merge.go +++ b/internal/config/merge.go @@ -528,17 +528,18 @@ func CopyServerConfig(src *ServerConfig) *ServerConfig { } dst := &ServerConfig{ - Name: src.Name, - URL: src.URL, - Protocol: src.Protocol, - Command: src.Command, - WorkingDir: src.WorkingDir, - Enabled: src.Enabled, - Quarantined: src.Quarantined, - SkipQuarantine: src.SkipQuarantine, - Shared: src.Shared, - Created: src.Created, - Updated: src.Updated, + Name: src.Name, + URL: src.URL, + Protocol: src.Protocol, + Command: src.Command, + WorkingDir: src.WorkingDir, + Enabled: src.Enabled, + Quarantined: src.Quarantined, + SkipQuarantine: src.SkipQuarantine, + Shared: src.Shared, + Created: src.Created, + Updated: src.Updated, + LauncherWaitTimeout: src.LauncherWaitTimeout, } // Copy slices diff --git a/internal/upstream/core/client.go b/internal/upstream/core/client.go index fc85bbdc7..715c91489 100644 --- a/internal/upstream/core/client.go +++ b/internal/upstream/core/client.go @@ -19,6 +19,7 @@ import ( "github.com/smart-mcp-proxy/mcpproxy-go/internal/secret" "github.com/smart-mcp-proxy/mcpproxy-go/internal/secureenv" "github.com/smart-mcp-proxy/mcpproxy-go/internal/storage" + "github.com/smart-mcp-proxy/mcpproxy-go/internal/upstream/launcher" "github.com/smart-mcp-proxy/mcpproxy-go/internal/upstream/types" "github.com/mark3labs/mcp-go/client" @@ -98,6 +99,14 @@ type Client struct { containerName string // Store container name for cleanup via docker container commands isDockerCommand bool + // Local launcher tracking — only populated when this Client is using + // HTTP/SSE/streamable-HTTP transport AND ServerConfig.Command is set. + // In that mode mcpproxy spawns the upstream process before connecting, + // and owns its lifecycle via the handle below. Stdio servers leave + // these fields nil — they spawn through mcp-go's stdio transport. + launcherHandle launcher.Handle + launcherCIDFile string + // Notification callback for tools/list_changed onToolsChanged func(serverName string) } diff --git a/internal/upstream/core/connection.go b/internal/upstream/core/connection.go index 4b2b9a576..bef73ee2b 100644 --- a/internal/upstream/core/connection.go +++ b/internal/upstream/core/connection.go @@ -3,6 +3,7 @@ package core import ( "context" "fmt" + "os" "time" "github.com/smart-mcp-proxy/mcpproxy-go/internal/transport" @@ -118,6 +119,21 @@ func (c *Client) Connect(ctx context.Context) error { // Create and connect client based on transport type var err error + // Locally-launched HTTP/SSE upstreams: spawn the child process before + // the transport-level connect, then wait for its URL to become + // reachable. Stdio is excluded because the stdio transport spawns + // through mcp-go itself; running the launcher here would double-spawn. + switch c.transportType { + case transportHTTP, transportHTTPStreamable, transportSSE: + if c.config.Command != "" { + c.logger.Debug("🚀 Launching local upstream before HTTP/SSE connect", + zap.String("server", c.config.Name), + zap.String("transport", c.transportType)) + if launchErr := c.connectWithLauncher(ctx); launchErr != nil { + return fmt.Errorf("failed to launch local upstream: %w", launchErr) + } + } + } switch c.transportType { case transportStdio: c.logger.Debug("📡 Using STDIO transport") @@ -183,6 +199,36 @@ func (c *Client) Connect(ctx context.Context) error { c.processGroupID = 0 } + // Stop any locally-launched upstream child the HTTP/SSE path + // started — connectWithLauncher itself only stops it on + // wait-for-url failure, not on subsequent transport-level + // connect failure. + // + // IMPORTANT: c.mu is held for the duration of Connect (see + // the c.mu.Lock at the top of this function), so we can read + // the launcher fields directly. We release the lock briefly + // around handle.Stop because Stop blocks until the child is + // reaped and we don't want to hold c.mu that long; the + // `connecting` flag already prevents a concurrent Connect. + if c.launcherHandle != nil { + handle := c.launcherHandle + cidFile := c.launcherCIDFile + c.launcherHandle = nil + c.launcherCIDFile = "" + c.mu.Unlock() + stopCtx, stopCancel := context.WithTimeout(context.Background(), 10*time.Second) + if stopErr := handle.Stop(stopCtx); stopErr != nil { + c.logger.Warn("error stopping launcher during connect-failure cleanup", + zap.String("server", c.config.Name), + zap.Error(stopErr)) + } + stopCancel() + if cidFile != "" { + _ = os.Remove(cidFile) + } + c.mu.Lock() + } + return fmt.Errorf("failed to connect: %w", err) } diff --git a/internal/upstream/core/connection_launcher.go b/internal/upstream/core/connection_launcher.go new file mode 100644 index 000000000..0d3807e55 --- /dev/null +++ b/internal/upstream/core/connection_launcher.go @@ -0,0 +1,309 @@ +package core + +import ( + "context" + "fmt" + "io" + "os" + "os/exec" + "strings" + "time" + + "go.uber.org/zap" + + "github.com/smart-mcp-proxy/mcpproxy-go/internal/upstream/launcher" +) + +// defaultLauncherWaitTimeout caps how long we'll wait for a locally-launched +// HTTP/SSE upstream to bind its listener. Tight enough that a misconfigured +// command surfaces as a connect failure within a single retry window; +// configurable per-server via ServerConfig.LauncherWaitTimeout. +const defaultLauncherWaitTimeout = 30 * time.Second + +// connectWithLauncher starts the local upstream process described by +// c.config (Command/Args/Env/WorkingDir) and waits for c.config.URL to +// accept a TCP connection. Used by HTTP / SSE / streamable-HTTP transports +// when Command is also set on the server config — letting mcpproxy own the +// process lifecycle of an upstream that exposes its MCP endpoint over the +// network rather than stdio. +// +// Stdio transport never goes through this path — it spawns the child via +// mcp-go's stdio transport internally. See connection_stdio.go. +// +// On success, c.launcherHandle is populated; the caller (Connect) is then +// responsible for the transport-level connect and for arranging +// Disconnect-time cleanup. On failure, the child (if it started) is +// already stopped before this function returns. +func (c *Client) connectWithLauncher(ctx context.Context) error { + if c.config.Command == "" { + return nil // launcher not requested + } + + if err := validateStdioConfig(c.config); err != nil { + return fmt.Errorf("launcher config invalid: %w", err) + } + if c.config.URL == "" { + return fmt.Errorf("launcher requires url for transport %q", c.transportType) + } + if err := validateWorkingDir(c.config.WorkingDir); err != nil { + return fmt.Errorf("launcher working_dir invalid: %w", err) + } + + // Pre-flight: detect Docker isolation up-front so we can hold the + // per-server container lock across the entire spawn sequence + // (matches connectStdio's behaviour at internal/upstream/core/ + // connection_stdio.go:97). Otherwise concurrent reconnects could + // race to create overlapping containers for the same upstream. + willUseDocker := (c.config.Command == cmdDocker || strings.HasSuffix(c.config.Command, "/"+cmdDocker)) && len(c.config.Args) > 0 && c.config.Args[0] == cmdRun + if !willUseDocker && c.isolationManager != nil { + willUseDocker = c.isolationManager.ShouldIsolate(c.config) + } + if willUseDocker { + lock := globalContainerLock.Lock(c.config.Name) + defer lock.Unlock() + if err := c.ensureNoExistingContainers(ctx); err != nil { + c.logger.Warn("ensure-no-existing-containers failed; continuing", + zap.String("server", c.config.Name), + zap.Error(err)) + } + } + + cmd, isDocker, cidFile, err := c.buildLauncherCmd(ctx, willUseDocker) + if err != nil { + return fmt.Errorf("launcher build cmd: %w", err) + } + + c.isDockerCommand = isDocker + + sink := newLoggerWriter(c.upstreamLogger, c.logger) + spec := &launcher.Spec{ + Cmd: cmd, + LogSink: sink, + Name: c.config.Name, + } + + handle, err := launcher.Spawn(ctx, spec, c.logger) + if err != nil { + if cidFile != "" { + _ = os.Remove(cidFile) + } + return fmt.Errorf("launcher spawn: %w", err) + } + + // Caller (Client.Connect) already holds c.mu for the duration of + // Connect, so we set the launcher fields directly here. Read paths + // (watchLauncher, Disconnect, stopLauncher) use proper locking + // against this write via the same c.mu / c.mu.RLock. + // + // Deliberately NOT setting c.processCmd / c.processGroupID — the + // launcher.Handle owns the child's lifecycle and the existing + // stdio-path cleanup helpers would race with it. The launcher + // already places the child in its own pgroup via applyProcAttrs. + c.launcherHandle = handle + c.launcherCIDFile = cidFile + + // Watch the child for unexpected exit during connect/steady-state. If + // the launched process dies after Connect returns we want to react + // fast (mark disconnected) instead of waiting for the transport's + // keepalive to time out. + go c.watchLauncher(handle) + + waitTimeout := c.config.LauncherWaitTimeout.Duration() + if waitTimeout <= 0 { + waitTimeout = defaultLauncherWaitTimeout + } + + c.logger.Info("waiting for upstream URL to become reachable", + zap.String("server", c.config.Name), + zap.String("url", c.config.URL), + zap.Duration("timeout", waitTimeout)) + + if err := launcher.WaitForURL(ctx, c.config.URL, waitTimeout); err != nil { + // Tear down the child we just started — caller will see a + // failed Connect and the next attempt should start fresh. + // Release c.mu around handle.Stop because it blocks until the + // child is reaped; the `connecting` flag (set by Connect) + // prevents a concurrent Connect from sneaking in. Reacquire + // before returning so the caller's deferred Unlock balances. + c.launcherHandle = nil + c.launcherCIDFile = "" + c.mu.Unlock() + stopCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + if stopErr := handle.Stop(stopCtx); stopErr != nil { + c.logger.Warn("failed to stop launcher after wait-for-url failure", + zap.String("server", c.config.Name), + zap.Error(stopErr)) + } + cancel() + if cidFile != "" { + _ = os.Remove(cidFile) + } + c.mu.Lock() + return fmt.Errorf("launcher wait-for-url: %w", err) + } + + c.logger.Info("upstream URL is reachable", + zap.String("server", c.config.Name), + zap.String("url", c.config.URL)) + return nil +} + +// stopLauncher tears down the spawned child, if any. Safe to call when no +// launcher is active. Called from DisconnectWithContext and from the +// connection-failure cleanup path in Connect. +func (c *Client) stopLauncher(ctx context.Context) { + c.mu.Lock() + handle := c.launcherHandle + cidFile := c.launcherCIDFile + c.launcherHandle = nil + c.launcherCIDFile = "" + c.mu.Unlock() + + if handle == nil { + return + } + + if err := handle.Stop(ctx); err != nil { + c.logger.Warn("error stopping upstream child", + zap.String("server", c.config.Name), + zap.Error(err)) + } + if cidFile != "" { + _ = os.Remove(cidFile) + } +} + +// watchLauncher reacts to an unexpected child exit by closing the MCP +// client and clearing connected state. The reconnect loop in +// internal/upstream/manager.go can then take over. +func (c *Client) watchLauncher(handle launcher.Handle) { + <-handle.Done() + c.mu.RLock() + stillOurs := c.launcherHandle == handle + connected := c.connected + c.mu.RUnlock() + if !stillOurs { + // Disconnect already cleaned up — the child exit is expected. + return + } + if !connected { + // We're still inside Connect; the wait-for-url path will see + // the failure on its own (TCP dial will eventually fail). + return + } + c.logger.Warn("upstream child exited unexpectedly, disconnecting", + zap.String("server", c.config.Name)) + if err := c.Disconnect(); err != nil { + c.logger.Error("disconnect after unexpected child exit failed", + zap.String("server", c.config.Name), + zap.Error(err)) + } +} + +// buildLauncherCmd produces a *exec.Cmd ready to start, mirroring the +// command-prep that connectStdio does today (env merge, Docker isolation, +// shell-wrap, working-dir, --cidfile threading). The launcher will set +// SysProcAttr for process-group cleanup; we don't pre-set it here. The +// caller is responsible for holding globalContainerLock when willUseDocker +// is true (see connectWithLauncher). +// +// The supplied ctx is intentionally not bound to the returned cmd. Connect +// contexts are typically short-lived (deadline-driven), but the launched +// child must outlive Connect: its lifetime is owned by launcher.Handle +// instead. Stdio's connection_stdio.go uses the same pattern (persistent +// ctx for client.Start). +func (c *Client) buildLauncherCmd(_ context.Context, willUseDocker bool) (*exec.Cmd, bool, string, error) { + envVars := c.envManager.BuildSecureEnvironment() + for k, v := range c.config.Env { + found := false + for i, ev := range envVars { + if strings.HasPrefix(ev, k+"=") { + envVars[i] = k + "=" + v + found = true + break + } + } + if !found { + envVars = append(envVars, k+"="+v) + } + } + + args := c.config.Args + var cidFile string + + if willUseDocker { + if tmp, err := os.CreateTemp("", "mcpproxy-cid-*.txt"); err == nil { + cidFile = tmp.Name() + tmp.Close() + os.Remove(cidFile) + } else { + c.logger.Warn("could not create cidfile", + zap.String("server", c.config.Name), + zap.Error(err)) + } + } + + var finalCommand string + var finalArgs []string + + if c.isolationManager != nil && c.isolationManager.ShouldIsolate(c.config) { + finalCommand, finalArgs = c.setupDockerIsolation(c.config.Command, args) + if cidFile != "" { + finalArgs = c.insertCidfileIntoShellDockerCommand(finalArgs, cidFile) + } + } else { + argsToWrap := args + isDirectDockerRun := (c.config.Command == cmdDocker || strings.HasSuffix(c.config.Command, "/"+cmdDocker)) && len(args) > 0 && args[0] == cmdRun + if isDirectDockerRun && len(c.config.Env) > 0 { + argsToWrap = c.injectEnvVarsIntoDockerArgs(args, c.config.Env) + } + finalCommand, finalArgs = c.wrapWithUserShell(c.config.Command, argsToWrap) + if isDirectDockerRun && cidFile != "" { + finalArgs = c.insertCidfileIntoShellDockerCommand(finalArgs, cidFile) + } + } + + cmd := exec.Command(finalCommand, finalArgs...) + cmd.Env = envVars + if c.config.WorkingDir != "" { + cmd.Dir = c.config.WorkingDir + } + + c.logger.Debug("launcher command prepared", + zap.String("server", c.config.Name), + zap.String("command", finalCommand), + zap.Strings("args", finalArgs), + zap.String("working_dir", c.config.WorkingDir), + zap.Bool("docker", willUseDocker)) + + return cmd, willUseDocker, cidFile, nil +} + +// loggerWriter bridges an io.Writer (one Write per line) to the per-server +// zap logger so launcher-pumped output lands in the same place as +// `mcpproxy upstream logs ` already shows. +type loggerWriter struct { + primary *zap.Logger + fallback *zap.Logger +} + +func newLoggerWriter(primary, fallback *zap.Logger) io.Writer { + if primary == nil && fallback == nil { + return io.Discard + } + return &loggerWriter{primary: primary, fallback: fallback} +} + +func (w *loggerWriter) Write(p []byte) (int, error) { + line := strings.TrimRight(string(p), "\n") + if line == "" { + return len(p), nil + } + switch { + case w.primary != nil: + w.primary.Info(line) + case w.fallback != nil: + w.fallback.Info(line) + } + return len(p), nil +} diff --git a/internal/upstream/core/connection_lifecycle.go b/internal/upstream/core/connection_lifecycle.go index d097815ea..f5509798d 100644 --- a/internal/upstream/core/connection_lifecycle.go +++ b/internal/upstream/core/connection_lifecycle.go @@ -242,13 +242,22 @@ func (c *Client) DisconnectWithContext(_ context.Context) error { } } - // Step 6: Update state under lock + // Step 6: Stop any locally-launched HTTP/SSE upstream. We do this + // AFTER closing the MCP client — the child should see the network + // transport go away first, giving it a clean shutdown signal before + // we send SIGTERM. + stopCtx, stopCancel := context.WithTimeout(context.Background(), 10*time.Second) + c.stopLauncher(stopCtx) + stopCancel() + + // Step 7: Update state under lock c.mu.Lock() c.client = nil c.serverInfo = nil c.connected = false c.cachedTools = nil c.processGroupID = 0 + c.processCmd = nil c.mu.Unlock() c.logger.Debug("Disconnect completed successfully", diff --git a/internal/upstream/launcher/integration_test.go b/internal/upstream/launcher/integration_test.go new file mode 100644 index 000000000..6eb888059 --- /dev/null +++ b/internal/upstream/launcher/integration_test.go @@ -0,0 +1,106 @@ +package launcher + +import ( + "bytes" + "context" + "net" + "os/exec" + "runtime" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +// TestSpawnAndWaitForURL_Integration exercises the full flow that +// connection.go relies on: spawn a child that binds late, then poll the +// configured URL until it accepts a TCP connection. +// +// Uses a `sh` one-liner with a here-doc that exec's a Go program built into +// the test binary at run time — that's brittle. Simpler: shell-side TCP +// trickery via /dev/tcp isn't a listener. We rely on `python3 -c` if +// available (covers Linux/macOS development environments) and skip +// otherwise. The point is to prove Spawn + WaitForURL agree on a real +// listening child. +func TestSpawnAndWaitForURL_Integration(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("integration relies on POSIX shell + python") + } + if _, err := exec.LookPath("python3"); err != nil { + t.Skip("python3 not available in PATH") + } + + // Pick a free port we'll tell python to bind. + probe, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + port := probe.Addr().(*net.TCPAddr).Port + probe.Close() + + // Tiny python "server": bind, accept once, hold open. Includes a + // 250ms pre-bind sleep so WaitForURL has to actually poll, not + // succeed on its first dial. + script := ` +import socket, time, sys +time.sleep(0.25) +s = socket.socket() +s.bind(('127.0.0.1', ` + itoa(port) + `)) +s.listen(8) +try: + while True: + c, _ = s.accept() + c.close() +except KeyboardInterrupt: + pass +` + cmd := exec.Command("python3", "-c", script) + var sink bytes.Buffer + + h, err := Spawn(context.Background(), &Spec{ + Cmd: cmd, + LogSink: &sink, + Name: "python-listener", + StopGrace: 1 * time.Second, + }, zap.NewNop()) + require.NoError(t, err) + defer func() { + stopCtx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + _ = h.Stop(stopCtx) + }() + + url := "http://127.0.0.1:" + itoa(port) + "/mcp" + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + start := time.Now() + err = WaitForURL(ctx, url, 3*time.Second) + elapsed := time.Since(start) + assert.NoError(t, err) + assert.GreaterOrEqual(t, elapsed, 200*time.Millisecond, "should have polled while python warmed up") +} + +// itoa avoids strconv import for a one-liner test helper. +func itoa(n int) string { + if n == 0 { + return "0" + } + var buf [12]byte + i := len(buf) + neg := n < 0 + if neg { + n = -n + } + for n > 0 { + i-- + buf[i] = byte('0' + n%10) + n /= 10 + } + if neg { + i-- + buf[i] = '-' + } + return string(buf[i:]) +} diff --git a/internal/upstream/launcher/launcher.go b/internal/upstream/launcher/launcher.go new file mode 100644 index 000000000..e2ad41b9b --- /dev/null +++ b/internal/upstream/launcher/launcher.go @@ -0,0 +1,321 @@ +package launcher + +import ( + "bufio" + "context" + "errors" + "fmt" + "io" + "os/exec" + "sync" + "sync/atomic" + "time" + + "go.uber.org/zap" +) + +// DefaultStopGrace is the time we wait between SIGTERM and SIGKILL when +// stopping a launched child. Tunable via Handle.SetStopGrace if a particular +// upstream needs longer (e.g. a Docker container with a slow shutdown +// hook). Five seconds is a common default for "graceful" in this codebase +// (see processGracefulTimeout in internal/upstream/core). +const DefaultStopGrace = 5 * time.Second + +// Spec describes the command to launch. +// +// Spec is intentionally narrow: it doesn't replicate the full ServerConfig +// surface — the caller is responsible for assembling the final exec.Cmd +// (env, working dir, Docker shell-wrap, process-group attrs). The launcher +// only owns the lifecycle once Cmd is started. +type Spec struct { + // Cmd is the command to start. The launcher will set Stdout/Stderr + // pipes and call cmd.Start(). Callers MUST NOT have started cmd + // already; calling Start again is undefined. + Cmd *exec.Cmd + + // LogSink receives the child's combined stdout+stderr, line by line. + // May be nil to discard output (tests). + LogSink io.Writer + + // Name is used in log messages to identify which upstream's launcher + // is doing the talking. Optional; defaults to cmd.Path basename. + Name string + + // StopGrace overrides DefaultStopGrace if non-zero. + StopGrace time.Duration +} + +// Handle represents a running child managed by the launcher. Stop, Wait, +// and Done are safe to call from multiple goroutines. +type Handle interface { + // Stop signals the child to exit (SIGTERM → grace → SIGKILL on + // timeout). Blocks until the child is reaped or ctx fires. Calling + // Stop more than once is safe — subsequent calls return the result + // of the first. + Stop(ctx context.Context) error + + // Wait blocks until the child exits. Returns the exit error from + // exec.Cmd.Wait() (nil on clean exit, *exec.ExitError on non-zero). + Wait() error + + // Done is closed when the child has exited for any reason. Useful + // for select{}-driven supervision. + Done() <-chan struct{} + + // Pid returns the OS process id, or 0 if the child has exited. + Pid() int +} + +// Spawn starts spec.Cmd and returns a Handle owning its lifecycle. +// +// stdout+stderr are line-buffered into spec.LogSink (or discarded if nil). +// On Unix, the child is placed in its own process group so Stop can signal +// the entire group, not just the immediate child — this matters when the +// command shells out (e.g. `sh -c 'docker run …'`) and the actual server +// is a grandchild. +// +// The supplied ctx is NOT used to bound the child's runtime — once Spawn +// returns, the child outlives ctx. ctx is only used to abort cmd.Start() +// itself if it's slow (rare). Callers who want ctx-tied lifetime should +// call Stop from a goroutine watching ctx.Done(). +func Spawn(ctx context.Context, spec *Spec, log *zap.Logger) (Handle, error) { + if spec == nil || spec.Cmd == nil { + return nil, errors.New("launcher.Spawn: spec.Cmd is required") + } + if log == nil { + log = zap.NewNop() + } + + cmd := spec.Cmd + name := spec.Name + if name == "" { + name = cmd.Path + } + + // Apply Unix process-group attrs (no-op on Windows). Callers that + // already set SysProcAttr win — we don't override their fields. + applyProcAttrs(cmd) + + stdout, err := cmd.StdoutPipe() + if err != nil { + return nil, fmt.Errorf("launcher: stdout pipe: %w", err) + } + stderr, err := cmd.StderrPipe() + if err != nil { + return nil, fmt.Errorf("launcher: stderr pipe: %w", err) + } + + // Note: we don't honor ctx for Start() because exec.Cmd.Start() is + // synchronous and short — there's no useful way to interrupt it. We + // could plumb cmd.Cancel but that's also tied to the cmd's + // lifetime, not just startup. + _ = ctx + + if err := cmd.Start(); err != nil { + _ = stdout.Close() + _ = stderr.Close() + return nil, fmt.Errorf("launcher: start %s: %w", name, err) + } + + h := &handle{ + cmd: cmd, + name: name, + log: log.With(zap.String("launcher_server", name)), + done: make(chan struct{}), + stopGrace: spec.StopGrace, + } + if h.stopGrace <= 0 { + h.stopGrace = DefaultStopGrace + } + h.pid.Store(int64(cmd.Process.Pid)) + + // Pump stdout+stderr to LogSink. Both streams write to the same + // sink, prefixed so they remain distinguishable. Discard mode is + // supported via io.Discard. We wrap LogSink in a small mutex so + // the stdout pump, the stderr pump, and the startup banner can + // share an arbitrary io.Writer (bytes.Buffer in tests, zap-bridge + // in production) without racing on Write. + rawSink := spec.LogSink + if rawSink == nil { + rawSink = io.Discard + } + sink := newSerializedWriter(rawSink) + + if rawSink != io.Discard { + args := cmd.Args + if len(args) > 0 { + args = args[1:] + } + _, _ = fmt.Fprintf(sink, "[launcher] starting: %s %v (pid=%d)\n", + cmd.Path, args, cmd.Process.Pid) + } + + h.pumpWG.Add(2) + go pumpLines(&h.pumpWG, stdout, sink, "[launcher stdout] ") + go pumpLines(&h.pumpWG, stderr, sink, "[launcher stderr] ") + + // Reaper goroutine: blocks on cmd.Wait, captures the exit error, + // then closes the done channel exactly once. + go h.reap() + + h.log.Info("upstream child process started", + zap.Int("pid", cmd.Process.Pid), + zap.String("command", cmd.Path)) + + return h, nil +} + +type handle struct { + cmd *exec.Cmd + name string + log *zap.Logger + done chan struct{} + stopGrace time.Duration + + pid atomic.Int64 // 0 once exited + pumpWG sync.WaitGroup + + stopOnce sync.Once + stopErr error + + waitErrMu sync.Mutex + waitErr error +} + +func (h *handle) Pid() int { + return int(h.pid.Load()) +} + +func (h *handle) Done() <-chan struct{} { + return h.done +} + +// Wait waits for the child to exit. Multiple callers see the same error. +func (h *handle) Wait() error { + <-h.done + h.waitErrMu.Lock() + defer h.waitErrMu.Unlock() + return h.waitErr +} + +// Stop is the one method users call to bring the child down deterministically. +// First call drives the actual signal sequence; subsequent calls just wait +// for it to finish. +func (h *handle) Stop(ctx context.Context) error { + h.stopOnce.Do(func() { + h.stopErr = h.stopLocked(ctx) + }) + // Even if another goroutine drove the stop, we still want to wait + // for the child to finish reaping before returning so the caller + // can rely on "Stop returned → port is free". + select { + case <-h.done: + case <-ctx.Done(): + return ctx.Err() + } + return h.stopErr +} + +func (h *handle) stopLocked(ctx context.Context) error { + // Already exited? + select { + case <-h.done: + return nil + default: + } + + if h.cmd.Process == nil { + return nil + } + + h.log.Info("stopping upstream child process", + zap.Int("pid", h.Pid()), + zap.Duration("grace", h.stopGrace)) + + // SIGTERM the process group (Unix) / process (Windows fallback). + if err := terminateProcess(h.cmd, h.log); err != nil { + h.log.Warn("terminate failed (will fall through to wait/kill)", + zap.Error(err)) + } + + // Wait for graceful exit, falling back to SIGKILL after stopGrace. + graceCtx, cancel := context.WithTimeout(ctx, h.stopGrace) + defer cancel() + + select { + case <-h.done: + return nil + case <-graceCtx.Done(): + } + + // Still alive — send SIGKILL. + h.log.Warn("child did not exit within grace period; sending SIGKILL", + zap.Int("pid", h.Pid()), + zap.Duration("grace", h.stopGrace)) + if err := killProcess(h.cmd, h.log); err != nil { + h.log.Error("kill failed", zap.Error(err)) + // Fall through — we still wait for done. + } + + select { + case <-h.done: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +func (h *handle) reap() { + err := h.cmd.Wait() + // Drain log pumps so any final output lands in the sink. + h.pumpWG.Wait() + + h.waitErrMu.Lock() + h.waitErr = err + h.waitErrMu.Unlock() + + h.pid.Store(0) + + exitCode := -1 + if h.cmd.ProcessState != nil { + exitCode = h.cmd.ProcessState.ExitCode() + } + h.log.Info("upstream child process exited", + zap.Int("exit_code", exitCode), + zap.Error(err)) + + close(h.done) +} + +// pumpLines reads r line-by-line and writes each line, prefixed, to dst. +// One Write per line so adapters that bridge io.Writer to a structured +// logger get one log entry per line (the obvious shape) instead of three. +func pumpLines(wg *sync.WaitGroup, r io.Reader, dst io.Writer, prefix string) { + defer wg.Done() + scanner := bufio.NewScanner(r) + scanner.Buffer(make([]byte, 0, 4096), 1024*1024) + for scanner.Scan() { + line := scanner.Text() + _, _ = dst.Write([]byte(prefix + line + "\n")) + } +} + +// serializedWriter wraps an io.Writer so concurrent Writes are atomic. +// Production sinks (per-server zap logger) are already thread-safe; we +// add this guard because we promise the launcher will accept an arbitrary +// io.Writer (notably *bytes.Buffer in tests), and bytes.Buffer is NOT +// safe for concurrent Write. +type serializedWriter struct { + mu sync.Mutex + w io.Writer +} + +func newSerializedWriter(w io.Writer) *serializedWriter { + return &serializedWriter{w: w} +} + +func (s *serializedWriter) Write(p []byte) (int, error) { + s.mu.Lock() + defer s.mu.Unlock() + return s.w.Write(p) +} diff --git a/internal/upstream/launcher/launcher_test.go b/internal/upstream/launcher/launcher_test.go new file mode 100644 index 000000000..2a298e8ff --- /dev/null +++ b/internal/upstream/launcher/launcher_test.go @@ -0,0 +1,211 @@ +package launcher + +import ( + "bytes" + "context" + "os/exec" + "regexp" + "runtime" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +func TestSpawn_Stop_GracefulExit(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("graceful-exit test relies on POSIX signal semantics") + } + + // `sleep 30` ignores nothing — SIGTERM kills it cleanly. Good for + // covering the SIGTERM-then-Wait happy path. + cmd := exec.Command("sleep", "30") + var sink bytes.Buffer + h, err := Spawn(context.Background(), &Spec{ + Cmd: cmd, + LogSink: &sink, + Name: "test-sleep", + StopGrace: 2 * time.Second, + }, zap.NewNop()) + require.NoError(t, err) + require.NotNil(t, h) + require.Greater(t, h.Pid(), 0) + + stopCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + err = h.Stop(stopCtx) + assert.NoError(t, err) + + select { + case <-h.Done(): + case <-time.After(time.Second): + t.Fatal("Done() not closed after Stop returned") + } + assert.Equal(t, 0, h.Pid(), "pid should reset after exit") +} + +func TestSpawn_Stop_SIGKILLFallback(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("SIGKILL fallback test relies on POSIX signal semantics") + } + + // trap '' TERM → ignore SIGTERM. The launcher must escalate to + // SIGKILL after StopGrace expires. + // + // The script prints "ready" once the trap is installed so we can + // synchronize Stop with the trap being live — otherwise a + // vanishingly fast test runner could SIGTERM the shell before it + // has even parsed the trap directive, which would just terminate + // it and complete Stop in microseconds. + // Detecting "trap installed and loop running" via stdout sniffing + // is fragile: the launcher banner echoes the script verbatim, so + // any literal marker in the script also appears in the banner. + // Match the marker as a shell-substituted PID-bracketed token via + // regex — only the dash-substituted line will match, since the + // banner shows the un-expanded `$$`. + const marker = `__LNCTICK__` + script := `trap '' TERM; while true; do printf '%s\n' "` + marker + `:$$"; sleep 0.1; done` + + // Use a custom matcher that requires marker + ":" + at-least-one-digit. + sinkCh := make(chan struct{}, 1) + sink := newRegexDetector(`__LNCTICK__:[0-9]+`, sinkCh) + + h, err := Spawn(context.Background(), &Spec{ + Cmd: exec.Command("sh", "-c", script), + Name: "term-ignorer", + StopGrace: 300 * time.Millisecond, + LogSink: sink, + }, zap.NewNop()) + require.NoError(t, err) + + select { + case <-sinkCh: + case <-time.After(2 * time.Second): + t.Fatal("script never produced a ticked marker — trap not installed?") + } + + stopCtx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + start := time.Now() + err = h.Stop(stopCtx) + elapsed := time.Since(start) + assert.NoError(t, err) + assert.GreaterOrEqual(t, elapsed, 300*time.Millisecond, "should have waited at least one grace period") + assert.Less(t, elapsed, 2*time.Second, "should escalate to SIGKILL promptly after grace expires") +} + +// regexDetector closes its channel the first time it sees a Write whose +// payload matches the given regex. Used to detect shell-substituted tokens +// without false-positiving on the launcher's startup banner (which echoes +// the script source un-expanded). +type regexDetector struct { + re *regexp.Regexp + ch chan<- struct{} + once sync.Once +} + +func newRegexDetector(pattern string, ch chan<- struct{}) *regexDetector { + return ®exDetector{re: regexp.MustCompile(pattern), ch: ch} +} + +func (d *regexDetector) Write(p []byte) (int, error) { + if d.re.Match(p) { + d.once.Do(func() { close(d.ch) }) + } + return len(p), nil +} + +func TestSpawn_DoneOnNaturalExit(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("uses /usr/bin/true — POSIX-specific") + } + cmd := exec.Command("sh", "-c", "exit 0") + h, err := Spawn(context.Background(), &Spec{Cmd: cmd, Name: "exit-zero"}, zap.NewNop()) + require.NoError(t, err) + + select { + case <-h.Done(): + case <-time.After(2 * time.Second): + t.Fatal("Done() not closed after natural exit") + } + assert.NoError(t, h.Wait(), "exit 0 should not produce a wait error") +} + +func TestSpawn_DoneOnNonZeroExit(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("uses sh -c — POSIX-specific") + } + cmd := exec.Command("sh", "-c", "exit 17") + h, err := Spawn(context.Background(), &Spec{Cmd: cmd, Name: "exit-17"}, zap.NewNop()) + require.NoError(t, err) + + <-h.Done() + werr := h.Wait() + require.Error(t, werr) + exitErr, ok := werr.(*exec.ExitError) + require.True(t, ok, "expected *exec.ExitError, got %T", werr) + assert.Equal(t, 17, exitErr.ExitCode()) +} + +func TestSpawn_StopIsIdempotent(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("POSIX-specific") + } + cmd := exec.Command("sleep", "30") + h, err := Spawn(context.Background(), &Spec{Cmd: cmd, Name: "sleeper"}, zap.NewNop()) + require.NoError(t, err) + + var wg sync.WaitGroup + wg.Add(3) + for i := 0; i < 3; i++ { + go func() { + defer wg.Done() + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + _ = h.Stop(ctx) + }() + } + wg.Wait() + select { + case <-h.Done(): + default: + t.Fatal("Done not closed after concurrent Stop calls") + } +} + +func TestSpawn_LogSinkCaptured(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("POSIX-specific") + } + cmd := exec.Command("sh", "-c", "echo hello-stdout; echo hello-stderr 1>&2; exit 0") + var sink bytes.Buffer + h, err := Spawn(context.Background(), &Spec{ + Cmd: cmd, + LogSink: &sink, + Name: "loud", + }, zap.NewNop()) + require.NoError(t, err) + + <-h.Done() + out := sink.String() + assert.Contains(t, out, "hello-stdout") + assert.Contains(t, out, "hello-stderr") + assert.Contains(t, out, "[launcher stdout]") + assert.Contains(t, out, "[launcher stderr]") +} + +func TestSpawn_NilSpec(t *testing.T) { + h, err := Spawn(context.Background(), nil, zap.NewNop()) + assert.Error(t, err) + assert.Nil(t, h) +} + +func TestSpawn_NilCmd(t *testing.T) { + h, err := Spawn(context.Background(), &Spec{}, zap.NewNop()) + assert.Error(t, err) + assert.Nil(t, h) +} diff --git a/internal/upstream/launcher/launcher_unix.go b/internal/upstream/launcher/launcher_unix.go new file mode 100644 index 000000000..b48c942d5 --- /dev/null +++ b/internal/upstream/launcher/launcher_unix.go @@ -0,0 +1,49 @@ +//go:build unix + +package launcher + +import ( + "os/exec" + "syscall" + + "go.uber.org/zap" +) + +// applyProcAttrs places the child in its own process group so we can signal +// the entire group (including grandchildren spawned via `sh -c …` or +// `docker run`) when stopping. +func applyProcAttrs(cmd *exec.Cmd) { + if cmd.SysProcAttr == nil { + cmd.SysProcAttr = &syscall.SysProcAttr{} + } + cmd.SysProcAttr.Setpgid = true + cmd.SysProcAttr.Pgid = 0 +} + +// terminateProcess sends SIGTERM to the child's process group. +func terminateProcess(cmd *exec.Cmd, log *zap.Logger) error { + if cmd.Process == nil { + return nil + } + pgid, err := syscall.Getpgid(cmd.Process.Pid) + if err != nil || pgid <= 0 { + // Fall back to signaling just the leader if pgid lookup + // failed (e.g. the child already exited). + log.Debug("getpgid failed, signaling pid directly", zap.Error(err)) + return cmd.Process.Signal(syscall.SIGTERM) + } + return syscall.Kill(-pgid, syscall.SIGTERM) +} + +// killProcess sends SIGKILL to the child's process group. +func killProcess(cmd *exec.Cmd, log *zap.Logger) error { + if cmd.Process == nil { + return nil + } + pgid, err := syscall.Getpgid(cmd.Process.Pid) + if err != nil || pgid <= 0 { + log.Debug("getpgid failed, killing pid directly", zap.Error(err)) + return cmd.Process.Kill() + } + return syscall.Kill(-pgid, syscall.SIGKILL) +} diff --git a/internal/upstream/launcher/launcher_windows.go b/internal/upstream/launcher/launcher_windows.go new file mode 100644 index 000000000..d71b338b3 --- /dev/null +++ b/internal/upstream/launcher/launcher_windows.go @@ -0,0 +1,32 @@ +//go:build windows + +package launcher + +import ( + "os/exec" + + "go.uber.org/zap" +) + +// applyProcAttrs is a no-op on Windows. Windows uses Job Objects rather +// than process groups; integrating with the existing Windows process +// management is left to a follow-up (matching the TODO already in +// internal/upstream/core/process_windows.go). +func applyProcAttrs(_ *exec.Cmd) {} + +// terminateProcess on Windows just signals the child directly. Without +// Job Objects we cannot reach grandchildren; the existing stdio path has +// the same limitation today. +func terminateProcess(cmd *exec.Cmd, _ *zap.Logger) error { + if cmd.Process == nil { + return nil + } + return cmd.Process.Kill() +} + +func killProcess(cmd *exec.Cmd, _ *zap.Logger) error { + if cmd.Process == nil { + return nil + } + return cmd.Process.Kill() +} diff --git a/internal/upstream/launcher/wait.go b/internal/upstream/launcher/wait.go new file mode 100644 index 000000000..531c59343 --- /dev/null +++ b/internal/upstream/launcher/wait.go @@ -0,0 +1,127 @@ +// Package launcher manages locally-spawned upstream processes that expose +// their MCP endpoint over HTTP / SSE / streamable-HTTP transports. +// +// Stdio upstreams already spawn a child process — that's how the protocol +// works. HTTP/SSE upstreams have historically required the user to start +// the process themselves before mcpproxy connected. The launcher decouples +// "how to start the process" from "how to talk to it", so the same +// {command, args, env, working_dir, docker isolation} configuration that +// stdio servers use can also drive an HTTP/SSE server's lifecycle. +// +// This file owns URL-readiness probing. See launcher.go for spawn/stop. +package launcher + +import ( + "context" + "errors" + "fmt" + "net" + "net/url" + "strings" + "time" +) + +// defaultDialPerAttempt is the per-dial timeout while polling. Kept short so +// each retry can fire quickly while the listener is coming up. +const defaultDialPerAttempt = 1 * time.Second + +// defaultDialPollInterval is how long we sleep between failed dials. Short +// enough to feel snappy on a fast-starting server, long enough that we don't +// spin the CPU waiting on a slow one. +const defaultDialPollInterval = 200 * time.Millisecond + +// WaitForURL blocks until rawURL's host:port accepts a TCP connection, or +// until the context is canceled or timeout elapses (whichever comes first). +// +// The check is deliberately a TCP dial, not an HTTP GET. SSE endpoints serve +// a streaming response that never closes; an HTTP GET against one will +// either hang or return a non-2xx status the moment the server's stream +// handler is hit, neither of which actually proves "the listener is up". +// TCP-dial just proves the bind happened, which is what we need before +// handing off to the transport-level connect. +// +// timeout=0 means "use no overall deadline beyond ctx.Done()". Negative +// timeouts are coerced to 0 to keep callers from accidentally producing an +// already-expired deadline. +func WaitForURL(ctx context.Context, rawURL string, timeout time.Duration) error { + addr, err := addrFromURL(rawURL) + if err != nil { + return err + } + + if timeout < 0 { + timeout = 0 + } + + dialCtx := ctx + if timeout > 0 { + var cancel context.CancelFunc + dialCtx, cancel = context.WithTimeout(ctx, timeout) + defer cancel() + } + + dialer := &net.Dialer{Timeout: defaultDialPerAttempt} + var lastErr error + for { + // Honor the deadline / cancel before each attempt so a cancel + // during sleep is observed promptly. + if err := dialCtx.Err(); err != nil { + if errors.Is(err, context.DeadlineExceeded) { + if lastErr != nil { + return fmt.Errorf("url %s not reachable in %s (last dial error: %w)", rawURL, timeout, lastErr) + } + return fmt.Errorf("url %s not reachable in %s", rawURL, timeout) + } + return err + } + + conn, err := dialer.DialContext(dialCtx, "tcp", addr) + if err == nil { + _ = conn.Close() + return nil + } + lastErr = err + + select { + case <-dialCtx.Done(): + // Loop back so the deadline path above formats the error + // consistently. + continue + case <-time.After(defaultDialPollInterval): + } + } +} + +// addrFromURL extracts host:port from rawURL. If the URL omits a port, a +// default is inferred from the scheme (http→80, https→443). Unsupported +// schemes return an explanatory error so misconfigurations surface early. +func addrFromURL(rawURL string) (string, error) { + u, err := url.Parse(rawURL) + if err != nil { + return "", fmt.Errorf("parse url %q: %w", rawURL, err) + } + if u.Host == "" { + return "", fmt.Errorf("url %q has no host component", rawURL) + } + + host := u.Hostname() + if host == "" { + return "", fmt.Errorf("url %q has empty host", rawURL) + } + + port := u.Port() + if port == "" { + switch strings.ToLower(u.Scheme) { + case "http", "ws": + port = "80" + case "https", "wss": + port = "443" + case "": + return "", fmt.Errorf("url %q has no scheme — cannot infer port", rawURL) + default: + return "", fmt.Errorf("url %q has unsupported scheme %q for TCP probe", rawURL, u.Scheme) + } + } + + return net.JoinHostPort(host, port), nil +} diff --git a/internal/upstream/launcher/wait_test.go b/internal/upstream/launcher/wait_test.go new file mode 100644 index 000000000..51a7744f1 --- /dev/null +++ b/internal/upstream/launcher/wait_test.go @@ -0,0 +1,134 @@ +package launcher + +import ( + "context" + "net" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestWaitForURL_ImmediatelyBound(t *testing.T) { + ln, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + defer ln.Close() + + url := "http://" + ln.Addr().String() + "/mcp" + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + start := time.Now() + err = WaitForURL(ctx, url, time.Second) + assert.NoError(t, err) + assert.Less(t, time.Since(start), 500*time.Millisecond, "should return quickly when listener is already bound") +} + +func TestWaitForURL_BoundLate(t *testing.T) { + // Reserve a port, close it, then re-bind after a delay so we can + // give WaitForURL a stable address to poll while it isn't yet + // listening. + probe, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + addr := probe.Addr().String() + probe.Close() + + url := "http://" + addr + "/mcp" + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + time.Sleep(400 * time.Millisecond) + ln, err := net.Listen("tcp", addr) + if err != nil { + t.Errorf("late bind failed: %v", err) + return + } + defer ln.Close() + // Hold the listener briefly so WaitForURL has a chance to dial it. + time.Sleep(500 * time.Millisecond) + }() + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + err = WaitForURL(ctx, url, 2*time.Second) + assert.NoError(t, err, "should succeed once the listener binds") + + wg.Wait() +} + +func TestWaitForURL_NeverBound(t *testing.T) { + // Reserve and release a port so we know nothing is listening on it. + probe, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + addr := probe.Addr().String() + probe.Close() + + url := "http://" + addr + "/mcp" + + ctx := context.Background() + start := time.Now() + err = WaitForURL(ctx, url, 300*time.Millisecond) + elapsed := time.Since(start) + assert.Error(t, err) + assert.GreaterOrEqual(t, elapsed, 250*time.Millisecond, "should poll for at least the timeout duration") + assert.Less(t, elapsed, 1500*time.Millisecond, "should give up promptly after timeout") +} + +func TestWaitForURL_ContextCanceled(t *testing.T) { + probe, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + addr := probe.Addr().String() + probe.Close() + + url := "http://" + addr + "/mcp" + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + time.Sleep(150 * time.Millisecond) + cancel() + }() + + start := time.Now() + err = WaitForURL(ctx, url, 5*time.Second) + assert.Error(t, err) + assert.Less(t, time.Since(start), 1*time.Second, "should observe ctx cancel quickly") +} + +func TestWaitForURL_BadURL(t *testing.T) { + // Inputs WaitForURL rejects at parse time (before it ever dials). + // A URL with an explicit port + unknown scheme is fine — the user + // took responsibility for naming the port — so we don't include + // e.g. ftp://host:21 here. + tests := []struct { + name string + url string + }{ + {"empty", ""}, + {"no host", "http:///mcp"}, + {"unknown scheme without port", "ftp://example.com/foo"}, + {"no scheme no port", "example.com/foo"}, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + err := WaitForURL(context.Background(), tc.url, time.Second) + assert.Error(t, err) + }) + } +} + +func TestWaitForURL_InfersDefaultPort(t *testing.T) { + // We can't actually bind on 80/443 in a test, but we can confirm + // that an http:// URL without a port is parsed (rather than rejected) + // by checking it produces a "not reachable" error rather than a + // parse error within the short timeout window. + ctx := context.Background() + err := WaitForURL(ctx, "http://127.0.0.1/", 100*time.Millisecond) + require.Error(t, err) + assert.Contains(t, err.Error(), "not reachable", "default-port inference should reach the polling loop, not fail at parse") +} diff --git a/scripts/test-api-e2e.sh b/scripts/test-api-e2e.sh index b5e202eda..4de7972fa 100755 --- a/scripts/test-api-e2e.sh +++ b/scripts/test-api-e2e.sh @@ -77,6 +77,9 @@ cleanup() { # Additional cleanup - find any remaining mcpproxy processes pkill -f "mcpproxy.*serve" 2>/dev/null || true + # Reap the launcher-test fixture if our launcher-lifecycle test + # failed before the shutdown reap path could run. + pkill -f "launcher-server.*--port 39933" 2>/dev/null || true sleep 1 # Clean up test data @@ -170,6 +173,41 @@ wait_for_server() { return 1 } +# Wait for the launcher-test server (spec 046) to reach healthy. +# Distinct from wait_for_everything_server because it specifically +# verifies the new "mcpproxy spawned and connected to an HTTP MCP +# server it owns" path rather than a stdio upstream. +wait_for_launcher_test_server() { + local max_attempts=30 + local attempt=1 + + echo "Waiting for launcher-test server to be connected..." + + while [ $attempt -le $max_attempts ]; do + local curl_cmd="curl -s --max-time 5 $CURL_CA_OPTS" + if [ ! -z "$API_KEY" ]; then + curl_cmd="$curl_cmd -H \"X-API-Key: $API_KEY\"" + fi + curl_cmd="$curl_cmd \"${API_BASE}/servers\"" + + local response=$(eval $curl_cmd 2>/dev/null) + local connected=$(echo "$response" | jq -r '.data.servers[] | select(.name=="launcher-test") | .connected // false' 2>/dev/null) + + if [ "$connected" = "true" ]; then + echo "launcher-test server is connected" + sleep 1 + return 0 + fi + + echo "Attempt $attempt/$max_attempts - launcher-test connected: $connected" + sleep 2 + attempt=$((attempt + 1)) + done + + echo "launcher-test server failed to connect within $max_attempts attempts" + return 1 +} + # Wait for everything server to connect and be indexed wait_for_everything_server() { local max_attempts=30 @@ -366,6 +404,97 @@ test_sse_auth_failure() { fi } +# Spec-046 lifecycle test: verifies that mcpproxy spawned the launcher +# fixture, drove its lifecycle via the REST API, and reaped it cleanly. +# +# Each assertion is a separate test row so a partial failure shows up as +# one failed sub-step rather than a single opaque test fail. +test_launcher_lifecycle() { + log_test "Launcher lifecycle: tools/list call reaches launched HTTP MCP server" + local curl_cmd="curl -s --max-time 10 $CURL_CA_OPTS" + if [ ! -z "$API_KEY" ]; then + curl_cmd="$curl_cmd -H \"X-API-Key: $API_KEY\"" + fi + local tools_response + tools_response=$(eval "$curl_cmd \"${API_BASE}/servers/launcher-test/tools\"") + if echo "$tools_response" | jq -e '.success == true and any(.data.tools[]; .name == "ping")' >/dev/null 2>&1; then + log_pass "tools/list returned the fixture's ping tool" + else + log_fail "tools/list missing ping tool. Response: $tools_response" + fi + + # Step 2: the child should be a real OS process. pgrep over the + # fixture argv signature lets us detect it without knowing the PID + # mcpproxy assigned. Use `pgrep -f` for arg-line matching. + log_test "Launcher lifecycle: child process is running (pgrep)" + local before_pid + before_pid=$(pgrep -f 'launcher-server.*--port 39933' | head -1) + if [ -n "$before_pid" ]; then + log_pass "child running (pid=$before_pid)" + else + log_fail "no launcher-server process found via pgrep" + fi + + # Step 3: restart -> child must be a NEW pid afterwards. + log_test "Launcher lifecycle: POST /restart reaps + respawns child with new PID" + eval "$curl_cmd -X POST \"${API_BASE}/servers/launcher-test/restart\"" >/dev/null + sleep 4 + local after_pid + after_pid=$(pgrep -f 'launcher-server.*--port 39933' | head -1) + if [ -z "$after_pid" ]; then + log_fail "child gone after restart — should have respawned" + elif [ "$after_pid" = "$before_pid" ]; then + log_fail "child PID unchanged after restart (was $before_pid, still $after_pid)" + else + log_pass "child respawned (was=$before_pid, now=$after_pid)" + fi + + # Step 4: disable -> child must be gone. + log_test "Launcher lifecycle: POST /disable reaps the child" + eval "$curl_cmd -X POST \"${API_BASE}/servers/launcher-test/disable\"" >/dev/null + # Give the launcher up to 8s to deliver SIGTERM + wait for exit. + local waited=0 + while [ $waited -lt 8 ]; do + if ! pgrep -f 'launcher-server.*--port 39933' >/dev/null 2>&1; then + break + fi + sleep 1 + waited=$((waited + 1)) + done + if pgrep -f 'launcher-server.*--port 39933' >/dev/null 2>&1; then + local stragglers + stragglers=$(pgrep -f 'launcher-server.*--port 39933' | tr '\n' ' ') + log_fail "child still alive ${waited}s after disable (pids: $stragglers)" + else + log_pass "child reaped within ${waited}s of disable" + fi + + # Step 5: re-enable + reconnect, then verify a fresh PID appears. + log_test "Launcher lifecycle: POST /enable respawns child" + eval "$curl_cmd -X POST \"${API_BASE}/servers/launcher-test/enable\"" >/dev/null + if wait_for_launcher_test_server; then + local reenabled_pid + reenabled_pid=$(pgrep -f 'launcher-server.*--port 39933' | head -1) + if [ -n "$reenabled_pid" ] && [ "$reenabled_pid" != "$after_pid" ]; then + log_pass "child respawned after enable (pid=$reenabled_pid, different from $after_pid)" + else + log_fail "expected a new PID after enable; got '$reenabled_pid' (previous '$after_pid')" + fi + else + log_fail "launcher-test never reconnected after enable" + fi + + # Step 6: per-server log should contain the launcher banner. + log_test "Launcher lifecycle: per-server log captures child output" + local logs_response + logs_response=$(eval "$curl_cmd \"${API_BASE}/servers/launcher-test/logs?tail=200\"") + if echo "$logs_response" | jq -r '.data.logs[]?' 2>/dev/null | grep -qE '\[launcher\] starting|\[launcher-server\] listening'; then + log_pass "per-server log contains launcher banner / child stdout" + else + log_fail "per-server log missing launcher banner or child stdout" + fi +} + # Prerequisites check echo -e "${YELLOW}Checking prerequisites...${NC}" @@ -396,6 +525,21 @@ if ! command -v npx &> /dev/null; then exit 1 fi +# Build the launcher-test fixture. This is a tiny HTTP MCP server used +# by the spec-046 launcher-lifecycle test below. We rebuild every run +# so the fixture stays in lockstep with the e2e harness. +LAUNCHER_FIXTURE="./test/launcher-server/launcher-server" +echo -e "${YELLOW}Building launcher-test fixture (./test/launcher-server)...${NC}" +if ! go build -o "$LAUNCHER_FIXTURE" ./test/launcher-server >/tmp/launcher-fixture-build.log 2>&1; then + echo -e "${RED}Error: failed to build launcher-test fixture${NC}" + cat /tmp/launcher-fixture-build.log + exit 1 +fi +if [ ! -x "$LAUNCHER_FIXTURE" ]; then + echo -e "${RED}Error: launcher-test fixture not executable at $LAUNCHER_FIXTURE${NC}" + exit 1 +fi + echo -e "${GREEN}Prerequisites check passed${NC}" echo "" @@ -438,6 +582,13 @@ if ! wait_for_everything_server; then exit 1 fi +# Wait for the launcher-test server (spec 046). Failing this hard would +# mask other regressions in the suite, so we just warn and let the +# launcher tests below decide whether to fail. +if ! wait_for_launcher_test_server; then + echo -e "${YELLOW}Warning: launcher-test server never connected — launcher lifecycle test will fail loudly below.${NC}" +fi + echo "" echo -e "${YELLOW}Running API tests...${NC}" echo "" @@ -516,6 +667,13 @@ fi test_api "GET /api/v1/servers (after restart)" "GET" "${API_BASE}/servers" "200" "" \ "jq -e '.success == true and (.data.servers | length) > 0' < '$TEST_RESULTS_FILE' >/dev/null" +# Spec-046 launcher lifecycle (six sub-assertions). Runs late in the +# suite so an earlier failure that takes down mcpproxy short-circuits +# here too rather than producing confusing isolated failures. +echo "" +echo -e "${YELLOW}Running launcher lifecycle test (spec 046)...${NC}" +test_launcher_lifecycle + # Test 17: Test concurrent requests echo "" log_test "Concurrent API requests" diff --git a/specs/046-local-launcher-for-http-sse/execution_log.md b/specs/046-local-launcher-for-http-sse/execution_log.md new file mode 100644 index 000000000..a0c21d0fc --- /dev/null +++ b/specs/046-local-launcher-for-http-sse/execution_log.md @@ -0,0 +1,163 @@ +# Execution Log — 046-local-launcher-for-http-sse + +State maintained per `CLAUDE.md` autonomous-operation requirement. Each +session appends a dated entry; do not rewrite history. + +## 2026-05-10 — Initial scaffold (Roman + Claude) + +**Status**: Phase 0 + Phase 1 code landed in working tree (uncompiled — +sandbox network blocks `proxy.golang.org`, see end of log). Phase 2 partial. + +### Files added + +- `internal/upstream/launcher/launcher.go` — `Spec`, `Handle`, `Spawn`. Owns the + child's lifecycle (Stop with SIGTERM → grace → SIGKILL fallback, Wait, Done, + Pid). Pumps stdout+stderr line-by-line into a caller-supplied `io.Writer`, + one Write per line so a zap-bridge sink produces one log entry per line. +- `internal/upstream/launcher/launcher_unix.go` — Setpgid + signal-the-pgroup + for SIGTERM/SIGKILL on Linux/macOS. +- `internal/upstream/launcher/launcher_windows.go` — best-effort stubs + (matches the existing `process_windows.go` TODO; Job Objects are a + follow-up). +- `internal/upstream/launcher/wait.go` — `WaitForURL` does TCP-dial polling + rather than HTTP GET (gotcha #2 in plan: SSE endpoints stream forever and + break HTTP-GET probes). +- `internal/upstream/launcher/wait_test.go` — 6 cases (immediately bound, + bound late, never bound, ctx-canceled, bad URLs, default-port inference). +- `internal/upstream/launcher/launcher_test.go` — 7 cases (graceful exit, + SIGKILL fallback when SIGTERM is trapped, Done on natural exit, exit-code + capture via `*exec.ExitError`, Stop idempotency, log sink capture, nil + guards). +- `internal/upstream/launcher/integration_test.go` — full Spawn + WaitForURL + with a python-listener subprocess; skips when python3 is missing or on + Windows. (Pure Go testdata helper would be cleaner — TODO.) +- `internal/upstream/core/connection_launcher.go` — `connectWithLauncher`, + `stopLauncher`, `watchLauncher`, `buildLauncherCmd`, `loggerWriter`. + +### Files modified + +- `internal/config/config.go` — `LauncherWaitTimeout Duration` on + `ServerConfig`. Default 30s when zero/unset. +- `internal/config/merge.go` — `CopyServerConfig` carries the new field. +- `internal/upstream/core/client.go` — `launcherHandle launcher.Handle` and + `launcherCIDFile string` on `Client`; new import. +- `internal/upstream/core/connection.go` — pre-transport launcher dispatch + for `http`/`sse`/`streamable-http` when `Command != ""`. Stops launcher + in the connect-failure cleanup path. +- `internal/upstream/core/connection_lifecycle.go` — `stopLauncher` after + the MCP-client close in Disconnect (so the child sees the network + transport go away first); also clears `processCmd`. +- `docs/configuration.md` — new "Locally-launched HTTP / SSE servers" + section + back-compat behaviour matrix; `launcher_wait_timeout` row in + the Server Fields table. +- `docs/cli-management-commands.md` — restart-semantics note covering the + launcher stop-then-start order. + +### Decisions / assumptions + +1. **Stdio path untouched.** Plan's Phase 0 contemplated lifting env/Docker + plumbing out of `connection_stdio.go` and routing stdio through + `launcher.Spawn`. Doing that requires reworking how mcp-go owns the + stdio process (mcp-go's `Stdio` transport spawns via a `CommandFunc` it + controls — externally-spawned children can't be wired into it without + patching the upstream library). To honour the spirit of "Docker-isolation + logic must live in one place" without that reshuffling, the new + `buildLauncherCmd` reuses the same Client methods (`setupDockerIsolation`, + `injectEnvVarsIntoDockerArgs`, `insertCidfileIntoShellDockerCommand`, + `wrapWithUserShell`) the stdio path already calls. Single source of + truth, but no double-spawn risk. + +2. **Launcher-managed children stay invisible to stdio cleanup helpers.** + `connectWithLauncher` deliberately does NOT set `c.processCmd` / + `c.processGroupID`. The `launcher.Handle` owns lifecycle; setting those + would let stdio's `killProcessGroup` race with `Handle.Stop`. This is a + minor deviation from the original plan (which suggested wiring the same + process-group tracking) — the result is cleaner ownership. + +3. **Health check is a TCP dial.** Per the plan's gotcha #2. + `addrFromURL` infers default ports for http/https/ws/wss; rejects + unknown schemes early so misconfigurations surface fast. + +4. **StopGrace default is 5s.** Plan asked for an explicit decision (open + question #2). 5s matches `processGracefulTimeout` in + `internal/upstream/core/connection.go`. No per-server override yet — + `Spec.StopGrace` is plumbed but not exposed in `ServerConfig`. Promote to + config if a real-world server needs more. + +5. **Crash-while-connected → Disconnect.** `watchLauncher` calls the + `Client.Disconnect()` path on unexpected child exit (gotcha #6). + Existing reconnect logic in `internal/upstream/managed` then handles + the come-back attempt — no separate launcher-internal restart loop + (open question #3 settled toward "defer to transport-level reconnect"). + +6. **Stop ctx on shutdown.** `stopLauncher` currently uses + `context.WithTimeout(context.Background(), 10s)` everywhere. Plan + open question #4 — accept this default; raise the limit if shutdown + really needs to wait for slow Docker stop. + +### Verification round 1 (2026-05-11) + +After `sbx policy allow network proxy.golang.org,sum.golang.org` was set: + +| Command | Result | +|---|---| +| `GOTOOLCHAIN=local go vet ./internal/upstream/...` | ✅ clean | +| `GOTOOLCHAIN=local go test ./internal/upstream/launcher/...` | ✅ 15/15 | +| `GOTOOLCHAIN=local go test ./internal/upstream/...` | ✅ all packages | +| `GOTOOLCHAIN=local go test ./internal/config/...` | ✅ | +| `go test -race` | ⚠️ blocked — cgo (gcc) not installed in sandbox; user can run on host | +| `go build ./cmd/mcpproxy` | ❌ blocked — needs `storage.googleapis.com` (some Go modules CDN-served from there); user must add `sbx policy allow network storage.googleapis.com` | + +### Bugs found + fixed during verification round 1 + +1. **Deadlock in connect-failure cleanup.** `Connect` holds `c.mu` for its + entire duration; my original failure-path call to `c.stopLauncher(...)` + re-acquired the same lock → hang. Fixed by inlining the stop sequence + in `connection.go`'s cleanup branch (read fields under the held lock, + release `c.mu` around `handle.Stop()`, reacquire before return). +2. **`connectWithLauncher` redundant locking.** Same root cause — + `connectWithLauncher` is called from `Connect` which already holds + `c.mu`. Removed the inner `c.mu.Lock()/Unlock()` for the launcher + field writes; the wait-for-url failure path still releases the lock + around the blocking `handle.Stop()` and reacquires before returning. +3. **`bytes.Buffer` LogSink race.** Test failures from the stdout pump, + stderr pump, and the startup-banner write all racing on a single + `*bytes.Buffer` in tests. Fixed by wrapping `LogSink` internally with + a `serializedWriter` (mutex around `Write`). zap-bridge in production + is already thread-safe, so this is a robustness fix for test sinks + and any future single-writer adapters. +4. **SIGKILL-fallback test could detect "ready" in the banner.** The + launcher startup banner echoes the script source verbatim, so any + marker token literally present in the script also matched in the + banner — making the test think the trap was installed before the + shell even ran. Fixed by using a shell-substituted marker + (`__LNCTICK__:$$`) and a regex detector (`__LNCTICK__:[0-9]+`). +5. **`bad scheme + explicit port` test case.** Test asserted error on + `ftp://example.com:21/foo` but the launcher correctly accepts any + scheme when the port is explicit (user took responsibility). Removed + that case; replaced with the actually-invalid `ftp://example.com/foo`. + +### Outstanding network blocker + +``` +sbx policy allow network storage.googleapis.com +``` + +Needed for `go build ./cmd/mcpproxy` to fetch Bleve/Roaring/etc. CDN-backed +modules. Once allowed, the verification commands are: + +``` +GOTOOLCHAIN=local go build ./cmd/mcpproxy +./scripts/test-api-e2e.sh # optional smoke test +``` + +### Outstanding follow-ups (post-PR) + +- Replace `integration_test.go`'s python-shellout with a Go test-binary + helper invoked via `os.Args` re-entry pattern, so the test runs on any + CI that has Go (which is all of them). Plan called for a tiny binary in + `internal/upstream/launcher/testdata/`. +- Extend `scripts/test-api-e2e.sh` with a launcher-flavoured server (plan + Phase 2 item). +- Phase 3 (post-merge): `{port}` templating in `args` / `url`, per-launcher + custom health probe, exponential backoff for repeated launcher crashes. diff --git a/specs/046-local-launcher-for-http-sse/plan.md b/specs/046-local-launcher-for-http-sse/plan.md new file mode 100644 index 000000000..d1e6e4285 --- /dev/null +++ b/specs/046-local-launcher-for-http-sse/plan.md @@ -0,0 +1,331 @@ +# Implementation Plan: Local Launcher for HTTP / SSE upstreams + +**Branch**: `046-local-launcher-for-http-sse` | **Date**: 2026-05-10 | **Status**: Draft — pre-implementation plan only, no code written yet. + +> **Hand-off context for a fresh session.** Everything below is what you need to pick this up from scratch. Read top-to-bottom, then jump to "Implementation" once the goal is clear. + +## Where this lives + how the PR opens + +This work targets the **`mcpproxy-go` upstream repository**, not the outer Halo `tools/` super-repo. `mcpproxy-go` is included into Halo as a git submodule at `tools/common/mcpproxy-go/`; all editing happens *inside* that submodule. + +**Remotes in the submodule** (verified 2026-05-10): + +| Remote name | URL | Role | +|---|---|---| +| `origin` | `https://github.com/HaloCollar/mcpproxy-go.git` | Halo's fork — push branches here so we can open PRs | +| `upstream` | `https://github.com/smart-mcp-proxy/mcpproxy-go.git` | The original project — PR target | + +**Branch & PR mechanics:** + +1. Inside the submodule (`cd tools/common/mcpproxy-go`), branch off the fork's tip — practically that means `git checkout main` then `git checkout -b 046-local-launcher-for-http-sse`. If you want to base it on upstream's tip instead, `git fetch upstream && git checkout -b 046-local-launcher-for-http-sse upstream/main` — pick whichever matches what's actually shipped at submission time; the fork tends to track upstream closely. +2. All commits for this feature land on that branch (refactor commits in Phase 0, feature commits in Phase 1, tests + docs in Phase 2 — see "Implementation phases" below; keep them as separate commits so the reviewer can read them in order). +3. Push to **HaloCollar fork**: `git push -u origin 046-local-launcher-for-http-sse`. Do NOT push to `upstream` — we don't have direct write access there. +4. Open the PR with `gh pr create --repo smart-mcp-proxy/mcpproxy-go --base main --head HaloCollar:046-local-launcher-for-http-sse` (or via the GitHub UI). Use the PR description template the upstream maintainers prefer (check `CONTRIBUTING.md` in the submodule before submitting). +5. The outer `tools/` super-repo gets a **submodule pointer bump only after the upstream PR merges**. That's a separate, tiny commit on the Halo side that just advances `tools/common/mcpproxy-go`'s SHA. Don't touch it until merge. + +**This plan file itself** lives at `specs/046-local-launcher-for-http-sse/plan.md` inside the submodule, on the `046-local-launcher-for-http-sse` branch. It's part of the upstream PR's first commit so reviewers can read the design before reading the code. + +## Summary + +Today mcpproxy will spawn a child process for an upstream MCP server **only** when the transport is stdio. If the upstream uses `http`, `sse`, or `streamable-http` transport, mcpproxy reads `url` and connects directly — the `command` field on the server config is silently ignored. + +We want to allow `command` + `url` + (`http`|`sse`|`streamable-http`) together so that mcpproxy: + +1. Spawns the user's local command (e.g. `node my-server.js`, `docker run ...`, `./my-mcp-binary --port 9999`). +2. Waits for that command's HTTP/SSE endpoint at the configured `url` to come up. +3. Connects via the existing HTTP/SSE transport once the endpoint is reachable. +4. Owns the child's lifecycle: kills it on disconnect / restart / mcpproxy shutdown, captures its stdout+stderr to the per-server log. + +This lets users self-host an MCP server that exposes HTTP/SSE (instead of stdio) without separately starting it before launching mcpproxy. + +## Goal in one sentence + +Make `{command, url, protocol: "http"|"sse"|"streamable-http"}` a first-class config combo that mcpproxy launches *and* connects to, with full lifecycle ownership. + +## Why this is an implementation choice, not a protocol limitation + +The MCP spec doesn't dictate process management — that's mcpproxy's job. Stdio happens to *require* spawning (you need stdin/stdout for the protocol bytes); HTTP/SSE happens to not require it (the protocol bytes flow over network), so the original implementation conflated "needs spawn" with "is stdio". Decoupling launcher from transport is the whole point of this feature. + +## Codebase reconnaissance — read these before touching anything + +Pre-existing files relevant to this plan. Read each at least skim-level before implementing: + +| File | Why it matters | +|---|---| +| `internal/config/config.go:211-215` | `ServerConfig` already has `Command`, `Args`, `WorkingDir`, `Env`, plus `Environment *secureenv.EnvConfig`. No schema changes needed. | +| `internal/transport/http.go:409-426` | `DetermineTransportType` — the dispatch logic. Today: protocol > command > url > default-stdio. After this change: leave dispatch unchanged; the launcher activates orthogonally when command is set. | +| `internal/upstream/core/connection.go:96-160` | The transport-dispatch switch lives here (`switch c.transportType`). Inject the launcher step *before* this switch when transport != stdio AND command != "". | +| `internal/upstream/core/connection_stdio.go` | Existing stdio spawn logic — has the env-resolution, secret-injection, Docker-detection, working-dir, signal-handling patterns we want to reuse, not duplicate. | +| `internal/upstream/core/connection_http.go` | `connectHTTP` and `connectSSE`. These don't need to change — they still own the transport handshake. They just trust that the URL is reachable by the time they run. | +| `internal/upstream/core/client.go` | The Client struct. We'll add a launcher handle here so `Disconnect()` / `Restart()` can kill the child. | +| `internal/upstream/manager.go` | Manager-level reconnect / restart logic. Confirm restart goes through Disconnect → Connect; if it skips Disconnect we need to fix that so the child gets reaped. | +| `internal/logs/` | Per-server log files. Already used by stdio; child stdout/stderr should route here for `mcpproxy upstream logs ` to keep working. | +| `cmd/mcpproxy/upstream_cmd.go:76-81, 775+` | The `upstream restart` CLI handler. No code change expected — just verify the new behaviour is correct. | +| `docs/configuration.md`, `docs/cli-management-commands.md` | User-facing docs to update so the new combo is documented + the "command + url → stdio wins" footgun is called out. | + +## Design + +### Orthogonal launcher concept + +A server is described by two independent concerns: + +- **Transport** — how to send/receive MCP messages once a connection exists (stdio / http / sse / streamable-http). Decides which `connectXxx()` runs. +- **Launcher** *(optional)* — how the upstream process gets started. Decides whether mcpproxy spawns a child before connecting. + +Today they're coupled (`stdio = always spawn`, `http/sse = never spawn`). After this change: + +| `command` set? | `url` set? | `protocol` | Behaviour | +|---|---|---|---| +| yes | no | stdio (or empty/auto) | **unchanged** — stdio transport, child via stdin/stdout, no URL. | +| no | yes | http/sse/streamable-http (or empty/auto) | **unchanged** — connect to remote URL, no spawn. | +| **yes** | **yes** | **http/sse/streamable-http (explicit)** | **NEW** — spawn child, wait for URL, connect via HTTP/SSE. | +| yes | yes | empty/auto | Existing behaviour (command wins → stdio, URL ignored). **Keep this** to preserve back-compat. Document it clearly. | +| yes | yes | stdio (explicit) | Existing behaviour — stdio, URL ignored. | + +The auto-detect rules in `DetermineTransportType` stay exactly as they are. The only behavioural change is "explicit http/sse/streamable-http + command set → spawn". + +### Launcher API (new package) + +`internal/upstream/launcher/` — new package. Roughly: + +```go +package launcher + +type Handle interface { + // Stop signals the child to exit (SIGTERM → grace → SIGKILL on timeout). + // Blocks until the child is actually reaped. + Stop(ctx context.Context) error + + // Wait blocks until the child exits on its own. Returns the exit error. + Wait() error + + // Done is closed when the child exits for any reason — used by the + // connection manager to react to crashes (trigger reconnect / mark + // server unhealthy). + Done() <-chan struct{} +} + +// Spawn launches the child described by cfg. It does NOT block on the +// URL becoming reachable — that's the caller's job (via WaitForURL). +// Returns a Handle the caller is responsible for stopping. +func Spawn(ctx context.Context, cfg SpawnConfig, log *zap.Logger) (Handle, error) + +type SpawnConfig struct { + Command string + Args []string + WorkingDir string + Env map[string]string // resolved env (after secret injection) + Environment *secureenv.EnvConfig // for Docker-isolation decisions + LogSink io.Writer // child stdout+stderr go here + // Docker fields, mirroring what connection_stdio.go uses today. +} + +// WaitForURL polls url until it accepts a TCP connection (NOT a full HTTP +// GET — see "Health check" gotcha below). Returns when reachable or when +// timeout/ctx fires. +func WaitForURL(ctx context.Context, url string, timeout time.Duration) error +``` + +**Refactor before you add:** the env-resolution + Docker-detection + working-dir-resolve logic in `connection_stdio.go` should be lifted into the launcher package as the first refactor step. The stdio path then calls the same Spawn helper but pipes the child's stdin/stdout into its mcp-go transport instead of routing stdout to a log file. **Don't fork; share.** + +### Connection-side wiring + +In `connection.go`, before the existing transport switch: + +```go +// Pseudocode — see connection.go:121 for the existing dispatch. + +if c.transportType != transportStdio && c.config.Command != "" { + c.launcher, err = launcher.Spawn(ctx, c.buildSpawnConfig(), c.logger) + if err != nil { return err } + + waitTimeout := c.config.LauncherWaitTimeout // new config field, default 30s + if err := launcher.WaitForURL(ctx, c.config.URL, waitTimeout); err != nil { + _ = c.launcher.Stop(context.Background()) + c.launcher = nil + return fmt.Errorf("local launcher: url %s not reachable in %s: %w", + c.config.URL, waitTimeout, err) + } + + // Goroutine to react to unexpected child exit during steady-state. + go c.watchLauncher() +} + +switch c.transportType { +case transportStdio: err = c.connectStdio(ctx) +case transportHTTP, transportHTTPStreamable: + err = c.connectHTTP(ctx) +case transportSSE: err = c.connectSSE(ctx) +default: return fmt.Errorf("unsupported transport type: %s", c.transportType) +} +``` + +`watchLauncher()` listens on `Handle.Done()` and, if the child dies while we still have a transport-level connection, calls `Disconnect()` so the existing reconnect path kicks in. + +`Disconnect()` (existing) needs one new line at the end: if `c.launcher != nil`, call `c.launcher.Stop(ctx)` with a graceful timeout and nil out the handle. + +### Config schema + +No mandatory schema change — `Command`, `Args`, `WorkingDir`, `Env` already exist on `ServerConfig`. Optional addition: + +```go +// LauncherWaitTimeout — how long Spawn-then-WaitForURL will wait before +// declaring the launch failed. Optional; default 30s. +LauncherWaitTimeout time.Duration `json:"launcher_wait_timeout,omitempty" mapstructure:"launcher_wait_timeout"` +``` + +If left zero, default to 30 seconds. Keep it tight enough that misconfigured commands surface as connect failures fast. + +## Gotchas — decide each one explicitly during implementation + +These are the boring questions that break a launcher in the field. Pick a default for each and write it into the spec. + +### 1. Port ownership + +User pins the port in `url` (e.g. `http://127.0.0.1:9999/mcp`) and is responsible for making the command listen on that port. **Simple, ship this first.** + +Future-work: `{port}` templating in `args` / `url` so mcpproxy can pick a free port and substitute it before spawn. Don't ship this in v1 — it doubles the spec. + +### 2. Health check method + +Don't use `http.Get(url)`. SSE endpoints return a streaming response that never closes; an HTTP GET may either hang or return non-2xx for a perfectly-healthy server (the SSE endpoint typically only accepts GET-with-text/event-stream-Accept). + +**Do**: TCP-dial `host:port` from the URL until it accepts a connection. That proves the listener is bound; the transport-level connect will then do its own protocol handshake. Cleaner separation of concerns. + +```go +// pseudo +host, port := splitURLHostPort(url) +deadline := time.Now().Add(timeout) +for time.Now().Before(deadline) { + conn, err := net.DialTimeout("tcp", host+":"+port, 1*time.Second) + if err == nil { conn.Close(); return nil } + time.Sleep(200 * time.Millisecond) +} +return errors.New("url not reachable in time") +``` + +### 3. Process group + parent-death cleanup + +If mcpproxy crashes without reaping, the child stays alive holding the port. This is the same problem stdio has — see how `connection_stdio.go` handles it today and reuse the pattern. Cross-platform notes: + +- **Linux**: `syscall.SysProcAttr{Pdeathsig: syscall.SIGTERM, Setpgid: true}` makes the kernel deliver SIGTERM to the child when the parent thread dies. Cheap + reliable. +- **macOS**: No pdeathsig. Run a parent-watcher goroutine that polls `os.Getppid()` and sends SIGTERM when it changes to 1 (parent reaped). Or rely on the existing graceful-shutdown handler in `internal/runtime/` to enumerate live launchers and stop them. +- **Windows**: Use Job Objects (kill-on-close). `os/exec` doesn't help directly; use `golang.org/x/sys/windows/job` or similar. + +Most stdio code today already handles this — copy that pattern wholesale. + +### 4. Docker-isolation interop + +`ServerConfig.Environment` (the `*secureenv.EnvConfig` field) already drives Docker isolation for stdio commands. The launcher must honour those same fields — Spawn() should pick Docker-via-`docker run` if the config asks for it, exactly the way `connection_stdio.go` does today. This is also why the refactor step is mandatory: the Docker-isolation logic must live in one place after this lands. + +### 5. Restart semantics + +`mcpproxy upstream restart ` currently calls Disconnect → Connect. After this change: + +- Disconnect must Stop the launcher *and* close the transport, in that order. Wait for the child to actually exit (with a 5s kill-after-grace timeout) so the next start doesn't fight for the port. +- Connect re-runs Spawn + WaitForURL + connectXxx. + +Verify the manager's reconnect loop (`internal/upstream/manager.go`) doesn't try to reconnect *during* a restart — there's an existing race-protection pattern for stdio that probably already covers this, but explicit test required. + +### 6. Crash-while-connected + +If the child dies after a successful transport-level connect, the transport will detect the dropped connection eventually (TCP keepalive / SSE stream EOF). But our `watchLauncher()` goroutine can detect it instantly and trigger Disconnect, which lets the existing reconnect/backoff path kick in faster. + +### 7. Logging + +Reuse `internal/logs/` per-server log files. Pipe `Handle.Spawn`'s `LogSink` to the same writer the stdio path uses. `mcpproxy upstream logs ` works for free. + +Add a header line on spawn (`[launcher] starting: cmd args...`) and on exit (`[launcher] exited code=N`) so users can read the log and see where the protocol layer ends and the child process layer begins. + +### 8. Config docs footgun + +The existing "command without explicit protocol wins over url and forces stdio" rule is staying. After this change, users who had `{command, url}` with no protocol will keep getting stdio — which is what they got yesterday too, no behaviour change. But anyone *expecting* "command + url = launch + HTTP" needs to set `protocol: "http"` explicitly. + +**Doc must spell this out** with a side-by-side table identical to the one in the "Design — Orthogonal launcher concept" section above. Burying it in prose guarantees a support issue. + +## Implementation phases + +### Phase 0 — Refactor (no behaviour change) + +1. Extract `internal/upstream/launcher/` package. +2. Move env-resolution + Docker-detection + working-dir + signal-handling + log-piping out of `connection_stdio.go` into `launcher`. +3. `connection_stdio.go` now calls `launcher.Spawn()` then wires the resulting child's stdin/stdout into mcp-go's stdio transport. +4. All existing stdio tests must pass unchanged. + +**Exit criteria**: `go test ./... -race` green; `mcpproxy upstream restart ` works exactly as before. + +### Phase 1 — HTTP/SSE launcher + +1. Add the spawn-before-transport step in `connection.go` (sketched above). +2. Add `LauncherWaitTimeout` to `ServerConfig` (default 30s). +3. Add `launcher.WaitForURL` TCP-dial helper. +4. Wire `Handle.Stop()` into `Disconnect()`. +5. Wire `watchLauncher()` into the connection lifecycle. + +**Exit criteria**: a test config like + +```json +{ + "name": "local-http-mcp", + "protocol": "http", + "url": "http://127.0.0.1:9999/mcp", + "command": "node", + "args": ["./examples/echo-http-server.js", "--port", "9999"], + "working_dir": "/path/to/repo", + "enabled": true +} +``` + +starts the node process, waits ~1s for the listener, connects via HTTP, and reaps the node process on `mcpproxy upstream restart local-http-mcp`. + +### Phase 2 — Tests + docs + +- Unit tests: `launcher.WaitForURL` (mock listener bound late, never bound, immediately bound, ctx cancelled mid-wait). +- Integration: a small Go binary in `internal/upstream/launcher/testdata/` that binds a port + serves a trivial HTTP `/mcp` endpoint. Mcpproxy spawns it, connects, calls a tool, restarts, asserts the PID changes. +- E2E: extend `scripts/test-api-e2e.sh` with a launcher-flavoured server. +- Docs: + - `docs/configuration.md` — new section "Locally-launched HTTP/SSE servers" with the config snippet above + the back-compat table. + - `docs/cli-management-commands.md` — restart semantics call-out. + - `docs/architecture.md` — diagram update showing launcher as a sibling of transport. + +### Phase 3 — Polish (optional, post-merge) + +- `{port}` templating in args/url for ephemeral ports. +- Per-launcher health probe customization (TCP / HTTP GET path / custom command). +- Backoff for launcher restarts when the child crashes repeatedly. + +## Estimated effort + +- Phase 0 refactor: ~1 day. Read-heavy; mechanical extraction. Done badly it spawns subtle stdio regressions, so go slow. +- Phase 1 launcher: ~1 day. Bulk of new code is in `launcher.go` (~150 lines) + connection.go wiring (~30 lines). +- Phase 2 tests + docs: ~1 day. +- **Total**: ~3 days end-to-end, plus PR review cycles. + +## Open questions to resolve before coding + +These don't have an obvious right answer and should be settled in the spec, not in PR review: + +1. **Should `command` + `url` + auto-protocol promote to "launch + HTTP" or stay as stdio?** Current plan: stay as stdio (back-compat). Confirm. +2. **`Handle.Stop()` grace timeout** — 2s? 5s? 10s? Default 5s with config override is reasonable. +3. **Should the launcher restart the child on crash, or always defer to mcpproxy's transport-level reconnect logic to handle it?** Plan: defer to transport-level. The launcher just dies and signals via `Done()`. +4. **Where does `Stop()` get its context from on shutdown?** Plan: use `context.Background()` with a fixed 10s deadline; mcpproxy's shutdown handler can pass a stricter ctx if it has one. + +## Where to look if you forget the design halfway through + +- This file. +- The codebase reconnaissance table above — every relevant file is listed. +- `internal/upstream/core/connection.go` — start at line 96 (the connect entrypoint) and follow the call graph from there. +- `internal/upstream/core/connection_stdio.go` — your reference implementation for "spawn a child + manage its lifecycle". Most of what you're building already exists here in a stdio-flavoured form. + +## Definition of done + +1. `{command, url, protocol: "http"}` in `mcp_config.json` launches the command, waits for the URL, connects, and is restartable / disconnectable cleanly. +2. Same for `protocol: "sse"` and `protocol: "streamable-http"`. +3. Child process is reaped on Disconnect, on Restart, on Server-disable, on mcpproxy shutdown (graceful and SIGKILL paths). +4. Child stdout/stderr lands in the per-server log; `mcpproxy upstream logs ` shows it. +5. All existing stdio tests still pass after the Phase 0 refactor. +6. New tests cover: successful launch, URL-never-reachable timeout, child-crashes-during-steady-state, restart, graceful-shutdown. +7. `docs/configuration.md` documents the new combo + back-compat table (inside the submodule — `docs/` here means `tools/common/mcpproxy-go/docs/`, the upstream's own docs tree). +8. CHANGELOG entry inside the submodule (whatever convention `mcpproxy-go` uses — check the repo before adding) describes the feature + the no-behaviour-change refactor. +9. PR opened from `HaloCollar:046-local-launcher-for-http-sse` → `smart-mcp-proxy:main`, passing upstream CI. +10. After PR merge: separate Halo super-repo commit advances the `tools/common/mcpproxy-go` submodule SHA. Optionally add a Halo-side changelog entry at `tools/docs/changelog/` noting that local-launcher support is now available downstream. diff --git a/test/e2e-config.template.json b/test/e2e-config.template.json index 1d44ea6f1..6811844dd 100644 --- a/test/e2e-config.template.json +++ b/test/e2e-config.template.json @@ -20,6 +20,18 @@ "quarantined": false, "created": "2025-01-01T00:00:00Z", "updated": "2025-09-23T10:13:46.357736+03:00" + }, + { + "name": "launcher-test", + "protocol": "http", + "url": "http://127.0.0.1:39933/mcp", + "command": "./test/launcher-server/launcher-server", + "args": ["--port", "39933", "--quiet"], + "launcher_wait_timeout": "10s", + "enabled": true, + "quarantined": false, + "created": "2026-05-11T00:00:00Z", + "updated": "2026-05-11T00:00:00Z" } ], "top_k": 10, diff --git a/test/launcher-server/main.go b/test/launcher-server/main.go new file mode 100644 index 000000000..ffd66e46f --- /dev/null +++ b/test/launcher-server/main.go @@ -0,0 +1,226 @@ +// Command launcher-server is a tiny HTTP MCP server used as the child +// process in the e2e test for spec 046 (local launcher for HTTP/SSE +// upstreams). It is NOT a production MCP implementation — it speaks +// just enough of the protocol for mcpproxy's StreamableHTTP transport +// to complete the initialize handshake and answer a tools/list call. +// +// Why: the unit + integration tests in internal/upstream/launcher cover +// the spawn / WaitForURL / Stop semantics in isolation. To prove the +// feature actually works end-to-end — that a `mcpproxy serve` boot +// launches this binary, connects to it via HTTP, completes the MCP +// handshake, and reaps it on disable / restart / shutdown — we need a +// real upstream the e2e harness can drive. Anything more complex than +// "implement initialize + tools/list + tools/call" is out of scope. +// +// Usage: launcher-server --port N [--addr 127.0.0.1] [--quiet] +// +// On SIGTERM/SIGINT the HTTP listener is gracefully shut down (5s +// deadline) and the process exits with code 0. Heartbeat lines are +// printed to stdout once per second by default so the per-server log +// in mcpproxy demonstrably captures the child's output. --quiet +// suppresses the heartbeat. +package main + +import ( + "context" + "encoding/json" + "flag" + "fmt" + "io" + "log" + "net/http" + "os" + "os/signal" + "syscall" + "time" +) + +// The protocol version we advertise. Matches mark3labs/mcp-go's current +// LATEST_PROTOCOL_VERSION at the time this fixture was written. mcp-go's +// initialize negotiation accepts a mismatch (warns but proceeds) so this +// doesn't need to be in lock-step with library bumps. +const protocolVersion = "2025-11-25" + +type jsonRPCRequest struct { + JSONRPC string `json:"jsonrpc"` + ID json.RawMessage `json:"id,omitempty"` + Method string `json:"method"` + Params json.RawMessage `json:"params,omitempty"` +} + +type jsonRPCResponse struct { + JSONRPC string `json:"jsonrpc"` + ID json.RawMessage `json:"id,omitempty"` + Result any `json:"result,omitempty"` + Error *rpcError `json:"error,omitempty"` +} + +type rpcError struct { + Code int `json:"code"` + Message string `json:"message"` +} + +func main() { + port := flag.Int("port", 0, "TCP port to bind (required)") + addr := flag.String("addr", "127.0.0.1", "Bind address") + quiet := flag.Bool("quiet", false, "Suppress 1-second heartbeat log lines") + flag.Parse() + + if *port == 0 { + fmt.Fprintln(os.Stderr, "launcher-server: --port is required") + os.Exit(2) + } + + mux := http.NewServeMux() + mux.HandleFunc("/mcp", handleMCP) + + server := &http.Server{ + Addr: fmt.Sprintf("%s:%d", *addr, *port), + Handler: mux, + ReadHeaderTimeout: 5 * time.Second, + } + + listenErr := make(chan error, 1) + go func() { + err := server.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + listenErr <- err + } + close(listenErr) + }() + + fmt.Printf("[launcher-server] listening on %s (pid=%d)\n", server.Addr, os.Getpid()) + + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT) + + var heartbeat <-chan time.Time + if !*quiet { + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + heartbeat = ticker.C + } + + for { + select { + case sig := <-sigs: + fmt.Printf("[launcher-server] received %v, shutting down\n", sig) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + _ = server.Shutdown(ctx) + cancel() + return + case err, ok := <-listenErr: + if ok && err != nil { + log.Fatalf("launcher-server: listen failed: %v", err) + } + return + case <-heartbeat: + fmt.Println("[launcher-server] tick") + } + } +} + +// handleMCP implements the StreamableHTTP server side. mcpproxy POSTs +// JSON-RPC frames here with `Accept: application/json, text/event-stream` +// and we reply with `Content-Type: application/json` (single-shot +// response). We do NOT implement the GET-for-server-push side of the +// streamable transport: the fixture never originates notifications. +func handleMCP(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case http.MethodPost: + // fall through + case http.MethodGet: + // Some clients open a GET for server-push events. We tell them + // we don't have any (405 ⇒ client falls back to POST-only mode). + w.Header().Set("Allow", "POST") + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + case http.MethodDelete: + // Session termination — accept and exit silently. + w.WriteHeader(http.StatusAccepted) + return + default: + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + + body, err := io.ReadAll(r.Body) + if err != nil { + http.Error(w, "read body: "+err.Error(), http.StatusBadRequest) + return + } + + var req jsonRPCRequest + if err := json.Unmarshal(body, &req); err != nil { + http.Error(w, "bad json: "+err.Error(), http.StatusBadRequest) + return + } + + // Notifications carry no id and expect no response body. + isNotification := len(req.ID) == 0 || string(req.ID) == "null" + if isNotification { + fmt.Printf("[launcher-server] notification method=%s\n", req.Method) + w.WriteHeader(http.StatusAccepted) + return + } + + fmt.Printf("[launcher-server] request method=%s id=%s\n", req.Method, string(req.ID)) + + var ( + result any + rpcErr *rpcError + ) + switch req.Method { + case "initialize": + result = map[string]any{ + "protocolVersion": protocolVersion, + "capabilities": map[string]any{ + "tools": map[string]any{}, + }, + "serverInfo": map[string]any{ + "name": "launcher-test-fixture", + "version": "0.1.0", + }, + } + case "tools/list": + result = map[string]any{ + "tools": []map[string]any{ + { + "name": "ping", + "description": "Returns 'pong' so the e2e test can prove a launched HTTP MCP server actually answers tool calls.", + "inputSchema": map[string]any{ + "type": "object", + "properties": map[string]any{}, + }, + }, + }, + } + case "tools/call": + // We don't even bother parsing the name — there's only one tool. + result = map[string]any{ + "content": []map[string]any{ + {"type": "text", "text": "pong from launcher-test-fixture"}, + }, + "isError": false, + } + case "ping": + // Heartbeat method some clients send. Reply with an empty result. + result = map[string]any{} + default: + rpcErr = &rpcError{Code: -32601, Message: "method not found: " + req.Method} + } + + resp := jsonRPCResponse{JSONRPC: "2.0", ID: req.ID} + if rpcErr != nil { + resp.Error = rpcErr + } else { + resp.Result = result + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + if err := json.NewEncoder(w).Encode(resp); err != nil { + // Already wrote status; nothing useful to do. + log.Printf("[launcher-server] encode response: %v", err) + } +}