之前网上看了很多文章,说mysql同步数据到Elasticsearch只能做增量同步,即只能新增(add)或者修改(update),不支持删除(delete)操作。所以一般的做法是使用逻辑删除,查询时带上逻辑删除字段,然后使用定时器定时去物理删除状态为删除的数据。最近刚好应用,一开始也是采用逻辑删除的做法,但是我的sql语句关联了4张表,每张表的逻辑删除状态有deleted(删除状态)、status(禁用启用状态)两个,一共就是4*2=8个逻辑删除字段,查询ES时需带上8个字段,状态字段太多了。这种做法实现后,花了一些时间看文档,发现可以实现mysql同步数据到ES实现删除,特此记录一下。

先说一下两种删除:

物理 删除:即采用delete from table where id = 1;采用delete将数据直接从数据库删除;

逻辑 删除:update table set deleted = 1(删除)where id =1;将数据标记为删除状态;

本文章实现的删除,主要是MySQL采用逻辑删除,同步到ES,ES采用物理删除。如果MySQL数据是物理删除,则logstash感知不到物理删除的数据,并不能更改到ES的数据状态。如果需要感知到MySQL的物理删除,需要使用其他数据同步方案,例如canal。

mysql:5.6

logstash:7.6.2(版本与ES版本保持一致,6.x及以上版本支持pipeline)

Elasticsearch:7.6.2

三.安装logstash

环境:linux

mysql以及Elasticsearch的安装不在此教程;

3.1logstash安装教程

1.安装JDK1.8(版本对应需1.8),及配置JDK环境变量。

2.解压:tar -zxvf logstash-7.6.2.tar.gz

3.我这边使用的input插件是:logstash-input-jdbc;output插件是logstash-output-elasticsearch; 这两个插件属于常见插件,logstash已经集成了,无需额外安装;

查看logstash已安装插件的命令 在安装目录的bin下面执行:./logstash-plugin list ;

4.使用logstash-input-jdbc输入插件,需要连接mysql驱动的jar包,将jar包放入指定目录,然后在配置文件的jdbc_driver_library 配置上对应的路径即可;

官网下载很慢,先上网盘链接,需要的自取

链接:https://pan.baidu.com/s/1z2jmU6AFQ2mV521dl7CgTg 提取码:yd8r

logstash主要是三大块,input:数据源,filter:过滤器对数据进行过滤,output:输出;这三块都有很多插件方便使用,可以看看 官网文档 了解;我这里的实践是以下:

input:mysql,mysql的一行记录读取为一个事件(event)

filter:filter给数据加上@metadata标识数据的状态

output:ES

主要实现删除,是通过filter给数据加上状态,然后output里面根据状态,做出不同的响应,配置文件举个例子应该就很清楚;

input {
   jdbc {
       # /logstash-7.6.2/logstash-core/lib 如果把jar包放在这里,则jdbc_driver_library => ""可以这么写
       jdbc_driver_library => "填jar包放得路径即可"
       jdbc_driver_class => "com.mysql.cj.jdbc.Driver" 
       jdbc_connection_string => "数据库连接地址"
       jdbc_user => "root"
       jdbc_password => "root"
       jdbc_default_timezone => "Asia/Shanghai"
       # 每分钟同步一次
       schedule => "* * * * *"
       statement_filepath => "写你放sql文件的地址"
       tracking_column => "update_time"
       tracking_column_type => "timestamp"
       lowercase_column_names => false
       record_last_run => true
       # When set to true, uses the defined tracking_column value as the :sql_last_value. When set to false, :sql_last_value reflects the last time the query was executed.
       # 当设置为true的时候,:sql_last_value值使用的是我们tracking_column 设置的值,不会改变,如果我们要达到增量更新的目的,应该设置为false,即使用我们上次执行查询的值(一直在变化)
       use_column_value => false
       clean_run => false
       last_run_metadata_path => "填自己的地址"
filter {
  # deleted、disabled =1为删除或者禁用的数据,这里判断然后给加上delete标识
  # 我理解的action相当于 @metadata对象的一个属性,应该还可以加其他的,比如[@metadata][test]
  if [deleted] == 1 or [diasbled] == 1 {
    mutate{ add_field => { "[@metadata][action]" => "delete"}} 
  } else {
    mutate{ add_field => { "[@metadata][action]" => "index"}} 
  # 状态字段使用完成以后,可以移除,避免同步到ES中
  mutate {
      remove_field => ["deleted", "diasbled"]
output {
  elasticsearch {
    # 主要实现想法,就来源于这里action可以指定,那么我前面给数据打上标识,就可以实现删除了
    action => "%{[@metadata][action]}"
    hosts => ["10.120.135.22:9200", "10.110.125.23:9200"]
    index => "你的index名字,也可以使用变量"
    document_id => "Q_%{questionId}"
  stdout {
    codec => json_lines

测试配置文件语法的命令:在bin目录下执行:./logstash -f ../conf/main.conf -t

贴上我的sql示例,应该便于理解:

select id AS questionId, question, deleted, disabled, update_time AS updateTime from question where update_time >= :sql_last_value;
deleted 删除标志:0-正常、1-删除
disabled 禁用标志:0-正常、1-禁用
我这里是根据update_time更新时间做增量更新
数据库里面下划线的字段比如update_time 一定要AS updateTime;
sql的列名不要使用type,type为logstash的关键字
一.前言 之前网上看了很多文章,说mysql同步数据到Elasticsearch只能做增量同步,即只能新增(add)或者修改(update),不支持删除(delete)操作。所以一般的做法是使用逻辑删除,查询时带上逻辑删除字段,然后使用定时器定时去物理删除状态为删除的数据。最近刚好应用,一开始也是采用逻辑删除的做法,但是我的sql语句关联了4张表,每张表的逻辑删除状态有deleted(删除状态)、status(禁用启用状态)两个,一共就是4*2=8个逻辑删除字段,查询ES时需带上8个字段,... 我们现在的同步, 是依靠 Logstash的 input-jdbc-plugin插件来实现的自动增量更新,这个的方案貌似只能 增量 添加数据而不能修改或者删除数据. 其实不然, 我们根据input-jdbc-plugin这个插件的一些配置, 是可以实现我们要的效果的. 方案原理: 用一个更新时间的字段来作为每次Logstash增...
之前博客有用logstash-input-jdbc同步mysql数据ElasticSearch,但是由于同步时间最少是一分钟一次,无法满足线上业务,所以只能自己实现一个,但是时间比较紧,所以简单实现一个 网上有很多思路用什么mysql的binlog功能什么的,但是我对mysql了解实在有限,所以用一个很呆板的办法查询mysql得到数据,再插入es,因为数据量不大,而且10秒间隔同步一次,效率还可以,为了避免服务器之间的时间差和mysql更新和查询产生的时间差,所以在查询更新时间条件时是和上一次同步开始时间比较,这样不管数据多少,更新耗时多少都不会少数据,因为原则是同步不漏掉任何数据
使用logstashmysql数据导入到elasticsearch过程: 1 安装elasticsearch 网址:https://www.elastic.co/downloads/elasticsearch,以下所有过程在win环境下。 2 解压启动,在win环境下进入bin目录,双击elasticsearch.bat启动,输入localhost:9200可以看到如下信息: 3 安装...
1. 安装 参考官方网站 Logstash # curl -OL https://artifacts.elastic.co/downloads/logstash/logstash-7.13.2-linux-x86_64.tar.gz # tar -xf logstash-7.13.2-linux-x86_64.tar.gz -C /usr/local/ # mv /usr/local/logstash-7.13.2/ /usr/local/logstash 2. 测试运行
许多将数据驱动到 Elasticsearch 中的系统将利用 Elasticsearch 为新插入的文档自动生成的 id 值。 但是,如果数据源意外地将同一文档多次发送到Elasticsearch,并且如果将这种自动生成的_id值用于Elasticsearch插入的每个文档,则该同一文档将使用不同的_id值多次存储在Elasticsearch中。 如果发生这种情况,那么可能有必要找到并删除此类重复项。 因此,在此博客文章中,我们介绍如何通过 使用Logstash 使用Python编写的自定义代码从Ela
最近又重新在看ElasticSearch的文档,发现那些DSL语法全都忘记了,所以准备写一个用ES做储存的demo小项目。其实是用DSL代替之前项目的SQL,但是数据以及一些字段还是需要,所以就需要将以前的MySQL数据导入到ElasticSearch中。 以前的做法是写一个脚本,通过创建索引,创建文档,将MySQL数据插入到ElasticSearch中,现在想通过Elastic中的成员之一——Logstash,来完成初步的导入工作。下面就从基础简单介绍该方式的导... jdbc { type => "hot_words" jdbc_driver_library => "D:/ELK/logstash/bin/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" 之前博文介绍过了mysql/oracle与ES之间的同步机制。而logstash最初始的日志同步功能还没有介绍。本文就logstash同步日志到ES做下详细解读。 1、目的: 将本地磁盘存储的日志文件同步(全量同步、实时增量同步)到ES中。 2、源文件: [root@5b9dbaaa148a test_log]# ll -rwxrwxrwx 1 root root 170 Jul 5 08:02 logmachine.sh -rw-r--r-- 1 root root 66 Jul 5 08:25 MProbe01.log -rw-r--r-- 1 root root 74
Logstash 是免费且开放的服务器端数据处理管道,能够从多个来源采集数据,转换数据,然后将数据发送到您最喜欢的“存储库”中。Logstash 是一个功能强大的工具,可与各种部署集成。 它提供了大量插件,可帮助你解析,丰富,转换和缓冲来自各种来源的数据。 如果你的数据需要 Beats 中没有的其他处理,则需要将 Logstash 添加到部署中。 logstash安装部署 1 . 拉取logstash镜像 (需要与es版本对应) 2 . 构建logstash容器 3 . 进入logstash容器内部安装
问题:通过logstash同步mysql数据ESmysql数据删除后,ES里面的数据不会删掉 这时候会导致ESmysql数据不一致,如果对数据一致性要求不是那么强,可以用MQ消息队列做,如果对数据一致性要求很高,比如电商项目,这样会出现很大的问题,也可以在Mysql删除成功后,再删除ES里面的数据 下面是具体解决方案 参考链接:https://www.zhihu.com/question/351802336/answer/868380109
一:logstash概述: 简单来说logstash就是一根具备实时数据传输能力的管道,负责将数据信息从管道的输入端传输到管道的输出端;与此同时这根管道还可以让你根据自己的需求在中间加上滤网,Logstash提供里很多功能强大的滤网以满足你的各种应用场景。 logstash常用于日志系统中做日志采集设备,最常用于ELK中作为日志收集器使用 二:logstash作用: 集中、转换和存储你的数据,是一个开源的服务器端数据处理管道,可以同时从多个数据源获取数据,并对其进行转换,然后将其发送到你最喜欢的“存