Skip to content

Commit eda378f

Browse files
author
Yucheng Low
committed
Spin off another thread to handle callbacks. Otherwise we might double lock the GIL.
1 parent cfaf26e commit eda378f

3 files changed

Lines changed: 35 additions & 5 deletions

File tree

oss_src/unity/server/unity_server.cpp

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,19 @@ void unity_server::start(const unity_server_initializer& server_initializer) {
7979
server->start();
8080
logstream(LOG_EMPH) << "Unity server listening on: " << options.server_address << std::endl;
8181
logstream(LOG_EMPH) << "Total System Memory Detected: " << total_mem() << std::endl;
82+
log_thread.launch([=]() {
83+
do {
84+
std::pair<std::string, bool> queueelem = this->log_queue.dequeue();
85+
if (queueelem.second == false) {
86+
break;
87+
} else {
88+
// we need to read it before trying to do the callback
89+
// Otherwise we might accidentally call a null pointer
90+
volatile progress_callback_type cback = this->log_progress_callback;
91+
if (cback != nullptr) cback(queueelem.first);
92+
}
93+
} while(1);
94+
});
8295
}
8396

8497
/**
@@ -88,6 +101,7 @@ void unity_server::stop() {
88101
delete server;
89102
server = nullptr;
90103
set_log_progress(false);
104+
log_queue.stop_blocking();
91105
graphlab::global_teardown::get_instance().perform_teardown();
92106
}
93107

@@ -144,12 +158,14 @@ void unity_server::set_log_progress(bool enable) {
144158

145159
void unity_server::set_log_progress_callback(void (*callback)(std::string)) {
146160
if (callback == nullptr) {
161+
log_progress_callback = nullptr;
147162
global_logger().add_observer(LOG_PROGRESS, NULL);
148163
} else {
164+
log_progress_callback = callback;
149165
global_logger().add_observer(
150166
LOG_PROGRESS,
151167
[=](int lineloglevel, const char* buf, size_t len){
152-
callback(std::string(buf, len));
168+
this->log_queue.enqueue(std::string(buf, len));
153169
});
154170
}
155171
}

oss_src/unity/server/unity_server.hpp

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
#ifndef SFRAME_UNITY_SERVER_HPP
99
#define SFRAME_UNITY_SERVER_HPP
1010

11+
#include <parallel/pthread_tools.hpp>
12+
#include <util/blocking_queue.hpp>
1113
#include "unity_server_options.hpp"
1214
#include "unity_server_init.hpp"
1315

@@ -22,6 +24,8 @@ class toolkit_class_registry;
2224

2325
class unity_server {
2426
public:
27+
typedef void(*progress_callback_type)(std::string);
28+
2529
/**
2630
* Constructor
2731
*/
@@ -40,9 +44,9 @@ class unity_server {
4044
/**
4145
* Enable or disable log progress stream.
4246
*/
43-
static void set_log_progress(bool enable);
47+
void set_log_progress(bool enable);
4448

45-
static void set_log_progress_callback(void (*callback)(std::string));
49+
void set_log_progress_callback(progress_callback_type callback);
4650

4751
inline cppipc::comm_server* get_comm_server() { return server; }
4852

@@ -61,6 +65,12 @@ class unity_server {
6165
cppipc::comm_server* server;
6266
toolkit_function_registry* toolkit_functions;
6367
toolkit_class_registry* toolkit_classes;
68+
69+
private:
70+
volatile progress_callback_type log_progress_callback = nullptr;
71+
graphlab::thread log_thread;
72+
blocking_queue<std::string> log_queue;
73+
6474
}; // end of class usenity_server
6575
} // end of namespace graphlab
6676

oss_src/unity/server/unity_server_capi.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,11 +99,15 @@ EXPORT void stop_server() {
9999
* Enable or disable log progress stream.
100100
*/
101101
EXPORT void set_log_progress(bool enable) {
102-
graphlab::unity_server::set_log_progress(enable);
102+
if (graphlab::SERVER) {
103+
graphlab::SERVER->set_log_progress(enable);
104+
}
103105
}
104106

105107
EXPORT void set_log_progress_callback(void* callback) {
106-
graphlab::unity_server::set_log_progress_callback(reinterpret_cast<void(*)(std::string)>(callback));
108+
if (graphlab::SERVER) {
109+
graphlab::SERVER->set_log_progress_callback(reinterpret_cast<void(*)(std::string)>(callback));
110+
}
107111
}
108112
}
109113
// end of extern "C"

0 commit comments

Comments
 (0)