目前我们需要将各渠道的数据进行入库然后进行分析,当前 api 返回的数据都是通过定制化代码开发来解析、建表、数据入库的。这种方式可复用性不高,因为每种渠道提供的接口数据格式是不一样的,每次都需要去修改代码,不能做到零代码通用方案入库。所以我们需要一个可以自动来根据 api 返回的 json 数据格式,来完成建表、自动入库的一种通用方案。
因为接口返回的数据基本都不是就简单的一个层级的,可能是某个 key 对应的 value 还是 JSON 数据, 一直嵌套好多层,这个时候就需要给定嵌套的这种情况如何处理,生成多少张表,数据如何批量插入,如果生成多张表,表中的数据通过什么来关联?以上问题都是需要考虑的问题,具体该如何解决,下面详细讲解一下。
落地多少张表
关于落地多少张表,当前是支持二种方案:
单张宽表的思路就比较简单,一般是最外面一层数据是我们比较想要的数据,然后里面嵌套的可能是一些维度信息,因此我们会以最外层的 key 来作为表的字段,value 如果是基本类型(数据、字符串、布尔类型)就不做处理,如果是 json 数据(JSONObject 或者 JSONArray),则将其转化成 json 字符串,按照字符串类型来进行处理。
注:因为这边业务场景是使用 flink 来进行接下来的数据处理,所以,会写一个 udf 函数,来从 json 字符串中提取任意级别的 json 数据配合着使用。
多张表处理是将同一个 jsonObject 范围内的同一级别并且 value 是基本类型的所有 key 组成一个表,然后如果 value 是 json 数据(JSONObject 或者 JSONArray),则会再生成一张表,然后会添加一个上层 json 生成表的关联字段来进行表表之间的关联,这比较符合关系型数据的第三范式,也比较适合于那种没有 json 字符串和行列转换强功能支持的数据库类型,比如 mysql。
数据如何批量插入 & 解决幂等性
批量插入就是生成批量插入的 sql 语句,但是一个接口可能会重复导入,那如何解决幂等性的问题?
这就要从我们生成表那里的逻辑,就是在生成一个表的时候,会默认添加一列,列名就叫 md5_code,用来标志这条数据的唯一性,md5_code 取值就包括该行里面的数据(排除关联键的值)后经过 md5 加密后的一个值,来标志这行数据的唯一性,如果多次导入同一批数据,则只会更新导入时间,不会新插入一条数据。
落地多张表的情况,数据如何关联
这里的关联,包括建表的时候的关联键的添加和实际插入数据时,多张表之间行与行之间关联的值的关联二种情况。
关联键的添加
在建表的时候会生成关联键,关联键的名称生成规则就是外层表名拼上一个后缀。
多表之间的行数据关联
再往里解析 json 的时候,需要把关联键和关联键的值传递进去,关联键的值传递的就是外层表当前行数据的 md5 值,这样就生成了多表之间行数据关联的依据。
如何适配多种不同的数据源
这里就是整体设计方案来解决的问题了,我们这里采取的是工厂模式,根据不同的数据库类型,来选择不同的 builder 来处理。
再选择 SqlBuilder 的时候,根据要落库的数据类型来选择即可,在 buildSql 的时候在传入你想构建大宽表还是多表,即可根据你的需求进行定制化实现。
核心的json数据解析和生成统一的数据模型是在父类的代码里做的,不同的sql构建器则是根据解析和抽象好的数据模型来自定义生成不同类型的标准sql。
上图是核心架构图,因为要支持同一个接口可以灵活入不同类型的库,所以我们这里设计思路有二大亮点:
使用工厂模式的设计模式,使得用户可以随意选择入库的数据库类型,核心逻辑根据 json 生成下面统一的数据模型是在父类 SqlBuilder 中,子类在进行 buildSql 的时候只需要先调用父类的统一逻辑,再调用自己重写的 buildCreateSql 和 buildInsertSql 方法即可。
抽象出来一个数据模型,将生成 sql 的必要的通用信息进行统一实现,不同数据库类型的 SqlBuilder 根据通用信息来根据自己的语法特征来生成对应的 sql 即可。通用信息包括:
列信息(列名称、类型(长度))
要插入的数据集合
如何来使用呢,使用方法也比较简单,如下代码所示:
SqlBuilder sqlBuilder = SqlBuilderFactory.getSqlBuilder(SinkDbTypeEnum.MYSQL);
sqlBuilder.initRawJsonData(jsonData,"shop_order_list");
sqlBuilder.buildSql(DataSinkTypeEnum.SINGLE_TABLE);
sqlBuilder.getCreateSqlList().forEach(sql -> System.out.println(sql));
sqlBuilder.getInsertSqlList().forEach(sql -> System.out.println(sql));
JSON 数据解析核心逻辑
解析成单表的逻辑
解析成单表的逻辑比较简单,主要考虑是一般接口直接返回的数据,最外层的数据一般是比较核心的数据,其中可能会有 json 嵌套,json 嵌套值会被认为是数据某个属性的扩展,类似是指标的维度的概念,所以我们在解析成单标的逻辑就是每个 key 会对应到表中的一个字段,value 类型和生成字段的类型的对应关系如下(Mysql 为例):
value 类型 字段类型 字段类型长度
字符串 varchar 取当前该字段要插入数据的最大长度和 255 中的最大值
整型 bigint bigint(11)
jsonObject varchar 序列化后字符串的长度和 255 取最大值
jsonArray varchar 序列化后字符串的长度和 255 取最大值
核心代码如下:
* 生成单表的逻辑 ,只解析最外面一层做表结构
private void buildSqlBlock(Object object) {
SqlBlock sqlBlock = this.sqlMap.get(this.rootTableName);
if (sqlBlock == null) {
this.sqlMap.put(this.rootTableName, new SqlBlock(this.rootTableName));
if (object instanceof JSONObject) {
buildSqlBlockForJSONObject((JSONObject) object);
return;
if (object instanceof JSONArray) {
JSONArray array = (JSONArray) object;
for (int i = 0; i < array.size(); i++) {
buildSqlBlockForJSONObject(array.getJSONObject(i));
return;
private void buildSqlBlockForJSONObject(JSONObject row) {
SqlBlock sqlBlock = this.sqlMap.get(this.rootTableName);
for (Map.Entry<String, Object> entry : row.entrySet()) {
if (entry.getValue() instanceof JSONObject || entry.getValue() instanceof JSONArray) {
String value = JSON.toJSONString(entry.getValue());
sqlBlock.addColumn(entry.getKey(), value);
row.put(entry.getKey(), value);
} else {
sqlBlock.addColumn(entry.getKey(), entry.getValue());
sqlBlock.addColumn(DEFAULT_COLUMN_MD5,"md5Column");
String md5Code = md5Encode(row,null);
row.put(DEFAULT_COLUMN_MD5,md5Code);
sqlBlock.addRowData(row);
解析成多表的逻辑
解析成多表的逻辑,就会比较复杂,从外向里遍历 json 结构的时候,如果发现某个 key 对应的 value 是 json 数据(JSONObject 或者 JSONArray)就会为其生成一个新表,并且在新表里面建立一个和外层表的关联键。
表名默认是 key 值,最外层没有 key 值,需要在进行初始化的时候进行指定。
核心代码如下:
* jsonArray 生成json语句
* @param tableName
* @param array
* @param relationTable
* @param relationTableValue
private void buildSqlBlock(String tableName,
JSONArray array,
String relationTable, String relationTableValue) {
if (array.size() <= 0) {
return;
if (StringUtils.hasLength(relationTable)) {
for (int i = 0; i < array.size(); i++) {
array.getJSONObject(i).put(buildRelationKey(relationTable), relationTableValue);
buildSqlBlock(tableName, array.getJSONObject(0), relationTable);
* jsonObject生成创建表语句
* @param tableName
* @param row
private void buildSqlBlock( String tableName,
JSONObject row,
String relationTable) {
SqlBlock sqlBlock = sqlMap.get(tableName);
if (sqlBlock == null) {
sqlBlock = new SqlBlock(tableName);
String rowDataMd5 = md5Encode(row, relationTable);
Set<String> keySets = row.keySet();
for (String key : keySets) {
Object value = row.get(key);
if (value instanceof JSONObject) {
JSONObject object = row.getJSONObject(key);
object.put(buildRelationKey(tableName), rowDataMd5);
buildSqlBlock(key, object, tableName);
} else if (value instanceof JSONArray) {
JSONArray array = row.getJSONArray(key);
buildSqlBlock(key, array, tableName, rowDataMd5);
} else {
sqlBlock.addColumn(key, value);
sqlBlock.addColumn(DEFAULT_COLUMN_MD5, "string");
row.put(DEFAULT_COLUMN_MD5, rowDataMd5);
sqlBlock.addRowData(row);
sqlBlock.addRelationTableKey(buildRelationKey(relationTable));
sqlMap.put(tableName, sqlBlock);
private String buildRelationKey(String relationTable) {
if (relationTable == null) {
return null;
return relationTable.concat("_relation_key");
private String md5Encode(JSONObject row, String relationTable) {
try {
Map<String, Object> rowMap = new HashMap<>();
row.entrySet().stream().filter(entry -> baseType(entry.getValue()))
.filter(entry -> StringUtils.isEmpty(relationTable) || !buildRelationKey(relationTable).equals(entry.getKey()))
.forEach(entry -> rowMap.put(entry.getKey(), entry.getValue()));
return DigestUtils.md5DigestAsHex(JSON.toJSONString(rowMap).getBytes("utf-8"));
} catch (UnsupportedEncodingException e) {
log.error("DigestUtils.md5DigestAsHex execute Error : {}", e.getMessage(), e);
return row.toJSONString();
private Boolean baseType(Object object) {
if (object instanceof JSONObject || object instanceof JSONArray) {
return false;
} else {
return true;
复制代码