55 * This software may be modified and distributed under the terms
66 * of the BSD license. See the LICENSE file for details.
77 */
8- #include < lambda/graph_pylambda.hpp>
9- #include < lambda/pyflexible_type.hpp>
10- #include < lambda/python_api.hpp>
11- #include < lambda/python_thread_guard.hpp>
128#include < lambda/lambda_utils.hpp>
13- #include < Python.h>
14- #include < boost/python.hpp>
9+ #include < lambda/graph_pylambda.hpp>
10+ #include < lambda/pylambda.hpp>
11+ #include < python_callbacks/python_callbacks.hpp>
1512
1613namespace graphlab {
1714namespace lambda {
1815
19- namespace python = boost::python;
20-
21- typedef python::dict py_vertex_object;
22- typedef python::dict py_edge_object;
23-
2416/* *************************************************************************/
2517/* */
2618/* pysgraph_synchronize */
@@ -85,13 +77,13 @@ vertex_partition_exchange pysgraph_synchronize::get_vertex_partition_exchange(si
8577/* */
8678/* *************************************************************************/
8779graph_pylambda_evaluator::graph_pylambda_evaluator () {
88- python_thread_guard guard;
89- m_current_lambda = new python::object;
9080}
9181
9282graph_pylambda_evaluator::~graph_pylambda_evaluator () {
93- python_thread_guard guard;
94- delete m_current_lambda;
83+ if (m_lambda_id != size_t (-1 )) {
84+ std::cerr << " destructor: Clearing lambda " << m_lambda_id << std::endl;
85+ release_lambda (m_lambda_id);
86+ }
9587}
9688
9789void graph_pylambda_evaluator::init (const std::string& lambda,
@@ -103,7 +95,15 @@ void graph_pylambda_evaluator::init(const std::string& lambda,
10395 clear ();
10496
10597 // initialize members
106- make_lambda (lambda);
98+ size_t new_lambda_id = make_lambda (lambda);
99+
100+ // If it has changed, release the old one.
101+ if (m_lambda_id != size_t (-1 ) && new_lambda_id != m_lambda_id) {
102+ release_lambda (m_lambda_id);
103+ }
104+
105+ m_lambda_id = new_lambda_id;
106+
107107 m_vertex_keys = vertex_fields;
108108 m_edge_keys = edge_fields;
109109 m_srcid_column = src_column_id;
@@ -117,24 +117,21 @@ void graph_pylambda_evaluator::clear() {
117117 m_graph_sync.clear ();
118118 m_srcid_column = -1 ;
119119 m_dstid_column = -1 ;
120- *m_current_lambda = python::object ();
121120}
122121
123122std::vector<sgraph_edge_data> graph_pylambda_evaluator::eval_triple_apply (
124123 const std::vector<sgraph_edge_data>& all_edge_data,
125124 size_t src_partition, size_t dst_partition,
126125 const std::vector<size_t >& mutated_edge_field_ids) {
127126
127+ std::lock_guard<mutex> lg (m_mutex);
128+
128129 logstream (LOG_INFO ) << " graph_lambda_worker eval triple apply " << src_partition
129130 << " , " << dst_partition << std::endl;
130- python_thread_guard guard;
131+
131132 DASSERT_TRUE (is_loaded (src_partition));
132133 DASSERT_TRUE (is_loaded (dst_partition));
133134
134- py_edge_object edge_object;
135- py_vertex_object source_object;
136- py_vertex_object target_object;
137-
138135 auto & source_partition = m_graph_sync.get_partition (src_partition);
139136 auto & target_partition = m_graph_sync.get_partition (dst_partition);
140137
@@ -144,65 +141,23 @@ std::vector<sgraph_edge_data> graph_pylambda_evaluator::eval_triple_apply(
144141 }
145142
146143 std::vector<sgraph_edge_data> ret (all_edge_data.size ());
147- try {
148- size_t cnt = 0 ;
149- for (const auto & edata: all_edge_data) {
150- PyDict_UpdateFromFlex (edge_object, m_edge_keys, edata);
151- size_t srcid = edata[m_srcid_column];
152- size_t dstid = edata[m_dstid_column];
153-
154- auto & source_vertex = source_partition[srcid];
155- auto & target_vertex = target_partition[dstid];
156-
157- PyDict_UpdateFromFlex (source_object, m_vertex_keys, source_vertex);
158- PyDict_UpdateFromFlex (target_object, m_vertex_keys, target_vertex);
159-
160- python::object lambda_ret = (*m_current_lambda)(source_object, edge_object, target_object);
161- if (lambda_ret.is_none () || !PyTuple_Check (lambda_ret.ptr ()) || python::len (lambda_ret) != 3 ) {
162- throw (std::string (" Lambda must return a tuple of the form (source_data, edge_data, target_data)." ));
163- }
164-
165- for (size_t i = 0 ; i < m_vertex_keys.size (); ++i) {
166- source_vertex[i] = PyObject_AsFlex (lambda_ret[0 ][m_vertex_keys[i]]);
167- target_vertex[i] = PyObject_AsFlex (lambda_ret[2 ][m_vertex_keys[i]]);
168- }
169-
170- if (!mutated_edge_field_ids.empty ()) {
171- edge_object.update (lambda_ret[1 ]);
172- for (auto & key: mutated_edge_keys)
173- ret[cnt].push_back (PyObject_AsFlex (edge_object[key]));
174- }
175- ++cnt;
176- }
177- } catch (python::error_already_set const & e) {
178- std::string error_string = parse_python_error ();
179- throw (error_string);
180- } catch (std::exception& e) {
181- throw (std::string (e.what ()));
182- } catch (const char * e) {
183- throw (e);
184- } catch (std::string& e) {
185- throw (e);
186- } catch (...) {
187- throw (" Unknown exception from python lambda evaluation." );
188- }
189- return ret;
190- }
191144
192- void graph_pylambda_evaluator::make_lambda (const std::string& pylambda_str) {
193- python_thread_guard guard;
194- try {
195- python::object pickle = python::import (" pickle" );
196- PyObject* lambda_bytes = PyByteArray_FromStringAndSize (pylambda_str.c_str (), pylambda_str.size ());
197- *m_current_lambda = python::object ((pickle.attr (" loads" )(python::object (python::handle<>(lambda_bytes)))));
198- } catch (python::error_already_set const & e) {
199- std::string error_string = parse_python_error ();
200- throw (error_string);
201- } catch (std::exception& e) {
202- throw (std::string (e.what ()));
203- } catch (...) {
204- throw (" Unknown exception from python lambda evaluation." );
205- }
145+ lambda_graph_triple_apply_data lgt;
146+
147+ lgt.all_edge_data = &all_edge_data;
148+ lgt.out_edge_data = &ret;
149+ lgt.source_partition = &source_partition;
150+ lgt.target_partition = &target_partition;
151+ lgt.vertex_keys = &m_vertex_keys;
152+ lgt.edge_keys = &m_edge_keys;
153+ lgt.mutated_edge_keys = &mutated_edge_keys;
154+ lgt.srcid_column = m_srcid_column;
155+ lgt.dstid_column = m_dstid_column;
156+
157+ evaluation_functions.eval_graph_triple_apply (m_lambda_id, &lgt);
158+ python::check_for_python_exception ();
159+
160+ return ret;
206161}
207162
208163}
0 commit comments