当您需要将自建Elasticsearch中的全量或增量数据迁移至阿里云Elasticsearch时,可通过在ECS中自建Logstash,并通过Logstash的管道配置功能实现。本文为您介绍具体的实现方法。

背景信息

本文中数据迁移的流程如下。 数据架构
  1. 在ECS服务器部署自建Elasticsearch并准备待迁移的数据。
  2. 开通阿里云Elasticsearch服务。
  3. 在ECS服务器运行Python脚本迁移索引元数据。
  4. 部署Logstash,并通过Logstash管道配置功能,将自建Elasticsearch中的全量或增量数据迁移至阿里云Elasticsearch中。

注意事项

  • 本文在阿里云ECS上部署自建Logstash,该Logstash所在的ECS需要与阿里云Elasticsearch集群在同一专有网络下,同时该Logstash需要能够同时访问源Elasticsearch集群(自建)和目标Elasticsearch集群(阿里云)。
  • 数据迁移可以全量迁移或增量迁移。如果业务侧时刻存在写入更新,首次迁移时,需先全量迁移,再通过时间标识字段(或其他可标识增量的字段)进行增量迁移,否则迁移后新数据极易被旧数据覆盖。如果已有全量数据,可以只通过标识字段实现增量数据迁移。

操作流程

  1. 步骤一:准备环境与实例
  2. 步骤二:迁移索引元数据(设置和映射)
  3. 步骤三:迁移全量数据
  4. 步骤四:迁移增量数据
  5. 步骤五:查看数据迁移结果

步骤一:准备环境与实例

  1. 创建阿里云Elasticsearch实例。
    具体操作请参见 创建阿里云Elasticsearch实例 。本文使用的测试环境如下。
    环境项 环境信息
    地域 华东1(杭州)。
    版本 通用商业版7.10.0。
    实例规格配置 3个可用区、3个数据节点、单节点4核CPU、16 GB内存、100 GB ESSD云盘。
  2. 创建ECS实例,用于部署自建Elasticsearch、自建Kibana和自建Logstash。
    具体操作请参见 自定义购买实例 。本文使用的测试环境如下。
    环境项 环境信息
    地域 华东1(杭州)。
    实例规格 4 vCPU 16 GiB内存。
    镜像 公共镜像、CentOS、7.9 64位。
    存储 系统盘、ESSD云盘、100 GiB。
    网络 与阿里云Elasticsearch相同的专有网络,选中 分配公网IPv4地址 ,并按使用流量计费,带宽峰值为100 Mbps。
    安全组 入方向添加5601端口(即Kibana端口),在授权对象中添加您客户端的IP地址。
    重要
    • 如果您的客户端处在家庭网络或公司局域网中,您需要在授权对象中添加局域网的公网出口IP地址,而非客户端机器的IP地址。建议您通过浏览器访问 https://myip.ipip.net 查询。
    • 您也可以在授权对象中添加0.0.0.0/0,表示允许所有IPv4地址访问ECS实例。此配置会导致ECS实例完全暴露在公网中,增加安全风险,生产环境尽量避免。
  3. 部署自建Elasticsearch。
    本文使用的自建Elasticsearch版本为7.6.2,1个数据节点,具体操作步骤如下:
    1. 连接ECS服务器。
    2. 使用root用户权限创建elastic用户。
      useradd elastic
      passwd <your_password>
    3. 将root用户切换为elastic用户。
      su -l elastic
    4. 下载Elasticsearch软件安装包并解压缩。
      sudo wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.6.2-linux-x86_64.tar.gz
      sudo tar -zvxf elasticsearch-7.6.2-linux-x86_64.tar.gz
    5. 启动Elasticsearch。
      进入Elasticsearch的安装目录下,启动服务。
      sudo cd elasticsearch-7.6.2
      ./bin/elasticsearch -d
    6. 验证Elasticsearch服务是否正常运行。
      cd ~ 
      curl localhost:9200
      正常情况下,返回结果中会显示Elasticsearch版本号和 "You Know, for Search" 部署自建Elasticsearch
  4. 部署自建Kibana,并准备测试数据。
    本文使用的自建Kibana版本为7.6.2,1个数据节点,具体操作步骤如下:
    1. 连接ECS服务器。
      具体操作请参见 通过密码或密钥认证登录Linux实例
      说明 本文档以普通用户权限为例。
    2. 下载Kibana软件安装包并解压缩。
      sudo wget https://artifacts.elastic.co/downloads/kibana/kibana-7.6.2-linux-x86_64.tar.gz
      sudo tar -zvxf kibana-7.6.2-linux-x86_64.tar.gz
    3. 修改Kibana配置文件 config/kibana.yml ,增加 server.host: "0.0.0.0" 配置,允许通过公网IP访问Kibana。
      进入Kibana安装目录,修改 kibana.yml
      cd kibana-7.6.2-linux-x86_64
      sudo vi config/kibana.yml
      Kibana配置
    4. 使用非root用户启用Kibana。
      sudo nohup ./bin/kibana &
    5. 登录Kibana控制台,添加示例数据。
      1. 通过公网IP地址登录Kibana控制台。

        公网IP地址为: http://<ECS服务器的公网IP地址>:5601/app/kibana#/home

      2. 在Kibana控制台首页,单击 Try our sample data
      3. Sample data 页签,单击日志示例数据模块下的 Add data ,添加对应数据。 添加示例数据
  5. 部署自建Logstash。
    本文使用的Logstash版本为7.10.0,1个节点,具体操作步骤如下:
    1. 连接ECS服务器。
      具体操作请参见 通过密码或密钥认证登录Linux实例
      说明 本文档以普通用户权限为例。
    2. 回到跟目录,下载Logstash软件安装包并解压缩。
      cd ~
      sudo wget https://artifacts.elastic.co/downloads/logstash/logstash-7.10.0-linux-x86_64.tar.gz
      sudo tar -zvxf logstash-7.10.0-linux-x86_64.tar.gz
    3. 修改Logstash的堆内存使用。
      Logstash默认的堆内存为1 GB,您需要根据ECS规格配置合适的内存大小,加快集群数据的迁移效率。
      进入Logstash的安装目录下,修改Logstash配置文件config/jvm.options,增加-Xms8g和-Xmx8g。
      cd logstash-7.10.0
      sudo vi config/jvm.options
      Logstash堆内存配置
    4. 修改Logstash批量写入记录条数。
      每批量写入5~15 MB数据,可以加快集群数据的迁移效率。

      修改Logstash配置文件config/pipelines.yml,将每批量写入记录条数 pipeline.batch.size 从125改为5000。

      sudo vi config/pipelines.yml
      Logstash批量写入记录数
    5. 验证Logstash功能。
      1. 通过控制台输入输出收集数据。
        sudo bin/logstash -e 'input { stdin { } } output { stdout {} }'
      2. 在控制台中输入 "Hello world!"
        正常情况下,控制台会输出 "Hello world!" 验证Logstash功能

步骤二:迁移索引元数据(设置和映射)

在进行数据迁移时,Logstash会帮助您自动创建索引,但是自动创建的索引可能与您待迁移的索引存在差异,导致迁移前后数据的格式不一致。因此建议您在数据迁移前,在阿里云Elasticsearch中手动创建目标索引,确保迁移前后索引数据完全一致。

您可以通过Python脚本创建目标索引,具体操作步骤如下:

  1. 连接ECS服务器。
    具体操作请参见 通过密码或密钥认证登录Linux实例
    说明 本文档以普通用户权限为例。
  2. 创建并打开Python脚本文件(本文以indiceCreate.py为例)。
    sudo vi indiceCreate.py
  3. 修改Python脚本文件,拷贝以下代码(注意修改集群的连接地址、用户名和密码)。
    #!/usr/bin/python
    # -*- coding: UTF-8 -*-
    # 文件名:indiceCreate.py
    import sys
    import base64
    import time
    import httplib
    import json
    ## 源集群host。
    oldClusterHost = "localhost:9200"
    ## 源集群用户名,可为空。
    oldClusterUserName = "elastic"
    ## 源集群密码,可为空。
    oldClusterPassword = "xxxxxx"
    ## 目标集群host,可在阿里云Elasticsearch实例的基本信息页面获取。
    newClusterHost = "es-cn-zvp2m4bko0009****.elasticsearch.aliyuncs.com:9200"
    ## 目标集群用户名。
    newClusterUser = "elastic"
    ## 目标集群密码。
    newClusterPassword = "xxxxxx"
    DEFAULT_REPLICAS = 0
    def httpRequest(method, host, endpoint, params="", username="", password=""):
        conn = httplib.HTTPConnection(host)
        headers = {}
        if (username != "") :
            'Hello {name}, your age is {age} !'.format(name = 'Tom', age = '20')
            base64string = base64.encodestring('{username}:{password}'.format(username = username, password = password)).replace('\n', '')
            headers["Authorization"] = "Basic %s" % base64string;
        if "GET" == method:
            headers["Content-Type"] = "application/x-www-form-urlencoded"
            conn.request(method=method, url=endpoint, headers=headers)
        else :
            headers["Content-Type"] = "application/json"
            conn.request(method=method, url=endpoint, body=params, headers=headers)
        response = conn.getresponse()
        res = response.read()
        return res
    def httpGet(host, endpoint, username="", password=""):
        return httpRequest("GET", host, endpoint, "", username, password)
    def httpPost(host, endpoint, params, username="", password=""):
        return httpRequest("POST", host, endpoint, params, username, password)
    def httpPut(host, endpoint, params, username="", password=""):
        return httpRequest("PUT", host, endpoint, params, username, password)
    def getIndices(host, username="", password=""):
        endpoint = "/_cat/indices"
        indicesResult = httpGet(oldClusterHost, endpoint, oldClusterUserName, oldClusterPassword)
        indicesList = indicesResult.split("\n")
        indexList = []
        for indices in indicesList:
            if (indices.find("open") > 0):
                indexList.append(indices.split()[2])
        return indexList
    def getSettings(index, host, username="", password=""):
        endpoint = "/" + index + "/_settings"
        indexSettings = httpGet(host, endpoint, username, password)
        print (index + "  原始settings如下:\n" + indexSettings)
        settingsDict = json.loads(indexSettings)
        ## 分片数默认和源集群索引保持一致。
        number_of_shards = settingsDict[index]["settings"]["index"]["number_of_shards"]
        ## 副本数默认为0。
        number_of_replicas = DEFAULT_REPLICAS
        newSetting = "\"settings\": {\"number_of_shards\": %s, \"number_of_replicas\": %s}" % (number_of_shards, number_of_replicas)
        return newSetting
    def getMapping(index, host, username="", password=""):
        endpoint = "/" + index + "/_mapping"
        indexMapping = httpGet(host, endpoint, username, password)
        print (index + " 原始mapping如下:\n" + indexMapping)
        mappingDict = json.loads(indexMapping)
        mappings = json.dumps(mappingDict[index]["mappings"])
        newMapping = "\"mappings\" : " + mappings
        return newMapping
    def createIndexStatement(oldIndexName):
        settingStr = getSettings(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
        mappingStr = getMapping(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
        createstatement = "{\n" + str(settingStr) + ",\n" + str(mappingStr) + "\n}"
        return createstatement
    def createIndex(oldIndexName, newIndexName=""):
        if (newIndexName == "") :
            newIndexName = oldIndexName
        createstatement = createIndexStatement(oldIndexName)
        print ("新索引 " + newIndexName + " 的setting和mapping如下:\n" + createstatement)
        endpoint = "/" + newIndexName
        createResult = httpPut(newClusterHost, endpoint, createstatement, newClusterUser, newClusterPassword)
        print ("新索引 " + newIndexName + " 创建结果:" + createResult)
    ## main
    indexList = getIndices(oldClusterHost, oldClusterUserName, oldClusterPassword)
    systemIndex = []
    for index in indexList:
        if (index.startswith(".")):
            systemIndex.append(index)
        else :
            createIndex(index, index)
    if (len(systemIndex) > 0) :
        for index in systemIndex:
            print (index + " 或许是系统索引,不会重新创建,如有需要,请单独处理~")
  4. 执行Python脚本,创建目标索引。
    sudo /usr/bin/python indiceCreate.py
  5. 参见 登录Kibana控制台 ,登录目标集群的Kibana控制台,查看已创建的索引。
    GET /_cat/indices?v

步骤三:迁移全量数据

  1. 连接ECS服务器。
  2. 在config目录下,创建并打开Logstash配置文件。
    cd logstash-7.10.0/config
    sudo vi es2es_all.conf
  3. 参考以下配置,修改Logstash配置文件。
    input{
        elasticsearch{
            # 源端ES地址。
            hosts =>  ["http://localhost:9200"]
            # 安全集群配置登录用户名密码。
            user => "xxxxxx"
            password => "xxxxxx"
            # 需要迁移的索引列表,多个索引以英文以逗号(,)分隔。
            index => "kibana_sample_data_*"
            # 以下三项保持默认即可,包含线程数和迁移数据大小和Logstash JVM配置相关。
            docinfo=>true
            slices => 5
            size => 5000
    filter {
      # 去掉一些Logstash自己加的字段。
      mutate {
        remove_field => ["@timestamp", "@version"]
    output{
        elasticsearch{
            # 目标端ES地址,可在阿里云Elasticsearch实例的基本信息页面获取。
            hosts => ["http://es-cn-zvp2m4bko0009****.elasticsearch.aliyuncs.com:9200"]
            # 安全集群配置登录用户名密码。
            user => "elastic"
            password => "xxxxxx"
            # 目标端索引名称,以下配置表示索引与源端保持一致。
            index => "%{[@metadata][_index]}"
            # 目标端索引type,以下配置表示索引类型与源端保持一致。
            document_type => "%{[@metadata][_type]}"
            # 目标端数据的id,如果不需要保留原id,可以删除以下这行,删除后性能会更好。
            document_id => "%{[@metadata][_id]}"
            ilm_enabled => false
            manage_template => false
    }
    说明 为了保证迁移数据的准确性,建议您创建多个Logstash管道配置文件,分批次迁移数据,每个Logstash迁移部分数据。
  4. 启动Logstash全量迁移任务。
    cd ../
    sudo nohup bin/logstash -f config/es2es_all.conf >/dev/null 2>&1 &

步骤四:迁移增量数据

  1. 连接ECS服务器,在config目录下,创建并打开Logstash增量配置文件。
    cd config
    sudo vi es2es_kibana_sample_data_logs.conf
    说明 本文档以普通用户权限为例。
  2. 参考以下配置,修改Logstash配置文件。
    开启Logstash定时任务即可触发增量迁移,配置示例如下。
    input{
        elasticsearch{
            # 源端ES地址。
            hosts =>  ["http://localhost:9200"]
            # 安全集群配置登录用户名密码。
            user => "xxxxxx"
            password => "xxxxxx"
            # 需要迁移的索引列表,多个索引使用英文逗号(,)分隔。
            index => "kibana_sample_data_logs"
            # 按时间范围查询增量数据,以下配置表示查询最近5分钟的数据。
            query => '{"query":{"range":{"@timestamp":{"gte":"now-5m","lte":"now/m"}}}}'
            # 定时任务,以下配置表示每分钟执行一次。
            schedule => "* * * * *"
            scroll => "5m"
            docinfo=>true
            size => 5000
    filter {
      # 去掉一些Logstash自己加的字段.
      mutate {
        remove_field => ["@timestamp", "@version"]
    output{
        elasticsearch{
            # 目标端ES地址,可在阿里云Elasticsearch实例的基本信息页面获取。
            hosts => ["http://es-cn-zvp2m4bko0009****.elasticsearch.aliyuncs.com:9200"]
            # 安全集群配置登录用户名密码.
            user => "elastic"
            password => "xxxxxx"
            # 目标端索引名称,以下配置表示索引与源端保持一致。
            index => "%{[@metadata][_index]}"
            # 目标端索引type,以下配置表示索引类型与源端保持一致。
            document_type => "%{[@metadata][_type]}"
            # 目标端数据的id,如果不需要保留原id,可以删除以下这行,删除后性能会更好。
            document_id => "%{[@metadata][_id]}"
            ilm_enabled => false
            manage_template => false
    }
    重要
    • Logstash记录的时间戳为UTC时间,如果您的本地时间为北京时间(东八区),那么两者会存在8个小时的时区差,此时将UTC时间转化为北京时间,可使用公式:UTC+时区差=北京时间。例如,以上示例中通过源端索引中的 @timestamp 字段进行range范围过滤查询获取增量数据,并在对应的时间上+8h转换为北京时间。
    • 通过Logstash控制时间字段实现增量数据的同步,需确保原索引中有可控制的时间字段,如果原索引中没有时间字段数据,可使用 ingest pipeline 指定 _ingest.timestamp 获取元数据值,从而引入 @timestamp 时间字段。
  3. 启动Logstash增量迁移任务。
    sudo nohup bin/logstash -f config/es2es_kibana_sample_data_logs.conf >/dev/null 2>&1 &
  4. 在目标端Elasticsearch集群的Kibana中,查询最近更新的记录,验证增量数据是否同步。
    以下示例的查询条件为:索引名称为kibana_sample_data_logs、最近时间范围为5分钟。
    GET kibana_sample_data_logs/_search
      "query": {
        "range": {
          "@timestamp": {
            "gte": "now-5m",
            "lte": "now/m"
      "sort": [
          "@timestamp": {
            "order": "desc"
                            

步骤五:查看数据迁移结果

  1. 查看是否完成全量迁移。
    1. 查看自建Elasticsearch集群的索引和数据量信息。
      GET _cat/indices?v
      结果如下。 自建ES集群数据量
    2. 全量迁移前,查看阿里云Elasticsearch集群的索引和数据量信息。
      阿里云ES数据量信息
    3. 全量迁移后,查看阿里云Elasticsearch集群索引和数据量信息。
      正常情况下,返回的记录条数应该与自建Elasticsearch集群一致。 全量同步成功
  2. 查看是否完成增量迁移。
    查看自建Elasticsearch集群的最近更新记录。
    GET kibana_sample_data_logs/_search
      "query": {
        "range": {
          "@timestamp": {
            "gte": "now-5m",
            "lte": "now/m"
      "sort": [
          "@timestamp": {
            "order": "desc"
    }
    返回结果如下。 增量迁移原数据

    增量迁移完成后,使用同样命令查看阿里云Elasticsearch集群最近的更新记录。正常情况下,阿里云Elasticsearch集群的更新记录会与自建集群一致。