-
Notifications
You must be signed in to change notification settings - Fork 24
Expand file tree
/
Copy pathworkloads_operations.py
More file actions
707 lines (659 loc) · 34.5 KB
/
workloads_operations.py
File metadata and controls
707 lines (659 loc) · 34.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
import ast
import os
import time
import tarfile
import shutil
from csv import DictReader
from datetime import datetime, timezone, timedelta
from benchmark_runner.common.logger.logger_time_stamp import logger_time_stamp
from benchmark_runner.workloads.workloads_exceptions import ODFNotInstalled, CNVNotInstalled, KataNotInstalled, EmptyLSOPath, MissingScaleNodes, MissingRedis
from benchmark_runner.common.oc.oc import OC
from benchmark_runner.common.virtctl.virtctl import Virtctl
from benchmark_runner.common.elasticsearch.elasticsearch_operations import ElasticSearchOperations
from benchmark_runner.main.environment_variables import environment_variables
from benchmark_runner.common.clouds.shared.s3.s3_operations import S3Operations
from benchmark_runner.common.prometheus.prometheus_snapshot import PrometheusSnapshot
from benchmark_runner.common.prometheus.prometheus_snapshot_exceptions import PrometheusSnapshotError
from benchmark_runner.common.template_operations.template_operations import TemplateOperations
from benchmark_runner.common.clouds.BareMetal.bare_metal_operations import BareMetalOperations
from benchmark_runner.common.prometheus.prometheus_metrics_operations import PrometheusMetricsOperation
from benchmark_runner.common.google_drive.google_drive_operations import GoogleDriveOperations
class WorkloadsOperations:
    """
    This class contains workloads operations
    """
    # Fix: the docstring literal originally appeared AFTER the class attributes,
    # so it was a stray string statement and never became WorkloadsOperations.__doc__.
    # Shared OC client cached across instances (populated lazily in __init__)
    oc = None
    # Conversion factor: seconds -> milliseconds
    MILLISECONDS = 1000
    # Number of node-cache clear rounds performed by clear_nodes_cache()
    REPEAT_TIMES = 3
    # Seconds slept between cache-clear rounds
    SLEEP_TIME = 3
def __init__(self):
# environment variables
self._environment_variables_dict = environment_variables.environment_variables_dict
self._workload = self._environment_variables_dict.get('workload', '')
self._build_version = self._environment_variables_dict.get('build_version', '')
self._workloads_odf_pvc = list(self._environment_variables_dict.get('workloads_odf_pvc', ''))
self._odf_pvc = self._environment_variables_dict.get('odf_pvc', True)
self._kubeadmin_password = self._environment_variables_dict.get('kubeadmin_password', '')
self._run_type = self._environment_variables_dict.get('run_type', '')
self._trunc_uuid = self._environment_variables_dict.get('trunc_uuid', '')
self._uuid = self._environment_variables_dict.get('uuid', '')
self._date_key = self._environment_variables_dict.get('date_key', '')
self._key = self._environment_variables_dict.get('key', '')
self._time_stamp_format = self._environment_variables_dict.get('time_stamp_format', '')
self._endpoint_url = self._environment_variables_dict.get('endpoint_url', '')
self._run_artifacts_path = self._environment_variables_dict.get('run_artifacts_path', '')
self._save_artifacts_local = self._environment_variables_dict.get('save_artifacts_local', '')
self._enable_prometheus_snapshot = self._environment_variables_dict.get('enable_prometheus_snapshot', False)
self._run_artifacts_url = self._environment_variables_dict.get('run_artifacts_url', '')
self._pin_node1 = self._environment_variables_dict.get('pin_node1', '')
self._pin_node2 = self._environment_variables_dict.get('pin_node2', '')
self._pin_node0 = self._environment_variables_dict.get('pin_node0', '')
self._es_host = self._environment_variables_dict.get('elasticsearch', '')
self._es_port = self._environment_variables_dict.get('elasticsearch_port', '')
self._es_user = self._environment_variables_dict.get('elasticsearch_user', '')
self._es_password = self._environment_variables_dict.get('elasticsearch_password', '')
self._es_url_protocol = self._environment_variables_dict['elasticsearch_url_protocol']
self._scale = self._environment_variables_dict.get('scale', '')
self._redis = self._environment_variables_dict.get('redis', '')
self._threads_limit = self._environment_variables_dict.get('threads_limit', '')
self._kata_thread_pool_size = self._environment_variables_dict.get('kata_thread_pool_size', '')
self._cnv_version = self._environment_variables_dict.get('cnv_version', '')
self._odf_version = self._environment_variables_dict.get('odf_version', '')
if self._scale:
self._scale = int(self._scale)
self._scale_nodes = self._environment_variables_dict.get('scale_nodes', '')
self._redis = self._environment_variables_dict.get('redis', '')
if not self._scale_nodes:
raise MissingScaleNodes()
if not self._redis and 'vdbench' in self._workload:
raise MissingRedis()
self._scale_node_list = ast.literal_eval(self._scale_nodes)
if self._threads_limit:
self._threads_limit = int(self._threads_limit)
else:
self._threads_limit = self._scale * len(self._scale_node_list)
self._bulk_sleep_time = int(self._environment_variables_dict.get('bulk_sleep_time', ''))
else:
self._scale_node_list = []
self._timeout = int(self._environment_variables_dict.get('timeout', ''))
# Elasticsearch connection
if self._es_host and self._es_port:
self._es_operations = ElasticSearchOperations(es_host=self._es_host,
es_port=self._es_port,
es_user=self._es_user,
es_password=self._es_password,
es_url_protocol=self._es_url_protocol,
timeout=self._timeout)
# Generate templates class - need Virtctl first for SSH key generation
# get oc instance
if WorkloadsOperations.oc is None:
self._oc = self.get_oc(kubeadmin_password=self._kubeadmin_password)
self._virtctl = Virtctl()
# Generate SSH key for VM workloads (needed before template generation)
if '_vm' in self._workload:
ssh_key_dir = os.path.join(self._run_artifacts_path, 'ssh')
os.makedirs(ssh_key_dir, exist_ok=True)
self._ssh_key_path = self._virtctl.generate_ssh_key(key_path=os.path.join(ssh_key_dir, 'vm_key'))
self._environment_variables_dict['ssh_public_key'] = self._virtctl.get_ssh_public_key(self._ssh_key_path)
self._environment_variables_dict['ssh_key_path'] = self._ssh_key_path
self._template = TemplateOperations(workload=self._workload)
# Prometheus Snapshot
self._prometheus_result = {}
if self._enable_prometheus_snapshot:
self._snapshot = PrometheusSnapshot(oc=self._oc, artifacts_path=self._run_artifacts_path, verbose=True)
self._prometheus_snap_interval = self._environment_variables_dict.get('prometheus_snap_interval', '')
self._prometheus_metrics_operation = PrometheusMetricsOperation()
# Extract lso id for LSO workload
# Extract lso id for LSO workload
if '_lso' in self._environment_variables_dict.get('workload'):
self._oc.delete_namespace()
self._oc.delete_available_released_pv()
# Update lso_disk_id only if both worker_disk_ids and a free disk exist
if self._environment_variables_dict.get('worker_disk_ids', '') and self._oc.get_free_disk_id(
node=self._environment_variables_dict['lso_node']):
self._lso_disk_id = self._oc.get_free_disk_id(node=self._environment_variables_dict['lso_node'])
self._environment_variables_dict['lso_disk_id'] = self._lso_disk_id
else:
self._lso_disk_id = self._environment_variables_dict.get('lso_disk_id', '')
else:
self._lso_disk_id = None
self._windows_url = self._environment_variables_dict.get('windows_url', '')
self._create_vms_only = self._environment_variables_dict.get('create_vms_only', '')
self._delete_all = self._environment_variables_dict.get('delete_all', '')
self._verification_only = self._environment_variables_dict.get('verification_only', '')
self._must_gather_log = self._environment_variables_dict.get('must_gather_log', '')
self._test_name = self._environment_variables_dict.get('test_name', '')
self._wait_for_upgrade_version = self._environment_variables_dict.get('wait_for_upgrade_version', '')
if self._windows_url:
file_name = os.path.basename(self._windows_url)
self._windows_os = os.path.splitext(file_name)[0]
# google drive
self._google_drive_path = self._environment_variables_dict.get('google_drive_path', '')
self._google_drive_credentials_file = self._environment_variables_dict.get('google_drive_credentials_file', '')
self._google_drive_token_file = self._environment_variables_dict.get('google_drive_token_file', '')
self._google_drive_shared_drive_id = self._environment_variables_dict.get('google_drive_shared_drive_id', '')
if self._google_drive_path:
self._google_drive_operation = GoogleDriveOperations(google_drive_path=self._google_drive_path,
google_drive_credentials_file=self._google_drive_credentials_file,
google_drive_token_file=self._google_drive_token_file,
google_drive_shared_drive_id=self._google_drive_shared_drive_id)
self._upgrade_ocp_version = self._environment_variables_dict.get('upgrade_ocp_version', '')
self._upgrade_masters_duration_seconds = self._environment_variables_dict.get('upgrade_masters_duration_seconds', '')
self._upgrade_workers_duration_seconds = self._environment_variables_dict.get('upgrade_workers_duration_seconds', '')
self._run_strategy = self._environment_variables_dict.get('run_strategy', '')
self._product_versions = self._environment_variables_dict.get('product_versions', '')
self._storage_type = self._environment_variables_dict.get('storage_type', '')
def _get_workload_file_name(self, workload):
"""
This method returns workload name
:return:
"""
if self._scale:
return f'{workload}-scale-{self._time_stamp_format}'
else:
return f'{workload}-{self._time_stamp_format}'
def get_oc(self, kubeadmin_password: str = ''):
"""
This method returns oc instance
:param kubeadmin_password:
:return: oc instance
"""
self._oc = OC(kubeadmin_password=kubeadmin_password)
return self._oc
@logger_time_stamp
def delete_all(self):
"""
This method deletes all resources in namespace
:return:
"""
self._oc.delete_namespace()
if self._storage_type == 'lso':
self._oc.delete_available_released_pv()
self._oc.remove_lso_path()
@logger_time_stamp
def start_prometheus(self):
"""
This method starts collection of Prometheus snapshot
:return:
"""
if self._enable_prometheus_snapshot:
try:
self._snapshot.prepare_for_snapshot()
except PrometheusSnapshotError as err:
raise PrometheusSnapshotError(err)
except Exception as err:
raise err
@logger_time_stamp
def end_prometheus(self):
"""
This method retrieves the Prometheus snapshot
:return:
"""
if self._enable_prometheus_snapshot:
try:
self._snapshot.retrieve_snapshot()
except PrometheusSnapshotError as err:
raise PrometheusSnapshotError(err)
except Exception as err:
raise err
@logger_time_stamp
def odf_workload_verification(self):
"""
This method verifies whether the ODF operator is installed for ODF workloads and raises an error if it is missing, skip for LSO workload
:return:
"""
workload_name = self._workload.split('_')
if workload_name[0] in self._workloads_odf_pvc and '_lso' not in self._workload:
if not self._oc.is_odf_installed():
raise ODFNotInstalled(workload=self._workload)
@logger_time_stamp
def verify_lso(self):
"""
This method verifies that lso disk id is set
:return:
"""
if not self._lso_disk_id:
raise EmptyLSOPath()
def _create_vm_log(self, labels: list) -> str:
"""
This method sets vm log per workload
:param labels: list of labels
:return: vm_name
"""
vm_name = ''
for label in labels:
vm_name = self._oc.get_vm(label=label)
self._virtctl.save_vm_log(vm_name=vm_name)
return vm_name
def _create_pod_log(self, pod: str = '', log_type: str = ''):
"""
This method creates pod log per workload
:param pod: pod name
:return: save_pod_log file
"""
pod_name = self._oc.get_pod(label=pod)
return self._oc.save_pod_log(pod_name=pod_name, log_type=log_type)
def _get_run_artifacts_hierarchy(self, workload_name: str = '', is_file: bool = False):
"""
This method returns log hierarchy
:param workload_name: workload name
:param is_file: is file name
:return:
"""
key = self._key
run_type = self._run_type.replace('_', '-')
date_key = self._date_key
if workload_name:
workload_key = workload_name.split('-')[0]
if is_file:
return os.path.join(key, run_type, date_key, workload_key, workload_name)
else:
return os.path.join(key, run_type, date_key, workload_key)
return os.path.join(key, run_type, date_key)
@staticmethod
def __is_float(value) -> bool:
"""
This method checks if value is float
:param value: The value to check
:return: True if the value is a float, False when cannot be converted to a float
"""
try:
float(value)
return True
except (ValueError, TypeError):
return False
def _create_scale_logs(self):
"""
The method creates scale logs
:return:
"""
self._create_pod_log(pod='state-signals-exporter', log_type='.log')
self._create_pod_log(pod='redis-master', log_type='.log')
def _create_pod_run_artifacts(self, pod_name: str, log_type: str):
"""
This method creates pod run artifacts
:param pod_name: pod name
:param log_type: log type extension
:return: run results list of dicts
"""
result_list = []
pod_log_file = self._create_pod_log(pod=pod_name, log_type=log_type)
workload_name = self._environment_variables_dict.get('workload', '').replace('_', '-')
# csv to dictionary
the_reader = DictReader(open(pod_log_file, 'r'))
for line_dict in the_reader:
for key, value in line_dict.items():
if self.__is_float(value):
num = float(value)
line_dict[key] = round(num, 3)
elif value == 'n/a':
line_dict[key] = 0.0
line_dict['pod_name'] = pod_name
workload = self._get_workload_file_name(workload=self._get_run_artifacts_hierarchy(workload_name=workload_name, is_file=True))
line_dict['run_artifacts_url'] = os.path.join(self._run_artifacts_url, f'{workload}.tar.gz')
result_list.append(dict(line_dict))
return result_list
def _create_run_artifacts(self, workload: str = '', labels: list = None):
"""
This method creates pod logs for direct pod workloads (no operator)
:param workload: workload name
:param labels: list of pod labels - use when pod labels differ from workload name
:return: run artifacts url
"""
# Create pod logs for workload pods
if labels:
for pod_label in labels:
self._create_pod_log(pod=pod_label)
elif workload:
self._create_pod_log(pod=workload)
workload_name = self._workload.replace('_', '-')
return os.path.join(self._environment_variables_dict.get('run_artifacts_url', ''), f'{self._get_run_artifacts_hierarchy(workload_name=workload_name, is_file=True)}-{self._time_stamp_format}.tar.gz')
def __make_run_artifacts_tarfile(self, workload: str):
"""
This method compresses the run artifacts directory and returns the compressed file path.
:param workload: The name of the workload used to name the archive contents.
:return: Path to the created tar.gz file.
"""
if self._test_name:
tar_run_artifacts_path = f"{self._run_artifacts_path}_{self._test_name}.tar.gz"
else:
tar_run_artifacts_path = f"{self._run_artifacts_path}.tar.gz"
with tarfile.open(tar_run_artifacts_path, mode='w:gz') as archive:
workload_file_name = self._get_workload_file_name(workload)
if self._test_name:
workload_file_name= f'{workload_file_name}_{self._test_name}'
archive.add(self._run_artifacts_path, arcname=workload_file_name, recursive=True)
return tar_run_artifacts_path
@logger_time_stamp
def delete_local_artifacts(self):
"""
This method deletes local artifacts
:return:
"""
workload = self._workload.replace('_', '-')
tar_run_artifacts_path = self.__make_run_artifacts_tarfile(workload)
# remove local run artifacts workload folder
# verify that its not empty path
if len(self._run_artifacts_path) > 3 and self._run_artifacts_path != '/' and self._run_artifacts_path and tar_run_artifacts_path and os.path.isfile(tar_run_artifacts_path):
# remove run_artifacts_path
shutil.rmtree(path=self._run_artifacts_path)
# remove tar.gz file
os.remove(path=tar_run_artifacts_path)
@logger_time_stamp
def upload_run_artifacts_to_s3(self):
"""
This method uploads log to s3
:return:
"""
workload = self._workload.replace('_', '-')
tar_run_artifacts_path = self.__make_run_artifacts_tarfile(workload)
run_artifacts_hierarchy = self._get_run_artifacts_hierarchy(workload_name=workload)
# Upload when endpoint_url is not None
s3operations = S3Operations()
# change workload to key convention
workload_file_name = self._get_workload_file_name(workload)
upload_file = f"{workload_file_name}.tar.gz"
s3operations.upload_file(file_name_path=tar_run_artifacts_path,
bucket=self._environment_variables_dict.get('bucket', ''),
key=str(run_artifacts_hierarchy),
upload_file=upload_file)
def get_run_artifacts_google_drive(self):
"""
This method returns google drive run artifacts folder path
:return:
"""
workload = self._workload.replace('_', '-')
run_artifacts_hierarchy = self._get_run_artifacts_hierarchy(workload_name=workload)
return self._google_drive_operation.get_drive_folder_url(folder_path=run_artifacts_hierarchy, parent_folder_id=self._google_drive_shared_drive_id)
def create_run_artifacts_google_drive(self):
"""
This method creates google drive run artifacts folder path
:return:
"""
workload = self._workload.replace('_', '-')
run_artifacts_hierarchy = self._get_run_artifacts_hierarchy(workload_name=workload)
self._google_drive_operation.create_drive_folder_url(folder_path=run_artifacts_hierarchy, parent_folder_id=self._google_drive_shared_drive_id)
@logger_time_stamp
def upload_run_artifacts_to_google_drive(self):
"""
This method uploads log to google drive
:return:
"""
workload = self._workload.replace('_', '-')
tar_run_artifacts_path = self.__make_run_artifacts_tarfile(workload)
run_artifacts_hierarchy = self._get_run_artifacts_hierarchy(workload_name=workload)
# change workload to key convention
workload_file_name = self._get_workload_file_name(workload)
upload_file = f"{workload_file_name}.tar.gz"
self._google_drive_operation.upload_file_to_folder(file_path=tar_run_artifacts_path,
folder_path=str(run_artifacts_hierarchy),
parent_folder_id=self._google_drive_shared_drive_id)
def __get_metadata(self, kind: str = None, status: str = None, database: str = None, run_artifacts_url: str = None, prometheus_result: dict = None, result: dict = None) -> dict:
"""
This method returns metadata for a run, optionally updates by runtime kind
@param kind: optionally: pod, vm, or kata
@param status:
@param result:
:return:
"""
date_format = '%Y_%m_%d'
if self._storage_type == 'ephemeral':
odf_disk_count = -1
else:
odf_disk_count = self._oc.get_odf_disk_count()
odf_disk_count = -1 if odf_disk_count in {0, 1} else odf_disk_count
metadata = {'ocp_version': self._oc.get_ocp_server_version(),
'previous_ocp_version': '' if len(self._oc.get_previous_ocp_version()) > 10 else self._oc.get_previous_ocp_version(),
'cnv_version': self._oc.get_cnv_version(),
'nhc_version': self._oc.get_nhc_version(),
'far_version': self._oc.get_far_version(),
'kata_version': self._oc.get_kata_operator_version(),
'kata_rpm_version': self._oc.get_kata_rpm_version(node=self._pin_node1),
'odf_version': self._oc.get_odf_version(),
'runner_version': self._build_version,
'version': int(self._build_version.split('.')[-1]),
'vm_os_version': self._product_versions.get('db_vm_os_version', 'centos-stream9'),
'ci_date': datetime.now().strftime(date_format),
'uuid': self._uuid,
'run_id': 'NA',
'pin_node1': self._pin_node1,
'pin_node2': self._pin_node2,
'pin_node0': self._pin_node0,
'storage_type': self._storage_type,
'odf_disk_count': odf_disk_count
}
if kind:
metadata.update({'kind': kind})
if status:
metadata.update({'run_status': status})
if run_artifacts_url:
metadata.update({'run_artifacts_url': run_artifacts_url})
if prometheus_result:
metadata.update(prometheus_result)
if self._scale:
metadata.update({'scale': int(self._scale)*len(self._scale_node_list)})
if 'bootstorm' in self._workload:
metadata.update({'vm_os_version': 'fedora37'})
if 'win' in self._workload:
metadata.update({'vm_os_version': self._windows_os})
# for hammerdb
product_versions = self._product_versions if isinstance(self._product_versions, dict) else {}
if database == 'mssql':
metadata.update({'db_type': 'mssql', 'db_version': product_versions.get('mssql', 2025), 'storage_type': self._storage_type})
elif database == 'postgres':
metadata.update({'db_type': 'pg', 'db_version': product_versions.get('postgres', 13), 'storage_type': self._storage_type})
elif database == 'mariadb':
metadata.update({'db_type': 'mariadb', 'db_version': product_versions.get('mariadb', 10.5), 'storage_type': self._storage_type})
if database:
metadata.update({'hammerdb_version': self._product_versions.get('hammerdb', 4.12)})
if 'stressng' in self._workload:
metadata.update({'stressng_version': self._product_versions.get('stressng', '0.20.01')})
if 'uperf' in self._workload:
metadata.update({'uperf_version': self._product_versions.get('uperf', '1.0.8')})
if self._test_name:
metadata.update({'test_name': self._test_name})
if result:
metadata.update(result)
return metadata
def _get_index_ids_between_dates(self, index: str, key:str):
end_datetime = datetime.now(timezone.utc)
start_datetime = end_datetime - timedelta(hours=4)
return self._es_operations.get_index_ids_between_dates(index=index, key=key, start_datetime=start_datetime, end_datetime=end_datetime)
def _get_latest_resource_with_key(self, index: str, key: str) -> dict:
"""
This method get index key value
:param index:
:param key:
:return:
"""
# Current time in UTC
current_datetime = datetime.now(timezone.utc)
# Start time: 6 hours before UTC
start_datetime = current_datetime - timedelta(hours=1)
end_datetime = current_datetime + timedelta(hours=1)
return self._es_operations.get_latest_resource_with_key(index=index, key=key, start_datetime=start_datetime, end_datetime=end_datetime)
def _upload_to_elasticsearch(self, index: str, kind: str, status: str, result: dict = None, database: str = ''):
"""
This method uploads results to elasticsearch
:param index:
:param kind:
:param status:
:param result:
:param database:
:return:
"""
self._es_operations.upload_to_elasticsearch(index=index, data=self.__get_metadata(kind=kind, status=status, database=database, result=result))
@logger_time_stamp
def _update_elasticsearch_index(self, index: str, id: str, kind: str, status: str, run_artifacts_url: str, database: str = '', vm_name: str = '', data_updated: bool = False, prometheus_result: dict = None):
"""
This method updates elasticsearch id
:param index:
:param id:
:param kind:
:param database:
:param status:
:param run_artifacts_url:
:param data_updated: check if data was updated
:param prometheus_result:
:return:
"""
metadata = self.__get_metadata(kind=kind, database=database, status=status, run_artifacts_url=run_artifacts_url, prometheus_result=prometheus_result)
if vm_name:
metadata.update({'vm_name': vm_name})
metadata.update({'data_updated': data_updated})
self._es_operations.update_elasticsearch_index(index=index, id=id, metadata=metadata)
def _verify_elasticsearch_data_uploaded(self, index: str, uuid: str, timeout: int = None):
"""
This method verifies that elasticsearch data was uploaded
:param index:
:param uuid:
:param timeout:
:return:
"""
return self._es_operations.verify_elasticsearch_data_uploaded(index=index, uuid=uuid, timeout=timeout)
def __parse_duration(self, value):
try:
return int(float(value))
except (TypeError, ValueError):
return None
@logger_time_stamp
def update_ci_status(self, status: str, ci_minutes_time: int, benchmark_runner_id: str, ocp_install_minutes_time: int = 0, ocp_resource_install_minutes_time: int = 0):
"""
This method updates ci status Pass/Failed
:param status: Pass/Failed
:param ci_minutes_time: ci time in minutes
:param benchmark_runner_id: benchmark_runner last repository commit id
:param ocp_install_minutes_time: ocp install minutes time, default 0 because ocp install run once a week
:param ocp_resource_install_minutes_time: ocp install minutes time, default 0 because ocp install run once a week
:return:
"""
if self._run_type == 'test_ci':
es_index = 'ci-status-test'
elif self._run_type == 'chaos_ci':
es_index = 'ci-status-chaos'
else:
es_index = 'ci-status'
status_dict = {'failed': 0, 'pass': 1}
metadata = self.__get_metadata()
if ocp_resource_install_minutes_time != 0:
bm_operations = BareMetalOperations(user=self._environment_variables_dict.get('provision_user', ''))
ocp_install_minutes_time = bm_operations.get_ocp_install_time()
metadata.update({'status': status, 'status#': status_dict[status], 'ci_minutes_time': ci_minutes_time, 'benchmark_runner_id': benchmark_runner_id, 'ocp_install_minutes_time': ocp_install_minutes_time, 'ocp_resource_install_minutes_time': ocp_resource_install_minutes_time, 'upgrade_masters_duration_seconds': self.__parse_duration(self._upgrade_masters_duration_seconds), 'upgrade_workers_duration_seconds': self.__parse_duration(self._upgrade_workers_duration_seconds)})
self._es_operations.upload_to_elasticsearch(index=es_index, data=metadata)
@logger_time_stamp
def split_run_bulks(self, iterable: range, limit: int = 1):
"""
This method splits run into bulk depends on threads limit
@return: run bulks
"""
length = len(iterable)
for ndx in range(0, length, limit):
yield iterable[ndx:min(ndx + limit, length)]
@logger_time_stamp
def clear_nodes_cache(self):
"""
This method clears nodes cache
"""
for i in range(self.REPEAT_TIMES-1):
self._oc.clear_node_caches()
time.sleep(self.SLEEP_TIME)
self._oc.clear_node_caches()
    def initialize_workload(self):
        """
        This method includes all the initialization of workload:
        operator verification, optional cleanup, cache clearing, template
        generation, Prometheus snapshot start and google drive folder creation.
        :return:
        """
        # Verify that CNV operator in installed for CNV workloads
        if '_vm' in self._workload and not self._oc.is_cnv_installed():
            raise CNVNotInstalled(workload=self._workload)
        # Verify that Kata operator in installed for kata workloads
        if '_kata' in self._workload and not self._oc.is_kata_installed():
            raise KataNotInstalled(workload=self._workload)
        if self._delete_all:
            self.delete_all()
        # NOTE(review): cache clearing appears unconditional here (not nested
        # under delete_all) — confirm against upstream indentation
        self.clear_nodes_cache()
        if self._odf_pvc:
            self.odf_workload_verification()
        if 'lso' in self._workload:
            self.verify_lso()
        # render the workload yamls with the scale parameters resolved in __init__
        self._template.generate_yamls(scale=str(self._scale), scale_nodes=self._scale_node_list, redis=self._redis, thread_limit=self._threads_limit)
        if self._enable_prometheus_snapshot:
            self.start_prometheus()
        if self._google_drive_path:
            self.create_run_artifacts_google_drive()
    def finalize_workload(self):
        """
        This method includes all the finalization of workload:
        event collection, Prometheus snapshot retrieval, artifacts upload
        (S3 takes precedence over google drive), local cleanup and optional
        full resource deletion.
        :return:
        """
        self._oc.collect_events()
        if self._enable_prometheus_snapshot:
            self.end_prometheus()
        # S3 endpoint wins when both upload targets are configured
        if self._endpoint_url:
            self.upload_run_artifacts_to_s3()
        elif self._google_drive_path:
            self.upload_run_artifacts_to_google_drive()
        if not self._save_artifacts_local:
            self.delete_local_artifacts()
        if self._delete_all:
            self.delete_all()
# ── HammerDB helpers ──────────────────────────────────────────────────────
def _hammerdb_elasticsearch_template_fields(self) -> dict:
"""Return common HammerDB Elasticsearch fields derived from the workload name."""
workload_parts = self._workload.split('_')
database = workload_parts[2] if len(workload_parts) >= 3 else ''
db_type_map = {'mariadb': 'mariadb', 'mssql': 'mssql', 'postgres': 'pg'}
product_versions = self._product_versions if isinstance(self._product_versions, dict) else {}
db_version_defaults = {'mariadb': 10.5, 'mssql': 2025, 'postgres': 13}
return {
'db_type': db_type_map.get(database, database),
'db_version': product_versions.get(database, db_version_defaults.get(database, '')),
'storage_type': self._storage_type,
}
def _hammerdb_thread_results(self, results_list: list) -> list:
"""Return the per-thread result list (enriched with common template fields)."""
if not results_list:
return []
common_fields = self._hammerdb_elasticsearch_template_fields()
return [{**common_fields, **entry} for entry in results_list]
def _upload_hammerdb_thread_result(self, index: str, kind: str, status: str,
run_artifacts_url: str, database: str,
thread_result: dict):
"""Upload a single per-thread HammerDB result to Elasticsearch."""
result = {'run_artifacts_url': run_artifacts_url, **thread_result}
if self._enable_prometheus_snapshot:
result.update(self._prometheus_result)
self._upload_to_elasticsearch(index=index, kind=kind, status=status,
result=result, database=database)
def _parse_hammerdb_results_pod(self, log_output: str,
source_label: str = '') -> list:
"""Parse HammerDB JSON results from pod log output.
The results section is expected to be a JSON array somewhere in the log.
"""
import json as _json
from benchmark_runner.common.logger.logger_time_stamp import logger
start = log_output.find('[')
end = log_output.rfind(']')
if start == -1 or end == -1 or end <= start:
logger.warning('_parse_hammerdb_results_pod: no JSON array found in %s', source_label)
return []
try:
data = _json.loads(log_output[start:end + 1])
if isinstance(data, list):
return data
except ValueError as err:
logger.warning('_parse_hammerdb_results_pod: JSON parse error in %s: %s', source_label, err)
return []
def _parse_hammerdb_results_vm(self, json_path: str) -> list:
"""Load and return the list of per-thread result dicts from a JSON file."""
import json as _json
try:
with open(json_path, encoding='utf-8-sig') as f:
data = _json.load(f)
if isinstance(data, list):
return data
return []
except (OSError, ValueError) as err:
from benchmark_runner.common.logger.logger_time_stamp import logger
logger.warning('_parse_hammerdb_results_vm: could not read %s: %s', json_path, err)
return []