diff --git a/dcp/api/compute_for.py b/dcp/api/compute_for.py index aacc2c1..ae650b4 100644 --- a/dcp/api/compute_for.py +++ b/dcp/api/compute_for.py @@ -43,7 +43,7 @@ def compute_for(*args, **kwargs): # 3. For each input element, dereference js_ref if from dcp-client, add a guard if pythonmonkey will mutate it, else as it as-is. if job_input_idx != None: if hasattr(args[job_input_idx], 'js_ref') and dry.class_manager.reg.find_from_js_instance(args[job_input_idx].js_ref): - args[job_input_idx] = args[job_input_idx] + args[job_input_idx] = args[job_input_idx].js_ref else: try: tmp = args[job_input_idx][0] @@ -65,7 +65,7 @@ def compute_for(*args, **kwargs): if job_args_idx != None: if hasattr(args[job_args_idx], 'js_ref') and dry.class_manager.reg.find_from_js_instance(args[job_args_idx].js_ref): - args[job_args_idx] = args[job_args_idx] + args[job_args_idx] = args[job_args_idx].js_ref else: try: tmp = args[job_args_idx][0] diff --git a/dcp/api/job.py b/dcp/api/job.py index 43fad66..c942127 100644 --- a/dcp/api/job.py +++ b/dcp/api/job.py @@ -78,8 +78,8 @@ def _before_exec(self, *args, **kwargs): serialized_input_data = [] if len(self.serializers): validate_serializers(self.serializers) - if hasattr(self.js_ref.jobInputData, 'js_ref') and dry.class_manager.reg.find_from_js_instance(self.js_ref.jobInputData.js_ref): - serialized_input_data = self.js_ref.jobInputData.js_ref + if hasattr(self.jobInputData, 'js_ref') and dry.class_manager.reg.find_from_js_instance(self.jobInputData.js_ref): + serialized_input_data = self.jobInputData.js_ref elif isinstance(self.js_ref.jobInputData, list) or utils.instanceof(self.js_ref.jobInputData, pm.globalThis.Array): for input_slice in self.js_ref.jobInputData: # TODO - find better solution @@ -95,11 +95,8 @@ def _before_exec(self, *args, **kwargs): serialized_input_data.append(serialized_slice) else: serialized_input_data = self.js_ref.jobInputData - if hasattr(self.js_ref.jobArguments, 'js_ref') and dry.class_manager.reg.find_from_js_instance(self.js_ref.jobArguments.js_ref): - serialized_arguments = self.js_ref.jobArguments.js_ref - # if utils.instanceof(self.js_ref.jobArguments, pm.eval("globalThis.dcp.compute.RemoteDataSet")): - # convertToURL = pm.eval('(urlString) => new URL(urlString)') - # self.js_ref.jobArguments.forEach(lambda argument: serialized_arguments.append(convertToURL(argument))) + if hasattr(self.jobArguments, 'js_ref') and dry.class_manager.reg.find_from_js_instance(self.jobArguments.js_ref): + serialized_arguments = [self.jobArguments.js_ref] else: for argument in self.js_ref.jobArguments: # TODO - find better solution diff --git a/examples/remote-data-job-deploy.py b/examples/remote-data-job-deploy.py new file mode 100644 index 0000000..d3baf9d --- /dev/null +++ b/examples/remote-data-job-deploy.py @@ -0,0 +1,54 @@ +from http.server import BaseHTTPRequestHandler, HTTPServer +import threading +import json + +import dcp; dcp.init() + +class SimpleHandler(BaseHTTPRequestHandler): + def do_GET(self): + path = int(self.path.strip('/')) + self.send_response(200) + self.send_header('Access-Control-Allow-Origin', '*') + self.send_header('Content-Type', 'application/json') + self.end_headers() + self.wfile.write(json.dumps(path).encode()) + +#start http server on seperate thread +server_address = ("localhost", 12345) +httpd = HTTPServer(server_address, SimpleHandler) +print("Server running at http://localhost:12345") +server_thread = threading.Thread(target=httpd.serve_forever) +server_thread.daemon = True +server_thread.start() + +def workfn(x,y): + import dcp + dcp.progress() + return x * y + + +# 'http://localhost:12345' must be added to a Worker's allowed origins for slices to be completed +# run worker.originManager.add('http://localhost:12345', null, null) in the console, or edit the worker config +# On the public group the job will encounter many errors since workers by default can't access that URL +my_rdp = dcp.compute.RemoteDataPattern('http://localhost:12345/{slice}',5) +my_rds = dcp.compute.RemoteDataSet('http://localhost:12345/2') + +my_j = dcp.compute_for(my_rdp, workfn, my_rds) + + +# add event listeners +my_j.on('readystatechange', print) +my_j.on('result', print) +my_j.on('error', print) + +@my_j.on('accepted') +def accepted_handler(ev): + print(f"jobid = {my_j.id}") + +my_j.public.name = 'simple bifrost2 remote data pattern example' + +my_j.exec() +res = my_j.wait() + +print(">>>>>>>>>>>>>>>>>>>>>>>>>> RESULTS ARE IN") +print(res)