相关文章推荐
留胡子的香菜  ·  windows ...·  1 年前    · 
健身的熊猫  ·  Get-Date ...·  1 年前    · 
欢乐的柳树  ·  MYSQL 8 show ...·  1 年前    · 
>>> from math import sqrt >>> from joblib import Parallel, delayed >>> Parallel(n_jobs= 2 )(delayed(sqrt)(i ** 2 ) for i in range( 10 )) [ 0.0 , 1.0 , 2.0 , 3.0 , 4.0 , 5.0 , 6.0 , 7.0 , 8.0 , 9.0 ]

以上, Parallel 对象会创建一个进程池,以便在多进程中执行每一个列表项。函数 delayed 是一个创建元组 (function, args, kwargs) 的简单技巧。

在Windows上使用joblib.Parallel的时候,要保护主循环以避免递归生成子进程 。换句话说,你的代码应该如下:

import .... def function1( ... ): def function2( ... ): if __name__ == '__main__' : # do stuff with imports and functions defined about if __name__ == '__main__' 块的外面除了imports和definitions不能有代码被执行。

使用多线程

默认情况下,Parallel使用Python的多进程模块(multiprocessing)来fork工作进程,以便任务可以在独立的CPU上同时执行。这对于一般的Python程序来说是合理的,但这会产生一些开销,即,输入输出数据需要被序列化到一个排队,才能在工作进程之间进行通信。

当然,如果你知道,你调用的函数是基于编译扩展的,且它在执行的大部分时间都会释放Python的全局解释器锁(GIL),那么此时使用多线程可能会更高效。

为了使用多线程,只需在构造Parallel的时候设置 backend='threading' 即可:

>>> Parallel(n_jobs= 2 , backend= "threading" )( ... delayed(sqrt)(i ** 2 ) for i in range( 10 )) [ 0.0 , 1.0 , 2.0 , 3.0 , 4.0 , 5.0 , 6.0 , 7.0 , 8.0 , 9.0 ] 在共享内存(memmaping)中操作数值型数据

默认情况下,当 n_jobs != 1 时,joblib使用Python标准库的多进程模块(multiprocessing)来创建真实的Python工作进程 。传递给Parallel调用的参数被序列化,并且会在每一个工作进程中重新创建。

这对于大型参数会成为一个问题,因为它们会被工作进程创建 n_jobs 次。

这在使用numpy进行科学计算中经常发生。joblib.Parallel对大型数组提供了一个特别的处理方法就是自动dump它们到文件系统,并将引用传递给工作进程,然后让工作进程使用 numpy.ndarray 的子类 numpy.memmap 以内存映射的方式打开它们 。这使得所有工作进程可以共享一段数据(更准确的说是共享一段内存)。

如果你的代码能释放GIL,那么使用 backend="threading" 会更高效。

自动将array转换为memmap

通过在数组的大小上配置一个阀值自动触发将array转换为memmap:

>>> import numpy as np >>> from joblib import Parallel, delayed >>> from joblib.pool import has_shareable_memory >>> Parallel(n_jobs= 2 , max_nbytes= 1e6 )( ... delayed(has_shareable_memory)(np.ones(int(i))) ... for i in [ 1e2 , 1e4 , 1e6 ]) [False, False, True] 默认情况下,数据被dump到/dev/shm共享内存分区,如果它存在且可写 (在Linux上就是这样的)。否则将使用操作系统的临时文件夹。可以通过设置Parallel构造函数的参数 temp_folder 来自定义临时数据文件的位置 。

设置 max_nbytes=None 可禁用自动转换。

手动映射输入数据 为了更好地使用内存,你可以手动将数组dump成memmap,然后在fork工作进程之前从父进程中删除原数组。

让我们在父进程中创建一个大型数组:

>>> large_array = np.ones(int(1e6)) 然后,将它dump到本地文件,以便内存映射:

>>> import tempfile >>> import os >>> from joblib import load, dump >>> temp_folder = tempfile.mkdtemp() >>> filename = os.path.join(temp_folder, 'joblib_test.mmap' ) >>> if os.path.exists(filename): os.unlink(filename) >>> _ = dump(large_array, filename) >>> large_memmap = load(filename, mmap_mode= 'r+' )

此时,变量 large_memmap 指向一个 numpy.memmap 实例:

>>> large_memmap.__class__.__name__, large_array.nbytes, large_array.shape ( 'memmap' , 8000000 , ( 1000000 ,)) >>> np.allclose(large_array, large_memmap) 然后,我们就可以释放原来的数组了:

>>> del large_array >>> import gc >>> _ = gc.collect() large_memmap还可以被切片成小的memmap:

>>> small_memmap = large_memmap[ 2 : 5 ] >>> small_memmap.__class__.__name__, small_memmap.nbytes, small_memmap.shape ( 'memmap' , 24 , ( 3 ,))

最后,对 np.ndarray 视图的修改会被写回原来的内存映射文件:

>>> small_array = np.asarray(small_memmap) >>> small_array.__class__.__name__, small_array.nbytes, small_array.shape ( 'ndarray' , 24 , ( 3 ,)) 所有这三个结构都指向相同的内存区域,且这段内存能够被工作进程直接使用:

>>> Parallel(n_jobs= 2 , max_nbytes=None)( ... delayed(has_shareable_memory)(a) ... for a in [large_memmap, small_memmap, small_array]) [True, True, True]

注意 这里我们使用 max_nbytes=None 来禁用 Parallel 自动转换功能。实际上small_array也在工作进程的共享内存中。

将并行计算结果写入共享内存 如果你在主进程中使用 w+ r+ 模式打开你的数据,那么工作进程就可以使用 r+ 模式来访问它们,因此,工作进程就能够直接将结果写入内存映射,从而避免了使用串行通信的方式来将结果返回给父进程的操作。

这里是一个例子,为并行进程预先创建 numpy.memmap 数据结构:

"""Demo:在joblib.Parallel中`numpy.memmap`的使用 这个例子演示了如何为并行工作进程的输入和输出欲创建memmap数组。 程序的输出样例:: [Worker 93486] Sum for row 0 is -1599.756454 [Worker 93487] Sum for row 1 is -243.253165 [Worker 93488] Sum for row 3 is 610.201883 [Worker 93489] Sum for row 2 is 187.982005 [Worker 93489] Sum for row 7 is 326.381617 [Worker 93486] Sum for row 4 is 137.324438 [Worker 93489] Sum for row 8 is -198.225809 [Worker 93487] Sum for row 5 is -1062.852066 [Worker 93488] Sum for row 6 is 1666.334107 [Worker 93486] Sum for row 9 is -463.711714 Expected sums computed in the parent process: [-1599.75645426 -243.25316471 187.98200458 610.20188337 137.32443803 -1062.85206633 1666.33410715 326.38161713 -198.22580876 -463.71171369] Actual sums computed by the worker processes: [-1599.75645426 -243.25316471 187.98200458 610.20188337 137.32443803 -1062.85206633 1666.33410715 326.38161713 -198.22580876 -463.71171369] import tempfile import shutil import os import numpy as np from joblib import Parallel, delayed from joblib import load, dump def sum_row (input, output, i) : """Compute the sum of a row in input and store it in output""" sum_ = input[i, :].sum() print( "[Worker %d] Sum for row %d is %f" % (os.getpid(), i, sum_)) output[i] = sum_ if __name__ == "__main__" : rng = np.random.RandomState( 42 ) folder = tempfile.mkdtemp() samples_name = os.path.join(folder, 'samples' ) sums_name = os.path.join(folder, 'sums' ) try : # Generate some data and an allocate an output buffer samples = rng.normal(size=( 10 , int( 1e6 ))) # Pre-allocate a writeable shared memory map as a container for the # results of the parallel computation sums = np.memmap(sums_name, dtype=samples.dtype, shape=samples.shape[ 0 ], mode= 'w+' ) # Dump the input data to disk to free the memory dump(samples, samples_name) # Release the reference on the original in memory array and replace it # by a reference to the memmap array so that the garbage collector can # release the memory before forking. gc.collect() is internally called # in Parallel just before forking. samples = load(samples_name, mmap_mode= 'r' ) # Fork the worker processes to perform computation concurrently Parallel(n_jobs= 4 )(delayed(sum_row)(samples, sums, i) for i in range(samples.shape[ 0 ])) # Compare the results from the output buffer with the ground truth print( "Expected sums computed in the parent process:" ) expected_result = samples.sum(axis= 1 ) print(expected_result) print( "Actual sums computed by the worker processes:" ) print(sums) assert np.allclose(expected_result, sums) finally : try : shutil.rmtree(folder) except : print( "Failed to delete: " + folder) 因为numpy没有提供原子操作,所以并发中的工作进程写共享内存可能会破坏数据。而之前的例子中,每一个任务都只更新结果数组中对应的一项,因此没有这个风险。

最后,在完成计算后,不要忘记清理临时文件夹:

>>> import shutil >>> try : ... shutil.rmtree(temp_folder) ... except OSError: ... pass # this can sometimes fail under Windows

新浪简介 | About Sina | 广告服务 | 联系我们 | 招聘信息 | 网站律师 | SINA English | 产品答疑