Skip to content

Commit 6654d36

Browse files
committed
[feat] Make sync store more sync
After the introduction of the asynchronous store, massStoreRun was implemented in such a way that it polls the task record every 5 seconds. This makes test execution take too long. In this commit, a pipe has been added to the abstract task so that it can directly notify the massStoreRun function when the task has finished. No other API function should use this pipe.
1 parent 0ec360d commit 6654d36

10 files changed

Lines changed: 317 additions & 200 deletions

File tree

codechecker_common/compatibility/multiprocessing.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,17 @@
1414
# pylint: disable=unused-import
1515
if sys.platform in ["darwin", "win32"]:
1616
from multiprocess import \
17-
Pool, Process, \
17+
Pipe, Pool, Process, \
1818
Queue, \
1919
Value, \
2020
cpu_count
21+
from multiprocess.managers import SyncManager
2122
else:
2223
from concurrent.futures import ProcessPoolExecutor as Pool
2324
from multiprocessing import \
25+
Pipe, \
2426
Process, \
2527
Queue, \
2628
Value, \
2729
cpu_count
30+
from multiprocessing.managers import SyncManager

web/client/codechecker_client/cli/store.py

Lines changed: 105 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ def assemble_blame_info(_, __) -> int:
5858
from codechecker_common.compatibility.multiprocessing import Pool
5959
from codechecker_common.source_code_comment_handler import \
6060
SourceCodeCommentHandler
61-
from codechecker_common.util import format_size, load_json
61+
from codechecker_common.util import format_size, load_json, strtobool
6262

6363
from codechecker_web.shared import webserver_context, host_check
6464
from codechecker_web.shared.env import get_default_workspace
@@ -964,51 +964,111 @@ def main(args):
964964
description = args.description if 'description' in args else None
965965

966966
LOG.info("Storing results to the server ...")
967-
task_token: str = client.massStoreRunAsynchronous(
968-
b64zip,
969-
SubmittedRunOptions(
970-
runName=args.name,
971-
tag=args.tag if "tag" in args else None,
972-
version=str(context.version),
973-
force="force" in args,
974-
trimPathPrefixes=trim_path_prefixes,
975-
description=description)
976-
)
977-
LOG.info("Reports submitted to the server for processing.")
978-
979-
if client.allowsStoringAnalysisStatistics():
980-
store_analysis_statistics(client, args.input, args.name)
981-
982-
if "detach" in args:
983-
LOG.warning("Exiting the 'store' subcommand as '--detach' was "
984-
"specified: not waiting for the result of the store "
985-
"operation.\n"
986-
"The server might not have finished processing "
987-
"everything at this point, so do NOT rely on querying "
988-
"the results just yet!\n"
989-
"To await the completion of the processing later, "
990-
"you can execute:\n\n"
991-
"\tCodeChecker cmd serverside-tasks --token %s "
992-
"--await",
993-
task_token)
994-
# Print the token to stdout as well, so scripts can use "--detach"
995-
# meaningfully.
996-
print(task_token)
997-
return
998-
999-
task_client = libclient.setup_task_client(protocol, host, port)
1000-
task_status: str = await_task_termination(LOG, task_token,
1001-
task_api_client=task_client)
1002-
1003-
if task_status == "COMPLETED":
1004-
LOG.info("Storing the reports finished successfully.")
967+
968+
if strtobool(os.environ.get('CC_FORCE_SYNC_STORE', 'no')):
969+
try:
970+
with _timeout_watchdog(timedelta(hours=1),
971+
signal.SIGUSR1):
972+
client.massStoreRun(args.name,
973+
args.tag if 'tag' in args else None,
974+
str(context.version),
975+
b64zip,
976+
'force' in args,
977+
trim_path_prefixes,
978+
description)
979+
except WatchdogError as we:
980+
LOG.warning("%s", str(we))
981+
982+
# Showing parts of the exception stack is important here.
983+
# We **WANT** to see that the timeout happened during a wait on
984+
# Thrift reading from the TCP connection (something deep in the
985+
# Python library code at "sock.recv_into").
986+
import traceback
987+
_, _, tb = sys.exc_info()
988+
frames = traceback.extract_tb(tb)
989+
first, last = frames[0], frames[-2]
990+
formatted_frames = traceback.format_list([first, last])
991+
fmt_first, fmt_last = formatted_frames[0], formatted_frames[1]
992+
LOG.info("Timeout was triggered during:\n%s", fmt_first)
993+
LOG.info("Timeout interrupted this low-level operation:\n%s",
994+
fmt_last)
995+
996+
LOG.error(
997+
"Timeout!"
998+
"\n\tThe server's reply did not arrive after "
999+
"%d seconds (%s) elapsed since the server-side "
1000+
"processing began."
1001+
"\n\n\tThis does *NOT* mean that there was an issue "
1002+
"with the run you were storing!"
1003+
"\n\tThe server might still be processing the results..."
1004+
"\n\tHowever, it is more likely that the "
1005+
"server had already finished, but the client did not "
1006+
"receive a response."
1007+
"\n\tUsually, this is caused by the underlying TCP "
1008+
"connection failing to signal a low-level disconnect."
1009+
"\n\tClients potentially hanging indefinitely in these "
1010+
"scenarios is an unfortunate and known issue."
1011+
"\n\t\tSee http://github.com/Ericsson/codechecker/"
1012+
"issues/3672 for details!"
1013+
"\n\n\tThis error here is a temporary measure to ensure "
1014+
"an infinite hang is replaced with a well-explained "
1015+
"timeout."
1016+
"\n\tA more proper solution will be implemented in a "
1017+
"subsequent version of CodeChecker.",
1018+
we.timeout.total_seconds(), str(we.timeout))
1019+
sys.exit(1)
1020+
1021+
if client.allowsStoringAnalysisStatistics():
1022+
store_analysis_statistics(client, args.input, args.name)
10051023
else:
1006-
LOG.error("Storing the reports failed! "
1007-
"The job terminated in status '%s'. "
1008-
"The comments associated with the failure are:\n\n%s",
1009-
task_status,
1010-
task_client.getTaskInfo(task_token).comments)
1011-
sys.exit(1)
1024+
task_token: str = client.massStoreRunAsynchronous(
1025+
b64zip,
1026+
SubmittedRunOptions(
1027+
runName=args.name,
1028+
tag=args.tag if "tag" in args else None,
1029+
version=str(context.version),
1030+
force="force" in args,
1031+
trimPathPrefixes=trim_path_prefixes,
1032+
description=description)
1033+
)
1034+
LOG.info("Reports submitted to the server for processing.")
1035+
1036+
if client.allowsStoringAnalysisStatistics():
1037+
store_analysis_statistics(client, args.input, args.name)
1038+
1039+
if "detach" in args:
1040+
LOG.warning(
1041+
"Exiting the 'store' subcommand as '--detach' was "
1042+
"specified: not waiting for the result of the store "
1043+
"operation.\n"
1044+
"The server might not have finished processing "
1045+
"everything at this point, so do NOT rely on querying "
1046+
"the results just yet!\n"
1047+
"To await the completion of the processing later, "
1048+
"you can execute:\n\n"
1049+
"\tCodeChecker cmd serverside-tasks --token %s "
1050+
"--await",
1051+
task_token)
1052+
# Print the token to stdout as well, so scripts can use
1053+
# "--detach" meaningfully.
1054+
print(task_token)
1055+
return
1056+
1057+
task_client = libclient.setup_task_client(protocol, host, port)
1058+
task_status: str = await_task_termination(
1059+
LOG, task_token, task_api_client=task_client)
1060+
1061+
if task_status == "COMPLETED":
1062+
LOG.info("Storing the reports finished successfully.")
1063+
else:
1064+
LOG.error(
1065+
"Storing the reports failed! "
1066+
"The job terminated in status '%s'. "
1067+
"The comments associated with the failure are:\n\n%s",
1068+
task_status,
1069+
task_client.getTaskInfo(task_token).comments)
1070+
sys.exit(1)
1071+
10121072
except Exception as ex:
10131073
import traceback
10141074
traceback.print_exc()

web/server/codechecker_server/api/mass_store_run.py

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1209,8 +1209,8 @@ def __realise_fake_checkers(self, session):
12091209
checker: Tuple[str, str] = checker_name_for_report(report)
12101210
grouped_by_checker[checker].append(cast(int, db_id))
12111211

1212-
for chk, report_ids in grouped_by_checker.items():
1213-
analyzer_name, checker_name = chk
1212+
for checker, report_ids in grouped_by_checker.items():
1213+
analyzer_name, checker_name = checker
12141214
chk_obj = cast(Checker, self.__get_checker(session,
12151215
analyzer_name,
12161216
checker_name))
@@ -1222,18 +1222,18 @@ def __realise_fake_checkers(self, session):
12221222
def __add_report_context(self, session, file_path_to_id):
12231223
for db_report, report in self.__added_reports:
12241224
LOG.debug("Storing bug path positions.")
1225-
for i, p in enumerate(report.bug_path_positions):
1225+
for idx, path_pos in enumerate(report.bug_path_positions):
12261226
session.add(BugReportPoint(
1227-
p.range.start_line, p.range.start_col,
1228-
p.range.end_line, p.range.end_col,
1229-
i, file_path_to_id[p.file.path], db_report.id))
1227+
path_pos.range.start_line, path_pos.range.start_col,
1228+
path_pos.range.end_line, path_pos.range.end_col,
1229+
idx, file_path_to_id[path_pos.file.path], db_report.id))
12301230

12311231
LOG.debug("Storing bug path events.")
1232-
for i, event in enumerate(report.bug_path_events):
1232+
for idx, event in enumerate(report.bug_path_events):
12331233
session.add(BugPathEvent(
12341234
event.range.start_line, event.range.start_col,
12351235
event.range.end_line, event.range.end_col,
1236-
i, event.message, file_path_to_id[event.file.path],
1236+
idx, event.message, file_path_to_id[event.file.path],
12371237
db_report.id))
12381238

12391239
LOG.debug("Storing notes.")
@@ -1405,12 +1405,6 @@ def __validate_and_add_report_annotations(
14051405
f"'{value}' has wrong format. '{key}' annotations must be "
14061406
f"'{report_annotation_types[key]['display']}'.")
14071407

1408-
def __get_report_limit_for_product(self):
1409-
with DBSession(self.__config_database) as session:
1410-
product = session.query(Product).get(self.__product.id)
1411-
if product.report_limit:
1412-
self.__report_limit = product.report_limit
1413-
14141408
def __check_report_count(self):
14151409
"""
14161410
This method comparest the already added report count to the report

web/server/codechecker_server/api/report_server.py

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3947,6 +3947,12 @@ def __massStoreRun_common(self, is_async: bool, zipfile_blob: str,
39473947
m: MassStoreRunTask = ih.create_mass_store_task(is_async)
39483948
self._task_manager.push_task(m)
39493949

3950+
if not is_async:
3951+
LOG.info("massStoreRun(): Blocking until task '%s' terminates ...",
3952+
m.token)
3953+
pipe = self._task_manager.get_task_receiver_pipe(m.token)
3954+
pipe.recv()
3955+
39503956
return m.token
39513957

39523958
@exc_to_thrift_reqfail
@@ -3968,27 +3974,21 @@ def massStoreRun(self,
39683974
)
39693975
token = self.__massStoreRun_common(False, b64zip, store_opts)
39703976

3971-
LOG.info("massStoreRun(): Blocking until task '%s' terminates ...",
3972-
token)
3973-
39743977
# To be compatible with older (<= 6.24, API <= 6.58) clients which
39753978
# may keep using the old API endpoint, simulate awaiting the
39763979
# background task in the API handler.
3977-
while True:
3978-
time.sleep(5)
3979-
t = self._task_manager.get_task_record(token)
3980-
if t.is_in_terminated_state:
3981-
if t.status == "failed":
3982-
raise codechecker_api_shared.ttypes.RequestFailed(
3983-
codechecker_api_shared.ttypes.ErrorCode.GENERAL,
3984-
"massStoreRun()'s processing failed. Here follow "
3985-
f"the details:\n\n{t.comments}")
3986-
if t.status == "cancelled":
3987-
raise codechecker_api_shared.ttypes.RequestFailed(
3988-
codechecker_api_shared.ttypes.ErrorCode.GENERAL,
3989-
"Server administrators cancelled the processing of "
3990-
"the massStoreRun() request!")
3991-
break
3980+
task = self._task_manager.get_task_record(token)
3981+
if task.is_in_terminated_state:
3982+
if task.status == "failed":
3983+
raise codechecker_api_shared.ttypes.RequestFailed(
3984+
codechecker_api_shared.ttypes.ErrorCode.GENERAL,
3985+
"massStoreRun()'s processing failed. Here follow "
3986+
f"the details:\n\n{task.comments}")
3987+
if task.status == "cancelled":
3988+
raise codechecker_api_shared.ttypes.RequestFailed(
3989+
codechecker_api_shared.ttypes.ErrorCode.GENERAL,
3990+
"Server administrators cancelled the processing of "
3991+
"the massStoreRun() request!")
39923992

39933993
# Prior to CodeChecker 6.25.0 (API v6.59), massStoreRun() was
39943994
# completely synchronous and blocking, and the implementation of the

web/server/codechecker_server/server.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949

5050
from codechecker_common import util
5151
from codechecker_common.compatibility.multiprocessing import \
52-
Pool, Process, Queue, Value, cpu_count
52+
Pool, Process, Queue, Value, cpu_count, SyncManager
5353
from codechecker_common.logger import get_logger, signal_log
5454

5555
from codechecker_web.shared import database_status
@@ -635,6 +635,7 @@ def __init__(self,
635635
manager: session_manager.SessionManager,
636636
machine_id: str,
637637
task_queue: Queue,
638+
task_pipes,
638639
server_shutdown_flag: Value):
639640

640641
LOG.debug("Initializing HTTP server...")
@@ -658,7 +659,7 @@ def __init__(self,
658659

659660
self.__task_queue = task_queue
660661
self.task_manager = BackgroundTaskManager(
661-
task_queue, self.config_session, self.check_env,
662+
task_queue, task_pipes, self.config_session, self.check_env,
662663
server_shutdown_flag, machine_id,
663664
pathlib.Path(self.context.codechecker_workspace))
664665

@@ -1061,23 +1062,27 @@ def start_server(config_directory: str, workspace_directory: str,
10611062
bg_task_queue: Queue = Queue()
10621063
is_server_shutting_down = Value('B', False)
10631064

1065+
sync_manager = SyncManager()
1066+
sync_manager.start()
1067+
task_pipes = sync_manager.dict()
1068+
10641069
def _cleanup_incomplete_tasks(action: str) -> int:
10651070
config_db = config_sql_server.create_engine()
10661071
config_session_factory = sessionmaker(bind=config_db)
1067-
tm = BackgroundTaskManager(
1068-
bg_task_queue, config_session_factory, check_env,
1072+
task_manager = BackgroundTaskManager(
1073+
bg_task_queue, task_pipes, config_session_factory, check_env,
10691074
is_server_shutting_down, machine_id,
10701075
pathlib.Path(context.codechecker_workspace))
10711076

10721077
try:
1073-
tm.destroy_all_temporary_data()
1078+
task_manager.destroy_all_temporary_data()
10741079
except OSError:
10751080
LOG.warning("Clearing task-temporary storage space failed!")
10761081
import traceback
10771082
traceback.print_exc()
10781083

10791084
try:
1080-
return tm.drop_all_incomplete_tasks(action)
1085+
return task_manager.drop_all_incomplete_tasks(action)
10811086
finally:
10821087
config_db.dispose()
10831088

@@ -1107,6 +1112,7 @@ def _cleanup_incomplete_tasks(action: str) -> int:
11071112
manager,
11081113
machine_id,
11091114
bg_task_queue,
1115+
task_pipes,
11101116
is_server_shutting_down)
11111117

11121118
try:
@@ -1190,6 +1196,7 @@ def spawn_bg_process():
11901196
p = _start_process_with_no_signal_handling(
11911197
target=background_task_executor,
11921198
args=(bg_task_queue,
1199+
task_pipes,
11931200
config_sql_server,
11941201
check_env,
11951202
is_server_shutting_down,

web/server/codechecker_server/task_executors/abstract_task.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ def _log_cancel_and_abandon(db_task: DBTask):
116116
db_task.set_abandoned(force_dropped_status=False)
117117

118118
task_manager._mutate_task_record(self, _log_cancel_and_abandon)
119+
task_manager._send_done_message(self.token)
119120
return
120121

121122
try:
@@ -137,6 +138,7 @@ def _log_cancel_and_abandon(db_task: DBTask):
137138
self, lambda dbt:
138139
dbt.set_abandoned(force_dropped_status=True))
139140
except Exception:
141+
task_manager._send_done_message(self.token)
140142
return
141143

142144
LOG.debug("Task '%s' running on machine '%s' executor #%d",
@@ -200,3 +202,4 @@ def _log_exception_and_fail(db_task: DBTask):
200202
task_manager._mutate_task_record(self, _log_exception_and_fail)
201203
finally:
202204
self.destroy_data()
205+
task_manager._send_done_message(self.token)

web/server/codechecker_server/task_executors/main.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131

3232
def executor(queue: Queue,
33+
task_pipes,
3334
config_db_sql_server,
3435
server_environment,
3536
server_shutdown_flag: "Value",
@@ -67,7 +68,7 @@ def executor_hangup_handler(signum: int, _frame):
6768
signal.signal(signal.SIGHUP, executor_hangup_handler)
6869

6970
config_db_engine = config_db_sql_server.create_engine()
70-
tm = TaskManager(queue, sessionmaker(bind=config_db_engine),
71+
tm = TaskManager(queue, task_pipes, sessionmaker(bind=config_db_engine),
7172
server_environment, kill_flag, machine_id)
7273

7374
while not kill_flag.value:

0 commit comments

Comments
 (0)