Logstash-数据流引擎
Logstash-数据流引擎
作者 | WenasWei
一 Logstash
Logstash是具有实时流水线功能的开源数据收集引擎。Logstash可以动态统一来自不同来源的数据,并将数据标准化到您选择的目标位置。清除所有数据并使其民主化,以用于各种高级下游分析和可视化用例。
1.1 Logstash简介
Logstash 是一个数据流引擎:
- 它是用于数据物流的开源流式 ETL(Extract-Transform-Load)引擎
- 在几分钟内建立数据流管道
- 具有水平可扩展及韧性且具有自适应缓冲
- 不可知的数据源
- 具有200多个集成和处理器的插件生态系统
- 使用 Elastic Stack 监视和管理部署
官方介绍:Logstash is an open source data collection engine with real-time pipelining capabilities。简单来说logstash就是一根具备实时数据传输能力的管道,负责将数据信息从管道的输入端传输到管道的输出端;与此同时这根管道还可以让你根据自己的需求在中间加上滤网,Logstash提供里很多功能强大的滤网以满足你的各种应用场景。
1.2 数据处理
Logstash 是一个功能强大的工具,可与各种部署集成。 它提供了大量插件,可帮助你解析,丰富,转换和缓冲来自各种来源的数据。 如果你的数据需要 Beats 中没有的其他处理,则需要将 Logstash 添加到部署中。
当下最为流行的数据源:
Logstash 可以摄入日志,文件,指标或者网路真实数据。经过 Logstash 的处理,变为可以使用的 Web Apps 可以消耗的数据,也可以存储于数据中心,或变为其它的流式数据:
- Logstash 可以很方便地和 Beats一起合作,这也是被推荐的方法
- Logstash 也可以和那些著名的云厂商的服务一起合作处理它们的数据
- 它也可以和最为同样的信息消息队列,比如 redis 或 kafka 一起协作
- Logstash 也可以使用 JDBC 来访问 RDMS 数据
- 它也可以和 IoT 设备一起处理它们的数据
- Logstash 不仅仅可以把数据传送到 Elasticsearch,而且它还可以把数据发送至很多其它的目的地,并作为它们的输入源做进一步的处理
二 Logstash系统架构
Logstash 包含3个主要部分: 输入(inputs),过滤器(filters)和输出(outputs)
Logstash的事件(logstash将数据流中等每一条数据称之为一个event)处理流水线有三个主要角色完成:inputs –> filters –> outputs:
- inpust:必须,负责产生事件(Inputs generate events),常用:File、syslog、redis、kakfa、beats(如:Filebeats)
- filters:可选,负责数据处理与转换(filters modify them),常用:grok、mutate、drop、clone、geoip
- outpus:必须,负责数据输出(outputs ship them elsewhere),常用:elasticsearch、file、graphite、kakfa、statsd
三 Logstash安装
3.1 环境清单
- 操作系统:Linux #56-Ubuntu SMP Tue Jun 4 22:49:08 UTC 2019 x86_64
- Logstash版本:logstash-6.2.4
- Jdk版本:1.8.0_152
3.2 Linux安装JDK
3.2.1 解压缩并移动到指定目录(约定的目录:/usr/local)
(1)解压缩
tar -zxvf jdk-8u152-linux-x64.tar.gz
(2)创建目录
mkdir -p /usr/local/java
(3)移动安装包
mv jdk1.8.0_152/ /usr/local/java/
(4)设置所有者
chown -R root:root /usr/local/java/
3.2.2 配置环境变量
(1)配置系统环境变量
vi /etc/environment
(2)添加如下语句
PATH="/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games"
export JAVA_HOME=/usr/local/java/jdk1.8.0_152
export JRE_HOME=/usr/local/java/jdk1.8.0_152/jre
export CLASSPATH=$CLASSPATH:$JAVA_HOME/lib:$JAVA_HOME/jre/lib
(3)配置用户环境变量
nano /etc/profile
(4)添加如下语句(一定要放中间)
if [ "$PS1" ]; then
if [ "$BASH" ] && [ "$BASH" != "/bin/sh" ]; then
# The file bash.bashrc already sets the default PS1.
# PS1='\h:\w\$ '
if [ -f /etc/bash.bashrc ]; then
. /etc/bash.bashrc
if [ "`id -u`" -eq 0 ]; then
PS1='# '
PS1='$ '
export JAVA_HOME=/usr/local/java/jdk1.8.0_152
export JRE_HOME=/usr/local/java/jdk1.8.0_152/jre
export CLASSPATH=$CLASSPATH:$JAVA_HOME/lib:$JAVA_HOME/jre/lib
export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$PATH:$HOME/bin
if [ -d /etc/profile.d ]; then
for i in /etc/profile.d/*.sh; do
if [ -r $i ]; then
unset i
fi
(5)使用户环境变量生效
source /etc/profile
(6)测试是否安装成功
$ java -version
java version "1.8.0_152"
Java(TM) SE Runtime Environment (build 1.8.0_152-b16)
Java HotSpot(TM) 64-Bit Server VM (build 25.152-b16, mixed mode)
3.3 安装Logstash
3.3.1 创建安装目录
$ sudo mkdir /usr/local/logstash
3.3.2 下载Logstash安装文件
$ wget -P /usr/local/logstash https://artifacts.elastic.co/downloads/logstash/logstash-6.2.4.tar.gz
3.3.2 解压缩安装文件
$ cd /usr/local/logstash/
$ sudo tar -zxvf logstash-6.2.4.tar.gz
3.3.3 测试安装是否成功
测试: 快速启动,标准输入输出作为input和output,没有filter
$ cd logstash-6.2.4/
$ ./bin/logstash -e 'input { stdin {} } output { stdout {} }'
Sending Logstash's logs to /usr/local/logstash/logstash-6.2.4/logs which is now configured via log4j2.properties
[2021-05-27T00:22:28,729][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/usr/local/logstash/logstash-6.2.4/modules/fb_apache/configuration"}
[2021-05-27T00:22:28,804][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/usr/local/logstash/logstash-6.2.4/modules/netflow/configuration"}
[2021-05-27T00:22:29,827][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2021-05-27T00:22:30,979][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"6.2.4"}
[2021-05-27T00:22:31,821][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2021-05-27T00:22:36,463][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>1, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2021-05-27T00:22:36,690][INFO ][logstash.pipeline ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x55a5abea run>"}
The stdin plugin is now waiting for input:
[2021-05-27T00:22:36,853][INFO ][logstash.agent ] Pipelines running {:count=>1, :pipelines=>["main"]}
## 此时命令窗口停留在等待输入状态,键盘键入任意字符 ##
hello world
## 下方是Logstash输出到效果 ##
"@timestamp" => 2021-05-26T16:22:52.527Z,
"host" => "*******",
"message" => "hello world",
"@version" => "1"
}
四 Logstash参数与配置
4.1 常用启动参数
参数 | 说明 | 举例 |
---|
4.2 配置文件结构及语法
(1)区段
Logstash通过{}来定义区域,区域内可以定义插件,一个区域内可以定义多个插件,如下:
input {
stdin {
beats {
port => 5044
}
(2)数据类型
Logstash仅支持少量的数据类型:
- Boolean:ssl_enable => true
- Number:port => 33
- String:name => “Hello world”
- Commonts:# this is a comment
(3)字段引用
Logstash数据流中的数据被称之为Event对象,Event以JSON结构构成,Event的属性被称之为字段,如果你像在配置文件中引用这些字段,只需要把字段的名字写在中括号[]里就行了,如[type],对于嵌套字段每层字段名称都写在[]里就可以了,比如:[tags][type];除此之外,对于Logstash的arrag类型支持下标与倒序下表,如:[tags][type][0],[tags][type][-1]。
(4)条件判断
Logstash支持下面的操作符:
- equality:==, !=, <, >, <=, >=
- regexp:=~, !~
- inclusion:in, not in
- boolean:and, or, nand, xor
- unary:!
例如:
if EXPRESSION {
} else if EXPRESSION {
} else {
}
(5)环境变量引用
Logstash支持引用系统环境变量,环境变量不存在时可以设置默认值,例如:
export TCP_PORT=12345
input {
tcp {
port => "${TCP_PORT:54321}"
}
4.3 常用输入插件(Input plugin)
输入插件包含有以下多种,详情查看官网文档- 常用输入插件 :
- elasticsearch
- exec
- file
- github
- http
- jdbc
- jms
- jmx
- kafka
- log4j
- rabbitmq
- redis
- tcp
- udp
- unix
- websocket
4.3.1 File读取插件
文件读取插件主要用来抓取文件的变化信息,将变化信息封装成Event进程处理或者传递。
- 配置事例
input
file {
path => ["/var/log/*.log", "/var/log/message"]
type => "system"
start_position => "beginning"
}
- 常用参数
参数名称 | 类型 | 默认值 | 描述信息 |
---|
4.3.2 TCP监听插件
TCP插件有两种工作模式,“Client”和“Server”,分别用于发送网络数据和监听网络数据。
- 配置事例
tcp {
port => 41414
}
- 常用参数(空 => 同上)
参数名称 | 类型 | 默认值 | 描述信息 |
---|
4.3.3 Redis读取插件
用于读取Redis中缓存的数据信息。
- 配置事例
input {
redis {
host => "127.0.0.1"
port => 6379
data_type => "list"
key => "logstash-list"
}
- 常用参数(空 => 同上)
参数名称 | 类型 | 默认值 | 描述信息 |
---|
注意:
data_type 需要注意的是“channel”和“pattern_channel”是广播类型,相同的数据会同时发送给订阅了该channel的logstash,也就是说在logstash集群环境下会出现数据重复,集群中的每一个节点都将收到同样的数据,但是在单节点情况下,“pattern_channel”可以同时定于满足pattern的多个key
4.3.4 Kafka读取插件
用于读取Kafka中推送的主题数据信息。
- 配置事例
input {
kafka {
bootstrap_servers => "kafka-01:9092,kafka-02:9092,kafka-03:9092"
topics_pattern => "elk-.*"
consumer_threads => 5
decorate_events => true
codec => "json"
auto_offset_reset => "latest"
group_id => "logstash1"##logstash 集群需相同
}
- 常用参数:
参数名称 | 类型 | 默认值 | 描述信息 |
---|
注意:
- auto_offset_reset: earliest-将偏移量自动重置为最早的偏移量;latest-自动将偏移量重置为最新偏移量;none-如果未找到消费者组的先前偏移量,则向消费者抛出异常;anything else-向消费者抛出异常。
- decorate_events: none:未添加元数据,basic:添加了记录的属性,extended:记录的属性,添加标题,false:不建议使用的别名 none,true:不建议使用的别名 basic
4.4 常用过滤插件(Filter plugin)
丰富的过滤器插件的是 logstash威力如此强大的重要因素,过滤器插件主要处理流经当前Logstash的事件信息,可以添加字段、移除字段、转换字段类型,通过正则表达式切分数据等,也可以根据条件判断来进行不同的数据处理方式,详情查看官网文档- 常用过滤插件
4.4.1 grok正则捕获
grok 是Logstash中将非结构化数据解析成结构化数据以便于查询的最好工具,非常适合解析syslog logs,apache log, mysql log,以及一些其他的web log
(1)预定义表达式调用:
- Logstash提供120个常用正则表达式可供安装使用,安装之后你可以通过名称调用它们,语法如下:%{SYNTAX:SEMANTIC}
- SYNTAX:表示已经安装的正则表达式的名称
- SEMANTIC:表示从Event中匹配到的内容的名称
例如: Event的内容为“[debug] 127.0.0.1 - test log content”,匹配%{IP:client}将获得“client: 127.0.0.1”的结果,前提安装了IP表达式;如果你在捕获数据时想进行数据类型转换可以使用%{NUMBER:num:int}这种语法,默认情况下,所有的返回结果都是string类型,当前Logstash所支持的转换类型仅有“int”和“float”;
一个稍微完整一点的事例:
- 日志文件http.log内容:55.3.244.1 GET /index.html 15824 0.043
- 表达式:%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}
- 配置文件内容:
input {
file {
path => "/var/log/http.log"
filter {
grok {
match => {"message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"}
}
- 输出结果:
client: 55.3.244.1
method: GET
request: /index.html
bytes: 15824
duration: 0.043
(2)自定义表达式调用
语法:(?<field_name>the pattern here)
举例:捕获10或11和长度的十六进制queue_id可以使用表达式(?<queue_id>[0-9A-F]{10,11}) 安装自定义表达式
与预定义表达式相同,你也可以将自定义的表达式配置到Logstash中,然后就可以像于定义的表达式一样使用;以下是操作步骤说明:
- 1、在Logstash根目录下创建文件夹“patterns”,在“patterns”文件夹中创建文件“extra”(文件名称无所谓,可自己选择有意义的文件名称);
- 2、在文件“extra”中添加表达式,格式:patternName regexp,名称与表达式之间用空格隔开即可,如下:
# contents of ./patterns/postfix:
POSTFIX_QUEUEID [0-9A-F]{10,11}
- 3、使用自定义的表达式时需要指定“patterns_dir”变量,变量内容指向表达式文件所在的目录,举例如下:
<1>日志内容
Jan 1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: message-id=<20130101142543.5828399CCAF@mailserver14.example.com>
<2>Logstash配置
filter {
grok {
patterns_dir => ["./patterns"]
match => { "message" => "%{SYSLOGBASE} %{POSTFIX_QUEUEID:queue_id}: %{GREEDYDATA:syslog_message}" }
}
<3>运行结果
timestamp: Jan 1 06:25:43
logsource: mailserver14
program: postfix/cleanup
pid: 21403
queue_id: BEF25A72965
(3)grok常用配置参数(空 => 同上)
参数名称 | 类型 | 默认值 | 描述信息 |
---|
4.4.2 date时间处理插件
该插件用于时间字段的格式转换,比如将“Apr 17 09:32:01”(MMM dd HH:mm:ss)转换为“MM-dd HH:mm:ss”。而且通常情况下,Logstash会为自动给Event打上时间戳,但是这个时间戳是Event的处理时间(主要是input接收数据的时间),和日志记录时间会存在偏差(主要原因是buffer),我们可以使用此插件用日志发生时间替换掉默认是时间戳的值。
常用配置参数(空 => 同上)
参数名称 | 类型 | 默认值 | 描述信息 |
---|
注意:
match的格式:时间字段匹配,可自定多种格式,直到匹配到或匹配结束,格式:
[ field,formats…],如:match=>[ “logdate”,“MMM dd yyyy HH:mm:ss”,“MMM d yyyy HH:mm:ss”,“ISO8601”]
4.4.3 mutate数据修改插件
mutate 插件是 Logstash另一个重要插件。它提供了丰富的基础类型数据处理能力。可以重命名,删除,替换和修改事件中的字段。
常用配置参数(空 => 同上)
参数名称 | 类型 | 默认值 | 描述信息 |
---|
4.4.4 JSON插件
JSON插件用于解码JSON格式的字符串,一般是一堆日志信息中,部分是JSON格式,部分不是的情况下
(1)配置事例
json {
source => ...
}
-
事例配置,message是JSON格式的字符串:
"{\"uid\":3081609001,\"type\":\"signal\"}"
filter {
json {
source => "message"
target => "jsoncontent"
}
- 输出结果:
{
"@version": "1",
"@timestamp": "2014-11-18T08:11:33.000Z",
"host": "web121.mweibo.tc.sinanode.com",
"message": "{\"uid\":3081609001,\"type\":\"signal\"}",
"jsoncontent": {
"uid": 3081609001,
"type": "signal"
}
-
如果从事例配置中删除
target
,输出结果如下:
{
"@version": "1",
"@timestamp": "2014-11-18T08:11:33.000Z",
"host": "web121.mweibo.tc.sinanode.com",
"message": "{\"uid\":3081609001,\"type\":\"signal\"}",
"uid": 3081609001,
"type": "signal"
}
(2)常用配置参数(空 => 同上)
参数名称 | 类型 | 默认值 | 描述信息 |
---|
4.4.5 elasticsearch查询过滤插件
用于查询Elasticsearch中的事件,可将查询结果应用于当前事件中
常用配置参数(空 => 同上)
参数名称 | 类型 | 默认值 | 描述信息 |
---|
4.5 常用输出插件(Output plugin)
4.5.1 ElasticSearch输出插件
用于将事件信息写入到Elasticsearch中,官方推荐插件,ELK必备插件
(1)配置事例
output {
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "filebeat-%{type}-%{+yyyy.MM.dd}"
template_overwrite => true
}
(2)常用配置参数(空 => 同上)
参数名称 | 类型 | 默认值 | 描述信息 |
---|
4.5.2 Redis输出插件
用于将Event写入Redis中进行缓存,通常情况下Logstash的Filter处理比较吃系统资源,复杂的Filter处理会非常耗时,如果Event产生速度比较快,可以使用Redis作为buffer使用
(1)配置事例
output {
redis {
host => "127.0.0.1"
port => 6379
data_type => "list"
key => "logstash-list"
}
(2)常用配置参数(空 => 同上)
参数名称 | 类型 | 默认值 | 描述信息 |
---|
4.5.3 File输出插件
用于将Event输出到文件内
(1)配置事例
output {
file {
path => ...
codec => line { format => "custom format: %{message}"}
}
(2)常用配置参数(空 => 同上)
参数名称 | 类型 | 默认值 | 描述信息 |
---|
4.5.4 Kafka输出插件
用于将Event输出到Kafka指定的Topic中, 官网Kafka详情配置
(1)配置事例
output {
kafka {
bootstrap_servers => "localhost:9092"
topic_id => "test"
compression_type => "gzip"
}
(2)常用配置参数(空 => 同上)
参数名称 | 类型 | 默认值 | 描述信息 |
---|
4.6 常用编码插件(Codec plugin)
4.6.1 JSON编码插件
直接输入预定义好的 JSON 数据,这样就可以省略掉 filter/grok 配置
- 配置事例
json {
}
- 常用配置参数 | 参数名称 | 类型 | 默认值 | 描述信息 | | ------------- | ------ | ------- | -------- | | charset | string | “UTF-8” | 字符集 | | enable_metric | | | | | id | | | |
五 Logstash实例
5.1 接收Filebeat事件,输出到Redis
input {
beats {
port => 5044
output {
redis {
host => "127.0.0.1"
port => 6379
data_type => "list"
key => "logstash-list"
}
5.2 读取Redis数据,根据“type”判断,分别处理,输出到ES
input {
redis {
host => "127.0.0.1"
port => 6379
data_type => "list"
key => "logstash-list"
filter {
if [type] == "application" {
grok {
match => ["message", "(?m)-(?<systemName>.+?):(?<logTime>(?>\d\d){1,2}-(?:0?[1-9]|1[0-2])-(?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9]) (?:2[0123]|[01]?[0-9]):(?:[0-5][0-9]):(?:(?:[0-5][0-9]|60)(?:[:.,][0-9]+)?)) \[(?<level>(\b\w+\b)) *\] (?<thread>(\b\w+\b)) \((?<point>.*?)\) - (?<content>.*)"]
date {
match => ["logTime", "yyyy-MM-dd HH:mm:ss,SSS"]
json {
source => "message"
date {
match => ["timestamp", "yyyy-MM-dd HH:mm:ss,SSS"]
if [type] == "application_bizz" {
json {
source => "message"
date {
match => ["timestamp", "yyyy-MM-dd HH:mm:ss,SSS"]
mutate {
remove_field => ["@version", "beat", "logTime"]
output {
stdout{
elasticsearch {