我刚刚在学习Dask,了解它在尴尬的并行任务中的应用。我有一个函数,从一个文件中读取数据,并对该数据进行长时间的计算。我通过joblib并行化来加速该计算。
我现在想用Dask将其扩展到多个分布式机器。我想请求一些节点,让每个机器/节点从文件池中处理一个文件并返回结果。我希望每个文件的处理都能利用某种本地节点的并行性。
如果这是MPI+OpenMP,我将在每台机器上有一个等级,每个等级的物理核心数量为OpenMP线程。使用Dask,我只看到如何创建一个巨大的工作者池,分享每个文件或所有文件的处理。我想要复合并行(每个节点一个文件,每个节点的ncore进程帮助处理每个文件)。
我试着用dask调用joblib函数,但它并没有利用每台机器上的所有内核。我也不知道如何通过client.submit将提交的任务钉在某个机器上。
import joblib
import itertools
import numpy as np
#world's stupidest function as a simple illustrative example, matrix is read in from a file and this function is called on many different pairs in conjunction with the matrix to create a very expensive computation. parallelization over the list of pairs is trivial
def example(matrix, pair):
for i in range(100000): #takes almost no time
#for i in range(10000000): #takes a long time
x=np.exp(100)
return pair[0]+pair[1]+pair[2]
def my_parallel_example(matrix, pairs, num_jobs):
results= joblib.Parallel(n_jobs=num_jobs, verbose=10)(joblib.delayed(example)(matrix, pair) for pair in pairs)
return results
from dask_jobqueue import SGECluster
cores_per_node=24
cluster = SGECluster(
cores=1,
dashboard_address=':0',
job_extra=['-pe {} {}'.format(parallel_environment, cores_per_node), '-j y', '-o /dev/null'],
local_directory='$TMPDIR',
memory=100 GiB,
processes=cores_per_node,
project=project_name,
walltime='00:30:00'
#just requesting one 24-core machine
requested_cores=24
cluster.scale(requested_nodes)
client.wait_for_workers(requested_nodes)
matrix=None
possibilities=[1, 2, 3]
pairs = list(itertools.product(possibilities, possibilities, possibilities))
num_jobs=10
c=client.submit(my_parallel_bfs, matrix, pairs, num_jobs)
最终我想要这样的东西,但我搞不清楚语法。
matrices=[mat1, mat2, ...] #each matrix read from a seperate file and added to a pool of 'big' jobs to be tackled by a node
results=[]
for matrix in matrices:
c=client.submit(my_parallel_bfs, matrix, pairs, num_jobs) #each job is submitted to a node that then uses several cores/processes to compute the result corresponding to each file.
results.append(c.result() )
我在网上看到的一切似乎都是使用分布式dask来完成令人尴尬的并行工作,我没有看到我的应用实例,即每个节点嵌套一个大的计算,在每个大的计算上使用ncore子进程。
我希望上述代码能像我不使用dask直接运行时那样有效地利用10个核心,但通过client.submit()提交时似乎只使用了一个核心。而且我也不知道如何扩展到多台机器上,使每台机器在没有通信的情况下从矩阵池中的一个矩阵上工作。