相关文章推荐
瘦瘦的小摩托  ·  java中将Object类型转换成Strin ...·  1 年前    · 
爽快的镜子  ·  对外经济贸易大学·  1 年前    · 
慷慨的伤疤  ·  北京人工智能核心产值破2500亿元 ...·  1 年前    · 
博学的地瓜  ·  贪心算法求解 TSP ...·  1 年前    · 
豁达的马克杯  ·  空客首架A321neo飞机交付维珍美国航空·  1 年前    · 
Code  ›  Spark(RDD,CSV)创建DataFrame方式开发者社区
csv rdd spark dataframe
https://cloud.tencent.com/developer/article/1673819
憨厚的薯片
2 年前
作者头像
Tim在路上
0 篇文章

Spark(RDD,CSV)创建DataFrame方式

前往专栏
腾讯云
开发者社区
文档 意见反馈 控制台
首页
学习
活动
专区
工具
TVP
文章/答案/技术大牛
发布
首页
学习
活动
专区
工具
TVP
返回腾讯云官网
社区首页 > 专栏 > 后台技术底层理解 > Spark(RDD,CSV)创建DataFrame方式

Spark(RDD,CSV)创建DataFrame方式

作者头像
Tim在路上
发布 于 2020-08-04 21:46:04
1K 0
发布 于 2020-08-04 21:46:04
举报

spark将RDD转换为DataFrame

  1. 方法一(不推荐)

spark将csv转换为DataFrame,可以先文件读取为RDD,然后再进行map操作,对每一行进行分割。 再将schema和rdd分割后的Rows回填,sparkSession创建的dataFrame

 val spark = SparkSession
      .builder()
      .appName("sparkdf")
      .master("local[1]")
      .getOrCreate()
       //设置spark的上下文sparkContext
      val sc = spark.sparkContext
      val fileRDD = sc.textFile("/home/hadoop/Downloads/filesmall2.csv")
      //val rdd = fileRDD.filter(line => line.split("\t").length != 30)
      val df = spark.createDataFrame(fileRDD.map(line=>HttpSchema.parseLog(line)),HttpSchema.struct)
      df.show(3)

这里的RDD是通过读取文件创建的所以也可以看做是将RDD转换为DataFrame

object HttpSchema {
  def parseLog(x:String): Row = {
    var fields = x.split("\t")
    val _id = fields(0)
    val srcIp = fields(1)
    val srcPort = fields(2)
    //这种方法比较麻烦的地方是row里面的字段名要和struct中的字段对应上
    RowFactory.create(_id,srcIp,srcPort)
  //设置schema描述
  val struct = StructType(
    Array( StructField("_id",StringType),
      StructField("srcIp",StringType),
      StructField("srcPort",StringType),
}

这也是这种方法不推荐使用的地方,因为返回的Row中的字段名要与schema中的字段名要一致,当字段多于22个这个需要集成一个

2.方法二 //使用隐式转换的方式来进行转换

val spark = SparkSession
      .builder()
      .appName("sparkdf")
      .master("local[1]")
      .getOrCreate()
      //使用隐式转换必须导入这个才可以使用只有import spark.implicits._之后,RDD才有toDF、toDS功能
      import spark.implicits._
       //设置spark的上下文sparkContext
      val sc = spark.sparkContext
      val fileRDD = sc.textFile("/home/hadoop/Downloads/filesmall2.csv")
      case class HttpClass(id:String,srcIp:String,srcPort:Int)
      val df = fileRDD.map(_.split("\t")).map(line=>HttpClass(line(0),line(1),line(2).toInt)).toDF()

当然也可以不创建类对象

rdd.map{x=>val par=x.split(",");(par(0),par(1).toInt)}.toDF("name","age")

dataFrame转换为RDD只需要将collect就好,df.collect RDD[row]类型,就可以按row取出

spark读取csv转化为DataFrame

  1. 方法一
 val conf = new SparkConf().setAppName("word count").setMaster("local[1]")
    val sc = new SparkContext(conf)
    println("spark version: " + sc.version)
    val spark = new SQLContext(sc)
    import spark.implicits._
    val df = spark.read.format("com.databricks.spark.csv")
      .option("header", "false")
      .option("inferSchema", "false") //是否自动推到内容的类型
      .option("delimiter",",")  //分隔符,默认为 ,
      .load("/home/hadoop/Downloads/Salary_Data.csv")
    df.show()
    //进行写数据
    data.repartition(1).write.format("com.databricks.spark.csv")
      .option("header", "false")//在csv第一行有属性"true",没有就是"false"
      .option("delimiter",",")//默认以","分割
      .save(outpath)
     sparkContext.stop()

sparkContext.sql()操作完成后直接返回的是DataFrame

 
推荐文章
瘦瘦的小摩托  ·  java中将Object类型转换成String类型_set<object>转 set<string>-CSDN博客
1 年前
爽快的镜子  ·  对外经济贸易大学
1 年前
慷慨的伤疤  ·  北京人工智能核心产值破2500亿元 力促医药健康产业万亿级跃升
1 年前
博学的地瓜  ·  贪心算法求解 TSP 旅行商问题及其实现_旅行者问题 贪心算法-CSDN博客
1 年前
豁达的马克杯  ·  空客首架A321neo飞机交付维珍美国航空
1 年前
今天看啥   ·   Py中国   ·   codingpro   ·   小百科   ·   link之家   ·   卧龙AI搜索
删除内容请联系邮箱 2879853325@qq.com
Code - 代码工具平台
© 2024 ~ 沪ICP备11025650号