pyspark系列教程-增加、修改列

pyspark系列教程-增加、修改列

spark是基于分布式对数据进行 无序 的存储管理,因此不能简单的对某列数据进行直接操作。

withColumn

withColumn函数可以对某列数据进行修改,或者在某列数据的基础上进行衍生。

data=data.withColumn('value1', data.value + 2)
+----+-----+------+
|name|value|value1|
+----+-----+------+
|   a|    1|     3|
|   b|    2|     4|
|   c|    3|     5|
|   d|    4|     6|
|   e|    5|     7|
+----+-----+------+

如果做一些稍微复杂的怎么办?

udf

udf函数与withColumn结合,让我们写pyspark的时候像写普通的python一样简单。

def fc(a):
    return a+1
fc = udf(fc, StringType())
data = data.withColumn('value2', fc('value'))

第一步:封装普通的函数

第二步:使用udf对函数进行定义,StringType()表示的是最后返回函数的类型。假如你生成的是int类型,使用StringType()后,也变成了字符串类型。

第三步:与withColumn结合,第一个参数,'value2',表示新生成列的名称。第二个参数,fc('value')中的'value'表示要处理的列。

data.show()
                                                                                +----+-----+------+------+
|name|value|value1|value2|
+----+-----+------+------+
|   a|    1|     3|     2|
|   b|    2|     4|     3|
|   c|    3|     5|     4|
|   d|    4|     6|     5|
|   e|    5|     7|     6|
+----+-----+------+------+

第一步中普通函数是对列中的单个元素进行操作

完整代码

# -*- coding: utf-8 -*-
import math
import os
import pyspark
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import pandas_udf, PandasUDFType,udf
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import *
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
if __name__ == "__main__":
    print("======")
    # 配置环境
    spark = SparkSession.builder \
        .master("local") \
        .appName("Word Count") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
    print("ok")
    filepath="./demo.csv"
    # data = spark.read.csv(filepath, sep=',', header=True, inferSchema="true")
    data = spark.read.format('csv').load(filepath, sep=',', header=True, inferSchema=True)
    data=data.withColumn('value1', data.value + 2)
    data.show()