Skip to content

Commit d4c326d

Browse files
committed
some fixes
Signed-off-by: Jet Chiang <pokyuen.jetchiang-ext@solo.io>
1 parent fc5123f commit d4c326d

6 files changed

Lines changed: 70 additions & 42 deletions

File tree

go/core/internal/a2a/a2a_handler_mux.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
a2atype "github.com/a2aproject/a2a-go/v2/a2a"
1010
a2aclient "github.com/a2aproject/a2a-go/v2/a2aclient"
11+
"github.com/a2aproject/a2a-go/v2/a2acompat/a2av0"
1112
"github.com/a2aproject/a2a-go/v2/a2asrv"
1213
"github.com/gorilla/mux"
1314
authimpl "github.com/kagent-dev/kagent/go/core/internal/httpserver/auth"
@@ -59,7 +60,8 @@ func (a *handlerMux) SetAgentHandler(
5960
tracing middleware,
6061
) error {
6162
requestHandler := a2asrv.NewHandler(NewPassthroughExecutor(client))
62-
jsonrpcHandler := a2asrv.NewJSONRPCHandler(requestHandler)
63+
legacyJSONRPCHandler := a2av0.NewJSONRPCHandler(requestHandler)
64+
v1JSONRPCHandler := a2asrv.NewJSONRPCHandler(requestHandler)
6365
cardHandler := a2asrv.NewStaticAgentCardHandler(&card)
6466
wellKnownPath := "/" + strings.TrimPrefix(a2asrv.WellKnownAgentCardPath, "/")
6567

@@ -68,7 +70,19 @@ func (a *handlerMux) SetAgentHandler(
6870
cardHandler.ServeHTTP(w, r)
6971
return
7072
}
71-
jsonrpcHandler.ServeHTTP(w, r)
73+
wireVersion, err := common.NegotiateA2AWireVersion(r)
74+
if err != nil {
75+
http.Error(w, err.Error(), http.StatusBadRequest)
76+
return
77+
}
78+
switch wireVersion {
79+
case common.A2AWireVersionLegacy:
80+
legacyJSONRPCHandler.ServeHTTP(w, r)
81+
case common.A2AWireVersionV1:
82+
v1JSONRPCHandler.ServeHTTP(w, r)
83+
default:
84+
http.Error(w, fmt.Sprintf("unknown negotiated A2A wire version %q", wireVersion), http.StatusBadRequest)
85+
}
7286
})
7387
middlewares := []middleware{authimpl.NewA2AAuthenticator(a.authenticator)}
7488
if tracing != nil {

go/core/internal/a2a/a2a_registrar.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
a2atype "github.com/a2aproject/a2a-go/v2/a2a"
1212
a2aclient "github.com/a2aproject/a2a-go/v2/a2aclient"
13+
"github.com/a2aproject/a2a-go/v2/a2acompat/a2av0"
1314
"github.com/go-logr/logr"
1415
"github.com/kagent-dev/kagent/go/api/v1alpha2"
1516
agent_translator "github.com/kagent-dev/kagent/go/core/internal/controller/translator/agent"
@@ -161,6 +162,17 @@ func (a *A2ARegistrar) upsertAgentHandler(ctx context.Context, agent v1alpha2.Ag
161162
// TODO: Switch this to 1.0 in release 0.11.0 when all agents are migrated to v1
162163
filterInterfacesByVersion(card.SupportedInterfaces, a2atype.ProtocolVersion("0.3")),
163164
a2aclient.WithJSONRPCTransport(httpClient),
165+
// TODO: Remove this in release 0.11.0 when all agents are migrated to v1
166+
a2aclient.WithCompatTransport(
167+
a2atype.ProtocolVersion("0.3"),
168+
a2atype.TransportProtocolJSONRPC,
169+
a2aclient.TransportFactoryFn(func(_ context.Context, _ *a2atype.AgentCard, iface *a2atype.AgentInterface) (a2aclient.Transport, error) {
170+
return a2av0.NewJSONRPCTransport(a2av0.JSONRPCTransportConfig{
171+
URL: iface.URL,
172+
Client: httpClient,
173+
}), nil
174+
}),
175+
),
164176
a2aclient.WithCallInterceptors(
165177
NewUpstreamAuthInterceptor(a.authenticator, agentRef),
166178
),

go/core/internal/httpserver/handlers/a2a_version.go

Lines changed: 0 additions & 29 deletions
This file was deleted.

go/core/internal/httpserver/handlers/sessions.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -347,15 +347,15 @@ func (h *SessionsHandler) HandleListTasksForSession(w ErrorResponseWriter, r *ht
347347
w.RespondWithError(errors.NewInternalServerError("Failed to get session runs", err))
348348
return
349349
}
350-
wireVersion, err := negotiatedA2AWireVersion(r)
350+
wireVersion, err := utils.NegotiateA2AWireVersion(r)
351351
if err != nil {
352352
w.RespondWithError(errors.NewBadRequestError("Unsupported A2A version", err))
353353
return
354354
}
355355

356356
log.Info("Successfully retrieved session tasks", "count", len(tasks))
357357
switch wireVersion {
358-
case a2aWireV0:
358+
case utils.A2AWireVersionLegacy:
359359
legacyTasks := make([]any, 0, len(tasks))
360360
for i := range tasks {
361361
legacyTask, convErr := trpcv0.ToLegacyTask(tasks[i])
@@ -367,7 +367,7 @@ func (h *SessionsHandler) HandleListTasksForSession(w ErrorResponseWriter, r *ht
367367
}
368368
data := api.NewResponse(legacyTasks, "Successfully retrieved session tasks", false)
369369
RespondWithJSON(w, http.StatusOK, data)
370-
case a2aWireV1:
370+
case utils.A2AWireVersionV1:
371371
data := api.NewResponse(tasks, "Successfully retrieved session tasks", false)
372372
RespondWithJSON(w, http.StatusOK, data)
373373
default:

go/core/internal/httpserver/handlers/tasks.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
a2a "github.com/a2aproject/a2a-go/v2/a2a"
88
api "github.com/kagent-dev/kagent/go/api/httpapi"
99
"github.com/kagent-dev/kagent/go/core/internal/httpserver/errors"
10+
"github.com/kagent-dev/kagent/go/core/internal/utils"
1011
"github.com/kagent-dev/kagent/go/core/pkg/a2acompat/trpcv0"
1112
ctrllog "sigs.k8s.io/controller-runtime/pkg/log"
1213
"trpc.group/trpc-go/trpc-a2a-go/protocol"
@@ -37,7 +38,7 @@ func (h *TasksHandler) HandleGetTask(w ErrorResponseWriter, r *http.Request) {
3738
w.RespondWithError(errors.NewNotFoundError("Task not found", err))
3839
return
3940
}
40-
wireVersion, err := negotiatedA2AWireVersion(r)
41+
wireVersion, err := utils.NegotiateA2AWireVersion(r)
4142
if err != nil {
4243
w.RespondWithError(errors.NewBadRequestError("Unsupported A2A version", err))
4344
return
@@ -46,14 +47,14 @@ func (h *TasksHandler) HandleGetTask(w ErrorResponseWriter, r *http.Request) {
4647
log.Info("Successfully retrieved task")
4748
var data any
4849
switch wireVersion {
49-
case a2aWireV0:
50+
case utils.A2AWireVersionLegacy:
5051
legacyTask, convErr := trpcv0.ToLegacyTask(task)
5152
if convErr != nil {
5253
w.RespondWithError(errors.NewInternalServerError("Failed to convert task", convErr))
5354
return
5455
}
5556
data = legacyTask
56-
case a2aWireV1:
57+
case utils.A2AWireVersionV1:
5758
data = task
5859
default:
5960
w.RespondWithError(errors.NewBadRequestError("Unsupported A2A version", fmt.Errorf("unknown negotiated wire version %q", wireVersion)))
@@ -66,15 +67,15 @@ func (h *TasksHandler) HandleGetTask(w ErrorResponseWriter, r *http.Request) {
6667
func (h *TasksHandler) HandleCreateTask(w ErrorResponseWriter, r *http.Request) {
6768
log := ctrllog.FromContext(r.Context()).WithName("tasks-handler").WithValues("operation", "create-task")
6869

69-
wireVersion, err := negotiatedA2AWireVersion(r)
70+
wireVersion, err := utils.NegotiateA2AWireVersion(r)
7071
if err != nil {
7172
w.RespondWithError(errors.NewBadRequestError("Unsupported A2A version", err))
7273
return
7374
}
7475

7576
task := a2a.Task{}
7677
switch wireVersion {
77-
case a2aWireV0:
78+
case utils.A2AWireVersionLegacy:
7879
legacyTask := protocol.Task{}
7980
if err := DecodeJSONBody(r, &legacyTask); err != nil {
8081
w.RespondWithError(errors.NewBadRequestError("Invalid request body", err))
@@ -88,7 +89,7 @@ func (h *TasksHandler) HandleCreateTask(w ErrorResponseWriter, r *http.Request)
8889
if converted != nil {
8990
task = *converted
9091
}
91-
case a2aWireV1:
92+
case utils.A2AWireVersionV1:
9293
if err := DecodeJSONBody(r, &task); err != nil {
9394
w.RespondWithError(errors.NewBadRequestError("Invalid request body", err))
9495
return
@@ -110,14 +111,14 @@ func (h *TasksHandler) HandleCreateTask(w ErrorResponseWriter, r *http.Request)
110111
log.Info("Successfully created task")
111112
var data any
112113
switch wireVersion {
113-
case a2aWireV0:
114+
case utils.A2AWireVersionLegacy:
114115
legacyTask, convErr := trpcv0.ToLegacyTask(&task)
115116
if convErr != nil {
116117
w.RespondWithError(errors.NewInternalServerError("Failed to convert task", convErr))
117118
return
118119
}
119120
data = legacyTask
120-
case a2aWireV1:
121+
case utils.A2AWireVersionV1:
121122
data = task
122123
default:
123124
w.RespondWithError(errors.NewBadRequestError("Unsupported A2A version", fmt.Errorf("unknown negotiated wire version %q", wireVersion)))
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package utils
2+
3+
import (
4+
"fmt"
5+
"net/http"
6+
7+
a2atype "github.com/a2aproject/a2a-go/v2/a2a"
8+
"github.com/a2aproject/a2a-go/v2/a2acompat/a2av0"
9+
)
10+
11+
type A2AWireVersion string
12+
13+
const (
14+
A2AWireVersionLegacy A2AWireVersion = "v0"
15+
A2AWireVersionV1 A2AWireVersion = "v1"
16+
)
17+
18+
// NegotiateA2AWireVersion returns the A2A wire version requested by the client.
19+
// Missing or explicit 0.3 headers use the legacy/current kagent A2A wire shape.
20+
func NegotiateA2AWireVersion(r *http.Request) (A2AWireVersion, error) {
21+
version := r.Header.Get(a2atype.SvcParamVersion)
22+
switch version {
23+
case "", string(a2av0.Version):
24+
return A2AWireVersionLegacy, nil
25+
case string(a2atype.Version):
26+
return A2AWireVersionV1, nil
27+
default:
28+
return "", fmt.Errorf("unsupported A2A version %q", version)
29+
}
30+
}

0 commit comments

Comments
 (0)