Skip to content

Commit ee2f61b

Browse files
author
Lukas Urbonas
committed
feat(adk): propagate parent + root context_id headers in Go runtime
Mirrors the Python ADK change from this PR over into the Go adk KAgentRemoteA2ATool path so multi-hop A2A fleets get the same cross-hop conversation lineage in both runtimes. Single file, no executor change needed: the Go interceptor still has access to the inbound a2asrv.CallContext on ctx (already used by the authzForwardingInterceptor), so the root header can be forwarded unchanged from inbound RequestMeta without mirroring Python's session.state["headers"] copy in _agent_executor.py. Wiring follows the existing kagent precedent in remote_a2a_tool.go: - parent_context_id comes from ctx.SessionID() stashed via a context value at the same call sites that already stash userIDContextKey, matching userIDForwardingInterceptor's pattern; - root_context_id is read from the inbound a2asrv.CallContext via CallContextFrom, matching authzForwardingInterceptor's pattern; - pre-existing headers on req.Meta win, which gives callers using extraHeaders the same override knob Python exposes via header_provider. Test coverage mirrors the five derivation cases in the Python TestLineageHeaderPropagation suite: chain root, mid-chain forward, legacy-parent promotion, no-session-id no-op, caller override wins. Signed-off-by: Lukas Urbonas <lukas.urbonas@surfsharkteam.com>
1 parent d683bae commit ee2f61b

2 files changed

Lines changed: 203 additions & 0 deletions

File tree

go/adk/pkg/tools/remote_a2a_tool.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,33 @@ import (
2222
// userIDContextKey is the context key for passing the session user_id to the subagent.
2323
type userIDContextKey struct{}
2424

25+
// parentContextIDContextKey is the context key carrying this agent's own
26+
// A2A context_id (== ADK session id) into the outbound interceptor so it can
27+
// be stamped as the parent_context_id header on every outbound A2A call.
28+
type parentContextIDContextKey struct{}
29+
30+
// Conversation-lineage headers stamped on outbound A2A calls so a remote
31+
// agent can correlate this turn with the originating chat conversation -
32+
// useful when downstream code keys per-conversation state (sessions, sandbox
33+
// pods, cache entries) on a stable identifier across A2A hops.
34+
//
35+
// ParentContextIDHeader is the immediate caller's A2A context_id (the
36+
// session id of the agent that ran this tool). It changes with every hop in
37+
// a chain of A2A calls.
38+
//
39+
// RootContextIDHeader is the top-of-chain context_id - the agent at the
40+
// start of the chain (typically the user-facing chat agent). It stays
41+
// stable across every hop and across every turn of the same conversation,
42+
// so downstream agents can key state that should outlive a single A2A call
43+
// (e.g. claim a per-conversation worker pod that survives between turns).
44+
//
45+
// Mirrors the Python ADK constants in
46+
// python/packages/kagent-adk/src/kagent/adk/_remote_a2a_tool.py.
47+
const (
48+
ParentContextIDHeader = "x-kagent-parent-context-id"
49+
RootContextIDHeader = "x-kagent-root-context-id"
50+
)
51+
2552
// userIDForwardingInterceptor forwards the session user_id as an x-user-id header.
2653
type userIDForwardingInterceptor struct {
2754
a2aclient.PassthroughInterceptor
@@ -34,6 +61,56 @@ func (u *userIDForwardingInterceptor) Before(ctx context.Context, req *a2aclient
3461
return ctx, nil
3562
}
3663

64+
// lineageHeadersInterceptor stamps the parent + root context_id headers on
65+
// every outbound A2A call. Parent comes from a context value populated by the
66+
// caller (the tool's own ADK session id). Root is forwarded unchanged from the
67+
// inbound A2A request when present (so the value set by the agent at the start
68+
// of the chain survives every hop), with a legacy fallback to the inbound
69+
// parent header for older callers, and a final fallback to our own session id
70+
// when this agent is the chain root.
71+
//
72+
// Pre-existing headers on req.Meta win (analogous to Python's header_provider
73+
// override), so a caller that sets extraHeaders for either header keeps full
74+
// control.
75+
type lineageHeadersInterceptor struct {
76+
a2aclient.PassthroughInterceptor
77+
}
78+
79+
func (l *lineageHeadersInterceptor) Before(ctx context.Context, req *a2aclient.Request) (context.Context, error) {
80+
parent, _ := ctx.Value(parentContextIDContextKey{}).(string)
81+
if parent == "" {
82+
return ctx, nil
83+
}
84+
85+
var inboundRoot, inboundParent string
86+
if callCtx, ok := a2asrv.CallContextFrom(ctx); ok {
87+
if meta := callCtx.RequestMeta(); meta != nil {
88+
if vals, ok := meta.Get(RootContextIDHeader); ok && len(vals) > 0 {
89+
inboundRoot = vals[0]
90+
}
91+
if vals, ok := meta.Get(ParentContextIDHeader); ok && len(vals) > 0 {
92+
inboundParent = vals[0]
93+
}
94+
}
95+
}
96+
97+
root := inboundRoot
98+
if root == "" {
99+
root = inboundParent
100+
}
101+
if root == "" {
102+
root = parent
103+
}
104+
105+
if len(req.Meta.Get(ParentContextIDHeader)) == 0 {
106+
req.Meta.Append(ParentContextIDHeader, parent)
107+
}
108+
if len(req.Meta.Get(RootContextIDHeader)) == 0 {
109+
req.Meta.Append(RootContextIDHeader, root)
110+
}
111+
return ctx, nil
112+
}
113+
37114
// authzForwardingInterceptor forwards the Authorization header from the
38115
// incoming A2A request context to outbound sub-agent A2A calls.
39116
type authzForwardingInterceptor struct {
@@ -150,6 +227,7 @@ func (s *remoteA2AState) ensureClient(ctx context.Context) (*a2aclient.Client, e
150227
interceptors := []a2aclient.CallInterceptor{
151228
a2aclient.NewStaticCallMetaInjector(meta),
152229
&userIDForwardingInterceptor{},
230+
&lineageHeadersInterceptor{},
153231
}
154232
if s.propagateToken {
155233
interceptors = append(interceptors, &authzForwardingInterceptor{})
@@ -192,6 +270,7 @@ func (s *remoteA2AState) handleFirstCall(ctx tool.Context, requestText string) (
192270
message.ContextID = s.lastContextID
193271

194272
sendCtx := context.WithValue(ctx, userIDContextKey{}, ctx.UserID())
273+
sendCtx = context.WithValue(sendCtx, parentContextIDContextKey{}, ctx.SessionID())
195274
result, err := client.SendMessage(sendCtx, &a2atype.MessageSendParams{Message: message})
196275
if err != nil {
197276
slog.Error("Remote agent request failed", "tool", s.name, "error", err)
@@ -242,6 +321,7 @@ func (s *remoteA2AState) handleResume(ctx tool.Context) (map[string]any, error)
242321
}
243322

244323
sendCtx := context.WithValue(ctx, userIDContextKey{}, ctx.UserID())
324+
sendCtx = context.WithValue(sendCtx, parentContextIDContextKey{}, ctx.SessionID())
245325
result, err := client.SendMessage(sendCtx, &a2atype.MessageSendParams{Message: message})
246326
if err != nil {
247327
slog.Error("Remote agent resume failed", "tool", subagentName, "error", err)
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
package tools
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/a2aproject/a2a-go/a2aclient"
8+
"github.com/a2aproject/a2a-go/a2asrv"
9+
)
10+
11+
// newReq returns an empty outbound client Request with an initialized CallMeta.
12+
func newReq() *a2aclient.Request {
13+
return &a2aclient.Request{Meta: a2aclient.CallMeta{}}
14+
}
15+
16+
// withCallContext returns a context that carries an a2asrv.CallContext whose
17+
// RequestMeta exposes the given inbound headers, so the interceptor's
18+
// CallContextFrom + RequestMeta path can be exercised.
19+
func withCallContext(parent context.Context, inbound map[string][]string) context.Context {
20+
ctx, _ := a2asrv.WithCallContext(parent, a2asrv.NewRequestMeta(inbound))
21+
return ctx
22+
}
23+
24+
// TestLineageHeaderPropagation covers the parent + root context_id header
25+
// derivation. Mirrors the Python TestLineageHeaderPropagation cases in
26+
// python/packages/kagent-adk/tests/unittests/test_remote_a2a_tool.py.
27+
func TestLineageHeaderPropagation(t *testing.T) {
28+
const ownSession = "own-session-123"
29+
const upstreamRoot = "root-from-upstream"
30+
const upstreamParent = "parent-from-upstream"
31+
32+
t.Run("chain root stamps own id as parent and root", func(t *testing.T) {
33+
ctx := context.WithValue(context.Background(), parentContextIDContextKey{}, ownSession)
34+
req := newReq()
35+
36+
if _, err := (&lineageHeadersInterceptor{}).Before(ctx, req); err != nil {
37+
t.Fatalf("Before returned error: %v", err)
38+
}
39+
40+
assertSingleHeader(t, req, ParentContextIDHeader, ownSession)
41+
assertSingleHeader(t, req, RootContextIDHeader, ownSession)
42+
})
43+
44+
t.Run("mid-chain forwards root unchanged and overrides parent with own id", func(t *testing.T) {
45+
ctx := context.WithValue(context.Background(), parentContextIDContextKey{}, ownSession)
46+
ctx = withCallContext(ctx, map[string][]string{
47+
RootContextIDHeader: {upstreamRoot},
48+
ParentContextIDHeader: {upstreamParent},
49+
})
50+
req := newReq()
51+
52+
if _, err := (&lineageHeadersInterceptor{}).Before(ctx, req); err != nil {
53+
t.Fatalf("Before returned error: %v", err)
54+
}
55+
56+
assertSingleHeader(t, req, ParentContextIDHeader, ownSession)
57+
assertSingleHeader(t, req, RootContextIDHeader, upstreamRoot)
58+
})
59+
60+
t.Run("legacy inbound with only parent header promotes it to root", func(t *testing.T) {
61+
ctx := context.WithValue(context.Background(), parentContextIDContextKey{}, ownSession)
62+
ctx = withCallContext(ctx, map[string][]string{
63+
ParentContextIDHeader: {upstreamParent},
64+
})
65+
req := newReq()
66+
67+
if _, err := (&lineageHeadersInterceptor{}).Before(ctx, req); err != nil {
68+
t.Fatalf("Before returned error: %v", err)
69+
}
70+
71+
assertSingleHeader(t, req, ParentContextIDHeader, ownSession)
72+
assertSingleHeader(t, req, RootContextIDHeader, upstreamParent)
73+
})
74+
75+
t.Run("no session id is a no-op", func(t *testing.T) {
76+
// No parentContextIDContextKey on ctx - matches the stub tool_context
77+
// case in Python (empty dict, no headers stamped).
78+
ctx := context.Background()
79+
req := newReq()
80+
81+
if _, err := (&lineageHeadersInterceptor{}).Before(ctx, req); err != nil {
82+
t.Fatalf("Before returned error: %v", err)
83+
}
84+
85+
if got := req.Meta.Get(ParentContextIDHeader); len(got) != 0 {
86+
t.Errorf("expected no parent header, got %v", got)
87+
}
88+
if got := req.Meta.Get(RootContextIDHeader); len(got) != 0 {
89+
t.Errorf("expected no root header, got %v", got)
90+
}
91+
})
92+
93+
t.Run("pre-existing header on req.Meta wins over lineage", func(t *testing.T) {
94+
// Analogous to Python's header_provider override: a caller-supplied
95+
// header that is already present on the outbound request must not be
96+
// overwritten by the lineage interceptor.
97+
ctx := context.WithValue(context.Background(), parentContextIDContextKey{}, ownSession)
98+
ctx = withCallContext(ctx, map[string][]string{
99+
RootContextIDHeader: {upstreamRoot},
100+
})
101+
req := newReq()
102+
req.Meta.Append(ParentContextIDHeader, "caller-override-parent")
103+
req.Meta.Append(RootContextIDHeader, "caller-override-root")
104+
105+
if _, err := (&lineageHeadersInterceptor{}).Before(ctx, req); err != nil {
106+
t.Fatalf("Before returned error: %v", err)
107+
}
108+
109+
assertSingleHeader(t, req, ParentContextIDHeader, "caller-override-parent")
110+
assertSingleHeader(t, req, RootContextIDHeader, "caller-override-root")
111+
})
112+
}
113+
114+
func assertSingleHeader(t *testing.T, req *a2aclient.Request, key, want string) {
115+
t.Helper()
116+
got := req.Meta.Get(key)
117+
if len(got) != 1 {
118+
t.Fatalf("%s: expected exactly 1 value, got %v", key, got)
119+
}
120+
if got[0] != want {
121+
t.Errorf("%s: got %q, want %q", key, got[0], want)
122+
}
123+
}

0 commit comments

Comments
 (0)