Skip to content

Commit d39fa0b

Browse files
supreme-gg-ggjmhbh
andauthored
Migrate from A2A v0 to v1 (part 1) (#1921)
The 1st of 3 PRs for A2A v0 to v1 zero downtime migration. This will go into release 0.10.0 Makes the controller compatible with A2A v0 (trpc-a2a-go) and v1 (a2a-go) data and both v0 and v1 JSON-RPC routes / agent card routes depending on `A2A-Version` header (designed explicitly by the protocol for version negotiations). Replaces **all** internal usages of `trpc-a2a-go` with `a2a-go` except in data conversion code and CLI client code. This ensure that the controller is compatible with old / existing agents and UI in A2A v0 and existing v0 data in db, while adding the capability to communicate with v1 agents and UI during upgrade as they are added in the next part. Follows https://a2a-protocol.org/latest/whats-new-v1/#migration-strategy-recommendations and db conversion reuses as much of existing a2a backward compatible code as possible. --------- Signed-off-by: Jet Chiang <pokyuen.jetchiang-ext@solo.io> Signed-off-by: JM Huibonhoa <jm.huibonhoa@solo.io> Co-authored-by: JM Huibonhoa <jm.huibonhoa@solo.io>
1 parent 7fb3aa6 commit d39fa0b

85 files changed

Lines changed: 2420 additions & 779 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

go/adk/pkg/tools/skills_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,12 @@ func TestResolveReadPath_AllowsSymlinkedSkillsDirectory(t *testing.T) {
2121
if err != nil {
2222
t.Fatalf("resolveReadPath() error = %v", err)
2323
}
24-
if resolved != skillFile {
25-
t.Fatalf("resolveReadPath() = %q, want %q", resolved, skillFile)
24+
want, err := filepath.EvalSymlinks(skillFile)
25+
if err != nil {
26+
t.Fatalf("EvalSymlinks() error = %v", err)
27+
}
28+
if resolved != want {
29+
t.Fatalf("resolveReadPath() = %q, want %q", resolved, want)
2630
}
2731
}
2832

go/api/config/crd/bases/kagent.dev_agents.yaml

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7739,35 +7739,36 @@ spec:
77397739
description: Examples are optional usage examples.
77407740
items:
77417741
type: string
7742+
maxItems: 20
77427743
type: array
77437744
id:
77447745
description: ID is the unique identifier for the skill.
77457746
type: string
77467747
inputModes:
7747-
description: InputModes are the supported input data
7748-
modes/types.
7748+
description: InputModes are the supported input MIME
7749+
types for this skill, overriding the agent's defaults.
77497750
items:
77507751
type: string
77517752
type: array
77527753
name:
77537754
description: Name is the human-readable name of the
77547755
skill.
7756+
minLength: 1
77557757
type: string
77567758
outputModes:
7757-
description: OutputModes are the supported output data
7758-
modes/types.
7759+
description: OutputModes are the supported output MIME
7760+
types for this skill, overriding the agent's defaults.
77597761
items:
77607762
type: string
77617763
type: array
77627764
tags:
77637765
description: Tags are optional tags for categorization.
77647766
items:
77657767
type: string
7768+
maxItems: 20
77667769
type: array
77677770
required:
7768-
- id
77697771
- name
7770-
- tags
77717772
type: object
77727773
minItems: 1
77737774
type: array

go/api/config/crd/bases/kagent.dev_sandboxagents.yaml

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5397,35 +5397,36 @@ spec:
53975397
description: Examples are optional usage examples.
53985398
items:
53995399
type: string
5400+
maxItems: 20
54005401
type: array
54015402
id:
54025403
description: ID is the unique identifier for the skill.
54035404
type: string
54045405
inputModes:
5405-
description: InputModes are the supported input data
5406-
modes/types.
5406+
description: InputModes are the supported input MIME
5407+
types for this skill, overriding the agent's defaults.
54075408
items:
54085409
type: string
54095410
type: array
54105411
name:
54115412
description: Name is the human-readable name of the
54125413
skill.
5414+
minLength: 1
54135415
type: string
54145416
outputModes:
5415-
description: OutputModes are the supported output data
5416-
modes/types.
5417+
description: OutputModes are the supported output MIME
5418+
types for this skill, overriding the agent's defaults.
54175419
items:
54185420
type: string
54195421
type: array
54205422
tags:
54215423
description: Tags are optional tags for categorization.
54225424
items:
54235425
type: string
5426+
maxItems: 20
54245427
type: array
54255428
required:
5426-
- id
54275429
- name
5428-
- tags
54295430
type: object
54305431
minItems: 1
54315432
type: array

go/api/database/client.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ import (
44
"context"
55
"time"
66

7+
a2a "github.com/a2aproject/a2a-go/v2/a2a"
78
"github.com/kagent-dev/kagent/go/api/v1alpha2"
89
"github.com/pgvector/pgvector-go"
9-
"trpc.group/trpc-go/trpc-a2a-go/protocol"
1010
)
1111

1212
type QueryOptions struct {
@@ -24,8 +24,8 @@ type Client interface {
2424
StoreFeedback(ctx context.Context, feedback *Feedback) error
2525
StoreSession(ctx context.Context, session *Session) error
2626
StoreAgent(ctx context.Context, agent *Agent) error
27-
StoreTask(ctx context.Context, task *protocol.Task) error
28-
StorePushNotification(ctx context.Context, config *protocol.TaskPushNotificationConfig) error
27+
StoreTask(ctx context.Context, task *a2a.Task) error
28+
StorePushNotification(ctx context.Context, config *a2a.PushConfig) error
2929
StoreToolServer(ctx context.Context, toolServer *ToolServer) (*ToolServer, error)
3030
StoreEvents(ctx context.Context, messages ...*Event) error
3131

@@ -40,23 +40,23 @@ type Client interface {
4040
// Get methods
4141
GetSession(ctx context.Context, sessionID string, userID string) (*Session, error)
4242
GetAgent(ctx context.Context, name string) (*Agent, error)
43-
GetTask(ctx context.Context, id string) (*protocol.Task, error)
43+
GetTask(ctx context.Context, id string) (*a2a.Task, error)
4444
GetTool(ctx context.Context, name string) (*Tool, error)
4545
GetToolServer(ctx context.Context, name string) (*ToolServer, error)
46-
GetPushNotification(ctx context.Context, taskID string, configID string) (*protocol.TaskPushNotificationConfig, error)
46+
GetPushNotification(ctx context.Context, taskID string, configID string) (*a2a.PushConfig, error)
4747

4848
// List methods
4949
ListTools(ctx context.Context) ([]Tool, error)
5050
ListFeedback(ctx context.Context, userID string) ([]Feedback, error)
51-
ListTasksForSession(ctx context.Context, sessionID string) ([]*protocol.Task, error)
51+
ListTasksForSession(ctx context.Context, sessionID string) ([]*a2a.Task, error)
5252
ListSessions(ctx context.Context, userID string) ([]Session, error)
5353
ListSessionsForAgent(ctx context.Context, agentID string, userID string) ([]Session, error)
5454
ListSessionsForAgentAllUsers(ctx context.Context, agentID string) ([]Session, error)
5555
ListAgents(ctx context.Context) ([]Agent, error)
5656
ListToolServers(ctx context.Context) ([]ToolServer, error)
5757
ListToolsForServer(ctx context.Context, serverName string, groupKind string) ([]Tool, error)
5858
ListEventsForSession(ctx context.Context, sessionID, userID string, options QueryOptions) ([]*Event, error)
59-
ListPushNotifications(ctx context.Context, taskID string) ([]*protocol.TaskPushNotificationConfig, error)
59+
ListPushNotifications(ctx context.Context, taskID string) ([]*a2a.PushConfig, error)
6060

6161
// Helper methods
6262
RefreshToolsForServer(ctx context.Context, serverName string, groupKind string, tools ...*v1alpha2.MCPTool) error

go/api/database/models.go

Lines changed: 25 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@ import (
44
"encoding/json"
55
"time"
66

7+
a2a "github.com/a2aproject/a2a-go/v2/a2a"
78
"github.com/kagent-dev/kagent/go/api/adk"
89
"github.com/kagent-dev/kagent/go/api/v1alpha2"
910
"github.com/pgvector/pgvector-go"
10-
"trpc.group/trpc-go/trpc-a2a-go/protocol"
1111
)
1212

1313
type Agent struct {
@@ -32,16 +32,16 @@ type Event struct {
3232
Data string `json:"data"` // JSON-serialized protocol.Message
3333
}
3434

35-
func (m *Event) Parse() (protocol.Message, error) {
36-
var data protocol.Message
35+
func (m *Event) Parse() (a2a.Message, error) {
36+
var data a2a.Message
3737
if err := json.Unmarshal([]byte(m.Data), &data); err != nil {
38-
return protocol.Message{}, err
38+
return a2a.Message{}, err
3939
}
4040
return data, nil
4141
}
4242

43-
func ParseMessages(messages []Event) ([]*protocol.Message, error) {
44-
result := make([]*protocol.Message, 0, len(messages))
43+
func ParseMessages(messages []Event) ([]*a2a.Message, error) {
44+
result := make([]*a2a.Message, 0, len(messages))
4545
for _, message := range messages {
4646
parsed, err := message.Parse()
4747
if err != nil {
@@ -77,24 +77,25 @@ type Session struct {
7777
}
7878

7979
type Task struct {
80-
ID string `json:"id"`
81-
CreatedAt time.Time `json:"created_at"`
82-
UpdatedAt time.Time `json:"updated_at"`
83-
DeletedAt *time.Time `json:"deleted_at,omitempty"`
84-
Data string `json:"data"` // JSON-serialized task data
85-
SessionID string `json:"session_id"`
80+
ID string `json:"id"`
81+
CreatedAt time.Time `json:"created_at"`
82+
UpdatedAt time.Time `json:"updated_at"`
83+
DeletedAt *time.Time `json:"deleted_at,omitempty"`
84+
Data string `json:"data"` // JSON-serialized task data
85+
ProtocolVersion *string `json:"protocol_version,omitempty"`
86+
SessionID string `json:"session_id"`
8687
}
8788

88-
func (t *Task) Parse() (protocol.Task, error) {
89-
var data protocol.Task
89+
func (t *Task) Parse() (a2a.Task, error) {
90+
var data a2a.Task
9091
if err := json.Unmarshal([]byte(t.Data), &data); err != nil {
91-
return protocol.Task{}, err
92+
return a2a.Task{}, err
9293
}
9394
return data, nil
9495
}
9596

96-
func ParseTasks(tasks []Task) ([]*protocol.Task, error) {
97-
result := make([]*protocol.Task, 0, len(tasks))
97+
func ParseTasks(tasks []Task) ([]*a2a.Task, error) {
98+
result := make([]*a2a.Task, 0, len(tasks))
9899
for _, task := range tasks {
99100
parsed, err := task.Parse()
100101
if err != nil {
@@ -106,12 +107,13 @@ func ParseTasks(tasks []Task) ([]*protocol.Task, error) {
106107
}
107108

108109
type PushNotification struct {
109-
ID string `json:"id"`
110-
TaskID string `json:"task_id"`
111-
CreatedAt time.Time `json:"created_at"`
112-
UpdatedAt time.Time `json:"updated_at"`
113-
DeletedAt *time.Time `json:"deleted_at,omitempty"`
114-
Data string `json:"data"` // JSON-serialized push notification config
110+
ID string `json:"id"`
111+
TaskID string `json:"task_id"`
112+
CreatedAt time.Time `json:"created_at"`
113+
UpdatedAt time.Time `json:"updated_at"`
114+
DeletedAt *time.Time `json:"deleted_at,omitempty"`
115+
Data string `json:"data"` // JSON-serialized push notification config
116+
ProtocolVersion *string `json:"protocol_version,omitempty"`
115117
}
116118

117119
// FeedbackIssueType represents the category of feedback issue

go/api/v1alpha2/agent_types.go

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@ import (
2626
"k8s.io/apimachinery/pkg/runtime/schema"
2727
"k8s.io/apimachinery/pkg/types"
2828
"sigs.k8s.io/controller-runtime/pkg/client"
29-
30-
"trpc.group/trpc-go/trpc-a2a-go/server"
3129
)
3230

3331
// AgentType represents the agent type
@@ -557,7 +555,33 @@ type A2AConfig struct {
557555
Skills []AgentSkill `json:"skills,omitempty"`
558556
}
559557

560-
type AgentSkill server.AgentSkill
558+
// AgentSkill describes a specific capability or function of the agent.
559+
type AgentSkill struct {
560+
// ID is the unique identifier for the skill.
561+
// +optional
562+
ID string `json:"id,omitempty"`
563+
// Name is the human-readable name of the skill.
564+
// +kubebuilder:validation:MinLength=1
565+
// +required
566+
Name string `json:"name"`
567+
// Description is an optional detailed description of the skill.
568+
// +optional
569+
Description string `json:"description,omitempty"`
570+
// Tags are optional tags for categorization.
571+
// +optional
572+
// +kubebuilder:validation:MaxItems=20
573+
Tags []string `json:"tags,omitempty"`
574+
// Examples are optional usage examples.
575+
// +optional
576+
// +kubebuilder:validation:MaxItems=20
577+
Examples []string `json:"examples,omitempty"`
578+
// InputModes are the supported input MIME types for this skill, overriding the agent's defaults.
579+
// +optional
580+
InputModes []string `json:"inputModes,omitempty"`
581+
// OutputModes are the supported output MIME types for this skill, overriding the agent's defaults.
582+
// +optional
583+
OutputModes []string `json:"outputModes,omitempty"`
584+
}
561585

562586
const (
563587
AgentConditionTypeAccepted = "Accepted"

go/api/v1alpha2/zz_generated.deepcopy.go

Lines changed: 0 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

go/core/internal/a2a/a2a_handler_mux.go

Lines changed: 44 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,24 +3,27 @@ package a2a
33
import (
44
"fmt"
55
"net/http"
6+
"slices"
67
"strings"
78
"sync"
89

10+
a2atype "github.com/a2aproject/a2a-go/v2/a2a"
11+
a2aclient "github.com/a2aproject/a2a-go/v2/a2aclient"
12+
"github.com/a2aproject/a2a-go/v2/a2acompat/a2av0"
13+
"github.com/a2aproject/a2a-go/v2/a2asrv"
914
"github.com/gorilla/mux"
1015
authimpl "github.com/kagent-dev/kagent/go/core/internal/httpserver/auth"
1116
common "github.com/kagent-dev/kagent/go/core/internal/utils"
1217
"github.com/kagent-dev/kagent/go/core/pkg/auth"
13-
"trpc.group/trpc-go/trpc-a2a-go/client"
14-
"trpc.group/trpc-go/trpc-a2a-go/server"
1518
)
1619

1720
// A2AHandlerMux is an interface that defines methods for adding, getting, and removing agentic task handlers.
1821
type A2AHandlerMux interface {
1922
SetAgentHandler(
2023
agentRef string,
21-
client *client.A2AClient,
22-
card server.AgentCard,
23-
tracing server.Middleware,
24+
client *a2aclient.Client,
25+
card a2atype.AgentCard,
26+
tracing middleware,
2427
) error
2528
RemoveAgentHandler(
2629
agentRef string,
@@ -38,6 +41,10 @@ type handlerMux struct {
3841

3942
var _ A2AHandlerMux = &handlerMux{}
4043

44+
type middleware interface {
45+
Wrap(next http.Handler) http.Handler
46+
}
47+
4148
func NewA2AHttpMux(agentPathPrefix, sandboxPathPrefix string, authenticator auth.AuthProvider) *handlerMux {
4249
return &handlerMux{
4350
handlers: make(map[string]http.Handler),
@@ -49,23 +56,47 @@ func NewA2AHttpMux(agentPathPrefix, sandboxPathPrefix string, authenticator auth
4956

5057
func (a *handlerMux) SetAgentHandler(
5158
agentRef string,
52-
client *client.A2AClient,
53-
card server.AgentCard,
54-
tracing server.Middleware,
59+
client *a2aclient.Client,
60+
card a2atype.AgentCard,
61+
tracing middleware,
5562
) error {
56-
middlewares := []server.Middleware{authimpl.NewA2AAuthenticator(a.authenticator)}
63+
requestHandler := NewPassthroughRequestHandler(client, &card)
64+
legacyJSONRPCHandler := a2av0.NewJSONRPCHandler(requestHandler)
65+
v1JSONRPCHandler := a2asrv.NewJSONRPCHandler(requestHandler)
66+
cardHandler := a2asrv.NewAgentCardHandler(a2av0.NewStaticAgentCardProducer(&card))
67+
wellKnownPath := "/" + strings.TrimPrefix(a2asrv.WellKnownAgentCardPath, "/")
68+
69+
var handler http.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
70+
if strings.HasSuffix(r.URL.Path, wellKnownPath) {
71+
cardHandler.ServeHTTP(w, r)
72+
return
73+
}
74+
wireVersion, err := common.NegotiateA2AWireVersion(r)
75+
if err != nil {
76+
http.Error(w, err.Error(), http.StatusBadRequest)
77+
return
78+
}
79+
switch wireVersion {
80+
case common.A2AWireVersionLegacy:
81+
legacyJSONRPCHandler.ServeHTTP(w, r)
82+
case common.A2AWireVersionV1:
83+
v1JSONRPCHandler.ServeHTTP(w, r)
84+
default:
85+
http.Error(w, fmt.Sprintf("unknown negotiated A2A wire version %q", wireVersion), http.StatusBadRequest)
86+
}
87+
})
88+
middlewares := []middleware{authimpl.NewA2AAuthenticator(a.authenticator)}
5789
if tracing != nil {
5890
middlewares = append(middlewares, tracing)
5991
}
60-
srv, err := server.NewA2AServer(card, NewPassthroughManager(client), server.WithMiddleWare(middlewares...))
61-
if err != nil {
62-
return fmt.Errorf("failed to create A2A server: %w", err)
92+
for _, middleware := range slices.Backward(middlewares) {
93+
handler = middleware.Wrap(handler)
6394
}
6495

6596
a.lock.Lock()
6697
defer a.lock.Unlock()
6798

68-
a.handlers[agentRef] = srv.Handler()
99+
a.handlers[agentRef] = handler
69100

70101
return nil
71102
}

0 commit comments

Comments
 (0)