@@ -41,40 +41,60 @@ def __init__(
4141 db_cache : K8sDbCache ,
4242 ) -> None :
4343 self .__handler = handler
44- self .__tasks : dict [ClusterId , list [Task ]] | None = None
44+ self .__watch_tasks : dict [ClusterId , list [Task ]] = {}
45+ self .__full_sync_tasks : dict [ClusterId , Task ] = {}
46+ self .__full_sync_times : dict [ClusterId , datetime ] = {}
47+ self .__full_sync_running : set [ClusterId ] = set ()
4548 self .__kinds = kinds
4649 self .__clusters = clusters
47- self .__sync_period_seconds = 1800
50+ self .__sync_period_seconds = 600
4851 self .__cache = db_cache
4952
50- async def __sync (self ) -> None :
53+ async def __sync (self , cluster : Cluster , kind : GVK ) -> None :
5154 """Upsert K8s objects in the cache and remove deleted objects from the cache."""
55+ clnt = K8sClusterClient (cluster )
56+ fltr = K8sObjectFilter (gvk = kind , namespace = cluster .namespace )
57+ # Upsert new / updated objects
58+ objects_in_k8s : dict [str , K8sObject ] = {}
59+ async for obj in clnt .list (fltr ):
60+ objects_in_k8s [obj .name ] = obj
61+ await self .__cache .upsert (obj )
62+ # Remove objects that have been deleted from k8s but are still in cache
63+ async for cache_obj in self .__cache .list (fltr ):
64+ cache_obj_is_in_k8s = objects_in_k8s .get (cache_obj .name ) is not None
65+ if cache_obj_is_in_k8s :
66+ continue
67+ await self .__cache .delete (cache_obj .meta )
68+
69+ async def __full_sync (self , cluster : Cluster ) -> None :
70+ """Run the full sync if it has never run or at the required interval."""
71+ last_sync = self .__full_sync_times .get (cluster .id )
72+ since_last_sync = datetime .now () - last_sync if last_sync is not None else None
73+ if since_last_sync is not None and since_last_sync .total_seconds () < self .__sync_period_seconds :
74+ return
75+ self .__full_sync_running .add (cluster .id )
5276 for kind in self .__kinds :
53- for cluster in self .__clusters .values ():
54- clnt = K8sClusterClient (cluster )
55- fltr = K8sObjectFilter (gvk = kind , namespace = cluster .namespace )
56- # Upsert new / updated objects
57- objects_in_k8s : dict [str , K8sObject ] = {}
58- async for obj in clnt .list (fltr ):
59- objects_in_k8s [obj .name ] = obj
60- await self .__cache .upsert (obj )
61- # Remove objects that have been deleted from k8s but are still in cache
62- async for cache_obj in self .__cache .list (fltr ):
63- cache_obj_is_in_k8s = objects_in_k8s .get (cache_obj .name ) is not None
64- if cache_obj_is_in_k8s :
65- continue
66- await self .__cache .delete (cache_obj .meta )
77+ logger .info (f"Starting full k8s cache sync for cluster { cluster } and kind { kind } " )
78+ await self .__sync (cluster , kind )
79+ self .__full_sync_times [cluster .id ] = datetime .now ()
80+ self .__full_sync_running .remove (cluster .id )
81+
82+ async def __periodic_full_sync (self , cluster : Cluster ) -> None :
83+ """Keeps trying to run the full sync."""
84+ while True :
85+ await self .__full_sync (cluster )
86+ await asyncio .sleep (self .__sync_period_seconds / 10 )
6787
6888 async def __watch_kind (self , kind : GVK , cluster : Cluster ) -> None :
69- last_sync : datetime | None = None
7089 while True :
7190 try :
72- if last_sync is None or (datetime .now () - last_sync ).total_seconds () >= self .__sync_period_seconds :
73- logger .info ("Starting full k8s cache sync" )
74- await self .__sync ()
75- last_sync = datetime .now ()
7691 watch = cluster .api .async_watch (kind = kind .kr8s_kind , namespace = cluster .namespace )
7792 async for event_type , obj in watch :
93+ while cluster .id in self .__full_sync_running :
94+ logger .info (
95+ f"Pausing k8s watch event processing for cluster { cluster } until full sync completes"
96+ )
97+ await asyncio .sleep (5 )
7898 await self .__handler (cluster .with_api_object (obj ), event_type )
7999 # in some cases, the kr8s loop above just never yields, especially if there's exceptions which
80100 # can bypass async scheduling. This sleep here is as a last line of defence so this code does not
@@ -97,36 +117,41 @@ def __run_single(self, cluster: Cluster) -> list[Task]:
97117
98118 async def start (self ) -> None :
99119 """Start the watcher."""
100- if self .__tasks is None :
101- self .__tasks = {}
102- for cluster in self .__clusters . values ():
103- self .__tasks [cluster .id ] = self .__run_single (cluster )
120+ for cluster in sorted ( self .__clusters . values (), key = lambda x : x . id ) :
121+ await self .__full_sync ( cluster )
122+ self .__full_sync_tasks [ cluster . id ] = asyncio . create_task ( self . __periodic_full_sync ( cluster ))
123+ self .__watch_tasks [cluster .id ] = self .__run_single (cluster )
104124
105125 async def wait (self ) -> None :
106126 """Wait for all tasks.
107127
108128 This is mainly used to block the main function.
109129 """
110- if self .__tasks is None :
111- return
112- await asyncio .gather (* [t for tl in self .__tasks .values () for t in tl ])
130+ all_tasks = list (self .__full_sync_tasks .values ())
131+ for tasks in self .__watch_tasks .values ():
132+ all_tasks .extend (tasks )
133+ await asyncio .gather (* all_tasks )
113134
114135 async def stop (self , timeout : timedelta = timedelta (seconds = 10 )) -> None :
115136 """Stop the watcher or timeout."""
116- if self .__tasks is None :
117- return
118- for task_list in self .__tasks .values ():
137+
138+ async def stop_task (task : Task , timeout : timedelta ) -> None :
139+ if task .done ():
140+ return
141+ task .cancel ()
142+ try :
143+ async with asyncio .timeout (timeout .total_seconds ()):
144+ with contextlib .suppress (CancelledError ):
145+ await task
146+ except TimeoutError :
147+ logger .error ("timeout trying to cancel k8s watcher task" )
148+ return
149+
150+ for task_list in self .__watch_tasks .values ():
119151 for task in task_list :
120- if task .done ():
121- continue
122- task .cancel ()
123- try :
124- async with asyncio .timeout (timeout .total_seconds ()):
125- with contextlib .suppress (CancelledError ):
126- await task
127- except TimeoutError :
128- logger .error ("timeout trying to cancel k8s watcher task" )
129- continue
152+ await stop_task (task , timeout )
153+ for task in self .__full_sync_tasks .values ():
154+ await stop_task (task , timeout )
130155
131156
132157async def collect_metrics (
0 commit comments