Python/Pandas如何处理百亿行,数十列的数据?

目前要处理的数据大致在25亿到50亿行,50-60列,每行数据包含整数,浮点,字符串和字符,日期等数据。 目前文件是csv分散在几十个文件中。文件总数…
关注者
1,573
被浏览
548,756

51 个回答

这种场景,非要强行用内存计算,已经不太合适了。

几个方案:

1. 依然pandas。 数据存储依赖于HDF5是没错的,但也只是极度节省了IO时间,计算的话,pandas官方给出了一些可行方案。 "Large data" work flows using pandas

可用的策略前面基本也已提到,分块的合并、计算,如果需要机器学习,优先选择那些支持增量学习的算法和类库。

参考 使用sklearn进行增量学习 - CSDN博客


2. 使用 Dask - dask 0.15.0+9.g0531266 documentation

很好的copy了pandas的api,可以用很小的成本过度,你将获得强大的out-core计算能力,内存不再是你的局限,硬盘才是。这货基本上可以当个穷逼版的spark使用(官方文档也这么说的)。


3. 回归数据库 ,PostgreSql是高性能计算的好选择。


4. 怂恿公司购买云服务,阿里云强行安利么么哒。

阿里云机器学习-阿里云


海量数据处理自有其方法论和工具链,不要埋怨pandas,这个锅不背。

pandas在中等数据量下的综合能力,我是指性能+易用api+可扩展性+方案优雅程度,比R领先一个时代。

当然,我丝毫没有黑R的意思,R在统计学术方面的能力,大概也比python领先了10个stata。

工具自有场景。

从 Pandas 说起

Pandas 在 Python 的数据工程领域可谓是半壁江山,Pandas 的作者 Wes Mckinney 于 2008 年开始构建 Pandas,至今已经走过了十几个年头。然而,Wes 在 2017 年的一篇博客中写道:

我开始构建 Pandas 的时候并不是很了解软件工程,甚至不太会使用 Python 的科学计算生态。我当时的代码丑陋且缓慢,也是一边学一边写。2011年,我引入了 BlockManager Numpy 作为 Pandas 的内部内存管理和数据结构。
。。。
然而这个决定也是 Pandas 如今无法自如处理超大数据的罪魁祸首。毕竟在 2011 年我们几乎不去思考处理超过100 GB 甚至 1TB 的数据。
如今,我的经验是:如果你想用 Pandas 分析一个 1GB 的数据,那么你至少需要 5 - 10 GB 的内存才可以。 ,然而,今天我们碰到更多地情况恰恰相反,分析的数据比内存大 5-10 倍。
wesmckinney.com/blog/ap

Pandas 的“十一大”问题

  1. 内部数据结构距离硬件太过遥远
  2. 没有内存映射
  3. 数据库和文件读写性能差
  4. 丢失值处理支持差
  5. 内存管理不透明,过于复杂
  6. Category 数据类型支持差
  7. 复杂的分组运算性能差
  8. 数据Append性能差
  9. 数据类型依赖于numpy,不完整
  10. 只有Eager evaluation,没有询问计划(query plan)
  11. 慢,大数据集多核性能很差

今天我们来列举目前针对这些问题一些可能的解决方案:Dask、Ray、Modin、Vaex、Polars。当然,我们还会提到一个项目就是:Apache Arrow。

Dask


Dask


Dask 本质上是两个部分:动态计算调度 + 一些数据结构。调度器主要负责在多核心或者多个计算机之间组织并行计算,而数据结构则提供了一些熟悉的API,比如类Pandas 的 Dask DataFrame、类 Numpy 的 Dask Array 等等。Dask 把人们已经熟的 Pandas、numpy 的 API 拓展到多核以及计算集群上进行计算。

当然,Dask 本身完全是由 Python 写成的,在单个计算任务方面并没有比 Pandas 有本质的提升,甚至 Dask 还是用到了一些 Pandas 的功能。我以为,Dask 真正的核心其实是他的调度器,理论上他的调度器可以执行任意Python函数、采用任意Python数据结构,只不过 Dask 为了使用数据科学的场景,自带了一些常见的 API,比如 DataFrame 或者 ndarray,这些数据结构可以更好的拓展到计算集群。

目前,Dask 已经将调度器部分独立成了新的项目: A distributed task scheduler for Dask

# Arrays
import dask.array as da
x = da.random.uniform(low=0, high=10, size=(10000, 10000),  # normal numpy code
                      chunks=(1000, 1000))  # break into chunks of size 1000x1000
y = x + x.T - x.mean(axis=0)  # Use normal syntax for high level algorithms
# DataFrames
import dask.dataframe as dd
df = dd.read_csv('2018-*-*.csv', parse_dates='timestamp',  # normal Pandas code
                 blocksize=64000000)  # break text into 64MB chunks
s = df.groupby('name').balance.mean()  # Use normal syntax for high level algorithms