查了一下,网上关于mongodb的多级关联一般都是一个主表关联两个子表,没有主–子--孙 表的这种关联查询形式,故此,记录一下。

需求描述:

将mongo中的column相关的信息同步到es。

相关结构:

  1. column中有tableId,table表中有dbId,
  2. db中有dbCode,而dbType和InstanceName在mysql中,完整的db:dBCode+dbType+InstanceName

注意:

上面的column、table、db,它们其实都是一个表(在一个表中存储),我是为了大家好理解,所以给了不同的名称,按需选择表即可。

相关分析:

  1. 数据量可能很大,所以需要分批次同步。
  2. 数据在多个表中存在,column中是主信息,同时需要tableName(在table表中),dbName(在mongo的db表中和mysql的两个表(sys_resource_rdbms或sys_resource_hive)中。)
  3. 从2中可以看出,dbName获取最为麻烦,数据很分散。可以采取一下办法:
    1. mongo部分的db可以采取多级关联查询,我的关联关系是column—> table —> db
    2. mysql部分,需要根据mongo中查出的db信息,进一步去mysql中进行查询。
  4. 同步过程还是比较简单的,考虑到表结果变化,删除数据等,主要过程就是新建索引,批量插入,起别名,删除旧索引

代码如下:

package com.jesse.task.service;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.jesse.task.constants.Constants;
import com.jesse.task.domain.*;
import com.jesse.task.mapper.SysResourceHiveQueueMapper;
import com.jesse.task.mapper.SysResourceRdbmsQueueMapper;
import com.jesse.task.utils.ElasticsearchTemplate;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.bulk.BulkResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.aggregation.*;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
    private static final int PAGE_LIMIT = 1000;
    @Autowired
    private MongoTemplate mongoTemplate;
    @Autowired
    private ElasticsearchTemplate esTemplate;
    //下面这两个mapper是mybatisplus相关的mapper。idea检查会飘红,不用管,只是因为没有实现类
    @Autowired
    private SysResourceRdbmsQueueMapper rdbmsQueueMapper;
    @Autowired
    private SysResourceHiveQueueMapper hiveQueueMapper;
	*同步至es
   public void MetadataColumnDataSyncEs() throws IOException {
        //从mongodb中查询出相应数据;数据类型有hive和rdbms两种
        Criteria criteria = Criteria.where("typeName").in(Constants.HIVE_COLUMN,Constants.RDBMS_COLUMN)
                .and("status").is("ACTIVE")
                .and("attributes.versionType").is("formal");
        Query query = new Query(criteria);
        //查询总记录数。
        long amount = mongoTemplate.count(query, Entity.class,"entities");
        //获取页数
        int page = (int) Math.ceil((double)amount / PAGE_LIMIT);
        List<MetadataColumnEntity> metadataColumnEntityList = new ArrayList<>();
        //时间戳,供索引使用。
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("_yyyyMMddHHmmss");
        String dateTimeString = formatter.format(LocalDateTime.now());
        //操作需要指定中文分词器的字段;其他普通字段不要添加;
        String[] fields = {"subjectName", "dbName", "name", "displayName","tableName", "dataType", "teamName", "comment", "createUserName", "refTerm"};
        String mappings = esTemplate.getMapping(fields);
        esTemplate.createIndex(Constants.INDEX_METADATA_TABLE+dateTimeString,fields);
        //定义两个map,用于缓存数据库字段。
       Map<String ,String> rdbMap = new HashMap<>();
       Map<String ,String> hiveMap = new HashMap<>();
       //mongo支持多级关联,我的关联关系是column---> table ---> db
       //字段关联物理表。这个entities代表table
       LookupOperation lookupOperation = LookupOperation.newLookup().
               from("entities").
               localField("attributes.table.guid").//这个字段永远当成主表中的;
               foreignField("guid").//这个字段当成当前表的字段;
               as("table");		//lookupOperation2 中直接使用了此table
       //物理表关联数据库。这个entities代表db。用查出来的table中的字段(table.attributes.db.guid)关联db中的guid。
       LookupOperation lookupOperation2 = LookupOperation.newLookup().
               from("entities").
               localField("table.attributes.db.guid").//这个table字段主表中没有,是关联lookupOperation出来的结果,可以直接使用
               foreignField("guid").
               as("db");
        for(int i =0; i < page;i++){
            AggregationOperation match = Aggregation.match(criteria);
           //project根据需要选择,如果是所有字段,可以不加,如果只想要一部分,可以添加。我这里是所有字段,所以我注释掉了。
//         ProjectionOperation project = Aggregation.project("guid","table","db", "typeName", "attributes", "status", "createBy", "createTime", "updateBy", "updateTime", "version", "typeVersion" );
            SkipOperation skip = Aggregation.skip((long) i * PAGE_LIMIT);
            LimitOperation limit = Aggregation.limit(PAGE_LIMIT);
            //将以上的条件封装,注意lookupOperation的顺序。
            List<AggregationOperation> operations = new ArrayList<>();
            operations.add(lookupOperation);
            operations.add(lookupOperation2);
//          operations.add(project);
            operations.add(match);
            operations.add(skip);
            operations.add(limit);
            Aggregation aggregation = Aggregation.newAggregation(operations);
            AggregationResults<Entity> results = mongoTemplate.aggregate(aggregation, "entities", Entity.class);
            List<Entity> entitieList = results.getMappedResults();
            //下面是处理结果。
            entitieList.forEach(entity -> {
                MetadataColumnEntity metadataColumnEntity = JSON.parseObject(JSON.toJSONString(entity.getAttributes()), MetadataColumnEntity.class);
                metadataColumnEntity.setDataType((String) entity.getAttributes().get("data_type"));
                metadataColumnEntity.setDefaultValue((String)entity.getAttributes().get("default_value"));
                metadataColumnEntity.setCreateTime(entity.getCreateTime());
                metadataColumnEntity.setUpdatedTime(entity.getUpdatedTime());
                metadataColumnEntity.setGuid(entity.getGuid());
				//这里entity.getTable()就是关联主表(column)查出来的。 
              	metadataColumnEntity.setTableName(entity.getTable().get(0).getAttributes().get("name").toString());
              	//这里entity.getDb()就是关联table表查出来的。
                HashMap instance = (HashMap) (entity.getDb().get(0).getAttributes().get("instance"));
                String name = entity.getDb().get(0).getAttributes().get("name").toString();
                //根据不同的类型,从不同的数据库查出相关db信息
                if (Constants.RDBMS_COLUMN.equals(entity.getTypeName())){
                    //如果map中有相关数据,直接取出,否则去查数据库,同时将数据缓存到map中
                    if (rdbMap.size()>0&&rdbMap.containsKey(instance.get("guid"))){
                        metadataColumnEntity.setDbName(rdbMap.get(instance.get("guid"))+name);
                    }else{
                        Wrapper<SysResourceRdbms> queryWrapper = new QueryWrapper<SysResourceRdbms>().lambda().eq(SysResourceRdbms::getAtlasGuid,instance.get("guid"));
                        SysResourceRdbms sysResourceRdbms = rdbmsQueueMapper.selectOne(queryWrapper);
                        metadataColumnEntity.setDbName(sysResourceRdbms.getDbType()+sysResourceRdbms.getInstanceName()+name);
                        rdbMap.put(instance.get("guid").toString(),sysResourceRdbms.getDbType()+sysResourceRdbms.getInstanceName()+name);
                }else if (Constants.HIVE_COLUMN.equals(entity.getTypeName())){
                    //如果map中有相关数据,直接取出,否则去查数据库。
                    if (hiveMap.size()>0&&hiveMap.containsKey(instance.get("guid"))){
                        metadataColumnEntity.setDbName(hiveMap.get(instance.get("guid"))+name);
                    }else{
                        Wrapper<SysResourceHive> queryWrapper = new QueryWrapper<SysResourceHive>().lambda().eq(SysResourceHive::getAtlasGuid,instance.get("guid"));
                        SysResourceHive sysResourceHive = hiveQueueMapper.selectOne(queryWrapper);
                        metadataColumnEntity.setDbName("hive"+sysResourceHive.getClusterName()+name);
                        hiveMap.put(instance.get("guid").toString(),"hive"+sysResourceHive.getClusterName()+name);
                metadataColumnEntityList.add(metadataColumnEntity);
            //数据处理完毕,以下是同步到es的过程。
            //将mongo实体转为es实体;
            List<EsDataSync<MetadataColumnEntity>> esList = metadataColumnEntityList.stream().map(metadataColumnEntity -> new EsDataSync<>(metadataColumnEntity.getGuid(), metadataColumnEntity)).collect(Collectors.toList());
            //批量插入;
            BulkResponse bulkItemResponses = esTemplate.addBatchDocument(Constants.INDEX_METADATA_COLUMN + dateTimeString, esList);
            //同步后需要进行清空,方便下次操作。
            metadataColumnEntityList.clear();
        //根据索引别名获取原索引
        List<String> indexListByAlias = esTemplate.getIndexListByAlias(Constants.INDEX_METADATA_COLUMN);
        //如果已有相关索引,删除旧索引,给新索引起别名。
        if (CollectionUtils.isNotEmpty(indexListByAlias)){
            String oldIndex = indexListByAlias.get(0);
            esTemplate.renameAliases(oldIndex,Constants.INDEX_METADATA_COLUMN+dateTimeString,Constants.INDEX_METADATA_COLUMN);
            //删除原索引;
            esTemplate.deleteIndex(indexListByAlias.get(0));
        }else{
            esTemplate.addAliases(Constants.INDEX_METADATA_COLUMN+dateTimeString,Constants.INDEX_METADATA_COLUMN);
}

以上代码,已经进行过验证,已成功导入,如有错误,欢迎指正。




routeros 安装 python routeros 安装后启动选择

现在能够使用的routeos版本只有2.7.14了,因为只有这个有算号器,新版的2.8由于算法改变,目前还没有新的算号器出来。 Routeos的发行版本有光盘版及软盘版(9张),但是在vmware或vpc之下,这些盘均不能正常读取,所以本人在介绍安装过程的时候只有使用文本方式,就不附图了。 首先将下载的光盘镜像或安装软盘制作成盘,在cmos中设置好启动顺序后从光盘或软盘启动。第一步安装程序