Skip to content

Commit 3a6c0c1

Browse files
yroblataskbot
andauthored
Update Redis metadata when backend sessions expire (RC-16) (#4404)
* Update Redis metadata when backend sessions expire (RC-16) Add RemoveBackendFromMetadata to the MultiSession interface and implement it on defaultMultiSession to clear the per-backend session ID key and rebuild MetadataKeyBackendIDs when a backend disconnects or expires. Add NotifyBackendExpired to sessionmanager.Manager to call RemoveBackendFromMetadata and flush the updated metadata to storage (Redis when configured), keeping Redis consistent with the actual set of connected backends. Closes #4213 * Evict local cache on NotifyBackendExpired; fix DataStorage tests NotifyBackendExpired now evicts the session from the node-local multiSessions cache after a successful storage update, and forgets the singleflight key to prevent a concurrent restore from re-populating the cache with stale data. This ensures same-pod requests trigger RestoreSession with the updated metadata (without the expired backend), not just cross-pod ones. On storage error the cache is intentionally not evicted to avoid serving requests against metadata that still describes the expired backend. Also update session_data_storage_test.go to match the API renamed in the "fixes from review" commit: Upsert→Store, Create→Exists. Add contract tests for Exists (false when absent, true when present, does not refresh TTL for either local and Redis implementations). Remove the error-return assertion from NewLocalSessionDataStorage. * fixes from review --------- Co-authored-by: taskbot <taskbot@users.noreply.github.com>
1 parent d59acf5 commit 3a6c0c1

7 files changed

Lines changed: 566 additions & 4 deletions

File tree

pkg/vmcp/server/session_manager_interface.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,4 +48,11 @@ type SessionManager interface {
4848

4949
// Terminate terminates the session with the given ID, closing all backend connections.
5050
Terminate(sessionID string) (bool, error)
51+
52+
// NotifyBackendExpired updates session metadata in storage to reflect that the
53+
// backend identified by workloadID is no longer connected. It is a best-effort,
54+
// metadata-only operation intended to be called by keepalive or health-monitoring
55+
// components when they detect that a backend session has expired or been lost.
56+
// Storage errors are logged but not returned.
57+
NotifyBackendExpired(sessionID, workloadID string)
5158
}

pkg/vmcp/server/sessionmanager/session_manager.go

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"errors"
1919
"fmt"
2020
"log/slog"
21+
"strings"
2122
"time"
2223

2324
"github.com/google/uuid"
@@ -196,6 +197,13 @@ const terminateTimeout = 5 * time.Second
196197
// performs a single Redis SET. 5 s is consistent with terminateTimeout.
197198
const decorateTimeout = 5 * time.Second
198199

200+
// notifyBackendExpiredTimeout bounds each individual storage operation inside
201+
// NotifyBackendExpired() — one Load and one Upsert, each capped independently.
202+
// Each is a single-key Redis operation, so 5 s per call is consistent with
203+
// terminateTimeout and decorateTimeout. Worst-case wall-clock for the function
204+
// is 2 × 5 s = 10 s.
205+
const notifyBackendExpiredTimeout = 5 * time.Second
206+
199207
// Generate implements the SDK's SessionIdManager.Generate().
200208
//
201209
// Phase 1 of the two-phase creation pattern: creates a unique session ID,
@@ -513,6 +521,116 @@ func (sm *Manager) Terminate(sessionID string) (isNotAllowed bool, err error) {
513521
return false, nil
514522
}
515523

524+
// NotifyBackendExpired updates session metadata in storage to reflect that the
525+
// backend identified by workloadID is no longer connected. It removes the
526+
// per-backend session ID key and rebuilds MetadataKeyBackendIDs so that a
527+
// cross-pod RestoreSession call does not attempt to reconnect to the expired
528+
// backend session.
529+
//
530+
// After a successful storage update the session is evicted from the node-local
531+
// cache; the next GetMultiSession call triggers RestoreSession with the updated
532+
// metadata, discarding the stale in-memory copy.
533+
//
534+
// This is a best-effort operation. If the session is absent from storage (not
535+
// found or terminated) the call is a silent no-op. Storage errors are logged
536+
// but not returned; on error the cache is not evicted.
537+
func (sm *Manager) NotifyBackendExpired(sessionID, workloadID string) {
538+
loadCtx, loadCancel := context.WithTimeout(context.Background(), notifyBackendExpiredTimeout)
539+
defer loadCancel()
540+
metadata, err := sm.storage.Load(loadCtx, sessionID)
541+
if err != nil {
542+
if !errors.Is(err, transportsession.ErrSessionNotFound) {
543+
slog.Warn("NotifyBackendExpired: failed to load session from storage",
544+
"session_id", sessionID,
545+
"workload_id", workloadID,
546+
"error", err)
547+
}
548+
return
549+
}
550+
if metadata[MetadataKeyTerminated] == MetadataValTrue {
551+
return
552+
}
553+
554+
// MetadataKeyBackendIDs must be present. An absent key means the metadata
555+
// is corrupted or was never fully initialised; clobbering it with "" would
556+
// silently drop all remaining backends from subsequent restores.
557+
backendIDs, backendIDsPresent := metadata[vmcpsession.MetadataKeyBackendIDs]
558+
if !backendIDsPresent {
559+
slog.Warn("NotifyBackendExpired: MetadataKeyBackendIDs absent from session metadata; skipping update",
560+
"session_id", sessionID,
561+
"workload_id", workloadID)
562+
return
563+
}
564+
565+
// Build updated metadata: remove the expired backend's session-ID key and
566+
// rebuild MetadataKeyBackendIDs. Always write the key (even as "") to match
567+
// populateBackendMetadata, which uses key presence to distinguish an
568+
// explicit zero-backend state from absent/corrupted metadata in
569+
// RestoreSession. Trim spaces and drop empty parts for robustness.
570+
delete(metadata, vmcpsession.MetadataKeyBackendSessionPrefix+workloadID)
571+
var remaining []string
572+
for _, p := range strings.Split(backendIDs, ",") {
573+
if t := strings.TrimSpace(p); t != "" && t != workloadID {
574+
remaining = append(remaining, t)
575+
}
576+
}
577+
metadata[vmcpsession.MetadataKeyBackendIDs] = strings.Join(remaining, ",")
578+
579+
if err := sm.updateMetadata(sessionID, metadata); err != nil {
580+
slog.Warn("NotifyBackendExpired: failed to persist backend expiry to storage",
581+
"session_id", sessionID,
582+
"workload_id", workloadID,
583+
"error", err)
584+
}
585+
}
586+
587+
// updateMetadata writes a complete metadata snapshot to storage and evicts the
588+
// session from the node-local cache so the next GetMultiSession call triggers a
589+
// fresh RestoreSession with the updated state.
590+
//
591+
// Cross-pod TOCTOU: a re-check Load is performed immediately before the Upsert
592+
// to detect cross-pod session termination (where another pod calls
593+
// storage.Delete). If the key is absent at re-check time we bail without
594+
// upserting. A residual race remains between the re-check and the Upsert (a
595+
// concurrent pod could delete the key in that window), but the window is now
596+
// microseconds rather than the full NotifyBackendExpired span. Closing the race
597+
// entirely would require a conditional write primitive (e.g. Redis SET XX /
598+
// UpsertIfPresent) added to the DataStorage interface.
599+
//
600+
// NOTE: concurrent calls for the same session are last-write-wins. We assume
601+
// parallel metadata writers within a session do not occur; NotifyBackendExpired
602+
// is the only post-creation writer and backend expiry events are serialised by
603+
// the backend registry. This can be retrofitted with CAS semantics or a version
604+
// counter if that assumption changes.
605+
func (sm *Manager) updateMetadata(sessionID string, metadata map[string]string) error {
606+
// Same-pod guard: if Terminate() is already tearing down this session on
607+
// this pod the sentinel is in the cache and storage is already deleted.
608+
if raw, ok := sm.sessions.Peek(sessionID); ok {
609+
if _, isSentinel := raw.(terminatedSentinel); isSentinel {
610+
return nil
611+
}
612+
}
613+
614+
ctx, cancel := context.WithTimeout(context.Background(), notifyBackendExpiredTimeout)
615+
defer cancel()
616+
617+
// Cross-pod guard: re-check that the storage record still exists before
618+
// upserting. If another pod terminated the session (deleting the key) after
619+
// NotifyBackendExpired's initial Load, we must not recreate the record.
620+
if _, err := sm.storage.Load(ctx, sessionID); err != nil {
621+
if errors.Is(err, transportsession.ErrSessionNotFound) {
622+
return nil // session was terminated elsewhere; nothing to update
623+
}
624+
return err
625+
}
626+
627+
if err := sm.storage.Upsert(ctx, sessionID, metadata); err != nil {
628+
return err
629+
}
630+
sm.sessions.Delete(sessionID)
631+
return nil
632+
}
633+
516634
// GetMultiSession retrieves the fully-formed MultiSession for a given SDK session ID.
517635
// Returns (nil, false) if the session does not exist or has not yet been
518636
// upgraded from placeholder to MultiSession.
@@ -539,6 +657,13 @@ func (sm *Manager) GetMultiSession(sessionID string) (vmcpsession.MultiSession,
539657
// It returns ErrExpired when the session has been deleted or terminated
540658
// (including termination by another pod), so the cache evicts the entry and
541659
// onEvict closes backend connections.
660+
//
661+
// Cross-pod propagation: if the stored backend list differs from the cached
662+
// session's, ErrExpired is returned to evict the stale entry. The next
663+
// GetMultiSession call triggers RestoreSession with the up-to-date metadata,
664+
// replacing the old session and its backend connections. This ensures that a
665+
// backend-expiry update written by pod A propagates to pod B on the next
666+
// cache access rather than waiting for natural TTL expiry.
542667
func (sm *Manager) checkSession(sessionID string) error {
543668
checkCtx, cancel := context.WithTimeout(context.Background(), restoreStorageTimeout)
544669
defer cancel()
@@ -552,6 +677,22 @@ func (sm *Manager) checkSession(sessionID string) error {
552677
if metadata[MetadataKeyTerminated] == MetadataValTrue {
553678
return ErrExpired
554679
}
680+
681+
// If the cached session has backend metadata and it differs from storage,
682+
// evict to pick up the update. Only compare when the cached session
683+
// explicitly carries MetadataKeyBackendIDs to avoid spurious evictions for
684+
// sessions whose in-memory representation does not track backend IDs (e.g.
685+
// test mocks that return an empty metadata map).
686+
if raw, ok := sm.sessions.Peek(sessionID); ok {
687+
if sess, ok := raw.(vmcpsession.MultiSession); ok {
688+
if cachedIDs, present := sess.GetMetadata()[vmcpsession.MetadataKeyBackendIDs]; present {
689+
if cachedIDs != metadata[vmcpsession.MetadataKeyBackendIDs] {
690+
return ErrExpired
691+
}
692+
}
693+
}
694+
}
695+
555696
return nil
556697
}
557698

0 commit comments

Comments
 (0)