《ELK Stack權威指南 》第2章 插件配置
插 件 配 置
插件是Logstash最大的特色。各種不同的插件源源不斷地被創造出來,發布到社區中供大家使用。本章會按照插件的類別,對一般場景下的一些常用插件做詳細的配置和用例介紹。本章介紹的插件包括:1)輸入插件。基於shipper端場景,主要介紹STDIN、TCP、File等插件。2)編解碼插件。編解碼通常是會被遺忘的環節,但是運用好了,會大大提高工作效率,本節介紹最常用的JSON和multiline插件。3)過濾器插件。名為過濾器,其實各種數據裁剪和計算都可以在這類插件裏完成,是Logstash最強大的一環。本節會詳細介紹grok、date、mutate、ruby、metrics等插件的妙用。4)輸出插件。Logstash雖然經常跟Elasticsearch並稱,但是作為一個日誌傳輸框架,它其實可以輸出數據到各種不同的地方。比如Graphite、HDFS、Nagios等等。本章會介紹這些常用的輸出插件用法。
2.1 輸入插件
在“Hello World”示例中,我們已經見到並介紹了Logstash的運行流程和配置的基礎語法。從這章開始,我們就要逐一介紹Logstash流程中比較常用的一些插件,並在介紹中針對其主要適用的場景、推薦的配置,作一些說明。
限於篇幅,接下來內容中,配置示例不一定能貼完整。請記住一個原則:Logstash配置一定要有一個input和一個output。在演示過程中,如果沒有寫明input,默認就會使用“Hello World”裏我們已經演示過的logstash-input-stdin,同理,沒有寫明的output就是logstash-output-stdout。
以上請讀者自明。
2.1.1 標準輸入
我們已經見過好幾個示例使用stdin了。這也應該是Logstash裏最簡單和基礎的插件了。所以,在這段中,我們先介紹一些未來每個插件都會有的一些方法。
配置示例如下:
input {
stdin {
add_field => {"key" =>"value"}
codec =>"plain"
tags => ["add"]
type =>"std"
}
}
用上麵的新stdin設置重新運行一次最開始的Hello World示例。我建議大家把整段配置都寫入一個文本文件,然後運行命令:bin/logstash -f stdin.conf。輸入“Hello World”並回車後,你會在終端看到如下輸出:
{
"message" =>"hello world",
"@version" =>"1",
"@timestamp" =>"2014-08-08T06:48:47.789Z",
"type" =>"std",
"tags" => [
[0] "add"
],
"key" =>"value",
"host" =>"raochenlindeMacBook-Air.local"
}
type和tags是Logstash事件中兩個特殊的字段。通常來說,我們會在“輸入區段”中通過type來標記事件類型—我們肯定是提前能知道這個事件屬於什麼類型的。而tags則是在數據處理過程中,由具體的插件來添加或者刪除的。
最常見的用法是像下麵這樣:
input {
stdin {
type =>"web"
}
}
filter {
if [type] == "web" {
grok {
match => ["message", %{COMBINEDAPACHELOG}]
}
}
}
output {
if "_grokparsefailure" in [tags] {
nagios_nsca {
nagios_status =>"1"
}
} else {
elasticsearch {
}
}
}
看起來蠻複雜的,對吧?
繼續學習,你也可以寫出來的。
2.1.2 文件輸入
分析網站訪問日誌應該是一個運維工程師最常見的工作了。所以我們先學習一下怎麼用Logstash來處理日誌文件。
Logstash使用一個名叫FileWatch的Ruby Gem庫來監聽文件變化。這個庫支持glob展開文件路徑,而且會記錄一個叫.sincedb的數據庫文件來跟蹤被監聽日誌文件的當前讀取位置。所以,不要擔心Logstash會漏過你的數據。
sincedb文件中記錄了每個被監聽的文件的inode, major number, minor number和pos。
配置示例如下:
input {
file {
path => ["/var/log/*.log", "/var/log/message"]
type =>"system"
start_position =>"beginning"
}
}
有一些比較有用的配置項,可以用來指定FileWatch庫的行為:
discover_interval:Logstash每隔多久去檢查一次被監聽的path下是否有新文件,默認值是15秒。
exclude:不想被監聽的文件可以排除出去,這裏跟path一樣支持glob展開。
sincedb_path:如果你不想用默認的$HOME/.sincedb(Windows平台上為%USERPROFILE%\.sincedb,該變量默認值是:C:\Windows\System32\config\systemprofile),可以通過這個配置定義sincedb文件到其他位置。
sincedb_write_interval:Logstash每隔多久寫一次sincedb文件,默認是15秒。
stat_interval:Logstash每隔多久檢查一次被監聽文件狀態(是否有更新),默認是1秒。
start_position:Logstash從什麼位置開始讀取文件數據,默認是結束位置,也就是說,Logstash進程會以類似tail-F的形式運行。如果你是要導入原有數據,把這個設定改成"beginning",Logstash進程就從頭開始讀取,有點類似於less+F命令的形式運行。
close_older:一個已經監聽中的文件,如果超過這個值的時間內沒有更新內容,就關閉監聽它的文件句柄。默認是3 600秒,即一小時。
ignore_older:在每次檢查文件列表的時候,如果一個文件的最後修改時間超過這個值,就忽略這個文件。默認是86 400秒,即一天。
注意事項如下:
1)通常你要導入原有數據進Elasticsearch的話,你還需要filter/date插件來修改默認的"@timestamp"字段值。稍後會學習這方麵的知識。
2)FileWatch隻支持文件的絕對路徑,而且會不自動遞歸目錄。所以有需要的話,請用數組方式都寫明具體哪些文件。
3)LogStash::Inputs::File隻是在進程運行的注冊階段初始化一個FileWatch對象。所以它不能支持類似fluentd那樣的path =>"/path/to/%{+yyyy/MM/dd/hh}.log"寫法。達到相同目的,你隻能寫成path =>"/path/to/*/*/*/*.log"。FileWatch模塊提供了一個稍微簡單一點的寫法:/path/to/**/*.log,用**來縮寫表示遞歸全部子目錄。
4)start_position僅在該文件從未被監聽過的時候起作用。如果sincedb文件中已經有這個文件的inode記錄了,那麼Logstash依然會從記錄過的pos開始讀取數據。所以重複測試的時候每回需要刪除sincedb文件。此外,官方博客上提供了另一個巧妙的思路:https://www.elastic.co/blog/logstash-configuration-tuning。把sincedb_path定義為/dev/null,則每次重啟自動從頭開始讀。
5)因為Windows平台上沒有inode的概念,Logstash某些版本在Windows平台上監聽文件不是很靠譜。Windows平台上,推薦考慮使用nxlog作為收集端,參見後麵5.5節。
2.1.3 TCP輸入
未來你可能會用Redis服務器或者其他的消息隊列係統來作為Logstash Broker的角色。不過Logstash其實也有自己的TCP/UDP插件,在臨時任務的時候,也算能用,尤其是測試環境。
雖然LogStash::Inputs::TCP用Ruby的Socket和OpenSSL庫實現了高級的SSL功能,但Logstash本身隻能在SizedQueue中緩存20個事件。這就是我們建議在生產環境中換用其他消息隊列的原因。
配置示例如下:
input {
tcp {
port => 8888
mode =>"server"
ssl_enable => false
}
}
目前來看,LogStash::Inputs::TCP最常見的用法就是配合nc命令導入舊數據。在啟動Logstash進程後,在另一個終端運行如下命令即可導入數據:
# nc 127.0.0.1 8888 < olddata
這種做法比用LogStash::Inputs::File好,因為當nc命令結束,我們就知道數據導入完畢了。而用input/file方式,Logstash進程還會一直等待新數據輸入被監聽的文件,不能直接看出是否任務完成了。
2.1.4 syslog輸入
syslog可能是運維領域最流行的數據傳輸協議了。當你想從設備上收集係統日誌的時候,syslog應該會是你的第一選擇。尤其是網絡設備,比如思科中syslog幾乎是唯一可行的辦法。
我們這裏不解釋如何配置你的syslog.conf、rsyslog.conf或者syslog-ng.conf來發送數據,而隻講如何把Logstash配置成一個syslog服務器來接收數據。
有關rsyslog的用法,後麵的5.4節會有更詳細的介紹。
配置示例如下:
input {
syslog {
port =>"514"
}
}
作為最簡單的測試,我們先暫停一下本機的syslogd(或rsyslogd)進程,然後啟動Logstash進程(這樣就不會有端口衝突問題)。現在,本機的syslog就會默認發送到Logstash裏了。我們可以用自帶的logger命令行工具發送一條“Hello World”信息到syslog裏(即Logstash裏)。看到的Logstash輸出像下麵這樣:
{
"message" =>"Hello World",
"@version" =>"1",
"@timestamp" =>"2014-08-08T09:01:15.911Z",
"host" =>"127.0.0.1",
"priority" =>31,
"timestamp" =>"Aug 8 17:01:15",
"logsource" =>"raochenlindeMacBook-Air.local",
"program" =>"com.apple.metadata.mdflagwriter",
"pid" =>"381",
"severity" =>7,
"facility" =>3,
"facility_label" =>"system",
"severity_label" =>"Debug"
}
Logstash是用UDPSocket、TCPServer和LogStash::Filters::Grok來實現LogStash::Inputs::Syslog的。所以你其實可以直接用Logstash配置實現一樣的效果,如下所示:
input {
tcp {
port =>"8514"
}
}
filter {
grok {
match => ["message", "%{SYSLOGLINE}" ]
}
syslog_pri { }
}
建議在使用LogStash::Inputs::Syslog的時候走TCP協議來傳輸數據。
因為具體實現中,UDP監聽器隻用了一個線程,而TCP監聽器會在接收每個連接的時候都啟動新的線程來處理後續步驟。
如果你已經在使用UDP監聽器收集日誌,用下行命令檢查你的UDP接收隊列大小:
# netstat -plnu | awk 'NR==1 || $4~/:514$/{print $2}'
Recv-Q
228096
228096是UDP接收隊列的默認最大大小,這時候linux內核開始丟棄數據包了!
強烈建議使用LogStash::Inputs::TCP和LogStash::Filters::Grok配合實現同樣的syslog功能!
雖然LogStash::Inputs::Syslog在使用TCPServer的時候可以采用多線程處理數據的接收,但是在同一個客戶端數據的處理中,其grok和date是一直在該線程中完成的,這會導致總體上的處理性能幾何級的下降—經過測試,TCPServer每秒可以接收50 000條數據,而在同一線程中啟用grok後每秒隻能處理5 000條,再加上date隻能達到500條!
才將這兩步拆分到filters階段後,Logstash支持對該階段插件單獨設置多線程運行,大大提高了總體處理性能。在相同環境下,logstash -f tcp.conf -w 20的測試中,總體處理性能可以達到每秒30 000條數據!
測試采用Logstash作者提供的命令:
yes "<44>May 19 18:30:17 snack jls: foo bar 32" | nc localhost 3000
出處見:https:// github.com/jordansissel/experiments/blob/master/ruby/jruby-netty/syslog-server/Makefile
如果你實在沒法切換到TCP協議,可以自己寫程序,或者使用其他基於異步I/O框架(比如libev)的項目。下麵是一個簡單的異步I/O實現UDP監聽數據輸入Elasticsearch的示例:https://gist.github.com/chenryn/7c922ac424324ee0d695。
2.1.5 http_poller抓取
Logstash作為數據采集係統,也支持自己作為一個HTTP客戶端去抓取網頁數據或者接口數據。這方麵有一個很明顯的IT運維應用場景:很多業務係統軟件本身提供了RESTful的內部運行狀態接口,可以直接通過接口采集這些監控信息。
更長期的方案應該是編寫對應的metricbeat模塊,但是直接采用logstash-input-http_poller顯然更快捷。
比如Nginx的性能狀態,社區有一個非常全麵的性能狀態監控模塊:nginx-module-vts。在新浪微博,後端池分為核心接口、非核心接口兩塊,我們要分別監控的話,nginx-module-vts的配置如下:
http {
vhost_traffic_status_zone;
map $uri $filter_uri {
default 'non-core';
/2/api/timeline core;
~^/2/api/unread core;
}
server {
vhost_traffic_status_filter_by_set_key $filter_uri;
location /status {
auth_basic "Restricted";
auth_basic_user_file pass_file;
vhost_traffic_status_display;
vhost_traffic_status_display_format json;
}
}
}
則對應的logstash-input-http_poller配置如下:
input {
http_poller {
urls => {
0 => {
method => get
url => "https://localhost:80/status/format/json"
headers => {
Accept => "application/json"
}
auth => {
user => "YouKnowIKnow"
password => "IKnowYouDonotKnow"
}
}
1 => {
method => get
url => "https://localhost:80/status/con ... up%3D*"
headers => {
Accept => "application/json"
}
auth => {
user => "YouKnowIKnow"
password => "IKnowYouDonotKnow"
}
}
}
request_timeout => 60
interval => 60
codec => "json"
}
}
這段配置就可以就可以每60秒獲得一次 vts 數據,並重置計數了。
注意,url是一個Hash值,所以它的執行順序是根據Hash.map來的,為了確保我們是先獲取數據再重置,這裏幹脆用0, 1來作為Hash的key,這樣順序就沒問題了。
2.2 編解碼配置
Codec是Logstash從1.3.0版開始新引入的概念(Codec來自Coder/decoder兩個單詞的首字母縮寫)。
在此之前,Logstash隻支持純文本形式輸入,然後以過濾器處理它。但現在,我們可以在輸入期處理不同類型的數據,這全是因為有了Codec設置。
所以,這裏需要糾正之前的一個概念。Logstash不隻是一個input | filter | output的數據流,而是一個input | decode | filter | encode | output的數據流!Codec就是用來decode、encode事件的。
Codec的引入,使得Logstash可以更好、更方便地與其他有自定義數據格式的運維產品共存,比如graphite、fluent、netflow、collectd,以及使用msgpack、json、edn等通用數據格式的其他產品等。
事實上,我們在第一個“Hello World”用例中就已經用過Codec了—rubydebug就是一種Codec!雖然它一般隻會用在stdout插件中,作為配置測試或者調試的工具。
這個五段式的流程說明源自Perl版的Logstash(後來改名叫Message::Passing模塊)的設計。本書稍後5.8節會對該模塊稍作介紹。
2.2.1 JSON編解碼
在早期的版本中,有一種降低Logstash過濾器的CPU負載消耗的做法盛行於社區(在當時的Cookbook上有專門的一節介紹):直接輸入預定義好的JSON數據,這樣就可以省略掉filter/grok配置!
這個建議依然有效,不過在當前版本中需要稍微做一點配置變動,因為現在有專門的Codec設置。
1.配置示例
社區常見的示例都是用的Apache的customlog,不過我覺得Nginx是一個比Apache更常用的新型Web服務器,所以我這裏會用nginx.conf做示例:
logformat json '{"@timestamp":"$time_iso8601",'
'"@version":"1",'
'"host":"$server_addr",'
'"client":"$remote_addr",'
'"size":$body_bytes_sent,'
'"responsetime":$request_time,'
'"domain":"$host",'
'"url":"$uri",'
'"status":"$status"}';
access_log /var/log/nginx/access.log_json json;
注意,在$request_time和$body_bytes_sent變量兩頭沒有雙引號",這兩個數據在JSON裏應該是數值類型!
重啟Nginx應用,然後修改你的input/file區段配置成下麵這樣:
input {
file {
path =>"/var/log/nginx/access.log_json""
codec =>"json"
}
}
2.運行結果
下麵訪問一下用Nginx發布的Web頁麵,然後你會看到Logstash進程輸出類似下麵這樣的內容:
{
"@timestamp" =>"2014-03-21T18:52:25.000+08:00",
"@version" =>"1",
"host" =>"raochenlindeMacBook-Air.local",
"client" =>"123.125.74.53",
"size" =>8096,
"responsetime" =>0.04,
"domain" =>"www.domain.com",
"url" =>"/path/to/file.suffix",
"status" =>"200"
}
3. Nginx代理服務的日誌格式問題
對於一個Web服務器的訪問日誌,看起來已經可以很好的工作了。不過如果Nginx是作為一個代理服務器運行的話,訪問日誌裏有些變量,比如說$upstream_response_time,可能不會一直是數字,它也可能是一個“-”字符串!這會直接導致Logstash對輸入數據驗證報異常。
有兩個辦法解決這個問題:
1)用sed在輸入之前先替換-成0。運行Logstash進程時不再讀取文件而是標準輸入,這樣命令就成了下麵這個樣子:
tail -F /var/log/nginx/proxy_access.log_json \
| sed 's/upstreamtime":-/upstreamtime":0/' \
| /usr/local/logstash/bin/logstash -f /usr/local/logstash/etc/proxylog.conf
2)日誌格式中統一記錄為字符串格式(即都帶上雙引號),然後再在Logstash中用filter/mutate插件來變更應該是數值類型的字符字段的值類型。
有關LogStash::Filters::Mutate的內容,本書稍後會有介紹。
2.2.2 多行事件編碼
有些時候,應用程序調試日誌會包含非常豐富的內容,為一個事件打印出很多行內容。這種日誌通常都很難通過命令行解析的方式做分析。
而Logstash正為此準備好了codec/multiline插件!當然,multiline插件也可以用於其他類似的堆棧式信息,比如Linux的內核日誌。
配置示例如下:
input {
stdin {
codec => multiline {
pattern =>"^\["
negate => true
what =>"previous"
}
}
}
運行Logstash進程,然後在等待輸入的終端中輸入如下幾行數據:
[Aug/08/08 14:54:03] hello world
[Aug/08/09 14:54:04] hello logstash
hello best practice
hello raochenlin
[Aug/08/10 14:54:05] the end
你會發現Logstash輸出下麵這樣的返回:
{
"@timestamp" =>"2014-08-09T13:32:03.368Z",
"message" =>"[Aug/08/08 14:54:03] hello world\n",
"@version" =>"1",
"host" =>"raochenlindeMacBook-Air.local"
}
{
"@timestamp" =>"2014-08-09T13:32:24.359Z",
"message" =>"[Aug/08/09 14:54:04] hello logstash\n\n hello best practice\n\n
hello raochenlin\n",
"@version" =>"1",
"tags" => [
[0] "multiline"
],
"host" =>"raochenlindeMacBook-Air.local"
}
你看,後麵這個事件,在“message”字段裏存儲了三行數據!
輸出的事件中沒有最後一行的"the end"字符串,這是因為你最後輸入的回車符\n並不匹配設定的^\[正則表達式,Logstash還得等下一行數據直到匹配成功後才會輸出這個事件。
其實這個插件的原理很簡單,就是把當前行的數據添加到前麵一行後麵,直到新進的當前行匹配^\[正則為止。這個正則還可以用grok表達式,稍後你就會學習這方麵的內容。具體的Java日誌正則見:https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/java
說到應用程序日誌,Log4j肯定是第一個被大家想到的,使用codec/multiline也確實是一個辦法。
不過,如果你本身就是開發人員,或者可以推動程序修改變更的話,Logstash還提供了另一種處理Log4j的方式:input/log4j。與codec/multiline不同,這個插件是直接調用了org.apache.log4j.spi.LoggingEvent處理TCP端口接收的數據。後麵3.6節會詳細講述Log4j的用法。
2.2.3 網絡流編碼
NetFlow是Cisco發明的一種數據交換方式。NetFlow提供網絡流量的會話級視圖,記錄下每個TCP/IP事務的信息。它的目的不是像tcpdump那樣提供網絡流量的完整記錄,而是匯集起來形成更易於管理和易讀的流向和容量的分析監控。
Cisco上配置NetFlow的方法,請參照具體的設備說明,主要是設定采集服務器的地址和端口,為運行Logstash服務的主機地址和端口(示例中為9995)。
采集NetFlow數據的Logstash服務配置示例如下:
input {
udp {
port => 9995
codec => netflow {
definitions =>"/opt/logstash-1.4.2/lib/logstash/codecs/netflow/netflow.yaml"
versions => [5]
}
}
}
output {
elasticsearch {
index =>"logstash_netflow5-%{+YYYY.MM.dd}"
host =>"localhost"
}
}
由於該插件生成的字段較多,所以建議對應的Elasticsesarch索引模板也需要單獨提交:
# curl -XPUT localhost:9200/_template/logstash_netflow5 -d '{
"template" : "logstash_netflow5-*",
"settings": {
"index.refresh_interval": "5s"
},
"mappings" : {
"_default_" : {
"_all" : {"enabled" : false},
"properties" : {
"@version": { "index": "analyzed", "type": "integer" },
"@timestamp": { "index": "analyzed", "type": "date" },
"netflow": {
"dynamic": true,
"type": "object",
"properties": {
"version": { "index": "analyzed", "type": "integer" },
"flow_seq_num": { "index": "not_analyzed", "type": "long" },
"engine_type": { "index": "not_analyzed", "type": "integer" },
"engine_id": { "index": "not_analyzed", "type": "integer" },
"sampling_algorithm": { "index": "not_analyzed", "type": "integer" },
"sampling_interval": { "index": "not_analyzed", "type": "integer" },
"flow_records": { "index": "not_analyzed", "type": "integer" },
"ipv4_src_addr": { "index": "analyzed", "type": "ip" },
"ipv4_dst_addr": { "index": "analyzed", "type": "ip" },
"ipv4_next_hop": { "index": "analyzed", "type": "ip" },
"input_snmp": { "index": "not_analyzed", "type": "long" },
"output_snmp": { "index": "not_analyzed", "type": "long" },
"in_pkts": { "index": "analyzed", "type": "long" },
"in_bytes": { "index": "analyzed", "type": "long" },
"first_switched": { "index": "not_analyzed", "type": "date" },
"last_switched": { "index": "not_analyzed", "type": "date" },
"l4_src_port": { "index": "analyzed", "type": "long" },
"l4_dst_port": { "index": "analyzed", "type": "long" },
"tcp_flags": { "index": "analyzed", "type": "integer" },
"protocol": { "index": "analyzed", "type": "integer" },
"src_tos": { "index": "analyzed", "type": "integer" },
"src_as": { "index": "analyzed", "type": "integer" },
"dst_as": { "index": "analyzed", "type": "integer" },
"src_mask": { "index": "analyzed", "type": "integer" },
"dst_mask": { "index": "analyzed", "type": "integer" }
}
}
}
}
}
}'
Elasticsearch索引模板的功能,本書稍後12.6節會有詳細介紹。
2.2.4 collectd輸入
collectd是一個守護(daemon)進程,用來收集係統性能和提供各種存儲方式來存儲不同值的機製。它會在係統運行和存儲信息時周期性的統計係統的相關統計信息。利用這些信息有助於查找當前係統性能瓶頸(如作為性能分析performance analysis)和預測係統未來的load(如能力部署capacity planning)等
下麵簡單介紹一下:collectd的部署以及與Logstash對接的相關配置實例。
1. collectd的安裝
collectd的安裝同樣有兩種方式,使用官方軟件倉庫安裝或源代碼安裝。
使用官方軟件倉庫安裝(推薦)
collectd官方有一個隱藏的軟件倉庫:https://pkg.ci.collectd.org,構建有RHEL/CentOS (rpm),Debian/Ubuntu(deb)的軟件包,如果你使用的操作係統屬於上述的,那麼推薦使用軟件倉庫安裝。
目前collectd官方維護3個版本:5.4、5.5、5.6。根據需要選擇合適的版本倉庫。下麵示例安裝5.5版本的方法。
Debian/Ubuntu倉庫安裝:
echo "deb https://pkg.ci.collectd.org/deb $(lsb_release -sc) collectd-5.5" | sudo tee /etc/apt/sources.list.d/collectd.list
curl -s https://pkg.ci.collectd.org/pubkey.asc | sudo apt-key add -
sudo apt-get update && sudo apt-get install -y collectd
注意,Debian/Ubuntu軟件倉庫自帶有collectd軟件包,如果軟件倉庫自帶的版本足夠你使用,那麼可以不用添加倉庫,直接通過apt-get install collectd即可。
RHEL/CentOS倉庫安裝:
cat > /etc/yum.repos.d/collectd.repo <<EOF
[collectd-5.5]
name=collectd-5.5
baseurl=https://pkg.ci.collectd.org/rpm/collectd-5.5/epel-\$releasever-\$basearch/
gpgcheck=1
gpgkey=https://pkg.ci.collectd.org/pubkey.asc
EOF
yum install -y collectd
你如果需要使用其他collectd插件,此時也可一並安裝對應的collectd-xxxx軟件包。
源碼安裝collectd
collectd目前維護3個版本:5.4、5.5、5.6。源代碼編譯安裝時同樣可以根據自己需要選擇對應版本的源碼下載:
wget https://collectd.org/files/collectd-5.4.1.tar.gz
tar zxvf collectd-5.4.1.tar.gz
cd collectd-5.4.1
./configure --prefix=/usr --sysconfdir=/etc --localstatedir=/var --libdir=/usr/lib --mandir=/usr/share/man --enable-all-plugins
make && make install
源代碼編譯安裝需要預先解決好各種環境依賴,在RedHat平台上要提前安裝如下軟件包:
rpm -ivh "https://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm"
yum -y install libcurl libcurl-devel rrdtool rrdtool-devel perl-rrdtool rrdtool-prel libgcrypt-devel gcc make gcc-c++ liboping liboping-devel perl-CPAN net-snmp net-snmp-devel
安裝啟動腳本如下:
cp contrib/redhat/init.d-collectd /etc/init.d/collectd
chmod +x /etc/init.d/collectd
啟動collectd如下:
service collectd start
2. collectd的配置
以下配置可以實現對服務器基本的CPU、內存、網卡流量、磁盤I/O以及磁盤空間占用情況的監控:
Hostname "host.example.com"
LoadPlugin interface
LoadPlugin cpu
LoadPlugin memory
LoadPlugin network
LoadPlugin df
LoadPlugin disk
<Plugin interface>
Interface "eth0"
IgnoreSelected false
</Plugin>
<Plugin network>
<Server "10.0.0.1""25826"> ## Logstash 的 IP 地址和 collectd 的數據接收端口號
</Server>
</Plugin>
3. Logstash的配置
以下配置實現通過Logstash監聽25826端口,接收從collectd發送過來的各項檢測數據。
注意,logstash-filter-collectd插件本身需要單獨安裝,logstash插件安裝說明之前已經講過:
bin/logstash-plugin install logstash-filter-collectd
Logstash默認自帶有collectd的codec編碼插件。
推薦配置示例如下:
udp {
port => 25826
buffer_size => 1452
workers => 3 # Default is 2
queue_size => 30000 # Default is 2000
codec => collectd { }
type =>"collectd"
}
下麵是簡單的一個輸出結果:
{
"_index": "logstash-2014.12.11",
"_type": "collectd",
"_id": "dS6vVz4aRtK5xS86kwjZnw",
"_score": null,
"_source": {
"host": "host.example.com",
"@timestamp": "2014-12-11T06:28:52.118Z",
"plugin": "interface",
"plugin_instance": "eth0",
"collectd_type": "if_packets",
"rx": 19147144,
"tx": 3608629,
"@version": "1",
"type": "collectd",
"tags": [
"_grokparsefailure"
]
},
"sort": [
1418279332118
]
}
參考資料
collectd支持收集的數據類型:https://git.verplant.org/?p=collectd.git;a=blob;hb=master; f=README
collectd收集各數據類型的配置參考資料:https://collectd.org/documentation/manpages/collectd.conf.5.shtml
collectd簡單配置文件示例:https://gist.github.com/untergeek/ab85cb86a9bf39f1fc6d
2.3 過濾器配置
有豐富的過濾器插件,是Logstash威力如此強大的重要因素。名為過濾器,其實提供的不單單是過濾的功能。下麵我們就會重點介紹幾個插件,它們擴展了進入過濾器的原始數據,進行複雜的邏輯處理,甚至可以無中生有地添加新的Logstash事件到後續的流程中去!
2.3.1 date時間處理
之前章節已經提過,logstash-filter-date插件可以用來轉換你的日誌記錄中的時間字符串,變成LogStash::Timestamp對象,然後轉存到@timestamp字段裏。
因為在稍後的logstash-output-elasticsearch中常用的%{+YYYY.MM.dd}這種寫法必須讀取@timestamp數據,所以一定不要直接刪掉這個字段保留自己的字段,而是應該用logstash-filter-date轉換後刪除自己的字段!
這在導入舊數據的時候固然非常有用,而在實時數據處理的時候同樣有效,因為一般情況下數據流程中我們都會有緩衝區,導致最終的實際處理時間跟事件產生時間略有偏差。
強烈建議打開Nginx的access_log配置項的buffer參數,對極限響應性能有極大提升!
1.配置示例
logstash-filter-date插件支持五種時間格式:
ISO8601:類似“2011-04-19T03:44:01.103Z”這樣的格式。具體Z後麵可以有“08:00”也可以沒有,“.103”這個也可以沒有。常用場景裏來說,Nginx的log_format配置裏就可以使用$time_iso8601變量來記錄請求時間成這種格式。
UNIX:UNIX時間戳格式,記錄的是從1970年起始至今的總秒數。Squid默認日誌格式中就使用了這種格式。
UNIX_MS:這個時間戳則是從1970年起始至今的總毫秒數。據我所知,JavaScript裏經常使用這個時間格式。
TAI64N:TAI64N格式比較少見,是這個樣子的:@4000000052f88ea32489532c。我目前隻知道常見應用中,qmail會用這個格式。
Joda-Time庫:Logstash內部使用了Java的Joda時間庫來作時間處理。所以我們可以使用Joda庫所支持的時間格式來作具體定義。Joda時間格式定義見表2-1。
表2-1 Joda時間庫格式
格式符 含 義 描 述 示 例
G era text AD
C century of era (>=0) number 20
Y year of era (>=0) year 1996
x weekyear year 1996
w week of weekyear number 27
e day of week number 2
E day of week text Tuesday; Tue
y year year 1996
D day of year number 189
M month of year month July; Jul; 07
d day of month number 10
a halfday of day text PM
K hour of halfday (0~11) number 0
h clockhour of halfday (1~12) number 12
H hour of day (0~23) number 0
k clockhour of day (1~24) number 24
m minute of hour number 30
s second of minute number 55
S fraction of second number 978
z time zone text Pacific Standard Time; PST
Z time zone offset/id zone -0800; -08:00; America/Los_Angeles
' escape for text delimiter
'' single quote literal '
下麵我們寫一個Joda時間格式的配置作為示例:
filter {
grok {
match => ["message", "%{HTTPDATE:logdate}"]
}
date {
match => ["logdate", "dd/MMM/yyyy:HH:mm:ss Z"]
}
}
注意,時區偏移量隻需要用一個字母Z即可。
2.時區問題的解釋
很多中國用戶經常提一個問題:為什麼@timestamp比我們晚了8個小時?怎麼修改成北京時間?
其實,Elasticsearch內部,對時間類型字段,是統一采用UTC時間,存成long長整形數據的!對日誌統一采用UTC時間存儲,是國際安全/運維界的一個通識—歐美公司的服務器普遍廣泛分布在多個時區裏—不像中國,地域橫跨五個時區卻隻用北京時間。
對於頁麵查看,ELK的解決方案是在Kibana上,讀取瀏覽器的當前時區,然後在頁麵上轉換時間內容的顯示。
所以,建議大家接受這種設定。否則,即便你用.getLocalTime修改,也還要麵臨在Kibana過去修改,以及Elasticsearch原有的["now-1h" TO "now"]這種方便的搜索語句無法正常使用的尷尬。
以上,請讀者自行斟酌。
2.3.2 grok正則捕獲
grok是Logstash最重要的插件。你可以在grok裏預定義好命名正則表達式,在稍後(grok參數或者其他正則表達式裏)引用它。
1.正則表達式語法
運維工程師多多少少都會一點正則。你可以在grok裏寫標準的正則,像下麵這樣:
\s+(?<request_time>\d+(?:\.\d+)?)\s+
這個正則表達式寫法對於Perl或者Ruby程序員應該很熟悉了,Python程序員可能更習慣寫(?P<name>pattern),沒辦法,適應一下吧。
現在給我們的配置文件添加第一個過濾器區段配置。配置要添加在輸入和輸出區段之間(Logstash執行區段的時候並不依賴於次序,不過為了自己看得方便,還是按次序書寫吧):
input {stdin{}}
filter {
grok {
match => {
"message" =>"\s+(?<request_time>\d+(?:\.\d+)?)\s+"
}
}
}
output {stdout{codec=>rubydebug}}
運行Logstash進程然後輸入“begin 123.456 end”,你會看到類似下麵這樣的輸出:
{
"message" =>"begin 123.456 end",
"@version" =>"1",
"@timestamp" =>"2014-08-09T11:55:38.186Z",
"host" =>"raochenlindeMacBook-Air.local",
"request_time" =>"123.456"
}
漂亮!不過數據類型好像不太滿意……request_time應該是數值而不是字符串。
我們已經提過稍後會學習用LogStash::Filters::Mutate來轉換字段值類型,不過在grok 裏,其實有自己的魔法來實現這個功能!
2. grok表達式語法
grok支持把預定義的grok表達式寫入到文件中,官方提供的預定義grok表達式見:https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns。
下麵是從官方文件中摘抄的最簡單但是足夠說明用法的示例:
USERNAME [a-zA-Z0-9._-]+
USER %{USERNAME}
第一行,用普通的正則表達式來定義一個grok表達式;第二行,通過打印賦值格式(sprintf format),用前麵定義好的grok表達式來定義另一個grok表達式。
grok表達式的打印複製格式的完整語法見下行示例。其中data_type目前隻支持兩個值:int和float。
%{PATTERN_NAME:capture_name:data_type}
所以我們可以改進我們的配置成下麵這樣:
filter {
grok {
match => {
"message" =>"%{WORD} %{NUMBER:request_time:float} %{WORD}"
}
}
}
重新運行進程然後可以得到如下結果:
{
"message" =>"begin 123.456 end",
"@version" =>"1",
"@timestamp" =>"2014-08-09T12:23:36.634Z",
"host" =>"raochenlindeMacBook-Air.local",
"request_time" => 123.456
}
這次request_time變成數值類型了。
3.最佳實踐
實際運用中,我們需要處理各種各樣的日誌文件,如果你都是在配置文件裏各自寫一行自己的表達式,就完全不可管理了。所以,我們建議是把所有的grok表達式統一寫入到一個地方。然後用filter/grok的patterns_dir選項來指明。
如果你把“message”裏所有的信息都grok到不同的字段了,數據實質上就相當於是重複存儲了兩份。所以你可以用remove_field參數來刪除掉message字段,或者用overwrite參數來重寫默認的message字段,隻保留最重要的部分。
重寫參數的示例如下:
filter {
grok {
patterns_dir =>["/path/to/your/own/patterns"]
match => {
"message" =>"%{SYSLOGBASE} %{DATA:message}"
}
overwrite => ["message"]
}
}
更多有關grok正則性能的最佳實踐(比如timeout_millis等配置參數),請參考:https://www.elastic.co/blog/do-you-grok-grok。
4.高級用法
多行匹配 在和codec/multiline搭配使用的時候,需要注意一個問題,grok正則和普通正則一樣,默認是不支持匹配回車換行的。就像你需要=~ // m一樣也需要單獨指定,具體寫法是在表達式開始位置加(?m)標記。如下所示:
match => {
"message" =>"(?m)\s+(?<request_time>\d+(?:\.\d+)?)\s+"
}
多項選擇 有時候我們會碰上一個日誌有多種可能格式的情況。這時候要寫成單一正則就比較困難,或者全用|隔開又比較醜陋。這時候,Logstash的語法提供給我們一個有趣的解決方式。
文檔中,都說明logstash-filters-grok插件的match參數應該接受的是一個Hash值。但是因為早期的Logstash語法中Hash值也是用[]這種方式書寫的,所以其實現在傳遞Array值給match參數也完全沒問題。所以,我們這裏其實可以傳遞多個正則來匹配同一個字段:
match => [
"message", "(?<request_time>\d+(?:\.\d+)?)",
"message", "%{SYSLOGBASE} %{DATA:message}",
"message", "(?m)%{WORD}"
]
Logstash會按照這個定義次序依次嚐試匹配,到匹配成功為止。雖說效果跟用|分割寫個大大的正則是一樣的,但是可閱讀性好了很多。
我強烈建議每個人都要使用Grok Debugger (https://grokdebug.herokuapp.com/)來調試自己的grok表達式。
2.3.3 dissect解析
grok作為Logstash最廣為人知的插件,在性能和資源損耗方麵同樣也廣為詬病。為了應對這個情況,同時也考慮到大多數情況下日誌格式並沒有那麼複雜,Logstash開發團隊在5.0版新添加了另一個解析字段的插件:dissect。當日誌格式有比較簡明的分隔標誌位且重複性較大的時候,可以使用dissect插件更快地完成解析工作。下麵是解析syslog的示例。
filter {
dissect {
mapping => {
"message" => "%{ts} %{+ts} %{+ts} %{src} %{} %{prog}[%{pid}]: %{msg}"
}
convert_datatype => {
pid => "int"
}
}
}
我們看到上麵使用了和Grok很類似的%{}語法來表示字段,這顯然是基於習慣延續的考慮。不過示例中%{+ts}的加號就不一般了。dissect除了字段外麵的字符串定位功能以外,還通過幾個特殊符號來處理字段提取的規則:
%{+key} 這個+表示前麵已經捕獲到一個key字段了,而這次捕獲的內容自動添補到之前key字段內容的後麵。
%{+key/2} 這個/2表示在有多次捕獲內容都填到key字段裏的時候,拚接字符串的順序誰前誰後。/2表示排第2位。
%{?string} 這個?表示這塊隻是一個占位,並不會實際生成捕獲字段存到Event裏麵。
%{?string} %{&string} 當同樣捕獲名稱都是string,但是一個?和一個&在一起的時候,表示這是一個鍵值對。
比如對https://rizhiyi.com/index.do?id=123寫這麼一段配置:
https://%{domain}/%{?url}?%{?arg1}=%{&arg1}
則最終生成的 Event 內容是這樣的:
{
domain => "rizhiyi.com",
id => "123"
}
2.3.4 GeoIP地址查詢
GeoIP是最常見的免費IP地址歸類查詢庫,同時也有收費版可以采購。GeoIP庫可以根據IP地址提供對應的地域信息,包括國別、省市、經緯度等,對於可視化地圖和區域統計非常有用。
配置示例如下:
filter {
geoip {
source =>"message"
}
}
運行結果如下:
{
"message" =>"183.60.92.253",
"@version" =>"1",
"@timestamp" =>"2014-08-07T10:32:55.610Z",
"host" =>"raochenlindeMacBook-Air.local",
"geoip" => {
"ip" =>"183.60.92.253",
"country_code2" =>"CN",
"country_code3" =>"CHN",
"country_name" =>"China",
"continent_code" =>"AS",
"region_name" =>"30",
"city_name" =>"Guangzhou",
"latitude" =>23.11670000000001,
"longitude" =>113.25,
"timezone" =>"Asia/Chongqing",
"real_region_name" =>"Guangdong",
"location" => [
[0] 113.25,
[1] 23.11670000000001
]
}
}
GeoIP庫數據較多,如果你不需要這麼多內容,可以通過fields選項指定自己所需要的。下例為全部可選內容:
filter {
geoip {
fields => ["city_name", "continent_code", "country_code2",
"country_code3", "country_name", "dma_code", "ip", "latitude",
"longitude", "postal_code", "region_name", "timezone"]
}
}
需要注意的是:geoip.location是Logstash通過latitude和longitude額外生成的數據。所以,如果你是想要經緯度又不想重複數據的話,應該像下麵這樣做:
filter {
geoip {
fields => ["city_name", "country_code2", "country_name", "latitude",
"longitude", "region_name"]
remove_field => ["[geoip][latitude]", "[geoip][longitude]"]
}
}
還要注意:geoip插件的“source”字段可以是任一處理後的字段,比如“client_ip”,但是字段內容卻需要小心!GeoIp庫內隻存有公共網絡上的IP信息,查詢不到結果的,會直接返回null,而Logstash的GeoIp插件對null結果的處理是:“不生成對應的geoip.字段”。所以讀者在測試時,如果使用了諸如127.0.0.1、172.16.0.1、182.168.0.1、10.0.0.1等內網地址,會發現沒有對應輸出!
2.3.5 JSON編解碼
在上一章,已經講過在Codec中使用JSON編碼。但是,有些日誌可能是一種複合的數據結構,其中隻有一部分記錄是JSON格式的。這時候,我們依然需要在filter階段,單獨啟用JSON解碼插件。
配置示例如下:
filter {
json {
source =>"message"
target =>"jsoncontent"
}
}
運行結果如下:
{
"@version": "1",
"@timestamp": "2014-11-18T08:11:33.000Z",
"host": "web121.mweibo.tc.sinanode.com",
"message": "{\"uid\":3081609001,\"type\":\"signal\"}",
"jsoncontent": {
"uid": 3081609001,
"type": "signal"
}
}
如果不打算使用多層結構的話,刪掉target配置即可。單層結構新的結果如下:
{
"@version": "1",
"@timestamp": "2014-11-18T08:11:33.000Z",
"host": "web121.mweibo.tc.sinanode.com",
"message": "{\"uid\":3081609001,\"type\":\"signal\"}",
"uid": 3081609001,
"type": "signal"
}
2.3.6 key-value切分
在很多情況下,日誌內容本身都是一個類似於key-value的格式,但是格式具體的樣式卻是多種多樣的。Logstash提供logstash-filter-kv插件,幫助處理不同樣式的key-value日誌,變成實際的LogStash::Event數據。
配置示例如下:
filter {
ruby {
init =>"@kname = ['method','uri','verb']"
ruby {
init => "@kname = ['method','uri','verb']"
code => "
new_event = LogStash::Event.new(Hash[@kname.zip(event.get('request').split('|'))])
new_event.remove('@timestamp')
event.append(new_event)""
"
}
if [uri] {
ruby {
init => "@kname = ['url_path','url_args']"
code => "
new_event = LogStash::Event.new(Hash[@kname.zip(event.get('uri').split('?'))])
new_event.remove('@timestamp')
event.append(new_event)""
"
}
kv {
prefix =>"url_"
source =>"url_args"
field_split =>"&"
remove_field => [ "url_args", "uri", "request" ]
}
}
}
Nginx訪問日誌中的$request,通過這段配置,可以詳細切分成method、url_path、verb、url_a、url_b...
進一步,如果url_args中有過多字段,可能導致Elasticsearch集群因為頻繁update mapping或者消耗太多內存在cluster state上而宕機。所以,更優的選擇是隻保留明確有用的url_args內容,其他部分舍去,如下所示:
kv {
prefix =>"url_"
source =>"url_args"
field_split =>"&"
include_keys => [ "uid", "cip" ]
remove_field => [ "url_args", "uri", "request" ]
}
上例即表示,除了url_uid和url_cip兩個字段以外,其他的url_*都不保留。
2.3.7 metrics數值統計
logstash-filter-metrics插件是使用Ruby的Metriks模塊來實現在內存裏實時地計數和采樣分析。該模塊支持兩個類型的數值分析:meter和timer。下麵分別舉例說明。
1. Meter示例(速率閾值檢測)
Web訪問日誌的異常狀態碼頻率是運維人員會非常關心的一個數據。通常我們的做法是通過Logstash或者其他日誌分析腳本,把計數發送到rrdtool或者graphite裏麵,然後再通過check_graphite腳本之類的東西來檢查異常並報警。
事實上這個事情可以直接在Logstash內部就完成。比如如果最近一分鍾504請求的個數超過100個就報警,如下所示:
filter {
metrics {
meter => "error_%{status}"
add_tag => "metric"
ignore_older_than => 10
}
if "metric" in [tags] {
ruby {
code => "event.cancel if event.get('[error_504][rate_1m]') * 60 > 100"
}
}
}
output {
if "metric" in [tags] {
exec {
command => "echo \"Out of threshold: %{[error_504][rate_1m]}\""
}
}
這裏需要注意*60的含義。metrics模塊生成的rate_1m/5m/15m意思是:最近1、5、15分鍾的每秒速率!
2. Timer示例(box and whisker異常檢測)
官版的logstash-filter-metrics插件隻適用於metric事件的檢查。由插件生成的新事件內部不存有來自input區段的實際數據信息。所以,要完成我們的百分比分布箱體檢測,需要首先對代碼稍微做幾行變動,即在metric的timer事件裏加一個屬性,存儲最近一個實際事件的數值:
def register
…
@last = {}
end
def filter(event)
…
@last[event.sprintf(name)] = event.sprintf(value).to_f
end
def flush(options = {})
…
@metric_timer.each_pair do |name, metric|
…
event.set(“[#{name}][last]”, @last[name])
metric.clear if should_clear?
end
…
end
有了這個Last值,然後我們就可以用如下配置來探測異常數據了:
filter {
metrics {
timer => {"rt" =>"%{request_time}"}
percentiles => [25, 75]
add_tag =>"percentile"
}
ruby {
code => "l=event.get('[rt][p75]')-event.get('[rt][p25]');event.set('[rt][low]', event.get('[rt][p25]')-l);event.set('[rt][high]',event.get('[rt][p75]')+l)"
}
}
}
output {
if "percentile" in [tags] and ([rt][last] > [rt][high] or [rt][last] < [rt][low]) {
exec {
command => "echo \"Anomaly: %{[rt][last]}\""
}
}
有關box and shisker plot內容和重要性,參見《數據之魅》一書。
2.3.8 mutate數據修改
logstash-filter-mutate插件是Logstash另一個重要插件,它提供了豐富的基礎類型數據處理能力,包括類型轉換、字符串處理和字段處理等。
1.類型轉換
類型轉換是logstash-filter-mutate插件最初誕生時的唯一功能,其應用場景在之前的2.3.5節“JSON編解碼”中已經提到。
可以設置的轉換類型包括:“integer”、“float”和“string”。示例如下:
filter {
mutate {
convert => ["request_time", "float"]
}
}
mutate除了轉換簡單的字符值,還支持對數組類型的字段進行轉換,即將[“1”,“2”]轉換成[1,2]。但不支持對哈希類型的字段做類似處理。有這方麵需求的可以采用稍後講述的logstash-filter-ruby插件完成。
2. 字符串處理
有如下字符串處理的插件:
gsub:僅對字符串類型字段有效。
gsub => ["urlparams", "[\\?#]", "_"]
split:分割字符串。
filter {
mutate {
split => ["message", "|"]
}
}
隨意輸入一串以|分割的字符,比如“123|321|adfd|dfjld*=123”,可以看到如下輸出:
{
"message" => [
[0] "123",
[1] "321",
[2] "adfd",
[3] "dfjld*=123"
],
"@version" =>"1",
"@timestamp" =>"2014-08-20T15:58:23.120Z",
"host" =>"raochenlindeMacBook-Air.local"
}
join:僅對數組類型字段有效。
我們在之前已經用split割切的基礎上再join回去。配置改成:
filter {
mutate {
split => ["message", "|"]
}
mutate {
join => ["message", ","]
}
}
filter區段之內,是順序執行的。所以我們最後看到的輸出結果是:
{
"message" =>"123,321,adfd,dfjld*=123",
"@version" =>"1",
"@timestamp" =>"2014-08-20T16:01:33.972Z",
"host" =>"raochenlindeMacBook-Air.local"
}
merge:合並兩個數組或者哈希字段。依然在之前split的基礎上繼續:
filter {
mutate {
split => ["message", "|"]
}
mutate {
merge => ["message", "message"]
}
}
我們會看到輸出:
{
"message" => [
[0] "123",
[1] "321",
[2] "adfd",
[3] "dfjld*=123",
[4] "123",
[5] "321",
[6] "adfd",
[7] "dfjld*=123"
],
"@version" =>"1",
"@timestamp" =>"2014-08-20T16:05:53.711Z",
"host" =>"raochenlindeMacBook-Air.local"
}
如果src字段是字符串,會自動先轉換成一個單元素的數組再合並。把上一示例中的來源字段改成“host”:
filter {
mutate {
split => ["message", "|"]
}
mutate {
merge => ["message", "host"]
}
}
結果變成:
{
"message" => [
[0] "123",
[1] "321",
[2] "adfd",
[3] "dfjld*=123",
[4] "raochenlindeMacBook-Air.local"
],
"@version" =>"1",
"@timestamp" =>"2014-08-20T16:07:53.533Z",
"host" => [
[0] "raochenlindeMacBook-Air.local"
]
}
看,目的字段“message”確實多了一個元素,但是來源字段“host”本身也由字符串類型變成數組類型了!
同樣,如果目的字段不是數組,也會被強製轉換。即使來源字段並不存在:
filter {
mutate {
merge => ["message", "not_exist_field"]
}
}
結果會變成:
{
"message" => [
[0] "123|321|adfd|dfjld*=123"
],
"@version" =>"1",
"@timestamp" =>"2014-08-20T15:58:23.120Z",
"host" =>"raochenlindeMacBook-Air.local"
}
strip:去除字段內容前後的空格。可以接受數組參數:
filter {
mutate {
strip => ["syslog_message", "syslog_datetime"]
}
}
lowercase:將字段內容全部轉換成小寫字母。同樣可以接受數組。在ELK stack場景中,將內容轉換成小寫會是一個比較常見的需求。因為Elasticsearch默認是統一按照小寫字母來搜索的。為了確保檢索準確率,在不影響使用的情況下,建議對常用檢索字段啟用lowercase配置。
uppercase:將字段內容全部轉換成大寫字母。同樣可以接受數組。
3.字段處理
字段處理的插件有:
rename:重命名某個字段,如果目的字段已經存在,會被覆蓋掉,如下所示:
filter {
mutate {
rename => ["syslog_host", "host"]
}
}
update:更新某個字段的內容。如果字段不存在,不會新建。
replace:作用和update類似,但是當字段不存在的時候,它會起到add_field參數一樣的效果,自動添加新的字段。
4.執行次序
需要注意的是,filter/mutate內部是有執行次序的。其次序如下:
rename(event) if @rename
update(event) if @update
replace(event) if @replace
convert(event) if @convert
gsub(event) if @gsub
uppercase(event) if @uppercase
lowercase(event) if @lowercase
strip(event) if @strip
remove(event) if @remove
split(event) if @split
join(event) if @join
merge(event) if @merge
filter_matched(event)
而filter_matched這個filters/base.rb裏繼承的方法也是有次序的:
@add_field.each do |field, value|
end
@remove_field.each do |field|
end
@add_tag.each do |tag|
end
@remove_tag.each do |tag|
end
2.3.9 隨心所欲的Ruby處理
如果你稍微懂那麼一點點Ruby語法的話,logstash-filter-ruby插件將會是一個非常有用的工具。比如你需要稍微修改一下LogStash::Event對象,但是又不打算為此寫一個完整的插件,用logstash-filter-ruby插件絕對感覺良好。
配置示例如下:
filter {
ruby {
init =>"@kname = ['client','servername','url','status','time','size','upstream',
'upstreamstatus','upstreamtime','referer','xff','useragent']"
code => "
ew_event = LogStash::Event.new(Hash[@kname.zip(event.get('message').split('|'))])
new_event.remove('@timestamp')
event.append(new_event)"
}
}
官網示例是一個比較有趣但是沒啥大用的做法—隨機取消90%的事件。
所以上麵我們給出了一個有用而且強大的實例。
通常我們都是用logstash-filter-grok插件來捕獲字段的,但是正則耗費大量的CPU資源,很容易成為Logstash進程的瓶頸。
而實際上,很多流經Logstash的數據都是有自己預定義的特殊分隔符的,我們可以很簡單的直接切割成多個字段。
從Logstash2.3開始,LogStash::event.append不再直接接受Hash對象,而必須是LogStash:: Event對象。所以示例變成要先初始化一個新的event,再把無用的@timestamp移除,再append進去。否則會把@timestamp變成有兩個時間的數組了!
從Logstash 5.0開始,LogStash::Event改為Java實現,直接使用event["parent"]["child"]形式獲取的,不是原事件的引用而是複製品,需要改用event.get('[parent][child]')和event.set('[parent][child]', 'value')的方法。
logstash-filter-mutate插件裏的“split”選項隻能切成數組,後續很不方便使用和識別。而在logstash-filter-ruby裏,我們可以通過“init”參數預定義好由每個新字段的名字組成的數組,然後在“code”參數指定的Ruby語句裏通過兩個數組的zip操作生成一個哈希並添加進數組裏。短短一行Ruby代碼,可以減少50%以上的CPU使用率。
logstash-filter-ruby插件用途遠不止這一點,下一節你還會繼續見到它的身影。
更多實例如下:
filter{
date {
match => ["datetime" , "UNIX"]
}
ruby {
code =>"event.cancel if 5 * 24 * 3600 < (event.get( '@timestamp')-::Time.now).abs"
}
}
在實際運用中,我們幾乎肯定會碰到出乎意料的輸入數據。這都有可能導致Elasticsearch集群出現問題。
當數據格式發生變化,比如UNIX時間格式變成UNIX_MS時間格式,會導致Logstash瘋狂創建新索引,集群崩潰。
或者誤輸入過老的數據時,因為一般我們會close幾天之前的索引以節省內存,必要時再打開。而直接嚐試把數據寫入被關閉的索引會導致內存問題。
這時候我們就需要提前校驗數據的合法性。上麵配置,就是用於過濾掉時間範圍與當前時間差距太大的非法數據的。
2.3.10 split拆分事件
上一章我們通過multiline插件將多行數據合並進一個事件裏,那麼反過來,也可以把一行數據,拆分成多個事件。這就是split插件。
配置示例如下:
filter {
split {
field =>"message"
terminator =>"#"
}
}
這個測試中,我們在intputs/stdin的終端中輸入一行數據:“test1#test2”,結果看到輸出兩個事件:
{
"@version": "1",
"@timestamp": "2014-11-18T08:11:33.000Z",
"host": "web121.mweibo.tc.sinanode.com",
"message": "test1"
}
{
"@version": "1",
"@timestamp": "2014-11-18T08:11:33.000Z",
"host": "web121.mweibo.tc.sinanode.com",
"message": "test2"
}
最後更新:2017-05-19 15:02:37