Skip to content

Commit 88f38a5

Browse files
authored
Merge pull request #53 from LAA-Software-Engineering/issue/19-mcp-stdio
feat(tools/mcp): MCP stdio runtime and registry integration (issue #19)
2 parents 2b03c9f + 1fa00b4 commit 88f38a5

10 files changed

Lines changed: 538 additions & 3 deletions

File tree

internal/tools/doc.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
// Package tools defines tool registries and integrations (MCP, HTTP, native).
22
//
3-
// [Registry] resolves tool.<name>.<operation> uses strings and dispatches MVP native and mock tools.
3+
// [Registry] resolves tool.<name>.<operation> uses strings and dispatches MVP native, mock, and MCP stdio tools.
44
// Responses use [ToolCallResponse] with output + meta per §13.2.
55
package tools

internal/tools/mcp/call.go

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
package mcp
2+
3+
import (
4+
"context"
5+
"errors"
6+
"strings"
7+
"time"
8+
9+
"github.com/LAA-Software-Engineering/agentic-control-plane/internal/spec"
10+
)
11+
12+
// ExecMeta is timing/cost metadata for an MCP call (§13.2 placeholders).
13+
type ExecMeta struct {
14+
DurationMs int64
15+
CostUSD float64
16+
}
17+
18+
// CallStdio runs one MCP tools/call over a fresh stdio subprocess, with optional retries on transport errors (§13.4).
19+
func CallStdio(ctx context.Context, cfg *spec.ToolMCP, retry *spec.ToolRetry, toolName string, arguments map[string]any) (map[string]any, ExecMeta, error) {
20+
if cfg == nil {
21+
return nil, ExecMeta{}, errors.New("mcp: nil mcp config")
22+
}
23+
if strings.ToLower(strings.TrimSpace(cfg.Transport)) != "stdio" {
24+
return nil, ExecMeta{}, errors.New("mcp: only transport stdio is supported in MVP")
25+
}
26+
cmd := strings.TrimSpace(cfg.Command)
27+
if cmd == "" {
28+
return nil, ExecMeta{}, errors.New("mcp: empty command")
29+
}
30+
31+
attempts := 1
32+
if retry != nil && retry.MaxAttempts > 0 {
33+
attempts = retry.MaxAttempts
34+
}
35+
backoff := ""
36+
if retry != nil {
37+
backoff = retry.Backoff
38+
}
39+
40+
startAll := time.Now()
41+
var lastErr error
42+
for attempt := 0; attempt < attempts; attempt++ {
43+
if attempt > 0 {
44+
sleepBackoff(ctx, attempt, backoff)
45+
}
46+
out, err := oneStdioAttempt(ctx, cmd, cfg.Args, toolName, arguments)
47+
if err == nil {
48+
return out, ExecMeta{DurationMs: time.Since(startAll).Milliseconds(), CostUSD: 0}, nil
49+
}
50+
lastErr = err
51+
if !retryableTransportErr(err) {
52+
break
53+
}
54+
}
55+
return nil, ExecMeta{DurationMs: time.Since(startAll).Milliseconds(), CostUSD: 0}, lastErr
56+
}
57+
58+
func oneStdioAttempt(ctx context.Context, command string, args []string, toolName string, arguments map[string]any) (map[string]any, error) {
59+
tr := NewStdioTransport(command, args)
60+
if err := tr.Start(ctx); err != nil {
61+
return nil, err
62+
}
63+
defer tr.Close()
64+
65+
if err := tr.Initialize(ctx); err != nil {
66+
return nil, err
67+
}
68+
return tr.CallTool(ctx, toolName, arguments)
69+
}
70+
71+
func retryableTransportErr(err error) bool {
72+
if err == nil {
73+
return false
74+
}
75+
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
76+
return false
77+
}
78+
var re *rpcError
79+
if errors.As(err, &re) {
80+
return false
81+
}
82+
return true
83+
}
84+
85+
func sleepBackoff(ctx context.Context, attempt int, kind string) {
86+
if attempt <= 0 {
87+
return
88+
}
89+
var d time.Duration
90+
switch strings.ToLower(strings.TrimSpace(kind)) {
91+
case "exponential":
92+
shift := attempt
93+
if shift > 8 {
94+
shift = 8
95+
}
96+
d = time.Millisecond * time.Duration(50*(1<<shift))
97+
case "fixed":
98+
d = 100 * time.Millisecond
99+
default:
100+
d = 50 * time.Millisecond
101+
}
102+
select {
103+
case <-ctx.Done():
104+
case <-time.After(d):
105+
}
106+
}

internal/tools/mcp/client.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package mcp
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
)
8+
9+
// Initialize performs the MCP initialize + notifications/initialized handshake.
10+
func (t *StdioTransport) Initialize(ctx context.Context) error {
11+
params := map[string]any{
12+
"protocolVersion": "2024-11-05",
13+
"capabilities": map[string]any{},
14+
"clientInfo": map[string]any{
15+
"name": "agentic-control-plane",
16+
"version": "0",
17+
},
18+
}
19+
if _, err := t.RoundTrip(ctx, "initialize", params); err != nil {
20+
return err
21+
}
22+
// Notification (no response).
23+
t.mu.Lock()
24+
defer t.mu.Unlock()
25+
return t.writeMessage(map[string]any{
26+
"jsonrpc": "2.0",
27+
"method": "notifications/initialized",
28+
"params": map[string]any{},
29+
})
30+
}
31+
32+
// CallTool invokes tools/call and maps the MCP result into a plain map for §13.2 output.
33+
func (t *StdioTransport) CallTool(ctx context.Context, name string, arguments map[string]any) (map[string]any, error) {
34+
if arguments == nil {
35+
arguments = map[string]any{}
36+
}
37+
raw, err := t.RoundTrip(ctx, "tools/call", map[string]any{
38+
"name": name,
39+
"arguments": arguments,
40+
})
41+
if err != nil {
42+
return nil, err
43+
}
44+
return parseCallToolResult(raw)
45+
}
46+
47+
func parseCallToolResult(raw json.RawMessage) (map[string]any, error) {
48+
var envelope struct {
49+
Content []struct {
50+
Type string `json:"type"`
51+
Text string `json:"text"`
52+
} `json:"content"`
53+
IsError bool `json:"isError"`
54+
}
55+
if err := json.Unmarshal(raw, &envelope); err != nil {
56+
return nil, fmt.Errorf("mcp: decode tools/call result: %w", err)
57+
}
58+
if envelope.IsError {
59+
return nil, rpcErrorf("tools/call isError=true: %s", string(raw))
60+
}
61+
if len(envelope.Content) == 0 {
62+
return map[string]any{}, nil
63+
}
64+
first := envelope.Content[0]
65+
if first.Type == "text" && first.Text != "" {
66+
var obj map[string]any
67+
if json.Unmarshal([]byte(first.Text), &obj) == nil {
68+
return obj, nil
69+
}
70+
return map[string]any{"text": first.Text}, nil
71+
}
72+
return map[string]any{"content": envelope.Content}, nil
73+
}

internal/tools/mcp/doc.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
// Package mcp implements MCP tool transport (MVP: stdio JSON-RPC) per design doc §7.3.
2+
//
3+
// [CallStdio] runs a subprocess, performs initialize / notifications/initialized, then tools/call.
4+
package mcp

internal/tools/mcp/errors.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package mcp
2+
3+
import "fmt"
4+
5+
// rpcError is a JSON-RPC error returned by the MCP server (do not retry as transport).
6+
type rpcError struct {
7+
detail string
8+
}
9+
10+
func (e *rpcError) Error() string {
11+
return "mcp: " + e.detail
12+
}
13+
14+
func rpcErrorf(format string, args ...any) error {
15+
return &rpcError{detail: fmt.Sprintf(format, args...)}
16+
}

internal/tools/mcp/mcp_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package mcp
2+
3+
import (
4+
"context"
5+
"os/exec"
6+
"path/filepath"
7+
"runtime"
8+
"testing"
9+
10+
"github.com/LAA-Software-Engineering/agentic-control-plane/internal/spec"
11+
)
12+
13+
func mockMCPExecutable(t *testing.T) string {
14+
t.Helper()
15+
name := "mockmcp"
16+
if runtime.GOOS == "windows" {
17+
name += ".exe"
18+
}
19+
out := filepath.Join(t.TempDir(), name)
20+
cmd := exec.Command("go", "build", "-o", out, "./testdata/mockmcp")
21+
cmd.Dir = "."
22+
if b, err := cmd.CombinedOutput(); err != nil {
23+
t.Fatalf("build mockmcp: %v\n%s", err, b)
24+
}
25+
return out
26+
}
27+
28+
func TestCallStdio_mockSubprocess(t *testing.T) {
29+
bin := mockMCPExecutable(t)
30+
ctx := context.Background()
31+
out, meta, err := CallStdio(ctx, &spec.ToolMCP{
32+
Transport: "stdio",
33+
Command: bin,
34+
}, nil, "any", map[string]any{"repo": "acme/api", "n": float64(3)})
35+
if err != nil {
36+
t.Fatal(err)
37+
}
38+
if meta.DurationMs < 0 {
39+
t.Fatalf("meta %+v", meta)
40+
}
41+
if out["repo"] != "acme/api" || out["n"] != float64(3) {
42+
t.Fatalf("output %+v", out)
43+
}
44+
}
45+
46+
func TestCallStdio_retryOnTransportFailure(t *testing.T) {
47+
ctx := context.Background()
48+
_, _, err := CallStdio(ctx, &spec.ToolMCP{
49+
Transport: "stdio",
50+
Command: "/nonexistent/binary/mcp-missing-xyz",
51+
}, &spec.ToolRetry{MaxAttempts: 2, Backoff: "fixed"}, "x", nil)
52+
if err == nil {
53+
t.Fatal("expected error")
54+
}
55+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// Mock MCP stdio server for tests: handles initialize and tools/call by echoing arguments as JSON text.
2+
package main
3+
4+
import (
5+
"bufio"
6+
"encoding/json"
7+
"os"
8+
)
9+
10+
func main() {
11+
sc := bufio.NewScanner(os.Stdin)
12+
enc := json.NewEncoder(os.Stdout)
13+
for sc.Scan() {
14+
line := sc.Bytes()
15+
var msg map[string]any
16+
if err := json.Unmarshal(line, &msg); err != nil {
17+
continue
18+
}
19+
method, _ := msg["method"].(string)
20+
if msg["id"] == nil {
21+
continue
22+
}
23+
id := msg["id"]
24+
switch method {
25+
case "initialize":
26+
_ = enc.Encode(map[string]any{
27+
"jsonrpc": "2.0",
28+
"id": id,
29+
"result": map[string]any{
30+
"protocolVersion": "2024-11-05",
31+
"capabilities": map[string]any{"tools": map[string]any{}},
32+
"serverInfo": map[string]any{"name": "mockmcp", "version": "1"},
33+
},
34+
})
35+
case "tools/call":
36+
params, _ := msg["params"].(map[string]any)
37+
args := map[string]any{}
38+
if params != nil {
39+
if a, ok := params["arguments"].(map[string]any); ok {
40+
args = a
41+
}
42+
}
43+
b, _ := json.Marshal(args)
44+
_ = enc.Encode(map[string]any{
45+
"jsonrpc": "2.0",
46+
"id": id,
47+
"result": map[string]any{
48+
"content": []any{map[string]any{"type": "text", "text": string(b)}},
49+
},
50+
})
51+
default:
52+
_ = enc.Encode(map[string]any{
53+
"jsonrpc": "2.0",
54+
"id": id,
55+
"error": map[string]any{
56+
"code": -32601,
57+
"message": "method not found",
58+
},
59+
})
60+
}
61+
}
62+
}

0 commit comments

Comments
 (0)