-
Notifications
You must be signed in to change notification settings - Fork 3.5k
Expand file tree
/
Copy pathtest_informer.py
More file actions
225 lines (181 loc) · 8.25 KB
/
Copy pathtest_informer.py
File metadata and controls
225 lines (181 loc) · 8.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
# Copyright 2026 The Kubernetes Authors.
# Licensed under the Apache License, Version 2.0 (the "License").
# End-to-end tests for kubernetes.informer.SharedInformer.
import threading
import time
import unittest
import uuid
from kubernetes.client import api_client
from kubernetes.client.api import core_v1_api
from kubernetes.e2e_test import base
from kubernetes.informer import ADDED, DELETED, MODIFIED, SharedInformer
# Upper bound, in seconds, on every wait for an event or cache condition in this module.
_TIMEOUT = 30
def _uid():
    """Return a random 12-character lowercase hex suffix for unique resource names."""
    # The last 12 hex digits of a UUID4 are the same whether or not the
    # dashes are present, so the dash-free ``hex`` form can be sliced directly.
    token = uuid.uuid4().hex
    return token[-12:]
def _cm(name, payload=None):
    """Build a ConfigMap manifest dict carrying the ``inf-e2e=1`` label.

    Args:
        name: Value for ``metadata.name``.
        payload: Mapping used as the ConfigMap ``data``. Any falsy value
            (``None`` or an empty mapping) is replaced by ``{"k": "v"}``.

    Returns:
        A plain dict suitable as the ``body`` of a create call.
    """
    data = payload if payload else {"k": "v"}
    metadata = {"name": name, "labels": {"inf-e2e": "1"}}
    manifest = {"apiVersion": "v1", "kind": "ConfigMap"}
    manifest["metadata"] = metadata
    manifest["data"] = data
    return manifest
def _name_of(obj):
    """Return ``metadata.name`` from either an attribute-style object or a dict.

    Objects exposing a ``metadata`` attribute are read through attributes;
    anything else is treated as a mapping, and ``None`` is returned when the
    mapping has no (or empty) ``metadata`` or no ``name`` key.
    """
    if not hasattr(obj, "metadata"):
        meta = obj.get("metadata")
        return meta.get("name") if meta else None
    return obj.metadata.name
class TestSharedInformerE2E(unittest.TestCase):
    """End-to-end checks of SharedInformer against ConfigMaps on a live API server.

    Every test creates a uniquely named ConfigMap in the ``default``
    namespace and observes it through an informer restricted to the
    ``inf-e2e=1`` label selector.
    """

    @classmethod
    def setUpClass(cls):
        """Create one API client from the e2e configuration, shared by all tests."""
        cls.cfg = base.get_e2e_configuration()
        cls.apiclient = api_client.ApiClient(configuration=cls.cfg)
        cls.api = core_v1_api.CoreV1Api(cls.apiclient)

    # ------------------------------------------------------- helpers

    def _new_informer(self, **kwargs):
        """Build (but do not start) an informer over the labelled test ConfigMaps.

        Extra keyword arguments (e.g. ``resync_period``) are forwarded to
        ``SharedInformer`` unchanged.
        """
        return SharedInformer(
            list_func=self.api.list_namespaced_config_map,
            namespace="default",
            label_selector="inf-e2e=1",
            **kwargs,
        )

    @staticmethod
    def _notify_on(event, name):
        """Return a handler that sets *event* when it receives the object called *name*."""

        def handler(obj):
            if _name_of(obj) == name:
                event.set()

        return handler

    @staticmethod
    def _data_of(obj):
        """Return the ``data`` mapping of a cached object (attribute- or dict-style).

        A missing or ``None`` payload yields ``{}`` so callers can always
        use ``.get`` on the result.
        """
        if hasattr(obj, "data"):
            return obj.data or {}
        return obj.get("data") or {}

    def _drop(self, cm_name):
        """Best-effort delete of a ConfigMap; intended as a cleanup callback."""
        try:
            self.api.delete_namespaced_config_map(name=cm_name, namespace="default")
        except Exception:
            # Deliberately swallowed: the object may already be gone (for
            # instance when the test deleted it itself), and a failing
            # cleanup should not replace the test's real result.
            pass

    def _expect(self, ev, label):
        """Fail the test unless *ev* is set within ``_TIMEOUT`` seconds."""
        if not ev.wait(timeout=_TIMEOUT):
            self.fail("Timeout waiting for: " + label)

    def _wait_in_cache(self, inf, key):
        """Poll the informer cache until *key* is present, failing after ``_TIMEOUT`` seconds."""
        stop = time.monotonic() + _TIMEOUT
        while time.monotonic() < stop:
            if inf.cache.get_by_key(key) is not None:
                return
            time.sleep(0.25)
        self.fail("key " + key + " never appeared in cache")

    def _wait_listed(self, inf):
        """Wait until the informer has recorded a resourceVersion.

        A non-``None`` ``_resource_version`` is taken as the signal that the
        initial list has completed.
        """
        stop = time.monotonic() + _TIMEOUT
        while inf._resource_version is None and time.monotonic() < stop:
            time.sleep(0.1)
        self.assertIsNotNone(inf._resource_version, "initial list never completed")

    # ------------------------------------------------------- tests

    def test_cache_populated_after_start(self):
        """Pre-existing ConfigMaps appear in the cache once the informer starts."""
        name = "inf-pre-" + _uid()
        key = "default/" + name
        self.api.create_namespaced_config_map(body=_cm(name), namespace="default")
        self.addCleanup(self._drop, name)

        inf = self._new_informer()
        inf.start()
        self.addCleanup(inf.stop)

        self._wait_in_cache(inf, key)
        cached = inf.cache.get_by_key(key)
        self.assertEqual(_name_of(cached), name)
        # Verify the cached object actually contains the expected data payload.
        self.assertEqual(self._data_of(cached).get("k"), "v")

    def test_added_event_and_cache_entry(self):
        """Creating a ConfigMap fires ADDED and the object appears in the cache."""
        name = "inf-add-" + _uid()
        seen = threading.Event()

        inf = self._new_informer()
        inf.add_event_handler(ADDED, self._notify_on(seen, name))
        inf.start()
        self.addCleanup(inf.stop)
        self.addCleanup(self._drop, name)

        # Create only after the initial list so the event arrives via the watch.
        self._wait_listed(inf)
        self.api.create_namespaced_config_map(body=_cm(name), namespace="default")
        self._expect(seen, "ADDED/" + name)
        self.assertIsNotNone(inf.cache.get_by_key("default/" + name))

    def test_modified_event_and_cache_refresh(self):
        """Patching a ConfigMap fires MODIFIED and the cache holds the updated object."""
        name = "inf-mod-" + _uid()
        key = "default/" + name
        seen = threading.Event()

        inf = self._new_informer()
        inf.add_event_handler(MODIFIED, self._notify_on(seen, name))
        inf.start()
        self.addCleanup(inf.stop)
        self.addCleanup(self._drop, name)

        self.api.create_namespaced_config_map(body=_cm(name), namespace="default")
        self._wait_in_cache(inf, key)
        self.api.patch_namespaced_config_map(
            name=name, namespace="default", body={"data": {"k": "updated"}}
        )
        self._expect(seen, "MODIFIED/" + name)

        # Verify that the cache now holds the updated data.
        cached = inf.cache.get_by_key(key)
        self.assertIsNotNone(cached)
        self.assertEqual(self._data_of(cached).get("k"), "updated")

    def test_deleted_event_removes_from_cache(self):
        """Deleting a ConfigMap fires DELETED and removes it from the cache."""
        name = "inf-del-" + _uid()
        key = "default/" + name
        seen = threading.Event()

        inf = self._new_informer()
        inf.add_event_handler(DELETED, self._notify_on(seen, name))
        inf.start()
        self.addCleanup(inf.stop)
        # Registered even though the test deletes the object itself: if an
        # earlier step fails, the ConfigMap would otherwise be left behind.
        self.addCleanup(self._drop, name)

        self.api.create_namespaced_config_map(body=_cm(name), namespace="default")
        self._wait_in_cache(inf, key)
        self.api.delete_namespaced_config_map(name=name, namespace="default")
        self._expect(seen, "DELETED/" + name)
        self.assertIsNone(inf.cache.get_by_key(key))

    def test_resource_version_advances(self):
        """The stored resourceVersion advances after watch events are received."""
        name = "inf-rv-" + _uid()
        seen = threading.Event()

        inf = self._new_informer()
        inf.add_event_handler(ADDED, self._notify_on(seen, name))
        inf.start()
        self.addCleanup(inf.stop)
        self.addCleanup(self._drop, name)

        self._wait_listed(inf)
        # NOTE(review): comparing as integers assumes the server hands out
        # numeric, increasing resourceVersions - confirm for the target cluster.
        rv_before = int(inf._resource_version)
        self.api.create_namespaced_config_map(body=_cm(name), namespace="default")
        self._expect(seen, "ADDED/" + name)
        self.assertGreater(int(inf._resource_version), rv_before)

    def test_resync_fires_modified_for_existing_objects(self):
        """Periodic resync re-lists from the API server and fires MODIFIED for cached objects.

        A short resync_period (5 s) is used so the test completes quickly.
        After the informer has cached the ConfigMap via the initial list, we
        wait for a MODIFIED event that is fired by the resync, verifying that
        the resync actually contacts the API server and triggers callbacks.
        """
        resync_period = 5
        name = "inf-rsync-" + _uid()
        self.api.create_namespaced_config_map(body=_cm(name), namespace="default")
        self.addCleanup(self._drop, name)

        added = threading.Event()
        resynced = threading.Event()
        inf = self._new_informer(resync_period=resync_period)
        inf.add_event_handler(ADDED, self._notify_on(added, name))
        # The resync fires MODIFIED for existing cached objects; wait for it.
        inf.add_event_handler(MODIFIED, self._notify_on(resynced, name))
        inf.start()
        self.addCleanup(inf.stop)

        # First, wait for the object to be added to the cache.
        self._expect(added, "ADDED/" + name)
        # Then wait for the resync to fire MODIFIED (allow up to 3x resync_period).
        if not resynced.wait(timeout=3 * resync_period):
            self.fail("Timeout waiting for resync MODIFIED/" + name)

        # Verify the cached object still holds the expected data.
        cached = inf.cache.get_by_key("default/" + name)
        self.assertIsNotNone(cached)
        self.assertEqual(self._data_of(cached).get("k"), "v")
# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()