33from typing import Any , Awaitable , Callable , Dict
44
55from kubernetes_asyncio import client , config , watch
6+ from kubernetes_asyncio .client import ApiClient
67
78
89class KubernetesEvents :
@@ -24,7 +25,7 @@ def get_metadata(self) -> Dict[str, Any]:
2425 The Agent uses this to know what parameters to send.
2526 """
2627 return {
27- "name" : "kubernetes_events " ,
28+ "name" : "KubernetesEvents " ,
2829 "description" : "Subscribe to events for Kubernetes resources (Pods, Deployments, etc.)" ,
2930 "parameters" : {
3031 "resource_type" : "string (e.g., pods, deployments, kustomizations)" ,
@@ -43,9 +44,8 @@ async def subscribe(
4344 sub_id = f"k8s_{ uuid .uuid4 ().hex [:8 ]} "
4445
4546 # Spin up the background listener
46- task = asyncio .create_task (self ._watch_loop (sub_id , params , callback ))
47+ task = asyncio .create_task (self .watch_loop (sub_id , params , callback ))
4748 self ._active_watches [sub_id ] = task
48-
4949 return sub_id
5050
5151 async def unsubscribe (self , sub_id : str ) -> bool :
@@ -62,46 +62,97 @@ async def unsubscribe(self, sub_id: str) -> bool:
6262 return True
6363 return False
6464
65- async def _watch_loop (self , sub_id : str , params : Dict [str , Any ], callback : Callable ):
65+ async def watch_loop (self , sub_id : str , params : Dict [str , Any ], callback : Callable ):
6666 """
6767 The internal async loop that communicates with the K8s API.
6868 """
69- resource_type = params .get ("resource_type" , "pods" )
69+ resource_type = params .get ("resource_type" , "pod" ). lower ( )
7070 namespace = params .get ("namespace" , "default" )
71-
72- # Dynamic API selection based on resource type
73- v1 = client . CoreV1Api ( )
71+ is_custom = params . get ( "is_custom_resource" , False )
72+ label_selector = params . get ( "label_selector" , "" )
73+ field_selector = params . get ( "field_selector" , "" )
7474 w = watch .Watch ()
7575
7676 try :
77- # Note: This is an example for Pods.
78- # You would extend this to support Deployments/Flux CRDs.
79- method = getattr (v1 , f"list_namespaced_{ resource_type } " )
80-
81- async with w .stream (
82- method ,
83- namespace = namespace ,
84- label_selector = params .get ("label_selector" , "" ),
85- field_selector = params .get ("field_selector" , "" ),
86- ) as stream :
77+ if is_custom :
78+ api_client_inst = client .CustomObjectsApi ()
79+ group = params .get ("group" )
80+ version = params .get ("version" )
81+ plural = params .get ("plural" , resource_type + "s" )
82+ if not group or not version :
83+ raise ValueError ("Both 'group' and 'version' are required." )
84+
85+ method = api_client_inst .list_namespaced_custom_object
86+ stream_args = {
87+ "group" : group ,
88+ "version" : version ,
89+ "namespace" : namespace ,
90+ "plural" : plural ,
91+ }
92+ else :
93+ if resource_type == "job" :
94+ api_client_inst = client .BatchV1Api ()
95+ else :
96+ api_client_inst = client .CoreV1Api ()
97+
98+ method = getattr (api_client_inst , f"list_namespaced_{ resource_type } " )
99+ stream_args = {
100+ "namespace" : namespace ,
101+ }
102+
103+ stream_args ["label_selector" ] = label_selector
104+ stream_args ["field_selector" ] = field_selector
105+
106+ async with w .stream (method , ** stream_args ) as stream :
107+ sanitizer = ApiClient ()
87108 async for event in stream :
88- # Clean up the object for the LLM to save tokens
89- obj = event ["raw_object" ]
109+ obj = sanitizer .sanitize_for_serialization (event ["object" ])
90110 event_data = {
91111 "type" : event ["type" ],
92- "name" : obj ["metadata" ]["name" ],
93- "status" : obj .get ("status" , {}),
112+ "name" : obj ["metadata" ].get ("name" ),
94113 "resource" : resource_type ,
114+ "creation_timestamp" : obj ["metadata" ].get ("creationTimestamp" ),
95115 }
96116
97- # Push notification to MCP client via the provided callback
98- await callback (sub_id , event_data )
117+ status_block = obj .get ("status" , {})
118+
119+ if resource_type == "pod" :
120+ event_data ["status" ] = status_block .get ("phase" , "Unknown" )
121+
122+ elif resource_type == "job" :
123+ event_data ["status" ] = {
124+ "active" : int (status_block .get ("active" , 0 )),
125+ "succeeded" : int (status_block .get ("succeeded" , 0 )),
126+ "failed" : int (status_block .get ("failed" , 0 )),
127+ "start_time" : status_block .get ("startTime" ),
128+ }
129+ completions = obj .get ("spec" , {}).get ("completions" , 1 )
130+ if event_data ["status" ]["succeeded" ] >= completions :
131+ event_data ["state" ] = "Succeeded"
132+ elif event_data ["status" ]["failed" ] > 0 :
133+ event_data ["state" ] = "Failed"
134+ else :
135+ event_data ["state" ] = "Running"
136+ else :
137+ event_data ["status" ] = status_block
138+
139+ if is_custom and "state" in status_block :
140+ event_data ["state" ] = status_block ["state" ]
141+
142+ try :
143+ safe_event_data = json .loads (json .dumps (event_data , default = str ))
144+ except :
145+ safe_event_data = event_data
146+
147+ print (f"📡 { sub_id } : { safe_event_data ['name' ]} -> { event ['type' ]} " )
148+ await callback (sub_id , safe_event_data )
99149
100150 except asyncio .CancelledError :
101- # Clean exit on unsubscribe
102- pass
151+ print (f"🛑 Stopping watcher for { sub_id } ..." )
103152 except Exception as e :
104- # Notify the agent that the watch failed
105- await callback (sub_id , {"error" : str (e ), "status" : "failed" })
153+ print (f"❌ Watch failed for subscription { sub_id } : { e } " )
154+ import traceback
155+
156+ traceback .print_exc ()
106157 finally :
107158 w .stop ()
0 commit comments