之前网上看了很多文章,说mysql同步数据到Elasticsearch只能做增量同步,即只能新增(add)或者修改(update),不支持删除(delete)操作。所以一般的做法是使用逻辑删除,查询时带上逻辑删除字段,然后使用定时器定时去物理删除状态为删除的数据。最近刚好应用,一开始也是采用逻辑删除的做法,但是我的sql语句关联了4张表,每张表的逻辑删除状态有deleted(删除状态)、status(禁用启用状态)两个,一共就是4*2=8个逻辑删除字段,查询ES时需带上8个字段,状态字段太多了。这种做法实现后,花了一些时间看文档,发现可以实现mysql同步数据到ES实现删除,特此记录一下。
先说一下两种删除:
物理
删除:即采用delete from table where id = 1;采用delete将数据直接从数据库删除;
逻辑
删除:update table set deleted = 1(删除)where id =1;将数据标记为删除状态;
本文章实现的删除,主要是MySQL采用逻辑删除,同步到ES,ES采用物理删除。如果MySQL数据是物理删除,则logstash感知不到物理删除的数据,并不能更改到ES的数据状态。如果需要感知到MySQL的物理删除,需要使用其他数据同步方案,例如canal。
mysql:5.6
logstash:7.6.2(版本与ES版本保持一致,6.x及以上版本支持pipeline)
Elasticsearch:7.6.2
三.安装logstash
环境:linux
mysql以及Elasticsearch的安装不在此教程;
3.1logstash安装教程
1.安装JDK1.8(版本对应需1.8),及配置JDK环境变量。
2.解压:tar -zxvf logstash-7.6.2.tar.gz
3.我这边使用的input插件是:logstash-input-jdbc;output插件是logstash-output-elasticsearch;
这两个插件属于常见插件,logstash已经集成了,无需额外安装;
查看logstash已安装插件的命令
:
在安装目录的bin下面执行:./logstash-plugin list ;
4.使用logstash-input-jdbc输入插件,需要连接mysql驱动的jar包,将jar包放入指定目录,然后在配置文件的jdbc_driver_library 配置上对应的路径即可;
官网下载很慢,先上网盘链接,需要的自取
链接:https://pan.baidu.com/s/1z2jmU6AFQ2mV521dl7CgTg 提取码:yd8r
logstash主要是三大块,input:数据源,filter:过滤器对数据进行过滤,output:输出;这三块都有很多插件方便使用,可以看看
官网文档
了解;我这里的实践是以下:
input:mysql,mysql的一行记录读取为一个事件(event)
filter:filter给数据加上@metadata标识数据的状态
output:ES
主要实现删除,是通过filter给数据加上状态,然后output里面根据状态,做出不同的响应,配置文件举个例子应该就很清楚;
input {
jdbc {
# /logstash-7.6.2/logstash-core/lib 如果把jar包放在这里,则jdbc_driver_library => ""可以这么写
jdbc_driver_library => "填jar包放得路径即可"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
jdbc_connection_string => "数据库连接地址"
jdbc_user => "root"
jdbc_password => "root"
jdbc_default_timezone => "Asia/Shanghai"
# 每分钟同步一次
schedule => "* * * * *"
statement_filepath => "写你放sql文件的地址"
tracking_column => "update_time"
tracking_column_type => "timestamp"
lowercase_column_names => false
record_last_run => true
# When set to true, uses the defined tracking_column value as the :sql_last_value. When set to false, :sql_last_value reflects the last time the query was executed.
# 当设置为true的时候,:sql_last_value值使用的是我们tracking_column 设置的值,不会改变,如果我们要达到增量更新的目的,应该设置为false,即使用我们上次执行查询的值(一直在变化)
use_column_value => false
clean_run => false
last_run_metadata_path => "填自己的地址"
filter {
# deleted、disabled =1为删除或者禁用的数据,这里判断然后给加上delete标识
# 我理解的action相当于 @metadata对象的一个属性,应该还可以加其他的,比如[@metadata][test]
if [deleted] == 1 or [diasbled] == 1 {
mutate{ add_field => { "[@metadata][action]" => "delete"}}
} else {
mutate{ add_field => { "[@metadata][action]" => "index"}}
# 状态字段使用完成以后,可以移除,避免同步到ES中
mutate {
remove_field => ["deleted", "diasbled"]
output {
elasticsearch {
# 主要实现想法,就来源于这里action可以指定,那么我前面给数据打上标识,就可以实现删除了
action => "%{[@metadata][action]}"
hosts => ["10.120.135.22:9200", "10.110.125.23:9200"]
index => "你的index名字,也可以使用变量"
document_id => "Q_%{questionId}"
stdout {
codec => json_lines
测试配置文件语法的命令:在bin目录下执行:./logstash -f ../conf/main.conf -t
贴上我的sql示例,应该便于理解:
select id AS questionId, question, deleted, disabled, update_time AS updateTime from question where update_time >= :sql_last_value;
deleted 删除标志:0-正常、1-删除
disabled 禁用标志:0-正常、1-禁用
我这里是根据update_time更新时间做增量更新
数据库里面下划线的字段比如update_time 一定要AS updateTime;
sql的列名不要使用type,type为logstash的关键字
一.前言 之前网上看了很多文章,说mysql同步数据到Elasticsearch只能做增量同步,即只能新增(add)或者修改(update),不支持删除(delete)操作。所以一般的做法是使用逻辑删除,查询时带上逻辑删除字段,然后使用定时器定时去物理删除状态为删除的数据。最近刚好应用,一开始也是采用逻辑删除的做法,但是我的sql语句关联了4张表,每张表的逻辑删除状态有deleted(删除状态)、status(禁用启用状态)两个,一共就是4*2=8个逻辑删除字段,查询ES时需带上8个字段,...
我们现在的同步, 是依靠 Logstash的 input-jdbc-plugin插件来实现的自动增量更新,这个的方案貌似只能 增量 添加数据而不能修改或者删除数据. 其实不然, 我们根据input-jdbc-plugin这个插件的一些配置, 是可以实现我们要的效果的.
方案原理:
用一个更新时间的字段来作为每次Logstash增...
之前博客有用logstash-input-jdbc同步mysql数据到ElasticSearch,但是由于同步时间最少是一分钟一次,无法满足线上业务,所以只能自己实现一个,但是时间比较紧,所以简单实现一个
网上有很多思路用什么mysql的binlog功能什么的,但是我对mysql了解实在有限,所以用一个很呆板的办法查询mysql得到数据,再插入es,因为数据量不大,而且10秒间隔同步一次,效率还可以,为了避免服务器之间的时间差和mysql更新和查询产生的时间差,所以在查询更新时间条件时是和上一次同步开始时间比较,这样不管数据多少,更新耗时多少都不会少数据,因为原则是同步不漏掉任何数据,
使用logstash将mysql数据导入到elasticsearch过程:
1 安装elasticsearch 网址:https://www.elastic.co/downloads/elasticsearch,以下所有过程在win环境下。
2 解压启动,在win环境下进入bin目录,双击elasticsearch.bat启动,输入localhost:9200可以看到如下信息:
3 安装...
1. 安装
参考官方网站 Logstash
# curl -OL https://artifacts.elastic.co/downloads/logstash/logstash-7.13.2-linux-x86_64.tar.gz
# tar -xf logstash-7.13.2-linux-x86_64.tar.gz -C /usr/local/
# mv /usr/local/logstash-7.13.2/ /usr/local/logstash
2. 测试运行
许多将数据驱动到 Elasticsearch 中的系统将利用 Elasticsearch 为新插入的文档自动生成的 id 值。 但是,如果数据源意外地将同一文档多次发送到Elasticsearch,并且如果将这种自动生成的_id值用于Elasticsearch插入的每个文档,则该同一文档将使用不同的_id值多次存储在Elasticsearch中。 如果发生这种情况,那么可能有必要找到并删除此类重复项。 因此,在此博客文章中,我们介绍如何通过
使用Logstash
使用Python编写的自定义代码从Ela
最近又重新在看ElasticSearch的文档,发现那些DSL语法全都忘记了,所以准备写一个用ES做储存的demo小项目。其实是用DSL代替之前项目的SQL,但是数据以及一些字段还是需要,所以就需要将以前的MySQL数据导入到ElasticSearch中。
以前的做法是写一个脚本,通过创建索引,创建文档,将MySQL数据插入到ElasticSearch中,现在想通过Elastic中的成员之一——Logstash,来完成初步的导入工作。下面就从基础简单介绍该方式的导...
jdbc {
type => "hot_words"
jdbc_driver_library => "D:/ELK/logstash/bin/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
之前博文介绍过了mysql/oracle与ES之间的同步机制。而logstash最初始的日志同步功能还没有介绍。本文就logstash同步日志到ES做下详细解读。
1、目的:
将本地磁盘存储的日志文件同步(全量同步、实时增量同步)到ES中。
2、源文件:
[root@5b9dbaaa148a test_log]# ll
-rwxrwxrwx 1 root root 170 Jul 5 08:02 logmachine.sh
-rw-r--r-- 1 root root 66 Jul 5 08:25 MProbe01.log
-rw-r--r-- 1 root root 74
Logstash 是免费且开放的服务器端数据处理管道,能够从多个来源采集数据,转换数据,然后将数据发送到您最喜欢的“存储库”中。Logstash 是一个功能强大的工具,可与各种部署集成。 它提供了大量插件,可帮助你解析,丰富,转换和缓冲来自各种来源的数据。 如果你的数据需要 Beats 中没有的其他处理,则需要将 Logstash 添加到部署中。
logstash安装部署
1 . 拉取logstash镜像 (需要与es版本对应)
2 . 构建logstash容器
3 . 进入logstash容器内部安装
问题:通过
logstash同步mysql数据到
ES,
mysql数据删除后,
ES里面的
数据不会删掉
这时候会导致
ES和
mysql数据不一致,如果对
数据一致性要求不是那么强,可以用MQ消息队列做,如果对
数据一致性要求很高,比如电商项目,这样会出现很大的问题,也可以在
Mysql删除成功后,再
删除ES里面的
数据
下面是具体解决方案
参考链接:https://www.zhihu.com/qu
estion/351802336/answer/868380109
一:logstash概述:
简单来说logstash就是一根具备实时数据传输能力的管道,负责将数据信息从管道的输入端传输到管道的输出端;与此同时这根管道还可以让你根据自己的需求在中间加上滤网,Logstash提供里很多功能强大的滤网以满足你的各种应用场景。
logstash常用于日志系统中做日志采集设备,最常用于ELK中作为日志收集器使用
二:logstash作用:
集中、转换和存储你的数据,是一个开源的服务器端数据处理管道,可以同时从多个数据源获取数据,并对其进行转换,然后将其发送到你最喜欢的“存