当前位置: > 其它学习 > Elasticsearch >

Filebeat + ES + Kibana日志解析痛点解决

时间:2019-12-27 13:53来源:linux.it.net.cn 作者:IT

使用Filebeat + ES + Kibina的组合进行日志收集的一个优点就是轻量级,因为去掉了笨重的logstash, 占用资源更少。但这也引入了一个问题,即filebeat并没有logstash那样强大的日志解析能力,往往只能把整条日志当成一个整体扔到ES中。好消息是,ES从5.x版本开始引入了Ingest Node,即允许你在对文档进行索引之前进行预处理,且支持logstash的Grok语法。因此我们可以在ES中针对我们的日志格式创建一个预处理pipeline, 通过配置pipeline中的processor完成日志解析。
 
以下面这条日志为基础举例:
 
[2019-02-19 17:04:28:017] http-nio-8050-exec-2 INFO c.b.o.xxx.ms.api.TaskController - response = {"jobId":"123","ms":10}
 
我们期望能够将这条日志中的时间2019-02-19 17:04:28:017、线程http-nio-8050-exec-2、日志级别INFO、Java类名c.b.o.xxx.ms.api.TaskController和日志正文response = {"jobId":"123","ms":10}分别提取出来方便我们日后在kibana中做筛选统计, 同时时间要以日志中打印的时间为基准而不是filebeat发送消息时的时间。为了实现这一目标,我们可以向ES发一个HTTP请求创建一个名为xxx-log的pipeline:
 
PUT /_ingest/pipeline/xxx-log HTTP/1.1
Host: localhost:8200
Content-Type: application/json
{
  "description" : "xxx-log",
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": ["\\[%{TIMESTAMP_ISO8601:log_time}\\] %{NOTSPACE:thread} %{NOTSPACE:log_level} %{NOTSPACE:java_class} - %{GREEDYDATA:content}"]
      }
    },
    {
    "date": {
    "field": "log_time",
    "formats": ["yyyy-MM-dd HH:mm:ss:SSS"],
    "timezone": "Asia/Shanghai",
    "target_field": "@timestamp"
    }
    }
  ]
}
 
 
在这里我们定义了两个processor,第一个为grok处理器,用于解析日志字符串提取其中关键字段; 第二个是日期处理器,功能为把log_time字段以yyyy-MM-dd HH:mm:ss:SSS格式解析成日期,然后将结果保存到@timestamp字段中。
 
创建完processor以后,我们只需要配置filebeat在输出日志到ES时使用这个名为xxx-log的预处理器即可:
 
这样就完成了所有的工作。这时启动filebeat, 如果如出以下错误信息
 
ERROR pipeline/output.go:92 Failed to publish events: temporary bulk send failure
 
大概率是因为你发送的日志格式无法与grok表达式匹配,修改processor定义json即可。也可以在启动filebeat时添加-d "*"参数来查看具体的错误原因。
 
下图是日志在kibana中的展示效果:


 
可以看到主要的字段都已经被正确解析。
 

(责任编辑:IT)
------分隔线----------------------------