Skip to content

Commit ff2c838

Browse files
author
Bogdan-Marius-Catanus
committed
feat: add leader instance identification with operational metadata
- Add instance metadata (port, PID, hostname) to Redis leader election - Expose is_leader boolean field in /health endpoint for cluster monitoring - Store JSON metadata in Redis instead of plain UUID for operational visibility - Add startup logging of instance metadata - Make /health endpoint async to support Redis leader status checks - Update tests to async for health check endpoints - Maintain backward compatibility with legacy UUID-only format Closes #3838 Signed-off-by: Bogdan-Marius-Catanus <bogdan-marius.catanus@ibm.com>
1 parent 23cecbf commit ff2c838

4 files changed

Lines changed: 130 additions & 24 deletions

File tree

mcpgateway/main.py

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10317,28 +10317,40 @@ async def reset_metrics(entity: Optional[str] = None, entity_id: Optional[int] =
1031710317
# Healthcheck #
1031810318
####################
1031910319
@app.get("/health")
10320-
def healthcheck(response: Response = None):
10320+
async def healthcheck(response: Response = None):
1032110321
"""
1032210322
Perform a basic health check to verify database connectivity.
10323+
Now includes leader status for cluster monitoring.
1032310324
10324-
Sync function so FastAPI runs it in a threadpool, avoiding event loop blocking.
10325+
Async function to allow checking Redis leader status.
1032510326
Uses a dedicated session to avoid cross-thread issues and double-commit
1032610327
from get_db dependency. All DB operations happen in the same thread.
1032710328
1032810329
Args:
1032910330
response: Optional response object used to attach runtime-mode headers.
1033010331
1033110332
Returns:
10332-
A dictionary with the health status and optional error message.
10333+
A dictionary with the health status, leader status, and optional error message.
1033310334
"""
1033410335
db = SessionLocal()
1033510336
try:
1033610337
db.execute(text("SELECT 1"))
1033710338
# Explicitly commit to release PgBouncer backend connection in transaction mode.
1033810339
db.commit()
10340+
10341+
# Check leader status using async method
10342+
is_leader = False
10343+
if hasattr(gateway_service, "is_leader"):
10344+
try:
10345+
is_leader = await gateway_service.is_leader()
10346+
except Exception:
10347+
# Fallback to False if leader check fails (e.g., in test environments)
10348+
is_leader = False
10349+
1033910350
if response is not None:
1034010351
_apply_runtime_mode_headers(response)
10341-
return {"status": "healthy", "mcp_runtime": _mcp_runtime_status_payload()}
10352+
10353+
return {"status": "healthy", "is_leader": is_leader, "mcp_runtime": _mcp_runtime_status_payload()}
1034210354
except Exception as e:
1034310355
# Rollback, then invalidate if rollback fails (mirrors get_db cleanup).
1034410356
try:
@@ -10352,7 +10364,8 @@ def healthcheck(response: Response = None):
1035210364
logger.error(error_message)
1035310365
if response is not None:
1035410366
_apply_runtime_mode_headers(response)
10355-
return {"status": "unhealthy", "error": error_message, "mcp_runtime": _mcp_runtime_status_payload()}
10367+
10368+
return {"status": "unhealthy", "is_leader": False, "error": error_message, "mcp_runtime": _mcp_runtime_status_payload()}
1035610369
finally:
1035710370
db.close()
1035810371

mcpgateway/services/gateway_service.py

Lines changed: 96 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,11 @@
4545
import asyncio
4646
import binascii
4747
from datetime import datetime, timezone
48+
import json
4849
import logging
4950
import mimetypes
5051
import os
52+
import socket
5153
import ssl
5254
import tempfile
5355
import time
@@ -471,15 +473,17 @@ def __init__(self) -> None:
471473

472474
# Leader election settings from config
473475
if self.redis_url and REDIS_AVAILABLE:
474-
self._instance_id = str(uuid.uuid4()) # Unique ID for this process
476+
# Metadata for this instance
477+
self._instance_metadata = {"instance_id": str(uuid.uuid4()), "port": settings.port, "pid": os.getpid(), "hostname": socket.gethostname()}
478+
self._instance_id = self._instance_metadata["instance_id"]
475479
self._leader_key = settings.redis_leader_key
476480
self._leader_ttl = settings.redis_leader_ttl
477481
self._leader_heartbeat_interval = settings.redis_leader_heartbeat_interval
478482
self._leader_heartbeat_task: Optional[asyncio.Task] = None
479483
self._follower_election_task: Optional[asyncio.Task] = None
480484

481-
# Log instance mapping for debugging
482-
logger.info(f"Instance started: instance_id={self._instance_id}, port={settings.port}, pid={os.getpid()}")
485+
# Log instance mapping
486+
logger.info(f"Instance started: {json.dumps(self._instance_metadata)}")
483487

484488
# Always initialize file lock as fallback (used if Redis connection fails at runtime)
485489
if settings.cache_type != "none":
@@ -587,7 +591,8 @@ async def initialize(self) -> None:
587591
except Exception as e:
588592
raise ConnectionError(f"Redis ping failed: {e}") from e
589593

590-
is_leader = await self._redis_client.set(self._leader_key, self._instance_id, ex=self._leader_ttl, nx=True)
594+
# Store all instance metadate in redis
595+
is_leader = await self._redis_client.set(self._leader_key, json.dumps(self._instance_metadata), ex=self._leader_ttl, nx=True)
591596
if is_leader:
592597
logger.info("Acquired Redis leadership. Starting health check and heartbeat tasks.")
593598
self._health_check_task = asyncio.create_task(self._run_health_checks(user_email))
@@ -653,7 +658,7 @@ async def shutdown(self) -> None:
653658
return 0
654659
end
655660
"""
656-
result = await self._redis_client.eval(release_script, 1, self._leader_key, self._instance_id)
661+
result = await self._redis_client.eval(release_script, 1, self._leader_key, json.dumps(self._instance_metadata))
657662
if result:
658663
logger.info("Released Redis leadership on shutdown")
659664
except Exception as e:
@@ -3941,8 +3946,18 @@ async def _run_leader_heartbeat(self) -> None:
39413946
continue
39423947

39433948
# Check if we're still the leader
3944-
current_leader = await self._redis_client.get(self._leader_key)
3945-
if current_leader != self._instance_id:
3949+
current_leader_raw = await self._redis_client.get(self._leader_key)
3950+
if current_leader_raw:
3951+
try:
3952+
current_leader_data = json.loads(current_leader_raw)
3953+
current_leader_id = current_leader_data.get("instance_id")
3954+
except (json.JSONDecodeError, AttributeError):
3955+
# Fallback for old UUID-only format
3956+
current_leader_id = current_leader_raw
3957+
else:
3958+
current_leader_id = None
3959+
3960+
if current_leader_id != self._instance_id:
39463961
logger.info("Lost Redis leadership, stopping heartbeat")
39473962
self._start_follower_election()
39483963
return
@@ -3985,7 +4000,7 @@ async def _run_follower_election(self, user_email: str) -> None:
39854000
continue
39864001

39874002
# Attempt to acquire leadership
3988-
is_leader = await self._redis_client.set(self._leader_key, self._instance_id, ex=self._leader_ttl, nx=True)
4003+
is_leader = await self._redis_client.set(self._leader_key, json.dumps(self._instance_metadata), ex=self._leader_ttl, nx=True)
39894004

39904005
if is_leader:
39914006
logger.info("Acquired Redis leadership via follower election. Starting health check and heartbeat.")
@@ -4034,8 +4049,18 @@ async def _run_health_checks(self, user_email: str) -> None:
40344049
if self._redis_client and settings.cache_type == "redis":
40354050
# Redis-based leader check (async, decode_responses=True returns strings)
40364051
# Note: Leader key TTL refresh is handled by _run_leader_heartbeat task
4037-
current_leader = await self._redis_client.get(self._leader_key)
4038-
if current_leader != self._instance_id:
4052+
current_leader_raw = await self._redis_client.get(self._leader_key)
4053+
if current_leader_raw:
4054+
try:
4055+
current_leader_data = json.loads(current_leader_raw)
4056+
current_leader_id = current_leader_data.get("instance_id")
4057+
except (json.JSONDecodeError, AttributeError):
4058+
# Fallback for old UUID-only format
4059+
current_leader_id = current_leader_raw
4060+
else:
4061+
current_leader_id = None
4062+
4063+
if current_leader_id != self._instance_id:
40394064
return
40404065

40414066
# Run health checks
@@ -4087,6 +4112,67 @@ async def _run_health_checks(self, user_email: str) -> None:
40874112
logger.error(f"Unexpected error in health check loop: {str(e)}")
40884113
await asyncio.sleep(self._health_check_interval)
40894114

4115+
def is_leader_sync(self) -> bool:
4116+
"""Check if this instance is the current leader (synchronous version for health checks).
4117+
4118+
Returns:
4119+
bool: True if this instance holds the leader lock, False otherwise.
4120+
"""
4121+
if not self._redis_client or not hasattr(self, "_leader_key"):
4122+
# Fallback to file lock for non-Redis setups
4123+
return True
4124+
4125+
try:
4126+
# Use sync Redis client method if available
4127+
import redis
4128+
if isinstance(self._redis_client, redis.asyncio.Redis):
4129+
# For async Redis client, we can't do sync call - return True as fallback
4130+
return True
4131+
4132+
# For sync Redis client (shouldn't happen in current setup, but safe fallback)
4133+
current_leader_raw = self._redis_client.get(self._leader_key)
4134+
if not current_leader_raw:
4135+
return False
4136+
4137+
try:
4138+
current_leader_data = json.loads(current_leader_raw)
4139+
current_leader_id = current_leader_data.get("instance_id")
4140+
except (json.JSONDecodeError, AttributeError):
4141+
# Fallback for old UUID-only format
4142+
current_leader_id = current_leader_raw
4143+
4144+
return current_leader_id == self._instance_id
4145+
except Exception as e:
4146+
logger.warning(f"Error checking leader status: {e}")
4147+
return True # Fail open for health checks
4148+
4149+
async def is_leader(self) -> bool:
4150+
"""Check if this instance is the current leader (async version).
4151+
4152+
Returns:
4153+
bool: True if this instance holds the leader lock, False otherwise.
4154+
"""
4155+
if not self._redis_client or not hasattr(self, "_leader_key"):
4156+
# Fallback to file lock for non-Redis setups
4157+
return True
4158+
4159+
try:
4160+
current_leader_raw = await self._redis_client.get(self._leader_key)
4161+
if not current_leader_raw:
4162+
return False
4163+
4164+
try:
4165+
current_leader_data = json.loads(current_leader_raw)
4166+
current_leader_id = current_leader_data.get("instance_id")
4167+
except (json.JSONDecodeError, AttributeError):
4168+
# Fallback for old UUID-only format
4169+
current_leader_id = current_leader_raw
4170+
4171+
return current_leader_id == self._instance_id
4172+
except Exception as e:
4173+
logger.warning(f"Error checking leader status: {e}")
4174+
return False
4175+
40904176
def _get_auth_headers(self) -> Dict[str, str]:
40914177
"""Get default headers for gateway requests (no authentication).
40924178

tests/unit/mcpgateway/test_main.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -457,15 +457,19 @@ def test_health_check(self, test_client):
457457
"""Test the basic health check endpoint."""
458458
response = test_client.get("/health")
459459
assert response.status_code == 200
460-
assert response.json()["status"] == "healthy"
460+
data = response.json()
461+
assert data["status"] == "healthy"
462+
assert "is_leader" in data
463+
assert isinstance(data["is_leader"], bool)
461464

462465
def test_ready_check(self, test_client):
463466
"""Test the readiness check endpoint."""
464467
response = test_client.get("/ready")
465468
assert response.status_code == 200
466469
assert response.json()["status"] == "ready"
467470

468-
def test_health_check_db_error(self):
471+
@pytest.mark.asyncio
472+
async def test_health_check_db_error(self):
469473
"""Test health check error path with rollback failure."""
470474
# First-Party
471475
from mcpgateway import main as mcpgateway_main
@@ -491,7 +495,7 @@ def close(self):
491495

492496
session = DummySession()
493497
with patch("mcpgateway.main.SessionLocal", return_value=session):
494-
response = mcpgateway_main.healthcheck()
498+
response = await mcpgateway_main.healthcheck()
495499
assert response["status"] == "unhealthy"
496500
assert session.invalidate_called is True
497501

tests/unit/mcpgateway/test_main_extended.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9657,7 +9657,8 @@ def close(self): # noqa: D401 - test helper
96579657
assert sess.invalidated is True
96589658
assert sess.closed is True
96599659

9660-
def test_healthcheck_invalidate_failure_is_best_effort(self, monkeypatch):
9660+
@pytest.mark.asyncio
9661+
async def test_healthcheck_invalidate_failure_is_best_effort(self, monkeypatch):
96619662
# First-Party
96629663
import mcpgateway.main as main_mod
96639664

@@ -9683,12 +9684,13 @@ def close(self):
96839684
sess = FakeSession()
96849685
monkeypatch.setattr(main_mod, "SessionLocal", lambda: sess)
96859686

9686-
result = main_mod.healthcheck()
9687+
result = await main_mod.healthcheck()
96879688
assert result["status"] == "unhealthy"
96889689
assert "db down" in result["error"]
96899690
assert sess.closed is True
96909691

9691-
def test_healthcheck_reports_runtime_mode_and_headers(self, monkeypatch):
9692+
@pytest.mark.asyncio
9693+
async def test_healthcheck_reports_runtime_mode_and_headers(self, monkeypatch):
96929694
# First-Party
96939695
import mcpgateway.main as main_mod
96949696

@@ -9708,7 +9710,7 @@ def close(self):
97089710
monkeypatch.setattr(main_mod.settings, "experimental_rust_mcp_runtime_enabled", False)
97099711

97109712
response = FastAPIResponse()
9711-
result = main_mod.healthcheck(response)
9713+
result = await main_mod.healthcheck(response)
97129714

97139715
assert result["status"] == "healthy"
97149716
assert result["mcp_runtime"]["mode"] == "python-rust-built-disabled"
@@ -9863,7 +9865,8 @@ def test_runtime_status_payload_reports_python_mount_without_session_auth_reuse(
98639865
assert payload["rust_affinity_core_enabled"] is False
98649866
assert payload["session_auth_reuse_mode"] == "python"
98659867

9866-
def test_healthcheck_unhealthy_applies_runtime_headers(self, monkeypatch):
9868+
@pytest.mark.asyncio
9869+
async def test_healthcheck_unhealthy_applies_runtime_headers(self, monkeypatch):
98679870
# First-Party
98689871
import mcpgateway.main as main_mod
98699872

@@ -9881,7 +9884,7 @@ def close(self):
98819884
monkeypatch.setattr(main_mod.settings, "experimental_rust_mcp_runtime_enabled", True)
98829885

98839886
response = FastAPIResponse()
9884-
result = main_mod.healthcheck(response)
9887+
result = await main_mod.healthcheck(response)
98859888

98869889
assert result["status"] == "unhealthy"
98879890
assert response.headers["x-contextforge-mcp-runtime-mode"] == "rust-managed"

0 commit comments

Comments
 (0)