private static String datasource = "";
//自己需要连接的mongo库的集合
private static String targetTable = "";
SparkSession sparkSession = SparkSession.builder()
.appName("hive-mongo")
.master("local[*]")
.enableHiveSupport()
.config("spark.sql.hive.convertMetastoreOrc", false)
.config("spark.sql.legacy.hive.tmpTable", true)
.config("convertMetastoreParquet", false)
.config("spark.sql.debug.maxToStringFields", 1000)
.config("spark.mongodb.output.uri", datasource)
.config("spark.mongodb.output.collection", targetTable)
.getOrCreate();
String sql = "select *from user ";
Dataset
dataset = sparkSession.sql(sql);
2、将已有的列多列合并为一列,并删除之前的列
//将已有的列(cloumn)多列合并为一个列;并删除之前的列
Dataset<Row>getDataset = dataset.withColumn("newFLag",concat_ws(String.valueOf(','),
dataset.col("name"),dataset.col("sex"))).drop("name","sex");
3、将已有的列多列合并为一列,并用特定字符连接
//将已有的列(cloumn)多列合并为一个列.并用“_”拼接
Dataset<Row>upselect = dataset.withColumn("newFLag",concat_ws(String.valueOf('_'),
dataset.col("name"),dataset.col("sex")));
4、将已有的列多列合并为一列,用数组表示
//将已有的列(column)多列合并为一个列,用数组表示
Dataset<Row>getUpselect = dataset.withColumn("newFlags", split(col("newFlag"), ",")).drop("newFlag");
5、将已有的列多列合并为一列,并计算合并的值
//将已存在的列合并为一个列,并计算合并的值
Dataset<Row>getSelectNum = dataset.withColumn("newFlagNum", dataset.col("A").plus("B").cast(DataTypes.IntegerType));
6、获取已存在的某一列的值的长度,并将获取的长度新增一列
//获取已存在的某一列的值的长度,并将获取的长度新增一列
UDF1 lengt = new UDF1<String, Integer>() {
@Override
public Integer call(String s) throws Exception {
return s.getBytes().length;
Dataset<Row> getselectLength;
try {
getselectLength = dataset.withColumn("bytesLength", (Column) lengt.call(dataset.col("newFlags")));
} catch (Exception e) {
e.printStackTrace();
版本说明:Spark-2.3.0
使用Spark SQL在对数据进行处理的过程中,可能会遇到对一列数据拆分为多列,或者把多列数据合并为一列。这里记录一下目前想到的对DataFrame列数据进行合并和拆分的几种方法。
1 DataFrame列数据的合并
例如:我们有如下数据,想要将三列数据合并为一列,并以“,”分割
+----+---+----...
Action 操作
1、 collect() ,返回值是一个数组,返回dataframe集合所有的行
2、 collectAsList() 返回值是一个java类型的数组,返回dataframe集合所有的行
3、 count() 返回一个number类型的,返回dataframe集合的行数
4、 describe(cols: String*) 返回一个通过数学计算的类表值(count, mean, stddev, min, and max),这个可以传多个参数,中间用逗号分隔,如
之前已经在博客中介绍了spark的dataframe利用union 等一系列方法进行拼接,详情请见Spark中对Dataframe的union 、unionAll和 unionByName方法说明
但是在那篇博客也提到,利用union的这些方法,必须保证两个dataframe必须列数一致(unionByName方法还需要所有列名必须一致)。
那么如果如果dfA和dfB的列长度不一致,应该怎么去上下拼接呢?
val data2 = Seq(
| ("1", null, "hlj", null),
val df = Seq(("a,b,c"), ("d,e,f")).toDF("col1")
val splitDF = df.select(split($"col1", ",").as("col2"))
splitDF.show()
输出结果:
+---------+
| col2|
+---------+
|[a, b, c]|
|[d, e, f]|
+---------+
2. 一列分隔多行:
可以使用`explode`函数将一列中的数组拆分成多行,例如:
```scala
import org.apache.spark.sql.functions._
val df = Seq((Seq("a", "b", "c")), (Seq("d", "e", "f"))).toDF("col1")
val explodeDF = df.select(explode($"col1").as("col2"))
explodeDF.show()
输出结果:
+----+
|col2|
+----+
| a|
| b|
| c|
| d|
| e|
| f|
+----+