Skip to content

Commit 8805aa8

Browse files
authored
mcp: add configurable keepalive failure threshold (#982)
mcp: add configurable keepalive failure threshold Introduce `KeepAliveFailureThreshold` option in both `ClientOptions` and `ServerOptions` to control how many consecutive keepalive ping failures are tolerated before closing the session. This aligns with the MCP spec's guidance that "multiple failed pings MAY trigger a connection reset," allowing operators to tune resilience against transient network hiccups without immediately tearing down otherwise healthy sessions. A threshold of 0 or 1 (the default) closes on the first failure, preserving existing behavior. Higher values let isolated misses pass while still closing the session once consecutive failures reach the threshold. A successful ping resets the counter. Tolerated failures are logged at WARN level; the final failure that closes the session is logged at ERROR level. This is a rework of #979.
1 parent dfb45f1 commit 8805aa8

4 files changed

Lines changed: 131 additions & 14 deletions

File tree

mcp/client.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,13 @@ type ClientOptions struct {
166166
// If the peer fails to respond to pings originating from the keepalive check,
167167
// the session is automatically closed.
168168
KeepAlive time.Duration
169+
// KeepAliveFailureThreshold is the number of consecutive keepalive ping
170+
// failures after which the session is closed. A value of 0 or 1
171+
// closes the session on the first failure (the default). Higher values
172+
// align with the spec's "multiple failed pings MAY trigger a connection
173+
// reset" guidance, letting a transient miss pass without tearing down an
174+
// otherwise live session. Has no effect unless KeepAlive is non-zero.
175+
KeepAliveFailureThreshold int
169176
}
170177

171178
// toolContextKeyType is the context key type for passing tool definitions
@@ -441,7 +448,7 @@ func (cs *ClientSession) registerElicitationWaiter(elicitationID string) (await
441448

442449
// startKeepalive starts the keepalive mechanism for this client session.
443450
func (cs *ClientSession) startKeepalive(interval time.Duration) {
444-
startKeepalive(cs, interval, &cs.keepaliveCancel, cs.client.opts.Logger)
451+
startKeepalive(cs, interval, cs.client.opts.KeepAliveFailureThreshold, &cs.keepaliveCancel, cs.client.opts.Logger)
445452
}
446453

447454
// AddRoots adds the given roots to the client,

mcp/mcp_test.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1920,6 +1920,82 @@ func TestKeepAliveFailure_Logged(t *testing.T) {
19201920
})
19211921
}
19221922

1923+
// scriptedKeepaliveSession is a keepaliveSession test double whose Ping
1924+
// returns errors from a script (one entry consumed per call; the last entry
1925+
// repeats once exhausted), and records how many times Close was called. Ping
1926+
// returns immediately so the keepalive loop's pace is driven purely by the
1927+
// ticker, making the test deterministic under synctest.
1928+
type scriptedKeepaliveSession struct {
1929+
pingErrs []error
1930+
pingCalls atomic.Int64
1931+
closeCalls atomic.Int64
1932+
}
1933+
1934+
func (s *scriptedKeepaliveSession) Ping(context.Context, *PingParams) error {
1935+
n := int(s.pingCalls.Add(1)) - 1
1936+
if n >= len(s.pingErrs) {
1937+
n = len(s.pingErrs) - 1
1938+
}
1939+
return s.pingErrs[n]
1940+
}
1941+
1942+
func (s *scriptedKeepaliveSession) Close() error {
1943+
s.closeCalls.Add(1)
1944+
return nil
1945+
}
1946+
1947+
// TestStartKeepalive_FailureThreshold verifies that the session is kept alive
1948+
// across consecutive ping failures below the threshold and only closed once the
1949+
// threshold is reached.
1950+
func TestStartKeepalive_FailureThreshold(t *testing.T) {
1951+
synctest.Test(t, func(t *testing.T) {
1952+
const interval = 100 * time.Millisecond
1953+
sess := &scriptedKeepaliveSession{pingErrs: []error{errors.New("boom")}}
1954+
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
1955+
var cancel context.CancelFunc
1956+
startKeepalive(sess, interval, 3, &cancel, logger)
1957+
defer cancel()
1958+
1959+
// After two ticks → two failures, still below threshold 3: not closed.
1960+
time.Sleep(2*interval + interval/2)
1961+
synctest.Wait()
1962+
if got := sess.closeCalls.Load(); got != 0 {
1963+
t.Fatalf("session closed below threshold: closeCalls=%d (pingCalls=%d)", got, sess.pingCalls.Load())
1964+
}
1965+
1966+
// Third tick → third failure reaches threshold: session closed.
1967+
time.Sleep(interval)
1968+
synctest.Wait()
1969+
if got := sess.closeCalls.Load(); got != 1 {
1970+
t.Fatalf("expected one Close at threshold, got closeCalls=%d (pingCalls=%d)", got, sess.pingCalls.Load())
1971+
}
1972+
})
1973+
}
1974+
1975+
// TestStartKeepalive_SuccessResetsFailures verifies that a successful ping
1976+
// resets the consecutive-failure counter, so an isolated failure between
1977+
// successes never accumulates toward the threshold.
1978+
func TestStartKeepalive_SuccessResetsFailures(t *testing.T) {
1979+
synctest.Test(t, func(t *testing.T) {
1980+
const interval = 100 * time.Millisecond
1981+
// fail, success, fail, fail, then success (the tail repeats): the run
1982+
// never has 3 consecutive failures, so the session is never closed.
1983+
sess := &scriptedKeepaliveSession{pingErrs: []error{
1984+
errors.New("boom"), nil, errors.New("boom"), errors.New("boom"), nil,
1985+
}}
1986+
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
1987+
var cancel context.CancelFunc
1988+
startKeepalive(sess, interval, 3, &cancel, logger)
1989+
defer cancel()
1990+
1991+
time.Sleep(6 * interval)
1992+
synctest.Wait()
1993+
if got := sess.closeCalls.Load(); got != 0 {
1994+
t.Fatalf("session closed despite a success resetting the counter: closeCalls=%d (pingCalls=%d)", got, sess.pingCalls.Load())
1995+
}
1996+
})
1997+
}
1998+
19231999
func TestAddTool_DuplicateNoPanicAndNoDuplicate(t *testing.T) {
19242000
// Adding the same tool pointer twice should not panic and should not
19252001
// produce duplicates in the server's tool list.

mcp/server.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,13 @@ type ServerOptions struct {
7979
// If the peer fails to respond to pings originating from the keepalive check,
8080
// the session is automatically closed.
8181
KeepAlive time.Duration
82+
// KeepAliveFailureThreshold is the number of consecutive keepalive ping
83+
// failures after which the session is closed. A value of 0 or 1
84+
// closes the session on the first failure (the default). Higher values
85+
// align with the spec's "multiple failed pings MAY trigger a connection
86+
// reset" guidance, letting a transient miss pass without tearing down an
87+
// otherwise live session. Has no effect unless KeepAlive is non-zero.
88+
KeepAliveFailureThreshold int
8289
// Function called when a client session subscribes to a resource.
8390
SubscribeHandler func(context.Context, *SubscribeRequest) error
8491
// Function called when a client session unsubscribes from a resource.
@@ -1605,7 +1612,7 @@ func (ss *ServerSession) Wait() error {
16051612

16061613
// startKeepalive starts the keepalive mechanism for this server session.
16071614
func (ss *ServerSession) startKeepalive(interval time.Duration) {
1608-
startKeepalive(ss, interval, &ss.keepaliveCancel, ss.server.opts.Logger)
1615+
startKeepalive(ss, interval, ss.server.opts.KeepAliveFailureThreshold, &ss.keepaliveCancel, ss.server.opts.Logger)
16091616
}
16101617

16111618
// pageToken is the internal structure for the opaque pagination cursor.

mcp/shared.go

Lines changed: 39 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -751,9 +751,20 @@ type keepaliveSession interface {
751751
// It assigns the cancel function to the provided cancelPtr and starts a goroutine
752752
// that sends ping messages at the specified interval.
753753
//
754-
// logger must be non-nil; ping failures (which terminate the keepalive loop and
755-
// close the session) are reported via logger so they are not silently dropped.
756-
func startKeepalive(session keepaliveSession, interval time.Duration, cancelPtr *context.CancelFunc, logger *slog.Logger) {
754+
// failureThreshold is the number of consecutive ping failures after which
755+
// the session is closed; a value below 1 is treated as 1 (close on the first
756+
// failure). A successful ping resets the counter. This mirrors the spec's
757+
// "multiple failed pings MAY trigger a connection reset" language, letting a
758+
// transient miss pass without tearing down an otherwise live session.
759+
//
760+
// logger must be non-nil; ping failures (both the tolerated ones and the final
761+
// one that closes the session) are reported via logger so they are not silently
762+
// dropped.
763+
func startKeepalive(session keepaliveSession, interval time.Duration, failureThreshold int, cancelPtr *context.CancelFunc, logger *slog.Logger) {
764+
if failureThreshold < 1 {
765+
failureThreshold = 1
766+
}
767+
757768
ctx, cancel := context.WithCancel(context.Background())
758769
// Assign cancel function before starting goroutine to avoid race condition.
759770
// We cannot return it because the caller may need to cancel during the
@@ -764,6 +775,7 @@ func startKeepalive(session keepaliveSession, interval time.Duration, cancelPtr
764775
ticker := time.NewTicker(interval)
765776
defer ticker.Stop()
766777

778+
consecutiveFailures := 0
767779
for {
768780
select {
769781
case <-ctx.Done():
@@ -772,17 +784,32 @@ func startKeepalive(session keepaliveSession, interval time.Duration, cancelPtr
772784
pingCtx, pingCancel := context.WithTimeout(context.Background(), interval/2)
773785
err := session.Ping(pingCtx, nil)
774786
pingCancel()
775-
if err != nil {
776-
if errors.Is(err, jsonrpc2.ErrMethodNotFound) {
777-
// Peer doesn't support ping, stop the keepalive process.
778-
return
779-
}
780-
// Ping failed; log it before closing the session so the
781-
// failure is observable to operators. See #218.
782-
logger.Error("keepalive ping failed; closing session", "error", err)
783-
_ = session.Close()
787+
if err == nil {
788+
consecutiveFailures = 0
789+
continue
790+
}
791+
if errors.Is(err, jsonrpc2.ErrMethodNotFound) {
792+
// Peer doesn't support ping, stop the keepalive process.
784793
return
785794
}
795+
consecutiveFailures++
796+
if consecutiveFailures < failureThreshold {
797+
// Tolerate transient failures below the threshold; log so
798+
// the misses are still observable to operators. See #218.
799+
logger.Warn("keepalive ping failed; tolerating below threshold",
800+
"error", err,
801+
"consecutiveFailures", consecutiveFailures,
802+
"failureThreshold", failureThreshold)
803+
continue
804+
}
805+
// Threshold reached; log before closing the session so the
806+
// failure is observable to operators. See #218.
807+
logger.Error("keepalive ping failed; closing session",
808+
"error", err,
809+
"consecutiveFailures", consecutiveFailures,
810+
"failureThreshold", failureThreshold)
811+
_ = session.Close()
812+
return
786813
}
787814
}
788815
}()

0 commit comments

Comments
 (0)