Skip to content

Commit 8f594f7

Browse files
committed
feat: support for kubernetes events stream for agent
Signed-off-by: vsoch <vsoch@users.noreply.github.com>
1 parent e18a98b commit 8f594f7

5 files changed

Lines changed: 118 additions & 4 deletions

File tree

hpc_mcp/deploy/kubernetes/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from .events import KubernetesEvents
12
from .tool import (
23
kubectl_api_resources,
34
kubectl_api_versions,
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
import asyncio
2+
import uuid
3+
from typing import Any, Awaitable, Callable, Dict
4+
5+
from kubernetes_asyncio import client, config, watch
6+
7+
8+
class KubernetesEvents:
9+
"""
10+
Handles Kubernetes resource watching for the MCP server.
11+
This class is dynamically loaded by mcpserver if defined in config.
12+
"""
13+
14+
def __init__(self):
15+
self._active_watches: Dict[str, asyncio.Task] = {}
16+
try:
17+
config.load_incluster_config()
18+
except config.ConfigException:
19+
asyncio.run(config.load_kube_config())
20+
21+
def get_metadata(self) -> Dict[str, Any]:
22+
"""
23+
Returns static discovery info for the Agent.
24+
The Agent uses this to know what parameters to send.
25+
"""
26+
return {
27+
"name": "kubernetes_events",
28+
"description": "Subscribe to events for Kubernetes resources (Pods, Deployments, etc.)",
29+
"parameters": {
30+
"resource_type": "string (e.g., pods, deployments, kustomizations)",
31+
"namespace": "string",
32+
"label_selector": "string (optional)",
33+
"field_selector": "string (optional)",
34+
},
35+
}
36+
37+
async def subscribe(
38+
self, params: Dict[str, Any], callback: Callable[[str, Dict[str, Any]], Awaitable[None]]
39+
) -> str:
40+
"""
41+
Starts a Kubernetes watch task in the background.
42+
"""
43+
sub_id = f"k8s_{uuid.uuid4().hex[:8]}"
44+
45+
# Spin up the background listener
46+
task = asyncio.create_task(self._watch_loop(sub_id, params, callback))
47+
self._active_watches[sub_id] = task
48+
49+
return sub_id
50+
51+
async def unsubscribe(self, sub_id: str) -> bool:
52+
"""
53+
Stops a specific watch task.
54+
"""
55+
task = self._active_watches.pop(sub_id, None)
56+
if task:
57+
task.cancel()
58+
try:
59+
await task
60+
except asyncio.CancelledError:
61+
pass
62+
return True
63+
return False
64+
65+
async def _watch_loop(self, sub_id: str, params: Dict[str, Any], callback: Callable):
66+
"""
67+
The internal async loop that communicates with the K8s API.
68+
"""
69+
resource_type = params.get("resource_type", "pods")
70+
namespace = params.get("namespace", "default")
71+
72+
# Dynamic API selection based on resource type
73+
v1 = client.CoreV1Api()
74+
w = watch.Watch()
75+
76+
try:
77+
# Note: This is an example for Pods.
78+
# You would extend this to support Deployments/Flux CRDs.
79+
method = getattr(v1, f"list_namespaced_{resource_type}")
80+
81+
async with w.stream(
82+
method,
83+
namespace=namespace,
84+
label_selector=params.get("label_selector", ""),
85+
field_selector=params.get("field_selector", ""),
86+
) as stream:
87+
async for event in stream:
88+
# Clean up the object for the LLM to save tokens
89+
obj = event["raw_object"]
90+
event_data = {
91+
"type": event["type"],
92+
"name": obj["metadata"]["name"],
93+
"status": obj.get("status", {}),
94+
"resource": resource_type,
95+
}
96+
97+
# Push notification to MCP client via the provided callback
98+
await callback(sub_id, event_data)
99+
100+
except asyncio.CancelledError:
101+
# Clean exit on unsubscribe
102+
pass
103+
except Exception as e:
104+
# Notify the agent that the watch failed
105+
await callback(sub_id, {"error": str(e), "status": "failed"})
106+
finally:
107+
w.stop()

hpc_mcp/deploy/kubernetes/tool.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ def kubectl_get(
102102
verbose: Annotated[
103103
bool, "If True, returns the full raw JSON. If False, returns a summarized version."
104104
] = False,
105-
) -> KubeResult:
105+
):
106106
"""
107107
Retrieves information about Kubernetes resources.
108108
@@ -153,7 +153,7 @@ def kubectl_describe(
153153
verbose: Annotated[
154154
bool, "If False, returns only the first 50 lines (usually enough for errors)."
155155
] = False,
156-
) -> KubeResult:
156+
):
157157
"""
158158
Provides a human-readable, detailed description of a resource.
159159
@@ -196,7 +196,7 @@ def kubectl_logs(
196196
verbose: Annotated[
197197
bool, "If True, ignores the 'tail' limit and returns more context (max 1000 lines)."
198198
] = False,
199-
) -> KubeResult:
199+
):
200200
"""
201201
Fetches the logs for a specific pod or container.
202202
@@ -232,7 +232,7 @@ def kubectl_get_events(
232232
verbose: Annotated[
233233
bool, "If False, returns only the 10 most recent events summarized."
234234
] = False,
235-
) -> KubeResult:
235+
):
236236
"""
237237
Retrieves cluster events to diagnose scheduling or lifecycle issues.
238238

hpc_mcp/version.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@
2222
)
2323

2424
DATABASE_REQUIRES = (("sqlalchemy", {"min_version": None}),)
25+
KUBERNETES_REQUIRES = (
26+
("kubernetes-asyncio", {"min_version": None}),
27+
("kubernetes", {"min_version": None}),
28+
)
2529

2630
TESTS_REQUIRES = (
2731
("pytest", {"min_version": "4.6.2"}),

setup.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ def get_reqs(lookup=None, key="INSTALL_REQUIRES"):
6363
TESTS_REQUIRES = get_reqs(lookup, "TESTS_REQUIRES")
6464
INSTALL_REQUIRES_ALL = get_reqs(lookup, "INSTALL_REQUIRES_ALL")
6565
INSTALL_REQUIRES_DATABASE = get_reqs(lookup, "DATABASE_REQUIRES")
66+
KUBERNETES_REQUIRES = get_reqs(lookup, "KUBERNETES_REQUIRES")
6667

6768
setup(
6869
name=NAME,
@@ -85,6 +86,7 @@ def get_reqs(lookup=None, key="INSTALL_REQUIRES"):
8586
extras_require={
8687
"all": [INSTALL_REQUIRES_ALL],
8788
"database": [INSTALL_REQUIRES_DATABASE],
89+
"kubernetes": [KUBERNETES_REQUIRES],
8890
},
8991
classifiers=[
9092
"Intended Audience :: Science/Research",

0 commit comments

Comments
 (0)