3333from clusterfuzz ._internal .tests .test_libs import test_utils
3434
3535
36- @mock .patch (
37- 'clusterfuzz._internal.metrics.logs.get_logging_config_dict' ,
38- return_value = {
39- 'version' : 1 ,
40- 'disable_existing_loggers' : False ,
41- 'formatters' : {
42- 'simpleFormatter' : {
43- 'format' : '%(levelname)s:%(module)s:%(lineno)d:%(message)s'
44- }
45- },
46- 'handlers' : {
47- 'consoleHandler' : {
48- 'class' : 'logging.StreamHandler' ,
49- 'formatter' : 'simpleFormatter'
50- }
51- },
52- 'loggers' : {
53- 'root' : {
54- 'handlers' : ['consoleHandler' ],
55- 'level' : 'INFO'
56- }
57- }
58- })
5936@test_utils .with_cloud_emulators ('datastore' )
6037class KubernetesServiceE2ETest (unittest .TestCase ):
6138 """End-to-end tests for the Kubernetes service."""
@@ -66,166 +43,94 @@ def setUpClass(cls):
6643 if not os .getenv ('K8S_E2E' ):
6744 raise unittest .SkipTest ('K8S_E2E environment variable not set.' )
6845
69- cls .mock_batch_config = mock . Mock ()
70- cls .mock_batch_config . get . return_value = 'test-project '
46+ cls .cluster_name = 'test-cluster-for-e2e-test'
47+ cls .image = 'gcr.io/clusterfuzz-images/base:000dc1f-202511191429 '
7148
72- def get_batch_config (key ):
73- return {
74- 'project' : 'test-project' ,
75- 'mapping' : {
76- 'LINUX-PREEMPTIBLE-UNPRIVILEGED' : {
77- 'clusterfuzz_release' : 'prod' ,
78- 'docker_image' : cls .image ,
79- 'user_data' : 'file://linux-init.yaml' ,
80- 'disk_size_gb' : 10 ,
81- 'disk_type' : 'pd-standard' ,
82- 'service_account_email' : 'test-email' ,
83- 'preemptible' : True ,
84- 'machine_type' : 'machine-type' ,
85- 'subconfigs' : [{
86- 'name' : 'subconfig1' ,
87- 'weight' : 1
88- }]
89- },
90- 'LINUX-NONPREEMPTIBLE-UNPRIVILEGED' : {
91- 'clusterfuzz_release' : 'prod' ,
92- 'docker_image' : cls .image ,
93- 'user_data' : 'file://linux-init.yaml' ,
94- 'disk_size_gb' : 20 ,
95- 'disk_type' : 'pd-standard' ,
96- 'service_account_email' : 'test-email' ,
97- 'preemptible' : False ,
98- 'machine_type' : 'machine-type' ,
99- 'subconfigs' : [{
100- 'name' : 'subconfig1' ,
101- 'weight' : 1
102- }]
103- }
104- },
105- 'subconfigs' : {
106- 'subconfig1' : {
107- 'region' : 'region' ,
108- 'network' : 'network' ,
109- 'subnetwork' : 'subnetwork'
110- }
111- }
112- }.get (key )
49+ # Find `kind` executable.
50+ cls .kind_path = (
51+ shutil .which ('kind' ) or os .path .expanduser ('~/.local/bin/kind' ))
52+ if not cls .kind_path or not os .path .exists (cls .kind_path ):
53+ raise unittest .SkipTest ('kind executable not found.' )
11354
114- cls .mock_batch_config .get .side_effect = get_batch_config
115-
116- cls .mock_local_config = mock .Mock ()
117- cls .mock_local_config .BatchConfig .return_value = cls .mock_batch_config
118-
119- with mock .patch (
120- 'clusterfuzz._internal.config.local_config' , new = cls .mock_local_config ):
121- cls .cluster_name = 'test-cluster-for-e2e-test'
122- cls .image = 'gcr.io/clusterfuzz-images/base:000dc1f-202511191429'
123-
124- # First, try to find `kind` in the user's local bin directory.
125- home_dir = os .path .expanduser ('~' )
126- local_kind_path = os .path .join (home_dir , '.local' , 'bin' , 'kind' )
127-
128- if os .path .exists (local_kind_path ):
129- cls .kind_path = local_kind_path
130- else :
131- # Fallback to searching the PATH.
132- cls .kind_path = shutil .which ('kind' )
133-
134- # Ensure no old cluster exists.
135- subprocess .run (
136- [cls .kind_path , 'delete' , 'cluster' , '--name' , cls .cluster_name ],
137- check = False )
138-
139- subprocess .run (
140- [cls .kind_path , 'create' , 'cluster' , '--name' , cls .cluster_name ],
141- check = True )
55+ # Ensure no old cluster exists and create a new one.
56+ subprocess .run (
57+ [cls .kind_path , 'delete' , 'cluster' , '--name' , cls .cluster_name ],
58+ check = False )
59+ subprocess .run (
60+ [cls .kind_path , 'create' , 'cluster' , '--name' , cls .cluster_name ],
61+ check = True )
14262
143- # Explicitly get the kubeconfig from the kind cluster .
144- kubeconfig = subprocess .check_output (
145- [cls .kind_path , 'get' , 'kubeconfig' , '--name' ,
146- cls .cluster_name ]).decode ('utf-8' )
63+ # Get kubeconfig and load it .
64+ kubeconfig = subprocess .check_output (
65+ [cls .kind_path , 'get' , 'kubeconfig' , '--name' ,
66+ cls .cluster_name ]).decode ('utf-8' )
14767
148- # Write the kubeconfig to a temporary file and load it.
149- with tempfile .NamedTemporaryFile (mode = 'w' , delete = False ) as f :
150- f .write (kubeconfig )
151- cls .kubeconfig_path = f .name
152- k8s_config .load_kube_config (config_file = cls .kubeconfig_path )
153- cls .api_client = k8s_client .BatchV1Api ()
68+ with tempfile .NamedTemporaryFile (mode = 'w' , delete = False ) as f :
69+ f .write (kubeconfig )
70+ cls .kubeconfig_path = f .name
71+ k8s_config .load_kube_config (config_file = cls .kubeconfig_path )
15472
73+ cls .api_client = k8s_client .BatchV1Api ()
15574 cls .kubernetes_client = kubernetes_service .KubernetesService (
15675 k8s_config_loaded = True )
76+
77+ # Setup dummy jobs in datastore.
15778 data_types .Job (name = 'test-job' , platform = 'LINUX' ).put ()
15879 data_types .Job (name = 'test-job1' , platform = 'LINUX' ).put ()
15980 data_types .Job (name = 'test-job2' , platform = 'LINUX' ).put ()
16081
161- cls .mock_get_config_names = mock .Mock (
162- return_value = {
163- ('fuzz' , 'test-job' ): ('LINUX-PREEMPTIBLE-UNPRIVILEGED' , 10 , None ),
164- ('fuzz' , 'test-job1' ): ('LINUX-PREEMPTIBLE-UNPRIVILEGED' , 10 , None ),
165- ('fuzz' , 'test-job2' ): ('LINUX-NONPREEMPTIBLE-UNPRIVILEGED' , 20 ,
166- None ),
167- })
168- cls .mock_get_config_names_patcher = mock .patch (
169- 'clusterfuzz._internal.batch.service._get_config_names' ,
170- new = cls .mock_get_config_names )
171- cls .mock_get_config_names_patcher .start ()
172-
17382 @classmethod
17483 def tearDownClass (cls ):
17584 """Tear down the test environment."""
176- os .remove (cls .kubeconfig_path )
177- subprocess .run (
178- [cls .kind_path , 'delete' , 'cluster' , '--name' , cls .cluster_name ],
179- check = True )
180- cls .mock_get_config_names_patcher .stop ()
181-
182- def test_create_job (self , mock_get_logging_config_dict ):
183- """Tests creating a job."""
184- input_url = 'url'
185- task = remote_task_types .RemoteTask (None , 'test-job' , None )
186- task .docker_image = self .image
187-
188- config = KubernetesJobConfig (
189- job_type = 'test-job' ,
190- docker_image = self .image ,
191- command = task .command ,
192- disk_size_gb = 10 ,
193- service_account_email = 'test-email' ,
194- clusterfuzz_release = 'prod' ,
195- is_kata = False )
196- actual_job_name = self .kubernetes_client .create_job (config , input_url )
197-
198- # Wait for the job to be created.
199- time .sleep (5 )
85+ if hasattr (cls , 'kubeconfig_path' ) and os .path .exists (cls .kubeconfig_path ):
86+ os .remove (cls .kubeconfig_path )
87+ if hasattr (cls , 'kind_path' ) and cls .kind_path :
88+ subprocess .run (
89+ [cls .kind_path , 'delete' , 'cluster' , '--name' , cls .cluster_name ],
90+ check = True )
20091
201- job = self .api_client .read_namespaced_job (actual_job_name , 'default' )
202- self .assertIsNotNone (job )
203- self .assertEqual (job .metadata .name , actual_job_name )
92+ def _wait_for_job_and_delete (self , job_name ):
93+ """Waits for a job to start running and then deletes it."""
94+ # Wait for the job to be created in the API.
95+ time .sleep (2 )
20496
205- # Wait for the job to start running.
97+ # Wait for the job to start running (at least one active pod) .
20698 job_running = False
207- for _ in range (180 ):
208- job = self .api_client .read_namespaced_job (actual_job_name , 'default' )
99+ for _ in range (60 ):
100+ job = self .api_client .read_namespaced_job (job_name , 'default' )
209101 if job .status .active or job .status .succeeded :
210102 job_running = True
211103 break
212104 time .sleep (1 )
213105
214106 self .assertTrue (
215107 job_running ,
216- f'Job { actual_job_name } did not start running. Status: { job .status } ' )
108+ f'Job { job_name } did not start running. Status: { job .status } ' )
217109
110+ # Cleanup.
218111 self .api_client .delete_namespaced_job (
219- name = actual_job_name ,
112+ name = job_name ,
220113 namespace = 'default' ,
221114 body = k8s_client .V1DeleteOptions (propagation_policy = 'Foreground' ))
222115
116+ def test_create_job (self ):
117+ """Tests creating a job."""
118+ config = KubernetesJobConfig (
119+ job_type = 'test-job' ,
120+ docker_image = self .image ,
121+ command = 'fuzz' ,
122+ disk_size_gb = 10 ,
123+ service_account_email = 'test-email' ,
124+ clusterfuzz_release = 'prod' ,
125+ is_kata = False )
126+ actual_job_name = self .kubernetes_client .create_job (config , 'url' )
127+ self ._wait_for_job_and_delete (actual_job_name )
128+
223129 @mock .patch ('clusterfuzz._internal.k8s.service._get_k8s_job_configs' )
224130 @mock .patch (
225131 'clusterfuzz._internal.base.tasks.task_utils.get_command_from_module' )
226132 def test_create_uworker_main_batch_job (self , mock_get_command_from_module ,
227- mock_get_k8s_job_configs ,
228- mock_get_logging_config_dict ):
133+ mock_get_k8s_job_configs ):
229134 """Tests creating a single uworker main batch job."""
230135 mock_get_command_from_module .return_value = 'fuzz'
231136 config = KubernetesJobConfig (
@@ -238,41 +143,15 @@ def test_create_uworker_main_batch_job(self, mock_get_command_from_module,
238143 is_kata = False )
239144 mock_get_k8s_job_configs .return_value = {('fuzz' , 'test-job' ): config }
240145
241- actual_job_name = \
242- self .kubernetes_client .create_utask_main_job (
243- 'module' , 'test-job' , 'url1' )
244-
245- # Wait for the job to be created.
246- time .sleep (5 )
247-
248- job = self .api_client .read_namespaced_job (actual_job_name , 'default' )
249- self .assertIsNotNone (job )
250- self .assertEqual (job .metadata .name , actual_job_name )
251-
252- # Wait for the job to start running.
253- job_running = False
254- for _ in range (180 ):
255- job = self .api_client .read_namespaced_job (actual_job_name , 'default' )
256- if job .status .active or job .status .succeeded :
257- job_running = True
258- break
259- time .sleep (1 )
260-
261- self .assertTrue (
262- job_running ,
263- f'Job { actual_job_name } did not start running. Status: { job .status } ' )
264-
265- self .api_client .delete_namespaced_job (
266- name = actual_job_name ,
267- namespace = 'default' ,
268- body = k8s_client .V1DeleteOptions (propagation_policy = 'Foreground' ))
146+ actual_job_name = self .kubernetes_client .create_utask_main_job (
147+ 'module' , 'test-job' , 'url1' )
148+ self ._wait_for_job_and_delete (actual_job_name )
269149
270150 @mock .patch ('clusterfuzz._internal.k8s.service._get_k8s_job_configs' )
271151 @mock .patch (
272152 'clusterfuzz._internal.base.tasks.task_utils.get_command_from_module' )
273153 def test_create_uworker_main_batch_jobs (self , mock_get_command_from_module ,
274- mock_get_k8s_job_configs ,
275- mock_get_logging_config_dict ):
154+ mock_get_k8s_job_configs ):
276155 """Tests creating multiple uworker main batch jobs."""
277156 mock_get_command_from_module .return_value = 'fuzz'
278157 config1 = KubernetesJobConfig (
@@ -285,7 +164,7 @@ def test_create_uworker_main_batch_jobs(self, mock_get_command_from_module,
285164 is_kata = False )
286165 config2 = KubernetesJobConfig (
287166 job_type = 'test-job2' ,
288- docker_image = 'different- image' ,
167+ docker_image = self . image ,
289168 command = 'fuzz' ,
290169 disk_size_gb = 20 ,
291170 service_account_email = 'test-email' ,
@@ -302,35 +181,11 @@ def test_create_uworker_main_batch_jobs(self, mock_get_command_from_module,
302181 remote_task_types .RemoteTask ('fuzz' , 'test-job2' , 'url2' ),
303182 ]
304183
305- actual_job_names = \
306- self .kubernetes_client .create_utask_main_jobs (tasks )
184+ actual_job_names = self .kubernetes_client .create_utask_main_jobs (tasks )
307185 self .assertEqual (len (actual_job_names ), 2 )
308186
309187 for job_name in actual_job_names :
310- # Wait for the job to be created.
311- time .sleep (5 )
312-
313- job = self .api_client .read_namespaced_job (job_name , 'default' )
314- self .assertIsNotNone (job )
315- self .assertEqual (job .metadata .name , job_name )
316-
317- # Wait for the job to start running.
318- job_running = False
319- for _ in range (180 ):
320- job = self .api_client .read_namespaced_job (job_name , 'default' )
321- if job .status .active or job .status .succeeded :
322- job_running = True
323- break
324- time .sleep (1 )
325-
326- self .assertTrue (
327- job_running ,
328- f'Job { job_name } did not start running. Status: { job .status } ' )
329-
330- self .api_client .delete_namespaced_job (
331- name = job_name ,
332- namespace = 'default' ,
333- body = k8s_client .V1DeleteOptions (propagation_policy = 'Foreground' ))
188+ self ._wait_for_job_and_delete (job_name )
334189
335190
336191if __name__ == '__main__' :
0 commit comments