Skip to content

Commit 3524e75

Browse files
onematchfoxclaudeEItanya
authored
refactor(controller): invoke agents directly in MCP handler (#1855)
Replace the HTTP round-trip through the controller's own A2A listener with direct invocation via a new `AgentClientRegistry`. The registry is owned by `A2ARegistrar`, which already maintains an `A2AClient` per agent for its HTTP mux — the registry gives the MCP handler access to those same clients without an extra network hop. The old approach routed through the controller's public A2A endpoint, meaning requests could traverse the external network (and any ingress or load-balancer in front of it) unnecessarily. The new path stays in-process. The old handler also cached its own `A2AClient` per agent in a `sync.Map` with no eviction, so clients for deleted agents would remain indefinitely. The registry is kept consistent by the registrar's add/update/delete lifecycle, eliminating that staleness. `A2ARegistrar.upsertAgentHandler` writes to both the HTTP mux (for inbound /api/a2a/<ns>/<name>/ routing) and the registry (for direct invocation). The registry is exposed via `ClientRegistry()` and passed to `NewMCPHandler` in app.go. ~Note:~ ~- currently includes changes from #1853 as well since this needs to adjust the tests added in that PR.~ --------- Signed-off-by: Brian Fox <878612+onematchfox@users.noreply.github.com> Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com> Co-authored-by: Eitan Yarmush <eitan.yarmush@solo.io>
1 parent 59f5a65 commit 3524e75

5 files changed

Lines changed: 250 additions & 80 deletions

File tree

go/core/internal/a2a/a2a_registrar.go

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
type A2ARegistrar struct {
2727
cache crcache.Cache
2828
handlerMux A2AHandlerMux
29+
clientRegistry *AgentClientRegistry
2930
a2aBaseURL string
3031
sandboxA2AURL string
3132
authenticator auth.AuthProvider
@@ -37,27 +38,32 @@ var _ manager.Runnable = (*A2ARegistrar)(nil)
3738
func NewA2ARegistrar(
3839
cache crcache.Cache,
3940
mux A2AHandlerMux,
41+
clientRegistry *AgentClientRegistry,
4042
a2aBaseUrl string,
4143
sandboxA2ABaseURL string,
4244
authenticator auth.AuthProvider,
4345
streamingMaxBuf int,
4446
streamingInitialBuf int,
4547
streamingTimeout time.Duration,
46-
) *A2ARegistrar {
48+
) (*A2ARegistrar, error) {
49+
if clientRegistry == nil {
50+
return nil, fmt.Errorf("clientRegistry must not be nil")
51+
}
4752
reg := &A2ARegistrar{
48-
cache: cache,
49-
handlerMux: mux,
50-
a2aBaseURL: a2aBaseUrl,
51-
sandboxA2AURL: sandboxA2ABaseURL,
52-
authenticator: authenticator,
53+
cache: cache,
54+
handlerMux: mux,
55+
clientRegistry: clientRegistry,
56+
a2aBaseURL: a2aBaseUrl,
57+
sandboxA2AURL: sandboxA2ABaseURL,
58+
authenticator: authenticator,
5359
a2aBaseOptions: []a2aclient.Option{
5460
a2aclient.WithTimeout(streamingTimeout),
5561
a2aclient.WithBuffer(streamingInitialBuf, streamingMaxBuf),
5662
debugOpt(),
5763
},
5864
}
5965

60-
return reg
66+
return reg, nil
6167
}
6268

6369
func (a *A2ARegistrar) NeedLeaderElection() bool {
@@ -117,6 +123,7 @@ func (a *A2ARegistrar) registerAgentInformer(ctx context.Context, prototype v1al
117123
}
118124
ref := a2aRouteKey(agent)
119125
a.handlerMux.RemoveAgentHandler(ref)
126+
a.clientRegistry.delete(ref)
120127
log.V(1).Info("removed A2A handler", "agent", ref)
121128
},
122129
}); err != nil {
@@ -182,10 +189,13 @@ func (a *A2ARegistrar) upsertAgentHandler(ctx context.Context, agent v1alpha2.Ag
182189
cardCopy := *card
183190
cardCopy.URL = a.a2aRouteURL(agent)
184191

185-
if err := a.handlerMux.SetAgentHandler(a2aRouteKey(agent), client, cardCopy, newA2ATracingMiddleware(agentRef, provider)); err != nil {
192+
routeRef := a2aRouteKey(agent)
193+
if err := a.handlerMux.SetAgentHandler(routeRef, client, cardCopy, newA2ATracingMiddleware(agentRef, provider)); err != nil {
186194
return fmt.Errorf("set handler for %s: %w", agentRef, err)
187195
}
188196

197+
a.clientRegistry.set(routeRef, client)
198+
189199
log.V(1).Info("registered/updated A2A handler", "agent", agentRef)
190200
return nil
191201
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package a2a
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
8+
a2aclient "trpc.group/trpc-go/trpc-a2a-go/client"
9+
"trpc.group/trpc-go/trpc-a2a-go/protocol"
10+
)
11+
12+
// AgentClientRegistry maps agent route keys to their A2A clients.
13+
// The A2ARegistrar populates it; the MCP handler reads from it to invoke
14+
// agents without an HTTP round trip through the controller's own A2A listener.
15+
type AgentClientRegistry struct {
16+
mu sync.RWMutex
17+
clients map[string]*a2aclient.A2AClient
18+
}
19+
20+
func NewAgentClientRegistry() *AgentClientRegistry {
21+
return &AgentClientRegistry{clients: make(map[string]*a2aclient.A2AClient)}
22+
}
23+
24+
// set stores the client under the agent's route key (e.g. "namespace/name" or
25+
// "sandboxes/namespace/name").
26+
func (r *AgentClientRegistry) set(agentRef string, c *a2aclient.A2AClient) {
27+
r.mu.Lock()
28+
defer r.mu.Unlock()
29+
r.clients[agentRef] = c
30+
}
31+
32+
// delete removes the client for the given agent route key.
33+
func (r *AgentClientRegistry) delete(agentRef string) {
34+
r.mu.Lock()
35+
defer r.mu.Unlock()
36+
delete(r.clients, agentRef)
37+
}
38+
39+
// Register adds or replaces the A2A client for the given agent. It is the
40+
// exported counterpart of set, intended for use in tests and explicit
41+
// registrations outside the A2ARegistrar lifecycle.
42+
func (r *AgentClientRegistry) Register(namespace, name string, c *a2aclient.A2AClient) {
43+
r.set(namespace+"/"+name, c)
44+
}
45+
46+
// SendMessage invokes an agent directly via its cached A2A client.
47+
// namespace and name must identify a non-sandbox agent; sandbox agents use a
48+
// different route key and are not yet reachable via this method.
49+
func (r *AgentClientRegistry) SendMessage(ctx context.Context, namespace, name string, params protocol.SendMessageParams) (*protocol.MessageResult, error) {
50+
key := namespace + "/" + name
51+
r.mu.RLock()
52+
c, ok := r.clients[key]
53+
r.mu.RUnlock()
54+
if !ok {
55+
return nil, fmt.Errorf("agent %s/%s not found or not ready", namespace, name)
56+
}
57+
return c.SendMessage(ctx, params)
58+
}

go/core/internal/mcp/mcp_handler.go

Lines changed: 12 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -5,33 +5,26 @@ import (
55
"fmt"
66
"net/http"
77
"strings"
8-
"sync"
9-
"time"
108

119
"github.com/google/jsonschema-go/jsonschema"
1210
"github.com/kagent-dev/kagent/go/api/v1alpha2"
1311
"github.com/kagent-dev/kagent/go/core/internal/a2a"
14-
authimpl "github.com/kagent-dev/kagent/go/core/internal/httpserver/auth"
1512
"github.com/kagent-dev/kagent/go/core/internal/version"
1613
"github.com/kagent-dev/kagent/go/core/pkg/auth"
1714
"github.com/kagent-dev/kagent/go/core/pkg/env"
1815
mcpsdk "github.com/modelcontextprotocol/go-sdk/mcp"
19-
"k8s.io/apimachinery/pkg/types"
2016
"sigs.k8s.io/controller-runtime/pkg/client"
2117
ctrllog "sigs.k8s.io/controller-runtime/pkg/log"
22-
a2aclient "trpc.group/trpc-go/trpc-a2a-go/client"
2318
"trpc.group/trpc-go/trpc-a2a-go/protocol"
2419
)
2520

2621
// MCPHandler handles MCP requests and bridges them to A2A endpoints
2722
type MCPHandler struct {
2823
kubeClient client.Client
29-
a2aBaseURL string
30-
a2aTimeout time.Duration
24+
agentClients *a2a.AgentClientRegistry
3125
authenticator auth.AuthProvider
3226
httpHandler *mcpsdk.StreamableHTTPHandler
3327
server *mcpsdk.Server
34-
a2aClients sync.Map
3528
}
3629

3730
// Input types for MCP tools
@@ -58,20 +51,12 @@ type InvokeAgentOutput struct {
5851
ContextID string `json:"context_id,omitempty"`
5952
}
6053

61-
// defaultA2ATimeout is the fallback timeout for A2A client calls and should match
62-
// the configured default streaming timeout.
63-
const defaultA2ATimeout = 10 * time.Minute
64-
65-
// NewMCPHandler creates a new MCP handler
66-
// Wraps the StreamableHTTPHandler and adds A2A bridging and context management.
67-
func NewMCPHandler(kubeClient client.Client, a2aBaseURL string, authenticator auth.AuthProvider, a2aTimeout time.Duration) (*MCPHandler, error) {
68-
if a2aTimeout <= 0 {
69-
a2aTimeout = defaultA2ATimeout
70-
}
54+
// NewMCPHandler creates a new MCP handler that bridges MCP tool calls directly
55+
// to agent A2A clients, bypassing the controller's own HTTP A2A listener.
56+
func NewMCPHandler(kubeClient client.Client, agentClients *a2a.AgentClientRegistry, authenticator auth.AuthProvider) (*MCPHandler, error) {
7157
handler := &MCPHandler{
7258
kubeClient: kubeClient,
73-
a2aBaseURL: a2aBaseURL,
74-
a2aTimeout: a2aTimeout,
59+
agentClients: agentClients,
7560
authenticator: authenticator,
7661
}
7762

@@ -198,18 +183,18 @@ func (h *MCPHandler) handleListAgents(ctx context.Context, req *mcpsdk.CallToolR
198183
func (h *MCPHandler) handleInvokeAgent(ctx context.Context, req *mcpsdk.CallToolRequest, input InvokeAgentInput) (*mcpsdk.CallToolResult, InvokeAgentOutput, error) {
199184
log := ctrllog.FromContext(ctx).WithName("mcp-handler").WithValues("tool", "invoke_agent")
200185

201-
// Parse agent reference (namespace/name or just name)
202-
agentNS, agentName, ok := strings.Cut(input.Agent, "/")
203-
if !ok {
186+
// Parse agent reference — must be exactly "namespace/name".
187+
parts := strings.SplitN(input.Agent, "/", 3)
188+
if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
204189
return &mcpsdk.CallToolResult{
205190
Content: []mcpsdk.Content{
206191
&mcpsdk.TextContent{Text: "agent must be in format 'namespace/name'"},
207192
},
208193
IsError: true,
209194
}, InvokeAgentOutput{}, nil
210195
}
196+
agentNS, agentName := parts[0], parts[1]
211197
agentRef := agentNS + "/" + agentName
212-
agentNns := types.NamespacedName{Namespace: agentNS, Name: agentName}
213198

214199
// Get context ID from client request (stateless mode)
215200
// If not provided, contextIDPtr will be nil and a new conversation will start
@@ -219,47 +204,9 @@ func (h *MCPHandler) handleInvokeAgent(ctx context.Context, req *mcpsdk.CallTool
219204
log.V(1).Info("Using context_id from client request", "context_id", input.ContextID)
220205
}
221206

222-
// Get or create cached A2A client for this agent
223-
a2aURL := fmt.Sprintf("%s/%s/", h.a2aBaseURL, agentRef)
224-
var a2aClient *a2aclient.A2AClient
225-
226-
if cached, ok := h.a2aClients.Load(agentRef); ok {
227-
if client, ok := cached.(*a2aclient.A2AClient); ok {
228-
a2aClient = client
229-
}
230-
}
231-
232-
// Create new client if not cached
233-
if a2aClient == nil {
234-
// Build A2A client options with authentication propagation
235-
a2aOpts := []a2aclient.Option{
236-
a2aclient.WithTimeout(h.a2aTimeout),
237-
a2aclient.WithHTTPReqHandler(
238-
authimpl.A2ARequestHandler(
239-
h.authenticator,
240-
agentNns,
241-
),
242-
),
243-
}
244-
245-
newClient, err := a2aclient.NewA2AClient(a2aURL, a2aOpts...)
246-
if err != nil {
247-
log.Error(err, "Failed to create A2A client", "agent", agentRef)
248-
return &mcpsdk.CallToolResult{
249-
Content: []mcpsdk.Content{
250-
&mcpsdk.TextContent{Text: fmt.Sprintf("Failed to create A2A client: %v", err)},
251-
},
252-
IsError: true,
253-
}, InvokeAgentOutput{}, nil
254-
}
255-
256-
// Cache the client
257-
h.a2aClients.Store(agentRef, newClient)
258-
a2aClient = newClient
259-
}
260-
261-
// Send message via A2A
262-
result, err := a2aClient.SendMessage(ctx, protocol.SendMessageParams{
207+
// Send message directly via the agent's A2A client, bypassing the
208+
// controller's own HTTP A2A listener.
209+
result, err := h.agentClients.SendMessage(ctx, agentNS, agentName, protocol.SendMessageParams{
263210
Message: protocol.Message{
264211
Kind: protocol.KindMessage,
265212
Role: protocol.MessageRoleUser,

0 commit comments

Comments
 (0)