package com.jesse.task.service;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.jesse.task.constants.Constants;
import com.jesse.task.domain.*;
import com.jesse.task.mapper.SysResourceHiveQueueMapper;
import com.jesse.task.mapper.SysResourceRdbmsQueueMapper;
import com.jesse.task.utils.ElasticsearchTemplate;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.bulk.BulkResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.aggregation.*;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Syncs column metadata from MongoDB into Elasticsearch.
 * (The snippet omitted the class declaration; the class name here is assumed.)
 */
@Slf4j
@Service
public class MetadataColumnSyncService {

    private static final int PAGE_LIMIT = 1000;

    @Autowired
    private MongoTemplate mongoTemplate;
    @Autowired
    private ElasticsearchTemplate esTemplate;
    // The two mappers below are MyBatis-Plus mappers. IDEA marks them red because
    // there is no implementation class; that warning can be ignored.
    @Autowired
    private SysResourceRdbmsQueueMapper rdbmsQueueMapper;
    @Autowired
    private SysResourceHiveQueueMapper hiveQueueMapper;

    /** Sync column metadata to ES. */
    public void metadataColumnDataSyncEs() throws IOException {
        // Query the matching documents from MongoDB; the columns come in two flavors: hive and rdbms.
        Criteria criteria = Criteria.where("typeName").in(Constants.HIVE_COLUMN, Constants.RDBMS_COLUMN)
                .and("status").is("ACTIVE")
                .and("attributes.versionType").is("formal");
        Query query = new Query(criteria);
        // Total number of matching documents.
        long amount = mongoTemplate.count(query, Entity.class, "entities");
        // Number of pages.
        int page = (int) Math.ceil((double) amount / PAGE_LIMIT);
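        // e.g. amount = 2345 with PAGE_LIMIT = 1000 gives ceil(2.345) = 3 pages;
        // the cast to double matters, since integer division would yield 2.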
        List<MetadataColumnEntity> metadataColumnEntityList = new ArrayList<>();
        // Timestamp suffix for the new index name.
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("_yyyyMMddHHmmss");
        String dateTimeString = formatter.format(LocalDateTime.now());
        // List only the fields that need the Chinese analyzer; plain fields must not be added here.
        String[] fields = {"subjectName", "dbName", "name", "displayName", "tableName", "dataType", "teamName", "comment", "createUserName", "refTerm"};
        esTemplate.createIndex(Constants.INDEX_METADATA_COLUMN + dateTimeString, fields);
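        /*
         * For reference, the per-field mapping those names presumably receive looks
         * roughly like this (a sketch assuming the commonly used IK analyzer plugin;
         * the actual JSON is built inside ElasticsearchTemplate and is not shown here):
         *
         *   "dbName": {
         *     "type": "text",
         *     "analyzer": "ik_max_word",
         *     "search_analyzer": "ik_smart"
         *   }
         */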
        // Two maps that cache database-name prefixes already resolved from MySQL.
        Map<String, String> rdbMap = new HashMap<>();
        Map<String, String> hiveMap = new HashMap<>();
        // Mongo supports multi-level joins; the chain here is column -> table -> db.
        // First join: column to physical table. This "entities" lookup yields the table.
        LookupOperation lookupOperation = LookupOperation.newLookup()
                .from("entities")
                .localField("attributes.table.guid") // always treated as a field of the main (column) document
                .foreignField("guid")                // treated as a field of the looked-up collection
                .as("table");                        // lookupOperation2 uses this "table" result directly
        // Second join: table to database. This "entities" lookup yields the db;
        // table.attributes.db.guid, produced by the first lookup, is matched against the db's guid.
        LookupOperation lookupOperation2 = LookupOperation.newLookup()
                .from("entities")
                .localField("table.attributes.db.guid") // "table" is not in the main document: it is the first lookup's output, usable directly
                .foreignField("guid")
                .as("db");
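        /*
         * For reference, the pipeline assembled below corresponds roughly to this
         * Mongo shell aggregation (a sketch, with the stages in the order used here):
         *
         *   db.entities.aggregate([
         *     { $lookup: { from: "entities", localField: "attributes.table.guid",
         *                  foreignField: "guid", as: "table" } },
         *     { $lookup: { from: "entities", localField: "table.attributes.db.guid",
         *                  foreignField: "guid", as: "db" } },
         *     { $match: { typeName: { $in: [<HIVE_COLUMN>, <RDBMS_COLUMN>] },
         *                 status: "ACTIVE", "attributes.versionType": "formal" } },
         *     { $skip: i * 1000 },
         *     { $limit: 1000 }
         *   ])
         */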
        for (int i = 0; i < page; i++) {
            AggregationOperation match = Aggregation.match(criteria);
            // Add a $project stage only when a subset of fields is wanted; every field is needed here, so it stays commented out.
            // ProjectionOperation project = Aggregation.project("guid", "table", "db", "typeName", "attributes", "status", "createBy", "createTime", "updateBy", "updateTime", "version", "typeVersion");
            SkipOperation skip = Aggregation.skip((long) i * PAGE_LIMIT);
            LimitOperation limit = Aggregation.limit(PAGE_LIMIT);
            // Assemble the stages; mind the order of the two lookups (the second depends on the first's output).
            List<AggregationOperation> operations = new ArrayList<>();
            operations.add(lookupOperation);
            operations.add(lookupOperation2);
            // operations.add(project);
            operations.add(match);
            operations.add(skip);
            operations.add(limit);
            Aggregation aggregation = Aggregation.newAggregation(operations);
            AggregationResults<Entity> results = mongoTemplate.aggregate(aggregation, "entities", Entity.class);
            List<Entity> entityList = results.getMappedResults();
            // Map each joined result onto the ES entity.
            entityList.forEach(entity -> {
                MetadataColumnEntity metadataColumnEntity = JSON.parseObject(JSON.toJSONString(entity.getAttributes()), MetadataColumnEntity.class);
                metadataColumnEntity.setDataType((String) entity.getAttributes().get("data_type"));
                metadataColumnEntity.setDefaultValue((String) entity.getAttributes().get("default_value"));
                metadataColumnEntity.setCreateTime(entity.getCreateTime());
                metadataColumnEntity.setUpdatedTime(entity.getUpdatedTime());
                metadataColumnEntity.setGuid(entity.getGuid());
                // entity.getTable() is the first lookup's result (column joined to table).
                metadataColumnEntity.setTableName(entity.getTable().get(0).getAttributes().get("name").toString());
                // entity.getDb() is the second lookup's result (table joined to db).
                Map<String, Object> instance = (Map<String, Object>) entity.getDb().get(0).getAttributes().get("instance");
                String instanceGuid = instance.get("guid").toString();
                String name = entity.getDb().get(0).getAttributes().get("name").toString();
                // Resolve the db-name prefix from the matching source system, depending on the column type.
                if (Constants.RDBMS_COLUMN.equals(entity.getTypeName())) {
                    // Take the prefix from the cache when present; otherwise query the database and cache it.
                    // Only the prefix is cached: "name" differs per db on the same instance.
                    if (rdbMap.containsKey(instanceGuid)) {
                        metadataColumnEntity.setDbName(rdbMap.get(instanceGuid) + name);
                    } else {
                        Wrapper<SysResourceRdbms> queryWrapper = new QueryWrapper<SysResourceRdbms>().lambda().eq(SysResourceRdbms::getAtlasGuid, instanceGuid);
                        SysResourceRdbms sysResourceRdbms = rdbmsQueueMapper.selectOne(queryWrapper);
                        String prefix = sysResourceRdbms.getDbType() + sysResourceRdbms.getInstanceName();
                        metadataColumnEntity.setDbName(prefix + name);
                        rdbMap.put(instanceGuid, prefix);
                    }
                } else if (Constants.HIVE_COLUMN.equals(entity.getTypeName())) {
                    // Same caching scheme for hive columns.
                    if (hiveMap.containsKey(instanceGuid)) {
                        metadataColumnEntity.setDbName(hiveMap.get(instanceGuid) + name);
                    } else {
                        Wrapper<SysResourceHive> queryWrapper = new QueryWrapper<SysResourceHive>().lambda().eq(SysResourceHive::getAtlasGuid, instanceGuid);
                        SysResourceHive sysResourceHive = hiveQueueMapper.selectOne(queryWrapper);
                        String prefix = "hive" + sysResourceHive.getClusterName();
                        metadataColumnEntity.setDbName(prefix + name);
                        hiveMap.put(instanceGuid, prefix);
                    }
                }
                metadataColumnEntityList.add(metadataColumnEntity);
            });
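            /*
             * Design note: each branch above is a manual read-through cache; the same
             * shape could also be written with Map.computeIfAbsent, e.g. (a sketch):
             *
             *   String prefix = rdbMap.computeIfAbsent(instanceGuid,
             *       guid -> { SysResourceRdbms r = rdbmsQueueMapper.selectOne(...);
             *                 return r.getDbType() + r.getInstanceName(); });
             */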
            // The page is fully processed; the rest of the loop body pushes it to ES.
            // Convert the Mongo entities into the ES sync wrapper.
            List<EsDataSync<MetadataColumnEntity>> esList = metadataColumnEntityList.stream().map(e -> new EsDataSync<>(e.getGuid(), e)).collect(Collectors.toList());
            // Bulk insert into the timestamped index.
            BulkResponse bulkItemResponses = esTemplate.addBatchDocument(Constants.INDEX_METADATA_COLUMN + dateTimeString, esList);
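            // Added guard (not in the original flow): a bulk request can fail item by
            // item, so surface partial failures in the log rather than drop them.
            if (bulkItemResponses.hasFailures()) {
                log.error("Bulk sync reported failures: {}", bulkItemResponses.buildFailureMessage());
            }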
            // Clear the buffer so the next page starts fresh.
            metadataColumnEntityList.clear();
        }
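        /*
         * For reference: renameAliases/addAliases below are this project's own
         * helpers. With Elasticsearch's high-level REST client, an atomic alias flip
         * is typically one updateAliases call (a sketch, assuming the helper wraps a
         * RestHighLevelClient):
         *
         *   IndicesAliasesRequest request = new IndicesAliasesRequest()
         *       .addAliasAction(AliasActions.remove().index(oldIndex).alias(alias))
         *       .addAliasAction(AliasActions.add().index(newIndex).alias(alias));
         *   client.indices().updateAliases(request, RequestOptions.DEFAULT);
         */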
        // Look up the index currently behind the alias.
        List<String> indexListByAlias = esTemplate.getIndexListByAlias(Constants.INDEX_METADATA_COLUMN);
        // If one exists, move the alias to the new index and delete the old one.
        if (CollectionUtils.isNotEmpty(indexListByAlias)) {
            String oldIndex = indexListByAlias.get(0);
            esTemplate.renameAliases(oldIndex, Constants.INDEX_METADATA_COLUMN + dateTimeString, Constants.INDEX_METADATA_COLUMN);
            // Delete the superseded index.
            esTemplate.deleteIndex(oldIndex);
        } else {
            // First run: just attach the alias to the new index.
            esTemplate.addAliases(Constants.INDEX_METADATA_COLUMN + dateTimeString, Constants.INDEX_METADATA_COLUMN);
        }
    }
}