阅读1012 返回首页    go 阿里云 go 技术社区[云栖]


《ELK Stack权威指南 》第2章 插件配置

本节书摘来自华章出版社《ELK Stack权威指南 》一书中的第1章,第2节,作者饶琛琳,更多章节内容可以访问云栖社区“华章计算机”公众号查看。


插 件 配 置

插件是Logstash最大的特色。各种不同的插件源源不断地被创造出来,发布到社区中供大家使用。本章会按照插件的类别,对一般场景下的一些常用插件做详细的配置和用例介绍。本章介绍的插件包括:1)输入插件。基于shipper端场景,主要介绍STDIN、TCP、File等插件。2)编解码插件。编解码通常是会被遗忘的环节,但是运用好了,会大大提高工作效率,本节介绍最常用的JSON和multiline插件。3)过滤器插件。名为过滤器,其实各种数据裁剪和计算都可以在这类插件里完成,是Logstash最强大的一环。本节会详细介绍grok、date、mutate、ruby、metrics等插件的妙用。4)输出插件。Logstash虽然经常跟Elasticsearch并称,但是作为一个日志传输框架,它其实可以输出数据到各种不同的地方。比如Graphite、HDFS、Nagios等等。本章会介绍这些常用的输出插件用法。

2.1 输入插件

在“Hello World”示例中,我们已经见到并介绍了Logstash的运行流程和配置的基础语法。从这章开始,我们就要逐一介绍Logstash流程中比较常用的一些插件,并在介绍中针对其主要适用的场景、推荐的配置,作一些说明。

限于篇幅,接下来内容中,配置示例不一定能贴完整。请记住一个原则:Logstash配置一定要有一个input和一个output。在演示过程中,如果没有写明input,默认就会使用“Hello World”里我们已经演示过的logstash-input-stdin,同理,没有写明的output就是logstash-output-stdout。

以上请读者自明。

2.1.1 标准输入

我们已经见过好几个示例使用stdin了。这也应该是Logstash里最简单和基础的插件了。所以,在这段中,我们先介绍一些未来每个插件都会有的一些方法。

配置示例如下:

input {

    stdin {

        add_field => {"key" =>"value"}

        codec =>"plain"

        tags => ["add"]

        type =>"std"

    }

}

用上面的新stdin设置重新运行一次最开始的Hello World示例。我建议大家把整段配置都写入一个文本文件,然后运行命令:bin/logstash -f stdin.conf。输入“Hello World”并回车后,你会在终端看到如下输出:

{

"message" =>"hello world",

"@version" =>"1",

"@timestamp" =>"2014-08-08T06:48:47.789Z",

"type" =>"std",

"tags" => [

        [0] "add"

    ],

"key" =>"value",

"host" =>"raochenlindeMacBook-Air.local"

}

type和tags是Logstash事件中两个特殊的字段。通常来说,我们会在“输入区段”中通过type来标记事件类型—我们肯定是提前能知道这个事件属于什么类型的。而tags则是在数据处理过程中,由具体的插件来添加或者删除的。

最常见的用法是像下面这样:

input {

    stdin {

        type =>"web"

    }

}

filter {

    if [type] == "web" {

        grok {

            match => ["message", %{COMBINEDAPACHELOG}]

        }

    }

}

output {

    if "_grokparsefailure" in [tags] {

        nagios_nsca {

            nagios_status =>"1"

        }

    } else {

        elasticsearch {

        }

    }

}

看起来蛮复杂的,对吧?

继续学习,你也可以写出来的。

2.1.2 文件输入

分析网站访问日志应该是一个运维工程师最常见的工作了。所以我们先学习一下怎么用Logstash来处理日志文件。

Logstash使用一个名叫FileWatch的Ruby Gem库来监听文件变化。这个库支持glob展开文件路径,而且会记录一个叫.sincedb的数据库文件来跟踪被监听日志文件的当前读取位置。所以,不要担心Logstash会漏过你的数据。

sincedb文件中记录了每个被监听的文件的inode, major number, minor number和pos。

配置示例如下:

input {

    file {

        path => ["/var/log/*.log", "/var/log/message"]

        type =>"system"

        start_position =>"beginning"

    }

}

有一些比较有用的配置项,可以用来指定FileWatch库的行为:

discover_interval:Logstash每隔多久去检查一次被监听的path下是否有新文件,默认值是15秒。

exclude:不想被监听的文件可以排除出去,这里跟path一样支持glob展开。

sincedb_path:如果你不想用默认的$HOME/.sincedb(Windows平台上为%USERPROFILE%\.sincedb,该变量默认值是:C:\Windows\System32\config\systemprofile),可以通过这个配置定义sincedb文件到其他位置。

sincedb_write_interval:Logstash每隔多久写一次sincedb文件,默认是15秒。

stat_interval:Logstash每隔多久检查一次被监听文件状态(是否有更新),默认是1秒。

start_position:Logstash从什么位置开始读取文件数据,默认是结束位置,也就是说,Logstash进程会以类似tail-F的形式运行。如果你是要导入原有数据,把这个设定改成"beginning",Logstash进程就从头开始读取,有点类似于less+F命令的形式运行。

close_older:一个已经监听中的文件,如果超过这个值的时间内没有更新内容,就关闭监听它的文件句柄。默认是3 600秒,即一小时。

ignore_older:在每次检查文件列表的时候,如果一个文件的最后修改时间超过这个值,就忽略这个文件。默认是86 400秒,即一天。

注意事项如下:

1)通常你要导入原有数据进Elasticsearch的话,你还需要filter/date插件来修改默认的"@timestamp"字段值。稍后会学习这方面的知识。

2)FileWatch只支持文件的绝对路径,而且会不自动递归目录。所以有需要的话,请用数组方式都写明具体哪些文件。

3)LogStash::Inputs::File只是在进程运行的注册阶段初始化一个FileWatch对象。所以它不能支持类似fluentd那样的path =>"/path/to/%{+yyyy/MM/dd/hh}.log"写法。达到相同目的,你只能写成path =>"/path/to/*/*/*/*.log"。FileWatch模块提供了一个稍微简单一点的写法:/path/to/**/*.log,用**来缩写表示递归全部子目录。

4)start_position仅在该文件从未被监听过的时候起作用。如果sincedb文件中已经有这个文件的inode记录了,那么Logstash依然会从记录过的pos开始读取数据。所以重复测试的时候每回需要删除sincedb文件。此外,官方博客上提供了另一个巧妙的思路:https://www.elastic.co/blog/logstash-configuration-tuning。把sincedb_path定义为/dev/null,则每次重启自动从头开始读。

5)因为Windows平台上没有inode的概念,Logstash某些版本在Windows平台上监听文件不是很靠谱。Windows平台上,推荐考虑使用nxlog作为收集端,参见后面5.5节。

2.1.3 TCP输入

未来你可能会用Redis服务器或者其他的消息队列系统来作为Logstash Broker的角色。不过Logstash其实也有自己的TCP/UDP插件,在临时任务的时候,也算能用,尤其是测试环境。

虽然LogStash::Inputs::TCP用Ruby的Socket和OpenSSL库实现了高级的SSL功能,但Logstash本身只能在SizedQueue中缓存20个事件。这就是我们建议在生产环境中换用其他消息队列的原因。

配置示例如下:

input {

    tcp {

        port => 8888

        mode =>"server"

        ssl_enable => false

    }

}

目前来看,LogStash::Inputs::TCP最常见的用法就是配合nc命令导入旧数据。在启动Logstash进程后,在另一个终端运行如下命令即可导入数据:

# nc 127.0.0.1 8888 < olddata

这种做法比用LogStash::Inputs::File好,因为当nc命令结束,我们就知道数据导入完毕了。而用input/file方式,Logstash进程还会一直等待新数据输入被监听的文件,不能直接看出是否任务完成了。

2.1.4 syslog输入

syslog可能是运维领域最流行的数据传输协议了。当你想从设备上收集系统日志的时候,syslog应该会是你的第一选择。尤其是网络设备,比如思科中syslog几乎是唯一可行的办法。

我们这里不解释如何配置你的syslog.conf、rsyslog.conf或者syslog-ng.conf来发送数据,而只讲如何把Logstash配置成一个syslog服务器来接收数据。

有关rsyslog的用法,后面的5.4节会有更详细的介绍。

配置示例如下:

input {

    syslog {

        port =>"514"

    }

}

作为最简单的测试,我们先暂停一下本机的syslogd(或rsyslogd)进程,然后启动Logstash进程(这样就不会有端口冲突问题)。现在,本机的syslog就会默认发送到Logstash里了。我们可以用自带的logger命令行工具发送一条“Hello World”信息到syslog里(即Logstash里)。看到的Logstash输出像下面这样:

{

"message" =>"Hello World",

"@version" =>"1",

"@timestamp" =>"2014-08-08T09:01:15.911Z",

"host" =>"127.0.0.1",

"priority" =>31,

"timestamp" =>"Aug  8 17:01:15",

"logsource" =>"raochenlindeMacBook-Air.local",

"program" =>"com.apple.metadata.mdflagwriter",

"pid" =>"381",

"severity" =>7,

"facility" =>3,

"facility_label" =>"system",

"severity_label" =>"Debug"

}

Logstash是用UDPSocket、TCPServer和LogStash::Filters::Grok来实现LogStash::Inputs::Syslog的。所以你其实可以直接用Logstash配置实现一样的效果,如下所示:

input {

    tcp {

        port =>"8514"

    }

}

filter {

    grok {

        match => ["message", "%{SYSLOGLINE}" ]

    }

    syslog_pri { }

}

建议在使用LogStash::Inputs::Syslog的时候走TCP协议来传输数据。

因为具体实现中,UDP监听器只用了一个线程,而TCP监听器会在接收每个连接的时候都启动新的线程来处理后续步骤。

如果你已经在使用UDP监听器收集日志,用下行命令检查你的UDP接收队列大小:

# netstat -plnu | awk 'NR==1 || $4~/:514$/{print $2}'

Recv-Q

228096

228096是UDP接收队列的默认最大大小,这时候linux内核开始丢弃数据包了!

强烈建议使用LogStash::Inputs::TCP和LogStash::Filters::Grok配合实现同样的syslog功能!

虽然LogStash::Inputs::Syslog在使用TCPServer的时候可以采用多线程处理数据的接收,但是在同一个客户端数据的处理中,其grok和date是一直在该线程中完成的,这会导致总体上的处理性能几何级的下降—经过测试,TCPServer每秒可以接收50 000条数据,而在同一线程中启用grok后每秒只能处理5 000条,再加上date只能达到500条!

才将这两步拆分到filters阶段后,Logstash支持对该阶段插件单独设置多线程运行,大大提高了总体处理性能。在相同环境下,logstash -f tcp.conf -w 20的测试中,总体处理性能可以达到每秒30 000条数据!

测试采用Logstash作者提供的命令:

yes "<44>May 19 18:30:17 snack jls: foo bar 32" | nc localhost 3000

出处见:https:// github.com/jordansissel/experiments/blob/master/ruby/jruby-netty/syslog-server/Makefile

如果你实在没法切换到TCP协议,可以自己写程序,或者使用其他基于异步I/O框架(比如libev)的项目。下面是一个简单的异步I/O实现UDP监听数据输入Elasticsearch的示例:https://gist.github.com/chenryn/7c922ac424324ee0d695。

2.1.5 http_poller抓取

Logstash作为数据采集系统,也支持自己作为一个HTTP客户端去抓取网页数据或者接口数据。这方面有一个很明显的IT运维应用场景:很多业务系统软件本身提供了RESTful的内部运行状态接口,可以直接通过接口采集这些监控信息。

更长期的方案应该是编写对应的metricbeat模块,但是直接采用logstash-input-http_poller显然更快捷。

比如Nginx的性能状态,社区有一个非常全面的性能状态监控模块:nginx-module-vts。在新浪微博,后端池分为核心接口、非核心接口两块,我们要分别监控的话,nginx-module-vts的配置如下:

http {

    vhost_traffic_status_zone;

    map $uri $filter_uri {

        default 'non-core';

        /2/api/timeline core;

        ~^/2/api/unread core;

    }

    server {

        vhost_traffic_status_filter_by_set_key $filter_uri;

        location /status {

            auth_basic "Restricted";

            auth_basic_user_file pass_file;

            vhost_traffic_status_display;

            vhost_traffic_status_display_format json;

        }

    }

}

则对应的logstash-input-http_poller配置如下:

input {

    http_poller {

        urls => {

            0 => {

                method => get

                url => "https://localhost:80/status/format/json"

                headers => {

                    Accept => "application/json"

                }

                auth => {

                    user => "YouKnowIKnow"

                    password => "IKnowYouDonotKnow"

                }

            }

            1 => {

                method => get

                url => "https://localhost:80/status/con ... up%3D*"

                headers => {

                    Accept => "application/json"

                }

                auth => {

                    user => "YouKnowIKnow"

                    password => "IKnowYouDonotKnow"

                }

            }

        }

        request_timeout => 60

        interval => 60

        codec => "json"

    }

}

这段配置就可以就可以每60秒获得一次 vts 数据,并重置计数了。

注意,url是一个Hash值,所以它的执行顺序是根据Hash.map来的,为了确保我们是先获取数据再重置,这里干脆用0, 1来作为Hash的key,这样顺序就没问题了。

2.2 编解码配置

Codec是Logstash从1.3.0版开始新引入的概念(Codec来自Coder/decoder两个单词的首字母缩写)。

在此之前,Logstash只支持纯文本形式输入,然后以过滤器处理它。但现在,我们可以在输入期处理不同类型的数据,这全是因为有了Codec设置。

所以,这里需要纠正之前的一个概念。Logstash不只是一个input | filter | output的数据流,而是一个input | decode | filter | encode | output的数据流!Codec就是用来decode、encode事件的。

Codec的引入,使得Logstash可以更好、更方便地与其他有自定义数据格式的运维产品共存,比如graphite、fluent、netflow、collectd,以及使用msgpack、json、edn等通用数据格式的其他产品等。

事实上,我们在第一个“Hello World”用例中就已经用过Codec了—rubydebug就是一种Codec!虽然它一般只会用在stdout插件中,作为配置测试或者调试的工具。

这个五段式的流程说明源自Perl版的Logstash(后来改名叫Message::Passing模块)的设计。本书稍后5.8节会对该模块稍作介绍。

2.2.1 JSON编解码

在早期的版本中,有一种降低Logstash过滤器的CPU负载消耗的做法盛行于社区(在当时的Cookbook上有专门的一节介绍):直接输入预定义好的JSON数据,这样就可以省略掉filter/grok配置!

这个建议依然有效,不过在当前版本中需要稍微做一点配置变动,因为现在有专门的Codec设置。

1.配置示例

社区常见的示例都是用的Apache的customlog,不过我觉得Nginx是一个比Apache更常用的新型Web服务器,所以我这里会用nginx.conf做示例:

logformat json '{"@timestamp":"$time_iso8601",'

               '"@version":"1",'

               '"host":"$server_addr",'

               '"client":"$remote_addr",'

               '"size":$body_bytes_sent,'

               '"responsetime":$request_time,'

               '"domain":"$host",'

               '"url":"$uri",'

               '"status":"$status"}';

access_log /var/log/nginx/access.log_json json;

注意,在$request_time和$body_bytes_sent变量两头没有双引号",这两个数据在JSON里应该是数值类型!

重启Nginx应用,然后修改你的input/file区段配置成下面这样:

input {

    file {

        path =>"/var/log/nginx/access.log_json""

        codec =>"json"

    }

}

2.运行结果

下面访问一下用Nginx发布的Web页面,然后你会看到Logstash进程输出类似下面这样的内容:

{

"@timestamp" =>"2014-03-21T18:52:25.000+08:00",

"@version" =>"1",

"host" =>"raochenlindeMacBook-Air.local",

"client" =>"123.125.74.53",

"size" =>8096,

"responsetime" =>0.04,

"domain" =>"www.domain.com",

"url" =>"/path/to/file.suffix",

"status" =>"200"

}

3. Nginx代理服务的日志格式问题

对于一个Web服务器的访问日志,看起来已经可以很好的工作了。不过如果Nginx是作为一个代理服务器运行的话,访问日志里有些变量,比如说$upstream_response_time,可能不会一直是数字,它也可能是一个“-”字符串!这会直接导致Logstash对输入数据验证报异常。

有两个办法解决这个问题:

1)用sed在输入之前先替换-成0。运行Logstash进程时不再读取文件而是标准输入,这样命令就成了下面这个样子:

tail -F /var/log/nginx/proxy_access.log_json \

    | sed 's/upstreamtime":-/upstreamtime":0/' \

    | /usr/local/logstash/bin/logstash -f /usr/local/logstash/etc/proxylog.conf

2)日志格式中统一记录为字符串格式(即都带上双引号),然后再在Logstash中用filter/mutate插件来变更应该是数值类型的字符字段的值类型。

有关LogStash::Filters::Mutate的内容,本书稍后会有介绍。

2.2.2 多行事件编码

有些时候,应用程序调试日志会包含非常丰富的内容,为一个事件打印出很多行内容。这种日志通常都很难通过命令行解析的方式做分析。

而Logstash正为此准备好了codec/multiline插件!当然,multiline插件也可以用于其他类似的堆栈式信息,比如Linux的内核日志。

配置示例如下:

input {

    stdin {

        codec => multiline {

            pattern =>"^\["

            negate => true

            what =>"previous"

        }

    }

}

运行Logstash进程,然后在等待输入的终端中输入如下几行数据:

[Aug/08/08 14:54:03] hello world

[Aug/08/09 14:54:04] hello logstash

    hello best practice

    hello raochenlin

[Aug/08/10 14:54:05] the end

你会发现Logstash输出下面这样的返回:

{

"@timestamp" =>"2014-08-09T13:32:03.368Z",

"message" =>"[Aug/08/08 14:54:03] hello world\n",

"@version" =>"1",

"host" =>"raochenlindeMacBook-Air.local"

}

{

"@timestamp" =>"2014-08-09T13:32:24.359Z",

"message" =>"[Aug/08/09 14:54:04] hello logstash\n\n    hello best practice\n\n

            hello raochenlin\n",

"@version" =>"1",

    "tags" => [

        [0] "multiline"

    ],

"host" =>"raochenlindeMacBook-Air.local"

}

你看,后面这个事件,在“message”字段里存储了三行数据!

输出的事件中没有最后一行的"the end"字符串,这是因为你最后输入的回车符\n并不匹配设定的^\[正则表达式,Logstash还得等下一行数据直到匹配成功后才会输出这个事件。

其实这个插件的原理很简单,就是把当前行的数据添加到前面一行后面,直到新进的当前行匹配^\[正则为止。这个正则还可以用grok表达式,稍后你就会学习这方面的内容。具体的Java日志正则见:https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/java

说到应用程序日志,Log4j肯定是第一个被大家想到的,使用codec/multiline也确实是一个办法。

不过,如果你本身就是开发人员,或者可以推动程序修改变更的话,Logstash还提供了另一种处理Log4j的方式:input/log4j。与codec/multiline不同,这个插件是直接调用了org.apache.log4j.spi.LoggingEvent处理TCP端口接收的数据。后面3.6节会详细讲述Log4j的用法。

2.2.3 网络流编码

NetFlow是Cisco发明的一种数据交换方式。NetFlow提供网络流量的会话级视图,记录下每个TCP/IP事务的信息。它的目的不是像tcpdump那样提供网络流量的完整记录,而是汇集起来形成更易于管理和易读的流向和容量的分析监控。

Cisco上配置NetFlow的方法,请参照具体的设备说明,主要是设定采集服务器的地址和端口,为运行Logstash服务的主机地址和端口(示例中为9995)。

采集NetFlow数据的Logstash服务配置示例如下:

input {

    udp {

        port => 9995

        codec => netflow {

            definitions =>"/opt/logstash-1.4.2/lib/logstash/codecs/netflow/netflow.yaml"

            versions => [5]

        }

    }

}

output {

    elasticsearch {

        index =>"logstash_netflow5-%{+YYYY.MM.dd}"

        host =>"localhost"

    }

}

由于该插件生成的字段较多,所以建议对应的Elasticsesarch索引模板也需要单独提交:

# curl -XPUT localhost:9200/_template/logstash_netflow5 -d '{

"template" : "logstash_netflow5-*",

"settings": {

        "index.refresh_interval": "5s"

},

"mappings" : {

        "_default_" : {

        "_all" : {"enabled" : false},

        "properties" : {

            "@version": { "index": "analyzed", "type": "integer" },

            "@timestamp": { "index": "analyzed", "type": "date" },

            "netflow": {

            "dynamic": true,

            "type": "object",

            "properties": {

                "version": { "index": "analyzed", "type": "integer" },

                "flow_seq_num": { "index": "not_analyzed", "type": "long" },

                "engine_type": { "index": "not_analyzed", "type": "integer" },

                "engine_id": { "index": "not_analyzed", "type": "integer" },

                "sampling_algorithm": { "index": "not_analyzed", "type": "integer" },

                "sampling_interval": { "index": "not_analyzed", "type": "integer" },

                "flow_records": { "index": "not_analyzed", "type": "integer" },

                "ipv4_src_addr": { "index": "analyzed", "type": "ip" },

                "ipv4_dst_addr": { "index": "analyzed", "type": "ip" },

                "ipv4_next_hop": { "index": "analyzed", "type": "ip" },

                "input_snmp": { "index": "not_analyzed", "type": "long" },

                "output_snmp": { "index": "not_analyzed", "type": "long" },

                "in_pkts": { "index": "analyzed", "type": "long" },

                "in_bytes": { "index": "analyzed", "type": "long" },

                "first_switched": { "index": "not_analyzed", "type": "date" },

                "last_switched": { "index": "not_analyzed", "type": "date" },

                "l4_src_port": { "index": "analyzed", "type": "long" },

                "l4_dst_port": { "index": "analyzed", "type": "long" },

                "tcp_flags": { "index": "analyzed", "type": "integer" },

                "protocol": { "index": "analyzed", "type": "integer" },

                "src_tos": { "index": "analyzed", "type": "integer" },

                "src_as": { "index": "analyzed", "type": "integer" },

                "dst_as": { "index": "analyzed", "type": "integer" },

                "src_mask": { "index": "analyzed", "type": "integer" },

                "dst_mask": { "index": "analyzed", "type": "integer" }

              }

          }

       }

    }

  }

}'

Elasticsearch索引模板的功能,本书稍后12.6节会有详细介绍。

2.2.4 collectd输入

collectd是一个守护(daemon)进程,用来收集系统性能和提供各种存储方式来存储不同值的机制。它会在系统运行和存储信息时周期性的统计系统的相关统计信息。利用这些信息有助于查找当前系统性能瓶颈(如作为性能分析performance analysis)和预测系统未来的load(如能力部署capacity planning)等

下面简单介绍一下:collectd的部署以及与Logstash对接的相关配置实例。

1. collectd的安装

collectd的安装同样有两种方式,使用官方软件仓库安装或源代码安装。

使用官方软件仓库安装(推荐)

collectd官方有一个隐藏的软件仓库:https://pkg.ci.collectd.org,构建有RHEL/CentOS (rpm),Debian/Ubuntu(deb)的软件包,如果你使用的操作系统属于上述的,那么推荐使用软件仓库安装。

目前collectd官方维护3个版本:5.4、5.5、5.6。根据需要选择合适的版本仓库。下面示例安装5.5版本的方法。

Debian/Ubuntu仓库安装:

echo "deb https://pkg.ci.collectd.org/deb $(lsb_release -sc) collectd-5.5" | sudo tee /etc/apt/sources.list.d/collectd.list

curl -s https://pkg.ci.collectd.org/pubkey.asc | sudo apt-key add -

sudo apt-get update && sudo apt-get install -y collectd

注意,Debian/Ubuntu软件仓库自带有collectd软件包,如果软件仓库自带的版本足够你使用,那么可以不用添加仓库,直接通过apt-get install collectd即可。

RHEL/CentOS仓库安装:

cat > /etc/yum.repos.d/collectd.repo <<EOF

[collectd-5.5]

name=collectd-5.5

baseurl=https://pkg.ci.collectd.org/rpm/collectd-5.5/epel-\$releasever-\$basearch/

gpgcheck=1

gpgkey=https://pkg.ci.collectd.org/pubkey.asc

EOF

yum install -y collectd

你如果需要使用其他collectd插件,此时也可一并安装对应的collectd-xxxx软件包。

源码安装collectd

collectd目前维护3个版本:5.4、5.5、5.6。源代码编译安装时同样可以根据自己需要选择对应版本的源码下载:

wget https://collectd.org/files/collectd-5.4.1.tar.gz

tar zxvf collectd-5.4.1.tar.gz

cd collectd-5.4.1

./configure --prefix=/usr --sysconfdir=/etc --localstatedir=/var --libdir=/usr/lib --mandir=/usr/share/man --enable-all-plugins

make && make install

源代码编译安装需要预先解决好各种环境依赖,在RedHat平台上要提前安装如下软件包:

rpm -ivh "https://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm"

yum -y install libcurl libcurl-devel rrdtool rrdtool-devel perl-rrdtool rrdtool-prel libgcrypt-devel gcc make gcc-c++ liboping liboping-devel perl-CPAN net-snmp net-snmp-devel

安装启动脚本如下:

cp contrib/redhat/init.d-collectd /etc/init.d/collectd

chmod +x /etc/init.d/collectd

启动collectd如下:

service collectd start

2. collectd的配置

以下配置可以实现对服务器基本的CPU、内存、网卡流量、磁盘I/O以及磁盘空间占用情况的监控:

Hostname "host.example.com"

LoadPlugin interface

LoadPlugin cpu

LoadPlugin memory

LoadPlugin network

LoadPlugin df

LoadPlugin disk

<Plugin interface>

    Interface "eth0"

    IgnoreSelected false

</Plugin>

<Plugin network>

<Server "10.0.0.1""25826"> ## Logstash 的 IP 地址和 collectd 的数据接收端口号

</Server>

</Plugin>

3. Logstash的配置

以下配置实现通过Logstash监听25826端口,接收从collectd发送过来的各项检测数据。

注意,logstash-filter-collectd插件本身需要单独安装,logstash插件安装说明之前已经讲过:

bin/logstash-plugin install logstash-filter-collectd

Logstash默认自带有collectd的codec编码插件。

推荐配置示例如下:

udp {

    port => 25826

    buffer_size => 1452

    workers => 3          # Default is 2

    queue_size => 30000   # Default is 2000

    codec => collectd { }

    type =>"collectd"

}

下面是简单的一个输出结果:

{

"_index": "logstash-2014.12.11",

"_type": "collectd",

"_id": "dS6vVz4aRtK5xS86kwjZnw",

"_score": null,

"_source": {

"host": "host.example.com",

"@timestamp": "2014-12-11T06:28:52.118Z",

"plugin": "interface",

"plugin_instance": "eth0",

"collectd_type": "if_packets",

"rx": 19147144,

"tx": 3608629,

"@version": "1",

"type": "collectd",

"tags": [

"_grokparsefailure"

        ]

    },

"sort": [

        1418279332118

    ]

}

参考资料

collectd支持收集的数据类型:https://git.verplant.org/?p=collectd.git;a=blob;hb=master; f=README

collectd收集各数据类型的配置参考资料:https://collectd.org/documentation/manpages/collectd.conf.5.shtml

collectd简单配置文件示例:https://gist.github.com/untergeek/ab85cb86a9bf39f1fc6d

2.3 过滤器配置

有丰富的过滤器插件,是Logstash威力如此强大的重要因素。名为过滤器,其实提供的不单单是过滤的功能。下面我们就会重点介绍几个插件,它们扩展了进入过滤器的原始数据,进行复杂的逻辑处理,甚至可以无中生有地添加新的Logstash事件到后续的流程中去!

2.3.1 date时间处理

之前章节已经提过,logstash-filter-date插件可以用来转换你的日志记录中的时间字符串,变成LogStash::Timestamp对象,然后转存到@timestamp字段里。

因为在稍后的logstash-output-elasticsearch中常用的%{+YYYY.MM.dd}这种写法必须读取@timestamp数据,所以一定不要直接删掉这个字段保留自己的字段,而是应该用logstash-filter-date转换后删除自己的字段!

这在导入旧数据的时候固然非常有用,而在实时数据处理的时候同样有效,因为一般情况下数据流程中我们都会有缓冲区,导致最终的实际处理时间跟事件产生时间略有偏差。

强烈建议打开Nginx的access_log配置项的buffer参数,对极限响应性能有极大提升!

1.配置示例

logstash-filter-date插件支持五种时间格式:

ISO8601:类似“2011-04-19T03:44:01.103Z”这样的格式。具体Z后面可以有“08:00”也可以没有,“.103”这个也可以没有。常用场景里来说,Nginx的log_format配置里就可以使用$time_iso8601变量来记录请求时间成这种格式。

UNIX:UNIX时间戳格式,记录的是从1970年起始至今的总秒数。Squid默认日志格式中就使用了这种格式。

UNIX_MS:这个时间戳则是从1970年起始至今的总毫秒数。据我所知,JavaScript里经常使用这个时间格式。

TAI64N:TAI64N格式比较少见,是这个样子的:@4000000052f88ea32489532c。我目前只知道常见应用中,qmail会用这个格式。

Joda-Time库:Logstash内部使用了Java的Joda时间库来作时间处理。所以我们可以使用Joda库所支持的时间格式来作具体定义。Joda时间格式定义见表2-1。

表2-1 Joda时间库格式

格式符  含  义    描  述    示  例

G   era text    AD

C   century of era (>=0)    number  20

Y   year of era (>=0)   year    1996

x   weekyear    year    1996

w   week of weekyear    number  27

e   day of week number  2

E   day of week text    Tuesday; Tue

y   year    year    1996

D   day of year number  189

M   month of year   month   July; Jul; 07

d   day of month    number  10

a   halfday of day  text    PM

K   hour of halfday (0~11)  number  0

h   clockhour of halfday (1~12) number  12

H   hour of day (0~23)  number  0

k   clockhour of day (1~24) number  24

m   minute of hour  number  30

s   second of minute    number  55

S   fraction of second  number  978

z   time zone   text    Pacific Standard Time; PST

Z   time zone offset/id zone    -0800; -08:00; America/Los_Angeles

'   escape for text delimiter  

''  single quote    literal '

 

下面我们写一个Joda时间格式的配置作为示例:

filter {

    grok {

        match => ["message", "%{HTTPDATE:logdate}"]

    }

    date {

        match => ["logdate", "dd/MMM/yyyy:HH:mm:ss Z"]

    }

}

注意,时区偏移量只需要用一个字母Z即可。

2.时区问题的解释

很多中国用户经常提一个问题:为什么@timestamp比我们晚了8个小时?怎么修改成北京时间?

其实,Elasticsearch内部,对时间类型字段,是统一采用UTC时间,存成long长整形数据的!对日志统一采用UTC时间存储,是国际安全/运维界的一个通识—欧美公司的服务器普遍广泛分布在多个时区里—不像中国,地域横跨五个时区却只用北京时间。

对于页面查看,ELK的解决方案是在Kibana上,读取浏览器的当前时区,然后在页面上转换时间内容的显示。

所以,建议大家接受这种设定。否则,即便你用.getLocalTime修改,也还要面临在Kibana过去修改,以及Elasticsearch原有的["now-1h" TO "now"]这种方便的搜索语句无法正常使用的尴尬。

以上,请读者自行斟酌。

2.3.2 grok正则捕获

grok是Logstash最重要的插件。你可以在grok里预定义好命名正则表达式,在稍后(grok参数或者其他正则表达式里)引用它。

1.正则表达式语法

运维工程师多多少少都会一点正则。你可以在grok里写标准的正则,像下面这样:

\s+(?<request_time>\d+(?:\.\d+)?)\s+

这个正则表达式写法对于Perl或者Ruby程序员应该很熟悉了,Python程序员可能更习惯写(?P<name>pattern),没办法,适应一下吧。

现在给我们的配置文件添加第一个过滤器区段配置。配置要添加在输入和输出区段之间(Logstash执行区段的时候并不依赖于次序,不过为了自己看得方便,还是按次序书写吧):

input {stdin{}}

filter {

    grok {

        match => {

        "message" =>"\s+(?<request_time>\d+(?:\.\d+)?)\s+"

        }

    }

}

output {stdout{codec=>rubydebug}}

运行Logstash进程然后输入“begin 123.456 end”,你会看到类似下面这样的输出:

{

"message" =>"begin 123.456 end",

"@version" =>"1",

"@timestamp" =>"2014-08-09T11:55:38.186Z",

"host" =>"raochenlindeMacBook-Air.local",

"request_time" =>"123.456"

}

漂亮!不过数据类型好像不太满意……request_time应该是数值而不是字符串。

我们已经提过稍后会学习用LogStash::Filters::Mutate来转换字段值类型,不过在grok 里,其实有自己的魔法来实现这个功能!

2. grok表达式语法

grok支持把预定义的grok表达式写入到文件中,官方提供的预定义grok表达式见:https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns。

下面是从官方文件中摘抄的最简单但是足够说明用法的示例:

USERNAME [a-zA-Z0-9._-]+

USER %{USERNAME}

第一行,用普通的正则表达式来定义一个grok表达式;第二行,通过打印赋值格式(sprintf format),用前面定义好的grok表达式来定义另一个grok表达式。

grok表达式的打印复制格式的完整语法见下行示例。其中data_type目前只支持两个值:int和float。

%{PATTERN_NAME:capture_name:data_type}

所以我们可以改进我们的配置成下面这样:

filter {

    grok {

        match => {

        "message" =>"%{WORD} %{NUMBER:request_time:float} %{WORD}"

        }

    }

}

重新运行进程然后可以得到如下结果:

{

"message" =>"begin 123.456 end",

"@version" =>"1",

"@timestamp" =>"2014-08-09T12:23:36.634Z",

"host" =>"raochenlindeMacBook-Air.local",

"request_time" => 123.456

}

这次request_time变成数值类型了。

3.最佳实践

实际运用中,我们需要处理各种各样的日志文件,如果你都是在配置文件里各自写一行自己的表达式,就完全不可管理了。所以,我们建议是把所有的grok表达式统一写入到一个地方。然后用filter/grok的patterns_dir选项来指明。

如果你把“message”里所有的信息都grok到不同的字段了,数据实质上就相当于是重复存储了两份。所以你可以用remove_field参数来删除掉message字段,或者用overwrite参数来重写默认的message字段,只保留最重要的部分。

重写参数的示例如下:

filter {

    grok {

        patterns_dir =>["/path/to/your/own/patterns"]

        match => {

        "message" =>"%{SYSLOGBASE} %{DATA:message}"

        }

        overwrite => ["message"]

    }

}

更多有关grok正则性能的最佳实践(比如timeout_millis等配置参数),请参考:https://www.elastic.co/blog/do-you-grok-grok。

4.高级用法

多行匹配 在和codec/multiline搭配使用的时候,需要注意一个问题,grok正则和普通正则一样,默认是不支持匹配回车换行的。就像你需要=~ // m一样也需要单独指定,具体写法是在表达式开始位置加(?m)标记。如下所示:

match => {

"message" =>"(?m)\s+(?<request_time>\d+(?:\.\d+)?)\s+"

}

多项选择 有时候我们会碰上一个日志有多种可能格式的情况。这时候要写成单一正则就比较困难,或者全用|隔开又比较丑陋。这时候,Logstash的语法提供给我们一个有趣的解决方式。

文档中,都说明logstash-filters-grok插件的match参数应该接受的是一个Hash值。但是因为早期的Logstash语法中Hash值也是用[]这种方式书写的,所以其实现在传递Array值给match参数也完全没问题。所以,我们这里其实可以传递多个正则来匹配同一个字段:

match => [

"message", "(?<request_time>\d+(?:\.\d+)?)",

"message", "%{SYSLOGBASE} %{DATA:message}",

"message", "(?m)%{WORD}"

]

Logstash会按照这个定义次序依次尝试匹配,到匹配成功为止。虽说效果跟用|分割写个大大的正则是一样的,但是可阅读性好了很多。

我强烈建议每个人都要使用Grok Debugger (https://grokdebug.herokuapp.com/)来调试自己的grok表达式。

2.3.3 dissect解析

grok作为Logstash最广为人知的插件,在性能和资源损耗方面同样也广为诟病。为了应对这个情况,同时也考虑到大多数情况下日志格式并没有那么复杂,Logstash开发团队在5.0版新添加了另一个解析字段的插件:dissect。当日志格式有比较简明的分隔标志位且重复性较大的时候,可以使用dissect插件更快地完成解析工作。下面是解析syslog的示例。

filter {

    dissect {

        mapping => {

            "message" => "%{ts} %{+ts} %{+ts} %{src} %{} %{prog}[%{pid}]: %{msg}"

        }

        convert_datatype => {

            pid => "int"

        }

    }

}

我们看到上面使用了和Grok很类似的%{}语法来表示字段,这显然是基于习惯延续的考虑。不过示例中%{+ts}的加号就不一般了。dissect除了字段外面的字符串定位功能以外,还通过几个特殊符号来处理字段提取的规则:

%{+key} 这个+表示前面已经捕获到一个key字段了,而这次捕获的内容自动添补到之前key字段内容的后面。

%{+key/2} 这个/2表示在有多次捕获内容都填到key字段里的时候,拼接字符串的顺序谁前谁后。/2表示排第2位。

%{?string} 这个?表示这块只是一个占位,并不会实际生成捕获字段存到Event里面。

%{?string} %{&string} 当同样捕获名称都是string,但是一个?和一个&在一起的时候,表示这是一个键值对。

比如对https://rizhiyi.com/index.do?id=123写这么一段配置:

https://%{domain}/%{?url}?%{?arg1}=%{&arg1}

则最终生成的 Event 内容是这样的:

{

    domain => "rizhiyi.com",

    id => "123"

}

2.3.4 GeoIP地址查询

GeoIP是最常见的免费IP地址归类查询库,同时也有收费版可以采购。GeoIP库可以根据IP地址提供对应的地域信息,包括国别、省市、经纬度等,对于可视化地图和区域统计非常有用。

配置示例如下:

filter {

    geoip {

        source =>"message"

    }

}

运行结果如下:

{

"message" =>"183.60.92.253",

"@version" =>"1",

"@timestamp" =>"2014-08-07T10:32:55.610Z",

"host" =>"raochenlindeMacBook-Air.local",

"geoip" => {

"ip" =>"183.60.92.253",

"country_code2" =>"CN",

"country_code3" =>"CHN",

"country_name" =>"China",

"continent_code" =>"AS",

"region_name" =>"30",

"city_name" =>"Guangzhou",

"latitude" =>23.11670000000001,

"longitude" =>113.25,

"timezone" =>"Asia/Chongqing",

"real_region_name" =>"Guangdong",

"location" => [

            [0] 113.25,

            [1] 23.11670000000001

        ]

    }

}

GeoIP库数据较多,如果你不需要这么多内容,可以通过fields选项指定自己所需要的。下例为全部可选内容:

filter {

    geoip {

        fields => ["city_name", "continent_code", "country_code2",

            "country_code3", "country_name", "dma_code", "ip", "latitude",

            "longitude", "postal_code", "region_name", "timezone"]

    }

}

需要注意的是:geoip.location是Logstash通过latitude和longitude额外生成的数据。所以,如果你是想要经纬度又不想重复数据的话,应该像下面这样做:

filter {

    geoip {

        fields => ["city_name", "country_code2", "country_name", "latitude",

            "longitude", "region_name"]

        remove_field => ["[geoip][latitude]", "[geoip][longitude]"]

    }

}

还要注意:geoip插件的“source”字段可以是任一处理后的字段,比如“client_ip”,但是字段内容却需要小心!GeoIp库内只存有公共网络上的IP信息,查询不到结果的,会直接返回null,而Logstash的GeoIp插件对null结果的处理是:“不生成对应的geoip.字段”。所以读者在测试时,如果使用了诸如127.0.0.1、172.16.0.1、182.168.0.1、10.0.0.1等内网地址,会发现没有对应输出!

2.3.5 JSON编解码

在上一章,已经讲过在Codec中使用JSON编码。但是,有些日志可能是一种复合的数据结构,其中只有一部分记录是JSON格式的。这时候,我们依然需要在filter阶段,单独启用JSON解码插件。

配置示例如下:

filter {

    json {

        source =>"message"

        target =>"jsoncontent"

    }

}

运行结果如下:

{

"@version": "1",

"@timestamp": "2014-11-18T08:11:33.000Z",

"host": "web121.mweibo.tc.sinanode.com",

"message": "{\"uid\":3081609001,\"type\":\"signal\"}",

"jsoncontent": {

"uid": 3081609001,

"type": "signal"

}

}

如果不打算使用多层结构的话,删掉target配置即可。单层结构新的结果如下:

{

"@version": "1",

"@timestamp": "2014-11-18T08:11:33.000Z",

"host": "web121.mweibo.tc.sinanode.com",

"message": "{\"uid\":3081609001,\"type\":\"signal\"}",

"uid": 3081609001,

"type": "signal"

}

2.3.6 key-value切分

在很多情况下,日志内容本身都是一个类似于key-value的格式,但是格式具体的样式却是多种多样的。Logstash提供logstash-filter-kv插件,帮助处理不同样式的key-value日志,变成实际的LogStash::Event数据。

配置示例如下:

filter {

    ruby {

        init =>"@kname = ['method','uri','verb']"

    ruby {

        init => "@kname = ['method','uri','verb']"

        code => "

            new_event = LogStash::Event.new(Hash[@kname.zip(event.get('request').split('|'))])

            new_event.remove('@timestamp')

            event.append(new_event)""

        "

    }

    if [uri] {

        ruby {

            init => "@kname = ['url_path','url_args']"

            code => "

                new_event = LogStash::Event.new(Hash[@kname.zip(event.get('uri').split('?'))])

                new_event.remove('@timestamp')

                event.append(new_event)""

            "

        }

        kv {

            prefix =>"url_"

            source =>"url_args"

            field_split =>"&"

            remove_field => [ "url_args", "uri", "request" ]

        }

    }

}

Nginx访问日志中的$request,通过这段配置,可以详细切分成method、url_path、verb、url_a、url_b...

进一步,如果url_args中有过多字段,可能导致Elasticsearch集群因为频繁update mapping或者消耗太多内存在cluster state上而宕机。所以,更优的选择是只保留明确有用的url_args内容,其他部分舍去,如下所示:

kv {

    prefix =>"url_"

    source =>"url_args"

    field_split =>"&"

    include_keys => [ "uid", "cip" ]

    remove_field => [ "url_args", "uri", "request" ]

}

上例即表示,除了url_uid和url_cip两个字段以外,其他的url_*都不保留。

2.3.7 metrics数值统计

logstash-filter-metrics插件是使用Ruby的Metriks模块来实现在内存里实时地计数和采样分析。该模块支持两个类型的数值分析:meter和timer。下面分别举例说明。

1. Meter示例(速率阈值检测)

Web访问日志的异常状态码频率是运维人员会非常关心的一个数据。通常我们的做法是通过Logstash或者其他日志分析脚本,把计数发送到rrdtool或者graphite里面,然后再通过check_graphite脚本之类的东西来检查异常并报警。

事实上这个事情可以直接在Logstash内部就完成。比如如果最近一分钟504请求的个数超过100个就报警,如下所示:

filter {

    metrics {

        meter => "error_%{status}"

            add_tag => "metric"

            ignore_older_than => 10

    }

    if "metric" in [tags] {

        ruby {

            code => "event.cancel if event.get('[error_504][rate_1m]') * 60 > 100"

        }

    }

}

output {

    if "metric" in [tags] {

        exec {

            command => "echo \"Out of threshold: %{[error_504][rate_1m]}\""

        }

}

这里需要注意*60的含义。metrics模块生成的rate_1m/5m/15m意思是:最近1、5、15分钟的每秒速率!

2. Timer示例(box and whisker异常检测)

官版的logstash-filter-metrics插件只适用于metric事件的检查。由插件生成的新事件内部不存有来自input区段的实际数据信息。所以,要完成我们的百分比分布箱体检测,需要首先对代码稍微做几行变动,即在metric的timer事件里加一个属性,存储最近一个实际事件的数值:

def register

    …

    @last = {}

end

def filter(event)

    …

    @last[event.sprintf(name)] = event.sprintf(value).to_f

end

def flush(options = {})

    …

    @metric_timer.each_pair do |name, metric|

event.set(“[#{name}][last]”, @last[name])

metric.clear if should_clear?

    end

    …

end

有了这个Last值,然后我们就可以用如下配置来探测异常数据了:

filter {

    metrics {

        timer => {"rt" =>"%{request_time}"}

        percentiles => [25, 75]

        add_tag =>"percentile"

    }

        ruby {

            code => "l=event.get('[rt][p75]')-event.get('[rt][p25]');event.set('[rt][low]', event.get('[rt][p25]')-l);event.set('[rt][high]',event.get('[rt][p75]')+l)"

        }

    }

}

output {

    if "percentile" in [tags] and ([rt][last] > [rt][high] or [rt][last] < [rt][low]) {

       exec {

           command => "echo \"Anomaly: %{[rt][last]}\""

       }

}

有关box and shisker plot内容和重要性,参见《数据之魅》一书。

2.3.8 mutate数据修改

logstash-filter-mutate插件是Logstash另一个重要插件,它提供了丰富的基础类型数据处理能力,包括类型转换、字符串处理和字段处理等。

1.类型转换

类型转换是logstash-filter-mutate插件最初诞生时的唯一功能,其应用场景在之前的2.3.5节“JSON编解码”中已经提到。

可以设置的转换类型包括:“integer”、“float”和“string”。示例如下:

filter {

    mutate {

        convert => ["request_time", "float"]

    }

}

mutate除了转换简单的字符值,还支持对数组类型的字段进行转换,即将[“1”,“2”]转换成[1,2]。但不支持对哈希类型的字段做类似处理。有这方面需求的可以采用稍后讲述的logstash-filter-ruby插件完成。

2. 字符串处理

有如下字符串处理的插件:

gsub:仅对字符串类型字段有效。

gsub => ["urlparams", "[\\?#]", "_"]

split:分割字符串。

filter {

    mutate {

        split => ["message", "|"]

    }

}

随意输入一串以|分割的字符,比如“123|321|adfd|dfjld*=123”,可以看到如下输出:

{

"message" => [

    [0] "123",

    [1] "321",

    [2] "adfd",

    [3] "dfjld*=123"

],

"@version" =>"1",

"@timestamp" =>"2014-08-20T15:58:23.120Z",

"host" =>"raochenlindeMacBook-Air.local"

}

join:仅对数组类型字段有效。

我们在之前已经用split割切的基础上再join回去。配置改成:

filter {

    mutate {

        split => ["message", "|"]

    }

    mutate {

        join => ["message", ","]

    }

}

filter区段之内,是顺序执行的。所以我们最后看到的输出结果是:

{

"message" =>"123,321,adfd,dfjld*=123",

"@version" =>"1",

"@timestamp" =>"2014-08-20T16:01:33.972Z",

"host" =>"raochenlindeMacBook-Air.local"

}

merge:合并两个数组或者哈希字段。依然在之前split的基础上继续:

filter {

    mutate {

        split => ["message", "|"]

    }

    mutate {

        merge => ["message", "message"]

    }

}

我们会看到输出:

{

"message" => [

    [0] "123",

    [1] "321",

    [2] "adfd",

    [3] "dfjld*=123",

    [4] "123",

    [5] "321",

    [6] "adfd",

    [7] "dfjld*=123"

],

"@version" =>"1",

"@timestamp" =>"2014-08-20T16:05:53.711Z",

"host" =>"raochenlindeMacBook-Air.local"

}

如果src字段是字符串,会自动先转换成一个单元素的数组再合并。把上一示例中的来源字段改成“host”:

filter {

    mutate {

        split => ["message", "|"]

    }

    mutate {

        merge => ["message", "host"]

    }

}

结果变成:

{

"message" => [

    [0] "123",

    [1] "321",

    [2] "adfd",

    [3] "dfjld*=123",

    [4] "raochenlindeMacBook-Air.local"

],

"@version" =>"1",

"@timestamp" =>"2014-08-20T16:07:53.533Z",

"host" => [

    [0] "raochenlindeMacBook-Air.local"

    ]

}

看,目的字段“message”确实多了一个元素,但是来源字段“host”本身也由字符串类型变成数组类型了!

同样,如果目的字段不是数组,也会被强制转换。即使来源字段并不存在:

filter {

    mutate {

        merge => ["message", "not_exist_field"]

    }

}

结果会变成:

{

"message" => [

        [0] "123|321|adfd|dfjld*=123"

    ],

"@version" =>"1",

"@timestamp" =>"2014-08-20T15:58:23.120Z",

"host" =>"raochenlindeMacBook-Air.local"

}

strip:去除字段内容前后的空格。可以接受数组参数:

filter {

    mutate {

        strip => ["syslog_message", "syslog_datetime"]

    }

}

lowercase:将字段内容全部转换成小写字母。同样可以接受数组。在ELK stack场景中,将内容转换成小写会是一个比较常见的需求。因为Elasticsearch默认是统一按照小写字母来搜索的。为了确保检索准确率,在不影响使用的情况下,建议对常用检索字段启用lowercase配置。

uppercase:将字段内容全部转换成大写字母。同样可以接受数组。

3.字段处理

字段处理的插件有:

rename:重命名某个字段,如果目的字段已经存在,会被覆盖掉,如下所示:

filter {

    mutate {

        rename => ["syslog_host", "host"]

    }

}

update:更新某个字段的内容。如果字段不存在,不会新建。

replace:作用和update类似,但是当字段不存在的时候,它会起到add_field参数一样的效果,自动添加新的字段。

4.执行次序

需要注意的是,filter/mutate内部是有执行次序的。其次序如下:

rename(event) if @rename

update(event) if @update

replace(event) if @replace

convert(event) if @convert

gsub(event) if @gsub

uppercase(event) if @uppercase

lowercase(event) if @lowercase

strip(event) if @strip

remove(event) if @remove

split(event) if @split

join(event) if @join

merge(event) if @merge

filter_matched(event)

而filter_matched这个filters/base.rb里继承的方法也是有次序的:

@add_field.each do |field, value|

end

@remove_field.each do |field|

end

@add_tag.each do |tag|

end

@remove_tag.each do |tag|

end

2.3.9 随心所欲的Ruby处理

如果你稍微懂那么一点点Ruby语法的话,logstash-filter-ruby插件将会是一个非常有用的工具。比如你需要稍微修改一下LogStash::Event对象,但是又不打算为此写一个完整的插件,用logstash-filter-ruby插件绝对感觉良好。

配置示例如下:

filter {

    ruby {

        init =>"@kname = ['client','servername','url','status','time','size','upstream',

            'upstreamstatus','upstreamtime','referer','xff','useragent']"

        code => "

            ew_event = LogStash::Event.new(Hash[@kname.zip(event.get('message').split('|'))])

            new_event.remove('@timestamp')

        event.append(new_event)"

    }

}

官网示例是一个比较有趣但是没啥大用的做法—随机取消90%的事件。

所以上面我们给出了一个有用而且强大的实例。

通常我们都是用logstash-filter-grok插件来捕获字段的,但是正则耗费大量的CPU资源,很容易成为Logstash进程的瓶颈。

而实际上,很多流经Logstash的数据都是有自己预定义的特殊分隔符的,我们可以很简单的直接切割成多个字段。

从Logstash2.3开始,LogStash::event.append不再直接接受Hash对象,而必须是LogStash:: Event对象。所以示例变成要先初始化一个新的event,再把无用的@timestamp移除,再append进去。否则会把@timestamp变成有两个时间的数组了!

  从Logstash 5.0开始,LogStash::Event改为Java实现,直接使用event["parent"]["child"]形式获取的,不是原事件的引用而是复制品,需要改用event.get('[parent][child]')和event.set('[parent][child]', 'value')的方法。

logstash-filter-mutate插件里的“split”选项只能切成数组,后续很不方便使用和识别。而在logstash-filter-ruby里,我们可以通过“init”参数预定义好由每个新字段的名字组成的数组,然后在“code”参数指定的Ruby语句里通过两个数组的zip操作生成一个哈希并添加进数组里。短短一行Ruby代码,可以减少50%以上的CPU使用率。

logstash-filter-ruby插件用途远不止这一点,下一节你还会继续见到它的身影。

更多实例如下:

filter{

    date {

        match => ["datetime" , "UNIX"]

    }

    ruby {

        code =>"event.cancel if 5 * 24 * 3600 < (event.get( '@timestamp')-::Time.now).abs"

    }

}

在实际运用中,我们几乎肯定会碰到出乎意料的输入数据。这都有可能导致Elasticsearch集群出现问题。

当数据格式发生变化,比如UNIX时间格式变成UNIX_MS时间格式,会导致Logstash疯狂创建新索引,集群崩溃。

或者误输入过老的数据时,因为一般我们会close几天之前的索引以节省内存,必要时再打开。而直接尝试把数据写入被关闭的索引会导致内存问题。

这时候我们就需要提前校验数据的合法性。上面配置,就是用于过滤掉时间范围与当前时间差距太大的非法数据的。

2.3.10 split拆分事件

上一章我们通过multiline插件将多行数据合并进一个事件里,那么反过来,也可以把一行数据,拆分成多个事件。这就是split插件。

配置示例如下:

filter {

    split {

        field =>"message"

        terminator =>"#"

    }

}

这个测试中,我们在intputs/stdin的终端中输入一行数据:“test1#test2”,结果看到输出两个事件:

{

"@version": "1",

"@timestamp": "2014-11-18T08:11:33.000Z",

"host": "web121.mweibo.tc.sinanode.com",

"message": "test1"

}

{

"@version": "1",

"@timestamp": "2014-11-18T08:11:33.000Z",

"host": "web121.mweibo.tc.sinanode.com",

"message": "test2"

}最后更新:2017-05-19 15:02:37

  上一篇:go  事务必会必知
  下一篇:go  InterruptedException 和 interrupting threads 的一些说明