Skip to content

Commit c7e0aea

Browse files
feat(telemetry): Go runtime tracing and go-adk 1.0.0 (#1618)
- Adds OTLP exporter setup similar to python runtime - Adds span processor to add various span attributes for python adk parity - Bumps go-adk to latest v1.0.0 - Aligns various A2A messaging behaviour between Python and Go so clients can switch between the two without having to rewrite A2A client logic, this includes streaming and persisted tasks and events An example trace with go runtime agent and subagent: <img width="1697" height="732" alt="Screenshot 2026-04-02 at 3 49 22 PM" src="https://github.com/user-attachments/assets/42068a10-d9ec-4a40-af12-d99981fda1db" /> --------- Signed-off-by: Jet Chiang <pokyuen.jetchiang-ext@solo.io>
1 parent 418efad commit c7e0aea

14 files changed

Lines changed: 613 additions & 115 deletions

File tree

go/adk/cmd/main.go

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
kagentmemory "github.com/kagent-dev/kagent/go/adk/pkg/memory"
1919
runnerpkg "github.com/kagent-dev/kagent/go/adk/pkg/runner"
2020
"github.com/kagent-dev/kagent/go/adk/pkg/session"
21+
"github.com/kagent-dev/kagent/go/adk/pkg/telemetry"
2122
"go.uber.org/zap"
2223
"go.uber.org/zap/zapcore"
2324
)
@@ -93,8 +94,38 @@ func main() {
9394
"sseTools", len(agentConfig.SseTools),
9495
"remoteAgents", len(agentConfig.RemoteAgents))
9596

97+
kagentName := os.Getenv("KAGENT_NAME")
98+
kagentNamespace := os.Getenv("KAGENT_NAMESPACE")
99+
96100
// Derive app name from env or agent card.
97-
appName := deriveAppName(agentCard, logger)
101+
appName := deriveAppName(kagentName, kagentNamespace, agentCard, logger)
102+
103+
// Fall back to appName / "default" so traces always have a non-empty service identity.
104+
serviceNameSource := kagentName
105+
if serviceNameSource == "" {
106+
serviceNameSource = appName
107+
}
108+
serviceNamespaceSource := kagentNamespace
109+
if serviceNamespaceSource == "" {
110+
serviceNamespaceSource = "default"
111+
}
112+
serviceName := strings.ReplaceAll(serviceNameSource, "-", "_")
113+
serviceNamespace := strings.ReplaceAll(serviceNamespaceSource, "-", "_")
114+
shutdownTelemetry, telemetryEnabled, telErr := telemetry.Init(context.Background(), serviceName, serviceNamespace)
115+
if telErr != nil {
116+
logger.Error(telErr, "Failed to initialize ADK telemetry providers; continuing without telemetry export")
117+
} else if telemetryEnabled {
118+
defer func() {
119+
shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
120+
defer cancel()
121+
if err := shutdownTelemetry(shutdownCtx); err != nil {
122+
logger.Error(err, "Failed to shutdown telemetry providers cleanly")
123+
}
124+
}()
125+
logger.Info("ADK telemetry initialized")
126+
} else {
127+
logger.Info("ADK telemetry disabled (set OTEL_TRACING_ENABLED or OTEL_LOGGING_ENABLED to true)")
128+
}
98129

99130
// Create authenticated HTTP client when kagent persistence is enabled.
100131
// This client is shared between the executor's session service and
@@ -195,10 +226,7 @@ func main() {
195226
}
196227
}
197228

198-
func deriveAppName(agentCard *a2atype.AgentCard, logger logr.Logger) string {
199-
kagentName := os.Getenv("KAGENT_NAME")
200-
kagentNamespace := os.Getenv("KAGENT_NAMESPACE")
201-
229+
func deriveAppName(kagentName, kagentNamespace string, agentCard *a2atype.AgentCard, logger logr.Logger) string {
202230
if kagentNamespace != "" && kagentName != "" {
203231
namespace := strings.ReplaceAll(kagentNamespace, "-", "_")
204232
name := strings.ReplaceAll(kagentName, "-", "_")

go/adk/pkg/a2a/executor.go

Lines changed: 49 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/kagent-dev/kagent/go/adk/pkg/session"
1414
"github.com/kagent-dev/kagent/go/adk/pkg/skills"
1515
"github.com/kagent-dev/kagent/go/adk/pkg/telemetry"
16+
"go.opentelemetry.io/otel/attribute"
1617
adkagent "google.golang.org/adk/agent"
1718
"google.golang.org/adk/runner"
1819
"google.golang.org/adk/server/adka2a"
@@ -130,6 +131,8 @@ func (e *KAgentExecutor) Execute(ctx context.Context, reqCtx *a2asrv.RequestCont
130131
spanAttributes["kagent.app_name"] = e.appName
131132
}
132133
ctx = telemetry.SetKAgentSpanAttributes(ctx, spanAttributes)
134+
ctx, invocationSpan := telemetry.StartInvocationSpan(ctx)
135+
defer invocationSpan.End()
133136

134137
// 3. Initialize skills session path.
135138
if e.skillsDirectory != "" && sessionID != "" {
@@ -189,11 +192,20 @@ func (e *KAgentExecutor) Execute(ctx context.Context, reqCtx *a2asrv.RequestCont
189192

190193
// 9. Emit initial events.
191194
if reqCtx.StoredTask == nil {
192-
// New task — emit submitted.
193-
submitted := a2atype.NewStatusUpdateEvent(reqCtx, a2atype.TaskStateSubmitted, nil)
195+
// New task — emit submitted with the user's message
196+
submitted := a2atype.NewStatusUpdateEvent(reqCtx, a2atype.TaskStateSubmitted, reqCtx.Message)
194197
if err := queue.Write(ctx, submitted); err != nil {
195198
return fmt.Errorf("failed to write submitted event: %w", err)
196199
}
200+
} else if ExtractDecisionFromMessage(reqCtx.Message) != "" {
201+
// a2a-go appends incoming message to task history before executor runs.
202+
// See https://github.com/a2aproject/a2a-go/blob/v0.3.13/a2asrv/agentexec.go#L188
203+
// Remove the pre-appended copy and emit one decision status event.
204+
dropPreAppendedDecisionFromHistory(reqCtx.StoredTask, reqCtx.Message)
205+
decision := a2atype.NewStatusUpdateEvent(reqCtx, a2atype.TaskStateWorking, reqCtx.Message)
206+
if err := queue.Write(ctx, decision); err != nil {
207+
return fmt.Errorf("failed to write HITL decision status event: %w", err)
208+
}
197209
}
198210

199211
// Base metadata carried on every event (app_name, user_id, session_id).
@@ -217,13 +229,10 @@ func (e *KAgentExecutor) Execute(ctx context.Context, reqCtx *a2asrv.RequestCont
217229

218230
// State tracked across the event loop.
219231
var (
220-
invocationID string
221-
artifactID a2atype.ArtifactID
222-
partialArtifactID a2atype.ArtifactID
223-
lastTextParts a2atype.ContentParts
224-
lastTextMeta map[string]any
225-
hitlParts a2atype.ContentParts
226-
runErr error
232+
invocationID string
233+
lastNonPartialParts a2atype.ContentParts
234+
hitlParts a2atype.ContentParts
235+
runErr error
227236
)
228237

229238
for adkEvent, adkErr := range r.Run(ctx, userID, sessionID, content, runConfig) {
@@ -238,6 +247,7 @@ func (e *KAgentExecutor) Execute(ctx context.Context, reqCtx *a2asrv.RequestCont
238247
// Track invocation ID from the first event that has one.
239248
if adkEvent.InvocationID != "" && invocationID == "" {
240249
invocationID = adkEvent.InvocationID
250+
invocationSpan.SetAttributes(attribute.String("gcp.vertex.agent.invocation_id", invocationID))
241251
}
242252

243253
// Build per-event metadata (inherits baseMeta + adds invocation_id, usage etc.).
@@ -299,8 +309,11 @@ func (e *KAgentExecutor) Execute(ctx context.Context, reqCtx *a2asrv.RequestCont
299309
}
300310

301311
if adkEvent.Partial {
302-
// Partial event: emit as working status (text-only) for UI streaming,
303-
// and as partial artifact update.
312+
// Partial event: emit as working status (text-only) for UI streaming.
313+
// Note: Go ADK executor uses TaskArtifactUpdateEvent for partial events,
314+
// so we don't need to emit a separate partial artifact update.
315+
// However, this is done here in order to match the Python executor's behavior.
316+
// Go ADK executor also uses different A2A response formats than Python ADK.
304317
textOnly := filterTextParts(a2aParts)
305318
if len(textOnly) > 0 {
306319
mirrorMeta := maps.Clone(eventMeta)
@@ -313,28 +326,7 @@ func (e *KAgentExecutor) Execute(ctx context.Context, reqCtx *a2asrv.RequestCont
313326
return fmt.Errorf("failed to write partial status event: %w", err)
314327
}
315328
}
316-
317-
// Emit partial artifact update.
318-
var partialEv *a2atype.TaskArtifactUpdateEvent
319-
if partialArtifactID == "" {
320-
partialEv = a2atype.NewArtifactEvent(reqCtx, a2aParts...)
321-
partialArtifactID = partialEv.Artifact.ID
322-
} else {
323-
partialEv = a2atype.NewArtifactUpdateEvent(reqCtx, partialArtifactID, a2aParts...)
324-
}
325-
if partialEv.Artifact.Metadata == nil {
326-
partialEv.Artifact.Metadata = map[string]any{}
327-
}
328-
partialEv.Artifact.Metadata[adka2a.ToA2AMetaKey("partial")] = true
329-
partialEv.Metadata = maps.Clone(eventMeta)
330-
partialEv.Metadata[adka2a.ToA2AMetaKey("partial")] = true
331-
partialEv.Append = false // discard partial events
332-
if err := queue.Write(ctx, partialEv); err != nil {
333-
return fmt.Errorf("failed to write partial artifact event: %w", err)
334-
}
335329
} else {
336-
// Non-partial event: emit as artifact + mirror as working status.
337-
// Mirror: emit working status with non-empty parts.
338330
mirrorParts := a2aParts
339331
if len(hitlParts) == 0 {
340332
// Only mirror when not accumulating HITL parts (those go into input_required).
@@ -345,27 +337,7 @@ func (e *KAgentExecutor) Execute(ctx context.Context, reqCtx *a2asrv.RequestCont
345337
if err := queue.Write(ctx, statusEv); err != nil {
346338
return fmt.Errorf("failed to write mirror status event: %w", err)
347339
}
348-
}
349-
350-
// Track last text parts for final status injection.
351-
if tp := filterTextParts(a2aParts); len(tp) > 0 {
352-
lastTextParts = tp
353-
lastTextMeta = eventMeta
354-
}
355-
356-
// Emit artifact.
357-
var artifactEv *a2atype.TaskArtifactUpdateEvent
358-
if artifactID == "" {
359-
artifactEv = a2atype.NewArtifactEvent(reqCtx, a2aParts...)
360-
artifactID = artifactEv.Artifact.ID
361-
} else {
362-
artifactEv = a2atype.NewArtifactUpdateEvent(reqCtx, artifactID, a2aParts...)
363-
}
364-
// Mark as non-partial so downstream consumers skip their own aggregation.
365-
artifactEv.Metadata = maps.Clone(eventMeta)
366-
artifactEv.Metadata[adka2a.ToA2AMetaKey("partial")] = false
367-
if err := queue.Write(ctx, artifactEv); err != nil {
368-
return fmt.Errorf("failed to write artifact event: %w", err)
340+
lastNonPartialParts = mirrorParts
369341
}
370342
}
371343

@@ -398,15 +370,16 @@ func (e *KAgentExecutor) Execute(ctx context.Context, reqCtx *a2asrv.RequestCont
398370
return queue.Write(ctx, inputRequired)
399371
}
400372

401-
// completed: inject last text into final status if no message present.
402-
var finalMsg *a2atype.Message
403-
if len(lastTextParts) > 0 {
404-
finalMsg = a2atype.NewMessage(a2atype.MessageRoleAgent, lastTextParts...)
405-
if len(lastTextMeta) > 0 {
406-
finalMsg.Metadata = maps.Clone(lastTextMeta)
373+
// Final artifact update with lastChunk=true (if we have parts) and final completed status update (no message payload).
374+
if len(lastNonPartialParts) > 0 {
375+
finalArtifact := a2atype.NewArtifactEvent(reqCtx, lastNonPartialParts...)
376+
finalArtifact.LastChunk = true
377+
if err := queue.Write(ctx, finalArtifact); err != nil {
378+
return fmt.Errorf("failed to write final artifact event: %w", err)
407379
}
408380
}
409-
completed := a2atype.NewStatusUpdateEvent(reqCtx, a2atype.TaskStateCompleted, finalMsg)
381+
382+
completed := a2atype.NewStatusUpdateEvent(reqCtx, a2atype.TaskStateCompleted, nil)
410383
completed.Final = true
411384
completed.Metadata = finalMeta
412385
return queue.Write(ctx, completed)
@@ -434,3 +407,19 @@ func extractSessionName(message *a2atype.Message) string {
434407
}
435408
return ""
436409
}
410+
411+
// dropPreAppendedDecisionFromHistory removes a pre-appended HITL decision
412+
// message inserted by a2a-go before executor invocation.
413+
func dropPreAppendedDecisionFromHistory(task *a2atype.Task, incoming *a2atype.Message) {
414+
if task == nil || incoming == nil || len(task.History) == 0 {
415+
return
416+
}
417+
last := task.History[len(task.History)-1]
418+
if last == nil || last.ID != incoming.ID {
419+
return
420+
}
421+
if ExtractDecisionFromMessage(last) == "" {
422+
return
423+
}
424+
task.History = task.History[:len(task.History)-1]
425+
}

go/adk/pkg/a2a/server/server.go

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
a2atype "github.com/a2aproject/a2a-go/a2a"
1313
"github.com/a2aproject/a2a-go/a2asrv"
1414
"github.com/go-logr/logr"
15+
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
1516
)
1617

1718
// ServerConfig holds configuration for the A2A server.
@@ -38,6 +39,23 @@ func NewA2AServer(agentCard a2atype.AgentCard, executor a2asrv.AgentExecutor, lo
3839
RegisterHealthEndpoints(mux)
3940
mux.Handle(a2asrv.WellKnownAgentCardPath, a2asrv.NewStaticAgentCardHandler(&agentCard))
4041
mux.Handle("/", jsonrpcHandler)
42+
// Wrap the whole server mux to enable trace context extraction and an inbound
43+
// HTTP server span for each request.
44+
instrumentedHandler := otelhttp.NewHandler(
45+
mux,
46+
"a2a-server",
47+
otelhttp.WithSpanNameFormatter(func(_ string, r *http.Request) string {
48+
return r.Method + " " + r.URL.Path
49+
}),
50+
otelhttp.WithFilter(func(r *http.Request) bool {
51+
switch r.URL.Path {
52+
case "/health", "/healthz", a2asrv.WellKnownAgentCardPath:
53+
return false
54+
default:
55+
return true
56+
}
57+
}),
58+
)
4159

4260
addr := ":" + config.Port
4361
if config.Host != "" {
@@ -47,7 +65,7 @@ func NewA2AServer(agentCard a2atype.AgentCard, executor a2asrv.AgentExecutor, lo
4765
return &A2AServer{
4866
httpServer: &http.Server{
4967
Addr: addr,
50-
Handler: mux,
68+
Handler: instrumentedHandler,
5169
},
5270
logger: logger,
5371
config: config,

go/adk/pkg/memory/kagent_service.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,10 +80,10 @@ func New(cfg Config) (*KagentMemoryService, error) {
8080
}, nil
8181
}
8282

83-
// AddSession implements memory.Service.AddSession.
83+
// AddSessionToMemory implements memory.Service.AddSessionToMemory.
8484
// It extracts content from the session, optionally summarizes it, generates embeddings,
8585
// and stores it via the Kagent API.
86-
func (s *KagentMemoryService) AddSession(ctx context.Context, session adksession.Session) error {
86+
func (s *KagentMemoryService) AddSessionToMemory(ctx context.Context, session adksession.Session) error {
8787
log := logr.FromContextOrDiscard(ctx)
8888
log.V(1).Info("Adding session to memory", "sessionID", session.ID(), "userID", session.UserID())
8989

@@ -162,9 +162,9 @@ func (s *KagentMemoryService) storeMemory(ctx context.Context, userID, content s
162162
return nil
163163
}
164164

165-
// Search implements memory.Service.Search.
165+
// SearchMemory implements memory.Service.SearchMemory.
166166
// It searches for relevant memories using vector similarity.
167-
func (s *KagentMemoryService) Search(ctx context.Context, req *memory.SearchRequest) (*memory.SearchResponse, error) {
167+
func (s *KagentMemoryService) SearchMemory(ctx context.Context, req *memory.SearchRequest) (*memory.SearchResponse, error) {
168168
log := logr.FromContextOrDiscard(ctx)
169169
log.V(1).Info("Searching memory", "query", req.Query, "userID", req.UserID)
170170

go/adk/pkg/memory/kagent_service_test.go

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"iter"
77
"net/http"
88
"net/http/httptest"
9+
"strings"
910
"testing"
1011
"time"
1112

@@ -146,7 +147,7 @@ func TestKagentMemoryService_AddSession(t *testing.T) {
146147
model: nil, // No summarization
147148
}
148149

149-
err := svc.AddSession(context.Background(), tt.session)
150+
err := svc.AddSessionToMemory(context.Background(), tt.session)
150151
if (err != nil) != tt.wantErr {
151152
t.Errorf("AddSession() error = %v, wantErr %v", err, tt.wantErr)
152153
}
@@ -251,7 +252,7 @@ func TestKagentMemoryService_Search(t *testing.T) {
251252
model: nil,
252253
}
253254

254-
resp, err := svc.Search(context.Background(), &memory.SearchRequest{
255+
resp, err := svc.SearchMemory(context.Background(), &memory.SearchRequest{
255256
Query: tt.query,
256257
UserID: tt.userID,
257258
})
@@ -407,7 +408,7 @@ func TestKagentMemoryService_ExtractSessionContent(t *testing.T) {
407408
t.Error("Expected non-empty content, got empty")
408409
}
409410

410-
if tt.wantContain != "" && !contains(content, tt.wantContain) {
411+
if tt.wantContain != "" && !strings.Contains(content, tt.wantContain) {
411412
t.Errorf("Expected content to contain %q, got: %s", tt.wantContain, content)
412413
}
413414
})
@@ -566,16 +567,3 @@ func newMockEventWithFunctionCall(author, functionName string) *adksession.Event
566567
}
567568
return evt
568569
}
569-
570-
func contains(s, substr string) bool {
571-
return len(s) >= len(substr) && (s == substr || len(s) > 0 && containsHelper(s, substr))
572-
}
573-
574-
func containsHelper(s, substr string) bool {
575-
for i := 0; i <= len(s)-len(substr); i++ {
576-
if s[i:i+len(substr)] == substr {
577-
return true
578-
}
579-
}
580-
return false
581-
}

0 commit comments

Comments
 (0)