Skip to content

Commit f776f1f

Browse files
authored
Merge pull request #43 from Distributive-Network/bugfix/dont-serialize-dcp-objs
Don't serialize dcp objects
2 parents 257d35f + 9585474 commit f776f1f

3 files changed

Lines changed: 60 additions & 9 deletions

File tree

dcp/api/compute_for.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ def compute_for(*args, **kwargs):
4343
# 3. For each input element, dereference js_ref if from dcp-client, add a guard if pythonmonkey will mutate it, else as it as-is.
4444
if job_input_idx != None:
4545
if hasattr(args[job_input_idx], 'js_ref') and dry.class_manager.reg.find_from_js_instance(args[job_input_idx].js_ref):
46-
args[job_input_idx] = args[job_input_idx]
46+
args[job_input_idx] = args[job_input_idx].js_ref
4747
else:
4848
try:
4949
tmp = args[job_input_idx][0]
@@ -65,7 +65,7 @@ def compute_for(*args, **kwargs):
6565

6666
if job_args_idx != None:
6767
if hasattr(args[job_args_idx], 'js_ref') and dry.class_manager.reg.find_from_js_instance(args[job_args_idx].js_ref):
68-
args[job_args_idx] = args[job_args_idx]
68+
args[job_args_idx] = args[job_args_idx].js_ref
6969
else:
7070
try:
7171
tmp = args[job_args_idx][0]

dcp/api/job.py

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,8 @@ def _before_exec(self, *args, **kwargs):
7878
serialized_input_data = []
7979
if len(self.serializers):
8080
validate_serializers(self.serializers)
81-
if hasattr(self.js_ref.jobInputData, 'js_ref') and dry.class_manager.reg.find_from_js_instance(self.js_ref.jobInputData.js_ref):
82-
serialized_input_data = self.js_ref.jobInputData.js_ref
81+
if hasattr(self.jobInputData, 'js_ref') and dry.class_manager.reg.find_from_js_instance(self.jobInputData.js_ref):
82+
serialized_input_data = self.jobInputData.js_ref
8383
elif isinstance(self.js_ref.jobInputData, list) or utils.instanceof(self.js_ref.jobInputData, pm.globalThis.Array):
8484
for input_slice in self.js_ref.jobInputData:
8585
# TODO - find better solution
@@ -95,11 +95,8 @@ def _before_exec(self, *args, **kwargs):
9595
serialized_input_data.append(serialized_slice)
9696
else:
9797
serialized_input_data = self.js_ref.jobInputData
98-
if hasattr(self.js_ref.jobArguments, 'js_ref') and dry.class_manager.reg.find_from_js_instance(self.js_ref.jobArguments.js_ref):
99-
serialized_arguments = self.js_ref.jobArguments.js_ref
100-
# if utils.instanceof(self.js_ref.jobArguments, pm.eval("globalThis.dcp.compute.RemoteDataSet")):
101-
# convertToURL = pm.eval('(urlString) => new URL(urlString)')
102-
# self.js_ref.jobArguments.forEach(lambda argument: serialized_arguments.append(convertToURL(argument)))
98+
if hasattr(self.jobArguments, 'js_ref') and dry.class_manager.reg.find_from_js_instance(self.jobArguments.js_ref):
99+
serialized_arguments = [self.jobArguments.js_ref]
103100
else:
104101
for argument in self.js_ref.jobArguments:
105102
# TODO - find better solution

examples/remote-data-job-deploy.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
from http.server import BaseHTTPRequestHandler, HTTPServer
2+
import threading
3+
import json
4+
5+
import dcp; dcp.init()
6+
7+
class SimpleHandler(BaseHTTPRequestHandler):
8+
def do_GET(self):
9+
path = int(self.path.strip('/'))
10+
self.send_response(200)
11+
self.send_header('Access-Control-Allow-Origin', '*')
12+
self.send_header('Content-Type', 'application/json')
13+
self.end_headers()
14+
self.wfile.write(json.dumps(path).encode())
15+
16+
#start http server on seperate thread
17+
server_address = ("localhost", 12345)
18+
httpd = HTTPServer(server_address, SimpleHandler)
19+
print("Server running at http://localhost:12345")
20+
server_thread = threading.Thread(target=httpd.serve_forever)
21+
server_thread.daemon = True
22+
server_thread.start()
23+
24+
def workfn(x,y):
25+
import dcp
26+
dcp.progress()
27+
return x * y
28+
29+
30+
# 'http://localhost:12345' must be added to a Worker's allowed origins for slices to be completed
31+
# run worker.originManager.add('http://localhost:12345', null, null) in the console, or edit the worker config
32+
# On the public group the job will encounter many errors since workers by default can't access that URL
33+
my_rdp = dcp.compute.RemoteDataPattern('http://localhost:12345/{slice}',5)
34+
my_rds = dcp.compute.RemoteDataSet('http://localhost:12345/2')
35+
36+
my_j = dcp.compute_for(my_rdp, workfn, my_rds)
37+
38+
39+
# add event listeners
40+
my_j.on('readystatechange', print)
41+
my_j.on('result', print)
42+
my_j.on('error', print)
43+
44+
@my_j.on('accepted')
45+
def accepted_handler(ev):
46+
print(f"jobid = {my_j.id}")
47+
48+
my_j.public.name = 'simple bifrost2 remote data pattern example'
49+
50+
my_j.exec()
51+
res = my_j.wait()
52+
53+
print(">>>>>>>>>>>>>>>>>>>>>>>>>> RESULTS ARE IN")
54+
print(res)

0 commit comments

Comments
 (0)