pyspark系列教程-增加、修改列
spark是基于分布式对数据进行 无序 的存储管理,因此不能简单的对某列数据进行直接操作。
withColumn
withColumn函数可以对某列数据进行修改,或者在某列数据的基础上进行衍生。
data=data.withColumn('value1', data.value + 2)
+----+-----+------+
|name|value|value1|
+----+-----+------+
| a| 1| 3|
| b| 2| 4|
| c| 3| 5|
| d| 4| 6|
| e| 5| 7|
+----+-----+------+
如果做一些稍微复杂的怎么办?
udf
udf函数与withColumn结合,让我们写pyspark的时候像写普通的python一样简单。
def fc(a):
return a+1
fc = udf(fc, StringType())
data = data.withColumn('value2', fc('value'))
第一步:封装普通的函数
第二步:使用udf对函数进行定义,StringType()表示的是最后返回函数的类型。假如你生成的是int类型,使用StringType()后,也变成了字符串类型。
第三步:与withColumn结合,第一个参数,'value2',表示新生成列的名称。第二个参数,fc('value')中的'value'表示要处理的列。
data.show()
+----+-----+------+------+
|name|value|value1|value2|
+----+-----+------+------+
| a| 1| 3| 2|
| b| 2| 4| 3|
| c| 3| 5| 4|
| d| 4| 6| 5|
| e| 5| 7| 6|
+----+-----+------+------+
第一步中普通函数是对列中的单个元素进行操作
完整代码
# -*- coding: utf-8 -*-
import math
import os
import pyspark
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import pandas_udf, PandasUDFType,udf
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import *
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
if __name__ == "__main__":
print("======")
# 配置环境
spark = SparkSession.builder \
.master("local") \
.appName("Word Count") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
print("ok")
filepath="./demo.csv"
# data = spark.read.csv(filepath, sep=',', header=True, inferSchema="true")
data = spark.read.format('csv').load(filepath, sep=',', header=True, inferSchema=True)
data=data.withColumn('value1', data.value + 2)
data.show()