Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/tools/doc.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Package tools defines tool registries and integrations (MCP, HTTP, native).
//
// [Registry] resolves tool.<name>.<operation> uses strings and dispatches MVP native and mock tools.
// [Registry] resolves tool.<name>.<operation> uses strings and dispatches MVP native, mock, and MCP stdio tools.
// Responses use [ToolCallResponse] with output + meta per §13.2.
package tools
106 changes: 106 additions & 0 deletions internal/tools/mcp/call.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package mcp

import (
"context"
"errors"
"strings"
"time"

"github.com/LAA-Software-Engineering/agentic-control-plane/internal/spec"
)

// ExecMeta is timing/cost metadata for an MCP call (§13.2 placeholders).
type ExecMeta struct {
DurationMs int64
CostUSD float64
}

// CallStdio runs one MCP tools/call over a fresh stdio subprocess, with optional retries on transport errors (§13.4).
func CallStdio(ctx context.Context, cfg *spec.ToolMCP, retry *spec.ToolRetry, toolName string, arguments map[string]any) (map[string]any, ExecMeta, error) {
if cfg == nil {
return nil, ExecMeta{}, errors.New("mcp: nil mcp config")
}
if strings.ToLower(strings.TrimSpace(cfg.Transport)) != "stdio" {
return nil, ExecMeta{}, errors.New("mcp: only transport stdio is supported in MVP")
}
cmd := strings.TrimSpace(cfg.Command)
if cmd == "" {
return nil, ExecMeta{}, errors.New("mcp: empty command")
}

attempts := 1
if retry != nil && retry.MaxAttempts > 0 {
attempts = retry.MaxAttempts
}
backoff := ""
if retry != nil {
backoff = retry.Backoff
}

startAll := time.Now()
var lastErr error
for attempt := 0; attempt < attempts; attempt++ {
if attempt > 0 {
sleepBackoff(ctx, attempt, backoff)
}
out, err := oneStdioAttempt(ctx, cmd, cfg.Args, toolName, arguments)
if err == nil {
return out, ExecMeta{DurationMs: time.Since(startAll).Milliseconds(), CostUSD: 0}, nil
}
lastErr = err
if !retryableTransportErr(err) {
break
}
}
return nil, ExecMeta{DurationMs: time.Since(startAll).Milliseconds(), CostUSD: 0}, lastErr
}

func oneStdioAttempt(ctx context.Context, command string, args []string, toolName string, arguments map[string]any) (map[string]any, error) {
tr := NewStdioTransport(command, args)
if err := tr.Start(ctx); err != nil {
return nil, err
}
defer tr.Close()

if err := tr.Initialize(ctx); err != nil {
return nil, err
}
return tr.CallTool(ctx, toolName, arguments)
}

func retryableTransportErr(err error) bool {
if err == nil {
return false
}
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return false
}
var re *rpcError
if errors.As(err, &re) {
return false
}
return true
}

func sleepBackoff(ctx context.Context, attempt int, kind string) {
if attempt <= 0 {
return
}
var d time.Duration
switch strings.ToLower(strings.TrimSpace(kind)) {
case "exponential":
shift := attempt
if shift > 8 {
shift = 8
}
d = time.Millisecond * time.Duration(50*(1<<shift))
case "fixed":
d = 100 * time.Millisecond
default:
d = 50 * time.Millisecond
}
select {
case <-ctx.Done():
case <-time.After(d):
}
}
73 changes: 73 additions & 0 deletions internal/tools/mcp/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package mcp

import (
"context"
"encoding/json"
"fmt"
)

// Initialize performs the MCP initialize + notifications/initialized handshake.
func (t *StdioTransport) Initialize(ctx context.Context) error {
params := map[string]any{
"protocolVersion": "2024-11-05",
"capabilities": map[string]any{},
"clientInfo": map[string]any{
"name": "agentic-control-plane",
"version": "0",
},
}
if _, err := t.RoundTrip(ctx, "initialize", params); err != nil {
return err
}
// Notification (no response).
t.mu.Lock()
defer t.mu.Unlock()
return t.writeMessage(map[string]any{
"jsonrpc": "2.0",
"method": "notifications/initialized",
"params": map[string]any{},
})
}

// CallTool invokes tools/call and maps the MCP result into a plain map for §13.2 output.
func (t *StdioTransport) CallTool(ctx context.Context, name string, arguments map[string]any) (map[string]any, error) {
if arguments == nil {
arguments = map[string]any{}
}
raw, err := t.RoundTrip(ctx, "tools/call", map[string]any{
"name": name,
"arguments": arguments,
})
if err != nil {
return nil, err
}
return parseCallToolResult(raw)
}

func parseCallToolResult(raw json.RawMessage) (map[string]any, error) {
var envelope struct {
Content []struct {
Type string `json:"type"`
Text string `json:"text"`
} `json:"content"`
IsError bool `json:"isError"`
}
if err := json.Unmarshal(raw, &envelope); err != nil {
return nil, fmt.Errorf("mcp: decode tools/call result: %w", err)
}
if envelope.IsError {
return nil, rpcErrorf("tools/call isError=true: %s", string(raw))
}
if len(envelope.Content) == 0 {
return map[string]any{}, nil
}
first := envelope.Content[0]
if first.Type == "text" && first.Text != "" {
var obj map[string]any
if json.Unmarshal([]byte(first.Text), &obj) == nil {
return obj, nil
}
return map[string]any{"text": first.Text}, nil
}
return map[string]any{"content": envelope.Content}, nil
}
4 changes: 4 additions & 0 deletions internal/tools/mcp/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// Package mcp implements MCP tool transport (MVP: stdio JSON-RPC) per design doc §7.3.
//
// [CallStdio] runs a subprocess, performs initialize / notifications/initialized, then tools/call.
package mcp
16 changes: 16 additions & 0 deletions internal/tools/mcp/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package mcp

import "fmt"

// rpcError is a JSON-RPC error returned by the MCP server (do not retry as transport).
type rpcError struct {
detail string
}

func (e *rpcError) Error() string {
return "mcp: " + e.detail
}

func rpcErrorf(format string, args ...any) error {
return &rpcError{detail: fmt.Sprintf(format, args...)}
}
55 changes: 55 additions & 0 deletions internal/tools/mcp/mcp_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package mcp

import (
"context"
"os/exec"
"path/filepath"
"runtime"
"testing"

"github.com/LAA-Software-Engineering/agentic-control-plane/internal/spec"
)

func mockMCPExecutable(t *testing.T) string {
t.Helper()
name := "mockmcp"
if runtime.GOOS == "windows" {
name += ".exe"
}
out := filepath.Join(t.TempDir(), name)
cmd := exec.Command("go", "build", "-o", out, "./testdata/mockmcp")
cmd.Dir = "."
if b, err := cmd.CombinedOutput(); err != nil {
t.Fatalf("build mockmcp: %v\n%s", err, b)
}
return out
}

func TestCallStdio_mockSubprocess(t *testing.T) {
bin := mockMCPExecutable(t)
ctx := context.Background()
out, meta, err := CallStdio(ctx, &spec.ToolMCP{
Transport: "stdio",
Command: bin,
}, nil, "any", map[string]any{"repo": "acme/api", "n": float64(3)})
if err != nil {
t.Fatal(err)
}
if meta.DurationMs < 0 {
t.Fatalf("meta %+v", meta)
}
if out["repo"] != "acme/api" || out["n"] != float64(3) {
t.Fatalf("output %+v", out)
}
}

func TestCallStdio_retryOnTransportFailure(t *testing.T) {
ctx := context.Background()
_, _, err := CallStdio(ctx, &spec.ToolMCP{
Transport: "stdio",
Command: "/nonexistent/binary/mcp-missing-xyz",
}, &spec.ToolRetry{MaxAttempts: 2, Backoff: "fixed"}, "x", nil)
if err == nil {
t.Fatal("expected error")
}
}
62 changes: 62 additions & 0 deletions internal/tools/mcp/testdata/mockmcp/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Mock MCP stdio server for tests: handles initialize and tools/call by echoing arguments as JSON text.
package main

import (
"bufio"
"encoding/json"
"os"
)

func main() {
sc := bufio.NewScanner(os.Stdin)
enc := json.NewEncoder(os.Stdout)
for sc.Scan() {
line := sc.Bytes()
var msg map[string]any
if err := json.Unmarshal(line, &msg); err != nil {
continue
}
method, _ := msg["method"].(string)
if msg["id"] == nil {
continue
}
id := msg["id"]
switch method {
case "initialize":
_ = enc.Encode(map[string]any{
"jsonrpc": "2.0",
"id": id,
"result": map[string]any{
"protocolVersion": "2024-11-05",
"capabilities": map[string]any{"tools": map[string]any{}},
"serverInfo": map[string]any{"name": "mockmcp", "version": "1"},
},
})
case "tools/call":
params, _ := msg["params"].(map[string]any)
args := map[string]any{}
if params != nil {
if a, ok := params["arguments"].(map[string]any); ok {
args = a
}
}
b, _ := json.Marshal(args)
_ = enc.Encode(map[string]any{
"jsonrpc": "2.0",
"id": id,
"result": map[string]any{
"content": []any{map[string]any{"type": "text", "text": string(b)}},
},
})
default:
_ = enc.Encode(map[string]any{
"jsonrpc": "2.0",
"id": id,
"error": map[string]any{
"code": -32601,
"message": "method not found",
},
})
}
}
}
Loading
Loading