Syncing data across Elasticsearch clusters

Logstash can do this.

Download and install Logstash; I used logstash-7.6.2. Download page: elastic.co/cn/downloads

Official documentation: elastic.co/guide/en/log

Cross-cluster sync is simple to set up: one config file is enough. Start command: D:\tools\logstash\logstash-6.4.2\bin>logstash -f logstashda.conf

My config file:

input {
  elasticsearch {
    hosts => ["http://****"]
    index => "test_index"
    size => 1000
    scroll => "1m"
    codec => "json"
    docinfo => true
    # how often to run the sync (every 5 seconds here)
    schedule => "*/5 * * * * *"
  }
}
filter {
  json {
    source => "message"
    remove_field => ["message"]
  }
  mutate {
    # drop fields we do not want to ship
    remove_field => ["@timestamp", "@version"]
  }
}
output {
  elasticsearch {
    hosts => ["http://localhost:9200/"]
    # a fixed name works here; "%{[@metadata][_index]}" reuses the source
    # index name instead, provided docinfo => true is set in the input
    index => "test_index"
    document_type => "%{[@metadata][_type]}"
    document_id => "%{[@metadata][_id]}"
    # several actions are supported; see the elasticsearch output docs
    action => "update"
    doc_as_upsert => true
    template => "D:/tools/logstash/logstash-7.6.2/template/test.json"
    # overwrite the template on startup
    template_overwrite => true
    # name under which the template JSON is installed
    template_name => "test_index"
  }

  # print what is being synced
  stdout { codec => rubydebug { metadata => true } }
}
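For reference, the schedule option on the elasticsearch input uses rufus-scheduler cron syntax, where a sixth leading field means seconds; the value above fires every 5 seconds. A few alternative values (my own examples, not from the original config):

```
schedule => "*/5 * * * * *"   # every 5 seconds (six fields: seconds first)
schedule => "*/30 * * * * *"  # every 30 seconds
schedule => "0 * * * *"       # standard five-field cron: top of every hour
```

With no schedule set, the elasticsearch input runs the query once and exits.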


The template JSON is below; I use a dynamic template here. Unlike the data, the template is not synced continuously: in practice it is only applied the first time, and changes made afterwards do not take effect on the index template. (Note that the `_default_` mapping type used below is Elasticsearch 6.x syntax; it was removed in 7.0.)

{
  "template": "test_index",
  "order": 2,
  "settings": {
    "number_of_shards": 4,
    "number_of_replicas": 0
  },
  "mappings": {
    "_default_": {
      "dynamic_templates": [
        {
          "string_fields": {
            "match": "name",
            "match_mapping_type": "string",
            "mapping": {
              "type": "text",
              "fields": {
                "keyword": {
                  "type": "keyword",
                  "ignore_above": 256
                }
              }
            }
          }
        },
        {
          "string_fields": {
            "match": "age",
            "match_mapping_type": "string",
            "mapping": {
              "type": "keyword"
            }
          }
        },
        {
          "string_fields": {
            "match": "country",
            "match_mapping_type": "string",
            "mapping": {
              "type": "text",
              "analyzer": "keyword"
            }
          }
        }
      ],
      "dynamic_date_formats": ["yyyy-MM-dd HH:mm:ss.SSS"]
    }
  }
}

The configuration above syncs inserts and updates.

To sync deletions as well, there are two options:

1. Give every index an active/inactive flag. The source cluster never physically deletes a document, it only flips the flag; the target cluster then performs the real delete.

2. Pair every index with a companion deletion index. Before a document is deleted, write it to the deletion index, and use that index as the source for the delete sync.
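With option 1, the "delete" issued against the source cluster is just a partial update that flips the flag. A sketch using the standard update API (the index name test_index and the isEffective field come from the examples in this post; the document id 1 is a placeholder; the URL shape shown is the 6.x one, in 7.x it becomes POST /test_index/_update/1):

```
POST /test_index/_doc/1/_update
{
  "doc": { "isEffective": 0 }
}
```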

The corresponding configs:

1.

input {
  elasticsearch {
    hosts => ["http://********/es"]
    # isEffective marks whether a document is still active
    query => '{ "query": { "match": { "isEffective": 0 } } }'
    index => "*"
    size => 1000
    scroll => "1m"
    codec => "json"
    docinfo => true
    schedule => "*/5 * * * * *"
  }
}
filter {
  json {
    source => "message"
    remove_field => ["message"]
  }
  mutate {
    remove_field => ["@timestamp", "@version"]
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "%{[@metadata][_index]}"
    document_id => "%{[@metadata][_id]}"
    # doc_as_upsert only applies to update, so it is omitted here
    action => "delete"
  }

  stdout { codec => rubydebug { metadata => true } }
}

2.

input {
  elasticsearch {
    hosts => ["http://********/es"]
    # documents to be deleted from test1 are first written to test1_del
    index => "test1_del"
    size => 1000
    scroll => "1m"
    codec => "json"
    docinfo => true
    schedule => "*/5 * * * * *"
  }
}
filter {
  json {
    source => "message"
    remove_field => ["message"]
  }
  mutate {
    remove_field => ["@timestamp", "@version"]
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    # delete removes a document by id (an id is required for this action)
    index => "test1"
    document_id => "%{[@metadata][_id]}"
    action => "delete"
  }

  stdout { codec => rubydebug { metadata => true } }
}

One Logstash installation can run several conf files at the same time; when launching several instances, each must be given its own path.data.
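As an alternative to separate processes with distinct path.data values, a single Logstash process (6.0+) can run several pipelines side by side via config/pipelines.yml; Logstash reads it automatically when started without -f or -e. A sketch (the pipeline ids and the second file name are my own placeholders):

```yaml
- pipeline.id: sync-upserts
  path.config: "D:/tools/logstash/conf/logstashda.conf"
- pipeline.id: sync-deletes
  path.config: "D:/tools/logstash/conf/logstashdel.conf"
```

Each pipeline gets its own queue and workers, so the upsert and delete flows do not interfere with each other.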

The data sync itself is easy to get right, deletes aside. Template (mapping) sync can also be handled with dynamic templates, but you must anticipate every case up front: an index template is applied only when an index is created, so later modifications are never picked up by the sync or by existing indices.
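Given that, the workaround for a template change is to push it by hand against the target cluster and then recreate or reindex the affected indices so new ones pick it up. A sketch of the request, using the 6.x template API and the names from the template above (newer versions prefer the composable index-template API at _index_template):

```
PUT /_template/test_index
{
  "template": "test_index",
  "order": 2,
  "settings": {
    "number_of_shards": 4,
    "number_of_replicas": 0
  }
}
```

The mappings block from test.json goes into the same body; either way, indices that already exist keep their old mappings.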

Configuring multiple outputs:

output {
  elasticsearch {
    hosts => ["http://localhost:9200", "http://***"]
    index => "%{[@metadata][_index]}"
    document_type => "%{[@metadata][_type]}"
    document_id => "%{[@metadata][_id]}"
    action => "update"
    doc_as_upsert => true
    pipeline => "%{INGEST_PIPELINE}"
  }
}

Published 2020-06-21 17:28