一、小文件管理 之指定分区数
1、配置 spark.sql.shuffle.partitions,适用场景spark.sql()合并分区
spark.conf.set("spark.sql.shuffle.partitions", 5) #后面的数字是你希望的分区数
这样配置后,通过spark.sql()执行后写出的数据分区数就是你要求的个数,如这里5。
2、配置 coalesce(n),适用场景spark写出数据到指定路径下合并分区,不会引起shuffle
df = spark.sql(sql_string).coalesce(1) #合并分区数
df.write.format("csv")
.mode("overwrite")
.option("sep", ",")
.option("header", True)
.save(hdfs_path)
3、配置repartition(n), 重新分区,会引发shuffle
df = spark.sql(sql_string).repartition(1) #重新分区,会引发全局shuffle
df.write.format("csv")
.mode("overwrite")
.option("sep", ",")
.option("header", True)
.save(hdfs_path)
二、文件的并行读取和写出
1、并行写出之 partitionBy() 指定分区列 , 会根据分区列创建子文件夹,并行写出数据
df.write.mode("overwrite")
.partitionBy("day")
.save("/tmp/partitioned-files.parquet")
2、并行写出之 repartition() ,一般spark中有几个分区就会有几个并行的IO写出
df.repartition(5)
.write.format("csv")
.save("/tmp/multiple.csv")
3、分桶写出,好处是后续读入的时候数据就不会做shuffle了,因为相同分桶的数据会被划分到同一个物理分区中
csvFile.write.format("parquet")
.mode("overwrite")
.bucketBy(5, "gmv") #第一个参数:分成几个桶,第二个参数:按哪列进行分桶
.saveAsTable("bucketedFiles")
三、Spark IO
参考:
Spark官方API文档Input and Output
1、DataFrame调用.read/write.format() API文件写入-覆盖和追加
(1)spark.read.format() 对文件进行读取的通用代码格式
spark.read.format('csv') #说明文件格式是CSV还是JSON还是TEXT等
.option("mode", "FAILFAST") #读取遇到格式错误时怎么处理
.option("inferSchema", "true") #option中("Key","value") 形式配置参数
.option("path", "path/to/file(s)")
.schema(someSchema)
.load()
#读取遇到格式错误时处理选项参数:
permissive :将损坏的数据记录成null, 并将损坏的列命名为 _corrupt_record 标记出来
dropMalformed :删掉损坏的数据
failFast:报错失败
(2)df.write.format() 将DataFrame中数据写入到文件的通用代码格式
#抽象的DF写出代码格式
DataFrameWrite.format(...)
.option(...)
.partitionBy(...)
.bucketBy(...)
.sortBy(...)
.save()
#一个具体的DF写出的代码格式
df.write.format("csv")
.option("mode", "OVERWRITE") #覆盖还是追加数据
.option("sep",",") #分隔符
.option("header",true) #第一行是否是列名
.option("inferSchema",true) #是否自动推断列类型
.option("dateFormat", "yyyy-MM-dd") #日期数据的格式
.option("timestampFormat","yyyy-MMdd’T’HH:mm:ss.SSSZZ") #时间戳数据的格式
.option("nullValue","null") #数据中的null值用什么表示,默认是“”,也可以设置成NA或NULL
.option("nanValue","unknown") #数据中的NaN或缺失值用什么表示,默认是"NaN",也可以设置成其他
.option("positiveInf","Inf") #正无穷怎么表示
.option("negativeInf","-Inf") #负无穷怎么表示
.option("compression","gzip") #压缩方式,uncompressed、snappy、lz4等
.option("path", "path/to/file(s)") #保存地址
.save()
写出 mode可选参数有:
append 在写出路径下追加数据文件
overwrite 覆盖写出路径下的所有文件
errorIfExists 如果在写出路径下已经有数据了,则报错误并失败掉任务
ignore 如果在写出路径下有数据或文件,不做任何处理,即忽略这个数据的写出,直接跳过
(3)spark.read.format()、df.write.format() 的应用举例
#DataFrame调用write.format API
#spark读取
df = spark.read.format('json').load('python/test_support/sql/people.json')
#df覆盖写入,mode("overwrite")
df.write.format("csv").mode("overwrite").option("sep", ",").option("header", True).save(hdfs_path)
#df追加写入,mode("append")
df.write.format("csv").mode("append").option("sep", ",").option("header", True).save(hdfs_path)
2、DataFrame调用 .write.csv(/parquet/orc) API读取写出文件
(1)spark.read.{文件类型} 读取文件的通用代码格式
#1、spark.read.csv()
spark.read.csv(path='foo.csv', mode='failfast' ,header=True).show()
#2、spark.read.parquet()
df.write.parquet('bar.parquet')
#3、spark.read.orc()
spark.read.orc('zoo.orc').show()
(2)df.write.{文件类型} 将DataFrame中数据写入到文件的通用代码格式
#1、df读写csv
df.write.csv('foo.csv', header=True)
#2、df读写parquet
df.write.parquet('bar.parquet')
#3、df读写ORC
df.write.orc('zoo.orc')
#4、覆盖写入
df.write.csv(path, mode='overwrite',sep=',',header=True)
#5、追加写入
df.write.csv(path, mode='append',sep=',',header=True)
注意: 不管是用format()的方式,把文件类型写在format的里面作为参数,还是用调用文件类型的读写,它们在option配置选项上是通用的。
3、textFile读写
(1)spark.read.textFile() 和 spark.read.wholeTextFile()
#spark.read.textFile() 会忽略读入文件的分区
spark.read.textFile("/data/flight-data/csv/2010-summary.csv")
.selectExpr("split(value, ',') as rows").show()
#spark.read.text() 会保留读入文件的分区
如果有一个目录,下面按日期分了多个子目录,每个目录下存放着日期当天的一些数据,
是否用spark.read.text()读取目录就能获取日期分区呢?
(2)df.write.text() 写出只能有一列
当写出的不是一列的时候,否则会报错。
df.select("A_COLUMN_NAME").write.text("/tmp/simple-text-file.txt")
但如果其他列作为分区的话,是可以选择多列的。但作为分区的列是作为分区的子路径文件名存在的,并不是输出的多列,输出还是只有一列。
#指定一个分区列,文件中输出的还是只有一列
df.select("A_COLUMN_NAME", "count")\
.write.partitionBy("count").text("/tmp/five-csv-files2py.csv")
如果先用concat_ws(',',col1,col2) as col 把要输出的列进行了连接,然后再按照要分区的列(比如日期)来目录写出,岂不是很实用?
#链接要输出的列,并按照日期列分区写出
df.select("concat_ws(',',col1,col2) as col", "day")
.write.partitionBy("day")
.text("/tmp/five-csv-files2py.csv")
一、小文件治理 之合并分区数1、配置spark.sql.shuffle.partitions,适用场景spark.sql()合并分区spark.conf.set("spark.sql.shuffle.partitions", 5) #后面的数字是你希望的分区数这样配置后,通过spark.sql()执行后写出的数据分区数就是你要求的个数,如这里5。2、配置coalesce(n),适用场景spark写出数据到指定路径下合并分区df = spark.sql(sql_string).co..
本文来自dongkelun,讲各种情况下的sc.defaultParallelism,defaultMinPartitions,各种情况下创建以及转化。熟悉Spark的分区对于Spark性能调优很重要,本文总结Spark通过各种函数创建RDD、DataFrame时默认的分区数,其中主要和sc.defaultParallelism、sc.defaultMinPartitions以及HDFS文件的Block数量有关,还有很坑的某些情况的默认分区数为1。如果分区数少,那么并行执行的task就少,特别情况下,分区数为1,即使你分配的Executor很多,而实际执行的Executor只有1个,如果数据
旁边的实习生又一脸懵逼了:Spark有bug,明明我本地/data目录下有test.txt文件,但运行就报错:
Caused by: java.io.FileNotFoundException: File file:/data/test.txt does not exist
我一看,原来小伙子使用spark集群模式来读取仅仅在他自己的客户端存放的一个文本文件
如何读取本地文件
Spark ...
dataset.write.mode("append").format("jdbc")
.option("driver", "oracle.jdbc.driver.OracleDriver"))
.option("url", "xxx")
.option("dbtable", "tablen
Spark读取单路径及多路径下的文件
1.1 sparkContext方式读取文件
spark.sparkContext.textFile方法返回一个rdd。
1.1.1 单路径读取
val rdd=spark.sparkContext.textFile(path)
1.1.2 多路径读取
1) 方式01
val rdd=spark.sparkContext.textFile("D:\\dat...
由于Hive不在本地,操作略显麻烦。不过细心一点,分析错误,也还好,如果你搭建的hadoop是HA,需要多注意:
这里指出一个错误,如果你报了同类错误,可以参考:https://georgedage.blog.csdn.net/article/details/103086882
读取Hive中的数据加载成DataFrame/DataSet
HiveContext是SQLContext的子类,...
我正在使用
spark df write
写入oracle表-
写入数据时,底层的oracle表结构将通过
spark进行更改
df.write.mode(SaveMode.Overwrite).jdbc(targetJdbcUrl, targetTable, targetProps)
source_desc varchar(200)会变成 source_desc varchar(255)
改为mode(SaveMode.Append)就能解决了,truncate通过自己jdbc去前置把。
spark.
read.
textFile()读取.tar.gz
文件数据问题
从官网的描述中
spark.
textFile方法是可以读取压缩
文件.tar.gz. 当我测试的时候发现
文件从一个
文件读取到另外一个
文件的时候,
spark会在值中加入“
文件名 000 ustar root root”放到下个读取
文件的第一个行。
我想问一下是我写的有问题还是本来就是这样?如果是如何避免。
spark.read.textFile和sc.textFile的区别
val rdd1 = spark.read.textFile("hdfs://han02:9000/words.txt") //读取到的是一个RDD对象
val rdd2 = sc.textFile("hdfs://han02:9000/words.txt") //读取到的是一个Dataset的数据集
分别进行单...
).toDF("col2","col1")
df.write.mode(SaveMode.Append).insertInto("tb_demo_2")
//
spark并不看你df的列名,而是根据位置顺序传入. 所以9999 被插入到了tb_demo_2的col1中
// partit
ionBy 是配合 saveAsTable 创建
分区表用的. 不能和 insertInt...
当配置
文件spark-default.conf中没有显示的配置,则按照如下规则取值:
1、本地模式(不会启动executor,由
SparkSubmit进程生成
指定数量的线程
数来并发):
spark-shell
spark.default.parallelism = 1
from py
spark.sql import
SparkSess
ion
spark =
SparkSess
ion.builder.appName("CSV
Reader").getOrCreate()
df =
spark.
read.format("csv").opt
ion("header", "true").load("data.csv")
df.show()
这将创建一个名为“df”的DataFrame对象,并将其显示在控制台上。
2. 导出CSV
文件
我们可以使用以下代码将DataFrame对象导出为CSV
文件:
```python
df.write.format("csv").opt
ion("header", "true").save("output.csv")
这将在当前目录下创建一个名为“output.csv”的
文件,并将DataFrame对象
写入其中。
以上就是
Spark SQL
读写CSV
文件的简单案例。
row_obj = row.asDict()
row_obj['user_id'] = row.user_id
row_obj['first_cate_cd'] = row.first_cate_cd
row_obj['first_cate_name'] = row.first_cate_name
if row.first_cate_cd not in cate_code_dict.keys():
row_obj['first_cate_cd'] = first_cate_name
cate_key = (row_obj['user_id'], row_obj['first_cate_cd'])
if cate_key not in res_cate_dict.keys():
row_obj['cate_key'] = 1
new_row = Row(**row_obj)
result.append(new_row)
return iter(result)
理解LSTM模型
Fu11size: