case class User(user_id:String,item_id:String,cat_id:String,merchant_id:String,brand_id:String,month:String,day:String,
action:String,age_range:String,gender:String,province:String)
object UserAnalysis {
def main(args:Array[String]): Unit = {
val userDataPath = "file:///home/hadoop/data_format/small_user_log.csv"
val sparkSession = SparkSession
.builder
.master("local")
.appName("UserAnalysis")
.enableHiveSupport()
.getOrCreate()
val lineRdd = sparkSession.sparkContext.textFile(userDataPath)
val sqlContext = sparkSession.sqlContext
import sqlContext.implicits._
val userDF = lineRdd.map{{x=>
val split = x.split(",")
User(split(0),split(1),split(2),split(3),split(4),split(5),split(6),
split(7),split(8),split(9),split(10))
}.toDF()
userDF.show(10)
userDF.createOrReplaceTempView("userDF")
val provinceDF = sparkSession.sql("select province from userDF")
provinceDF.show(10)
注意:如果csv文件内容第一行是列头信息,按方式四创建DataFrame会看到如下情况,甚至会因
字段类型转换失败而报错,例如:user_id这个字符串没法转换成int

注意:如果调用toDF方法时没有指定列头名称,则会以case class的参数名称为列头名;
如果调用toDF方法时指定了列头名称,则会显示toDF方法指定的列头名称
不上图了。。。
另外,在scala 2.10中最大支持22个字段的case class,这点需要注意
-
关于处理csv文件中首行列头信息
第一种创建方式,可以通过设置option中的header属性来控制是否读取csv文件第一行为列头。
即如果有列头,设置为true,反之则设置为false。无论是加载单个文件还是批量加载都没问题。
其它三种创建方式,使用textFile加载csv文件,可能需要使用filter来过滤掉所有csv文件的首行列头信息
-
关于字段类型和字段名称
第一种创建方式,在创建好dataframe之前,似乎没法指定字段名称和字段类型,字段名称要么是csv文件行首自带的要么是spark生成的,字段类型似乎全部默认为String。但是我们能通过dataframe的withColumn方法和withColumnRenamed方法修改它们,如下:
import org.apache.spark.sql.functions._
userDF = userDF.withColumn("user_id", col("user_id").cast(IntegerType))
.withColumn("item_id", col("item_id").cast(IntegerType))
.withColumn("cat_id", col("cat_id").cast(IntegerType))
.withColumn("merchant_id", col("merchant_id").cast(IntegerType))
.withColumn("brand_id", col("brand_id").cast(IntegerType))
.withColumn("month", col("month").cast(IntegerType))
.withColumn("day", col("day").cast(IntegerType))
.withColumn("action", col("action").cast(IntegerType))
.withColumn("age_range", col("age_range").cast(IntegerType))
.withColumn("gender", col("gender").cast(IntegerType))
.withColumnRenamed("province","省份")
第二种创建方式,我们可以在定义StructType时指定字段名称和类型
第三种创建方式,我们可以在将RDD[String]转换成元组时指定字段类型,在调用toDF时指定字段名称,如下:
val userDF = lineRdd.map{{x=>
val split = x.split(",")
(Integer.parseInt(split(0)),
Integer.parseInt(split(1)),
Integer.parseInt(split(2)))
}.toDF("A","B","C")
第四种创建方式,我们可以在定义case class时指定字段名称和类型,在调用toDF时也能指定字段名称,
toDF里指定的的名称优先级更高。
这样看来,以上几种创建dataframe的方式各有优劣,所以还是要根据实际应用场景来择取最方便的途径吧。
1、使用spark来处理CSV文件,写入mysql表当中
spark介绍
Spark是一个快速(基于内存),通用、可扩展的计算引擎,采用Scala语言编写。2009年诞生于UC Berkeley(加州大学伯克利分校,CAL的AMP实验室),2010年开源,2013年6月进入Apach孵化器,同年由美国伯克利大学 AMP 实验室的 Spark 大数据处理系统多位创始人联合创立Databricks(属于 Spark 的商业化公司-业界称之为数砖-数据展现-砌墙-侧面应正其不是基石,只是数据计算),2014年成为
之前写的程序中,有些API在Spark SQLContext没有,我计算的结果先保存在rdd中,最后在使用RDD转换成dataframe进行保存,话不多说下面是代码.//一个StruceFields你可以把它当成一个特征列。分别用列的名称和数据类型初始化
val structFields = List(StructField("age",DoubleType),StructField("hei
数据的加载和保存通用的加载和保存方式SparkSQL 提供了通用的保存数据和数据加载的方式。这里的通用指的是使用相同的
API,根据不同的参数读取和保存不同格式的数据,SparkSQL 默认读取和保存的文件格式
为 parquet1) 加载数据spark.read.load 是加载数据的通用方法 如果读取不同格式的数据,可以对不同的数据格式进行设定➢ format("…"):指定加载的数据类型,括"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"。
➢ load("
val sparkContext=ss.sqlContext
import sparkContext.implicits._
val sqlContext = //create sqlContext
import sqlContext.implicits._val df = RDD.toDF()
val ...
转自:https://vimsky.com/article/2708.html跟关系数据库的表(Table)一样,DataFrame是Spark中对带模式(schema)行列数据的抽象。DateFrame广泛应用于使用SQL处理大数据的各种场景。创建DataFrame有很多种方法,比如从本地List创建、从RDD创建或者从源数据创建,下面简要介绍创建DataFrame的三种方法。方法一,Spark...
object scvSQL {
def main(args: Array[String]): Unit = {
val session = SparkSession.builder()
.master("local[*]")
.appName("dataframe")
.getOrCreate()
//不需要创建RDD
format(“…”):指定加载的数据类型,包括"csv"、“jdbc”、“json”、“orc”、"parquet"和
“textFile”。
load(“…”):在"csv"、“jdbc”、“json”、“orc”、"parquet"和"textFile"格式下需要传入加载
数据的路径。
option(“…”):在"jdbc"格式下需要传入 JDBC 相应参数,url、user、password 和 dbtable
我们前面都是使用 read API 先把文件加载到 DataFrame 然后再查询,其实
val rdd: RDD[Person] = sc.parallelize(Array(
Person("fanghailiang", 29),
Person("sunyu", 28),
Person
Spark SQL
Spark SQL是构建在Spark RDD之上的一款ETL(Extract Transformation Load)工具,这类似于构建在MapReduce之上的1.x版本的Hive。同Spark RDD的不同之处在于Spark SQL的API可以给Spark计算引擎提供更多的信息(计算数据结构、转换算子),Spark计算引擎可以根据Spark SQL提供的信息优化底层计算任务。目前为止,Spark SQL提供了两种风格的交互式API:Dataset API/SQL脚本。
Dataset
with open('singer_name.csv','w')as csvf:
#新建csv表头列表
fieldnames=['first_name','second_name','last_name']
writer=csv.DictWriter(csvf,fieldnames=fieldnames)
#写入表头
writer.writeheader()
介绍一下Spark将RDD转换成DataFrame的两种方式。
1.通过是使用case class的方式,不过在scala 2.10中最大支持22个字段的case class,这点需要注意
2.是通过spark内部的StructType方式,将普通的RDD转换成DataFrame
装换成DataFrame后,就可以使用SparkSQL来进行数据筛选过滤等操作
下面直接代码说话
packag