《ELK Stack权威指南 》第2章 插件配置
插 件 配 置
插件是Logstash最大的特色。各种不同的插件源源不断地被创造出来,发布到社区中供大家使用。本章会按照插件的类别,对一般场景下的一些常用插件做详细的配置和用例介绍。本章介绍的插件包括:1)输入插件。基于shipper端场景,主要介绍STDIN、TCP、File等插件。2)编解码插件。编解码通常是会被遗忘的环节,但是运用好了,会大大提高工作效率,本节介绍最常用的JSON和multiline插件。3)过滤器插件。名为过滤器,其实各种数据裁剪和计算都可以在这类插件里完成,是Logstash最强大的一环。本节会详细介绍grok、date、mutate、ruby、metrics等插件的妙用。4)输出插件。Logstash虽然经常跟Elasticsearch并称,但是作为一个日志传输框架,它其实可以输出数据到各种不同的地方。比如Graphite、HDFS、Nagios等等。本章会介绍这些常用的输出插件用法。
2.1 输入插件
在“Hello World”示例中,我们已经见到并介绍了Logstash的运行流程和配置的基础语法。从这章开始,我们就要逐一介绍Logstash流程中比较常用的一些插件,并在介绍中针对其主要适用的场景、推荐的配置,作一些说明。
限于篇幅,接下来内容中,配置示例不一定能贴完整。请记住一个原则:Logstash配置一定要有一个input和一个output。在演示过程中,如果没有写明input,默认就会使用“Hello World”里我们已经演示过的logstash-input-stdin,同理,没有写明的output就是logstash-output-stdout。
以上请读者自明。
2.1.1 标准输入
我们已经见过好几个示例使用stdin了。这也应该是Logstash里最简单和基础的插件了。所以,在这段中,我们先介绍一些未来每个插件都会有的一些方法。
配置示例如下:
input {
stdin {
add_field => {"key" =>"value"}
codec =>"plain"
tags => ["add"]
type =>"std"
}
}
用上面的新stdin设置重新运行一次最开始的Hello World示例。我建议大家把整段配置都写入一个文本文件,然后运行命令:bin/logstash -f stdin.conf。输入“Hello World”并回车后,你会在终端看到如下输出:
{
"message" =>"hello world",
"@version" =>"1",
"@timestamp" =>"2014-08-08T06:48:47.789Z",
"type" =>"std",
"tags" => [
[0] "add"
],
"key" =>"value",
"host" =>"raochenlindeMacBook-Air.local"
}
type和tags是Logstash事件中两个特殊的字段。通常来说,我们会在“输入区段”中通过type来标记事件类型—我们肯定是提前能知道这个事件属于什么类型的。而tags则是在数据处理过程中,由具体的插件来添加或者删除的。
最常见的用法是像下面这样:
input {
stdin {
type =>"web"
}
}
filter {
if [type] == "web" {
grok {
match => ["message", %{COMBINEDAPACHELOG}]
}
}
}
output {
if "_grokparsefailure" in [tags] {
nagios_nsca {
nagios_status =>"1"
}
} else {
elasticsearch {
}
}
}
看起来蛮复杂的,对吧?
继续学习,你也可以写出来的。
2.1.2 文件输入
分析网站访问日志应该是一个运维工程师最常见的工作了。所以我们先学习一下怎么用Logstash来处理日志文件。
Logstash使用一个名叫FileWatch的Ruby Gem库来监听文件变化。这个库支持glob展开文件路径,而且会记录一个叫.sincedb的数据库文件来跟踪被监听日志文件的当前读取位置。所以,不要担心Logstash会漏过你的数据。
sincedb文件中记录了每个被监听的文件的inode, major number, minor number和pos。
配置示例如下:
input {
file {
path => ["/var/log/*.log", "/var/log/message"]
type =>"system"
start_position =>"beginning"
}
}
有一些比较有用的配置项,可以用来指定FileWatch库的行为:
discover_interval:Logstash每隔多久去检查一次被监听的path下是否有新文件,默认值是15秒。
exclude:不想被监听的文件可以排除出去,这里跟path一样支持glob展开。
sincedb_path:如果你不想用默认的$HOME/.sincedb(Windows平台上为%USERPROFILE%\.sincedb,该变量默认值是:C:\Windows\System32\config\systemprofile),可以通过这个配置定义sincedb文件到其他位置。
sincedb_write_interval:Logstash每隔多久写一次sincedb文件,默认是15秒。
stat_interval:Logstash每隔多久检查一次被监听文件状态(是否有更新),默认是1秒。
start_position:Logstash从什么位置开始读取文件数据,默认是结束位置,也就是说,Logstash进程会以类似tail-F的形式运行。如果你是要导入原有数据,把这个设定改成"beginning",Logstash进程就从头开始读取,有点类似于less+F命令的形式运行。
close_older:一个已经监听中的文件,如果超过这个值的时间内没有更新内容,就关闭监听它的文件句柄。默认是3 600秒,即一小时。
ignore_older:在每次检查文件列表的时候,如果一个文件的最后修改时间超过这个值,就忽略这个文件。默认是86 400秒,即一天。
注意事项如下:
1)通常你要导入原有数据进Elasticsearch的话,你还需要filter/date插件来修改默认的"@timestamp"字段值。稍后会学习这方面的知识。
2)FileWatch只支持文件的绝对路径,而且会不自动递归目录。所以有需要的话,请用数组方式都写明具体哪些文件。
3)LogStash::Inputs::File只是在进程运行的注册阶段初始化一个FileWatch对象。所以它不能支持类似fluentd那样的path =>"/path/to/%{+yyyy/MM/dd/hh}.log"写法。达到相同目的,你只能写成path =>"/path/to/*/*/*/*.log"。FileWatch模块提供了一个稍微简单一点的写法:/path/to/**/*.log,用**来缩写表示递归全部子目录。
4)start_position仅在该文件从未被监听过的时候起作用。如果sincedb文件中已经有这个文件的inode记录了,那么Logstash依然会从记录过的pos开始读取数据。所以重复测试的时候每回需要删除sincedb文件。此外,官方博客上提供了另一个巧妙的思路:https://www.elastic.co/blog/logstash-configuration-tuning。把sincedb_path定义为/dev/null,则每次重启自动从头开始读。
5)因为Windows平台上没有inode的概念,Logstash某些版本在Windows平台上监听文件不是很靠谱。Windows平台上,推荐考虑使用nxlog作为收集端,参见后面5.5节。
2.1.3 TCP输入
未来你可能会用Redis服务器或者其他的消息队列系统来作为Logstash Broker的角色。不过Logstash其实也有自己的TCP/UDP插件,在临时任务的时候,也算能用,尤其是测试环境。
虽然LogStash::Inputs::TCP用Ruby的Socket和OpenSSL库实现了高级的SSL功能,但Logstash本身只能在SizedQueue中缓存20个事件。这就是我们建议在生产环境中换用其他消息队列的原因。
配置示例如下:
input {
tcp {
port => 8888
mode =>"server"
ssl_enable => false
}
}
目前来看,LogStash::Inputs::TCP最常见的用法就是配合nc命令导入旧数据。在启动Logstash进程后,在另一个终端运行如下命令即可导入数据:
# nc 127.0.0.1 8888 < olddata
这种做法比用LogStash::Inputs::File好,因为当nc命令结束,我们就知道数据导入完毕了。而用input/file方式,Logstash进程还会一直等待新数据输入被监听的文件,不能直接看出是否任务完成了。
2.1.4 syslog输入
syslog可能是运维领域最流行的数据传输协议了。当你想从设备上收集系统日志的时候,syslog应该会是你的第一选择。尤其是网络设备,比如思科中syslog几乎是唯一可行的办法。
我们这里不解释如何配置你的syslog.conf、rsyslog.conf或者syslog-ng.conf来发送数据,而只讲如何把Logstash配置成一个syslog服务器来接收数据。
有关rsyslog的用法,后面的5.4节会有更详细的介绍。
配置示例如下:
input {
syslog {
port =>"514"
}
}
作为最简单的测试,我们先暂停一下本机的syslogd(或rsyslogd)进程,然后启动Logstash进程(这样就不会有端口冲突问题)。现在,本机的syslog就会默认发送到Logstash里了。我们可以用自带的logger命令行工具发送一条“Hello World”信息到syslog里(即Logstash里)。看到的Logstash输出像下面这样:
{
"message" =>"Hello World",
"@version" =>"1",
"@timestamp" =>"2014-08-08T09:01:15.911Z",
"host" =>"127.0.0.1",
"priority" =>31,
"timestamp" =>"Aug 8 17:01:15",
"logsource" =>"raochenlindeMacBook-Air.local",
"program" =>"com.apple.metadata.mdflagwriter",
"pid" =>"381",
"severity" =>7,
"facility" =>3,
"facility_label" =>"system",
"severity_label" =>"Debug"
}
Logstash是用UDPSocket、TCPServer和LogStash::Filters::Grok来实现LogStash::Inputs::Syslog的。所以你其实可以直接用Logstash配置实现一样的效果,如下所示:
input {
tcp {
port =>"8514"
}
}
filter {
grok {
match => ["message", "%{SYSLOGLINE}" ]
}
syslog_pri { }
}
建议在使用LogStash::Inputs::Syslog的时候走TCP协议来传输数据。
因为具体实现中,UDP监听器只用了一个线程,而TCP监听器会在接收每个连接的时候都启动新的线程来处理后续步骤。
如果你已经在使用UDP监听器收集日志,用下行命令检查你的UDP接收队列大小:
# netstat -plnu | awk 'NR==1 || $4~/:514$/{print $2}'
Recv-Q
228096
228096是UDP接收队列的默认最大大小,这时候linux内核开始丢弃数据包了!
强烈建议使用LogStash::Inputs::TCP和LogStash::Filters::Grok配合实现同样的syslog功能!
虽然LogStash::Inputs::Syslog在使用TCPServer的时候可以采用多线程处理数据的接收,但是在同一个客户端数据的处理中,其grok和date是一直在该线程中完成的,这会导致总体上的处理性能几何级的下降—经过测试,TCPServer每秒可以接收50 000条数据,而在同一线程中启用grok后每秒只能处理5 000条,再加上date只能达到500条!
才将这两步拆分到filters阶段后,Logstash支持对该阶段插件单独设置多线程运行,大大提高了总体处理性能。在相同环境下,logstash -f tcp.conf -w 20的测试中,总体处理性能可以达到每秒30 000条数据!
测试采用Logstash作者提供的命令:
yes "<44>May 19 18:30:17 snack jls: foo bar 32" | nc localhost 3000
出处见:https:// github.com/jordansissel/experiments/blob/master/ruby/jruby-netty/syslog-server/Makefile
如果你实在没法切换到TCP协议,可以自己写程序,或者使用其他基于异步I/O框架(比如libev)的项目。下面是一个简单的异步I/O实现UDP监听数据输入Elasticsearch的示例:https://gist.github.com/chenryn/7c922ac424324ee0d695。
2.1.5 http_poller抓取
Logstash作为数据采集系统,也支持自己作为一个HTTP客户端去抓取网页数据或者接口数据。这方面有一个很明显的IT运维应用场景:很多业务系统软件本身提供了RESTful的内部运行状态接口,可以直接通过接口采集这些监控信息。
更长期的方案应该是编写对应的metricbeat模块,但是直接采用logstash-input-http_poller显然更快捷。
比如Nginx的性能状态,社区有一个非常全面的性能状态监控模块:nginx-module-vts。在新浪微博,后端池分为核心接口、非核心接口两块,我们要分别监控的话,nginx-module-vts的配置如下:
http {
vhost_traffic_status_zone;
map $uri $filter_uri {
default 'non-core';
/2/api/timeline core;
~^/2/api/unread core;
}
server {
vhost_traffic_status_filter_by_set_key $filter_uri;
location /status {
auth_basic "Restricted";
auth_basic_user_file pass_file;
vhost_traffic_status_display;
vhost_traffic_status_display_format json;
}
}
}
则对应的logstash-input-http_poller配置如下:
input {
http_poller {
urls => {
0 => {
method => get
url => "https://localhost:80/status/format/json"
headers => {
Accept => "application/json"
}
auth => {
user => "YouKnowIKnow"
password => "IKnowYouDonotKnow"
}
}
1 => {
method => get
url => "https://localhost:80/status/con ... up%3D*"
headers => {
Accept => "application/json"
}
auth => {
user => "YouKnowIKnow"
password => "IKnowYouDonotKnow"
}
}
}
request_timeout => 60
interval => 60
codec => "json"
}
}
这段配置就可以就可以每60秒获得一次 vts 数据,并重置计数了。
注意,url是一个Hash值,所以它的执行顺序是根据Hash.map来的,为了确保我们是先获取数据再重置,这里干脆用0, 1来作为Hash的key,这样顺序就没问题了。
2.2 编解码配置
Codec是Logstash从1.3.0版开始新引入的概念(Codec来自Coder/decoder两个单词的首字母缩写)。
在此之前,Logstash只支持纯文本形式输入,然后以过滤器处理它。但现在,我们可以在输入期处理不同类型的数据,这全是因为有了Codec设置。
所以,这里需要纠正之前的一个概念。Logstash不只是一个input | filter | output的数据流,而是一个input | decode | filter | encode | output的数据流!Codec就是用来decode、encode事件的。
Codec的引入,使得Logstash可以更好、更方便地与其他有自定义数据格式的运维产品共存,比如graphite、fluent、netflow、collectd,以及使用msgpack、json、edn等通用数据格式的其他产品等。
事实上,我们在第一个“Hello World”用例中就已经用过Codec了—rubydebug就是一种Codec!虽然它一般只会用在stdout插件中,作为配置测试或者调试的工具。
这个五段式的流程说明源自Perl版的Logstash(后来改名叫Message::Passing模块)的设计。本书稍后5.8节会对该模块稍作介绍。
2.2.1 JSON编解码
在早期的版本中,有一种降低Logstash过滤器的CPU负载消耗的做法盛行于社区(在当时的Cookbook上有专门的一节介绍):直接输入预定义好的JSON数据,这样就可以省略掉filter/grok配置!
这个建议依然有效,不过在当前版本中需要稍微做一点配置变动,因为现在有专门的Codec设置。
1.配置示例
社区常见的示例都是用的Apache的customlog,不过我觉得Nginx是一个比Apache更常用的新型Web服务器,所以我这里会用nginx.conf做示例:
logformat json '{"@timestamp":"$time_iso8601",'
'"@version":"1",'
'"host":"$server_addr",'
'"client":"$remote_addr",'
'"size":$body_bytes_sent,'
'"responsetime":$request_time,'
'"domain":"$host",'
'"url":"$uri",'
'"status":"$status"}';
access_log /var/log/nginx/access.log_json json;
注意,在$request_time和$body_bytes_sent变量两头没有双引号",这两个数据在JSON里应该是数值类型!
重启Nginx应用,然后修改你的input/file区段配置成下面这样:
input {
file {
path =>"/var/log/nginx/access.log_json""
codec =>"json"
}
}
2.运行结果
下面访问一下用Nginx发布的Web页面,然后你会看到Logstash进程输出类似下面这样的内容:
{
"@timestamp" =>"2014-03-21T18:52:25.000+08:00",
"@version" =>"1",
"host" =>"raochenlindeMacBook-Air.local",
"client" =>"123.125.74.53",
"size" =>8096,
"responsetime" =>0.04,
"domain" =>"www.domain.com",
"url" =>"/path/to/file.suffix",
"status" =>"200"
}
3. Nginx代理服务的日志格式问题
对于一个Web服务器的访问日志,看起来已经可以很好的工作了。不过如果Nginx是作为一个代理服务器运行的话,访问日志里有些变量,比如说$upstream_response_time,可能不会一直是数字,它也可能是一个“-”字符串!这会直接导致Logstash对输入数据验证报异常。
有两个办法解决这个问题:
1)用sed在输入之前先替换-成0。运行Logstash进程时不再读取文件而是标准输入,这样命令就成了下面这个样子:
tail -F /var/log/nginx/proxy_access.log_json \
| sed 's/upstreamtime":-/upstreamtime":0/' \
| /usr/local/logstash/bin/logstash -f /usr/local/logstash/etc/proxylog.conf
2)日志格式中统一记录为字符串格式(即都带上双引号),然后再在Logstash中用filter/mutate插件来变更应该是数值类型的字符字段的值类型。
有关LogStash::Filters::Mutate的内容,本书稍后会有介绍。
2.2.2 多行事件编码
有些时候,应用程序调试日志会包含非常丰富的内容,为一个事件打印出很多行内容。这种日志通常都很难通过命令行解析的方式做分析。
而Logstash正为此准备好了codec/multiline插件!当然,multiline插件也可以用于其他类似的堆栈式信息,比如Linux的内核日志。
配置示例如下:
input {
stdin {
codec => multiline {
pattern =>"^\["
negate => true
what =>"previous"
}
}
}
运行Logstash进程,然后在等待输入的终端中输入如下几行数据:
[Aug/08/08 14:54:03] hello world
[Aug/08/09 14:54:04] hello logstash
hello best practice
hello raochenlin
[Aug/08/10 14:54:05] the end
你会发现Logstash输出下面这样的返回:
{
"@timestamp" =>"2014-08-09T13:32:03.368Z",
"message" =>"[Aug/08/08 14:54:03] hello world\n",
"@version" =>"1",
"host" =>"raochenlindeMacBook-Air.local"
}
{
"@timestamp" =>"2014-08-09T13:32:24.359Z",
"message" =>"[Aug/08/09 14:54:04] hello logstash\n\n hello best practice\n\n
hello raochenlin\n",
"@version" =>"1",
"tags" => [
[0] "multiline"
],
"host" =>"raochenlindeMacBook-Air.local"
}
你看,后面这个事件,在“message”字段里存储了三行数据!
输出的事件中没有最后一行的"the end"字符串,这是因为你最后输入的回车符\n并不匹配设定的^\[正则表达式,Logstash还得等下一行数据直到匹配成功后才会输出这个事件。
其实这个插件的原理很简单,就是把当前行的数据添加到前面一行后面,直到新进的当前行匹配^\[正则为止。这个正则还可以用grok表达式,稍后你就会学习这方面的内容。具体的Java日志正则见:https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/java
说到应用程序日志,Log4j肯定是第一个被大家想到的,使用codec/multiline也确实是一个办法。
不过,如果你本身就是开发人员,或者可以推动程序修改变更的话,Logstash还提供了另一种处理Log4j的方式:input/log4j。与codec/multiline不同,这个插件是直接调用了org.apache.log4j.spi.LoggingEvent处理TCP端口接收的数据。后面3.6节会详细讲述Log4j的用法。
2.2.3 网络流编码
NetFlow是Cisco发明的一种数据交换方式。NetFlow提供网络流量的会话级视图,记录下每个TCP/IP事务的信息。它的目的不是像tcpdump那样提供网络流量的完整记录,而是汇集起来形成更易于管理和易读的流向和容量的分析监控。
Cisco上配置NetFlow的方法,请参照具体的设备说明,主要是设定采集服务器的地址和端口,为运行Logstash服务的主机地址和端口(示例中为9995)。
采集NetFlow数据的Logstash服务配置示例如下:
input {
udp {
port => 9995
codec => netflow {
definitions =>"/opt/logstash-1.4.2/lib/logstash/codecs/netflow/netflow.yaml"
versions => [5]
}
}
}
output {
elasticsearch {
index =>"logstash_netflow5-%{+YYYY.MM.dd}"
host =>"localhost"
}
}
由于该插件生成的字段较多,所以建议对应的Elasticsesarch索引模板也需要单独提交:
# curl -XPUT localhost:9200/_template/logstash_netflow5 -d '{
"template" : "logstash_netflow5-*",
"settings": {
"index.refresh_interval": "5s"
},
"mappings" : {
"_default_" : {
"_all" : {"enabled" : false},
"properties" : {
"@version": { "index": "analyzed", "type": "integer" },
"@timestamp": { "index": "analyzed", "type": "date" },
"netflow": {
"dynamic": true,
"type": "object",
"properties": {
"version": { "index": "analyzed", "type": "integer" },
"flow_seq_num": { "index": "not_analyzed", "type": "long" },
"engine_type": { "index": "not_analyzed", "type": "integer" },
"engine_id": { "index": "not_analyzed", "type": "integer" },
"sampling_algorithm": { "index": "not_analyzed", "type": "integer" },
"sampling_interval": { "index": "not_analyzed", "type": "integer" },
"flow_records": { "index": "not_analyzed", "type": "integer" },
"ipv4_src_addr": { "index": "analyzed", "type": "ip" },
"ipv4_dst_addr": { "index": "analyzed", "type": "ip" },
"ipv4_next_hop": { "index": "analyzed", "type": "ip" },
"input_snmp": { "index": "not_analyzed", "type": "long" },
"output_snmp": { "index": "not_analyzed", "type": "long" },
"in_pkts": { "index": "analyzed", "type": "long" },
"in_bytes": { "index": "analyzed", "type": "long" },
"first_switched": { "index": "not_analyzed", "type": "date" },
"last_switched": { "index": "not_analyzed", "type": "date" },
"l4_src_port": { "index": "analyzed", "type": "long" },
"l4_dst_port": { "index": "analyzed", "type": "long" },
"tcp_flags": { "index": "analyzed", "type": "integer" },
"protocol": { "index": "analyzed", "type": "integer" },
"src_tos": { "index": "analyzed", "type": "integer" },
"src_as": { "index": "analyzed", "type": "integer" },
"dst_as": { "index": "analyzed", "type": "integer" },
"src_mask": { "index": "analyzed", "type": "integer" },
"dst_mask": { "index": "analyzed", "type": "integer" }
}
}
}
}
}
}'
Elasticsearch索引模板的功能,本书稍后12.6节会有详细介绍。
2.2.4 collectd输入
collectd是一个守护(daemon)进程,用来收集系统性能和提供各种存储方式来存储不同值的机制。它会在系统运行和存储信息时周期性的统计系统的相关统计信息。利用这些信息有助于查找当前系统性能瓶颈(如作为性能分析performance analysis)和预测系统未来的load(如能力部署capacity planning)等
下面简单介绍一下:collectd的部署以及与Logstash对接的相关配置实例。
1. collectd的安装
collectd的安装同样有两种方式,使用官方软件仓库安装或源代码安装。
使用官方软件仓库安装(推荐)
collectd官方有一个隐藏的软件仓库:https://pkg.ci.collectd.org,构建有RHEL/CentOS (rpm),Debian/Ubuntu(deb)的软件包,如果你使用的操作系统属于上述的,那么推荐使用软件仓库安装。
目前collectd官方维护3个版本:5.4、5.5、5.6。根据需要选择合适的版本仓库。下面示例安装5.5版本的方法。
Debian/Ubuntu仓库安装:
echo "deb https://pkg.ci.collectd.org/deb $(lsb_release -sc) collectd-5.5" | sudo tee /etc/apt/sources.list.d/collectd.list
curl -s https://pkg.ci.collectd.org/pubkey.asc | sudo apt-key add -
sudo apt-get update && sudo apt-get install -y collectd
注意,Debian/Ubuntu软件仓库自带有collectd软件包,如果软件仓库自带的版本足够你使用,那么可以不用添加仓库,直接通过apt-get install collectd即可。
RHEL/CentOS仓库安装:
cat > /etc/yum.repos.d/collectd.repo <<EOF
[collectd-5.5]
name=collectd-5.5
baseurl=https://pkg.ci.collectd.org/rpm/collectd-5.5/epel-\$releasever-\$basearch/
gpgcheck=1
gpgkey=https://pkg.ci.collectd.org/pubkey.asc
EOF
yum install -y collectd
你如果需要使用其他collectd插件,此时也可一并安装对应的collectd-xxxx软件包。
源码安装collectd
collectd目前维护3个版本:5.4、5.5、5.6。源代码编译安装时同样可以根据自己需要选择对应版本的源码下载:
wget https://collectd.org/files/collectd-5.4.1.tar.gz
tar zxvf collectd-5.4.1.tar.gz
cd collectd-5.4.1
./configure --prefix=/usr --sysconfdir=/etc --localstatedir=/var --libdir=/usr/lib --mandir=/usr/share/man --enable-all-plugins
make && make install
源代码编译安装需要预先解决好各种环境依赖,在RedHat平台上要提前安装如下软件包:
rpm -ivh "https://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm"
yum -y install libcurl libcurl-devel rrdtool rrdtool-devel perl-rrdtool rrdtool-prel libgcrypt-devel gcc make gcc-c++ liboping liboping-devel perl-CPAN net-snmp net-snmp-devel
安装启动脚本如下:
cp contrib/redhat/init.d-collectd /etc/init.d/collectd
chmod +x /etc/init.d/collectd
启动collectd如下:
service collectd start
2. collectd的配置
以下配置可以实现对服务器基本的CPU、内存、网卡流量、磁盘I/O以及磁盘空间占用情况的监控:
Hostname "host.example.com"
LoadPlugin interface
LoadPlugin cpu
LoadPlugin memory
LoadPlugin network
LoadPlugin df
LoadPlugin disk
<Plugin interface>
Interface "eth0"
IgnoreSelected false
</Plugin>
<Plugin network>
<Server "10.0.0.1""25826"> ## Logstash 的 IP 地址和 collectd 的数据接收端口号
</Server>
</Plugin>
3. Logstash的配置
以下配置实现通过Logstash监听25826端口,接收从collectd发送过来的各项检测数据。
注意,logstash-filter-collectd插件本身需要单独安装,logstash插件安装说明之前已经讲过:
bin/logstash-plugin install logstash-filter-collectd
Logstash默认自带有collectd的codec编码插件。
推荐配置示例如下:
udp {
port => 25826
buffer_size => 1452
workers => 3 # Default is 2
queue_size => 30000 # Default is 2000
codec => collectd { }
type =>"collectd"
}
下面是简单的一个输出结果:
{
"_index": "logstash-2014.12.11",
"_type": "collectd",
"_id": "dS6vVz4aRtK5xS86kwjZnw",
"_score": null,
"_source": {
"host": "host.example.com",
"@timestamp": "2014-12-11T06:28:52.118Z",
"plugin": "interface",
"plugin_instance": "eth0",
"collectd_type": "if_packets",
"rx": 19147144,
"tx": 3608629,
"@version": "1",
"type": "collectd",
"tags": [
"_grokparsefailure"
]
},
"sort": [
1418279332118
]
}
参考资料
collectd支持收集的数据类型:https://git.verplant.org/?p=collectd.git;a=blob;hb=master; f=README
collectd收集各数据类型的配置参考资料:https://collectd.org/documentation/manpages/collectd.conf.5.shtml
collectd简单配置文件示例:https://gist.github.com/untergeek/ab85cb86a9bf39f1fc6d
2.3 过滤器配置
有丰富的过滤器插件,是Logstash威力如此强大的重要因素。名为过滤器,其实提供的不单单是过滤的功能。下面我们就会重点介绍几个插件,它们扩展了进入过滤器的原始数据,进行复杂的逻辑处理,甚至可以无中生有地添加新的Logstash事件到后续的流程中去!
2.3.1 date时间处理
之前章节已经提过,logstash-filter-date插件可以用来转换你的日志记录中的时间字符串,变成LogStash::Timestamp对象,然后转存到@timestamp字段里。
因为在稍后的logstash-output-elasticsearch中常用的%{+YYYY.MM.dd}这种写法必须读取@timestamp数据,所以一定不要直接删掉这个字段保留自己的字段,而是应该用logstash-filter-date转换后删除自己的字段!
这在导入旧数据的时候固然非常有用,而在实时数据处理的时候同样有效,因为一般情况下数据流程中我们都会有缓冲区,导致最终的实际处理时间跟事件产生时间略有偏差。
强烈建议打开Nginx的access_log配置项的buffer参数,对极限响应性能有极大提升!
1.配置示例
logstash-filter-date插件支持五种时间格式:
ISO8601:类似“2011-04-19T03:44:01.103Z”这样的格式。具体Z后面可以有“08:00”也可以没有,“.103”这个也可以没有。常用场景里来说,Nginx的log_format配置里就可以使用$time_iso8601变量来记录请求时间成这种格式。
UNIX:UNIX时间戳格式,记录的是从1970年起始至今的总秒数。Squid默认日志格式中就使用了这种格式。
UNIX_MS:这个时间戳则是从1970年起始至今的总毫秒数。据我所知,JavaScript里经常使用这个时间格式。
TAI64N:TAI64N格式比较少见,是这个样子的:@4000000052f88ea32489532c。我目前只知道常见应用中,qmail会用这个格式。
Joda-Time库:Logstash内部使用了Java的Joda时间库来作时间处理。所以我们可以使用Joda库所支持的时间格式来作具体定义。Joda时间格式定义见表2-1。
表2-1 Joda时间库格式
格式符 含 义 描 述 示 例
G era text AD
C century of era (>=0) number 20
Y year of era (>=0) year 1996
x weekyear year 1996
w week of weekyear number 27
e day of week number 2
E day of week text Tuesday; Tue
y year year 1996
D day of year number 189
M month of year month July; Jul; 07
d day of month number 10
a halfday of day text PM
K hour of halfday (0~11) number 0
h clockhour of halfday (1~12) number 12
H hour of day (0~23) number 0
k clockhour of day (1~24) number 24
m minute of hour number 30
s second of minute number 55
S fraction of second number 978
z time zone text Pacific Standard Time; PST
Z time zone offset/id zone -0800; -08:00; America/Los_Angeles
' escape for text delimiter
'' single quote literal '
下面我们写一个Joda时间格式的配置作为示例:
filter {
grok {
match => ["message", "%{HTTPDATE:logdate}"]
}
date {
match => ["logdate", "dd/MMM/yyyy:HH:mm:ss Z"]
}
}
注意,时区偏移量只需要用一个字母Z即可。
2.时区问题的解释
很多中国用户经常提一个问题:为什么@timestamp比我们晚了8个小时?怎么修改成北京时间?
其实,Elasticsearch内部,对时间类型字段,是统一采用UTC时间,存成long长整形数据的!对日志统一采用UTC时间存储,是国际安全/运维界的一个通识—欧美公司的服务器普遍广泛分布在多个时区里—不像中国,地域横跨五个时区却只用北京时间。
对于页面查看,ELK的解决方案是在Kibana上,读取浏览器的当前时区,然后在页面上转换时间内容的显示。
所以,建议大家接受这种设定。否则,即便你用.getLocalTime修改,也还要面临在Kibana过去修改,以及Elasticsearch原有的["now-1h" TO "now"]这种方便的搜索语句无法正常使用的尴尬。
以上,请读者自行斟酌。
2.3.2 grok正则捕获
grok是Logstash最重要的插件。你可以在grok里预定义好命名正则表达式,在稍后(grok参数或者其他正则表达式里)引用它。
1.正则表达式语法
运维工程师多多少少都会一点正则。你可以在grok里写标准的正则,像下面这样:
\s+(?<request_time>\d+(?:\.\d+)?)\s+
这个正则表达式写法对于Perl或者Ruby程序员应该很熟悉了,Python程序员可能更习惯写(?P<name>pattern),没办法,适应一下吧。
现在给我们的配置文件添加第一个过滤器区段配置。配置要添加在输入和输出区段之间(Logstash执行区段的时候并不依赖于次序,不过为了自己看得方便,还是按次序书写吧):
input {stdin{}}
filter {
grok {
match => {
"message" =>"\s+(?<request_time>\d+(?:\.\d+)?)\s+"
}
}
}
output {stdout{codec=>rubydebug}}
运行Logstash进程然后输入“begin 123.456 end”,你会看到类似下面这样的输出:
{
"message" =>"begin 123.456 end",
"@version" =>"1",
"@timestamp" =>"2014-08-09T11:55:38.186Z",
"host" =>"raochenlindeMacBook-Air.local",
"request_time" =>"123.456"
}
漂亮!不过数据类型好像不太满意……request_time应该是数值而不是字符串。
我们已经提过稍后会学习用LogStash::Filters::Mutate来转换字段值类型,不过在grok 里,其实有自己的魔法来实现这个功能!
2. grok表达式语法
grok支持把预定义的grok表达式写入到文件中,官方提供的预定义grok表达式见:https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns。
下面是从官方文件中摘抄的最简单但是足够说明用法的示例:
USERNAME [a-zA-Z0-9._-]+
USER %{USERNAME}
第一行,用普通的正则表达式来定义一个grok表达式;第二行,通过打印赋值格式(sprintf format),用前面定义好的grok表达式来定义另一个grok表达式。
grok表达式的打印复制格式的完整语法见下行示例。其中data_type目前只支持两个值:int和float。
%{PATTERN_NAME:capture_name:data_type}
所以我们可以改进我们的配置成下面这样:
filter {
grok {
match => {
"message" =>"%{WORD} %{NUMBER:request_time:float} %{WORD}"
}
}
}
重新运行进程然后可以得到如下结果:
{
"message" =>"begin 123.456 end",
"@version" =>"1",
"@timestamp" =>"2014-08-09T12:23:36.634Z",
"host" =>"raochenlindeMacBook-Air.local",
"request_time" => 123.456
}
这次request_time变成数值类型了。
3.最佳实践
实际运用中,我们需要处理各种各样的日志文件,如果你都是在配置文件里各自写一行自己的表达式,就完全不可管理了。所以,我们建议是把所有的grok表达式统一写入到一个地方。然后用filter/grok的patterns_dir选项来指明。
如果你把“message”里所有的信息都grok到不同的字段了,数据实质上就相当于是重复存储了两份。所以你可以用remove_field参数来删除掉message字段,或者用overwrite参数来重写默认的message字段,只保留最重要的部分。
重写参数的示例如下:
filter {
grok {
patterns_dir =>["/path/to/your/own/patterns"]
match => {
"message" =>"%{SYSLOGBASE} %{DATA:message}"
}
overwrite => ["message"]
}
}
更多有关grok正则性能的最佳实践(比如timeout_millis等配置参数),请参考:https://www.elastic.co/blog/do-you-grok-grok。
4.高级用法
多行匹配 在和codec/multiline搭配使用的时候,需要注意一个问题,grok正则和普通正则一样,默认是不支持匹配回车换行的。就像你需要=~ // m一样也需要单独指定,具体写法是在表达式开始位置加(?m)标记。如下所示:
match => {
"message" =>"(?m)\s+(?<request_time>\d+(?:\.\d+)?)\s+"
}
多项选择 有时候我们会碰上一个日志有多种可能格式的情况。这时候要写成单一正则就比较困难,或者全用|隔开又比较丑陋。这时候,Logstash的语法提供给我们一个有趣的解决方式。
文档中,都说明logstash-filters-grok插件的match参数应该接受的是一个Hash值。但是因为早期的Logstash语法中Hash值也是用[]这种方式书写的,所以其实现在传递Array值给match参数也完全没问题。所以,我们这里其实可以传递多个正则来匹配同一个字段:
match => [
"message", "(?<request_time>\d+(?:\.\d+)?)",
"message", "%{SYSLOGBASE} %{DATA:message}",
"message", "(?m)%{WORD}"
]
Logstash会按照这个定义次序依次尝试匹配,到匹配成功为止。虽说效果跟用|分割写个大大的正则是一样的,但是可阅读性好了很多。
我强烈建议每个人都要使用Grok Debugger (https://grokdebug.herokuapp.com/)来调试自己的grok表达式。
2.3.3 dissect解析
grok作为Logstash最广为人知的插件,在性能和资源损耗方面同样也广为诟病。为了应对这个情况,同时也考虑到大多数情况下日志格式并没有那么复杂,Logstash开发团队在5.0版新添加了另一个解析字段的插件:dissect。当日志格式有比较简明的分隔标志位且重复性较大的时候,可以使用dissect插件更快地完成解析工作。下面是解析syslog的示例。
filter {
dissect {
mapping => {
"message" => "%{ts} %{+ts} %{+ts} %{src} %{} %{prog}[%{pid}]: %{msg}"
}
convert_datatype => {
pid => "int"
}
}
}
我们看到上面使用了和Grok很类似的%{}语法来表示字段,这显然是基于习惯延续的考虑。不过示例中%{+ts}的加号就不一般了。dissect除了字段外面的字符串定位功能以外,还通过几个特殊符号来处理字段提取的规则:
%{+key} 这个+表示前面已经捕获到一个key字段了,而这次捕获的内容自动添补到之前key字段内容的后面。
%{+key/2} 这个/2表示在有多次捕获内容都填到key字段里的时候,拼接字符串的顺序谁前谁后。/2表示排第2位。
%{?string} 这个?表示这块只是一个占位,并不会实际生成捕获字段存到Event里面。
%{?string} %{&string} 当同样捕获名称都是string,但是一个?和一个&在一起的时候,表示这是一个键值对。
比如对https://rizhiyi.com/index.do?id=123写这么一段配置:
https://%{domain}/%{?url}?%{?arg1}=%{&arg1}
则最终生成的 Event 内容是这样的:
{
domain => "rizhiyi.com",
id => "123"
}
2.3.4 GeoIP地址查询
GeoIP是最常见的免费IP地址归类查询库,同时也有收费版可以采购。GeoIP库可以根据IP地址提供对应的地域信息,包括国别、省市、经纬度等,对于可视化地图和区域统计非常有用。
配置示例如下:
filter {
geoip {
source =>"message"
}
}
运行结果如下:
{
"message" =>"183.60.92.253",
"@version" =>"1",
"@timestamp" =>"2014-08-07T10:32:55.610Z",
"host" =>"raochenlindeMacBook-Air.local",
"geoip" => {
"ip" =>"183.60.92.253",
"country_code2" =>"CN",
"country_code3" =>"CHN",
"country_name" =>"China",
"continent_code" =>"AS",
"region_name" =>"30",
"city_name" =>"Guangzhou",
"latitude" =>23.11670000000001,
"longitude" =>113.25,
"timezone" =>"Asia/Chongqing",
"real_region_name" =>"Guangdong",
"location" => [
[0] 113.25,
[1] 23.11670000000001
]
}
}
GeoIP库数据较多,如果你不需要这么多内容,可以通过fields选项指定自己所需要的。下例为全部可选内容:
filter {
geoip {
fields => ["city_name", "continent_code", "country_code2",
"country_code3", "country_name", "dma_code", "ip", "latitude",
"longitude", "postal_code", "region_name", "timezone"]
}
}
需要注意的是:geoip.location是Logstash通过latitude和longitude额外生成的数据。所以,如果你是想要经纬度又不想重复数据的话,应该像下面这样做:
filter {
geoip {
fields => ["city_name", "country_code2", "country_name", "latitude",
"longitude", "region_name"]
remove_field => ["[geoip][latitude]", "[geoip][longitude]"]
}
}
还要注意:geoip插件的“source”字段可以是任一处理后的字段,比如“client_ip”,但是字段内容却需要小心!GeoIp库内只存有公共网络上的IP信息,查询不到结果的,会直接返回null,而Logstash的GeoIp插件对null结果的处理是:“不生成对应的geoip.字段”。所以读者在测试时,如果使用了诸如127.0.0.1、172.16.0.1、182.168.0.1、10.0.0.1等内网地址,会发现没有对应输出!
2.3.5 JSON编解码
在上一章,已经讲过在Codec中使用JSON编码。但是,有些日志可能是一种复合的数据结构,其中只有一部分记录是JSON格式的。这时候,我们依然需要在filter阶段,单独启用JSON解码插件。
配置示例如下:
filter {
json {
source =>"message"
target =>"jsoncontent"
}
}
运行结果如下:
{
"@version": "1",
"@timestamp": "2014-11-18T08:11:33.000Z",
"host": "web121.mweibo.tc.sinanode.com",
"message": "{\"uid\":3081609001,\"type\":\"signal\"}",
"jsoncontent": {
"uid": 3081609001,
"type": "signal"
}
}
如果不打算使用多层结构的话,删掉target配置即可。单层结构新的结果如下:
{
"@version": "1",
"@timestamp": "2014-11-18T08:11:33.000Z",
"host": "web121.mweibo.tc.sinanode.com",
"message": "{\"uid\":3081609001,\"type\":\"signal\"}",
"uid": 3081609001,
"type": "signal"
}
2.3.6 key-value切分
在很多情况下,日志内容本身都是一个类似于key-value的格式,但是格式具体的样式却是多种多样的。Logstash提供logstash-filter-kv插件,帮助处理不同样式的key-value日志,变成实际的LogStash::Event数据。
配置示例如下:
filter {
ruby {
init =>"@kname = ['method','uri','verb']"
ruby {
init => "@kname = ['method','uri','verb']"
code => "
new_event = LogStash::Event.new(Hash[@kname.zip(event.get('request').split('|'))])
new_event.remove('@timestamp')
event.append(new_event)""
"
}
if [uri] {
ruby {
init => "@kname = ['url_path','url_args']"
code => "
new_event = LogStash::Event.new(Hash[@kname.zip(event.get('uri').split('?'))])
new_event.remove('@timestamp')
event.append(new_event)""
"
}
kv {
prefix =>"url_"
source =>"url_args"
field_split =>"&"
remove_field => [ "url_args", "uri", "request" ]
}
}
}
Nginx访问日志中的$request,通过这段配置,可以详细切分成method、url_path、verb、url_a、url_b...
进一步,如果url_args中有过多字段,可能导致Elasticsearch集群因为频繁update mapping或者消耗太多内存在cluster state上而宕机。所以,更优的选择是只保留明确有用的url_args内容,其他部分舍去,如下所示:
kv {
prefix =>"url_"
source =>"url_args"
field_split =>"&"
include_keys => [ "uid", "cip" ]
remove_field => [ "url_args", "uri", "request" ]
}
上例即表示,除了url_uid和url_cip两个字段以外,其他的url_*都不保留。
2.3.7 metrics数值统计
logstash-filter-metrics插件是使用Ruby的Metriks模块来实现在内存里实时地计数和采样分析。该模块支持两个类型的数值分析:meter和timer。下面分别举例说明。
1. Meter示例(速率阈值检测)
Web访问日志的异常状态码频率是运维人员会非常关心的一个数据。通常我们的做法是通过Logstash或者其他日志分析脚本,把计数发送到rrdtool或者graphite里面,然后再通过check_graphite脚本之类的东西来检查异常并报警。
事实上这个事情可以直接在Logstash内部就完成。比如如果最近一分钟504请求的个数超过100个就报警,如下所示:
filter {
metrics {
meter => "error_%{status}"
add_tag => "metric"
ignore_older_than => 10
}
if "metric" in [tags] {
ruby {
code => "event.cancel if event.get('[error_504][rate_1m]') * 60 > 100"
}
}
}
output {
if "metric" in [tags] {
exec {
command => "echo \"Out of threshold: %{[error_504][rate_1m]}\""
}
}
这里需要注意*60的含义。metrics模块生成的rate_1m/5m/15m意思是:最近1、5、15分钟的每秒速率!
2. Timer示例(box and whisker异常检测)
官版的logstash-filter-metrics插件只适用于metric事件的检查。由插件生成的新事件内部不存有来自input区段的实际数据信息。所以,要完成我们的百分比分布箱体检测,需要首先对代码稍微做几行变动,即在metric的timer事件里加一个属性,存储最近一个实际事件的数值:
def register
…
@last = {}
end
def filter(event)
…
@last[event.sprintf(name)] = event.sprintf(value).to_f
end
def flush(options = {})
…
@metric_timer.each_pair do |name, metric|
…
event.set(“[#{name}][last]”, @last[name])
metric.clear if should_clear?
end
…
end
有了这个Last值,然后我们就可以用如下配置来探测异常数据了:
filter {
metrics {
timer => {"rt" =>"%{request_time}"}
percentiles => [25, 75]
add_tag =>"percentile"
}
ruby {
code => "l=event.get('[rt][p75]')-event.get('[rt][p25]');event.set('[rt][low]', event.get('[rt][p25]')-l);event.set('[rt][high]',event.get('[rt][p75]')+l)"
}
}
}
output {
if "percentile" in [tags] and ([rt][last] > [rt][high] or [rt][last] < [rt][low]) {
exec {
command => "echo \"Anomaly: %{[rt][last]}\""
}
}
有关box and shisker plot内容和重要性,参见《数据之魅》一书。
2.3.8 mutate数据修改
logstash-filter-mutate插件是Logstash另一个重要插件,它提供了丰富的基础类型数据处理能力,包括类型转换、字符串处理和字段处理等。
1.类型转换
类型转换是logstash-filter-mutate插件最初诞生时的唯一功能,其应用场景在之前的2.3.5节“JSON编解码”中已经提到。
可以设置的转换类型包括:“integer”、“float”和“string”。示例如下:
filter {
mutate {
convert => ["request_time", "float"]
}
}
mutate除了转换简单的字符值,还支持对数组类型的字段进行转换,即将[“1”,“2”]转换成[1,2]。但不支持对哈希类型的字段做类似处理。有这方面需求的可以采用稍后讲述的logstash-filter-ruby插件完成。
2. 字符串处理
有如下字符串处理的插件:
gsub:仅对字符串类型字段有效。
gsub => ["urlparams", "[\\?#]", "_"]
split:分割字符串。
filter {
mutate {
split => ["message", "|"]
}
}
随意输入一串以|分割的字符,比如“123|321|adfd|dfjld*=123”,可以看到如下输出:
{
"message" => [
[0] "123",
[1] "321",
[2] "adfd",
[3] "dfjld*=123"
],
"@version" =>"1",
"@timestamp" =>"2014-08-20T15:58:23.120Z",
"host" =>"raochenlindeMacBook-Air.local"
}
join:仅对数组类型字段有效。
我们在之前已经用split割切的基础上再join回去。配置改成:
filter {
mutate {
split => ["message", "|"]
}
mutate {
join => ["message", ","]
}
}
filter区段之内,是顺序执行的。所以我们最后看到的输出结果是:
{
"message" =>"123,321,adfd,dfjld*=123",
"@version" =>"1",
"@timestamp" =>"2014-08-20T16:01:33.972Z",
"host" =>"raochenlindeMacBook-Air.local"
}
merge:合并两个数组或者哈希字段。依然在之前split的基础上继续:
filter {
mutate {
split => ["message", "|"]
}
mutate {
merge => ["message", "message"]
}
}
我们会看到输出:
{
"message" => [
[0] "123",
[1] "321",
[2] "adfd",
[3] "dfjld*=123",
[4] "123",
[5] "321",
[6] "adfd",
[7] "dfjld*=123"
],
"@version" =>"1",
"@timestamp" =>"2014-08-20T16:05:53.711Z",
"host" =>"raochenlindeMacBook-Air.local"
}
如果src字段是字符串,会自动先转换成一个单元素的数组再合并。把上一示例中的来源字段改成“host”:
filter {
mutate {
split => ["message", "|"]
}
mutate {
merge => ["message", "host"]
}
}
结果变成:
{
"message" => [
[0] "123",
[1] "321",
[2] "adfd",
[3] "dfjld*=123",
[4] "raochenlindeMacBook-Air.local"
],
"@version" =>"1",
"@timestamp" =>"2014-08-20T16:07:53.533Z",
"host" => [
[0] "raochenlindeMacBook-Air.local"
]
}
看,目的字段“message”确实多了一个元素,但是来源字段“host”本身也由字符串类型变成数组类型了!
同样,如果目的字段不是数组,也会被强制转换。即使来源字段并不存在:
filter {
mutate {
merge => ["message", "not_exist_field"]
}
}
结果会变成:
{
"message" => [
[0] "123|321|adfd|dfjld*=123"
],
"@version" =>"1",
"@timestamp" =>"2014-08-20T15:58:23.120Z",
"host" =>"raochenlindeMacBook-Air.local"
}
strip:去除字段内容前后的空格。可以接受数组参数:
filter {
mutate {
strip => ["syslog_message", "syslog_datetime"]
}
}
lowercase:将字段内容全部转换成小写字母。同样可以接受数组。在ELK stack场景中,将内容转换成小写会是一个比较常见的需求。因为Elasticsearch默认是统一按照小写字母来搜索的。为了确保检索准确率,在不影响使用的情况下,建议对常用检索字段启用lowercase配置。
uppercase:将字段内容全部转换成大写字母。同样可以接受数组。
3.字段处理
字段处理的插件有:
rename:重命名某个字段,如果目的字段已经存在,会被覆盖掉,如下所示:
filter {
mutate {
rename => ["syslog_host", "host"]
}
}
update:更新某个字段的内容。如果字段不存在,不会新建。
replace:作用和update类似,但是当字段不存在的时候,它会起到add_field参数一样的效果,自动添加新的字段。
4.执行次序
需要注意的是,filter/mutate内部是有执行次序的。其次序如下:
rename(event) if @rename
update(event) if @update
replace(event) if @replace
convert(event) if @convert
gsub(event) if @gsub
uppercase(event) if @uppercase
lowercase(event) if @lowercase
strip(event) if @strip
remove(event) if @remove
split(event) if @split
join(event) if @join
merge(event) if @merge
filter_matched(event)
而filter_matched这个filters/base.rb里继承的方法也是有次序的:
@add_field.each do |field, value|
end
@remove_field.each do |field|
end
@add_tag.each do |tag|
end
@remove_tag.each do |tag|
end
2.3.9 随心所欲的Ruby处理
如果你稍微懂那么一点点Ruby语法的话,logstash-filter-ruby插件将会是一个非常有用的工具。比如你需要稍微修改一下LogStash::Event对象,但是又不打算为此写一个完整的插件,用logstash-filter-ruby插件绝对感觉良好。
配置示例如下:
filter {
ruby {
init =>"@kname = ['client','servername','url','status','time','size','upstream',
'upstreamstatus','upstreamtime','referer','xff','useragent']"
code => "
ew_event = LogStash::Event.new(Hash[@kname.zip(event.get('message').split('|'))])
new_event.remove('@timestamp')
event.append(new_event)"
}
}
官网示例是一个比较有趣但是没啥大用的做法—随机取消90%的事件。
所以上面我们给出了一个有用而且强大的实例。
通常我们都是用logstash-filter-grok插件来捕获字段的,但是正则耗费大量的CPU资源,很容易成为Logstash进程的瓶颈。
而实际上,很多流经Logstash的数据都是有自己预定义的特殊分隔符的,我们可以很简单的直接切割成多个字段。
从Logstash2.3开始,LogStash::event.append不再直接接受Hash对象,而必须是LogStash:: Event对象。所以示例变成要先初始化一个新的event,再把无用的@timestamp移除,再append进去。否则会把@timestamp变成有两个时间的数组了!
从Logstash 5.0开始,LogStash::Event改为Java实现,直接使用event["parent"]["child"]形式获取的,不是原事件的引用而是复制品,需要改用event.get('[parent][child]')和event.set('[parent][child]', 'value')的方法。
logstash-filter-mutate插件里的“split”选项只能切成数组,后续很不方便使用和识别。而在logstash-filter-ruby里,我们可以通过“init”参数预定义好由每个新字段的名字组成的数组,然后在“code”参数指定的Ruby语句里通过两个数组的zip操作生成一个哈希并添加进数组里。短短一行Ruby代码,可以减少50%以上的CPU使用率。
logstash-filter-ruby插件用途远不止这一点,下一节你还会继续见到它的身影。
更多实例如下:
filter{
date {
match => ["datetime" , "UNIX"]
}
ruby {
code =>"event.cancel if 5 * 24 * 3600 < (event.get( '@timestamp')-::Time.now).abs"
}
}
在实际运用中,我们几乎肯定会碰到出乎意料的输入数据。这都有可能导致Elasticsearch集群出现问题。
当数据格式发生变化,比如UNIX时间格式变成UNIX_MS时间格式,会导致Logstash疯狂创建新索引,集群崩溃。
或者误输入过老的数据时,因为一般我们会close几天之前的索引以节省内存,必要时再打开。而直接尝试把数据写入被关闭的索引会导致内存问题。
这时候我们就需要提前校验数据的合法性。上面配置,就是用于过滤掉时间范围与当前时间差距太大的非法数据的。
2.3.10 split拆分事件
上一章我们通过multiline插件将多行数据合并进一个事件里,那么反过来,也可以把一行数据,拆分成多个事件。这就是split插件。
配置示例如下:
filter {
split {
field =>"message"
terminator =>"#"
}
}
这个测试中,我们在intputs/stdin的终端中输入一行数据:“test1#test2”,结果看到输出两个事件:
{
"@version": "1",
"@timestamp": "2014-11-18T08:11:33.000Z",
"host": "web121.mweibo.tc.sinanode.com",
"message": "test1"
}
{
"@version": "1",
"@timestamp": "2014-11-18T08:11:33.000Z",
"host": "web121.mweibo.tc.sinanode.com",
"message": "test2"
}
最后更新:2017-05-19 15:02:37