pandas与pyspark中dataframe的一些用法对比
- SparkSession创建
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName('log') \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
pandas是个单机版处理的,就没有上面 这一步
- 创建dataframe
- pyspark
# 1. 创建dataframe
# list创建
l = [('Alice', 1)]
spark.createDataFrame(l)
spark.createDataFrame(l, ['name', 'age'])
# dict创建
d = [{'name': 'Alice', 'age': 1}]
spark.createDataFrame(d).collect()
# 从RDD创建
rdd = sc.parallelize(l)
spark.createDataFrame(rdd, ['name', 'age'])
# RDD + ROW(列名)创建
from pyspark.sql import Row
Person = Row('name', 'age')
person = rdd.map(lambda r: Person(*r))
spark.createDataFrame(person)
# RDD + schema创建(列名)
from pyspark.sql import Row
schema = StructType([StructField("name", StringType(), True), StructField("age", IntegerType(), True)])
spark.createDataFrame(rdd, schema)
# 从pandas创建
spark.createDataFrame(df.toPandas()).collect()
- pandas
# 从numpy创建
dates = pd.date_range('20130101', periods=6)
df = pd.DataFrame(np.random.randn(6, 4), index=dates, columns=list('ABCD'))
# 从csv创建
df = pd.read_csv(file, sep='\t')
# 还有其他很多
- 查看基本属性
- pyspark
# 2. 查看dataframe的基本属性
# 查看列以及列属性
df.dtypes
# 查看行数
df.count()
# 查看第一行,返回一个Row
df.first()
# 查看前10行
df.show(10)
# 查看所有行
df.show(truncate = False)
- pandas
df.columns
df.shape
df.head()
- (条件)选择行、列
- pyspark
# (条件)选择行列
# 选择列
df.select('fuser_name', 'group_num_c2').show()
# 条件选择列
df.select('fuser_name', 'group_num_c2').filter(df.group_num_c2>10).show()
# 自定义filter函数
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType
def my_filter(group_num_c2):
if group_num_c2 > 10:
return True
else:
return False
df.select('fuser_name', 'group_num_c2').filter(udf(my_filter, BooleanType())(df.group_num_c2)).show()
- pandas
# 选择列,不管行
df[['A', 'B']]
df.loc[:, ['A', 'B']]
df.iloc[:, [0, 1]]
df.ix[:, ['A', 'B']] # 官方不建议用
df.ix[:, [0, 1]] # 官方不建议用
# 只选择行,不管列(未写全,可参照上面的)
df[1:2]
df.iloc[1:2, :]
df.ix[1:2, :] # 官方不建议用
# 既选择行,又选择列(未写全,可参照上面的)
df.iloc[1:2, [0, 1]]
df.ix[1:2, [0, 1]] # 官方不建议用
- 增加、删除、修改列
- pyspark
# 增加列
df.withColumn('add_column', df.group_num_c2 + 2)
# 增加列 - 自定义函数
from pyspark.sql import functions as F
df.withColumn('add_column', F.UserDefinedFunction(lambda obj: int(obj)+2)(df.group_num_c2))
# 删除列
df.drop('add_column')
# 修改列名
df.withColumnRenamed('group_num_c2', 'num_c2')
- pandas
# 增加列,D是一个列名
df['E'] = df.apply(lambda x: x.D+1, axis=1)
- 按行去重
- pyspark
# 整行去重
df = df.dropDuplicates()
# 按照某几列去重(按照url这一列去重)
df = df.dropDuplicates(subset=['url'])
# pyspark在去重时会在重复行中随机的保存一行,若要保存特定的一行
# 如要在去重url的同时保存tagwordCount最大的行
from pyspark.sql.window import Window