很久没有更新博客了,因为最近工作确实也很忙,不过忙碌的工作也让我收获了很多新的知识,趁着忙碌的间隙,来记录一下自己的成长。
这次的场景是:需要单独取出DataFrame中的某一列的所有值供Java程序使用。
下面的demo包含两个例子:
1.column value Type -> String, 即 row->String ,需要用到mkString这个方法
2.column value Type -> WrappedArray, 即Seq(String) ->String, 需要用到explode方法行转列增加一个column,即将每一个Seq中的每一个String值都取出放在新增列中生成一个row,例如
姓名
|
爱好
|
张三
|
{“唱歌”, “跳舞”}
|
李四
|
{“武术”}
|
对爱好这一列用explode函数,我们会得到:
姓名
|
爱好
|
张三
|
“唱歌”
|
张三
|
“跳舞”
|
李四
|
“武术”
|
之后就跟第一种情况一样row->String就可以了。
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.json4s.jackson.Serialization
import scala.collection.JavaConversions.asScalaBuffer
import scala.collection.JavaConverters.mapAsScalaMapConverter
import scala.collection.immutable
import scala.util.matching.Regex
val spark = SparkSession.builder().appName("SparkSession").getOrCreate()
val wv = "/Pods/"
val wvAttrs = Seq("Id", "VisitDate", "UserId", "PageUrl")
val attrs = Seq("Id", "INDUSTRY", "RANGE", "Name", "Country", "State", "City")
val PTNS = Seq(("Home Page", ".*baidu.com/.*".r), ("Login Page", ".*baidu.com/products.*".r))
val wvDf = spark.read.format("avro").load(wv)
def matchPtnFn(url: String): Seq[String] = {
val tmp = url
PTNS.map(t => t._2.findFirstMatchIn(tmp) match {
case Some(_) => t._1
case None => null
case _ => null
}).filter(_ != null)
val ptnMatchUdf = udf[Seq[String], String](matchPtnFn)
val cwv = wvDf.select(wvAttrs.map(col):_*).withColumn("UrlCategories", ptnMatchUdf(col("PageUrl")))
val filterParams = Map("<PAGE_FILTER>"->"UrlCategories","<INDUSTRY_FILTER>"->"INDUSTRY")
val filterList = cwv.withColumn("UrlCategory", explode(col("UrlCategories")))
val filterMap = immutable.Map(filterParams.map {
case (placeHolderName, columnName) =>
if (placeHolderName == "<PAGE_FILTER>") {
val filterValueList = filterList.select("UrlCategory").filter(col("UrlCategory").isNotNull).groupBy("UrlCategory")
.agg(count("*").alias("num")).sort(desc("num")).limit(3).select("UrlCategory").collect().toList
(placeHolderName, filterValueList.map(row => row.mkString).toSet)
} else {
val filterValueList = cwv.select(columnName).filter(col(columnName).isNotNull).groupBy(columnName)
.agg(count("*").alias("num")).sort(desc("num")).limit(3).select(columnName).collect().toList
(placeHolderName, filterValueList.map(row => row.mkString).toSet)
}.toSeq:_*)
val list =Serialization.write(filterMap)(org.json4s.DefaultFormats)
还是用的不是很熟练,所以做起来的时候磕磕绊绊的,好在是顺利解决问题,mark一下,留待以后翻阅,也欢迎小伙伴们的交流~
很久没有更新博客了,因为最近工作确实也很忙,不过忙碌的工作也让我收获了很多新的知识,趁着忙碌的间隙,来记录一下自己的成长。这次的场景是:需要单独取出DataFrame中的某一列的所有值供Java程序使用。下面的demo包含两个例子:1.column value Type -> String, 即 row->String ,需要用到mkString这个方法2.column value Type -> WrappedArray, 即Seq(String) ->String, 需要
playerIds =salaries_2016['playerID'].tolist()
data[‘列名’].tolist()
以上这篇DataFrame 将某列数据转为数组的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持软件开发网。
您可能感兴趣的文章:python读取文本中数据并转化为DataFrame的实例pandas修改DataFrame列名的方法pandas系列之DataFrame 行列数据筛选实例Python将DataFrame的某一列作为index的方法python DataFram
import org.apache.spark.sql.types.{StructType,StructField,StringType, IntegerType, LongType}
import java.util.ArrayList
import org.apache.spark.sql._
val dataList = new util.ArrayList[Row]()
dataList.add(Row("ming",20,15552211521L))
df.loc[df['columnName']=='the value']
以上这篇根据DataFrame某一列的值来选择具体的某一行方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持软件开发网。
您可能感兴趣的文章:pandas系列之DataFrame 行列数据筛选实例pandas.DataFrame删除/选取含有特定数值的行或列实例python中p
在贴代码之前先介绍一下DataFrame与DataSet,以下介绍内容来自以下博客:https://www.cnblogs.com/seaspring/p/5831677.html
DataFrame
DataFrame是一个分布式集合,其中数据逻辑存储结构为有名字的列。它概念上等价于关系数据库中的表,一个列名对应很多列值,但底层做了更多的优化。DataFrame可以从很多数据源构建,比如:已...
SparkSQL提供了通用的保存数据和数据加载的方式。这里的通用指的是使用相同的API,根据不同的参数读取和保存不同格式的数据,SparkSQL默认读取和保存的文件格式为parquet。
1 加载数据
spark.read.load 是加载数据的通用方法
scala> spark.read.
csv format jdbc json