相关文章推荐
卖萌的眼镜  ·  飞猪 - 开放平台·  5 月前    · 
乐观的便当  ·  GROUP BY ...·  1 年前    · 

如何用Dask做复合并行(类似于MPI+OpenMP)?

0 人关注

我刚刚在学习Dask,了解它在尴尬的并行任务中的应用。我有一个函数,从一个文件中读取数据,并对该数据进行长时间的计算。我通过joblib并行化来加速该计算。

我现在想用Dask将其扩展到多个分布式机器。我想请求一些节点,让每个机器/节点从文件池中处理一个文件并返回结果。我希望每个文件的处理都能利用某种本地节点的并行性。

如果这是MPI+OpenMP,我将在每台机器上有一个等级,每个等级的物理核心数量为OpenMP线程。使用Dask,我只看到如何创建一个巨大的工作者池,分享每个文件或所有文件的处理。我想要复合并行(每个节点一个文件,每个节点的ncore进程帮助处理每个文件)。

我试着用dask调用joblib函数,但它并没有利用每台机器上的所有内核。我也不知道如何通过client.submit将提交的任务钉在某个机器上。

import joblib
import itertools
import numpy as np
#world's stupidest function as a simple illustrative example, matrix is read in from a file and this function is called on many different pairs in conjunction with the matrix to create a very expensive computation. parallelization over the list of pairs is trivial 
def example(matrix, pair):
    for i in range(100000): #takes almost no time
    #for i in range(10000000): #takes a long time
        x=np.exp(100)
    return pair[0]+pair[1]+pair[2]
def my_parallel_example(matrix, pairs, num_jobs):
    results= joblib.Parallel(n_jobs=num_jobs, verbose=10)(joblib.delayed(example)(matrix, pair) for pair in pairs) 
    return results
from dask_jobqueue import SGECluster
cores_per_node=24
cluster = SGECluster(
    cores=1,
    dashboard_address=':0',
    job_extra=['-pe {} {}'.format(parallel_environment, cores_per_node), '-j y', '-o /dev/null'],
    local_directory='$TMPDIR',
    memory=100 GiB,
    processes=cores_per_node,
    project=project_name,
    walltime='00:30:00'
#just requesting one 24-core machine
requested_cores=24
cluster.scale(requested_nodes)
client.wait_for_workers(requested_nodes)
matrix=None
possibilities=[1, 2, 3]
pairs = list(itertools.product(possibilities, possibilities, possibilities))
num_jobs=10
c=client.submit(my_parallel_bfs, matrix, pairs, num_jobs)

最终我想要这样的东西,但我搞不清楚语法。

matrices=[mat1, mat2, ...] #each matrix read from a seperate file and added to a pool of 'big' jobs to be tackled by a node
results=[]
for matrix in matrices: 
    c=client.submit(my_parallel_bfs, matrix, pairs, num_jobs) #each job is submitted to a node that then uses several cores/processes to compute the result corresponding to each file. 
    results.append(c.result() )

我在网上看到的一切似乎都是使用分布式dask来完成令人尴尬的并行工作,我没有看到我的应用实例,即每个节点嵌套一个大的计算,在每个大的计算上使用ncore子进程。

我希望上述代码能像我不使用dask直接运行时那样有效地利用10个核心,但通过client.submit()提交时似乎只使用了一个核心。而且我也不知道如何扩展到多台机器上,使每台机器在没有通信的情况下从矩阵池中的一个矩阵上工作。

2 个评论
你能不能更具体地说明你想对你的矩阵进行什么操作?最好的解决方案可能取决于此..,
这实际上是对矩阵中的指数对进行深度优先搜索。最短的路径是不与数组中的 "0 "元素交叉。我使用joblib在本地进行了并行化,创建了一个对的池,在它们之间进行DFS。
python
parallel-processing
multiprocessing
dask
dask-distributed
Jrt54
Jrt54
发布于 2019-09-06
1 个回答
Jrt54
Jrt54
发布于 2019-09-25
已采纳
0 人赞同

所以在纠结了很久之后,在网上发现了很多关于混合joblib/Dask的问题(比如说 https://github.com/joblib/joblib/issues/875 ),我想出了一个足够好的解决方法,可以张贴部分答案。

理想情况下,我想用dask提交独立的 "父 "任务到不同的客户端(机器),并让每个任务调用一个用joblib并行化的计算密集型函数(每个核心使用一个joblib工作者)。最终,即使在与各种选项(joblib中的线程与进程,每台机器的进程数量等)进行斗争后,我也没有看到良好的性能。让dask调用joblib扼杀了我的并行性,也许在引擎盖下有一些锁定机制。

相反,我重写了程序,完全使用Dask(每个内核一个Dask工作者,而不是每台机器一个工作者),并摆脱了joblib。

cores_per_node=24
cluster = SGECluster(
    cores=cores_per_node,
    dashboard_address=':0',
    job_extra=['-pe {} {}'.format(parallel_environment, cores_per_node), '-j y', '-o /dev/null'],
    local_directory='$TMPDIR',
    memory=100 GiB,
    processes=cores_per_node,
    project=project_name,
    walltime='00:30:00'
requested_nodes=2
total_workers=cores_per_node*requested_nodes
cluster.scale(total_workers)
client.wait_for_workers(total_workers)

在这一点上,我已经申请了2台24核机器,Dask有48个工作者,所以每个核心有一个工作者。

#I was not able to distribute one "file" of work to a machine, but I was able to have multiple machines process one file. 
info=dict #initialize empty dictionary
for file in list_of_files:
    matrix=np.load(file)
    remote_matrix=client.scatter(matrix) #necessary to prevent any dask warnings and seemed to improve performance by reducing communication. each worker 
    futures = []
    for pair in pairs:
        #now each pair is a dask job, previously this was sent as jobs to joblib
        futures.append(client.submit(example, remote_matrix, pair, workers=first_host))
    tmp_result=client.gather(futures) #blocks until all jobs completed
    info[file]=tmp_result #store the results for one file