相关文章推荐
热心的移动电源  ·  JS ...·  1 年前    · 
逆袭的大海  ·  python 获取json的key ...·  1 年前    · 
完美的单车  ·  MissingField.Type - ...·  1 年前    · 
精彩文章免费看

pyspark dataframe基本操作看这篇就够了

1 创建dataframe

1.1 读取文件创建

from pyspark.sql import SparkSession #sparkSession为同统一入口
#创建spakr对象
spark = SparkSession\
    .builder\
    .appName('readfile')\
    .getOrCreate()
# 1.读取csv文件
# 1.读取csv文件
logFilePath = 'births_train.csv'
log_df = spark.read.csv(logFilePath, 
                        encoding='utf-8', 
                        header=True, 
                        inferSchema=True,
                        sep=',')

logFilePath:这是我自定义的一个参数,为文件路径
encoding:文件编码格式,默认为utf-8
header:是否将文件第一行作为表头,True即将文件第一行作为表头
inferSchema:是否自动推断列类型
sep:列分割符

log_df.show()

展示结果如下图

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("FirstApp").getOrCreate()
employees = [(1, "John", 25), (2, "Ray", 35), (3,"Mike", 24), (4, "Jane", 28), (5, "Kevin", 26), 
             (6, "Vincent", 35), (7,"James", 38), (8, "Shane", 32), (9, "Larry", 29), (10, "Kimberly", 29),
             (11, "Alex", 28), (12, "Garry", 25), (13, "Max",31)]
employees=spark.createDataFrame(employees, schema=["emp_id","name","age"])

这里创建了三列
employees为数据内容,schema为表头,这种方式比较简单,类型为spark推断类型

可能有的同学会见到如下表头的创建方式,类型可以自己指定

from pyspark.sql import SparkSession #sparkSession为同统一入口
from pyspark.sql.types import *
#创建spakr对象
spark = SparkSession\
    .builder\
    .appName('readfile')\
    .getOrCreate()
employees = [(1, "John", 25), (2, "Ray", 35), (3,"Mike", 24), (4, "Jane", 28), (5, "Kevin", 26), 
             (6, "Vincent", 35), (7,"James", 38), (8, "Shane", 32), (9, "Larry", 29), (10, "Kimberly", 29),
             (11, "Alex", 28), (12, "Garry", 25), (13, "Max",31)]
schema = StructType([StructField('emp_id',IntegerType(),True),
                    StructField('name',StringType(),True),
                    StructField('age',IntegerType(),True)])
df = spark.createDataFrame(employees,schema=schema)

StructType:即指定一个列类型的对象,里面包含列类型数组
StructField:指定每一个列,第一个参数为列名,第二个参数为列数据类型,从pyspark.sql.types里的数据类型引入
第三个参数为是否可以为空

1.3 从RDD创建

从rdd创建可以有如下两种方式:

from pyspark.sql import SparkSession #sparkSession为同统一入口
from pyspark.sql.types import *
#创建spakr对象
spark = SparkSession\
    .builder\
    .appName('byRdd')\
    .getOrCreate()
# 1 第一种方式利用指定列类型来创建
create_rdd = spark.sparkContext.parallelize([
    (1, "John", 25),
    (2, "Ray", 35),
    (3,"Mike", 24),
schema = StructType([StructField('emp_id',IntegerType(),True),
                    StructField('name',StringType(),True),
                    StructField('age',IntegerType(),True)])
df_rdd = spark.createDataFrame(create_rdd,schema)
df_rdd.show()
# 2 通过rdd转dataframe来创建(这种方式不建议采用,类型推断很容易出错)
create_rdd = spark.sparkContext.parallelize([
    (1, "John", 25),
    (2, "Ray", 35),
    (3,"Mike", 24),
df_rdd = create_rdd.toDF()
df_rdd.show()

2.dataframe操作

2.1 阅读数据操作

# 返回df的列名与数据类型
df.dtypes 
# filter过滤,用df['列名']或者df.列名均可
df1 = df.filter(df['name'] == 'John')
df2 = df.filter(df.name == 'John')
df1.show()
df2.show()
df3 = df.filter((df.name == 'John') | (df.name == 'Ray'))
df4 = df.filter((df.name == 'John') & (df.age == 25))
df3.show()
df4.show()
from pyspark.sql import functions as F
#注意导入pyspark.sql的functions
df7 = df.select('name',F.when(df.age > 30,'壮年').otherwise('青年'))
df7.show()
df8 = df.select('name').where("name like 'M%'")
df9 = df.filter("name like 'M%'")
df8.show()
df9.show()
df10 = df.select('name','age').where('age between 25 and 30')
df11 = df.filter('age between 25 and 30')
df10.show()
df11.show()
#这里的df.age-10操作可以支持各种数据类型的原生方法操作
#如:df.age*3,对name列截取字符sub
df14 = df.withColumn('name',df.name.substr(1,2))
d14.show()

注意第二个参数必须为col也就是列对象
一般数据一样就用functions里面的lit方法生成一个,lit里可以是数字或字符串
建议采用别的列来生成新列或者用udf方式生成(下面介绍)

4.删除列

#生成一个新列
df15 = df.withColumn('newColumn',F.lit('new'))
df16 = df.drop('newColumn')
df15.show()
df16.show()
# ascending:False正序排序,True倒序排序
df19 = df.orderBy(df.age,ascending=False)
df20 = df.sort(df.age,ascending=False)
df19.show()
df20.show()
#先按照年龄排序,年龄相同按照emp_id排序
df21 = df.orderBy([df.age,df.emp_id],ascending=False)
df21.show()
  • union和unionAll(返回两个数据集的并集,列名不同不影响合并)
  • from pyspark.sql import SparkSession #sparkSession为同统一入口
    from pyspark.sql.types import *
    from pyspark.sql import functions as F
    #创建spakr对象
    spark = SparkSession\
        .builder\
        .appName('byRdd')\
        .getOrCreate()
    employees = [(1, "John", 25), (2, "Ray", 35), (3,"Mike", 24), (4, "Jane", 28), (5, "Kevin", 26), 
                 (6, "Vincent", 35), (7,"James", 38), (8, "Shane", 32), (9, "Larry", 29), (10, "Kimberly", 29),
                 (11, "Alex", 28), (12, "Garry", 25), (13, "Max",31)]
    employees1 = [(1, "小明", 15), (2, "小红", 14), (3,"小花", 13)]
    schema = StructType([StructField('emp_id',IntegerType(),True),
                        StructField('name',StringType(),True),
                        StructField('age',IntegerType(),True)])
    employees2 = [(1, "小明", 15), (2, "小红", 14), (3,"小花", 13),(4, "Jane", 28), (5, "Kevin", 26)]
    schema = StructType([StructField('id',IntegerType(),True),
                        StructField('short_name',StringType(),True),
                        StructField('old',IntegerType(),True)])
    df = spark.createDataFrame(employees,schema=schema)
    col_df = spark.createDataFrame(employees2,schema=schema)
    df24 = df.union(col_df)
    df25 = df.unionAll(col_df)
    df24.show()
    df25.show()
    #第三个参数为连接规则
    #支持的连接规则有以下
    #'inner', 'outer', 'full', 'fullouter', 'full_outer', 'leftouter', 'left', 
    #'left_outer', 'rightouter', 'right', 'right_outer',
    #'leftsemi', 'left_semi', 'semi', 'leftanti', 'left_anti', 'anti', 'cross'
    df29 = df.join(col_df,[df.emp_id == col_df.id],'left')
    df29.show()