from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pandas as pd
from pyspark.sql import Row
from datetime import datetime, date
#RDD转化为DataFrame
spark=SparkSession.builder.appName("jsonRDD").getOrCreate()
sc=spark.sparkContext
stringJSONRDD=sc.parallelize([
["123","Katie",19,"brown"],
["234","Michael",22,"green"],
["345","Simone",23,"blue"]])
schema=StructType([StructField("id", StringType(),False),
StructField("name", StringType(),False),
StructField("age", IntegerType(),False),
StructField("eyeColor", StringType(),False)])
df=spark.createDataFrame(stringJSONRDD,schema=schema)
df.show()
#将静态数据转化为DataFrame
data=[['124','Joe',23,'black'],
['125','Mark',24,'green']]
df1=spark.createDataFrame(data,schema=schema)
df1.show()
data=pd.DataFrame(data,columns=['id','name','age','eyeColor'])
df2=spark.createDataFrame(data)
df2.show()
#利用Row对象构DataFrame
df3=spark.createDataFrame([
Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))])
df3.show()
其结果如下:
注意:虽然python是动态类型语言,但使用pyspark时依然要注意数据类型。比如,若在data中的age的定义若既使用了整型22,又使用了float型19.0的话,在创建DataFrame时会报TypeError错。
- 从外部数据源中读取数据到DataFrame——DataFrameReader推荐方法
Spark提供了DataFrameReader这个接口,允许从JSON、CSV、Parquet、Text、Avro、ORC等各种数据源读取数据到DataFrame。但要注意,只能通过SparkSession实例访问DataFrameReader。也就是说不能自行创建DataFrameReader实例。Spark中获取该实例句柄的方式如下:
SparkSession.read
SparkSession.readStream
其中read方法返回的DataFrameReader句柄可以用来从静态数据源读取DataFrame,而readStream方法返回的实例则用于读取流失数据源。DataFrameReader句柄推荐的使用模式如下:
DataFrameReader.format(args).option("key","value").schema(args).load()
其中的方法、参数和选项如下:
- format: 可选参数有:"parquet"(默认值)、"csv"、"txt"、“json”、"jdbc"、"orc"、"avro"等;
- option: 一系列键值对;
- schema: DDL字符串或StructType对象;
- load: 读取的数据源路径;
其用法如下:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
spark=SparkSession.builder.appName("csvRDD").getOrCreate()
schema=StructType([StructField('State',StringType()),
StructField('Color',StringType()),
StructField('Count',IntegerType()) ])
df=spark.read.format('csv').option("header",True).schema(schema).\
load(r'/data/mnm_dataset.csv')
df.show(10)
其结果如下:
- 从外部数据源中读取数据到DataFrame——各种类型数据的专用方法
除了上文使用的DataFrameReader推荐方式之外,SparkSession.read还为各种数据类型提供了专门的数据读取方法。其方法名称如下:csv()、json()、text()、parquet()、jdbc()、orc()等。这里要说明一点,使用上述DataFramerReader推荐的使用方式读取外部数据数据的时候,不同的数据类型其option()方法中可选的key即为对应方法中的参数,value即为该参数的取值。上述读取CSV文件的代码等价于下述代码:
df=spark.read.csv(path=r'/data/mnm_dataset.csv',schema=schema,
header=True)
3. 列操作
Spark DataFrame中的列是具有公有方法的对象,以Column类表示。Column实例是可单独存在的,并且可以持有一个表达式,Column实例会在使用时,和调用的DataFrame相关联,这个表达式将作用于每一条数据, 对每条数据都生成一个值。
在Spark中既可以列出所有列的名字,也可以使用关系型或计算型的表达式对相应列的值进行操作。为了将Colum对象的操作结果显示出来,这里将会用到DataFrame的select()和show()方法。
3.1 DataFrame取列的方法
data=[(123,"Katie",19,'brown'),
(234,"Michael",22,"green"),
(345,"Simone",57,"blue")]
schema=StructType([
StructField("id",LongType(),True),StructField("name",StringType(),True),
StructField("age",LongType(),True),StructField("eyeColor",StringType(),True)])
df=spark.createDataFrame(data,schema)
#取列的名称
print(df.columns)
print(df['id'],df.name)
其结果如下:
3.2 Column表达式
Spark DataFrame不仅支持对列使用关系型或计算型的表达式,也支持逻辑表达式。举例如下:
data=[(123,"Katie",19,'brown'),
(234,"Michael",22,"green"),
(345,"Simone",57,"blue")]
schema=StructType([
StructField("id",LongType(),True),StructField("name",StringType(),True),
StructField("age",LongType(),True),StructField("eyeColor",StringType(),True)])
df=spark.createDataFrame(data,schema)
df.select(df.age+1,df.age==19,
df.id!=df.age,
(df.age==19)|(df.id==123),
(df.age==19)&(df.id==123)).show()
其结果如下:
3.3 Column对象自带方法
这里主要介绍几种Column对象自带的方法,具体如下:
- alias()方法修改列名(select的用法在第4部分)
df.select(df['age']+1).show()
df.select((df['age']+1).alias('new_age')).show()
其结果如下:
df.sort(df['age'].asc()).show()
df.sort(df['age'].desc()).show()
其结果如下:
除了这两个排序方法之外,asc_nulls_first()、asc_nulls_last()、desc_nulls_first()、desc_nulls_last()方法规定了空值的位置。
- cast()、astype()方法修改数据类型,这两个方法作用相同
df_1=df.withColumn('str_age',df['age'].cast("string"))
print(df_1.dtypes)
其结果如下:
- contains()、startswith()、endswith()、like()、rlike()、substr()字符串操作方法
#contains:判断字符串是否包含特定字符串
#starswith:判断字符串是否以特定字符串开头
#endswith:判断字符是否以特定字符串结尾
#rlike:判断字符串是否符合特定正则表达式、
#like:SQL中的like
#substr:提取子串
from pyspark.sql import functions as func
data=[(123,"Katie",19,'brown',2),
(234,"Michael",22,"green",4),
(345,"Simone",57,"blue",3)]
schema=StructType([
StructField("id",LongType(),True),StructField("name",StringType(),True),
StructField("age",LongType(),True),StructField("eyeColor",StringType(),True),
StructField('len', IntegerType(),True)])
df=spark.createDataFrame(data,schema)
df.select(df.name.contains('M').alias('A'),df.name.startswith('K').alias('B'),
df.name.endswith('e').alias('C'),df.name.rlike('[\w]+').alias('D'),
df.name.like('Ka%').alias('E'),df.name.substr(2,3).alias('F'),
df.name.substr(func.lit(1),df.len).alias('G')).show()
其结果如下:
注意:使用substr()时字符串的索引位置是从1开始的;另外substr()的两个参数可以是int型变量也可以是Column型变量,只要这两个参数保持一致即可。
df.select(df.age.between(22,57)).show()
其结果如下:
- 两个Column实例进行二进制按位运算:bitwiseAND()、bitwiseOR()、bitwiseXOR()
data=[(123,19,),(234,22),(345,57)]
df=spark.createDataFrame(data,['a','b'])
df.select(df.a.bitwiseAND(df.b),
df.a.bitwiseOR(df.b),
df.a.bitwiseXOR(df.b)).show()
其结果如下:
- dropFields()、withField()、getField()方法向DataFrame中的嵌套Row中删除元素、添加元素、获取对应元素
df.select(df.value.getField('age')).show()
df.select(df.value.withField('School',func.lit('TsingHua'))).show()
df.select(df.value.dropFields('age')).show()
其结果如下:
- getItem()方法按索引获取ArrayType类型数据的元素
from pyspark.sql import Row
data=[Row(name='Alice',score=[78,90,85]),
Row(name='Bob',score=[69,85]),
Row(name='Jack',score=None)]
df=spark.createDataFrame(data)
df.select(df.score.getItem(0)).show()
其结果如下:
- isNotNull()、isNull()判断是否为空
from pyspark.sql import Row
data=[Row(name='Alice',score=78),
Row(name='Bob',score=69),
Row(name='Jack',score=None)]
df=spark.createDataFrame(data)
df.select(df.score.isNull()).show()
df.select(df.score.isNotNull()).show()
其结果如下:
from pyspark.sql import Row
data=[Row(name='Alice',score=78),
Row(name='Bob',score=69),
Row(name='Jack',score=100)]
df=spark.createDataFrame(data)
df.select(df.score.isin([69,78])).show()
其结果如下:
from pyspark.sql import Row
data=[Row(name='Alice',score=float('NaN')),
Row(name='Bob',score=69.0),
Row(name='Jack',score=None)]
df=spark.createDataFrame(data)
df.select(df.score==None,df.score.eqNullSafe(None),
df.score.eqNullSafe(float('Nan'))).show()
其结果如下:
3.4 pyspark.sql.funcitons包中提供的方法
pyspark.sql.functions包中也提供了很多可以对DataFrame的列进行操作的方法。这里有一些与Column自带的方法同名的方法,不再赘述。
数值类计算操作主要包括:abs、acos、acosh、asin、asinh、atan、atan2、atanh、cos、cosh、exp、expm1(其结果为exp()-1)、pow、sqrt、tan、tanh、sin、sinh、log、log10、log1p、log2、ceil(向上取整)、floor(向下取整)、round(HALF-UP型四舍五入)、bround(HALF-EVEN型四舍五入)、rint(返回最靠近该值的双精度整数)、cbrt(立方根)、factorial(阶乘)、corr()、signum(符号函数)、hypot(其计算值为sqrt(col1^2+col2^2))、degrees(将以弧度为单位测量的角度转换为以度为单位测量的近似等效角度)、radians(degrees的逆操作)。仅以几个例子进行说明:
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
spark=SparkSession.builder.appName("jsonRDD").getOrCreate()
data=[[item,item*0.5] for item in range(-5,5)]
df=spark.createDataFrame(data,['A','B'])
df.show()
df.select('A','B',func.abs('A').alias('abs'),func.ceil('B').alias('ceil'),
func.pow('A','B').alias('pow')).show()
其他结果如下:
常用的聚合操作有:avg、mean、min、max、count、count_distinct(不同值计数)、sum、sum_distinct(不同的值的总和)、stddev、stddev_pop、stddev_samp、var_pop、var_samp、variance、first(返回群组的第1个值)、last(返回群组的最后一个值)、skewness(偏度)、kurtosis(峰度)、aggregate、approx_count_distinct(近似不同值计数)、grouping(指定分组列表中的列是否聚合)、grouping_id(指定分组的层级)、collect_list(把某一列的值聚合成一个列表)、collect_set(把某一列的值聚合成一个集合,去重)。用法举例如下:
data=[[item,item*0.5] for item in range(-5,5)]
df=spark.createDataFrame(data,['A','B'])
df.select(func.sum('A').alias('sum'),
func.sum_distinct(func.abs('A')).alias('sum_dis'),
func.count('A').alias('count'),
func.collect_set('A').alias('collect')).show()
其结果如下:
常用的ArrayType类型列操作: array(将两个表合并成array)、array_contains、array_distinct、array_except(两个array的差集)、array_intersect(两个array的交集不去重)、array_join、array_max、array_min、array_position(返回指定元素在array中的索引,索引值从1开始,若不存在则返回0)、array_remove、array_repeat、array_sort、array_union(求两个array的并集,不去重)、arrays_overlap(如果两个array中包含非空的相同元素,则返回True;如果两个array中都包含空元素,返回空;否则返回False)、arrays_zip、size、sort_array(可以指定是否逆序)、slice、element_at(返回指定索引的值)、flatten、forall(判断array中的所有元素是否都满足设定的条件)、shuffle、transform(对array中的每个元素进行转化)、sequence(类似range)、zip_with。用法举例如下:
data=[(1,2,[3,4]),(3,5,[5,6,5]),(10,0,[4,5])]
df=spark.createDataFrame(data,['A','B','C'])
df.select(func.array('A','B').alias('new_arr'),
func.array_contains('C', 5).alias('contain_5'),
func.array_distinct('C').alias('dist'),
func.array_position('C', 3).alias('index'),
func.array_min('C').alias('min'),
func.array_join('C',',').alias('join'),
func.array_repeat('C',2).alias('repeat'),
func.array_sort('C').alias('sort'),
func.sequence(func.lit(-1),func.lit(1)).alias('seq')).show()
df.select(func.element_at('C', 1).alias('element'),
func.transform('C',lambda x:x+1).alias('trans'),
func.slice('C',1,2).alias('slice')).show()
df=df.withColumn('D',func.array('A','B'))
df.select('C','D',
func.array_union('C','D').alias('union'),
func.arrays_zip('C','D').alias('zip'),
func.arrays_overlap('C', 'D').alias('overlap')).show()
其结果如下:
常用的日期类操作有:current_date、current_timestamp、date_add、date_format(将日期转化为指定格式)、date_sub、date_trunc(在指定位置对数据进行阶截断)、datediff、dayofmonth、dayofweek、dayofyear、hour、minute、month、months_between(两个日期相差的月份数)、next_day(返回日期之后第一个周几)、quarter、second、timestamp_seconds(将时间戳转化为日期)、weekofyear、year、to_date、to_timestamp、to_utc_timestamp、unix_timestamp(将日期转化为时间戳)、trunc(将日期在指定位置截断)、add_months、session_window、from_unixtime(将时间戳转化为日期)、from_utc_timestamp(将时间戳转化为日期)、last_day(返回日前所在月份的最后一天)。
data=[('2012-10-23','2013-01-15'),
('2013-03-05','2013-05-07'),
('2014-04-03','2015-09-13')]
df=spark.createDataFrame(data,['startdate','enddate'])
df.select(func.dayofweek('startdate').alias('A'),
func.date_sub('startdate',2).alias('B'),
func.datediff('enddate','startdate').alias('C'),
func.month('startdate').alias('D'),
func.quarter('startdate').alias('E'),
func.year('startdate').alias('F'),
func.next_day('startdate','Sun').alias('G'),
func.current_date().alias('H'),
func.date_trunc('mon','startdate').alias('trunc')).show()
其结果如下:
补充:date_trunc中的format的取值为: 'year', 'yyyy', 'yy' , 'month', 'mon', 'mm''day', 'dd', 'microsecond', 'millisecond', 'second', 'minute', 'hour', 'week', 'quarter'
常用的字符类操作有:ascii(返回字符串首字母的ASCII值)、concat、concat_ws、length、lower、lpad、ltrim、regexp_extract(按正则表达式进行抽取)、regexp_replace、repeat、reverse、rpad、rtrim、split、substring(抽取子串)、substring_index(返回第n个分隔符之前的所有字符)、translate、trim、locate(返回指定位置之后某个字符第一次出现的位置)、initcap(字符串首字母大写)、input_file_name(从当前spark任务中的文件中创建字符串)、instr(返回子串第一次出现时的位置索引)、levenshtein(两个字符串的编辑距离)、sentences(将字符串分割成句子的集合)、to_json、to_csv。用法举例如下:
data=[('a','abc','def'),
('c','defg','adc'),
('d','edge','ghi')]
df=spark.createDataFrame(data,['A','B','C'])
df.select('A','B','C',
func.ascii('B').alias('B_ascii'),
func.length('C').alias('C_len'),
func.lpad('A',3,'#').alias('A_lpad'),
func.concat('B','C').alias('B_C_concat'),
func.concat_ws('_','A','B','C').alias('ABC_concat_ws'),
func.reverse('B').alias('B_reverse'),
func.substring('B',1,3).alias('B_substring'),
func.translate('B','abcd','123').alias('B_translate')).show()
#translate中:a->1,b->2,c->3,d-''
其结果如下:
常用的map型操作有:create_map、map_concat(将两个列组合成map)、map_entries、map_filter、map_from_arrays、map_from_entries、map_keys、map_values、map_zip_with、explode(将map的key和value分成两列)、explode_outer(将map的key和value分成两行)、transform_keys(对key进行操作)、transform_values(对value进行操作)。用法举例如下:
data=[([1,2],['a','b'],{'m':40,'k':300},{'m':3,'k':1}),
([3,4],['a','k'],{'d':14,'c':24},{'d':3,'c':5}),
([5,6],['g','h'],{'e':34,'f':39},{'e':2,'f':10})]
df=spark.createDataFrame(data,['A','B','C','D'])
df=df.withColumn('E',func.map_from_arrays('B', 'A'))
df.select('B','A','E').show()
df.select(func.map_concat('C','E').alias('concat'),
func.map_keys('C').alias('keys'),
func.map_values('D').alias('vals'),
func.map_filter('C',lambda k,v:v>30).alias('filter'),
func.map_zip_with('C','D',lambda k,v1,v2:v1*v2).alias('zip'),
func.transform_keys('C',lambda x,_:func.upper(x)).alias('trans')).show()
其结果如下:
这一类方法可以同时对多个列进行操作。常用的方法主要包括:greatest、least、nanvl(如果第一列的值为空,则返回第二列的值)、coalesce(返回第一个不为空的列值)。用法举例如下:
data=[(1,float('nan'),2,3),
(None,None,4,5),
(None,None,None,10)]
df=spark.createDataFrame(data,['A','B','C','D'])
df.select(func.greatest('A','B','C','D').alias('greast'),
func.nanvl('A','B').alias('nanvl'),
func.coalesce('A','B','C','D').alias('coalesce')).show()
其结果如下:
Spark SQL中的窗口函数用法与MySQL 8中的窗口函数相同,关于窗口函数的理论可以参考:MySQL8.0中的窗口函数_Sun_Sherry的博客-CSDN博客_mysql8窗口函数
Spark DataFrame中常用的窗口函数有:rank、dense_rank、row_number、ntile、nth_value、lead、lag、percent_rank、cume_dist。另外聚合函数也可以作为窗口函数。用法举例如下:
from pyspark.sql import Window as win
data=[('A',1,2),('B',3,4),
("A",4,5),('C',7,9),
('C',4,0),('B',8,2)]
df=spark.createDataFrame(data,['C1','C2','C3'])
df.select('C1','C2','c3',
func.rank().over(win.partitionBy('C1').orderBy('C3')).alias('C4'),
func.sum('C3').over(win.partitionBy('C1').orderBy('C2').\
rowsBetween(win.unboundedPreceding, win.currentRow)).alias('C5')).show()
其结果如下:
常用的二进制列方法有:decode、encode、base64、unbase64、sha1、sha2、xxhash64、md5、hash。用法举例如下:
data=[([bytearray('HELLO','utf-8')]),
([bytearray('hello','utf-8')]),
([bytearray('1','utf-8')])]
df=spark.createDataFrame(data,['C1'])
df.select('C1',
func.md5('C1').alias('md5'),
func.decode('C1','utf-8').alias('decode'),
func.base64('C1').alias('base64')).show()
结果如下:
常用的分区转化函数有:days、hours、months、years。
常用的位转移方法有:shiftleft、shiftright、shiftrightunsigned。用法举例如下:
data=[[item] for item in range(0,5)]
df=spark.createDataFrame(data,['A'])
df.select('A',
func.shiftleft('A',3).alias('shift_left'),
func.shiftrightunsigned('A',1).alias('shift-right')).show()
其结果如下;
常用的进制转换方法有:hex(返回16进制对应的数据)、unhex()、conv(进制转换)、bin(将数据的整数部分转化成二进制)
data=[('AB3',2.3),
('2DA',4.5),
('48F',4.2)]
df=spark.createDataFrame(data,['A','B'])
df.select('A','B',
func.hex('A').alias('hex'),
func.conv('A',16,8).alias('conv'),
func.bin('B').alias('bin')).show()
其结果如下:
常用的创建特定的列的方法有:monotonically_increasing_id(自增)、lit(常量列)、rand(随机数)、randn(随机数)。用法举例如下:
data=[('AB3',2.3),
('2DA',4.5),
('48F',4.2)]
df=spark.createDataFrame(data,['A','B'])
df.select(func.monotonically_increasing_id().alias('id'),
func.rand().alias('rand'),
func.randn().alias('randn')).show()
其用法如下:
(1) when()……otherwise()条件判断,类似于SQL中的case……when
data=[('AB3',2.3),
('2DA',4.5),
('48F',4.2)]
df=spark.createDataFrame(data,['A','B'])
df.select('B',
func.when(func.col('B')>3,True).otherwise(False).alias('when')).show()
其结果如下:
(2) udf(f,returnType)自定义函数
data=[('AB3',2.3),
('2DA',4.5),
('48F',4.2)]
new_func=func.udf(lambda x:True if x>3 else False,BooleanType())
df=spark.createDataFrame(data,['A','B'])
df.select('B',
new_func('B').alias('new_col')).show()
其结果如下:
(3) pandas_udf()使用Pandas中的函数
from pyspark.sql.functions import pandas_udf
data=[('AB3',2.3),('2DA',4.5),('48F',4.2)]
df=spark.createDataFrame(data,['A','B'])
new_func1=func.pandas_udf(lambda x:x.str.len(),IntegerType())
new_func2=func.pandas_udf(lambda x:x>3,BooleanType())
df.select(new_func1('A').alias('A_len'),
new_func2('B').alias('B_TF')).show()
@pandas_udf("int")
def new_func3(x:pd.Series) -> pd.Series:
return x.str.len()
@pandas_udf('boolean')
def new_func4(x:pd.Series) -> pd.Series:
return x>3
df.select(new_func3('A').alias('A_len1'),
new_func4('B').alias('B_TF1')).show()
其结果如下:
关于pandas_udf有以下几点需要说明:
- 使用pandas_udf()需要安装pandas和PyArrow包。
- pandas_udf()中f的定义中的参数x相当于pandas.Series,可以直接使用pandas.Series自带的所有函数和方法,而udf()中f的定义中的x则对应DataFrame中对应的Column列中的每一个元素。
- pandas_udf()中的可选参数functionType在未来的版本中会弃用,spark推荐使用类型提示(type hints)来代替functionType。上述四个使用pandas_udf定义的函数:new_func1、new_func2、new_func3和new_func4。其中new_func1和new_func3、new_func2和new_func4的作用相同,而new_func3和new_func4才是推荐的写法。
- 《Spark快速大数据分析》
DataFrame也是一种不可变的分布式数据集,类似于Python Pandas中的DataFrame和关系数据库中的表。在分布式数据集上施加表结构之后,就可以使用Spark SQL查询结构化的数据或者使用Spark表达式方法。1. Spark SQL性能未引入DataFrame之前,使用Python操作RDD时的查询速度比使用Scala和Java的查询慢很多,因为Pyspark需要将所有........................................................
在Pandas中,DataFrame的一列就是一个Series, 可以通过map来对一列进行操作:
df['col2'] = df['col1'].map(lambda x: x**2)
其中lambda函数中的x代表当前元素。可以使用另外的函数来代替lambda函数,例如:
define square(x):
return (x ** 2)
df['col2'] = df['col1'].map(square)
2.多列运算
apply()会将待处理的对象拆分成多个片段,然后对各片段调用传入的函数,最后尝试将各片段组合到一起。
要对DataFrame的多个列同时进
中提供了该项目中所有PySpark RDD,DataFrame和SQL示例的说明,所有这些示例均以Python语言编码并在我们的开发环境中进行了测试。
目录(Python中的火花示例)
PySpark基本示例
PySpark –功能
PySpark –这是什么? &谁使用它?
PySpark DataFrame示例
PySpark –创建一个DataFrame
PySpark –创建一个空的DataFrame
PySpark –将RDD转换为DataFrame
PySpark –将DataFrame转换为Pandas
PySpark – StructType和StructField
在DataFrame和RDD上使用PySpark行
从PySpark DataFrame中选择列
PySpark Collect()–从DataFrame检索数据
PySpark withColumn
Pyspark学习笔记(六)
文章目录Pyspark学习笔记(六)前言DataFrame简介一、什么是 DataFrame ?二、RDD 和 DataFrame 和 Dataset三、选择使用DataFrame / RDD 的时机
本篇博客讲的是DataFrame的基本概念
DataFrame简介
主要参考文献:
A Tale of Three Apache Spark APIs: RDDs vs DataFrames and Datasets
RDDs vs. Dataframes
DataFrame是一种不可变的分布式数据集,这种数据集被组织成指定的列,类似于关系数据库中的表。通过在分布式数据集上施加结构,让Spark用户利用Spark SQL来车讯结构化的数据或使用Spark表达式方法(而不是lambda)。
1.python到RDD之间的通信
每当使用RDD执行PySpark程序时,潜在地需要巨大地开销来执行作业。如图
在PySpark驱动器中...
spark.createDataFrame(value, ['name', 'age']).collect()
d = [{'name': 'Alice', 'age': 1}]
spark.createDataFrame(d).collect()
通过pandas创建
spark.
二、基本操作
2.1 建立SparkSession对象
一切操作之前需要先建立一个SparkSession对象(运行Spark code的Entrance point,可以理解为交互部件):
详见: pyspark.sql module
from pyspark.sql import SparkSession
spark = SparkSession.builder.ma
在spark下,有很多种创建dataframe的方法,下面会一一例句
from pyspark.sql import SparkSession
from datetime import datetime,date
from pyspark.sql.types import *
import pandas as pd
from pyspark.sql import Row
spark = SparkSession.builder.appName('test').master('local').getOr
其中,withColumn方法接受两个参数,第一个参数是新列的名称,第二个参数是新列的值。在上面的例子中,我们使用了lit函数来创建一个常量值作为新列的值。如果需要根据已有的列计算新列的值,可以使用pyspark.sql.functions中的其他函数来实现。
### 回答2:
在Pyspark中,我们可以使用withColumn()方法为DataFrame对象添加新的一列。
withColumn()方法需要两个参数:第一个参数为新列的名称,第二个参数为新列的值或新列的计算方法。
以下是一个示例代码:
from pyspark.sql.functions import col
# 创建一个DataFrame对象
df = spark.createDataFrame([(1, "John", 25), (2, "Mary", 30)], ["id", "name", "age"])
# 添加一个新列"gender"
df = df.withColumn("gender", col("age") % 2)
# 展示DataFrame对象
df.show()
在上面的代码中,我们使用createDataFrame()方法创建了一个DataFrame对象,其中包含三列:id、name和age。
接着,我们使用withColumn()方法为该对象添加了一个新列gender。这个新列的值是根据age列的值计算得到的,使用了col()函数和%运算符。
最后,我们使用show()方法展示了更新后的DataFrame对象。
可以看到,新的DataFrame对象中有了一个名为gender的列,它的值分别是0和1,代表了age列的奇偶性。
### 回答3:
在pyspark中,我们可以使用withcolumn函数将新的一列添加到dataframe中。withcolumn需要两个参数,一个是新的列名,一个是新的列所要包含的值。以下是具体的步骤:
1. 导入pyspark.sql.functions包
```python
from pyspark.sql.functions import *
2. 创建一个dataframe
例如,我们创建一个包含姓名和年龄的dataframe:
```python
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([
StructField("name", StringType(), True),
StructField("age", IntegerType(), True)
data = [("John", 25), ("Lisa", 30), ("Tom", 20)]
df = spark.createDataFrame(data=schema)
3. 使用withcolumn函数添加新的一列
例如,我们添加一个新的列"gender",需要根据年龄判断性别:
```python
df1 = df.withColumn("gender", when(df.age >= 18, "Male").otherwise("Female"))
以上代码中,wehn函数判断age是否大于等于18,如果是则设置gender为"Male",否则设置为"Female"。
4. 查看新的dataframe
```python
df1.show()
输出结果为:
+----+---+------+
|name|age|gender|
+----+---+------+
|John| 25| Male|
|Lisa| 30| Male|
|Tom | 20| Male|
+----+---+------+
以上是在pyspark中添加新的一列的基本步骤,具体使用可以根据需求进行修改。