pyspark系列--dataframe基础
dataframe基础
- 1. 连接本地spark
- 2. 创建dataframe
- 3. 查看字段类型
- 4. 查看列名
- 5. 查看行数
- 6. 重命名列名
- 7. 选择和切片筛选
- 8. 删除一列
- 增加一列
- 9. 转json
- 10. 排序
- 11. 缺失值
- 12. sparkDataFrame和python变量互转
1. 连接本地spark
import pandas as pd
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName('my_first_app_name') \
.getOrCreate()
2. 创建dataframe
# 从pandas dataframe创建spark dataframe
colors = ['white','green','yellow','red','brown','pink']
color_df=pd.DataFrame(colors,columns=['color'])
color_df['length']=color_df['color'].apply(len)
color_df=spark.createDataFrame(color_df)
color_df.show()
3. 查看字段类型
# 查看列的类型 ,同pandas
color_df.dtypes
# [('color', 'string'), ('length', 'bigint')]
4. 查看列名
# 查看有哪些列 ,同pandas
color_df.columns
# ['color', 'length']
5. 查看行数
# 行数
color_df.count()
# 如果是pandas
len(color_df)
6. 重命名列名
# dataframe列名重命名
# pandas
df=df.rename(columns={'a':'aa'})
# spark-1
# 在创建dataframe的时候重命名
data = spark.createDataFrame(data=[("Alberto", 2), ("Dakota", 2)],
schema=['name','length'])
data.show()
data.printSchema()
# spark-2
# 使用selectExpr方法
color_df2 = color_df.selectExpr('color as color2','length as length2')
color_df2.show()
# spark-3
# withColumnRenamed方法
color_df2 = color_df.withColumnRenamed('color','color2')\
.withColumnRenamed('length','length2')
color_df2.show()
# spark-4
# alias 方法
color_df.select(color_df.color.alias('color2')).show()
7. 选择和切片筛选
这个应该是dataframe最常用最重要的操作了。
# 1.列的选择
# 选择一列的几种方式,比较麻烦,不像pandas直接用df['cols']就可以了
# 需要在filter,select等操作符中才能使用
color_df.select('length').show()
color_df.select(color_df.length).show()
color_df.select(color_df[0]).show()
color_df.select(color_df['length']).show()
color_df.filter(color_df['length']>=4).show() # filter方法
# 2.选择几列的方法
color_df.select('length','color').show()
# 如果是pandas,似乎要简单些
color_df[['length','color']]
# 3.多列选择和切片
color_df.select('length','color') \
.select(color_df['length']>4).show()
# 4.between 范围选择
color_df.filter(color_df.length.between(4,5) )\
.select(color_df.color.alias('mid_length')).show()
# 5.联合筛选
# 这里使用一种是 color_df.length, 另一种是color_df[0]
color_df.filter(color_df.length>4)\
.filter(color_df[0]!='white').show()
# 6.filter运行类SQL
color_df.filter("color='green'").show()
color_df.filter("color like 'b%'").show()
# 7.where方法的SQL
color_df.where("color like '%yellow%'").show()
# 8.直接使用SQL语法
# 首先dataframe注册为临时表,然后执行SQL查询
color_df.createOrReplaceTempView("color_df")
spark.sql("select count(1) from color_df").show()
8. 删除一列
# 删除一列
color_df.drop('length').show()
# pandas写法
df.drop(labels=['a'],axis=1)
增加一列
from pyspark.sql.functions import lit
df1.withColumn('newCol', lit(0)).show()
9. 转json
# dataframe转json,和pandas很像啊
color_df.toJSON().first()
10. 排序
# pandas的排序
df.sort_values(by='b')
# spark排序
color_df.sort('color',ascending=False).show()
# 多字段排序
color_df.filter(color_df['length']>=4)\
.sort('length', 'color', ascending=False).show()
# 混合排序
color_df.sort(color_df.length.desc(), color_df.color.asc()).show()
# orderBy也是排序,返回的Row对象列表
color_df.orderBy('length','color').take(4)
11. 缺失值
# 1.生成测试数据
import numpy as np
import pandas as pd
df=pd.DataFrame(np.random.rand(5,5),columns=['a','b','c','d','e'])\
.applymap(lambda x: int(x*10))
df.iloc[2,2]=np.nan
spark_df = spark.createDataFrame(df)
spark_df.show()
# 2.删除有缺失值的行
df2 = spark_df.dropna()
df2.show()
# 3.或者
spark_df=spark_df.na.drop()
12. sparkDataFrame和python变量互转
在sparkSQL编程的时候,经常需要获取DataFrame的信息,然后python做其他的判断或计算,比如获取dataframe的行数以判断是否需要等待,获取dataframe的某一列或第一行信息以决定下一步的处理,等等。
(1)获取第一行的值,返回普通python变量
value = df.select('columns_name').first()[0]
# 由于 first() 返回的是 Row 类型,可以看做是dict类型,
# 在只有一列的情况下可以用 [0] 来获取值。
(2)获取第一行的多个值,返回普通python变量
row = df.select('col_1', 'col_2').first()
col_1_value = row.col_1
col_2_value = row.col_2