energies = [10, 20]
system = delayed(make_non_serializable_object)(x=1)
trans = [delayed(some_function_that_uses_system)(system, energy) for energy in energies]
result = delayed(list)(trans)
result.visualize()

When I call result.compute() the calculation never finishes.

Calling result.compute(get=dask.async.get_sync) and result.compute(get=dask.threaded.get) both work. However, result.compute(get=dask.multiprocessing.get) does not, and generates the following error:

---------------------------------------------------------------------------
RemoteError                               Traceback (most recent call last)
<ipython-input-70-b5c8f2a1c6f6> in <module>()
----> 1 result.compute(get=dask.multiprocessing.get)
/home/bnijholt/anaconda3/lib/python3.5/site-packages/dask/base.py in compute(self, **kwargs)
     76             Extra keywords to forward to the scheduler ``get`` function.
     77         """
---> 78         return compute(self, **kwargs)[0]
     80     @classmethod
/home/bnijholt/anaconda3/lib/python3.5/site-packages/dask/base.py in compute(*args, **kwargs)
    169         dsk = merge(var.dask for var in variables)
    170     keys = [var._keys() for var in variables]
--> 171     results = get(dsk, keys, **kwargs)
    173     results_iter = iter(results)
/home/bnijholt/anaconda3/lib/python3.5/site-packages/dask/multiprocessing.py in get(dsk, keys, num_workers, func_loads, func_dumps, optimize_graph, **kwargs)
     81         # Run
     82         result = get_async(apply_async, len(pool._pool), dsk3, keys,
---> 83                            queue=queue, get_id=_process_get_id, **kwargs)
     84     finally:
     85         if cleanup:
/home/bnijholt/anaconda3/lib/python3.5/site-packages/dask/async.py in get_async(apply_async, num_workers, dsk, result, cache, queue, get_id, raise_on_exception, rerun_exceptions_locally, callbacks, **kwargs)
    479                     _execute_task(task, data)  # Re-execute locally
    480                 else:
--> 481                     raise(remote_exception(res, tb))
    482             state['cache'][key] = res
    483             finish_task(dsk, key, state, results, keyorder.get)
RemoteError: 
---------------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/bnijholt/anaconda3/lib/python3.5/multiprocessing/managers.py", line 228, in serve_client
    request = recv()
  File "/home/bnijholt/anaconda3/lib/python3.5/multiprocessing/connection.py", line 251, in recv
    return ForkingPickler.loads(buf.getbuffer())
  File "kwant/graph/core.pyx", line 664, in kwant.graph.core.CGraph_malloc.__cinit__ (kwant/graph/core.c:8330)
TypeError: __cinit__() takes exactly 6 positional arguments (0 given)
---------------------------------------------------------------------------
Traceback
---------
  File "/home/bnijholt/anaconda3/lib/python3.5/site-packages/dask/async.py", line 273, in execute_task
    queue.put(result)
  File "<string>", line 2, in put
  File "/home/bnijholt/anaconda3/lib/python3.5/multiprocessing/managers.py", line 732, in _callmethod
    raise convert_to_error(kind, result)

With ipyparallel I would execute make_non_serializable_object on each engine, which solves the problem for that case.
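
For reference, this is roughly what that looks like (a minimal sketch, assuming a running ipyparallel cluster whose engines can import the constructor):

    import ipyparallel as ipp

    rc = ipp.Client()   # connect to a running ipyparallel cluster
    dview = rc[:]       # direct view on all engines

    # Build the object once per engine, so it never has to be pickled.
    dview.execute('system = make_non_serializable_object(x=1)')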

I would like to use dask for my parallel computations. How can I solve this?

Ensure that your data can be serialized

This part of your traceback shows that the objects from your kwant library are not serializing themselves well:

Traceback (most recent call last):
  File "/home/bnijholt/anaconda3/lib/python3.5/multiprocessing/managers.py", line 228, in serve_client
    request = recv()
  File "/home/bnijholt/anaconda3/lib/python3.5/multiprocessing/connection.py", line 251, in recv
    return ForkingPickler.loads(buf.getbuffer())
  File "kwant/graph/core.pyx", line 664, in kwant.graph.core.CGraph_malloc.__cinit__ (kwant/graph/core.c:8330)
TypeError: __cinit__() takes exactly 6 positional arguments (0 given)

This is why the multiprocessing and distributed schedulers are failing. Dask requires the ability to serialize data in order to move it around between different processes.
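
A quick way to confirm this (a minimal sketch; make_non_serializable_object is the constructor from your code) is to attempt a pickle round-trip by hand:

    import pickle

    # If this round-trip fails, the multiprocessing scheduler cannot move
    # the object between processes either; this should reproduce the
    # TypeError from the traceback above.
    system = make_non_serializable_object(x=1)
    pickle.loads(pickle.dumps(system))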

The simplest and cleanest way to solve this problem is to improve the serialization of your data. Ideally you can do this by improving kwant. You could also manage this through dask's custom serialization, but that's possibly more work at the moment.
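
For illustration, improving serialization usually means teaching a class to rebuild its heavy state from a small recipe instead of pickling raw C-level data. The following is a hedged sketch with hypothetical names, not kwant's actual API:

    class SerializableSystem:
        """Pickles only the inputs needed to rebuild the heavy object."""

        def __init__(self, x):
            self.x = x
            self._core = make_non_serializable_object(x=x)  # heavy C object

        def __getstate__(self):
            return {'x': self.x}            # ship only the recipe

        def __setstate__(self, state):
            self.__init__(state['x'])       # rebuild on the receiving process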

Keep data in one location

OK, so let's assume that you can't improve serialization and need to keep data where it is. This will restrict you to embarrassingly parallel workflows (map). There are two solutions:

  • Use the fuse optimization
  • Track explicitly where tasks run

Fuse: you're going to create some unserializable data, run computations on it, then run a final computation that turns it into something serializable before trying to move it back. This is fine as long as the scheduler never decides to move the data around on its own. You can enforce this by fusing all of those tasks into a single atomic task. See the optimization docs for details:

    from dask.optimize import fuse
    bad_data = [f(...) for ...]
    good_data = [convert_to_serializable_data(bd) for bd in bad_data]
    dask.compute(good_data, optimizations=[fuse])
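
Applied to your setup, that pattern might look like this (a hedged sketch: to_serializable is a hypothetical helper you would write to turn each result into plain picklable data, such as a float or a NumPy array):

    import dask
    from dask import delayed
    from dask.optimize import fuse

    energies = [10, 20]
    system = delayed(make_non_serializable_object)(x=1)
    trans = [delayed(some_function_that_uses_system)(system, energy)
             for energy in energies]

    # Convert on the same worker before anything needs to move.
    safe = [delayed(to_serializable)(t) for t in trans]

    # fuse collapses each build -> compute -> convert chain into a single
    # atomic task, so the unserializable intermediates never travel
    # between processes.
    results = dask.compute(safe, optimizations=[fuse])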
    

Alternatively, track explicitly where tasks run: specify exactly where each computation should live yourself. See the Data Locality documentation for the distributed scheduler.
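
With dask.distributed, that might look like the following (a hedged sketch: it assumes a running Client, reuses your function names, and again uses a hypothetical to_serializable helper):

    from dask.distributed import Client

    client = Client()  # starts a local cluster by default
    addr = list(client.scheduler_info()['workers'])[0]  # pick one worker

    # Pin the unserializable object and everything that touches it to a
    # single worker, so it never has to move between processes.
    system = client.submit(make_non_serializable_object, x=1, workers=[addr])
    trans = [client.submit(some_function_that_uses_system, system, energy,
                           workers=[addr])
             for energy in energies]

    # Convert to picklable data on the same worker, then gather safely.
    safe = [client.submit(to_serializable, t, workers=[addr]) for t in trans]
    results = client.gather(safe)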
