0x0 Dataset转POJO
-
将查询出的结果转为RDD
-
将RDD创建为DataFrame,并传入schema参数
-
调用as方法,将Dataset转为相应的POJO Dataset
-
调用collectAsList()方法
代码如下:
1.表结构
+--------+---------+-------+
| id| string| null|
| name| string| null|
2.POJO类
public class Student {
String id;
String name;
String major;
3.转换代码
SparkSession spark = CloudUtils.getSparkSession()
// 查询原始数据
Dataset<Row> student = spark.sql("select * from `event`.`student`")
// 生成schema
List<StructField> fields = new ArrayList<>()
fields.add(DataTypes.createStructField("id", DataTypes.StringType, true))
fields.add(DataTypes.createStructField("name", DataTypes.StringType, true))
fields.add(DataTypes.createStructField("major", DataTypes.StringType, true))
StructType schema = DataTypes.createStructType(fields)
// 转换查询结果为POJO List
List<Student> students = spark.createDataFrame(student.toJavaRDD(), schema)
.as(Encoders.bean(Student.class))
.collectAsList()
System.out.println(students)
注意:
Dataset中的日期类型为timestamp和java中的Date类型不兼容,和Timestamp类型相互兼容。
为了解决上述问题,我们可以先将Dataset转为JSON,然后将JSON转为POJO,代码如下:
// 查出数据并转为json集合
List<String> jsonList = spark.sql("select * from `event`.`user`")
.toJSON()
.collectAsList()
// 将json转为pojo,这里使用的是FastJSON
List<User> users = jsonList.stream()
.map(jsonString -> JSON.parseObject(jsonString, User.class))
.collect(Collectors.toList())
System.out.println(users)
0x1 POJO转Dataset
1.表结构
+---------+---------+-------+
| user_id | string| null|
|user_name| string| null|
2.POJO类
public class User{
String userId;
String userName;
Integer userAge;
转换代码:
List<User> users = createUsers();
Dataset<Row> ds = spark.createDataFrame(users, User.class);
String[] columns = ds.columns();
String[] newColumns = Arrays.stream(columns)
.map(column -> camelToUnderline(column))
.toArray(String[]::new);
ds.toDF(newColumns);
ds.show();
同样注意:
对于有些类型无法转换的情况,仍然采用json过渡,代码如下:
List<User> users = createUsers();
List<String> jsonList = users.stream()
.map(JSON::toJSONString)
.collect(Collectors.toList());
Dataset<String> jsonDataset = spark.createDataset(jsonList, Encoders.STRING());
Dataset<Row> ds = spark.read().json(jsonDataset.toJavaRDD());
ds.show();
输出结果:
+------------+---+----+
|689875200000| 1| AAA|
0x0 Dataset转POJO方法:将查询出的结果转为RDD将RDD创建为DataFrame,并传入schema参数调用as方法,将Dataset转为相应的POJO Dataset调用collectAsList()方法代码如下:1.表结构+--------+---------+-------+|col_name|data_type|comment|+...
爱因斯坦曾经说过:"每件事物都应该尽可能简单,而不是更简单"。的确,对科学真理的追求都是为了简化理论的根本假设,这样我们才能处理真正麻烦的问题。企业级软件的开发也是这样的。 简化企业级软件开发的关键是提供一个隐藏了复杂性(例如事务、安全性和永续性)的应用框架。良好设计的框架组件可以提升代码的重复使用(reuse)能力,提高开发效率,从而得到更好的软件质量。但是,目前J2EE1.4中的EJB2.1框架组件被人们普遍认为是设计较差的和过于复杂的。Java开发者对EJB2.1很不满,他们已经试验了多种其它的用于中间件服务传送的方法。最引人注目的,下面两个框架组件已经引起开发者的巨大兴趣和积极的反映
最近做WInfrom项目,对表格和控件的数据绑定非常喜欢用实体类对象来解决,但是绑定以后 又怎么从控件中拿到实体类或者转换为datatable 或者dataset呢
经过在网上的搜索以及自己的改进 完成了一个转换类,分享给大家。
public class ModelHandlerA
public class ModelHandler<T> where ...
val rdd: RDD[Person] = sc.parallelize(Array(
Person("fanghailiang", 29),
Person("sunyu", 28),
Person
SparkSQL是一个用于处理结构化数据的spark组件,主要强调的是“结构化”,让开发者少写代码、降低开发成本、提升数据分析执行效率、shark是SparkSQl的前身。
DataFrame
对比RDD:
DataFrame常见创建方式:
SparkSQL读写数据
1、与RDD交互
2、读写本地文件
3、读写parquet
4、读写json
5、读写mysql
生成Dataset<User>并转化为Dataset<Row>,其中(User为自己写的用户类)
其实就一句…
Dataset<Row> dataset1 = dataset.toDF();
起始原因是用spark做机器学习,但训练集是.csv文件,读取的时候读取的是Dataset<Row>,但在调用保存的模型来预测...
JSON转DataFrame
在日常使用Spark处理数据时, 半结构化的JSON数据(JSONObject, JSONArray)可能是最常见的一种数据结构,那么能熟练并快速地处理JSON格式数据应该是数据开发人员的必备技能。
接下来我们就看看该如何将各种格式的JSON数据转成DataFrame。
1. 读取JSON文件
读取指定路径下的json文件(或者存放json...
VO(Value Object)是值对象,用于封装一些简单的数据,通常不包含业务逻辑。
DO(Data Object)是数据对象,用于封装数据库中的数据,通常与数据库表一一对应。
DTO(Data Transfer Object)是数据传输对象,用于在不同层之间传输数据,通常包含多个实体类的属性。
POJO(Plain Old Java Object)是一个简单的Java对象,通常不包含业务逻辑和特殊的API,用于表示某个具体实体。