-
Notifications
You must be signed in to change notification settings - Fork 457
Expand file tree
/
Copy pathabstract_task.py
More file actions
228 lines (194 loc) · 9.15 KB
/
abstract_task.py
File metadata and controls
228 lines (194 loc) · 9.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
# -------------------------------------------------------------------------
#
# Part of the CodeChecker project, under the Apache License v2.0 with
# LLVM Exceptions. See LICENSE for license information.
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
#
# -------------------------------------------------------------------------
"""
Contains the base class to be inherited and implemented by all background task
types.
"""
import logging
import os
import pathlib
import shutil
import time
from typing import Optional
from codechecker_common.logger import get_logger
from ..database.config_db_model import BackgroundTask as DBTask
# Module-wide logger, obtained through `codechecker_common.logger.get_logger`
# under the "server" name; used by the classes below for all diagnostics.
LOG = get_logger("server")
class TaskCancelHonoured(Exception):
    """
    Marker exception signalling that a task noticed its ``cancel_flag`` at a
    checkpoint and stopped its task-specific work on purpose.

    `AbstractTask` implementations raise it; `AbstractTask.execute` is the
    place where it is handled and where the successful cancellation gets
    registered.

    User code should **NOT** catch this exception.
    """

    def __init__(self, task_obj: "AbstractTask"):
        # Build the human-readable message first, then hand it over to the
        # base `Exception`, and finally remember which task was cancelled.
        message = f"Task '{task_obj.token}' honoured CANCEL request."
        super().__init__(message)
        self.task_obj = task_obj
class AbstractTask:
    """
    Base class implementing common execution and bookkeeping methods to
    facilitate the dispatch of tasks to background worker processes.

    Instances of this class **MUST** be marshallable by ``pickle``, as they
    are transported over an IPC `Queue`.
    It is important that instances do not grow too large, as the underlying
    OS-level primitives of a `Queue` can get full, which can result in a
    deadlock situation.

    The run-time contents of the instance should only contain the bare minimum
    metadata required for the implementation to execute in the background.

    Implementors of subclasses **MAY REASONABLY ASSUME** that an
    `AbstractTask` scheduled in the API handler process of a server will be
    actually executed by a background worker in the same process group, on the
    same machine instance.
    """

    def __init__(self, token: str, data_path: Optional[pathlib.Path]):
        """
        :param token: The identifying token of the task.
        :param data_path: The filesystem path where the task's input data is
            prepared, or `None` if there is no such data to clean up.
        """
        self._token = token
        self._data_path = data_path

    @property
    def token(self) -> str:
        """Returns the task's identifying token, its primary ID."""
        return self._token

    @property
    def data_path(self) -> Optional[pathlib.Path]:
        """
        Returns the filesystem path where the task's input data is prepared.
        """
        return self._data_path

    def destroy_data(self):
        """
        Deletes the contents of `data_path`.

        Does nothing when the task has no `data_path`. A failure to remove
        the directory tree is logged as a warning and is **not** propagated
        to the caller.
        """
        if not self._data_path:
            return

        try:
            # Announce the wipe *before* performing it, so the message is in
            # the log even when the removal fails and only the warning below
            # would otherwise be visible.
            LOG.debug("Wiping temporary data of task '%s' at '%s' ...",
                      self._token, self._data_path)
            shutil.rmtree(self._data_path)
        except Exception as ex:
            LOG.warning("Failed to remove background task's data_dir at "
                        "'%s':\n%s", self.data_path, str(ex))

    def _implementation(self, _task_manager: "TaskManager") -> None:
        """
        Implemented by subclasses to perform the logic specific to the task.

        Subclasses should use the `task_manager` object, injected from the
        context of the executed subprocess, to query and mutate service-level
        information about the current task.

        :raises NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError(f"No implementation for task class {self}!")

    def execute(self, task_manager: "TaskManager") -> None:
        """
        Executes the `_implementation` of the task, overridden by subclasses,
        to perform a task-specific business logic.

        This high-level wrapper deals with capturing `Exception`s, setting
        appropriate status information in the database (through the
        injected `task_manager`) and logging failures accordingly.

        The steps performed, in order:

          1. If the `task_manager` reports that the task should be cancelled,
             the record is commented and set abandoned, the "done" message is
             sent, and nothing is executed.
          2. The record is set to running.
          3. `_implementation` is called. On success the record is set
             finished successfully; on `TaskCancelHonoured` it is set
             abandoned; on any other `Exception` it is set finished
             unsuccessfully, retrying the database update with exponential
             back-off.
          4. Finally, `destroy_data` is called and the "done" message is sent.

        :param task_manager: The manager object through which the task's
            database record is mutated and the completion is signalled.
        """
        # Imported once, at function scope, as the module itself does not
        # import `traceback` at file level.
        import traceback

        if task_manager.should_cancel(self):
            def _log_cancel_and_abandon(db_task: DBTask):
                db_task.add_comment("CANCEL!\nTask cancelled before "
                                    "execution began!",
                                    "SYSTEM[AbstractTask::execute()]")
                db_task.set_abandoned(force_dropped_status=False)

            task_manager._mutate_task_record(self, _log_cancel_and_abandon)
            task_manager._send_done_message(self.token)
            return

        try:
            task_manager._mutate_task_record(
                self, lambda dbt: dbt.set_running())
        except KeyError:
            # KeyError is thrown if a task without a corresponding database
            # record is attempted to be executed.
            # NOTE(review): execution is not aborted here and falls through
            # to `_implementation` below - confirm whether this is intended.
            LOG.error("Failed to execute task '%s' due to database exception",
                      self.token)
        except Exception as ex:
            LOG.error("Failed to execute task '%s' due to database exception"
                      "\n%s",
                      self.token, str(ex))
            # For any other record, try to set the task abandoned due to an
            # exception.
            # NOTE(review): if setting the record abandoned succeeds, control
            # still continues to `_implementation` below - confirm whether
            # this is intended.
            try:
                task_manager._mutate_task_record(
                    self, lambda dbt:
                    dbt.set_abandoned(force_dropped_status=True))
            except Exception:
                # The database could not be updated at all: give up on the
                # task, but still signal that this executor is done with it.
                task_manager._send_done_message(self.token)
                return

        LOG.debug("Task '%s' running on machine '%s' executor #%d",
                  self.token, task_manager.machine_id, os.getpid())

        try:
            self._implementation(task_manager)
            LOG.debug("Task '%s' finished on machine '%s' executor #%d",
                      self.token,
                      task_manager.machine_id,
                      os.getpid())

            try:
                task_manager._mutate_task_record(
                    self, lambda dbt: dbt.set_finished(successfully=True))
            except Exception as ex:
                LOG.error("Failed to set task '%s' finished due to "
                          "database exception:\n%s",
                          self.token, str(ex))
        except TaskCancelHonoured:
            def _log_cancel_and_abandon(db_task: DBTask):
                db_task.add_comment("CANCEL!\nCancel request of admin "
                                    "honoured by task.",
                                    "SYSTEM[AbstractTask::execute()]")
                db_task.set_abandoned(force_dropped_status=False)

            def _log_drop_and_abandon(db_task: DBTask):
                db_task.add_comment("SHUTDOWN!\nTask honoured graceful "
                                    "cancel signal generated by "
                                    "server shutdown.",
                                    "SYSTEM[AbstractTask::execute()]")
                db_task.set_abandoned(force_dropped_status=True)

            # A cancellation observed during server shutdown is recorded
            # differently from one requested explicitly.
            record_cancellation = (_log_drop_and_abandon
                                   if task_manager.is_shutting_down
                                   else _log_cancel_and_abandon)
            task_manager._mutate_task_record(self, record_cancellation)

            LOG.debug("Task '%s' honoured the administrator's cancel request "
                      "at:\n%s",
                      self.token, traceback.format_exc())
        except Exception as ex:
            LOG.error("Failed to execute task '%s' on machine '%s' "
                      "executor #%d: %s",
                      self.token, task_manager.machine_id, os.getpid(),
                      str(ex))
            traceback.print_exc()

            # Render the traceback once, right here, while `ex` is the
            # exception being handled, so the text stored in the database
            # does not depend on the state at the time the callback is run.
            failure_trace = traceback.format_exc()

            def _log_exception_and_fail(db_task: DBTask):
                db_task.add_comment(
                    f"FAILED!\nException during execution:\n{str(ex)}",
                    "SYSTEM[AbstractTask::execute()]")
                if LOG.isEnabledFor(logging.DEBUG):
                    db_task.add_comment("Debug exception information:\n"
                                        f"{failure_trace}",
                                        "SYSTEM[AbstractTask::execute()]")
                db_task.set_finished(successfully=False)

            # If a database error occurs, we may not be able to set the
            # task state to 'FAILED' immediately since the database server
            # could be down. Therefore, we retry multiple times.
            # Before the i-th retry (i >= 1), we sleep for 2^i seconds.
            max_retries: int = 10
            for attempt in range(max_retries):
                if attempt > 0:
                    time.sleep(2 ** attempt)

                try:
                    task_manager._mutate_task_record(self,
                                                     _log_exception_and_fail)
                    break  # Success
                except Exception:
                    LOG.error("Failed to set task '%s' state to 'FAILED'! "
                              "(machine: '%s', executor: #%d, retry: %d/%d)",
                              self.token, task_manager.machine_id, os.getpid(),
                              attempt + 1, max_retries)
                    traceback.print_exc()
        finally:
            # Runs on every path once execution was attempted: the input
            # data is removed and the completion is signalled.
            self.destroy_data()
            task_manager._send_done_message(self.token)