Skip to content

Commit 4896f8b

Browse files
author
Jay Gu
committed
Merge pull request #165 from ylow/master
Restored incremental printout in IPython notebook
2 parents 4450bd4 + eda378f commit 4896f8b

7 files changed

Lines changed: 72 additions & 5 deletions

File tree

oss_src/unity/python/sframe/connect/server.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
from ..util.config import DEFAULT_CONFIG as default_local_conf
1515
from ..connect import _get_metric_tracker
16+
from ..cython.cy_ipc import get_print_status_function_pointer
1617
from .. import sys_util as _sys_util
1718
import logging
1819
import os
@@ -123,7 +124,12 @@ def get_logger(self):
123124
return self.logger
124125

125126
def set_log_progress(self, enable):
126-
self.dll.set_log_progress(enable)
127+
if enable:
128+
ptr = get_print_status_function_pointer()
129+
ptr = cast(ptr, POINTER(c_void_p))
130+
self.dll.set_log_progress_callback(ptr)
131+
else:
132+
self.dll.set_log_progress(enable)
127133

128134
def _load_dll_ok(self, root_path):
129135
server_env = _sys_util.make_unity_server_env()
@@ -145,6 +151,7 @@ def _load_dll_ok(self, root_path):
145151
self.dll.start_server.argtypes = [c_char_p, c_char_p, c_char_p, c_ulonglong, c_ulonglong]
146152
self.dll.get_client.restype = c_void_p
147153
self.dll.set_log_progress.argtypes = [c_bool]
154+
self.dll.set_log_progress_callback.argtypes = [c_void_p]
148155
self.dll.stop_server
149156
except Exception as e:
150157
return (False, str(e))

oss_src/unity/python/sframe/cython/cy_ipc.pxd

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,7 @@ cdef class PyCommClient:
2525
cdef comm_client *thisptr
2626
cpdef stop(self)
2727
cpdef start(self)
28+
29+
cdef void print_status(string status_string) nogil
30+
31+
cpdef get_print_status_function_pointer()

oss_src/unity/python/sframe/cython/cy_ipc.pyx

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ from libcpp.string cimport string
1010
from libc.stdio cimport printf
1111
from .python_printer_callback import print_callback
1212

13-
1413
cdef class PyCommClient:
1514

1615
def __cinit__(self):
@@ -33,3 +32,13 @@ def make_comm_client_from_existing_ptr(size_t client_ptr):
3332
ret = PyCommClient()
3433
ret.thisptr = <comm_client*>(client_ptr)
3534
return ret
35+
36+
37+
cdef void print_status(string status_string) nogil:
38+
with gil:
39+
status_string = status_string.rstrip()
40+
print_callback(status_string)
41+
42+
ctypedef void* void_p
43+
cpdef get_print_status_function_pointer():
44+
return <size_t>(<void_p>(print_status))

oss_src/unity/python/sframe/cython/python_printer_callback.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ def print_callback(val):
2727
# I have to intrude rather deep into IPython to make it behave
2828
if have_ipython:
2929
if InteractiveShell.initialized():
30-
IPython.display.publish_display_data('graphlab.callback', {'text/plain':val,'text/html':'<pre>' + val + '</pre>'})
30+
IPython.display.publish_display_data({'text/plain':val,'text/html':'<pre>' + val + '</pre>'})
3131
success = True
3232
except:
3333
pass

oss_src/unity/server/unity_server.cpp

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,19 @@ void unity_server::start(const unity_server_initializer& server_initializer) {
7979
server->start();
8080
logstream(LOG_EMPH) << "Unity server listening on: " << options.server_address << std::endl;
8181
logstream(LOG_EMPH) << "Total System Memory Detected: " << total_mem() << std::endl;
82+
log_thread.launch([=]() {
83+
do {
84+
std::pair<std::string, bool> queueelem = this->log_queue.dequeue();
85+
if (queueelem.second == false) {
86+
break;
87+
} else {
88+
// we need to read it before trying to do the callback
89+
// Otherwise we might accidentally call a null pointer
90+
volatile progress_callback_type cback = this->log_progress_callback;
91+
if (cback != nullptr) cback(queueelem.first);
92+
}
93+
} while(1);
94+
});
8295
}
8396

8497
/**
@@ -88,6 +101,7 @@ void unity_server::stop() {
88101
delete server;
89102
server = nullptr;
90103
set_log_progress(false);
104+
log_queue.stop_blocking();
91105
graphlab::global_teardown::get_instance().perform_teardown();
92106
}
93107

@@ -142,5 +156,18 @@ void unity_server::set_log_progress(bool enable) {
142156
}
143157
}
144158

159+
void unity_server::set_log_progress_callback(void (*callback)(std::string)) {
160+
if (callback == nullptr) {
161+
log_progress_callback = nullptr;
162+
global_logger().add_observer(LOG_PROGRESS, NULL);
163+
} else {
164+
log_progress_callback = callback;
165+
global_logger().add_observer(
166+
LOG_PROGRESS,
167+
[=](int lineloglevel, const char* buf, size_t len){
168+
this->log_queue.enqueue(std::string(buf, len));
169+
});
170+
}
171+
}
145172

146173
} // end of graphlab

oss_src/unity/server/unity_server.hpp

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
#ifndef SFRAME_UNITY_SERVER_HPP
99
#define SFRAME_UNITY_SERVER_HPP
1010

11+
#include <parallel/pthread_tools.hpp>
12+
#include <util/blocking_queue.hpp>
1113
#include "unity_server_options.hpp"
1214
#include "unity_server_init.hpp"
1315

@@ -22,6 +24,8 @@ class toolkit_class_registry;
2224

2325
class unity_server {
2426
public:
27+
typedef void(*progress_callback_type)(std::string);
28+
2529
/**
2630
* Constructor
2731
*/
@@ -40,7 +44,9 @@ class unity_server {
4044
/**
4145
* Enable or disable log progress stream.
4246
*/
43-
static void set_log_progress(bool enable);
47+
void set_log_progress(bool enable);
48+
49+
void set_log_progress_callback(progress_callback_type callback);
4450

4551
inline cppipc::comm_server* get_comm_server() { return server; }
4652

@@ -59,6 +65,12 @@ class unity_server {
5965
cppipc::comm_server* server;
6066
toolkit_function_registry* toolkit_functions;
6167
toolkit_class_registry* toolkit_classes;
68+
69+
private:
70+
volatile progress_callback_type log_progress_callback = nullptr;
71+
graphlab::thread log_thread;
72+
blocking_queue<std::string> log_queue;
73+
6274
}; // end of class usenity_server
6375
} // end of namespace graphlab
6476

oss_src/unity/server/unity_server_capi.cpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,15 @@ EXPORT void stop_server() {
9999
* Enable or disable log progress stream.
100100
*/
101101
EXPORT void set_log_progress(bool enable) {
102-
graphlab::unity_server::set_log_progress(enable);
102+
if (graphlab::SERVER) {
103+
graphlab::SERVER->set_log_progress(enable);
104+
}
105+
}
106+
107+
EXPORT void set_log_progress_callback(void* callback) {
108+
if (graphlab::SERVER) {
109+
graphlab::SERVER->set_log_progress_callback(reinterpret_cast<void(*)(std::string)>(callback));
110+
}
103111
}
104112
}
105113
// end of extern "C"

0 commit comments

Comments
 (0)