Skip to content

Commit 4e02b8b

Browse files
Copiloteddyashtonachamayou
authored
Add exception handling in task_worker_loop to prevent worker thread termination (#7658)
Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: eddyashton <6000239+eddyashton@users.noreply.github.com> Co-authored-by: Amaury Chamayou <amchamay@microsoft.com> Co-authored-by: Eddy Ashton <edashton@microsoft.com> Co-authored-by: achamayou <4016369+achamayou@users.noreply.github.com>
1 parent c17a103 commit 4e02b8b

5 files changed

Lines changed: 375 additions & 3 deletions

File tree

CMakeLists.txt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,12 @@ include(GNUInstallDirs)
4444
# Use fixed name instead of absolute path for reproducible builds
4545
add_compile_options("-ffile-prefix-map=${CCF_DIR}=CCF")
4646

47+
# In Debug builds, export symbols to the dynamic symbol table so that
48+
# backtrace_symbols can resolve function names in stacktraces, and preserve
49+
# frame pointers so that backtrace() can walk the full call stack.
50+
add_link_options($<$<CONFIG:Debug>:-rdynamic>)
51+
add_compile_options($<$<CONFIG:Debug>:-fno-omit-frame-pointer>)
52+
4753
set(CMAKE_MODULE_PATH "${CCF_DIR}/cmake;${CMAKE_MODULE_PATH}")
4854

4955
set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
@@ -275,7 +281,9 @@ add_ccf_static_library(
275281
${CCF_DIR}/src/tasks/ordered_tasks.cpp
276282
${CCF_DIR}/src/tasks/fan_in_tasks.cpp
277283
${CCF_DIR}/src/tasks/thread_manager.cpp
284+
${CCF_DIR}/src/tasks/worker.cpp
278285
)
286+
target_link_libraries(ccf_tasks PRIVATE ${CMAKE_DL_LIBS})
279287

280288
# Common test args for Python scripts starting up CCF networks
281289
set(WORKER_THREADS

src/tasks/test/basic_tasks.cpp

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
#include "tasks/basic_task.h"
55
#include "tasks/task_system.h"
6+
#include "tasks/worker.h"
67

78
#include <doctest/doctest.h>
89
#include <iostream>
@@ -243,3 +244,173 @@ TEST_CASE("Scheduling" * doctest::test_suite("basic_tasks"))
243244
std::iota(target.begin(), target.end(), 0);
244245
REQUIRE(count_with_me == target);
245246
}
247+
248+
// Helper functions at namespace scope to ensure external linkage, so that
249+
// backtrace_symbols can resolve their names with -rdynamic.
250+
namespace exception_handling_test
251+
{
252+
void level_3_throws_runtime_error()
253+
{
254+
throw std::runtime_error("Test exception");
255+
}
256+
257+
void level_2_calls_level_3()
258+
{
259+
level_3_throws_runtime_error();
260+
}
261+
262+
void level_1_calls_level_2()
263+
{
264+
level_2_calls_level_3();
265+
}
266+
267+
void level_3_throws_int()
268+
{
269+
throw 42;
270+
}
271+
272+
void level_2_calls_level_3_int()
273+
{
274+
level_3_throws_int();
275+
}
276+
277+
void level_1_calls_level_2_int()
278+
{
279+
level_2_calls_level_3_int();
280+
}
281+
282+
struct ThrowsException : public ccf::tasks::BaseTask
283+
{
284+
void do_task_implementation() override
285+
{
286+
level_1_calls_level_2();
287+
}
288+
289+
const std::string& get_name() const override
290+
{
291+
static const std::string name = "ThrowsException";
292+
return name;
293+
}
294+
};
295+
296+
struct ThrowsUnknown : public ccf::tasks::BaseTask
297+
{
298+
void do_task_implementation() override
299+
{
300+
level_1_calls_level_2_int();
301+
}
302+
303+
const std::string& get_name() const override
304+
{
305+
static const std::string name = "ThrowsUnknown";
306+
return name;
307+
}
308+
};
309+
}
310+
311+
TEST_CASE("Exception handling" * doctest::test_suite("basic_tasks"))
312+
{
313+
// Custom logger that captures log messages for assertion
314+
struct CapturingLogger : public ccf::logger::AbstractLogger
315+
{
316+
std::mutex mutex;
317+
std::vector<std::string> messages;
318+
319+
void write(const ccf::logger::LogLine& ll) override
320+
{
321+
std::lock_guard<std::mutex> lock(mutex);
322+
messages.push_back(ll.msg);
323+
}
324+
325+
bool contains(const std::string& substring)
326+
{
327+
std::lock_guard<std::mutex> lock(mutex);
328+
for (const auto& m : messages)
329+
{
330+
if (m.find(substring) != std::string::npos)
331+
{
332+
return true;
333+
}
334+
}
335+
return false;
336+
}
337+
338+
void clear()
339+
{
340+
std::lock_guard<std::mutex> lock(mutex);
341+
messages.clear();
342+
}
343+
};
344+
345+
auto capturing_logger = std::make_unique<CapturingLogger>();
346+
auto* logger_ptr = capturing_logger.get();
347+
ccf::logger::config::loggers().push_back(std::move(capturing_logger));
348+
349+
// Task that runs successfully after exceptions
350+
std::atomic<bool> success_task_ran = false;
351+
ccf::tasks::Task success_task = ccf::tasks::make_basic_task(
352+
[&success_task_ran]() { success_task_ran.store(true); }, "SuccessTask");
353+
354+
ccf::tasks::JobBoard job_board;
355+
std::atomic<bool> stop_signal = false;
356+
357+
// Queue tasks: two that throw, then one that should still run
358+
job_board.add_task(
359+
std::make_shared<exception_handling_test::ThrowsException>());
360+
job_board.add_task(
361+
std::make_shared<exception_handling_test::ThrowsUnknown>());
362+
job_board.add_task(success_task);
363+
364+
std::thread worker([&]() {
365+
ccf::tasks::task_worker_loop(
366+
job_board, stop_signal, /*abort_on_throw=*/false);
367+
});
368+
369+
// Wait for the success task to run
370+
const auto wait_step = std::chrono::milliseconds(10);
371+
const auto max_wait = std::chrono::seconds(5);
372+
auto waited = std::chrono::milliseconds(0);
373+
while (!success_task_ran.load() && waited < max_wait)
374+
{
375+
std::this_thread::sleep_for(wait_step);
376+
waited += wait_step;
377+
}
378+
379+
stop_signal.store(true);
380+
worker.join();
381+
382+
// With CCF_TASK_EXCEPTION_NO_ABORT, the worker loop continues after
383+
// exceptions, so the success task should have run
384+
REQUIRE(success_task_ran.load());
385+
386+
// Verify that fatal messages were logged for both exception types
387+
REQUIRE(logger_ptr->contains(
388+
"ThrowsException task failed with exception: Test exception"));
389+
REQUIRE(
390+
logger_ptr->contains("ThrowsUnknown task failed with unknown exception"));
391+
392+
// Verify that stack traces contain demangled function names from the
393+
// known call chains. These functions have external linkage and are
394+
// exported to the dynamic symbol table via -rdynamic in Debug builds.
395+
// Note: very small leaf functions (e.g. level_3_throws_int, which is
396+
// just `throw 42;`) may be inlined by the compiler, so we only assert
397+
// on the caller frames that reliably appear.
398+
399+
// ThrowsException call chain
400+
REQUIRE(logger_ptr->contains("level_3_throws_runtime_error"));
401+
REQUIRE(logger_ptr->contains("level_2_calls_level_3()"));
402+
REQUIRE(logger_ptr->contains("level_1_calls_level_2()"));
403+
404+
// ThrowsUnknown call chain
405+
REQUIRE(logger_ptr->contains("level_2_calls_level_3_int"));
406+
REQUIRE(logger_ptr->contains("level_1_calls_level_2_int"));
407+
408+
// Clean up: remove the capturing logger
409+
auto& loggers = ccf::logger::config::loggers();
410+
loggers.erase(
411+
std::remove_if(
412+
loggers.begin(),
413+
loggers.end(),
414+
[logger_ptr](const auto& l) { return l.get() == logger_ptr; }),
415+
loggers.end());
416+
}

src/tasks/thread_manager.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,10 @@ namespace ccf::tasks
9090
auto& stop_signal = stop_signals[i].value;
9191
stop_signal.store(false);
9292
workers[i] = std::thread(
93-
task_worker_loop, std::ref(job_board), std::ref(stop_signal));
93+
task_worker_loop,
94+
std::ref(job_board),
95+
std::ref(stop_signal),
96+
/*abort_on_throw=*/true);
9497
}
9598
}
9699

src/tasks/worker.cpp

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the Apache 2.0 License.
3+
4+
#include "tasks/worker.h"
5+
6+
#include <cstdlib>
7+
#include <cxxabi.h>
8+
#include <dlfcn.h>
9+
#include <execinfo.h>
10+
#include <memory>
11+
#include <sstream>
12+
13+
namespace ccf::tasks
14+
{
15+
// Maximum number of frames to capture at the throw-point
16+
static constexpr int throw_trace_max_frames = 128;
17+
18+
struct ThrowTrace
19+
{
20+
void* frames[throw_trace_max_frames] = {};
21+
size_t num_frames = 0;
22+
};
23+
24+
namespace
25+
{
26+
thread_local ThrowTrace current_throw_trace = {};
27+
28+
struct FreeDeleter
29+
{
30+
void operator()(char* p) const
31+
{
32+
// NOLINTNEXTLINE(cppcoreguidelines-no-malloc,cppcoreguidelines-owning-memory)
33+
free(p);
34+
}
35+
};
36+
37+
struct FreePtrArrayDeleter
38+
{
39+
void operator()(char** p) const
40+
{
41+
// NOLINTNEXTLINE(cppcoreguidelines-no-malloc,cppcoreguidelines-owning-memory,bugprone-multi-level-implicit-pointer-conversion)
42+
free(p);
43+
}
44+
};
45+
46+
std::string demangle_symbol(const char* raw)
47+
{
48+
// backtrace_symbols format: "binary(mangled+0xoffset) [0xaddr]"
49+
// Try to extract and demangle the symbol name between '(' and '+'/')'
50+
std::string entry(raw);
51+
auto open = entry.find('(');
52+
auto plus = entry.find('+', open != std::string::npos ? open : 0);
53+
auto close = entry.find(')', open != std::string::npos ? open : 0);
54+
55+
if (
56+
open != std::string::npos && close != std::string::npos &&
57+
close > open + 1)
58+
{
59+
auto end = (plus != std::string::npos && plus < close) ? plus : close;
60+
std::string mangled = entry.substr(open + 1, end - open - 1);
61+
62+
if (!mangled.empty())
63+
{
64+
int status = 0;
65+
std::unique_ptr<char, FreeDeleter> demangled(
66+
abi::__cxa_demangle(mangled.c_str(), nullptr, nullptr, &status));
67+
if (status == 0 && demangled != nullptr)
68+
{
69+
std::string rest = entry.substr(end);
70+
entry = entry.substr(0, open + 1) + demangled.get() + rest;
71+
}
72+
}
73+
}
74+
75+
return entry;
76+
}
77+
78+
// Format a demangled stack trace as a string. Note: backtrace_symbols only
79+
// resolves symbols exported to the dynamic symbol table (e.g. via
80+
// -rdynamic). Static/internal functions will appear as raw addresses. For
81+
// broader coverage, consider integrating libbacktrace (reads DWARF
82+
// directly) or invoking addr2line at runtime.
83+
std::string format_stacktrace(void** frames, int num_frames)
84+
{
85+
std::ostringstream oss;
86+
// NOLINTNEXTLINE(cppcoreguidelines-no-malloc,cppcoreguidelines-owning-memory,bugprone-multi-level-implicit-pointer-conversion)
87+
std::unique_ptr<char*, FreePtrArrayDeleter> symbols(
88+
backtrace_symbols(frames, num_frames));
89+
if (symbols == nullptr)
90+
{
91+
// If memory allocation fails, return a message indicating the issue
92+
return " (failed to allocate memory for backtrace symbols)\n";
93+
}
94+
for (int i = 0; i < num_frames; ++i)
95+
{
96+
oss << " #" << i << ": " << demangle_symbol(symbols.get()[i]) << "\n";
97+
}
98+
return oss.str();
99+
}
100+
}
101+
102+
void dump_stacktrace(const std::string& msg)
103+
{
104+
LOG_FATAL_FMT("{}", msg);
105+
106+
auto& throw_trace = current_throw_trace;
107+
if (throw_trace.num_frames > 0)
108+
{
109+
LOG_FATAL_FMT(
110+
"Stack trace:\n{}",
111+
format_stacktrace(throw_trace.frames, throw_trace.num_frames));
112+
113+
// Reset so that a subsequent dump does not re-use a stale trace
114+
// (e.g. if an earlier throw was caught internally and a later
115+
// throw; / re-throw escapes without calling __cxa_throw).
116+
throw_trace.num_frames = 0;
117+
}
118+
else
119+
{
120+
LOG_FATAL_FMT("No throw-point stack trace available");
121+
}
122+
}
123+
}
124+
125+
// Interpose __cxa_throw to capture a backtrace at each throw-point.
126+
// This is called by the C++ runtime whenever `throw` is executed.
127+
extern "C"
128+
{
129+
using CxaThrowFn = void (*)(void*, std::type_info*, void (*)(void*));
130+
131+
void __cxa_throw(
132+
void* thrown_exception, std::type_info* tinfo, void (*dest)(void*))
133+
{
134+
// Capture the backtrace at the throw site
135+
auto& trace = ccf::tasks::current_throw_trace;
136+
trace.num_frames =
137+
backtrace(trace.frames, ccf::tasks::throw_trace_max_frames);
138+
139+
// Forward to the real __cxa_throw
140+
static auto real_cxa_throw =
141+
reinterpret_cast<CxaThrowFn>(dlsym(RTLD_NEXT, "__cxa_throw"));
142+
if (real_cxa_throw != nullptr)
143+
{
144+
real_cxa_throw(thrown_exception, tinfo, dest);
145+
// real_cxa_throw is [[noreturn]], so we never reach here
146+
}
147+
else
148+
{
149+
// If dlsym failed, we cannot safely proceed. Abort to prevent undefined
150+
// behavior.
151+
std::abort();
152+
}
153+
// Both real_cxa_throw and std::abort() are [[noreturn]], but the compiler
154+
// may not recognize that for function pointers. This satisfies the compiler
155+
// that we never return from this function.
156+
__builtin_unreachable();
157+
}
158+
}

0 commit comments

Comments
 (0)