Skip to content

Commit d59acf5

Browse files
jerm-droclaude
andauthored
Add E2E test for MCPServer cross-replica session routing with Redis (#4525)
* Add E2E test for MCPServer cross-replica session routing with Redis MCPServer supports horizontal scaling with Redis session storage, but there was no E2E test verifying that a session established on one pod is accessible from a different pod. This test deploys an MCPServer with replicas=2 and Redis session storage, initializes an MCP session, then sends raw JSON-RPC requests directly to each pod IP using the same Mcp-Session-Id header to prove sessions are shared via Redis. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * Use kubectl port-forward instead of pod IPs for cross-replica test Pod IPs are not reachable from the CI runner host in Kind clusters. Replace direct pod IP HTTP calls with kubectl port-forward to each pod, which tunnels through the Kind node's network. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * Wire Redis session storage into MCPServer proxy runner The MCPServer CRD's sessionStorage config was populated by the operator into RunConfig but the proxy runner never read it — sessions always used in-memory LocalStorage, making cross-replica routing non-functional. Add WithSessionStorage transport option and wire ScalingConfig.SessionRedis from RunConfig into the transport layer so both StdioTransport and HTTPTransport (transparent proxy) use Redis-backed session storage when configured. Rewrite the E2E test to use mcp-go clients throughout, including transport.WithSession to create a client on pod B that reuses the session established on pod A. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * Move session storage to transport Config, fix codespell Pass SessionStorage through types.Config instead of a factory option with interface assertion. The factory now sets the field directly on each transport type during construction. Add clientA to codespell ignore list. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * Address review comments - Move Redis image to test/e2e/images/images.go (RedisImage constant) - Move RedisPasswordEnvVar to pkg/transport/session/redis_config.go to avoid proxy runner depending on pkg/vmcp/config - Remove unused vmcpconfig import from runner.go Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 51fda65 commit d59acf5

File tree

8 files changed

+404
-5
lines changed

8 files changed

+404
-5
lines changed

.codespellrc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
[codespell]
2-
ignore-words-list = NotIn,notin,AfterAll,ND,aks,deriver,te
2+
ignore-words-list = NotIn,notin,AfterAll,ND,aks,deriver,te,clientA
33
skip = *.svg,*.mod,*.sum

pkg/runner/runner.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"github.com/stacklok/toolhive/pkg/secrets"
3434
"github.com/stacklok/toolhive/pkg/telemetry"
3535
"github.com/stacklok/toolhive/pkg/transport"
36+
"github.com/stacklok/toolhive/pkg/transport/session"
3637
"github.com/stacklok/toolhive/pkg/transport/types"
3738
"github.com/stacklok/toolhive/pkg/workloads/statuses"
3839
)
@@ -353,6 +354,31 @@ func (r *Runner) Run(ctx context.Context) error {
353354
}
354355
}
355356

357+
// When Redis session storage is configured, create a Redis-backed session store
358+
// so sessions are shared across proxy replicas instead of being pod-local.
359+
if r.Config.ScalingConfig != nil && r.Config.ScalingConfig.SessionRedis != nil {
360+
redisCfg := r.Config.ScalingConfig.SessionRedis
361+
keyPrefix := redisCfg.KeyPrefix
362+
if keyPrefix == "" {
363+
keyPrefix = "thv:proxy:session:"
364+
}
365+
storage, err := session.NewRedisStorage(ctx, session.RedisConfig{
366+
Addr: redisCfg.Address,
367+
Password: os.Getenv(session.RedisPasswordEnvVar),
368+
DB: int(redisCfg.DB),
369+
KeyPrefix: keyPrefix,
370+
}, session.DefaultSessionTTL)
371+
if err != nil {
372+
return fmt.Errorf("failed to create Redis session storage: %w", err)
373+
}
374+
slog.Info("using Redis session storage",
375+
"address", redisCfg.Address,
376+
"db", redisCfg.DB,
377+
"key_prefix", keyPrefix,
378+
)
379+
transportConfig.SessionStorage = storage
380+
}
381+
356382
// Create transport with options
357383
transportHandler, err := transport.NewFactory().Create(transportConfig, transportOpts...)
358384
if err != nil {

pkg/transport/factory.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,17 @@ func (*Factory) Create(config types.Config, opts ...Option) (types.Transport, er
4747

4848
switch config.Type {
4949
case types.TransportTypeStdio:
50-
tr = NewStdioTransport(
50+
stdio := NewStdioTransport(
5151
config.Host, config.ProxyPort, config.Deployer, config.Debug, config.TrustProxyHeaders,
5252
config.PrometheusHandler, config.Middlewares...,
5353
)
54-
tr.(*StdioTransport).SetProxyMode(config.ProxyMode)
54+
stdio.SetProxyMode(config.ProxyMode)
55+
if config.SessionStorage != nil {
56+
stdio.SetSessionStorage(config.SessionStorage)
57+
}
58+
tr = stdio
5559
case types.TransportTypeSSE:
56-
tr = NewHTTPTransport(
60+
httpTransport := NewHTTPTransport(
5761
types.TransportTypeSSE,
5862
config.Host,
5963
config.ProxyPort,
@@ -68,8 +72,10 @@ func (*Factory) Create(config types.Config, opts ...Option) (types.Transport, er
6872
config.TrustProxyHeaders,
6973
config.Middlewares...,
7074
)
75+
httpTransport.sessionStorage = config.SessionStorage
76+
tr = httpTransport
7177
case types.TransportTypeStreamableHTTP:
72-
tr = NewHTTPTransport(
78+
httpTransport := NewHTTPTransport(
7379
types.TransportTypeStreamableHTTP,
7480
config.Host,
7581
config.ProxyPort,
@@ -84,6 +90,8 @@ func (*Factory) Create(config types.Config, opts ...Option) (types.Transport, er
8490
config.TrustProxyHeaders,
8591
config.Middlewares...,
8692
)
93+
httpTransport.sessionStorage = config.SessionStorage
94+
tr = httpTransport
8795
case types.TransportTypeInspector:
8896
// HTTP transport is not implemented yet
8997
return nil, errors.ErrUnsupportedTransport

pkg/transport/http.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
transporterrors "github.com/stacklok/toolhive/pkg/transport/errors"
2323
"github.com/stacklok/toolhive/pkg/transport/middleware"
2424
"github.com/stacklok/toolhive/pkg/transport/proxy/transparent"
25+
"github.com/stacklok/toolhive/pkg/transport/session"
2526
"github.com/stacklok/toolhive/pkg/transport/types"
2627
)
2728

@@ -73,6 +74,10 @@ type HTTPTransport struct {
7374
// Mutex for protecting shared state
7475
mutex sync.Mutex
7576

77+
// sessionStorage overrides the default in-memory session store when set.
78+
// Used for Redis-backed session sharing across replicas.
79+
sessionStorage session.Storage
80+
7681
// Transparent proxy
7782
proxy types.Proxy
7883

@@ -236,6 +241,8 @@ func (t *HTTPTransport) setTargetURI(targetURI string) {
236241

237242
// Start initializes the transport and begins processing messages.
238243
// The transport is responsible for starting the container.
244+
//
245+
//nolint:gocyclo // Start is a candidate for refactoring; keeping this PR focused on the Redis wiring
239246
func (t *HTTPTransport) Start(ctx context.Context) error {
240247
t.mutex.Lock()
241248
defer t.mutex.Unlock()
@@ -319,6 +326,9 @@ func (t *HTTPTransport) Start(ctx context.Context) error {
319326
proxyOptions = append(proxyOptions, transparent.WithRemoteBasePath(remoteBasePath))
320327
}
321328
proxyOptions = append(proxyOptions, transparent.WithRemoteRawQuery(remoteRawQuery))
329+
if t.sessionStorage != nil {
330+
proxyOptions = append(proxyOptions, transparent.WithSessionStorage(t.sessionStorage))
331+
}
322332

323333
// Create the transparent proxy
324334
t.proxy = transparent.NewTransparentProxyWithOptions(

pkg/transport/session/redis_config.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,12 @@ package session
55

66
import "time"
77

8+
// RedisPasswordEnvVar is the environment variable name for the Redis session storage password.
9+
// The operator injects this as a SecretKeyRef when sessionStorage.provider is "redis"
10+
// and passwordRef is set.
11+
// #nosec G101 -- This is an environment variable name, not a hardcoded credential
12+
const RedisPasswordEnvVar = "THV_SESSION_REDIS_PASSWORD"
13+
814
// Default timeouts for Redis operations.
915
const (
1016
DefaultDialTimeout = 5 * time.Second

pkg/transport/types/transport.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/stacklok/toolhive/pkg/authserver/server/keys"
2020
rt "github.com/stacklok/toolhive/pkg/container/runtime"
2121
"github.com/stacklok/toolhive/pkg/transport/errors"
22+
"github.com/stacklok/toolhive/pkg/transport/session"
2223
)
2324

2425
// MiddlewareFunction is a function that wraps an http.Handler with additional functionality.
@@ -270,6 +271,11 @@ type Config struct {
270271
// "/.well-known/oauth-authorization-server": authServerHandler,
271272
// }
272273
PrefixHandlers map[string]http.Handler
274+
275+
// SessionStorage overrides the default in-memory session store when set.
276+
// Used for Redis-backed session sharing across replicas.
277+
// When nil, transports use their default in-memory LocalStorage.
278+
SessionStorage session.Storage
273279
}
274280

275281
// ProxyMode represents the proxy mode for stdio transport.

test/e2e/images/images.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,4 +97,9 @@ const (
9797
// PagerDutyMCPServerImage is used for testing multi-backend optimizer scenarios.
9898
// Provides ~64 PagerDuty incident management tools (incidents, services, schedules, etc.).
9999
PagerDutyMCPServerImage = pagerdutyMCPServerImageURL + ":" + pagerdutyMCPServerImageTag
100+
101+
redisImageURL = "redis"
102+
redisImageTag = "7-alpine"
103+
// RedisImage is used for Redis-backed session storage in scaling tests.
104+
RedisImage = redisImageURL + ":" + redisImageTag
100105
)

0 commit comments

Comments
 (0)