Skip to content

Commit f0cc8b9

Browse files
authored
Merge branch 'main' into add-env
2 parents 752f4fa + e9d43f6 commit f0cc8b9

14 files changed

Lines changed: 324 additions & 152 deletions

File tree

go/core/internal/a2a/a2a_registrar.go

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
type A2ARegistrar struct {
2727
cache crcache.Cache
2828
handlerMux A2AHandlerMux
29+
clientRegistry *AgentClientRegistry
2930
a2aBaseURL string
3031
sandboxA2AURL string
3132
authenticator auth.AuthProvider
@@ -37,27 +38,32 @@ var _ manager.Runnable = (*A2ARegistrar)(nil)
3738
func NewA2ARegistrar(
3839
cache crcache.Cache,
3940
mux A2AHandlerMux,
41+
clientRegistry *AgentClientRegistry,
4042
a2aBaseUrl string,
4143
sandboxA2ABaseURL string,
4244
authenticator auth.AuthProvider,
4345
streamingMaxBuf int,
4446
streamingInitialBuf int,
4547
streamingTimeout time.Duration,
46-
) *A2ARegistrar {
48+
) (*A2ARegistrar, error) {
49+
if clientRegistry == nil {
50+
return nil, fmt.Errorf("clientRegistry must not be nil")
51+
}
4752
reg := &A2ARegistrar{
48-
cache: cache,
49-
handlerMux: mux,
50-
a2aBaseURL: a2aBaseUrl,
51-
sandboxA2AURL: sandboxA2ABaseURL,
52-
authenticator: authenticator,
53+
cache: cache,
54+
handlerMux: mux,
55+
clientRegistry: clientRegistry,
56+
a2aBaseURL: a2aBaseUrl,
57+
sandboxA2AURL: sandboxA2ABaseURL,
58+
authenticator: authenticator,
5359
a2aBaseOptions: []a2aclient.Option{
5460
a2aclient.WithTimeout(streamingTimeout),
5561
a2aclient.WithBuffer(streamingInitialBuf, streamingMaxBuf),
5662
debugOpt(),
5763
},
5864
}
5965

60-
return reg
66+
return reg, nil
6167
}
6268

6369
func (a *A2ARegistrar) NeedLeaderElection() bool {
@@ -117,6 +123,7 @@ func (a *A2ARegistrar) registerAgentInformer(ctx context.Context, prototype v1al
117123
}
118124
ref := a2aRouteKey(agent)
119125
a.handlerMux.RemoveAgentHandler(ref)
126+
a.clientRegistry.delete(ref)
120127
log.V(1).Info("removed A2A handler", "agent", ref)
121128
},
122129
}); err != nil {
@@ -182,10 +189,13 @@ func (a *A2ARegistrar) upsertAgentHandler(ctx context.Context, agent v1alpha2.Ag
182189
cardCopy := *card
183190
cardCopy.URL = a.a2aRouteURL(agent)
184191

185-
if err := a.handlerMux.SetAgentHandler(a2aRouteKey(agent), client, cardCopy, newA2ATracingMiddleware(agentRef, provider)); err != nil {
192+
routeRef := a2aRouteKey(agent)
193+
if err := a.handlerMux.SetAgentHandler(routeRef, client, cardCopy, newA2ATracingMiddleware(agentRef, provider)); err != nil {
186194
return fmt.Errorf("set handler for %s: %w", agentRef, err)
187195
}
188196

197+
a.clientRegistry.set(routeRef, client)
198+
189199
log.V(1).Info("registered/updated A2A handler", "agent", agentRef)
190200
return nil
191201
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package a2a
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
8+
a2aclient "trpc.group/trpc-go/trpc-a2a-go/client"
9+
"trpc.group/trpc-go/trpc-a2a-go/protocol"
10+
)
11+
12+
// AgentClientRegistry maps agent route keys to their A2A clients.
13+
// The A2ARegistrar populates it; the MCP handler reads from it to invoke
14+
// agents without an HTTP round trip through the controller's own A2A listener.
15+
type AgentClientRegistry struct {
16+
mu sync.RWMutex
17+
clients map[string]*a2aclient.A2AClient
18+
}
19+
20+
func NewAgentClientRegistry() *AgentClientRegistry {
21+
return &AgentClientRegistry{clients: make(map[string]*a2aclient.A2AClient)}
22+
}
23+
24+
// set stores the client under the agent's route key (e.g. "namespace/name" or
25+
// "sandboxes/namespace/name").
26+
func (r *AgentClientRegistry) set(agentRef string, c *a2aclient.A2AClient) {
27+
r.mu.Lock()
28+
defer r.mu.Unlock()
29+
r.clients[agentRef] = c
30+
}
31+
32+
// delete removes the client for the given agent route key.
33+
func (r *AgentClientRegistry) delete(agentRef string) {
34+
r.mu.Lock()
35+
defer r.mu.Unlock()
36+
delete(r.clients, agentRef)
37+
}
38+
39+
// Register adds or replaces the A2A client for the given agent. It is the
40+
// exported counterpart of set, intended for use in tests and explicit
41+
// registrations outside the A2ARegistrar lifecycle.
42+
func (r *AgentClientRegistry) Register(namespace, name string, c *a2aclient.A2AClient) {
43+
r.set(namespace+"/"+name, c)
44+
}
45+
46+
// SendMessage invokes an agent directly via its cached A2A client.
47+
// namespace and name must identify a non-sandbox agent; sandbox agents use a
48+
// different route key and are not yet reachable via this method.
49+
func (r *AgentClientRegistry) SendMessage(ctx context.Context, namespace, name string, params protocol.SendMessageParams) (*protocol.MessageResult, error) {
50+
key := namespace + "/" + name
51+
r.mu.RLock()
52+
c, ok := r.clients[key]
53+
r.mu.RUnlock()
54+
if !ok {
55+
return nil, fmt.Errorf("agent %s/%s not found or not ready", namespace, name)
56+
}
57+
return c.SendMessage(ctx, params)
58+
}

go/core/internal/mcp/mcp_handler.go

Lines changed: 12 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -5,33 +5,26 @@ import (
55
"fmt"
66
"net/http"
77
"strings"
8-
"sync"
9-
"time"
108

119
"github.com/google/jsonschema-go/jsonschema"
1210
"github.com/kagent-dev/kagent/go/api/v1alpha2"
1311
"github.com/kagent-dev/kagent/go/core/internal/a2a"
14-
authimpl "github.com/kagent-dev/kagent/go/core/internal/httpserver/auth"
1512
"github.com/kagent-dev/kagent/go/core/internal/version"
1613
"github.com/kagent-dev/kagent/go/core/pkg/auth"
1714
"github.com/kagent-dev/kagent/go/core/pkg/env"
1815
mcpsdk "github.com/modelcontextprotocol/go-sdk/mcp"
19-
"k8s.io/apimachinery/pkg/types"
2016
"sigs.k8s.io/controller-runtime/pkg/client"
2117
ctrllog "sigs.k8s.io/controller-runtime/pkg/log"
22-
a2aclient "trpc.group/trpc-go/trpc-a2a-go/client"
2318
"trpc.group/trpc-go/trpc-a2a-go/protocol"
2419
)
2520

2621
// MCPHandler handles MCP requests and bridges them to A2A endpoints
2722
type MCPHandler struct {
2823
kubeClient client.Client
29-
a2aBaseURL string
30-
a2aTimeout time.Duration
24+
agentClients *a2a.AgentClientRegistry
3125
authenticator auth.AuthProvider
3226
httpHandler *mcpsdk.StreamableHTTPHandler
3327
server *mcpsdk.Server
34-
a2aClients sync.Map
3528
}
3629

3730
// Input types for MCP tools
@@ -58,20 +51,12 @@ type InvokeAgentOutput struct {
5851
ContextID string `json:"context_id,omitempty"`
5952
}
6053

61-
// defaultA2ATimeout is the fallback timeout for A2A client calls and should match
62-
// the configured default streaming timeout.
63-
const defaultA2ATimeout = 10 * time.Minute
64-
65-
// NewMCPHandler creates a new MCP handler
66-
// Wraps the StreamableHTTPHandler and adds A2A bridging and context management.
67-
func NewMCPHandler(kubeClient client.Client, a2aBaseURL string, authenticator auth.AuthProvider, a2aTimeout time.Duration) (*MCPHandler, error) {
68-
if a2aTimeout <= 0 {
69-
a2aTimeout = defaultA2ATimeout
70-
}
54+
// NewMCPHandler creates a new MCP handler that bridges MCP tool calls directly
55+
// to agent A2A clients, bypassing the controller's own HTTP A2A listener.
56+
func NewMCPHandler(kubeClient client.Client, agentClients *a2a.AgentClientRegistry, authenticator auth.AuthProvider) (*MCPHandler, error) {
7157
handler := &MCPHandler{
7258
kubeClient: kubeClient,
73-
a2aBaseURL: a2aBaseURL,
74-
a2aTimeout: a2aTimeout,
59+
agentClients: agentClients,
7560
authenticator: authenticator,
7661
}
7762

@@ -198,18 +183,18 @@ func (h *MCPHandler) handleListAgents(ctx context.Context, req *mcpsdk.CallToolR
198183
func (h *MCPHandler) handleInvokeAgent(ctx context.Context, req *mcpsdk.CallToolRequest, input InvokeAgentInput) (*mcpsdk.CallToolResult, InvokeAgentOutput, error) {
199184
log := ctrllog.FromContext(ctx).WithName("mcp-handler").WithValues("tool", "invoke_agent")
200185

201-
// Parse agent reference (namespace/name or just name)
202-
agentNS, agentName, ok := strings.Cut(input.Agent, "/")
203-
if !ok {
186+
// Parse agent reference — must be exactly "namespace/name".
187+
parts := strings.SplitN(input.Agent, "/", 3)
188+
if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
204189
return &mcpsdk.CallToolResult{
205190
Content: []mcpsdk.Content{
206191
&mcpsdk.TextContent{Text: "agent must be in format 'namespace/name'"},
207192
},
208193
IsError: true,
209194
}, InvokeAgentOutput{}, nil
210195
}
196+
agentNS, agentName := parts[0], parts[1]
211197
agentRef := agentNS + "/" + agentName
212-
agentNns := types.NamespacedName{Namespace: agentNS, Name: agentName}
213198

214199
// Get context ID from client request (stateless mode)
215200
// If not provided, contextIDPtr will be nil and a new conversation will start
@@ -219,47 +204,9 @@ func (h *MCPHandler) handleInvokeAgent(ctx context.Context, req *mcpsdk.CallTool
219204
log.V(1).Info("Using context_id from client request", "context_id", input.ContextID)
220205
}
221206

222-
// Get or create cached A2A client for this agent
223-
a2aURL := fmt.Sprintf("%s/%s/", h.a2aBaseURL, agentRef)
224-
var a2aClient *a2aclient.A2AClient
225-
226-
if cached, ok := h.a2aClients.Load(agentRef); ok {
227-
if client, ok := cached.(*a2aclient.A2AClient); ok {
228-
a2aClient = client
229-
}
230-
}
231-
232-
// Create new client if not cached
233-
if a2aClient == nil {
234-
// Build A2A client options with authentication propagation
235-
a2aOpts := []a2aclient.Option{
236-
a2aclient.WithTimeout(h.a2aTimeout),
237-
a2aclient.WithHTTPReqHandler(
238-
authimpl.A2ARequestHandler(
239-
h.authenticator,
240-
agentNns,
241-
),
242-
),
243-
}
244-
245-
newClient, err := a2aclient.NewA2AClient(a2aURL, a2aOpts...)
246-
if err != nil {
247-
log.Error(err, "Failed to create A2A client", "agent", agentRef)
248-
return &mcpsdk.CallToolResult{
249-
Content: []mcpsdk.Content{
250-
&mcpsdk.TextContent{Text: fmt.Sprintf("Failed to create A2A client: %v", err)},
251-
},
252-
IsError: true,
253-
}, InvokeAgentOutput{}, nil
254-
}
255-
256-
// Cache the client
257-
h.a2aClients.Store(agentRef, newClient)
258-
a2aClient = newClient
259-
}
260-
261-
// Send message via A2A
262-
result, err := a2aClient.SendMessage(ctx, protocol.SendMessageParams{
207+
// Send message directly via the agent's A2A client, bypassing the
208+
// controller's own HTTP A2A listener.
209+
result, err := h.agentClients.SendMessage(ctx, agentNS, agentName, protocol.SendMessageParams{
263210
Message: protocol.Message{
264211
Kind: protocol.KindMessage,
265212
Role: protocol.MessageRoleUser,

0 commit comments

Comments
 (0)