相关文章推荐
飘逸的饭卡  ·  dataframe split ...·  2 月前    · 
想出国的拐杖  ·  python dataframe ...·  2 月前    · 
健壮的皮带  ·  python DataFrame循环读取 ...·  2 月前    · 
灰常酷的口罩  ·  recursive ...·  7 月前    · 
暴走的苹果  ·  DeploymentAgentHandler ...·  1 年前    · 
玉树临风的马克杯  ·  JS ...·  2 年前    · 

很久没有更新博客了,因为最近工作确实也很忙,不过忙碌的工作也让我收获了很多新的知识,趁着忙碌的间隙,来记录一下自己的成长。

这次的场景是:需要单独取出DataFrame中的某一列的所有值供Java程序使用。

下面的demo包含两个例子:
1.column value Type -> String, 即 row->String ,需要用到mkString这个方法
2.column value Type -> WrappedArray, 即Seq(String) ->String, 需要用到explode方法行转列增加一个column,即将每一个Seq中的每一个String值都取出放在新增列中生成一个row,例如

姓名 爱好
张三 {“唱歌”, “跳舞”}
李四 {“武术”}

对爱好这一列用explode函数,我们会得到:

姓名 爱好
张三 “唱歌”
张三 “跳舞”
李四 “武术”

之后就跟第一种情况一样row->String就可以了。

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.json4s.jackson.Serialization
import scala.collection.JavaConversions.asScalaBuffer
import scala.collection.JavaConverters.mapAsScalaMapConverter
import scala.collection.immutable
import scala.util.matching.Regex
val spark = SparkSession.builder().appName("SparkSession").getOrCreate()
val wv = "/Pods/" //存放avro文件的路径,我给的是hdfs上的,也可以是本地文件,就用file:///home/XXX就好
val wvAttrs = Seq("Id", "VisitDate", "UserId", "PageUrl")
val attrs = Seq("Id", "INDUSTRY", "RANGE", "Name", "Country", "State", "City")
val PTNS = Seq(("Home Page", ".*baidu.com/.*".r), ("Login Page", ".*baidu.com/products.*".r))
val wvDf = spark.read.format("avro").load(wv)
//根据pattern将match column值的PTNS.Key组成Seq
def matchPtnFn(url: String): Seq[String] = {
    val tmp = url
    PTNS.map(t => t._2.findFirstMatchIn(tmp) match {
        case Some(_) => t._1
        case None => null
        case _ => null
    }).filter(_ != null)
val ptnMatchUdf = udf[Seq[String], String](matchPtnFn)
val cwv = wvDf.select(wvAttrs.map(col):_*).withColumn("UrlCategories", ptnMatchUdf(col("PageUrl")))
val filterParams = Map("<PAGE_FILTER>"->"UrlCategories","<INDUSTRY_FILTER>"->"INDUSTRY")
val filterList = cwv.withColumn("UrlCategory", explode(col("UrlCategories")))
//filterList.show(20, false)
val filterMap = immutable.Map(filterParams.map {
      case (placeHolderName, columnName) =>
        if (placeHolderName == "<PAGE_FILTER>") {
            val filterValueList = filterList.select("UrlCategory").filter(col("UrlCategory").isNotNull).groupBy("UrlCategory")
          .agg(count("*").alias("num")).sort(desc("num")).limit(3).select("UrlCategory").collect().toList
            (placeHolderName, filterValueList.map(row => row.mkString).toSet)
        } else {
            val filterValueList = cwv.select(columnName).filter(col(columnName).isNotNull).groupBy(columnName)
          .agg(count("*").alias("num")).sort(desc("num")).limit(3).select(columnName).collect().toList//我在这里选择TOP3的值留下来
            (placeHolderName, filterValueList.map(row => row.mkString).toSet)
    }.toSeq:_*)
val list =Serialization.write(filterMap)(org.json4s.DefaultFormats)//转成java可以解析的jsonString格式

还是用的不是很熟练,所以做起来的时候磕磕绊绊的,好在是顺利解决问题,mark一下,留待以后翻阅,也欢迎小伙伴们的交流~

很久没有更新博客了,因为最近工作确实也很忙,不过忙碌的工作也让我收获了很多新的知识,趁着忙碌的间隙,来记录一下自己的成长。这次的场景是:需要单独取出DataFrame中的某一列的所有值供Java程序使用。下面的demo包含两个例子:1.column value Type -&gt; String, 即 row-&gt;String ,需要用到mkString这个方法2.column value Type -&gt; WrappedArray, 即Seq(String) -&gt;String, 需要 playerIds =salaries_2016['playerID'].tolist() data[‘列名’].tolist() 以上这篇DataFrame 将某列数据转为数组的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持软件开发网。 您可能感兴趣的文章:python读取文本数据并转化为DataFrame的实例pandas修改DataFrame列名的方法pandas系列之DataFrame 行列数据筛选实例Python将DataFrame的某一列作为index的方法python DataFram import org.apache.spark.sql.types.{StructType,StructField,StringType, IntegerType, LongType} import java.util.ArrayList import org.apache.spark.sql._ val dataList = new util.ArrayList[Row]() dataList.add(Row("ming",20,15552211521L)) df.loc[df['columnName']=='the value'] 以上这篇根据DataFrame一列来选择具体的某一行方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持软件开发网。 您可能感兴趣的文章:pandas系列之DataFrame 行列数据筛选实例pandas.DataFrame删除/选取含有特定数的行或列实例pythonp
在贴代码之前先介绍一下DataFrame与DataSet,以下介绍内容来自以下博客:https://www.cnblogs.com/seaspring/p/5831677.html DataFrame DataFrame是一个分布式集合,其数据逻辑存储结构为有名字的列。它概念上等价于关系数据库的表,一个列名对应很多列值,但底层做了更多的优化。DataFrame可以从很多数据源构建,比如:已...
SparkSQL提供了通用的保存数据和数据加载的方式。这里的通用指的是使用相同的API,根据不同的参数读取和保存不同格式的数据,SparkSQL默认读取和保存的文件格式为parquet。 1 加载数据 spark.read.load 是加载数据的通用方法 scala> spark.read. csv format jdbc json