val sparkCfg = new SparkConf().set("spark.driver.maxResultSize", "0")
sparkCfg.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sparkSession = SparkSession.builder()
.appName(appCfg.getString("appName"))
.master(appCfg.getString("master"))
.config(sparkCfg).getOrCreate();
val csvDataset = sparkSession.sqlContext.read
.option("header", true).option("multiLine", "true").option("quote", "\"").option("escape", "\"")
.csv(path)
val resultRDD = csvDataset.rdd.map(row=>{
//业务处理
resultRDD .saveAsTextFile(srcPath)
val src = new Path(srcPath)
val dst = new Path(srcPath+".out")
val config = new Configuration
FileUtil.copyMerge(src.getFileSystem(config), src, dst.getFileSystem(config), dst, true, config, null)
scala
-
csv
生
成
.sbt
libraryDependencies + = " com.github.tototoshi " %% "
scala
-
csv
" % " 1.3.8 "
scala
> import com . github . tototoshi .
csv
. _
样本.
csv
a,b,c
d,e,f
您可以使用
CSV
Reader#open 创建
CSV
Reader 实例。
scala
> val reader =
CSV
Reader .open( new File ( " sample.
csv
" ))
读取
所有行
scala
> val reader =
CSV
Reader .open( new File ( " sample.
csv
" ))
reader : com.github.tototoshi.
csv
.
CSV
Reade
众所周知,
csv
文件
默认以逗号“,”分割数据,那么在
scala
命令行里查询的数据:
可以看见,字段里就包含了逗号“,”,那接下来切割的时候,这本应该作为一个整体的字段会以逗号“,”为界限进行切割为多个字段。
现在来看看这里的_c0字段一共有多少行记录。
记住这个数字:60351行
写
scala
代码
读取
csv
文件
并以逗号为分隔符来分割字段
val lineRDD = sc.textFile("xxxx/xxx.
csv
").map(_.split(","))
这里只
读取
了_c0一个字段,否则会报数组下标越
而不是Array(Array(),Array(),Array())这样的格式,让问题瞬间有了一点点难度.
--------------------------------------------------------------------------------------------------------------------------------------------------
import java.util.Properties
import org.apache.
spark
.sql.types.{DoubleType, IntegerType, LongType, StructType}
import org.apache.
spark
.sql.{DataFrame,
Spark
Session}
object _03
Spark
SQLSourceTest {
def main(args: Array[Str
前言:我的依赖
文件
和hive-site.xml
文件
在这篇文章末尾,仅供参考,这里就不贴了。
Spark
SQL抽取Mysql全量数据到Hive动态分区表中
配置好相关依赖,然后将集群中的hive-site.xml
文件
复制一份放在项目中的resources目录下。
import org.apache.
spark
.rdd.RDD
import org.apache.
spark
.sql.
Spark
Session
import
scala
.util.matching.Regex
object A_my_rus
import org.apache.
spark
.
Spark
Conf
import org.apache.
spark
.sql.
Spark
Session
import org.apache.
spark
.sql.functions._
object Dataframe_demo_01 {
case class student11(name: String, course: String, score: String)
def main(args: Array[String]): Unit = {
/** * 通过
CSV
文件
,及其预
处理
的字段类型字符串,
输出
insert语句 */
class
csv
2InsertSQLServer {
/** * @param CLASS_PATH 输入/
输出
路径
* @param inFilename 输入
文件
名
* @param tableN...
scala
输出
方法
print(),println(),printf()都在对象Predef中定义,该对象默认情况下被所有
Scala
程序使用,因此可以直接使用Predef对象提供的方法,无需使用
scala
.predef.的形式
Printf():方法
object
scala
O {
def main(array: Array[String]): Unit={
var i = 20
var f = 50.05
printf("My name is %s.I a
spark
2.x
读取
csv
文件
,写入到
文件
系统中(例如hdfs s3 或者本地)
println("day:::"+day)
ss.read.format("
csv
").option("header", "true").option("delimiter", "\t").option("mode", "DROPMALFORMED").
csv
(s"D://mcd-user-$day.txt").cr...
spark
读取
csv
文件
——
scala
下面介绍如何通过
scala
读取
csv
文件
读取
的过程是首先按照普通额文本
文件
进行
读取
,然后通过open
csv
的jar包进行转换,通过对每行进行
读取
,生
成
string数组。
好,下面上货。
import java.io.StringReader
import au.com.bytecode.open
csv
.
CSV
Reader
object CC{
.setMaster("local[*]")
val sc = new
Spark
Context(conf)
val sqlContext = new SQLContext(sc)
val arr = ArrayBuff...