一直在说Dataframe是Dataset的特例,DataFrame=Dataset[Row],可Row是什么东西呢?

什么是Row

顾名思义:就是一行数据
Row是org.apache.spark.sql包下的一个特质
简单的理解:
Row是一个类型,跟Car、Person这些的类型一样, 所有的表结构信息都用Row来表示

什么时候会用到这个Row呢?

通过读取文件创建一个DataFrame:

    import spark.implicits._
    val dfA: DataFrame = spark.read.textFile(path = "./data/infoA")
      .map(_.split(","))
      .map(x => (x(0), x(1)))
      .toDF("tel", "name")

如果要把这个DataFrame转换为Dataset,只需要写个样例类,然后调用DataFrame的as方法:

case class User(tel: String, name: String)
import spark.implicits._
val dsA: Dataset[User] = spark.read.textFile(path = "./data/infoA")
      .map(_.split(","))
      .map(x => (x(0), x(1)))
      .toDF("tel", "name")
      .as[User]

这个列子想说明,如果返回的是DataFrame,那么其实可以看成是Dataset[Row]。

那么如何处理这个Row类型的Dataset呢?

先从官网举的几个"无用"的例子说起

  • 在Java中可以使用 RowFactory.create() 来创建Row,在Scala中用 Row.apply() 创建
  • Row可以通过几个字段来构建
    import org.apache.spark.sql._
    // Create a Row from values.
    val row1 = Row(1, true, "a string", null)
    // Create a Row from a Seq of values.
    val row2 = Row.fromSeq(Seq(1, true, "a string", null))
  • 如何访问Row的数据
import org.apache.spark.sql._
val row = Row(1, true, "a string", null)
// row: Row = [1,true,a string,null]
val firstValue = row(0)
// firstValue: Any = 1
val fourthValue = row(3)
// fourthValue: Any = null
// using the row from the previous example.
val firstValue = row.getInt(0)
// firstValue: Int = 1
val isNull = row.isNullAt(3)
// isNull: Boolean = true
  • 在Scala中还可以进行模式匹配:
import org.apache.spark.sql._
val pairs = sql("SELECT key, value FROM src").rdd.map {
  case Row(key: Int, value: String) =>
    key -> value
 

上面的案例在开发中几乎没有用,写上纯属是为了进一步感受Row是个什么东西

Row在开发中可能用到的场景

说实话,基本上对于研发人员来说,真正使用Row的场景不多,因为现在使用spark进行开发一般都是:

spark.sql("SELECT * FROM TABLE A")

这样的句式返回的正是DataFrame或者说是Dataset[Row]
可如果想要处理这个Row,就有点难办了,比如我要把整个手机号存入到一个Array或者List中,一般都要把Row转为String,这个时候就用到了Row的mkString()方法

    val dfA: DataFrame = spark.read.textFile(path = "./data/infoA")
      .map(_.split(","))
      .map(x => (x(0), x(1)))
      .toDF("tel", "name")
    val telArr: Array[String] = dfA.select("tel").map(row => row.mkString).collect()
    val telList: List[String] = telArr.toList
    val telSeq: Seq[String] = telArr.toSeq
    println(telList)
    println(telSeq)
List(13111111111, 13222222222, 13333333333, 13444444444, 13555555555, 13666666666, 13777777777, 13888888888, 13999999999)
WrappedArray(13111111111, 13222222222, 13333333333, 13444444444, 13555555555, 13666666666, 13777777777, 13888888888, 13999999999)
                    前言一直在说Dataframe是Dataset的特列,DataFrame=Dataset[Row],可Row是什么东西呢?什么是Row顾名思义:就是一行数据Row是org.apache.spark.sql包下的一个特质简单的理解:Row是一个类型,跟Car、Person这些的类型一样,所有的表结构信息都用Row来表示。什么时候会用到这个Row呢?煮个例子通过读取文件创建一个Dat...
				
Spark 动态的给Row新增字段 我们知道,在Spark,我们读取csv或者MySQL等关系型数据库时,可以直接得到DataFrame.我们要想新增一个字段,可以通过DataFrame的API或者注册一个临时表,通过SQL语句能很方便的实现给增加一个或多个字段. 但是,当我们将DataFrame转化成RDD的时候,RDD里面的类型就是Row,如果此时,要想再增加一个字段,该怎么办呢? Sho...
由于项目需要,要自己写一个数据转换接口。将Dataset<Row>传入本地算法进行处理,将经过本地算法处理后的行列数据转换为Dataset<Row>返回。 1、获取Dataset<Row>数据到本地 使用list获取数据: List<Row> list = dataset.collectAsList(); in...
1.1、Shark介绍 hark是基于Spark计算框架之上且兼容Hive语法的SQL执行引擎,由于底层的计算采用了Spark,性能比MapReduce的Hive普遍快2倍以上,当数据全部load在内存的话,将快10倍以上,因此Shark可以作为交互式查询应用服务来使用。 Shark是完全兼容Hive的语法,表结构以及UDF函数等,已有的HiveSql可以直接进行迁移至Shark上Shark底层依赖于Hive的解析器,查询优化器,但正是由于Shark的整体设计架构对Hive的
关于org.apache.spark.sql.Row前言构造Row解析Row的值按索引进行字段访问 —— apply和get方法指定类型来获取字段 —— getAs方法Row与SchemaRow与匹配模式参考资料 Row表示关系运算符的一行输出。 它是一个通用行对象,具有有序的字段集合,可以通过序数/索引(通过序数进行通用访问,generic access by ordinal),字段名(也...
spark在操作dataset/dataframe时候,经常需要对每一行数据进行处理,像map/mapPartition/foreach/ foreachParition等,那么我们在拿到一行数据时候,如何从拿取出我们想要的列,然后进行相关业务操作,经常摸不着头脑,本文基于spark 2.1.1分析了一行数据的表达,以及详细的讲解了各种操作拿取行相应列数据的方法。 Row实战操作 根据a...
scala实例化方法: It is invalid to use the native primitive interface to retrieve a value that is null, instead a user must checkisNullAtbefore attempting to retrieve a value that might be null. To cre...
Row row = RowFactory.create(record.getLong(1), record.getInt(2), record.getString(3)); 其“记录”是数据库的记录,但是我无法提前知道“记录”的长度,因此我想使用列表或数组来创建“行”。在Scala,我可以使用Row.fromSeq()从列表或数组创建行,但是如何在Java实现呢? // initialize first SQLCo 打印文件内容 可利用collect()函数,它能够以数组的形式,返回RDD数据集的所有元素 lines = spark.read.text(‘file:///home/wordcound.txt’).rdd for i in lines.collect(): print(i) 处理文件: lines存储的是Row object类型 将其String取出,利用map api进一步转换RDD lines_map = lines.ma
spark 读取 linux sftp上的文本文件,原jar只支持josn,csv等,增加bcp,txt文件的支持 下面是例子: public static void main(String[] args) throws Exception { SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkDataFrame"); JavaSparkContext javacontext = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(javacontext); Dataset<Row> df = sqlContext.read(). format("com.springml.spark.sftp"). option("host", "192.168.1.3"). option("username", "root"). option("password", "111111"). option("fileType", "bcp"). load("/sparktest/sparkfile0.bcp"); /*List<Row> list = df.collectAsList(); for(Row row:list){ String[] words = new String(row.getString(0).getBytes(),0,row.getString(0).length(),"UTF-8").split(" ",-1); for(int i=0;i<words.length;i++){ System.out.println("words==="+words[i]); JavaRDD<Row> rowRdd = df.javaRDD(); JavaRDD<Row> words_bcp= rowRdd.map(new Function<Row, Row>() { @Override public Row call(Row row) throws Exception { // TODO Auto-generated method stub String line = row.getString(0); String[] words = new String(line.getBytes(),0,line.getBytes().length,"utf-8").split(" ",-1); return RowFactory.create(words); List<Row> list = words_bcp.collect(); for(Row row:list){ System.out.println("row1=="+row.getString(0)); df.write().format("com.springml.spark.sftp"). option("host", "192.168.1.3"). option("username", "root"). option("password", "111111"). option("fileType", "bcp"). save("/sparktest/luozhao.bcp"); df.show(); javacontext.close(); (1)Spark 1.5.x版本以后,在Spark SQL和DataFrame引入了开窗函数,其比较常用的开窗函数就是row_number      该函数的作用是根据表字段进行分组,然后根据表的字段排序;其实就是根据其排序顺序,给组的每条记录添      加一个序号;且每组的序号都是从1开始,可利用它的这个特性进行分组取top-n...
SparkSql 相比较 HiveSql 具有更快的运行速度和更高的灵活性,平常使用经常需要进行数据转换,常见的有 RDD[T] -> DataFrame,DataFrame -> RDD[T] 还有 RDD[row] -> sql.dataFrame,下面简单介绍下常用用法。 初始化 SparkSession : // 1.配置Spark val conf = { if (local) new SparkConf().set //dataframe新增一列方法1,利用createDataFrame方法 val trdd = input.select(targetColumns).rdd.map(x=>{ if (x.get(0).toString().toDouble > critValueR || x.ge...
b'spark 窗口函数row_number练习以及用spark core实现' Spark是一个开源的分布式计算引擎,可以用于数据处理、大数据分析等领域。而Spark Core是其的一个核心组件,用于管理任务调度、内存管理、错误恢复等功能。而窗口函数row_number是一种用于在数据集添加行号的函数,常用于排序、分组等场景下的数据处理。 在Spark使用窗口函数row_number实现行号的方法如下: 1. 首先将数据集按照需要的方式进行排序或分组; 2. 然后使用窗口函数row_number()给每行数据添加行号; 3. 最后根据需要进行数据过滤、聚合等处理使用Spark Core实现窗口函数row_number,则需要先创建SparkContext对象,然后使用该对象创建RDD数据集,最后使用RDD的mapPartitionsWithIndex方法对每个分区数据进行处理,添加每行的行号。具体实现可以参考Spark官方文档和示例代码。
一定要坚持创作更多高质量博客哦, 小小红包, 以资鼓励, 更多创作活动请看: 新星计划2023: https://marketing.csdn.net/p/1738cda78d47b2ebb920916aab7c3584?utm_source=csdn_ai_ada_redpacket 新星计划2023: https://marketing.csdn.net/p/1738cda78d47b2ebb920916aab7c3584?utm_source=csdn_ai_ada_redpacket 上传ChatGPT/计算机论文等资源,瓜分¥5000元现金: https://blog.csdn.net/VIP_Assistant/article/details/130196121?utm_source=csdn_ai_ada_redpacket 新人首创任务挑战赛: https://marketing.csdn.net/p/90a06697f3eae83aabea1e150f5be8a5?utm_source=csdn_ai_ada_redpacket Microsoft Edge功能测评!: https://activity.csdn.net/creatActivity?id=10403?utm_source=csdn_ai_ada_redpacket 生物识别技术能否成为应对安全挑战的绝佳选择?: https://activity.csdn.net/creatActivity?id=10411?utm_source=csdn_ai_ada_redpacket 应届生如何提高职场竞争力: https://activity.csdn.net/creatActivity?id=10409?utm_source=csdn_ai_ada_redpacket 讯飞星火大模型将超越chatgpt?: https://activity.csdn.net/creatActivity?id=10407?utm_source=csdn_ai_ada_redpacket 职场新人备忘录: https://activity.csdn.net/creatActivity?id=10405?utm_source=csdn_ai_ada_redpacket “裸奔”时代下该如何保护网络隐私: https://activity.csdn.net/creatActivity?id=10401?utm_source=csdn_ai_ada_redpacket VR vs AR:哪种技术更有潜力改变未来?: https://activity.csdn.net/creatActivity?id=10399?utm_source=csdn_ai_ada_redpacket 蓝桥杯备赛指南分享: https://activity.csdn.net/creatActivity?id=10317?utm_source=csdn_ai_ada_redpacket 有哪些工具软件是一旦用了就离不开的?: https://activity.csdn.net/creatActivity?id=10397?utm_source=csdn_ai_ada_redpacket 量子计算:下一个大风口,还是一个热炒概念?: https://activity.csdn.net/creatActivity?id=10395?utm_source=csdn_ai_ada_redpacket