private static String datasource = ""; //自己需要连接的mongo库的集合 private static String targetTable = ""; SparkSession sparkSession = SparkSession.builder() .appName("hive-mongo") .master("local[*]") .enableHiveSupport() .config("spark.sql.hive.convertMetastoreOrc", false) .config("spark.sql.legacy.hive.tmpTable", true) .config("convertMetastoreParquet", false) .config("spark.sql.debug.maxToStringFields", 1000) .config("spark.mongodb.output.uri", datasource) .config("spark.mongodb.output.collection", targetTable) .getOrCreate(); String sql = "select *from user "; Dataset dataset = sparkSession.sql(sql);

2、将已有的列多列合并为一列,并删除之前的列

    //将已有的列(cloumn)多列合并为一个列;并删除之前的列
    Dataset<Row>getDataset = dataset.withColumn("newFLag",concat_ws(String.valueOf(','),
            dataset.col("name"),dataset.col("sex"))).drop("name","sex");

3、将已有的列多列合并为一列,并用特定字符连接

    //将已有的列(cloumn)多列合并为一个列.并用“_”拼接
    Dataset<Row>upselect = dataset.withColumn("newFLag",concat_ws(String.valueOf('_'),
            dataset.col("name"),dataset.col("sex")));

4、将已有的列多列合并为一列,用数组表示

    //将已有的列(column)多列合并为一个列,用数组表示
    Dataset<Row>getUpselect = dataset.withColumn("newFlags", split(col("newFlag"), ",")).drop("newFlag");

5、将已有的列多列合并为一列,并计算合并的值

    //将已存在的列合并为一个列,并计算合并的值
    Dataset<Row>getSelectNum = dataset.withColumn("newFlagNum", dataset.col("A").plus("B").cast(DataTypes.IntegerType));

6、获取已存在的某一列的值的长度,并将获取的长度新增一列

//获取已存在的某一列的值的长度,并将获取的长度新增一列
     UDF1 lengt = new UDF1<String, Integer>() {
        @Override
        public Integer call(String s) throws Exception {
            return s.getBytes().length;
    Dataset<Row> getselectLength;
        try {
            getselectLength = dataset.withColumn("bytesLength", (Column) lengt.call(dataset.col("newFlags")));
        } catch (Exception e) {
            e.printStackTrace();
2022-02-09 03:14:01 INFO Error in query:
2022-02-09 03:14:01 INFOCREATE TEMPORARY TABLE is not supported yet. Please use CREATE TEMPORARY VIEW as an alternative.(line 3, pos 4)
2022-02-09 03:14:01 INFO
2022-02-09 03:14:01 INFO == SQL ==
2022-02-09..
  版本说明:Spark-2.3.0
使用Spark SQL在对数据进行处理的过程,可能会遇到对一列数据拆分为多列,或者把多列数据合并一列。这里记录一下目前想到的对DataFrame列数据进行合并和拆分的几种方法。
1 DataFrame列数据的合并
例如:我们有如下数据,想要将三列数据合并一列,并以“,”分割
+----+---+----...
from pyspark.sql.functions import collect_list
# 假设你有一个名为df的DataFrame,并且想要将其列"my_column"的值提取为一个列表
list_col = df.groupBy().agg(collect_list("my_column")).collect()[0][0]
在这个例子,我们首先使用`groupBy()`函数将DataFrame的所有行分组为一组。接着,我们使用`agg()`函数将`collect_list("my_column")`应用于每个组。这将返回一个包含所有分组的列表的DataFrame。最后,我们使用`collect()`方法将结果收集到本地驱动程序,并使用`[0][0]`获取第一个元素的第一个值,也就是包含所有值的列表。
请注意,`collect()`方法将所有结果收集到驱动程序,因此只有在结果集较小的情况下才应使用此方法。如果结果集很大,你可能需要使用其他方法来处理它们,例如将它们存储到HDFS或Amazon S3