最近遇到需求要从hive表中读取数据,并且要遍历每一行的数据,网上找了很多资料都没有解释的很清晰的,这边记录一下。
一、主要思路
1. 首先spark读取hive表,得到DataFrame。
如果直接对spark的dataframe进行遍历的话,需要进行collect操作,这对性能消耗是非常大的,一般不建议直接对dataframe进行collect操作。
2. 将DataFrame转为RDD。
转为RDD是方便进行map等算子操作,这个时候可以进行遍历操作。
3. 如果有需要将遍历的结果再次写入hive表,则可以将RDD转为DataFrame。
二、代码实现
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
object GetInfo {
case class tableInfo(aaa:String, bbb:String, ccc:String, ddd:String, eee:String)
case class Address(s1:String, s2:String, s3:String, s4:String, s5:String, s6:String)
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.appName("GetInfo")
.enableHiveSupport()
.getOrCreate()
import spark.implicits._
//1. 读取hive表字段, 将DataFrame转为RDD
val dataRDD = spark.sql(s"""
select aaa, bbb, ccc, ddd, eee
from table_xxxx
""").as[tableInfo].rdd
//2. 遍历RDD中的每一行
val resultDF = dataRdd.map(item=>{
val s1= item.aaa
val s2= item.ddd
val s3= item.eee
val s4= item.aaa.substring(0, 3)
val s5= item.aaa.substring(3, 5)
val s6= item.ccc
Address(s1,s2,s3,s4,s5,s6)
}).toDF()
//3. 将新的结果写入hive表
resultDF.createTempView("tmp_table")
spark.sql(
"insert overwrite table table_xxxxx select * from tmp_table")
简单对上面三种方法进行说明:
iterrows(): 按行遍历,将DataFrame的每一行迭代为(index, Series)对,可以通过row[name]对元素进行访问。
itertuples(): 按行遍历,将DataFrame的每一行迭代为元祖,可以通过row[name]对元素进行访问,比iterrows()效率高。
iteritems():按列遍历,将DataFrame的每一列迭代为(列名, Series)对,可以通过row[index]对元素进行访问。
示例数据
import pandas as pd
inp = [{'c1':10, '
DataFrame既有行索引也有列索引,它可以被看做由Series组成的字典(共同用一个索引)pd.set_option('display.colheader_justify', 'left') # 设置列标题靠左。它由一组数据(各种Numpy数据类型)以及一组与之相关的数据标签(即索引)组成。pd.set_option('display.max_columns', None) # 设置列数为无限制。pd.set_option('display.max_rows', None) # 设置行数为无限制。
#coding:utf-8
from pyspark import SparkConf,SparkContext
from pyspark.sql import HiveContext
import datetime
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
# 初始化
conf=SparkConf().setAppName("
def customFunction(row):
return (row.name, row.age, row.city)
sample2 = sample.rdd.map(customFunction)
自定义一个处理函数,并通过rdd的map函数来处理dataframe的每一行数据。
使用collect()函数
若数据量比较小,可以把数据收集到driver本地,然后再用常规的方法来遍历数据。
for row
SparkSession中的DataFrame类似于一张关系型数据表。在关系型数据库中对单表或进行的查询操作,在DataFrame中都可以通过调用其API接口来实现。
可以参考,Scala提供的DataFrame API。本文中的代码基于Spark-2.2的文档实现。
一、DataFrame对象的生成
Spark-SQL可以以其他RDD对象、parquet文件、json文件、Hive表,以及通过JDBC连接到其他关系型数据库作为数据源来生成DataFrame对象。本文将以MySQL数据库为数据源.
数据采集链接检查数据 首先检查“ex1data1”文件中的数据。“txt”在“我的存储库”的“数据”目录中。首先导入一些库。import osimport numpy as npimport pandas as pdimport matplotlib.pyplot as plt%matplotlib inline现在开始运行,使用Pandas把数据加载到数据帧里,并且使用“head”函数显示前几行...
def main(args: Array[String]) = {
val spark = SparkSession.builder().appName("p2") //.master("local")
.enableHiveSupport().getOrCreate()
import spark.implicits._
val nowdate = Loca...
最近遇到一个小问题,我要用pyspark实现数据表的行列遍历,在python里很容易实现,但是用pyspark没实现过,遇到一点小问题,但摸索了一会也实现了,记录如下:
下表(data1)为某数据大宽表(data2)各列缺失数据的处理方法配置表,其中COLUMN_NAME为数据大宽表的特征名称,NULL_PROCESS_METHON为各特征列缺失数据的处置办法,假设处理方式共有4种:drop、zero、mean、other。
遍历配置表(data1)的COLUMN_NAME,获取相应的缺失值处
jdk-logging
log4j :其中commons-logging是规范,log4j是commons-logging的实现
要实现log4j首先需要导入两个依赖:log4j和commons-logging
<dependency>
<groupId>log4j</g
文章目录一、递归遍历HDFS并筛选文件1-1、对于本地文件系统1-2、对于HDFS文件系统二、Spark集群模式记录自己的调试日志
一、递归遍历HDFS并筛选文件
1-1、对于本地文件系统
public static boolean logFilter(Path path){
return path.toString().toLowerCase().endsWith(".l...
pandas中遍历dataframe的每一个元素
假如有一个需求场景需要遍历一个csv或excel中的每一个元素,判断这个元素是否含有某个关键字
那么可以用python的pandas库来实现。
pandas的dataframe有一个很好用的函数applymap,它可以把某个函数应用到dataframe的每一个元素上,而且比常规的for循环去遍历每个元素要快很多。如下是相关代码:
import pandas as pd
data = [[str,ewt,earw],[agter,awetg,aeorgh]]
dataframe1 = pd.DataFrame(