相关文章推荐
果断的围巾  ·  checkbox wpf ...·  1 年前    · 
活泼的灌汤包  ·  javascript - How do ...·  1 年前    · 
坏坏的黑框眼镜  ·  react.js - react ...·  1 年前    · 

在使用pandas的时候,经常会用到groupby这个函数来对数据进行分组统计,同时可以使用 apply函数很方便的对分组之后的数据进行处理。

def data_process(x):
# process
return ...


result = df.groupby('user_id').apply(data_process)

使用joblib进行加速

但是如果数据非常多的时候(比如几千万条数据),运行的效率是比较低的,因为这个时候只使用了一个CPU线程,所以当数据非常多的时候,处理起来会很慢。这个时候CPU其他的核是空闲的,所以考虑使用joblib来多线程加速。

from joblib import Parallel, delayed


def data_process(x):
# process
return ...


def applyParallel(dfGrouped, func):
res = Parallel(n_jobs=4)(delayed(func)(group) for name, group in dfGrouped)
return pd.concat(res)


result = applyParallel(df.groupby('user_id'), data_process)

使用pandarallel进行加速

除了使用joblib之外,还可以使用pandarallel进行加速,使用pandarallel无需编写函数,只需要提前初始化pandarallel即可。

from pandarallel import pandarallel
pandarallel.initialize(progress_bar=True)
# 可选参数:
# nb_workers:用于并行化的工作程序数。数值类型,如果未设置,则将使用所有可用的CPU。
# progress_bar:如果设置为,则显示进度条True。(False默认为布尔)
# verbose:详细级别(2默认为int )
# 0-不显示任何日志
# 1-仅显示警告日志
# 2-显示所有日志
# use_memory_fs:(None默认为布尔)如果设置为None且内存文件系统可用,Pandarallel将使用它在主进程和工作进程之间传输数据。
# 如果内存文件系统不可用,则Pandarallel将默认进行多处理数据传输(管道)。如果设置为True,则Pandarallel将使用内存文件系统在主进程和工作进程之间传输数据,SystemError如果内存文件系统不可用,则会引发。
# 如果设置为False,则Pandarallel将使用多处理数据传输(管道)在主进程和工作进程之间传输数据。


def data_process(x):
# process
return ...


result = df.groupby('user_id').parallel_apply(data_process)

需要说明的是,pandarallel目前只能在Linux和OS X上运行。使用内存文件系统(参数use_memory_fs控制)可以减少主进程与工作进程之间的数据传输时间,尤其是对于大数据而言。仅当目录/dev/shm存在且用户对其具有读写权限时,才认为内存文件系统可用。基本上,内存文件系统仅在某些Linux发行版(包括Ubuntu)上可用。

并行化具有成本(实例化新进程,通过共享内存发送数据等),因此,只有在要进行并行化的计算量足够高的情况下,并行化才有效。对于很少的数据,使用并行化并不总是值得的。

参考:https://github.com/nalepae/pandaralle

【Python】Pandas groupby加速处理数据_人工智能

qq群554839127