>>>
from
math
import
sqrt
>>>
from
joblib
import
Parallel, delayed
>>>
Parallel(n_jobs=
2
)(delayed(sqrt)(i **
2
)
for
i
in
range(
10
))
[
0.0
,
1.0
,
2.0
,
3.0
,
4.0
,
5.0
,
6.0
,
7.0
,
8.0
,
9.0
]
以上,
Parallel
对象会创建一个进程池,以便在多进程中执行每一个列表项。函数
delayed
是一个创建元组
(function, args, kwargs)
的简单技巧。
在Windows上使用joblib.Parallel的时候,要保护主循环以避免递归生成子进程 。换句话说,你的代码应该如下:
import ....
def function1(
...
):
def function2(
...
):
if
__name__ ==
'__main__'
:
# do stuff with imports and functions defined about
在
if __name__ ==
'__main__'
块的外面除了imports和definitions不能有代码被执行。
使用多线程
默认情况下,Parallel使用Python的多进程模块(multiprocessing)来fork工作进程,以便任务可以在独立的CPU上同时执行。这对于一般的Python程序来说是合理的,但这会产生一些开销,即,输入输出数据需要被序列化到一个排队,才能在工作进程之间进行通信。
当然,如果你知道,你调用的函数是基于编译扩展的,且它在执行的大部分时间都会释放Python的全局解释器锁(GIL),那么此时使用多线程可能会更高效。
为了使用多线程,只需在构造Parallel的时候设置
backend='threading'
即可:
>>> Parallel(n_jobs=
2
, backend=
"threading"
)(
...
delayed(sqrt)(i **
2
)
for
i
in
range(
10
))
[
0.0
,
1.0
,
2.0
,
3.0
,
4.0
,
5.0
,
6.0
,
7.0
,
8.0
,
9.0
]
在共享内存(memmaping)中操作数值型数据
默认情况下,当
n_jobs !=
1
时,joblib使用Python标准库的多进程模块(multiprocessing)来创建真实的Python工作进程
。传递给Parallel调用的参数被序列化,并且会在每一个工作进程中重新创建。
这对于大型参数会成为一个问题,因为它们会被工作进程创建
n_jobs
次。
这在使用numpy进行科学计算中经常发生。joblib.Parallel对大型数组提供了一个特别的处理方法就是自动dump它们到文件系统,并将引用传递给工作进程,然后让工作进程使用
numpy.ndarray
的子类
numpy.memmap
以内存映射的方式打开它们
。这使得所有工作进程可以共享一段数据(更准确的说是共享一段内存)。
如果你的代码能释放GIL,那么使用
backend="threading"
会更高效。
自动将array转换为memmap
通过在数组的大小上配置一个阀值自动触发将array转换为memmap:
>>> import numpy as np
>>> from joblib import Parallel, delayed
>>> from joblib.pool import has_shareable_memory
>>> Parallel(n_jobs=
2
, max_nbytes=
1e6
)(
...
delayed(has_shareable_memory)(np.ones(int(i)))
...
for
i
in
[
1e2
,
1e4
,
1e6
])
[False, False, True]
默认情况下,数据被dump到/dev/shm共享内存分区,如果它存在且可写
(在Linux上就是这样的)。否则将使用操作系统的临时文件夹。可以通过设置Parallel构造函数的参数
temp_folder
来自定义临时数据文件的位置 。
设置
max_nbytes=None
可禁用自动转换。
手动映射输入数据
为了更好地使用内存,你可以手动将数组dump成memmap,然后在fork工作进程之前从父进程中删除原数组。
让我们在父进程中创建一个大型数组:
>>> large_array
=
np.ones(int(1e6))
然后,将它dump到本地文件,以便内存映射:
>>>
import
tempfile
>>>
import
os
>>>
from
joblib
import
load, dump
>>>
temp_folder = tempfile.mkdtemp()
>>>
filename = os.path.join(temp_folder,
'joblib_test.mmap'
)
>>>
if
os.path.exists(filename): os.unlink(filename)
>>>
_ = dump(large_array, filename)
>>>
large_memmap = load(filename, mmap_mode=
'r+'
)
此时,变量
large_memmap
指向一个
numpy.memmap
实例:
>>>
large_memmap.__class__.__name__, large_array.nbytes, large_array.shape
(
'memmap'
,
8000000
, (
1000000
,))
>>>
np.allclose(large_array, large_memmap)
然后,我们就可以释放原来的数组了:
>>>
del
large_array
>>>
import
gc
>>>
_ = gc.collect()
large_memmap还可以被切片成小的memmap:
>>>
small_memmap = large_memmap[
2
:
5
]
>>>
small_memmap.__class__.__name__, small_memmap.nbytes, small_memmap.shape
(
'memmap'
,
24
, (
3
,))
最后,对
np.ndarray
视图的修改会被写回原来的内存映射文件:
>>>
small_array = np.asarray(small_memmap)
>>>
small_array.__class__.__name__, small_array.nbytes, small_array.shape
(
'ndarray'
,
24
, (
3
,))
所有这三个结构都指向相同的内存区域,且这段内存能够被工作进程直接使用:
>>> Parallel(n_jobs=
2
, max_nbytes=None)(
...
delayed(has_shareable_memory)(a)
...
for
a
in
[large_memmap, small_memmap, small_array])
[True, True, True]
注意
这里我们使用
max_nbytes=None
来禁用
Parallel
自动转换功能。实际上small_array也在工作进程的共享内存中。
将并行计算结果写入共享内存
如果你在主进程中使用
w+
或
r+
模式打开你的数据,那么工作进程就可以使用
r+
模式来访问它们,因此,工作进程就能够直接将结果写入内存映射,从而避免了使用串行通信的方式来将结果返回给父进程的操作。
这里是一个例子,为并行进程预先创建
numpy.memmap
数据结构:
"""Demo:在joblib.Parallel中`numpy.memmap`的使用
这个例子演示了如何为并行工作进程的输入和输出欲创建memmap数组。
程序的输出样例::
[Worker 93486] Sum for row 0 is -1599.756454
[Worker 93487] Sum for row 1 is -243.253165
[Worker 93488] Sum for row 3 is 610.201883
[Worker 93489] Sum for row 2 is 187.982005
[Worker 93489] Sum for row 7 is 326.381617
[Worker 93486] Sum for row 4 is 137.324438
[Worker 93489] Sum for row 8 is -198.225809
[Worker 93487] Sum for row 5 is -1062.852066
[Worker 93488] Sum for row 6 is 1666.334107
[Worker 93486] Sum for row 9 is -463.711714
Expected sums computed in the parent process:
[-1599.75645426 -243.25316471 187.98200458 610.20188337 137.32443803
-1062.85206633 1666.33410715 326.38161713 -198.22580876 -463.71171369]
Actual sums computed by the worker processes:
[-1599.75645426 -243.25316471 187.98200458 610.20188337 137.32443803
-1062.85206633 1666.33410715 326.38161713 -198.22580876 -463.71171369]
import
tempfile
import
shutil
import
os
import
numpy
as
np
from
joblib
import
Parallel, delayed
from
joblib
import
load, dump
def
sum_row
(input, output, i)
:
"""Compute the sum of a row in input and store it in output"""
sum_ = input[i, :].sum()
print(
"[Worker %d] Sum for row %d is %f"
% (os.getpid(), i, sum_))
output[i] = sum_
if
__name__ ==
"__main__"
:
rng = np.random.RandomState(
42
)
folder = tempfile.mkdtemp()
samples_name = os.path.join(folder,
'samples'
)
sums_name = os.path.join(folder,
'sums'
)
try
:
# Generate some data and an allocate an output buffer
samples = rng.normal(size=(
10
, int(
1e6
)))
# Pre-allocate a writeable shared memory map as a container for the
# results of the parallel computation
sums = np.memmap(sums_name, dtype=samples.dtype,
shape=samples.shape[
0
], mode=
'w+'
)
# Dump the input data to disk to free the memory
dump(samples, samples_name)
# Release the reference on the original in memory array and replace it
# by a reference to the memmap array so that the garbage collector can
# release the memory before forking. gc.collect() is internally called
# in Parallel just before forking.
samples = load(samples_name, mmap_mode=
'r'
)
# Fork the worker processes to perform computation concurrently
Parallel(n_jobs=
4
)(delayed(sum_row)(samples, sums, i)
for
i
in
range(samples.shape[
0
]))
# Compare the results from the output buffer with the ground truth
print(
"Expected sums computed in the parent process:"
)
expected_result = samples.sum(axis=
1
)
print(expected_result)
print(
"Actual sums computed by the worker processes:"
)
print(sums)
assert
np.allclose(expected_result, sums)
finally
:
try
:
shutil.rmtree(folder)
except
:
print(
"Failed to delete: "
+ folder)
因为numpy没有提供原子操作,所以并发中的工作进程写共享内存可能会破坏数据。而之前的例子中,每一个任务都只更新结果数组中对应的一项,因此没有这个风险。
最后,在完成计算后,不要忘记清理临时文件夹:
>>> import shutil
>>>
try
:
...
shutil.rmtree(temp_folder)
...
except OSError:
...
pass
# this can sometimes fail under Windows
新浪简介
|
About Sina
|
广告服务
|
联系我们
|
招聘信息
|
网站律师
|
SINA English
|
产品答疑