作者:Maarten、Roman、Jovan
编译:1+1=6
1
前言
使用Python进行大数据分析变得越来越流行。这一切都要从NumPy开始,它也是今天我们在推文介绍工具背后支持的模块之一。
2
Vaex
Vaex是一种更快、更安全、总体上更方便的方法,可以使用几乎任意大小的数据进行数据研究分析
,只要它能够适用于笔记本电脑、台式机或服务器的硬盘驱动器。
https://vaex.readthedocs.io/en/latest/
Vaex是一个开源的DataFrame库,它可以对表格数据集进行可视化、探索、分析,甚至机器学习,这些数据集和你的硬盘驱动器一样大。
它可以在一个n维网格上每秒计算超过10亿(10^9)个对象的平均值、和、计数、标准差等统计信息
。可视化使用直方图、使用直方图、密度图和3D立体渲染进行可视化。为此,
Vaex采用了内存映射、高效的外核算法和延迟计算等概念来获得最佳性能(不浪费内存)
。所有这些都封装在一个类似Pandas的API中。
GitHub:
https://github.com/vaexio/vaex
3
Vaex vs Dask、Pandas、Spark
Vaex与Dask不同,但与Dask DataFrames相似,后者是在Pandas DataFrame之上构建的。这意味着Dask继承了Pandas issues,比如数据必须完全装载到RAM中才能处理的要求,但Vaex并非如此。
Vaex不生成DataFrame副本,所以它可以在内存较少的机器上处理更大的DataFrame。
Vaex和Dask都使用延迟处理。唯一的区别是,
Vaex在需要的时候才计算字段,而Dask需要显式地使用compute函数。
数据需要采用HDF5或Apache Arrow格式才能充分利用Vaex。
1亿行的数据集,对Pandas和Vaex执行相同的操作:
Vaex在我们的四核笔记本电脑上的运行速度可提高约190倍,在AWS h1.x8大型机器上,甚至可以提高1000倍!最慢的操作是正则表达式。正则表达式是CPU密集型的,这意味着大部分时间花在操作上,而不是花在它们周围的所有bookkeeping上。
Apache Spark是JVM/Java生态系统中的一个库,用于处理用于数据科学的大型数据集。如果Pandas不能处理特定的数据集,人们通常求助于PySpark。如果你的工作是生成结果,而不是在本地甚至在集群中设置Spark,那么这是一个额外的障碍。因此我们也对Spark进行了同样的基准操作:
Spark的性能比Pandas更好,这是由于多线程的缘故。但vaex比Spark做得好得多。Spark以每秒1000万串的速度运行(并且会随着内核和机器的数量增加)。Vaex每秒可以处理1亿条字符串,并且会随着内核数量的增加而增加。在32核的机器上,我们每秒钟处理10亿个字符串。
4
Vaex真的很快!
流程都一样:
让我们创建一个DataFrame,它有100万行和1000列:
import vaex
import pandas as pd
import numpy as np
n_rows = 1000000
n_cols = 1000
df = pd.DataFrame(np.random.randint(0, 100, size=(n_rows, n_cols)), columns=['col%d' % i for i in range(n_cols)])
df.head()
这个DataFrame占用了多少内存呢?
df.info(memory_usage='deep')
把它保存到磁盘,这样我们以后可以用Vaex读取它:
file_path = 'big_file.csv'
df.to_csv(file_path, index=False)
直接通过Vaex或直接读取CSV,这速度将类似于Pandas。在我们的电脑上,两者都需要大约85秒。
我们需要将CSV转换为HDF5,才能看到Vaex的优点。
事实上,Vaex只受可用磁盘空间的限制。如果你的数据不是内存映射文件格式(例如CSV、JSON),则可以通过与Vaex结合Pandas I/O轻松地转换它。
我们可以将它转换为HDF5并用Vaex处理它!
dv = vaex.from_csv(file_path, convert=True, chunk_size=5_000_000)
上面的函数将自动创建一个HDF5文件并将其保存到硬盘。
检查一下dv类型:
type(dv)
# output
vaex.hdf5.dataset.Hdf5MemoryMapped
现在,让我们用Vaex处理
7.5GB的数据集
——我们不需要读取它,因为我们在上面的dv变量中已经有了它。这里只是为了测试速度。
dv = vaex.open('big_file.csv.hdf5')
Vaex需要不到1秒的时间来执行上面的命令
。但Vaex实际上并没有读取文件,因为延迟加载。
让我们通过计算col1的和来读取它。
suma = dv.col1.sum()
# array(49486599)
Vaex用不到1秒的时间计算好了结果。这是使用了内存映射。
5
虚拟列
Vaex在添加新列时创建一个
虚拟列
,虚列的行为与普通列一样,但是它们
不占用内存
。这是
因为Vaex只记得定义它们的表达式,而不预先计算值
。这些列仅在必要时才被延迟计算,从而保持较低的内存使用率。
dv['col1_plus_col2'] = dv.col1 + dv.col2
dv['col1_plus_col2']
Vaex在过滤数据时不会创建DataFrame副本,这是因为它属于一个浅拷贝(Shallow Copy)
。在创建过滤后的数据流时,Vaex会创建一个二进制掩码,然后将其应用于原始数据,而不需要进行复制。这类过滤器的内存成本很低:
过滤10亿行数据流需要大约1.2 GB的RAM。
与其他“经典”工具相比,这是可以忽略不计的,只需要100GB就可以读取数据,而对于过滤后的dataframe,则需要另一个100GB。
6
高性能聚合数据
列如value_counts、groupby、unique和各种字符串操作都使用了快速高效的算法,这些算法都是在C++底层实现的。它们都以非核心方式工作,这意味着你可以处理比RAM更大的数据,并使用处理器的所有可用内核。例如,
对超过
10亿行
执行value_counts操作只需
1秒
!
有了Vaex,你可以通过一个操作来完成,并且只需要一次数据传递!下面的group-by示例超过11亿行,只需要30秒。
df.groupby(by='vendor_id', agg={'count': vaex.agg.count(),
'count_fare_n_pass_lt3': vaex.agg.count(selection='passenger_count<3'),
'count_fare_n_pass_ge3': vaex.agg.count(selection='passenger_count>=3'),
'mean_fare_n_pass_lt3': vaex.agg.mean('fare_amount', selection='passenger_count<3'),
'mean_fare_n_pass_ge3': vaex.agg.mean('fare_amount', selection='passenger_count>=3'),
})
7
即时编译
只要虚拟列只使用Numpy或纯Python操作定义,Vaex就可以通过jitting加速它的计算,或者通过Numba或Pythran进行即时编译。如果你的机器有支持CUDA的NVIDIA显卡,
Vaex 也支持通过CUDA加速。这对于加速计算开销很大的虚列的计算非常有用。
考虑下面的例子。我们已经定义了两个地理位置之间的弧距离,这个计算涉及到相当多的代数和三角学知识。平均值计算将强制执行这个计算消耗相当大的虚列。当使用Numpy执行时,只需要30秒(11亿行)。当我们对numba预编译表达式执行同样的操作时,我们的执行时间大约快了2.5倍,至少在我们的测试电脑上是这样。如果有一个英伟达显卡,可以尝试一下!
def arc_distance(theta_1, phi_1, theta_2, phi_2):
temp = (np.sin((theta_2-theta_1)/2*np.pi/180)**2
+ np.cos(theta_1*np.pi/180)*np.cos(theta_2*np.pi/180) * np.sin((phi_2-phi_1)/2*np.pi/180)**2)
distance = 2 * np.arctan2(np.sqrt(temp), np.sqrt(1-temp))
return distance * 3958.8
df['arc_distance_numpy'] = arc_distance(df.pickup_longitude,
df.pickup_latitude,
df.dropoff_longitude,
df.dropoff_latitude)
df['arc_distance_numba'] = arc_distance(df.pickup_longitude,
df.pickup_latitude,
df.dropoff_longitude,
df.dropoff_latitude).jit_numba()
mean_numpy = df.arc_distance_numpy.mean(progress='widget')
mean_numba = df.arc_distance_numba.mean(progress='widget')
print(f'Mean arc distance comuted with numpy: {mean_numpy:.5f}')
print(f'Mean arc distance comuted with numba: {mean_numba:.5f}')
8
Selections
Vaex实现了一个叫做Selections的概念用来选择数据。例如:当你希望通过计算数据不同部分的统计数据而不是每次都创建一个新的引用DataFrame来分析数据时,这是非常有用的。使用选择的真正强大之处在于:
我们只需对数据进行一次传递,就可以计算出多次选择的统计量。
select_n_passengers_lt3 = df.passenger_count < 3
select_n_passengers_ge3 = df.passenger_count >= 3
df.fare_amount.mean(selection=[None, select_n_passengers_lt3, select_n_passengers_ge3], progress='widget')
这对于制作各种可视化图形也很有用。例如,我们可以使用.count方法在不同的选择上创建两个直方图,只需对数据进行一次传递。非常有效!
binned_dist_npass_lt3, binned_dist_npass_ge3 = df.count(binby=['total_amount'],
limits=[5, 50],
shape=64,
selection=[select_n_passengers_lt3,
select_n_passengers_ge3],
progress='widget')
xvalues = np.linspace(5, 50, 64)
plt.figure(figsize=(8, 4))
plt.plot(xvalues, binned_dist_npass_lt3, drawstyle="steps-pre", label='num passengers < 3', lw=3)
plt.plot(xvalues, binned_dist_npass_ge3, drawstyle="steps-pre", label='num passengers >=3', lw=3)
plt.legend(fontsize=14)
plt.xlabel('Total amount [$]', fontsize=14)
plt.ylabel('Number of trips', fontsize=14)
plt.show()
9
绘图
Vaex将数据绘制成图表的速度也很快
。它具有特殊的绘图函数
plot1d、plot2d和plot2d_contour。
dv.plot1d(dv.col2, figsize=(14, 7))