相关文章推荐
绅士的创口贴  ·  震惊! ...·  2 月前    · 
被表白的橙子  ·  使用Python ...·  2 月前    · 
面冷心慈的感冒药  ·  TPLINK 3G+MINI ...·  1 年前    · 
不羁的南瓜  ·  httpClient ...·  1 年前    · 
import pandas as pd import pyspark.sql.functions as F

PySpark 所有功能的入口点是 SparkSession 类。通过 SparkSession 实例,您可以创建spark dataframe、应用各种转换、读取和写入文件等,下面是定义 SparkSession的代码模板:

from pyspark.sql import SparkSession
spark = SparkSession\
.builder\
.appName('SparkByExamples.com')\
.getOrCreate()

💡 创建 dataframe

在 Pandas 和 PySpark 中,我们最方便的数据承载数据结构都是 dataframe,它们的定义有一些不同,我们来对比一下看看:

💦 Pandas

columns = ["employee","department","state","salary","age"]
data = [("Alain","Sales","Paris",60000,34),
        ("Ahmed","Sales","Lyon",80000,45),
        ("Ines","Sales","Nice",55000,30),
        ("Fatima","Finance","Paris",90000,28),
        ("Marie","Finance","Nantes",100000,40)]

创建 DataFrame 的 Pandas 语法如下:

df = pd.DataFrame(data=data, columns=columns)
# 查看头2行
df.head(2)

💦 PySpark

创建 DataFrame 的 PySpark 语法如下:

df = spark.createDataFrame(data).toDF(*columns)
# 查看头2行
df.limit(2).show()

💡 指定列类型

💦 Pandas

Pandas 指定字段数据类型的方法如下:

types_dict = {
    "employee": pd.Series([r[0] for r in data], dtype='str'),
    "department": pd.Series([r[1] for r in data], dtype='str'),
    "state": pd.Series([r[2] for r in data], dtype='str'),
    "salary": pd.Series([r[3] for r in data], dtype='int'),
    "age": pd.Series([r[4] for r in data], dtype='int')
df = pd.DataFrame(types_dict)

Pandas 可以通过如下代码来检查数据类型:

df.dtypes

💦 PySpark

PySpark 指定字段数据类型的方法如下:

from pyspark.sql.types import StructType,StructField, StringType, IntegerType
schema = StructType([ \
    StructField("employee",StringType(),True), \
    StructField("department",StringType(),True), \
    StructField("state",StringType(),True), \
    StructField("salary", IntegerType(), True), \
    StructField("age", IntegerType(), True) \
df = spark.createDataFrame(data=data,schema=schema)

PySpark 可以通过如下代码来检查数据类型:

df.dtypes
# 查看数据类型 
df.printSchema() 

💡 读写文件

Pandas 和 PySpark 中的读写文件方式非常相似。 具体语法对比如下:

💦 Pandas

df = pd.read_csv(path, sep=';', header=True)
df.to_csv(path, ';', index=False)

💦 PySpark

df = spark.read.csv(path, sep=';')
df.coalesce(n).write.mode('overwrite').csv(path, sep=';')

注意 ①

PySpark 中可以指定要分区的列:

df.partitionBy("department","state").write.mode('overwrite').csv(path, sep=';')

注意 ②

可以通过上面所有代码行中的 parquet 更改 CSV 来读取和写入不同的格式,例如 parquet 格式

💡 数据选择 - 列

💦 Pandas

在 Pandas 中选择某些列是这样完成的:

columns_subset = ['employee', 'salary']
df[columns_subset].head()
df.loc[:, columns_subset].head()

💦 PySpark

在 PySpark 中,我们需要使用带有列名列表的 select 方法来进行字段选择:

columns_subset = ['employee', 'salary']
df.select(columns_subset).show(5)

💡 数据选择 - 行

💦 Pandas

Pandas可以使用 iloc 对行进行筛选:

# 头2行
df.iloc[:2].head()

💦 PySpark

在 Spark 中,可以像这样选择前 n 行:

df.take(2).head()
df.limit(2).head()

注意 :使用 spark 时,数据可能分布在不同的计算节点上,因此“第一行”可能会随着运行而变化。

💡 条件选择

💦 Pandas

Pandas 中根据特定条件过滤数据/选择数据的语法如下:

# First method
flt = (df['salary'] >= 90_000) & (df['state'] == 'Paris')
filtered_df = df[flt]
# Second Method: Using query which is generally faster
filtered_df = df.query('(salary >= 90_000) and (state == "Paris")')
target_state = "Paris"
filtered_df = df.query('(salary >= 90_000) and (state == @target_state)')

💦 PySpark

在 Spark 中,使用 filter 方法或执行 SQL 进行数据选择。 语法如下:

# 方法1:基于filter进行数据选择
filtered_df = df.filter((F.col('salary') >= 90_000) & (F.col('state') == 'Paris'))
filtered_df = df.filter(F.expr('(salary >= 90000) and (state == "Paris")'))
# 方法2:基于SQL进行数据选择
df.createOrReplaceTempView("people")
filtered_df = spark.sql("""
SELECT * FROM people
WHERE (salary >= 90000) and (state == "Paris")
""") 

💡 添加字段

💦 Pandas

在 Pandas 中,有几种添加列的方法:

seniority = [3, 5, 2, 4, 10]
# 方法1
df['seniority'] = seniority
# 方法2
df.insert(2, "seniority", seniority, True)

💦 PySpark

在 PySpark 中有一个特定的方法 withColumn 可用于添加列:

seniority = [3, 5, 2, 4, 10]
df = df.withColumn('seniority', seniority)

💡 dataframe拼接

💦 2个dataframe - pandas

# pandas拼接2个dataframe
df_to_add = pd.DataFrame(data=[("Robert","Advertisement","Paris",55000,27)], columns=columns)
df = pd.concat([df, df_to_add], ignore_index = True)

💦 2个dataframe - PySpark

# PySpark拼接2个dataframe
df_to_add = spark.createDataFrame([("Robert","Advertisement","Paris",55000,27)]).toDF(*columns)
df = df.union(df_to_add)

💦 多个dataframe - pandas

# pandas拼接多个dataframe
dfs = [df, df1, df2,...,dfn]
df = pd.concat(dfs, ignore_index = True)

💦 多个dataframe - PySpark

PySpark 中 unionAll 方法只能用来连接两个 dataframe。我们使用 reduce 方法配合 unionAll 来完成多个 dataframe 拼接:

# pyspark拼接多个dataframe
from functools import reduce
from pyspark.sql import DataFrame
def unionAll(*dfs):
    return reduce(DataFrame.unionAll, dfs)