private static String datasource = ""; //自己需要连接的mongo库的集合 private static String targetTable = ""; SparkSession sparkSession = SparkSession.builder() .appName("hive-mongo") .master("local[*]") .enableHiveSupport() .config("spark.sql.hive.convertMetastoreOrc", false) .config("spark.sql.legacy.hive.tmpTable", true) .config("convertMetastoreParquet", false) .config("spark.sql.debug.maxToStringFields", 1000) .config("spark.mongodb.output.uri", datasource) .config("spark.mongodb.output.collection", targetTable) .getOrCreate(); String sql = "select *from user "; Dataset dataset = sparkSession.sql(sql);

2、将已有的列多列合并为一列,并删除之前的列

    //将已有的列(cloumn)多列合并为一个列;并删除之前的列
    Dataset<Row>getDataset = dataset.withColumn("newFLag",concat_ws(String.valueOf(','),
            dataset.col("name"),dataset.col("sex"))).drop("name","sex");

3、将已有的列多列合并为一列,并用特定字符连接

    //将已有的列(cloumn)多列合并为一个列.并用“_”拼接
    Dataset<Row>upselect = dataset.withColumn("newFLag",concat_ws(String.valueOf('_'),
            dataset.col("name"),dataset.col("sex")));

4、将已有的列多列合并为一列,用数组表示

    //将已有的列(column)多列合并为一个列,用数组表示
    Dataset<Row>getUpselect = dataset.withColumn("newFlags", split(col("newFlag"), ",")).drop("newFlag");

5、将已有的列多列合并为一列,并计算合并的值

    //将已存在的列合并为一个列,并计算合并的值
    Dataset<Row>getSelectNum = dataset.withColumn("newFlagNum", dataset.col("A").plus("B").cast(DataTypes.IntegerType));

6、获取已存在的某一列的值的长度,并将获取的长度新增一列

//获取已存在的某一列的值的长度,并将获取的长度新增一列
     UDF1 lengt = new UDF1<String, Integer>() {
        @Override
        public Integer call(String s) throws Exception {
            return s.getBytes().length;
    Dataset<Row> getselectLength;
        try {
            getselectLength = dataset.withColumn("bytesLength", (Column) lengt.call(dataset.col("newFlags")));
        } catch (Exception e) {
            e.printStackTrace();
  版本说明:Spark-2.3.0
使用Spark SQL在对数据进行处理的过程,可能会遇到对一列数据拆分为多列,或者把多列数据合并为一列。这里记录一下目前想到的对DataFrame数据进行合并和拆分的几种方法。
1 DataFrame数据的合并
例如:我们有如下数据,想要将三数据合并为一列,并以“,”分割
+----+---+----...
Action 操作
1、 collect() ,返回值是一个数组,返回dataframe集合所有的行
2、 collectAsList() 返回值是一个java类型的数组,返回dataframe集合所有的行
3、 count() 返回一个number类型的,返回dataframe集合的行数
4、 describe(cols: String*) 返回一个通过数学计算的类表值(count, mean, stddev, min, and max),这个可以传多个参数,间用逗号分隔,如
				
之前已经在博客介绍了spark的dataframe利用union 等一系方法进行拼接,详情请见Spark对Dataframe的union 、unionAll和 unionByName方法说明 但是在那篇博客也提到,利用union的这些方法,必须保证两个dataframe必须数一致(unionByName方法还需要所有名必须一致)。 那么如果如果dfA和dfB的长度不一致,应该怎么去上下拼接呢? val data2 = Seq( | ("1", null, "hlj", null), val df = Seq(("a,b,c"), ("d,e,f")).toDF("col1") val splitDF = df.select(split($"col1", ",").as("col2")) splitDF.show() 输出结果: +---------+ | col2| +---------+ |[a, b, c]| |[d, e, f]| +---------+ 2. 一列分隔多行: 可以使用`explode`函数将一列的数组拆分成多行,例如: ```scala import org.apache.spark.sql.functions._ val df = Seq((Seq("a", "b", "c")), (Seq("d", "e", "f"))).toDF("col1") val explodeDF = df.select(explode($"col1").as("col2")) explodeDF.show() 输出结果: +----+ |col2| +----+ | a| | b| | c| | d| | e| | f| +----+