一、小文件管理 之指定分区数

1、配置 spark.sql.shuffle.partitions,适用场景spark.sql()合并分区

spark.conf.set("spark.sql.shuffle.partitions", 5) #后面的数字是你希望的分区数

这样配置后,通过spark.sql()执行后写出的数据分区数就是你要求的个数,如这里5。

2、配置 coalesce(n),适用场景spark写出数据到指定路径下合并分区,不会引起shuffle

df = spark.sql(sql_string).coalesce(1) #合并分区数
df.write.format("csv")
.mode("overwrite")
.option("sep", ",")
.option("header", True)
.save(hdfs_path)

3、配置repartition(n), 重新分区,会引发shuffle

df = spark.sql(sql_string).repartition(1) #重新分区,会引发全局shuffle
df.write.format("csv")
.mode("overwrite")
.option("sep", ",")
.option("header", True)
.save(hdfs_path)

二、文件的并行读取和写出

1、并行写出之 partitionBy() 指定分区列  , 会根据分区列创建子文件夹,并行写出数据

df.write.mode("overwrite")
.partitionBy("day")
.save("/tmp/partitioned-files.parquet")

2、并行写出之 repartition() ,一般spark中有几个分区就会有几个并行的IO写出

df.repartition(5)
.write.format("csv")
.save("/tmp/multiple.csv")

3、分桶写出,好处是后续读入的时候数据就不会做shuffle了,因为相同分桶的数据会被划分到同一个物理分区中

csvFile.write.format("parquet")
.mode("overwrite")
.bucketBy(5, "gmv") #第一个参数:分成几个桶,第二个参数:按哪列进行分桶
.saveAsTable("bucketedFiles")

三、Spark IO

参考: Spark官方API文档Input and Output

1、DataFrame调用.read/write.format() API文件写入-覆盖和追加

(1)spark.read.format() 对文件进行读取的通用代码格式

spark.read.format('csv') #说明文件格式是CSV还是JSON还是TEXT等
.option("mode", "FAILFAST") #读取遇到格式错误时怎么处理
.option("inferSchema", "true") #option中("Key","value") 形式配置参数
.option("path", "path/to/file(s)")
.schema(someSchema)
.load()
#读取遇到格式错误时处理选项参数:
permissive :将损坏的数据记录成null, 并将损坏的列命名为 _corrupt_record 标记出来
dropMalformed :删掉损坏的数据
failFast:报错失败

(2)df.write.format() 将DataFrame中数据写入到文件的通用代码格式

#抽象的DF写出代码格式
DataFrameWrite.format(...)
.option(...)
.partitionBy(...)
.bucketBy(...)
.sortBy(...)
.save()
#一个具体的DF写出的代码格式
df.write.format("csv")
.option("mode", "OVERWRITE") #覆盖还是追加数据
.option("sep",",") #分隔符
.option("header",true) #第一行是否是列名
.option("inferSchema",true) #是否自动推断列类型
.option("dateFormat", "yyyy-MM-dd") #日期数据的格式
.option("timestampFormat","yyyy-MMdd’T’HH:mm​:ss.SSSZZ") #时间戳数据的格式
.option("nullValue","null") #数据中的null值用什么表示,默认是“”,也可以设置成NA或NULL
.option("nanValue","unknown") #数据中的NaN或缺失值用什么表示,默认是"NaN",也可以设置成其他
.option("positiveInf","Inf") #正无穷怎么表示
.option("negativeInf","-Inf") #负无穷怎么表示
.option("compression","gzip") #压缩方式,uncompressed、snappy、lz4等
.option("path", "path/to/file(s)") #保存地址
.save()
写出 mode可选参数有:
append 在写出路径下追加数据文件
overwrite 覆盖写出路径下的所有文件
errorIfExists 如果在写出路径下已经有数据了,则报错误并失败掉任务
ignore 如果在写出路径下有数据或文件,不做任何处理,即忽略这个数据的写出,直接跳过

(3)spark.read.format()、df.write.format() 的应用举例

#DataFrame调用write.format API
#spark读取
df = spark.read.format('json').load('python/test_support/sql/people.json')
#df覆盖写入,mode("overwrite")
df.write.format("csv").mode("overwrite").option("sep", ",").option("header", True).save(hdfs_path)
#df追加写入,mode("append")
df.write.format("csv").mode("append").option("sep", ",").option("header", True).save(hdfs_path)

2、DataFrame调用 .write.csv(/parquet/orc) API读取写出文件

(1)spark.read.{文件类型} 读取文件的通用代码格式

#1、spark.read.csv()
spark.read.csv(path='foo.csv', mode='failfast' ,header=True).show()
#2、spark.read.parquet()
df.write.parquet('bar.parquet')
#3、spark.read.orc()
spark.read.orc('zoo.orc').show()

(2)df.write.{文件类型}  将DataFrame中数据写入到文件的通用代码格式

#1、df读写csv
df.write.csv('foo.csv', header=True)
#2、df读写parquet
df.write.parquet('bar.parquet')
#3、df读写ORC
df.write.orc('zoo.orc')
#4、覆盖写入
df.write.csv(path, mode='overwrite',sep=',',header=True)
#5、追加写入
df.write.csv(path, mode='append',sep=',',header=True)

注意: 不管是用format()的方式,把文件类型写在format的里面作为参数,还是用调用文件类型的读写,它们在option配置选项上是通用的。

3、textFile读写

(1)spark.read.textFile() 和 spark.read.wholeTextFile()

#spark.read.textFile() 会忽略读入文件的分区
spark.read.textFile("/data/flight-data/csv/2010-summary.csv")
.selectExpr("split(value, ',') as rows").show()
#spark.read.text() 会保留读入文件的分区
如果有一个目录,下面按日期分了多个子目录,每个目录下存放着日期当天的一些数据,
是否用spark.read.text()读取目录就能获取日期分区呢?

(2)df.write.text()   写出只能有一列

  当写出的不是一列的时候,否则会报错。

df.select("A_COLUMN_NAME").write.text("/tmp/simple-text-file.txt")

  但如果其他列作为分区的话,是可以选择多列的。但作为分区的列是作为分区的子路径文件名存在的,并不是输出的多列,输出还是只有一列。

#指定一个分区列,文件中输出的还是只有一列
df.select("A_COLUMN_NAME", "count")\
.write.partitionBy("count").text("/tmp/five-csv-files2py.csv")

如果先用concat_ws(',',col1,col2)  as col 把要输出的列进行了连接,然后再按照要分区的列(比如日期)来目录写出,岂不是很实用?

#链接要输出的列,并按照日期列分区写出
df.select("concat_ws(',',col1,col2) as col", "day")
.write.partitionBy("day")
.text("/tmp/five-csv-files2py.csv")
一、小文件治理 之合并分区数1、配置spark.sql.shuffle.partitions,适用场景spark.sql()合并分区spark.conf.set("spark.sql.shuffle.partitions", 5) #后面的数字是你希望的分区数这样配置后,通过spark.sql()执行后写出的数据分区数就是你要求的个数,如这里5。2、配置coalesce(n),适用场景spark写出数据到指定路径下合并分区df = spark.sql(sql_string).co..
本文来自dongkelun,讲各种情况下的sc.defaultParallelism,defaultMinPartitions,各种情况下创建以及转化。熟悉Spark分区对于Spark性能调优很重要,本文总结Spark通过各种函创建RDD、DataFrame时默认的分区,其中主要和sc.defaultParallelism、sc.defaultMinPartitions以及HDFS文件的Block量有关,还有很坑的某些情况的默认分区为1。如果分区少,那么并行执行的task就少,特别情况下,分区为1,即使你分配的Executor很多,而实际执行的Executor只有1个,如果
旁边的实习生又一脸懵逼了:Spark有bug,明明我本地/data目录下有test.txt文件,但运行就报错: Caused by: java.io.FileNotFoundException: File file:/data/test.txt does not exist 我一看,原来小伙子使用spark集群模式来读取仅仅在他自己的客户端存放的一个文本文件 如何读取本地文件 Spark ... dataset.write.mode("append").format("jdbc") .option("driver", "oracle.jdbc.driver.OracleDriver")) .option("url", "xxx") .option("dbtable", "tablen
Spark读取单路径及多路径下的文件 1.1 sparkContext方式读取文件 spark.sparkContext.textFile方法返回一个rdd。 1.1.1 单路径读取 val rdd=spark.sparkContext.textFile(path) 1.1.2 多路径读取 1) 方式01 val rdd=spark.sparkContext.textFile("D:\\dat...
由于Hive不在本地,操作略显麻烦。不过细心一点,分析错误,也还好,如果你搭建的hadoop是HA,需要多注意: 这里指出一个错误,如果你报了同类错误,可以参考:https://georgedage.blog.csdn.net/article/details/103086882 读取Hive中的据加载成DataFrame/DataSet HiveContext是SQLContext的子类,...
我正在使用spark df write写入oracle表- 写入据时,底层的oracle表结构将通过spark进行更改 df.write.mode(SaveMode.Overwrite).jdbc(targetJdbcUrl, targetTable, targetProps) source_desc varchar(200)会变成 source_desc varchar(255) 改为mode(SaveMode.Append)就能解决了,truncate通过自己jdbc去前置把。
spark.read.textFile()读取.tar.gz文件据问题 从官网的描述中spark.textFile方法是可以读取压缩文件.tar.gz. 当我测试的时候发现文件从一个文件读取到另外一个文件的时候,spark会在值中加入“文件名 000 ustar root root”放到下个读取文件的第一个行。 我想问一下是我写的有问题还是本来就是这样?如果是如何避免。
spark.read.textFile和sc.textFile的区别 val rdd1 = spark.read.textFile("hdfs://han02:9000/words.txt")   //读取到的是一个RDD对象 val rdd2 = sc.textFile("hdfs://han02:9000/words.txt")  //读取到的是一个Dataset的据集 分别进行单...
).toDF("col2","col1") df.write.mode(SaveMode.Append).insertInto("tb_demo_2") // spark并不看你df的列名,而是根据位置顺序传入. 所以9999 被插入到了tb_demo_2的col1中 // partitionBy 是配合 saveAsTable 创建分区表用的. 不能和 insertInt...
当配置文件spark-default.conf中没有显示的配置,则按照如下规则取值: 1、本地模式(不会启动executor,由SparkSubmit进程生成指定量的线程来并发):     spark-shell       spark.default.parallelism = 1 from pyspark.sql import SparkSession spark = SparkSession.builder.appName("CSV Reader").getOrCreate() df = spark.read.format("csv").option("header", "true").load("data.csv") df.show() 这将创建一个名为“df”的DataFrame对象,并将其显示在控制台上。 2. 导出CSV文件 我们可以使用以下代码将DataFrame对象导出为CSV文件: ```python df.write.format("csv").option("header", "true").save("output.csv") 这将在当前目录下创建一个名为“output.csv”的文件,并将DataFrame对象写入其中。 以上就是Spark SQL读写CSV文件的简单案例。
row_obj = row.asDict() row_obj['user_id'] = row.user_id row_obj['first_cate_cd'] = row.first_cate_cd row_obj['first_cate_name'] = row.first_cate_name if row.first_cate_cd not in cate_code_dict.keys(): row_obj['first_cate_cd'] = first_cate_name cate_key = (row_obj['user_id'], row_obj['first_cate_cd']) if cate_key not in res_cate_dict.keys(): row_obj['cate_key'] = 1 new_row = Row(**row_obj) result.append(new_row) return iter(result) 理解LSTM模型 Fu11size: 大佬牛的,明白多了