Skip to content

Commit d21aa1b

Browse files
Track and reuse resourceVersion across watch reconnects; re-list only on 410 Gone
Co-authored-by: brendandburns <5751682+brendandburns@users.noreply.github.com>
1 parent 9c708ad commit d21aa1b

2 files changed

Lines changed: 148 additions & 17 deletions

File tree

kubernetes/informer/informer.py

Lines changed: 41 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ def __init__(
9090
self._watch = None
9191
self._thread = None
9292
self._stop_event = threading.Event()
93+
self._resource_version = None # most recent RV seen; None forces a full re-list
9394

9495
# ---------------------------------------------------------------- #
9596
# Public API #
@@ -185,34 +186,40 @@ def _fire(self, event_type, obj):
185186
def _initial_list(self):
186187
"""Do the initial list and populate the cache."""
187188
kw = self._build_kwargs()
188-
resource_version = "0"
189189
resp = self._list_func(**kw)
190190
items = getattr(resp, "items", []) or []
191191
self._cache._replace_all(items)
192192
rv = None
193193
meta = getattr(resp, "metadata", None)
194194
if meta is not None:
195195
rv = getattr(meta, "resource_version", None)
196-
if rv:
197-
resource_version = rv
198-
return resource_version
196+
self._resource_version = rv or "0"
199197

200198
def _run_loop(self):
201-
"""Background loop: list then watch, reconnect on errors."""
199+
"""Background loop: list then watch, reconnect on errors.
200+
201+
A full re-list is only performed when ``self._resource_version`` is
202+
``None`` (first start or after a 410 Gone response). On all other
203+
reconnects the most recent ``resourceVersion`` is reused so that no
204+
events are missed and the API server does not need to send a full
205+
object snapshot.
206+
"""
202207
while not self._stop_event.is_set():
203-
try:
204-
resource_version = self._initial_list()
205-
except Exception as exc:
206-
logger.exception("Error during initial list; retrying")
207-
self._fire(ERROR, exc)
208-
self._stop_event.wait(timeout=5)
209-
continue
208+
# Full re-list only when we have no resource version to resume from.
209+
if self._resource_version is None:
210+
try:
211+
self._initial_list()
212+
except Exception as exc:
213+
logger.exception("Error during initial list; retrying")
214+
self._fire(ERROR, exc)
215+
self._stop_event.wait(timeout=5)
216+
continue
210217

211218
# Watch loop
212219
last_resync = time.monotonic()
213220
self._watch = Watch()
214221
kw = self._build_kwargs()
215-
kw["resource_version"] = resource_version
222+
kw["resource_version"] = self._resource_version
216223
try:
217224
for event in self._watch.stream(self._list_func, **kw):
218225
if self._stop_event.is_set():
@@ -245,13 +252,30 @@ def _run_loop(self):
245252
self._fire(MODIFIED, cached_obj)
246253
last_resync = time.monotonic()
247254
except ApiException as exc:
248-
logger.warning(
249-
"Watch stream ended with ApiException (status=%s); reconnecting",
250-
exc.status,
251-
)
255+
if exc.status == 410:
256+
# The stored resource version is too old; force a full re-list.
257+
logger.warning(
258+
"Watch expired (410 Gone); will re-list from scratch"
259+
)
260+
self._resource_version = None
261+
else:
262+
logger.warning(
263+
"Watch stream ended with ApiException (status=%s); reconnecting",
264+
exc.status,
265+
)
252266
self._fire(ERROR, exc)
253267
except Exception as exc:
254268
logger.exception("Unexpected error in watch loop; reconnecting")
255269
self._fire(ERROR, exc)
256270
finally:
271+
# Capture the most recent resource version seen by the Watch
272+
# (updated on every ADDED/MODIFIED/DELETED/BOOKMARK event) so
273+
# that the next watch connection can resume without re-listing.
274+
# Do not overwrite a None that was set by a 410 handler above.
275+
if (
276+
self._resource_version is not None
277+
and self._watch is not None
278+
and self._watch.resource_version
279+
):
280+
self._resource_version = self._watch.resource_version
257281
self._watch = None

kubernetes/test/test_informer.py

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,113 @@ def fake_stream(func, **kw):
359359
self.assertEqual(len(cached), 1)
360360
self.assertIs(cached[0], pod)
361361

362+
def test_resource_version_stored_from_watch(self):
363+
"""After the watch stream ends the latest RV is preserved for reconnect."""
364+
pod = _make_pod("default", "rv-pod")
365+
events = [{"type": "ADDED", "object": pod}]
366+
367+
list_func = MagicMock()
368+
list_resp = MagicMock()
369+
list_resp.items = []
370+
list_resp.metadata = MagicMock(resource_version="10")
371+
list_func.return_value = list_resp
372+
373+
informer = SharedInformer(list_func=list_func)
374+
375+
call_count = {"n": 0}
376+
377+
with patch("kubernetes.informer.informer.Watch") as MockWatch:
378+
mock_w = MagicMock()
379+
mock_w.resource_version = "99"
380+
381+
def fake_stream(func, **kw):
382+
call_count["n"] += 1
383+
yield from events
384+
informer._stop_event.set()
385+
386+
mock_w.stream.side_effect = fake_stream
387+
MockWatch.return_value = mock_w
388+
389+
informer.start()
390+
informer._thread.join(timeout=3)
391+
392+
# The Watch reported RV "99"; the informer should have stored it.
393+
self.assertEqual(informer._resource_version, "99")
394+
# list_func should have been called once for the initial list only.
395+
self.assertEqual(list_func.call_count, 1)
396+
397+
def test_reconnect_skips_relist_when_rv_known(self):
398+
"""On reconnect without 410 the informer must NOT call the list function again."""
399+
pod = _make_pod("default", "reconnect-pod")
400+
401+
list_func = MagicMock()
402+
list_resp = MagicMock()
403+
list_resp.items = [pod]
404+
list_resp.metadata = MagicMock(resource_version="5")
405+
list_func.return_value = list_resp
406+
407+
informer = SharedInformer(list_func=list_func)
408+
409+
stream_calls = {"n": 0}
410+
411+
with patch("kubernetes.informer.informer.Watch") as MockWatch:
412+
mock_w = MagicMock()
413+
mock_w.resource_version = "7"
414+
415+
def fake_stream(func, **kw):
416+
stream_calls["n"] += 1
417+
if stream_calls["n"] == 1:
418+
# First stream: yield nothing then let it reconnect
419+
return iter([])
420+
# Second stream: stop the informer
421+
informer._stop_event.set()
422+
return iter([])
423+
424+
mock_w.stream.side_effect = fake_stream
425+
MockWatch.return_value = mock_w
426+
427+
informer.start()
428+
informer._thread.join(timeout=3)
429+
430+
# list_func is called only once (initial list); reconnect reuses the RV.
431+
self.assertEqual(list_func.call_count, 1)
432+
self.assertEqual(stream_calls["n"], 2)
433+
434+
def test_410_gone_triggers_relist(self):
435+
"""A 410 Gone ApiException must reset resource_version and trigger re-list."""
436+
from kubernetes.client.exceptions import ApiException
437+
438+
list_func = MagicMock()
439+
list_resp = MagicMock()
440+
list_resp.items = []
441+
list_resp.metadata = MagicMock(resource_version="3")
442+
list_func.return_value = list_resp
443+
444+
informer = SharedInformer(list_func=list_func)
445+
446+
stream_calls = {"n": 0}
447+
448+
with patch("kubernetes.informer.informer.Watch") as MockWatch:
449+
mock_w = MagicMock()
450+
mock_w.resource_version = "3"
451+
452+
def fake_stream(func, **kw):
453+
stream_calls["n"] += 1
454+
if stream_calls["n"] == 1:
455+
raise ApiException(status=410, reason="Gone")
456+
# Second stream (after re-list): stop cleanly
457+
informer._stop_event.set()
458+
return iter([])
459+
460+
mock_w.stream.side_effect = fake_stream
461+
MockWatch.return_value = mock_w
462+
463+
informer.start()
464+
informer._thread.join(timeout=3)
465+
466+
# list_func called twice: initial list + re-list after 410.
467+
self.assertEqual(list_func.call_count, 2)
468+
362469

363470
if __name__ == "__main__":
364471
unittest.main()

0 commit comments

Comments
 (0)