-
Notifications
You must be signed in to change notification settings - Fork 26
Expand file tree
/
Copy pathtest_persist_critical_error.py
More file actions
134 lines (106 loc) · 4.4 KB
/
Copy pathtest_persist_critical_error.py
File metadata and controls
134 lines (106 loc) · 4.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import os
import json
import shutil
import threading
import unittest
from pynumaflow.errors.errors import persist_critical_error, _persist_error_once
from pynumaflow.errors.errors import _persist_critical_error_to_file
from pynumaflow._constants import CONTAINER_TYPE, INTERNAL_ERROR_CODE
class TestErrorPersistence(unittest.TestCase):
def setUp(self):
"""
Set up temporary directories for tests.
"""
self.test_dirs = ["/tmp/test_error_dir", "/tmp/test_dir"]
def tearDown(self):
"""
Clean up temporary directories after tests.
"""
for dir_path in self.test_dirs:
if os.path.exists(dir_path):
shutil.rmtree(dir_path)
# Writes error details to a JSON file
def test_writes_error_details_to_json_file(self):
"""
Test that _persist_critical_error_to_file writes error details to a JSON file.
"""
dir_path = self.test_dirs[0]
error_code = "500"
error_message = "Server Error"
error_details = "An unexpected error occurred."
_persist_critical_error_to_file(error_code, error_message, error_details, dir_path)
container_dir = os.path.join(dir_path, CONTAINER_TYPE)
self.assertTrue(os.path.exists(container_dir))
# Debug: Check directory after the function call
print(f"After: {os.listdir(container_dir)}")
files = os.listdir(container_dir)
self.assertEqual(len(files), 1)
final_file_name = files[0]
final_file_path = os.path.join(container_dir, final_file_name)
with open(final_file_path) as f:
data = json.load(f)
self.assertEqual(data["code"], error_code)
self.assertEqual(data["message"], error_message)
self.assertEqual(data["details"], error_details)
self.assertEqual(data["container"], CONTAINER_TYPE)
self.assertTrue(isinstance(data["timestamp"], int))
# Uses default error code if none provided
def test_uses_default_error_code_if_none_provided(self):
"""
Test that _persist_critical_error_to_file uses the default error code if none is provided.
"""
dir_path = self.test_dirs[1]
_persist_critical_error_to_file("", "Error Message", "Error Details", dir_path)
container_dir = os.path.join(dir_path, "unknown-container")
self.assertTrue(os.path.exists(container_dir))
files = os.listdir(container_dir)
self.assertEqual(len(files), 1)
with open(os.path.join(container_dir, files[0])) as f:
error_data = json.load(f)
self.assertEqual(error_data["code"], INTERNAL_ERROR_CODE)
def test_persist_critical_error_all_threads_fail(self):
"""
Test that all threads fail when persist_critical_error is executed after the first call.
"""
error_code = "testCode"
error_message = "testMessage"
error_details = "testDetails"
# Set `done` to True to simulate that the critical error has already been persisted
_persist_error_once.done = True
try:
# Set up threading
num_threads = 10
errors = []
lock = threading.Lock()
def thread_func():
nonlocal errors
result = persist_critical_error(error_code, error_message, error_details)
with lock:
errors.append(result)
# Create and start threads
threads = []
for _ in range(num_threads):
thread = threading.Thread(target=thread_func)
threads.append(thread)
thread.start()
# Wait for all threads to complete
for thread in threads:
thread.join()
# Count the number of failures
fail_count = sum(
1
for error in errors
if error is not None
and "Persist critical error function has already been executed" in str(error)
)
# Assert that all threads failed
self.assertEqual(
fail_count,
num_threads,
f"Expected all {num_threads} threads to fail, but only {fail_count} failed",
)
finally:
# Revert `done` back to False after the test
_persist_error_once.done = False
if __name__ == "__main__":
unittest.main()