from pyspark import SparkContext
from pyspark.sql import SparkSession
import json
import pandas as pd
当需要把Spark DataFrame转换成Pandas DataFrame时,可以调用toPandas();
当需要从Pandas DataFrame创建Spark DataFrame时,可以采用createDataFrame(pandas_df)。
但是,需要注意的是,在调用这些操作之前,
需要首先把Spark的参数spark.sql.execution.arrow.enabled设置为true,
因为这个参数在默认情况下是false
columns_json_str = '{"name":"影片名称","box_office":"票房"}'
columns_dict = json.loads(columns_json_str)
sc = SparkContext('local', 'spark_file_conversion')
sc.setLogLevel('WARN')
spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
df = spark.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('hdfs://192.168.3.9:8020/input/movies.csv')
print(df.dtypes)
df = pd.DataFrame(df.toPandas(),columns=columns_dict.keys())
print(df)
data_values=df.values.tolist()
data_coulumns=list(df.columns)
df = spark.createDataFrame(data_values,data_coulumns)
for key in columns_dict.keys() :
df = df.withColumnRenamed(key , columns_dict[key]);
print(df.collect())
print(df.printSchema())
filepath = 'new_movies.csv'
df.write.format("csv").options(header='true', inferschema='true').save('hdfs://192.168.3.9:8020/input/' + filepath)
https://github.com/gm19900510/data_analysis_python 欢迎star