线上日志结构化收集

​ 写这篇文章的起因是因为在我们的生产环境中,部署节点比较多,机器也比较多,平时看日志只能一台一台登录的去查看,很难受。
单纯使用ELK的话,日志内容是非结构化的,就是一大坨文字,非常不直观,因此把我线上结构化日志的流程分享一下

1.日志规范定义

不管使用什么方式收集,日志的来源是一定要符合我们预想的结构的,否则日志结构化就无从谈起,所以第一步,在我们所有的java应用中首先要规范logback.xml或者其他的日志文件格式,以下是目前我规范出的日志格式

 <pattern><![CDATA[
%n%-4r | %d{yyyy-MM-dd HH:mm:ss} | %X{method} | %X{requestURIWithQueryString} | [ip=%X{remoteHost}, ref=%X{referrer}, ua=%X{userAgent}] | %level | %logger{35} | %m%n
 ]]></pattern>

其日志格式分别为 线程id | 时间 | http请求方式 | http请求路径 | 请求方的信息 ip、设备等 | 日志级别 | 日志内容
这里使用了" | " 来分隔不同的数据,方便我们转换的时候能更加方便一点

2. filebeat配置

https://www.elastic.co/cn/downloads/past-releases/
elastic旗下的技术栈都可以在这个网址下载,还可以任意选择版本。
我是推荐在自己本地使用传统方式进行安装,而不是homeberw或者npm,因为方便控制。

至于filebeat是什么,我在这里就不细讲了,filebeat属于elastic旗下beats的一种,专门用来收集日志用,并且消耗资源非常小,轻量级采集器。
如果只使用 filebeat + logstash 那么配置方式如下

filebeat.inputs:
- type: log
  enabled: true
  paths:
     - /Users/lvqiushi/idea/engine/logs/test.log
  # 多行日志合并
  multiline.pattern: ^[0-9][0-9][0-9][0-9]
  multiline.negate: true
  multiline.match: after
  multiline.timeout: 10s 
  # 额外增加的字段
  fields:
      serverip: vm58
      appname: qkcheat
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
setup.template.settings:
  index.number_of_shards: 3
setup.kibana:
output:
   logstash:
      hosts: ["127.0.0.1:5044"]
processors:
- drop_fields:
   #fields: ["beat", "input", "source", "offset","id","metadata"]
   #fields: ["beat.hostname", "beat.name", "beat.version", "input_type", "beat"]#
   #fields: ["agent.hostname", "agent.name", "agent.version", "input_type", "beat"]
    fields:
      - beat
      - host
      - input
      - source
      - offset
      - prospector
      - tags
      - agent
      - ecs
      - log

如果要接入kafka的话,只需要更改一下output就好了

output.kafka:
  enabled: true
  # kafka集群地址
  hosts: [xxx","xxx","xxx"]
  topic: 'xx' 
  partition.hash:
    reachable_only: true
  compression: gzip

重点讲一下日志合并问题,通常程序打印的错误日志是包括很多行的,所以要在filebeat或者logstash中将多行日志合并起来,我是推荐在filebeat采集的时候就合并的。
至于filebeat中 multiline.*具体的参数含义大家百度一下就可以知道了。

3 logstash配置

logstash配置的话会比较难一点(logstash 高版本自带了本文所使用的所有插件,如果是4.x或者5.x还需要安装插件),
首先要区分使用的是filebeat直连,还是 filebeat + kafka,因为如果经过了kafka这一层以后,收到的结构会变的不一样,需要转换一下,很坑。

至于解析日志的方式—

  1. 我目前所使用的的思路是,写ruby脚本,分隔我定义好的日志列分隔符,从而把所需的字段一个个取出去。
  2. 使用gork插件通过正则表达式的方式也可以实现,但是考虑到正则匹配会比较困难,且消耗的cpu要比简单的字符串分隔要高,就放弃了这种想法。
    如果使用正则表达式匹配,可以在这个网址中进行尝试http://grokdebug.herokuapp.com/。

filebeat直连logstash下的配置

input {
    # stdin{} 命令行输入调试使用
 	beats {
   		port => 5044
filter {
     ruby {
        code =>'
            message = event.get("message")
            arr = message.split(" | ")
            if arr.length >= 4
               event.set("thread", arr[0])
               event.set("time", arr[1])
               event.set("method", arr[2])
               event.set("url", arr[3])
               event.set("base_params", arr[4])
               event.set("level", arr[5])
               event.set("java_class", arr[6])
               event.set("log", arr[7])
               fileds = event.get("fields")
               event.set("server_ip",fileds["serverip"] )
               event.set("app_name",fileds["appname"] )
               event.set("[@metadata][show]", "is")
       remove_field => [ "message", "tags", "fileds"]
output {
	if[@metadata][show] == "is" {
       stdout {}

filebeat + kafka + logstash配置(只增加了一个json格式化,因为从kafka过来的mq是纯字符串)

input {
      kafka {
            bootstrap_servers=>"xx"
            topics=>["xx"]
filter {
     json {
        source => "message"
     ruby {
        code =>'
            message = event.get("message")
            arr = message.split(" | ")
            if arr.length >= 4
               event.set("thread", arr[0])
               event.set("time", arr[1])
               event.set("method", arr[2])
               event.set("url", arr[3])
               event.set("base_params", arr[4])
               event.set("level", arr[5])
               event.set("java_class", arr[6])
               event.set("log", arr[7])
               fileds = event.get("fields")
               event.set("server_ip",fileds["serverip"] )
               event.set("app_name",fileds["appname"] )
               event.set("[@metadata][show]", "is")
       remove_field => [ "message", "tags", "fileds"]
output {
	if[@metadata][show] == "is" {
       stdout {}

上面配置中的output都是直接打印在了控制台内,进行调试试用,如果调试完成,再换成合适的output输出,考虑到日志量庞大的问题,我选择的是放到es中进行存储。
由于日志放进ES已经被格式化了,只需要再让前端写一个很简单很简单的页面,来方便查询就可以了。
至于为什么不直接用kibana,kibana的搜索条件实在是太难用了,搜索日志应该尽可能的简单好用为主,例如我们最常用的grep功能,所以只需要放两个搜索框即可。

线上日志结构化收集​ 写这篇文章的起因是因为在我们的生产环境中,部署节点比较多,机器也比较多,平时看日志只能一台一台登录的去查看,很难受。单纯使用ELK的话,日志内容是非结构化的,就是一大坨文字,非常不直观,因此把我线上结构化日志的流程分享一下1.日志规范定义不管使用什么方式收集,日志的来源是一定要符合我们预想的结构的,否则日志结构化就无从谈起,所以第一步,在我们所有的java应用中首先要规范logback.xml或者其他的日志文件格式,以下是目前我规范出的日志格式 &lt;pa input{ file { path =&amp;gt; &quot;/glusterfs/logs/dot/2018/06/28-dot-dot-app-774f45fc85-62q8f.log&quot; start_position =&amp;gt; &quot;beginning&quot; output { stdo...
output插件是经过了input,然后过滤结构化数据之后,接下来我们需要借助output传到我们想传到的地方.output相当于一个输出管道。 将采集数据标准输出到控制台 output { stdout { codec => rubydebug Codec 来自 Coder/decoder 两个单词的首字母缩写,Logst...
logstash logstash是一个数据分析软件,主要目的是分析log日志。整一套软件可以当作一个MVC模型,logstash是controller层,Elasticsearch是一个model层,kibana是view层。 首先将数据传给logstash,它将数据进行过滤和格式化(转成JSON格式),然后传给Elasticsearch进行存储、建搜索的索引,kibana提供前端的页面再进行搜...
file { path =&amp;gt; &quot;/usr/local/log_test/*/*/*.log&quot; start_position =&amp;gt; &quot;beginning&quot; output { stdout { codec =&amp;gt; json 可以看到输出的数据类型 1、  logstash的安装与启动 1)下载logstash, https://download.elasticsearch.org/logstash/logstash/logstash-1.4.0.tar.gz 2)解压文件logstash-1.4.0.tar.gz tar –zxvf logstash-1.4.0.tar.gz cd logstash-1.