
feat(adk): Add SSE High-Availability Subsystem for Distributed MCP Deployments #911

Open
jizhuozhi wants to merge 3 commits into cloudwego:main from jizhuozhi:main

Conversation

@jizhuozhi

Summary

This PR introduces a High-Availability (HA) subsystem for SSE (Server-Sent Events) sessions in distributed MCP (Model Context Protocol) deployments (modelcontextprotocol/modelcontextprotocol#2001). It provides the infrastructure needed to maintain stateful streaming sessions across multiple nodes with automatic failover and session correction capabilities.

Why This Is Needed

Long-Lived SSE Connections Are Still Needed

MCP Specification PR #206 introduces "Streamable HTTP" to support stateless server deployments. However, SSE long-connection sessions remain essential for many real-world scenarios:

  1. Stateful Tool Execution: Some MCP tools maintain server-side state during execution:

    • Long-running code interpreters with persistent REPL state
    • Database query sessions with transaction context
    • File editing tools with undo/redo history
    • Real-time collaboration tools with shared state
  2. Streaming Large Outputs: For tools that generate large amounts of data progressively (e.g., log streaming, large file generation), SSE provides natural backpressure and flow control that stateless HTTP cannot match.

  3. Server-Initiated Notifications: MCP's notification model (notifications/resources/updated, notifications/prompts/list_changed) requires a persistent channel from server to client.

  4. Existing Client Compatibility: Many MCP clients already implement SSE-based transport; migration to Streamable HTTP takes time.

For these stateful scenarios, session affinity and cross-node coordination are required, not optional.

The Multi-Instance Session Drift Problem

When deploying MCP servers with multiple replicas behind a load balancer:

                         ┌─────────────┐
                         │      LB     │
                         └──────┬──────┘
                    ┌────────────┼────────────┐
                    ▼            ▼            ▼
              ┌─────────┐  ┌─────────┐  ┌─────────┐
              │  Pod A  │  │  Pod B  │  │  Pod C  │
              │ Session │  │ Session │  │         │
              │   1,2   │  │   3,4   │  │         │
              └─────────┘  └─────────┘  └─────────┘

Problem 1: Client Reconnect Lands on Wrong Node

Client X (connected to Pod A)
    │
    ├── network blip, reconnect
    │
    └──► LB routes to Pod B
              │
              └─► Pod B has no session state for X
                  - Cannot resume in-progress tool execution
                  - Cannot replay missed notifications
                  - Last-Event-ID is useless without shared event store

Problem 2: Cross-Node Event Delivery

A tool executing on Pod A needs to notify a client connected to Pod B:

// Pod A: Tool execution wants to send progress update
func (t *StreamingTool) Execute(ctx context.Context) {
    for _, chunk := range generateChunks() {
        // Client is on Pod B, not Pod A
        // No channel to reach them
        t.sendProgress(chunk) // ❌ event never arrives
    }
}

Problem 3: Session Ownership Confusion

Without a shared registry, multiple pods could inadvertently serve the same session:

Pod A: "I think I own session X"
Pod B: "I think I own session X"  // after client reconnect
        ↓
Both pods send events → client receives duplicates or out-of-order events

What This PR Enables

This subsystem provides the infrastructure for multi-node coordination:

Problem | Solution
--- | ---
Reconnect lands on wrong node | MetadataStore tracks which node owns each session
Cross-node event delivery | EventBus (Pub/Sub) forwards events to the owning node in real time
Duplicate session ownership | Distributed lock + version-based optimistic concurrency (sketched below)
Zombie sessions after node crash | Heartbeat-based liveness detection + auto-correction
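To make the "version-based optimistic concurrency" row above concrete, here is a minimal sketch of how an ownership claim could be enforced atomically against Redis. The key layout, Lua script, and claimSession helper are illustrative assumptions for this description, not the MetadataStore implementation shipped in this PR; go-redis is used here purely for the example.

import (
    "context"

    "github.com/redis/go-redis/v9"
)

// Illustrative only: take ownership of a session if (and only if) its version
// has not changed since we last read it. A Lua script keeps read-check-write atomic.
var claimScript = redis.NewScript(`
local ver = redis.call("HGET", KEYS[1], "version")
if ver and tonumber(ver) ~= tonumber(ARGV[1]) then
  return 0 -- ownership changed since we read it; caller must re-read and retry
end
redis.call("HSET", KEYS[1], "owner", ARGV[2], "version", tonumber(ARGV[1]) + 1)
return 1
`)

// claimSession is a hypothetical helper, not part of this PR's API.
func claimSession(ctx context.Context, rdb *redis.Client, sessionID, nodeID string, expectedVersion int64) (bool, error) {
    res, err := claimScript.Run(ctx, rdb, []string{"mcp:session:" + sessionID}, expectedVersion, nodeID).Int()
    if err != nil {
        return false, err
    }
    return res == 1, nil
}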

Note: This is not a message persistence system. When an SSE connection breaks, in-flight events are lost. The EventBuffer provides short-term buffering (configurable capacity) to help bridge brief reconnection gaps, but is not designed for durable message storage.
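As a rough illustration of what such a bounded buffer can look like, the sketch below drops the oldest event once capacity is reached, so replay only covers a short reconnection window. Field and method names are assumptions and may not match the EventBuffer in session.go; SSEEvent here mirrors the event shape used in the API example further below.

import "sync"

// SSEEvent mirrors the fields used in the API example below (illustrative copy).
type SSEEvent struct {
    EventID   string
    EventType string
    Data      []byte
}

// eventBuffer is an illustrative bounded FIFO buffer.
type eventBuffer struct {
    mu       sync.Mutex
    events   []*SSEEvent
    capacity int
}

func (b *eventBuffer) Append(e *SSEEvent) {
    b.mu.Lock()
    defer b.mu.Unlock()
    if len(b.events) == b.capacity {
        b.events = b.events[1:] // buffer full: evict the oldest event
    }
    b.events = append(b.events, e)
}

// ReplaySince returns the events recorded after lastEventID, or nil if that ID
// has already been evicted from the window.
func (b *eventBuffer) ReplaySince(lastEventID string) []*SSEEvent {
    b.mu.Lock()
    defer b.mu.Unlock()
    for i, e := range b.events {
        if e.EventID == lastEventID {
            return append([]*SSEEvent(nil), b.events[i+1:]...)
        }
    }
    return nil
}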

Solution Overview

This implementation is inspired by MCP Specification PR #206 and provides:

Core Components

adk/transport/mcp/sseha/
├── session.go        # Session state types, EventBuffer
├── manager.go        # SessionManager - main coordinator
├── extension.go      # MetadataStore, EventBus, NodeDiscovery interfaces
├── corrector.go      # Session correction logic
├── middleware.go     # SSE middleware for HA injection
├── errors.go         # Error sentinels
└── redis/            # Redis backend implementation
    ├── metadata.go   # Redis MetadataStore
    ├── pubsub.go     # Redis EventBus
    ├── discovery.go  # Redis NodeDiscovery
    ├── client.go     # Redis client wrapper
    └── testing.go    # Test utilities

Key Features

Feature | Description
--- | ---
Shared Metadata Store | Session registry and ownership tracking across nodes
Pub/Sub Event Bus | Cross-node SSE event forwarding
Session Correction | Automatic detection and recovery of orphaned/misrouted sessions
Event Buffering | Bounded event replay for reconnection scenarios
Node Discovery | Cluster membership and health monitoring
Pluggable Backends | Interface-based design allows Redis, etcd, or custom implementations (see the sketch below)
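As a rough sketch of the pluggable-backend design, the extension interfaces in extension.go might look something like the following. Method names and signatures here are guesses for the sake of this description; the authoritative definitions are in the PR's source. SessionInfo and SSEEvent refer to the types used in the API example below.

import "context"

// MetadataStore is the shared session registry: which node owns which session.
type MetadataStore interface {
    RegisterSession(ctx context.Context, info *SessionInfo) error
    GetSession(ctx context.Context, sessionID string) (*SessionInfo, error)
    RemoveSession(ctx context.Context, sessionID string) error
}

// EventBus forwards SSE events between nodes (e.g., via Redis Pub/Sub).
type EventBus interface {
    Publish(ctx context.Context, sessionID string, event *SSEEvent) error
    Subscribe(ctx context.Context, sessionID string, handler func(*SSEEvent)) (cancel func(), err error)
}

// NodeDiscovery tracks cluster membership and node liveness.
type NodeDiscovery interface {
    Heartbeat(ctx context.Context, nodeID string) error
    ListNodes(ctx context.Context) ([]string, error)
}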

Architecture

Two event forwarding modes are provided as alternatives (choose one):

Option A: Pub/Sub Mode (Recommended for most cases)

When a session is established on Node B, Node B subscribes to its channel on EventBus. When Node A receives a misrouted event for Session 2, Node A publishes to EventBus.

  ┌─────────────┐                              ┌─────────────┐
  │   Node A    │                              │   Node C    │
  │             │                              │             │
  │  Session 1  │                              │  Session 3  │
  │   (owner)   │                              │   (owner)   │
  └──────┬──────┘                              └──────┬──────┘
         │                                            │
         │ publish                                    │ publish
         │                                            │
         ▼                                            ▼
    ┌─────────────────────────────────────────────────────┐
    │                    EventBus                         │
    │                 (Redis Pub/Sub)                     │
    │                channel: session:2                   │
    └──────────────────────────┬──────────────────────────┘
                               │
                               │ broadcast
                               ▼
                        ┌─────────────┐
                        │   Node B    │
                        │             │
                        │  Session 2  │
                        │   (owner)   │
                        │   subscribe │
                        └─────────────┘

Subscription Flow (at session initialization):

Client connects to Node B, establishes Session 2
    │
    ▼
Node B registers ownership in MetadataStore
    │
    ▼
Node B subscribes to EventBus channel: "session:{session_2_id}"
    │
    ▼
Now Node B can receive events for Session 2 from any node

Event Routing Flow (when misrouted):

Tool execution on Node A produces event for Session 2
    │
    ▼
Node A looks up Session 2 ownership
    │
    ▼
Node A does NOT own Session 2, so publishes to EventBus
    │
    ▼
EventBus broadcasts to channel "session:{session_2_id}"
    │
    ▼
Node B (subscribed) receives event → forwards to local Session 2

Pros: Nodes don't need to know each other's addresses; misrouted events are handled gracefully.
Cons: Slightly higher latency (an extra hop through Redis); the Redis EventBus sits on the critical path.
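Put together, the publish-or-deliver decision in Pub/Sub mode boils down to something like the sketch below (reusing the illustrative EventBus interface from the Key Features section). Names such as LocalSession and routeEvent are hypothetical; in the actual code this behavior sits behind manager.SendEvent shown in the API example.

import "context"

// LocalSession (hypothetical) represents an SSE connection owned by this node.
type LocalSession interface {
    Deliver(ev *SSEEvent) error
}

// routeEvent sketches the routing decision described above.
func routeEvent(ctx context.Context, sessionID string, ev *SSEEvent,
    localSessions map[string]LocalSession, bus EventBus) error {
    if s, ok := localSessions[sessionID]; ok {
        // This node owns the session: write straight to the open SSE stream.
        return s.Deliver(ev)
    }
    // Misrouted: publish on the session's channel; the owning node is subscribed
    // and forwards the event to its local connection.
    return bus.Publish(ctx, sessionID, ev)
}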

Option B: P2P Mode

Nodes depend on MetadataStore (Registry) to discover session ownership, then forward events directly via P2P.

                    ┌─────────────────────────────────────────────┐
                    │                   Redis                      │
                    │           ┌───────────────────────┐          │
                    │           │     MetadataStore     │          │
                    │           │   (Session Registry)  │          │
                    │           │   (Node Discovery)    │          │
                    │           └───────────────────────┘          │
                    └─────────────────────▲────────────────────────┘
                                          │ depends on for
                                          │ session discovery
         ┌────────────────────────────────┴────────────────────────────┐
         │                                                             │
         ▼                                                             ▼
  ┌─────────────┐                  ┌─────────────┐                  ┌─────────────┐
  │   Node A    │◄──── P2P ───────►│   Node B    │◄──── P2P ───────►│   Node C    │
  │ ┌─────────┐ │     Forward      │ ┌─────────┐ │     Forward      │ ┌─────────┐ │
  │ │ HA      │ │                  │ │ HA      │ │                  │ │ HA      │ │
  │ │ Mgr     │ │                  │ │ Mgr     │ │                  │ │ Mgr     │ │
  │ └────┬────┘ │                  │ └────┬────┘ │                  │ └────┬────┘ │
  │      ▼      │                  │      ▼      │                  │      ▼      │
  │  Session 1  │                  │  Session 2  │                  │  Session 3  │
  └─────────────┘                  └─────────────┘                  └─────────────┘

Event flow:

Tool on Node A generates event for Session 2
    │
    ▼
Node A's HA Manager queries MetadataStore: "Who owns Session 2?"
    │
    ▼
MetadataStore returns: "Node B at 10.0.0.2:8080"
    │
    ▼
Node A's P2PForwarder sends event directly to Node B
    │
    ▼
Node B's HA Manager forwards to local Session 2

Pros: Lower latency (direct delivery); no EventBus needed for routing.
Cons: Nodes must be network-reachable from each other; requires a P2PForwarder implementation.
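Since P2PForwarder is only named here, the sketch below shows one way such a forwarder could look, pushing events to the owning node over plain HTTP. The interface shape and the internal endpoint path are assumptions for illustration, not part of this PR; SSEEvent refers to the type from the API example below.

import (
    "bytes"
    "context"
    "encoding/json"
    "fmt"
    "net/http"
)

// P2PForwarder (illustrative) delivers an event to the node that owns the session.
type P2PForwarder interface {
    Forward(ctx context.Context, nodeAddress, sessionID string, event *SSEEvent) error
}

// httpForwarder POSTs events to a hypothetical internal endpoint exposed by every node.
type httpForwarder struct {
    client *http.Client
}

func (f *httpForwarder) Forward(ctx context.Context, nodeAddress, sessionID string, event *SSEEvent) error {
    body, err := json.Marshal(event)
    if err != nil {
        return err
    }
    url := fmt.Sprintf("http://%s/internal/sessions/%s/events", nodeAddress, sessionID) // assumed endpoint
    req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
    if err != nil {
        return err
    }
    req.Header.Set("Content-Type", "application/json")
    resp, err := f.client.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    if resp.StatusCode != http.StatusOK {
        return fmt.Errorf("forward to %s failed: %s", nodeAddress, resp.Status)
    }
    return nil
}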

Choose based on your infrastructure: Pub/Sub for simplicity; P2P when latency is critical or you want to keep Redis off the event-routing path.

API Example

// Create Redis-backed HA session manager
store, _ := redis.NewMetadataStore(redisClient)
bus, _ := redis.NewEventBus(redisClient)
discovery, _ := redis.NewNodeDiscovery(redisClient)

manager, _ := sseha.NewSessionManager(ctx, &sseha.SessionManagerConfig{
    NodeID:        "node-1",
    NodeAddress:   "10.0.0.1:8080",
    MetadataStore: store,
    EventBus:      bus,
    NodeDiscovery: discovery, // assumed field name: discovery is wired in for liveness tracking
    CorrectionPolicy: &sseha.CorrectionPolicy{
        EnableAutoCorrection: true,
        SuspendTimeout:       30 * time.Second,
    },
})

// Register a new SSE session
session, _ := manager.RegisterSession(ctx, &sseha.SessionInfo{
    SessionID: "session-abc123",
    NodeID:    "node-1",
    State:     sseha.SessionStateActive,
})

// Send events (automatically buffered and forwarded if needed)
manager.SendEvent(ctx, "session-abc123", &sseha.SSEEvent{
    EventID:   "evt-001",
    EventType: "message",
    Data:      []byte(`{"content": "Hello, world!"}`),
})

// On reconnection, replay missed events
events, _ := manager.ReplayEvents(ctx, "session-abc123", "evt-000")
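For context, the sketch below shows roughly how the replay call could be wired into a hand-rolled SSE handler on reconnection; the PR's middleware is meant to handle this automatically, and the way the session ID is carried, the handler shape, and the ReplayEvents return type are assumptions here.

import (
    "fmt"
    "net/http"
)

// Sketch only: Last-Event-ID driving ReplayEvents in a manual handler.
func handleSSE(manager *sseha.SessionManager) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        sessionID := r.URL.Query().Get("session_id") // assumed: however the session ID is carried
        lastEventID := r.Header.Get("Last-Event-ID") // standard SSE reconnection header

        w.Header().Set("Content-Type", "text/event-stream")
        w.Header().Set("Cache-Control", "no-cache")

        if lastEventID != "" {
            events, err := manager.ReplayEvents(r.Context(), sessionID, lastEventID)
            if err == nil {
                for _, ev := range events {
                    fmt.Fprintf(w, "id: %s\nevent: %s\ndata: %s\n\n", ev.EventID, ev.EventType, ev.Data)
                }
                if fl, ok := w.(http.Flusher); ok {
                    fl.Flush()
                }
            }
        }
        // ...then continue streaming live events for this session.
    }
}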

Testing

All components include comprehensive unit tests:

adk/transport/mcp/sseha/
├── session_test.go    # EventBuffer, state transitions
└── redis/
    └── redis_test.go  # Full backend integration tests

Run tests:

go test -v ./adk/transport/mcp/sseha/...

Breaking Changes

None. This is a new optional subsystem. Existing MCP transport code continues to work unchanged.

Checklist

  • Code follows the project's style guidelines (golangci-lint run ./...)
  • All new code has appropriate GoDoc comments
  • Unit tests added for new functionality
  • No breaking changes to existing APIs
  • Backward compatible with existing MCP transport

Future Work

  • etcd backend implementation
  • Prometheus metrics integration
  • Admin API for manual session inspection/migration
  • Documentation on cloudwego.io

Related

  • MCP Specification PR #206: Replace HTTP+SSE with new "Streamable HTTP" transport
    • That spec PR introduces "Streamable HTTP" to address HA challenges with long-lived SSE connections
    • It enables stateless server deployments and more flexible session management
    • This PR provides the backend infrastructure (metadata store, event bus, session correction) needed to support that architecture across multiple nodes
  • Goal: production-grade MCP deployments with 99.9%+ availability

@jizhuozhi jizhuozhi changed the title feat(adk/transport/mcp): Add SSE High-Availability Subsystem for Distributed MCP Deployments feat(adk): Add SSE High-Availability Subsystem for Distributed MCP Deployments Mar 25, 2026
@codecov

codecov Bot commented Mar 25, 2026

Codecov Report

❌ Patch coverage is 42.17325% with 761 lines in your changes missing coverage. Please review.
✅ Project coverage is 79.15%. Comparing base (9514a61) to head (d326808).
⚠️ Report is 1 commit behind head on main.

Files with missing lines | Patch % | Lines
--- | --- | ---
adk/transport/mcp/sseha/manager.go | 40.51% | 200 Missing and 29 partials ⚠️
adk/transport/mcp/sseha/redis/discovery.go | 0.00% | 139 Missing ⚠️
adk/transport/mcp/sseha/middleware.go | 0.00% | 95 Missing ⚠️
adk/transport/mcp/sseha/corrector.go | 20.53% | 84 Missing and 5 partials ⚠️
adk/transport/mcp/sseha/redis/metadata.go | 58.87% | 50 Missing and 38 partials ⚠️
adk/transport/mcp/sseha/redis/pubsub.go | 56.91% | 43 Missing and 10 partials ⚠️
adk/transport/mcp/sseha/redis/testing.go | 71.82% | 44 Missing and 7 partials ⚠️
adk/transport/mcp/sseha/session.go | 74.62% | 14 Missing and 3 partials ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #911      +/-   ##
==========================================
- Coverage   82.13%   79.15%   -2.99%     
==========================================
  Files         146      154       +8     
  Lines       16063    17383    +1320     
==========================================
+ Hits        13194    13759     +565     
- Misses       1922     2585     +663     
- Partials      947     1039      +92     

☔ View full report in Codecov by Sentry.

@jizhuozhi jizhuozhi force-pushed the main branch 2 times, most recently from 056f9d3 to d326808 on March 25, 2026 at 17:12
…ployments

Reorganize the SSE High-Availability package under the mcp/ namespace
to clearly indicate this is MCP protocol-specific infrastructure.

- Move adk/transport/sseha/ -> adk/transport/mcp/sseha/
- Move adk/transport/sseha/redis/ -> adk/transport/mcp/sseha/redis/
- Update all import paths accordingly
- Future transport extensions (streamable HTTP, A2A, etc.) can be
  organized under their respective protocol namespaces

Package structure:
  adk/transport/mcp/sseha/       - Core interfaces and session management
  adk/transport/mcp/sseha/redis/ - Redis backend implementations

fix(adk/transport/mcp/sseha): fix Go 1.18 compatibility and lint errors

- Use int64 + atomic.AddInt64 instead of atomic.Int64 (Go 1.19+)
- Fix gci import ordering
- Fix gofmt formatting

fix(adk/transport/mcp/sseha): replace interface{} with any for gofmt compatibility

The golangci-lint gofmt check has a rewrite rule that replaces interface{} with any.
Update all interface{} usages to any to pass CI lint checks.

test(adk/transport/mcp/sseha): add comprehensive tests for middleware and session management

- Add middleware_test.go with tests for:
  - HAMiddleware.Wrap (new session and reconnection scenarios)
  - HAResponseWriter.SendEvent
  - extractMetadata helper
  - generateSessionID helper
  - writeSSEEvent helper

- Extend redis_test.go with tests for:
  - Session migration between nodes
  - Cross-node session reconnection
  - Session manager close
  - CorrectSession operation
  - HandleReconnection already local session
  - EventBus pattern subscribe
- Add MCPMiddleware implementing MCP protocol over SSE transport
- Add HAWriter interface for type-safe SSE event writing
- Add MCP protocol compliance tests (handshake, tools, reconnection)
- Add e2e HA scenario tests (migration, failover, multi-client)
- Add StreamableMiddleware for MCP 2025-03-26 Streamable HTTP transport
  - Single POST /mcp endpoint with Mcp-Session-Id header
  - Supports JSON response and SSE streaming upgrade
  - Supports stateless mode and notification handling
- Rename middleware.go -> sse_middleware.go for clarity
- Rename middleware_test.go -> sse_middleware_test.go
- Delete redundant mcp_middleware.go (merged into sse_middleware.go)
- Rewrite e2e_test.go to cover both SSE and Streamable HTTP transports
  - Fix SSE client to use proper MCP protocol (GET /sse + POST /messages)
  - Add Streamable HTTP e2e tests (basic, session continuation, streaming)
  - HA scenarios (migration, failover) remain transport-agnostic
@shentongmartin shentongmartin added the C-feature-request and D-MCP labels on Mar 27, 2026

Labels

C-feature-request - Category: feature request issue. Implementations of feature requests use `C-enhancement` instead.
D-MCP - Domain: this issue relates to MCP protocol related functionalities
