Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions dcp/api/compute_for.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def compute_for(*args, **kwargs):
# 3. For each input element, dereference js_ref if from dcp-client, add a guard if pythonmonkey will mutate it, else use it as-is.
if job_input_idx != None:
if hasattr(args[job_input_idx], 'js_ref') and dry.class_manager.reg.find_from_js_instance(args[job_input_idx].js_ref):
args[job_input_idx] = args[job_input_idx]
args[job_input_idx] = args[job_input_idx].js_ref
else:
try:
tmp = args[job_input_idx][0]
Expand All @@ -65,7 +65,7 @@ def compute_for(*args, **kwargs):

if job_args_idx != None:
if hasattr(args[job_args_idx], 'js_ref') and dry.class_manager.reg.find_from_js_instance(args[job_args_idx].js_ref):
args[job_args_idx] = args[job_args_idx]
args[job_args_idx] = args[job_args_idx].js_ref
else:
try:
tmp = args[job_args_idx][0]
Expand Down
11 changes: 4 additions & 7 deletions dcp/api/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ def _before_exec(self, *args, **kwargs):
serialized_input_data = []
if len(self.serializers):
validate_serializers(self.serializers)
if hasattr(self.js_ref.jobInputData, 'js_ref') and dry.class_manager.reg.find_from_js_instance(self.js_ref.jobInputData.js_ref):
serialized_input_data = self.js_ref.jobInputData.js_ref
if hasattr(self.jobInputData, 'js_ref') and dry.class_manager.reg.find_from_js_instance(self.jobInputData.js_ref):
serialized_input_data = self.jobInputData.js_ref
elif isinstance(self.js_ref.jobInputData, list) or utils.instanceof(self.js_ref.jobInputData, pm.globalThis.Array):
for input_slice in self.js_ref.jobInputData:
# TODO - find better solution
Expand All @@ -95,11 +95,8 @@ def _before_exec(self, *args, **kwargs):
serialized_input_data.append(serialized_slice)
else:
serialized_input_data = self.js_ref.jobInputData
if hasattr(self.js_ref.jobArguments, 'js_ref') and dry.class_manager.reg.find_from_js_instance(self.js_ref.jobArguments.js_ref):
serialized_arguments = self.js_ref.jobArguments.js_ref
# if utils.instanceof(self.js_ref.jobArguments, pm.eval("globalThis.dcp.compute.RemoteDataSet")):
# convertToURL = pm.eval('(urlString) => new URL(urlString)')
# self.js_ref.jobArguments.forEach(lambda argument: serialized_arguments.append(convertToURL(argument)))
Comment thread
wiwichips marked this conversation as resolved.
if hasattr(self.jobArguments, 'js_ref') and dry.class_manager.reg.find_from_js_instance(self.jobArguments.js_ref):
serialized_arguments = [self.jobArguments.js_ref]
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks incorrect to me, doesn't this make serialized_arguments an array of array-likes when it should just be an array-like?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is necessary because line 143 in this file expects serialized_arguments to be concatenated with other PYTHON lists

self.js_ref.jobArguments = [offset_to_argument_vector] + ["gzImage", job_fs] + env_args + serialized_arguments + [meta_arguments]

This solution did seem a little fishy to me in some way, but I was getting the correct behaviour with single objects (such as RemoteDataSet) properly converting to single and/or multiple job args

There might be other cleaner more intuitive solutions than just slapping it in an array as well, my justification for this though was that the other branch of logic for job args always leaves serialized_arguments as an array

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why this would work, for ex. [range(1,10)] is a list containing a range, it doesn't flatten the iterable

Also we shouldn't be using + to concatenate iterables since it only works for concatenating lists, but that's maybe out of scope of this PR and we have to deal with the different iterator interfaces between js and python 🥴

Maybe we should instead use itertools.chain, and write a small iterator wrapper which converts a JS iterator into a Python iterable if it's a js_ref and it has a Symbol.iterator attribute

else:
for argument in self.js_ref.jobArguments:
# TODO - find better solution
Expand Down
54 changes: 54 additions & 0 deletions examples/remote-data-job-deploy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from http.server import BaseHTTPRequestHandler, HTTPServer
import threading
import json

import dcp; dcp.init()

class SimpleHandler(BaseHTTPRequestHandler):
    """Tiny JSON endpoint that echoes the integer named by the request path.

    ``GET /<n>`` answers ``200`` with ``<n>`` serialized as JSON. Any path
    that is not a plain integer (``/``, ``/favicon.ico``, a query string, ...)
    answers ``400`` with a JSON error object instead of raising inside the
    handler and leaving the client without a response.
    """

    def _send_json(self, status, payload):
        """Write *payload* as a JSON response with the given HTTP *status*."""
        self.send_response(status)
        # Requests may come from another origin, so allow every origin to
        # read the response (including error responses).
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Content-Type', 'application/json')
        self.end_headers()
        self.wfile.write(json.dumps(payload).encode())

    def do_GET(self):
        """Reply with the integer in the URL path, or 400 if it is not one."""
        try:
            path = int(self.path.strip('/'))
        except ValueError:
            self._send_json(400, {'error': 'path must be an integer, e.g. /3'})
            return
        self._send_json(200, path)

# Start the HTTP server on a separate thread. The thread is a daemon, so it
# does not keep the process alive once the main script has finished.
server_address = ("localhost", 12345)
httpd = HTTPServer(server_address, SimpleHandler)
print("Server running at http://localhost:12345")
server_thread = threading.Thread(target=httpd.serve_forever, daemon=True)
server_thread.start()

def workfn(x,y):
    """Work function for the job: report progress, then return ``x * y``."""
    # dcp is imported inside the body rather than taken from module scope;
    # presumably because the function is executed on a worker -- TODO confirm.
    import dcp
    dcp.progress()
    product = x * y
    return product


# NOTE: 'http://localhost:12345' must be added to a Worker's allowed origins for slices to be completed.
# Either run worker.originManager.add('http://localhost:12345', null, null) in the worker's console,
# or edit the worker config.
# On the public group the job will encounter many errors, since workers by default can't access that URL.
# '{slice}' is a placeholder in the URL pattern; the second argument (5) is presumably the
# number of slices to generate -- TODO confirm against the RemoteDataPattern documentation.
my_rdp = dcp.compute.RemoteDataPattern('http://localhost:12345/{slice}',5)
Comment thread
JosephAcernese marked this conversation as resolved.
# Remote data set pointing at the local server's /2 path, which SimpleHandler answers with the JSON value 2.
my_rds = dcp.compute.RemoteDataSet('http://localhost:12345/2')

# Create the job from the remote input pattern, the work function and the remote data set.
# Presumably my_rdp supplies workfn's first parameter and my_rds the remaining argument(s) -- TODO confirm.
my_j = dcp.compute_for(my_rdp, workfn, my_rds)


# Add event listeners: print whatever each of these events delivers.
my_j.on('readystatechange', print)
my_j.on('result', print)
my_j.on('error', print)

# Registered through my_j.on('accepted'), here used as a decorator.
@my_j.on('accepted')
def accepted_handler(ev):
    """Print the job's id; the event payload *ev* itself is not used."""
    print(f"jobid = {my_j.id}")

# Set the job's public name.
my_j.public.name = 'simple bifrost2 remote data pattern example'

# Run the job, then collect its results; wait() presumably blocks until the job completes -- TODO confirm.
my_j.exec()
res = my_j.wait()

# Print a banner followed by whatever wait() returned.
print(">>>>>>>>>>>>>>>>>>>>>>>>>> RESULTS ARE IN")
print(res)
Loading