在使用pandas的时候,经常会用到groupby这个函数来对数据进行分组统计,同时可以使用 apply函数很方便的对分组之后的数据进行处理。
def data_process(x):
# process
return ...
result = df.groupby('user_id').apply(data_process)
使用joblib进行加速
但是如果数据非常多的时候(比如几千万条数据),运行的效率是比较低的,因为这个时候只使用了一个CPU线程,所以当数据非常多的时候,处理起来会很慢。这个时候CPU其他的核是空闲的,所以考虑使用joblib来多线程加速。
from joblib import Parallel, delayed
def data_process(x):
# process
return ...
def applyParallel(dfGrouped, func):
res = Parallel(n_jobs=4)(delayed(func)(group) for name, group in dfGrouped)
return pd.concat(res)
result = applyParallel(df.groupby('user_id'), data_process)
使用pandarallel进行加速
除了使用joblib之外,还可以使用pandarallel进行加速,使用pandarallel无需编写函数,只需要提前初始化pandarallel即可。
from pandarallel import pandarallel
pandarallel.initialize(progress_bar=True)
# 可选参数:
# nb_workers:用于并行化的工作程序数。数值类型,如果未设置,则将使用所有可用的CPU。
# progress_bar:如果设置为,则显示进度条True。(False默认为布尔)
# verbose:详细级别(2默认为int )
# 0-不显示任何日志
# 1-仅显示警告日志
# 2-显示所有日志
# use_memory_fs:(None默认为布尔)如果设置为None且内存文件系统可用,Pandarallel将使用它在主进程和工作进程之间传输数据。
# 如果内存文件系统不可用,则Pandarallel将默认进行多处理数据传输(管道)。如果设置为True,则Pandarallel将使用内存文件系统在主进程和工作进程之间传输数据,SystemError如果内存文件系统不可用,则会引发。
# 如果设置为False,则Pandarallel将使用多处理数据传输(管道)在主进程和工作进程之间传输数据。
def data_process(x):
# process
return ...
result = df.groupby('user_id').parallel_apply(data_process)
需要说明的是,pandarallel目前只能在Linux和OS X上运行。使用内存文件系统(参数use_memory_fs控制)可以减少主进程与工作进程之间的数据传输时间,尤其是对于大数据而言。仅当目录/dev/shm存在且用户对其具有读写权限时,才认为内存文件系统可用。基本上,内存文件系统仅在某些Linux发行版(包括Ubuntu)上可用。
并行化具有成本(实例化新进程,通过共享内存发送数据等),因此,只有在要进行并行化的计算量足够高的情况下,并行化才有效。对于很少的数据,使用并行化并不总是值得的。
参考:https://github.com/nalepae/pandaralle
适合初学者入门人工智能的路线及资料下载机器学习及深度学习笔记等资料打印机器学习在线手册深度学习笔记专辑《统计学习方法》的代码复现专辑
AI基础下载黄海广老师《机器学习课程》视频课黄海广老师《机器学习课程》711页完整版课件
本站qq群554839127,加入微信群请扫码:
在使用pandas的时候,经常会用到groupby这个函数来对数据进行分组统计,同时可以使用 apply函数很方便的对分组之后的数据进行处理。def data_process(x): ...
分组聚合的原理一般分为拆分-应用-合并。( )
只要使用
group
by()方法分组就会产生一个DataFrame
Group
by对象。( )
使用agg()方法进行聚合运算会对产生的标量值进行广播。( )
使用transform()方法进行聚合运算,其结果可以保持与原
数据
形状相同。( )
apply()方法可以使用广播功能。( )
下列选项中,关于
group
by()方法说法不正确的是。( )
分组键可以是列表或数组,但长度不需要与待分组轴的长度相同
可以通过DataFrame中的列名的值进行分组
可以使用函数进行分组
可使用series或字典分组
下列选项中,关于agg()方法使用不正确是。( )
agg()方法中func参数只能传入一个函数
agg()方法中func参数可以传入多个函数
agg()方法中func参数可以传入自定义函数
agg()方法不能对产生的标量值进行广播
【*】8-Intro to Data Structures
【*】9-Essential Basic Functionality
【*】10-Working with Text Data
【*】11-Options and Settings
【*】12-Indexing and Selecting Data
【*】13-MultiIndex / Advanced Indexing
【*】14-Computational tools
【*】15-Working with missing data
【*】16-
Group
By: split-apply-combine
【*】17-Merge, join, and concatenate
【*】18-Reshaping and Pivot Tables
【*】19-Time Series / Date functionality
【*】20-Time Deltas
21-Categorical Data
【*】22-Visualization
23-Styling
【+】24-IO Tools (Text, CSV, HDF5, ...) 【***** 文件】
25-Remote Data Access 【-】
26-Enhancing Performance 【*C语言提升性能】
27-Sparse data structures 【*稀疏】
28-Frequently Asked Questions (FAQ) 【-】
29-rpy2 / R interface 【-】
【*】30-
pandas
Ecosystem
31-Comparison with R / R libraries 【-】
【*】32-Comparison with SQL 【***** sql用法】
33-Comparison with SAS 【-】
【*】34-API Reference 【***函数总结】
35-Developer 【-】
36-Internals 【-】
37-Release Notes 【- 版本致谢】
38-Bibliography 【-】
39-
Python
Module Index 【-】
在平时的金融
数据
处理
中,模型构建中,经常会用到
pandas
的
group
by。之前的一篇文章中也讲述过
group
by的作用:
https://blog.csdn.net/qtlyx/article/details/80515077
但是,大家都知道,
python
有一个东西叫做GIL,说白了就是
python
并没有多线程这种东西。那么,现在如果我们要进行grou...
2、按分组取出最新值
3、按分组对value1、value2求和
df = df.sort_values('time', ascending=True) # 升序排列,tail即最新值
sum_df = df.
group
by(['id', 'type']).tail(3...
由于每个key的
数据
分布不均匀,其中有
group
by的key的
数据
过量,导致速度慢和OOM问题,
采用二次
group
by的方式优化:
val randomInt = new scala.util.Random
def splitKey = udf((inputKey: String) => {
inputKey + "---" + randomInt.nextInt(100).toString
// 将原来的key拆分成100份
dataframe = dataframe.with
在
Pandas
中使用
group
by 函数可以对
数据
进行分组。可以指定一个或多个列作为分组键,然后进行聚合操作。
如果要删除分组后重复的行,可以在
group
by 函数后使用 drop_duplicates() 方法。
import
pandas
as pd
df = pd.DataFrame({'A': ['foo', 'bar', 'baz'] * 3,
'B': [1, 2, 3] * 3,
'C': [4, 5, 6] * 3})
# 首先按 'A' 和 'B' 分组
group
ed = df.
group
by(['A', 'B']).sum()
# 删除分组后重复的行
group
ed =
group
ed.drop_duplicates()
这将会返回一个新的 dataframe,里面没有重复行。