private static String datasource = "";
//自己需要连接的mongo库的集合
private static String targetTable = "";
SparkSession sparkSession = SparkSession.builder()
.appName("hive-mongo")
.master("local[*]")
.enableHiveSupport()
.config("spark.sql.hive.convertMetastoreOrc", false)
.config("spark.sql.legacy.hive.tmpTable", true)
.config("convertMetastoreParquet", false)
.config("spark.sql.debug.maxToStringFields", 1000)
.config("spark.mongodb.output.uri", datasource)
.config("spark.mongodb.output.collection", targetTable)
.getOrCreate();
String sql = "select *from user ";
Dataset
dataset = sparkSession.sql(sql);
2、将已有的列多列合并为一列,并删除之前的列
//将已有的列(cloumn)多列合并为一个列;并删除之前的列
Dataset<Row>getDataset = dataset.withColumn("newFLag",concat_ws(String.valueOf(','),
dataset.col("name"),dataset.col("sex"))).drop("name","sex");
3、将已有的列多列合并为一列,并用特定字符连接
//将已有的列(cloumn)多列合并为一个列.并用“_”拼接
Dataset<Row>upselect = dataset.withColumn("newFLag",concat_ws(String.valueOf('_'),
dataset.col("name"),dataset.col("sex")));
4、将已有的列多列合并为一列,用数组表示
//将已有的列(column)多列合并为一个列,用数组表示
Dataset<Row>getUpselect = dataset.withColumn("newFlags", split(col("newFlag"), ",")).drop("newFlag");
5、将已有的列多列合并为一列,并计算合并的值
//将已存在的列合并为一个列,并计算合并的值
Dataset<Row>getSelectNum = dataset.withColumn("newFlagNum", dataset.col("A").plus("B").cast(DataTypes.IntegerType));
6、获取已存在的某一列的值的长度,并将获取的长度新增一列
//获取已存在的某一列的值的长度,并将获取的长度新增一列
UDF1 lengt = new UDF1<String, Integer>() {
@Override
public Integer call(String s) throws Exception {
return s.getBytes().length;
Dataset<Row> getselectLength;
try {
getselectLength = dataset.withColumn("bytesLength", (Column) lengt.call(dataset.col("newFlags")));
} catch (Exception e) {
e.printStackTrace();
2022-02-09 03:14:01 INFO Error in query:
2022-02-09 03:14:01 INFOCREATE TEMPORARY TABLE is not supported yet. Please use CREATE TEMPORARY VIEW as an alternative.(line 3, pos 4)
2022-02-09 03:14:01 INFO
2022-02-09 03:14:01 INFO == SQL ==
2022-02-09..
版本说明:Spark-2.3.0
使用Spark SQL在对数据进行处理的过程中,可能会遇到对一列数据拆分为多列,或者把多列数据合并为一列。这里记录一下目前想到的对DataFrame列数据进行合并和拆分的几种方法。
1 DataFrame列数据的合并
例如:我们有如下数据,想要将三列数据合并为一列,并以“,”分割
+----+---+----...
from pyspark.sql.functions import collect_list
# 假设你有一个名为df的DataFrame,并且想要将其列"my_column"的值提取为一个列表
list_col = df.groupBy().agg(collect_list("my_column")).collect()[0][0]
在这个例子中,我们首先使用`groupBy()`函数将DataFrame的所有行分组为一组。接着,我们使用`agg()`函数将`collect_list("my_column")`应用于每个组。这将返回一个包含所有分组的列表的DataFrame。最后,我们使用`collect()`方法将结果收集到本地驱动程序中,并使用`[0][0]`获取第一个元素的第一个值,也就是包含所有值的列表。
请注意,`collect()`方法将所有结果收集到驱动程序中,因此只有在结果集较小的情况下才应使用此方法。如果结果集很大,你可能需要使用其他方法来处理它们,例如将它们存储到HDFS或Amazon S3中。