PySpark DataFrame basic operations: this one post is all you need
1 Creating a DataFrame
1.1 Creating from a file
from pyspark.sql import SparkSession  # SparkSession is the unified entry point
# Create the Spark session
spark = SparkSession\
.builder\
.appName('readfile')\
.getOrCreate()
# 1. Read a CSV file
logFilePath = 'births_train.csv'
log_df = spark.read.csv(logFilePath,
encoding='utf-8',
header=True,
inferSchema=True,
sep=',')
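logFilePath: a variable I defined myself that holds the file path
encoding: the file encoding, utf-8 by default
header: whether to treat the first line of the file as the header; True uses the first line as the column names
inferSchema: whether to infer the column types automatically
sep: the column separator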
log_df.show()
The result is displayed as in the figure below.
1.2 Creating from a Python list
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("FirstApp").getOrCreate()
employees = [(1, "John", 25), (2, "Ray", 35), (3,"Mike", 24), (4, "Jane", 28), (5, "Kevin", 26),
(6, "Vincent", 35), (7,"James", 38), (8, "Shane", 32), (9, "Larry", 29), (10, "Kimberly", 29),
(11, "Alex", 28), (12, "Garry", 25), (13, "Max",31)]
employees=spark.createDataFrame(employees, schema=["emp_id","name","age"])
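This creates three columns.
Here employees holds the row data and schema supplies the column names. This approach is simple; the column types are inferred by Spark.
You may also come across the schema being built as follows, which lets you specify the types yourself.
from pyspark.sql import SparkSession  # SparkSession is the unified entry point
from pyspark.sql.types import *
# Create the Spark session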
spark = SparkSession\
.builder\
.appName('readfile')\
.getOrCreate()
employees = [(1, "John", 25), (2, "Ray", 35), (3,"Mike", 24), (4, "Jane", 28), (5, "Kevin", 26),
(6, "Vincent", 35), (7,"James", 38), (8, "Shane", 32), (9, "Larry", 29), (10, "Kimberly", 29),
(11, "Alex", 28), (12, "Garry", 25), (13, "Max",31)]
schema = StructType([StructField('emp_id',IntegerType(),True),
StructField('name',StringType(),True),
StructField('age',IntegerType(),True)])
df = spark.createDataFrame(employees,schema=schema)
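StructType: the object that describes the schema; it holds a list of StructField entries.
StructField: describes one column. The first argument is the column name and the second is the column's data type, imported from pyspark.sql.types.
The third argument says whether the column may contain nulls.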
1.3 Creating from an RDD
There are two ways to create a DataFrame from an RDD:
from pyspark.sql import SparkSession  # SparkSession is the unified entry point
from pyspark.sql.types import *
# Create the Spark session
spark = SparkSession\
.builder\
.appName('byRdd')\
.getOrCreate()
# 1. First approach: create the DataFrame with an explicit schema
create_rdd = spark.sparkContext.parallelize([
    (1, "John", 25),
    (2, "Ray", 35),
    (3, "Mike", 24),
])
schema = StructType([StructField('emp_id',IntegerType(),True),
StructField('name',StringType(),True),
StructField('age',IntegerType(),True)])
df_rdd = spark.createDataFrame(create_rdd,schema)
df_rdd.show()
# 2. Second approach: convert the RDD with toDF() (not recommended, type inference can easily go wrong)
create_rdd = spark.sparkContext.parallelize([
    (1, "John", 25),
    (2, "Ray", 35),
    (3, "Mike", 24),
])
df_rdd = create_rdd.toDF()
df_rdd.show()
2. DataFrame operations
2.1 Reading data
# Return the column names and data types of df
df.dtypes
# filter: both df['column'] and df.column work
df1 = df.filter(df['name'] == 'John')
df2 = df.filter(df.name == 'John')
df1.show()
df2.show()
df3 = df.filter((df.name == 'John') | (df.name == 'Ray'))
df4 = df.filter((df.name == 'John') & (df.age == 25))
df3.show()
df4.show()
from pyspark.sql import functions as F
# Note: functions must be imported from pyspark.sql
df7 = df.select('name',F.when(df.age > 30,'middle-aged').otherwise('young'))
df7.show()
df8 = df.select('name').where("name like 'M%'")
df9 = df.filter("name like 'M%'")
df8.show()
df9.show()
df10 = df.select('name','age').where('age between 25 and 30')
df11 = df.filter('age between 25 and 30')
df10.show()
df11.show()
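# The df.age - 10 expression here can be any native operation that the column's data type supports,
# e.g. df.age * 3, or taking a substring of the name column
df14 = df.withColumn('name',df.name.substr(1,2))
df14.show()
Note that the second argument of withColumn must be a Column object.
If every row should hold the same value, build the column with the lit function from functions; lit accepts a number or a string.
Prefer deriving the new column from other columns, or generate it with a UDF (covered below).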
4. Dropping a column
# Add a new column
df15 = df.withColumn('newColumn',F.lit('new'))
df16 = df.drop('newColumn')
df15.show()
df16.show()
# ascending: True sorts in ascending order (the default), False in descending order
df19 = df.orderBy(df.age,ascending=False)
df20 = df.sort(df.age,ascending=False)
df19.show()
df20.show()
# Sort by age first, then by emp_id when ages are equal (ascending=False applies to both columns)
df21 = df.orderBy([df.age,df.emp_id],ascending=False)
df21.show()
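union and unionAll (both return the union of two datasets and keep duplicate rows; columns are matched by position, so different column names do not affect the merge)
from pyspark.sql import SparkSession  # SparkSession is the unified entry point
from pyspark.sql.types import *
from pyspark.sql import functions as F
# Create the Spark session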
spark = SparkSession\
.builder\
.appName('byRdd')\
.getOrCreate()
employees = [(1, "John", 25), (2, "Ray", 35), (3,"Mike", 24), (4, "Jane", 28), (5, "Kevin", 26),
(6, "Vincent", 35), (7,"James", 38), (8, "Shane", 32), (9, "Larry", 29), (10, "Kimberly", 29),
(11, "Alex", 28), (12, "Garry", 25), (13, "Max",31)]
employees1 = [(1, "小明", 15), (2, "小红", 14), (3,"小花", 13)]
schema = StructType([StructField('emp_id',IntegerType(),True),
StructField('name',StringType(),True),
StructField('age',IntegerType(),True)])
employees2 = [(1, "小明", 15), (2, "小红", 14), (3,"小花", 13),(4, "Jane", 28), (5, "Kevin", 26)]
# Use a separate name so the first schema is not overwritten
schema2 = StructType([StructField('id',IntegerType(),True),
StructField('short_name',StringType(),True),
StructField('old',IntegerType(),True)])
df = spark.createDataFrame(employees,schema=schema)
col_df = spark.createDataFrame(employees2,schema=schema2)
df24 = df.union(col_df)
df25 = df.unionAll(col_df)
df24.show()
df25.show()
# The third argument is the join type
# The following join types are supported:
#'inner', 'outer', 'full', 'fullouter', 'full_outer', 'leftouter', 'left',
#'left_outer', 'rightouter', 'right', 'right_outer',
#'leftsemi', 'left_semi', 'semi', 'leftanti', 'left_anti', 'anti', 'cross'
df29 = df.join(col_df,[df.emp_id == col_df.id],'left')
df29.show()