閱讀1012 返回首頁    go 阿裏雲 go 技術社區[雲棲]


《ELK Stack權威指南 》第2章 插件配置

本節書摘來自華章出版社《ELK Stack權威指南 》一書中的第1章,第2節,作者饒琛琳,更多章節內容可以訪問雲棲社區“華章計算機”公眾號查看。


插 件 配 置

插件是Logstash最大的特色。各種不同的插件源源不斷地被創造出來,發布到社區中供大家使用。本章會按照插件的類別,對一般場景下的一些常用插件做詳細的配置和用例介紹。本章介紹的插件包括:1)輸入插件。基於shipper端場景,主要介紹STDIN、TCP、File等插件。2)編解碼插件。編解碼通常是會被遺忘的環節,但是運用好了,會大大提高工作效率,本節介紹最常用的JSON和multiline插件。3)過濾器插件。名為過濾器,其實各種數據裁剪和計算都可以在這類插件裏完成,是Logstash最強大的一環。本節會詳細介紹grok、date、mutate、ruby、metrics等插件的妙用。4)輸出插件。Logstash雖然經常跟Elasticsearch並稱,但是作為一個日誌傳輸框架,它其實可以輸出數據到各種不同的地方。比如Graphite、HDFS、Nagios等等。本章會介紹這些常用的輸出插件用法。

2.1 輸入插件

在“Hello World”示例中,我們已經見到並介紹了Logstash的運行流程和配置的基礎語法。從這章開始,我們就要逐一介紹Logstash流程中比較常用的一些插件,並在介紹中針對其主要適用的場景、推薦的配置,作一些說明。

限於篇幅,接下來內容中,配置示例不一定能貼完整。請記住一個原則:Logstash配置一定要有一個input和一個output。在演示過程中,如果沒有寫明input,默認就會使用“Hello World”裏我們已經演示過的logstash-input-stdin,同理,沒有寫明的output就是logstash-output-stdout。

以上請讀者自明。

2.1.1 標準輸入

我們已經見過好幾個示例使用stdin了。這也應該是Logstash裏最簡單和基礎的插件了。所以,在這段中,我們先介紹一些未來每個插件都會有的一些方法。

配置示例如下:

input {

    stdin {

        add_field => {"key" =>"value"}

        codec =>"plain"

        tags => ["add"]

        type =>"std"

    }

}

用上麵的新stdin設置重新運行一次最開始的Hello World示例。我建議大家把整段配置都寫入一個文本文件,然後運行命令:bin/logstash -f stdin.conf。輸入“Hello World”並回車後,你會在終端看到如下輸出:

{

"message" =>"hello world",

"@version" =>"1",

"@timestamp" =>"2014-08-08T06:48:47.789Z",

"type" =>"std",

"tags" => [

        [0] "add"

    ],

"key" =>"value",

"host" =>"raochenlindeMacBook-Air.local"

}

type和tags是Logstash事件中兩個特殊的字段。通常來說,我們會在“輸入區段”中通過type來標記事件類型—我們肯定是提前能知道這個事件屬於什麼類型的。而tags則是在數據處理過程中,由具體的插件來添加或者刪除的。

最常見的用法是像下麵這樣:

input {

    stdin {

        type =>"web"

    }

}

filter {

    if [type] == "web" {

        grok {

            match => ["message", %{COMBINEDAPACHELOG}]

        }

    }

}

output {

    if "_grokparsefailure" in [tags] {

        nagios_nsca {

            nagios_status =>"1"

        }

    } else {

        elasticsearch {

        }

    }

}

看起來蠻複雜的,對吧?

繼續學習,你也可以寫出來的。

2.1.2 文件輸入

分析網站訪問日誌應該是一個運維工程師最常見的工作了。所以我們先學習一下怎麼用Logstash來處理日誌文件。

Logstash使用一個名叫FileWatch的Ruby Gem庫來監聽文件變化。這個庫支持glob展開文件路徑,而且會記錄一個叫.sincedb的數據庫文件來跟蹤被監聽日誌文件的當前讀取位置。所以,不要擔心Logstash會漏過你的數據。

sincedb文件中記錄了每個被監聽的文件的inode, major number, minor number和pos。

配置示例如下:

input {

    file {

        path => ["/var/log/*.log", "/var/log/message"]

        type =>"system"

        start_position =>"beginning"

    }

}

有一些比較有用的配置項,可以用來指定FileWatch庫的行為:

discover_interval:Logstash每隔多久去檢查一次被監聽的path下是否有新文件,默認值是15秒。

exclude:不想被監聽的文件可以排除出去,這裏跟path一樣支持glob展開。

sincedb_path:如果你不想用默認的$HOME/.sincedb(Windows平台上為%USERPROFILE%\.sincedb,該變量默認值是:C:\Windows\System32\config\systemprofile),可以通過這個配置定義sincedb文件到其他位置。

sincedb_write_interval:Logstash每隔多久寫一次sincedb文件,默認是15秒。

stat_interval:Logstash每隔多久檢查一次被監聽文件狀態(是否有更新),默認是1秒。

start_position:Logstash從什麼位置開始讀取文件數據,默認是結束位置,也就是說,Logstash進程會以類似tail-F的形式運行。如果你是要導入原有數據,把這個設定改成"beginning",Logstash進程就從頭開始讀取,有點類似於less+F命令的形式運行。

close_older:一個已經監聽中的文件,如果超過這個值的時間內沒有更新內容,就關閉監聽它的文件句柄。默認是3 600秒,即一小時。

ignore_older:在每次檢查文件列表的時候,如果一個文件的最後修改時間超過這個值,就忽略這個文件。默認是86 400秒,即一天。

注意事項如下:

1)通常你要導入原有數據進Elasticsearch的話,你還需要filter/date插件來修改默認的"@timestamp"字段值。稍後會學習這方麵的知識。

2)FileWatch隻支持文件的絕對路徑,而且會不自動遞歸目錄。所以有需要的話,請用數組方式都寫明具體哪些文件。

3)LogStash::Inputs::File隻是在進程運行的注冊階段初始化一個FileWatch對象。所以它不能支持類似fluentd那樣的path =>"/path/to/%{+yyyy/MM/dd/hh}.log"寫法。達到相同目的,你隻能寫成path =>"/path/to/*/*/*/*.log"。FileWatch模塊提供了一個稍微簡單一點的寫法:/path/to/**/*.log,用**來縮寫表示遞歸全部子目錄。

4)start_position僅在該文件從未被監聽過的時候起作用。如果sincedb文件中已經有這個文件的inode記錄了,那麼Logstash依然會從記錄過的pos開始讀取數據。所以重複測試的時候每回需要刪除sincedb文件。此外,官方博客上提供了另一個巧妙的思路:https://www.elastic.co/blog/logstash-configuration-tuning。把sincedb_path定義為/dev/null,則每次重啟自動從頭開始讀。

5)因為Windows平台上沒有inode的概念,Logstash某些版本在Windows平台上監聽文件不是很靠譜。Windows平台上,推薦考慮使用nxlog作為收集端,參見後麵5.5節。

2.1.3 TCP輸入

未來你可能會用Redis服務器或者其他的消息隊列係統來作為Logstash Broker的角色。不過Logstash其實也有自己的TCP/UDP插件,在臨時任務的時候,也算能用,尤其是測試環境。

雖然LogStash::Inputs::TCP用Ruby的Socket和OpenSSL庫實現了高級的SSL功能,但Logstash本身隻能在SizedQueue中緩存20個事件。這就是我們建議在生產環境中換用其他消息隊列的原因。

配置示例如下:

input {

    tcp {

        port => 8888

        mode =>"server"

        ssl_enable => false

    }

}

目前來看,LogStash::Inputs::TCP最常見的用法就是配合nc命令導入舊數據。在啟動Logstash進程後,在另一個終端運行如下命令即可導入數據:

# nc 127.0.0.1 8888 < olddata

這種做法比用LogStash::Inputs::File好,因為當nc命令結束,我們就知道數據導入完畢了。而用input/file方式,Logstash進程還會一直等待新數據輸入被監聽的文件,不能直接看出是否任務完成了。

2.1.4 syslog輸入

syslog可能是運維領域最流行的數據傳輸協議了。當你想從設備上收集係統日誌的時候,syslog應該會是你的第一選擇。尤其是網絡設備,比如思科中syslog幾乎是唯一可行的辦法。

我們這裏不解釋如何配置你的syslog.conf、rsyslog.conf或者syslog-ng.conf來發送數據,而隻講如何把Logstash配置成一個syslog服務器來接收數據。

有關rsyslog的用法,後麵的5.4節會有更詳細的介紹。

配置示例如下:

input {

    syslog {

        port =>"514"

    }

}

作為最簡單的測試,我們先暫停一下本機的syslogd(或rsyslogd)進程,然後啟動Logstash進程(這樣就不會有端口衝突問題)。現在,本機的syslog就會默認發送到Logstash裏了。我們可以用自帶的logger命令行工具發送一條“Hello World”信息到syslog裏(即Logstash裏)。看到的Logstash輸出像下麵這樣:

{

"message" =>"Hello World",

"@version" =>"1",

"@timestamp" =>"2014-08-08T09:01:15.911Z",

"host" =>"127.0.0.1",

"priority" =>31,

"timestamp" =>"Aug  8 17:01:15",

"logsource" =>"raochenlindeMacBook-Air.local",

"program" =>"com.apple.metadata.mdflagwriter",

"pid" =>"381",

"severity" =>7,

"facility" =>3,

"facility_label" =>"system",

"severity_label" =>"Debug"

}

Logstash是用UDPSocket、TCPServer和LogStash::Filters::Grok來實現LogStash::Inputs::Syslog的。所以你其實可以直接用Logstash配置實現一樣的效果,如下所示:

input {

    tcp {

        port =>"8514"

    }

}

filter {

    grok {

        match => ["message", "%{SYSLOGLINE}" ]

    }

    syslog_pri { }

}

建議在使用LogStash::Inputs::Syslog的時候走TCP協議來傳輸數據。

因為具體實現中,UDP監聽器隻用了一個線程,而TCP監聽器會在接收每個連接的時候都啟動新的線程來處理後續步驟。

如果你已經在使用UDP監聽器收集日誌,用下行命令檢查你的UDP接收隊列大小:

# netstat -plnu | awk 'NR==1 || $4~/:514$/{print $2}'

Recv-Q

228096

228096是UDP接收隊列的默認最大大小,這時候linux內核開始丟棄數據包了!

強烈建議使用LogStash::Inputs::TCP和LogStash::Filters::Grok配合實現同樣的syslog功能!

雖然LogStash::Inputs::Syslog在使用TCPServer的時候可以采用多線程處理數據的接收,但是在同一個客戶端數據的處理中,其grok和date是一直在該線程中完成的,這會導致總體上的處理性能幾何級的下降—經過測試,TCPServer每秒可以接收50 000條數據,而在同一線程中啟用grok後每秒隻能處理5 000條,再加上date隻能達到500條!

才將這兩步拆分到filters階段後,Logstash支持對該階段插件單獨設置多線程運行,大大提高了總體處理性能。在相同環境下,logstash -f tcp.conf -w 20的測試中,總體處理性能可以達到每秒30 000條數據!

測試采用Logstash作者提供的命令:

yes "<44>May 19 18:30:17 snack jls: foo bar 32" | nc localhost 3000

出處見:https:// github.com/jordansissel/experiments/blob/master/ruby/jruby-netty/syslog-server/Makefile

如果你實在沒法切換到TCP協議,可以自己寫程序,或者使用其他基於異步I/O框架(比如libev)的項目。下麵是一個簡單的異步I/O實現UDP監聽數據輸入Elasticsearch的示例:https://gist.github.com/chenryn/7c922ac424324ee0d695。

2.1.5 http_poller抓取

Logstash作為數據采集係統,也支持自己作為一個HTTP客戶端去抓取網頁數據或者接口數據。這方麵有一個很明顯的IT運維應用場景:很多業務係統軟件本身提供了RESTful的內部運行狀態接口,可以直接通過接口采集這些監控信息。

更長期的方案應該是編寫對應的metricbeat模塊,但是直接采用logstash-input-http_poller顯然更快捷。

比如Nginx的性能狀態,社區有一個非常全麵的性能狀態監控模塊:nginx-module-vts。在新浪微博,後端池分為核心接口、非核心接口兩塊,我們要分別監控的話,nginx-module-vts的配置如下:

http {

    vhost_traffic_status_zone;

    map $uri $filter_uri {

        default 'non-core';

        /2/api/timeline core;

        ~^/2/api/unread core;

    }

    server {

        vhost_traffic_status_filter_by_set_key $filter_uri;

        location /status {

            auth_basic "Restricted";

            auth_basic_user_file pass_file;

            vhost_traffic_status_display;

            vhost_traffic_status_display_format json;

        }

    }

}

則對應的logstash-input-http_poller配置如下:

input {

    http_poller {

        urls => {

            0 => {

                method => get

                url => "https://localhost:80/status/format/json"

                headers => {

                    Accept => "application/json"

                }

                auth => {

                    user => "YouKnowIKnow"

                    password => "IKnowYouDonotKnow"

                }

            }

            1 => {

                method => get

                url => "https://localhost:80/status/con ... up%3D*"

                headers => {

                    Accept => "application/json"

                }

                auth => {

                    user => "YouKnowIKnow"

                    password => "IKnowYouDonotKnow"

                }

            }

        }

        request_timeout => 60

        interval => 60

        codec => "json"

    }

}

這段配置就可以就可以每60秒獲得一次 vts 數據,並重置計數了。

注意,url是一個Hash值,所以它的執行順序是根據Hash.map來的,為了確保我們是先獲取數據再重置,這裏幹脆用0, 1來作為Hash的key,這樣順序就沒問題了。

2.2 編解碼配置

Codec是Logstash從1.3.0版開始新引入的概念(Codec來自Coder/decoder兩個單詞的首字母縮寫)。

在此之前,Logstash隻支持純文本形式輸入,然後以過濾器處理它。但現在,我們可以在輸入期處理不同類型的數據,這全是因為有了Codec設置。

所以,這裏需要糾正之前的一個概念。Logstash不隻是一個input | filter | output的數據流,而是一個input | decode | filter | encode | output的數據流!Codec就是用來decode、encode事件的。

Codec的引入,使得Logstash可以更好、更方便地與其他有自定義數據格式的運維產品共存,比如graphite、fluent、netflow、collectd,以及使用msgpack、json、edn等通用數據格式的其他產品等。

事實上,我們在第一個“Hello World”用例中就已經用過Codec了—rubydebug就是一種Codec!雖然它一般隻會用在stdout插件中,作為配置測試或者調試的工具。

這個五段式的流程說明源自Perl版的Logstash(後來改名叫Message::Passing模塊)的設計。本書稍後5.8節會對該模塊稍作介紹。

2.2.1 JSON編解碼

在早期的版本中,有一種降低Logstash過濾器的CPU負載消耗的做法盛行於社區(在當時的Cookbook上有專門的一節介紹):直接輸入預定義好的JSON數據,這樣就可以省略掉filter/grok配置!

這個建議依然有效,不過在當前版本中需要稍微做一點配置變動,因為現在有專門的Codec設置。

1.配置示例

社區常見的示例都是用的Apache的customlog,不過我覺得Nginx是一個比Apache更常用的新型Web服務器,所以我這裏會用nginx.conf做示例:

logformat json '{"@timestamp":"$time_iso8601",'

               '"@version":"1",'

               '"host":"$server_addr",'

               '"client":"$remote_addr",'

               '"size":$body_bytes_sent,'

               '"responsetime":$request_time,'

               '"domain":"$host",'

               '"url":"$uri",'

               '"status":"$status"}';

access_log /var/log/nginx/access.log_json json;

注意,在$request_time和$body_bytes_sent變量兩頭沒有雙引號",這兩個數據在JSON裏應該是數值類型!

重啟Nginx應用,然後修改你的input/file區段配置成下麵這樣:

input {

    file {

        path =>"/var/log/nginx/access.log_json""

        codec =>"json"

    }

}

2.運行結果

下麵訪問一下用Nginx發布的Web頁麵,然後你會看到Logstash進程輸出類似下麵這樣的內容:

{

"@timestamp" =>"2014-03-21T18:52:25.000+08:00",

"@version" =>"1",

"host" =>"raochenlindeMacBook-Air.local",

"client" =>"123.125.74.53",

"size" =>8096,

"responsetime" =>0.04,

"domain" =>"www.domain.com",

"url" =>"/path/to/file.suffix",

"status" =>"200"

}

3. Nginx代理服務的日誌格式問題

對於一個Web服務器的訪問日誌,看起來已經可以很好的工作了。不過如果Nginx是作為一個代理服務器運行的話,訪問日誌裏有些變量,比如說$upstream_response_time,可能不會一直是數字,它也可能是一個“-”字符串!這會直接導致Logstash對輸入數據驗證報異常。

有兩個辦法解決這個問題:

1)用sed在輸入之前先替換-成0。運行Logstash進程時不再讀取文件而是標準輸入,這樣命令就成了下麵這個樣子:

tail -F /var/log/nginx/proxy_access.log_json \

    | sed 's/upstreamtime":-/upstreamtime":0/' \

    | /usr/local/logstash/bin/logstash -f /usr/local/logstash/etc/proxylog.conf

2)日誌格式中統一記錄為字符串格式(即都帶上雙引號),然後再在Logstash中用filter/mutate插件來變更應該是數值類型的字符字段的值類型。

有關LogStash::Filters::Mutate的內容,本書稍後會有介紹。

2.2.2 多行事件編碼

有些時候,應用程序調試日誌會包含非常豐富的內容,為一個事件打印出很多行內容。這種日誌通常都很難通過命令行解析的方式做分析。

而Logstash正為此準備好了codec/multiline插件!當然,multiline插件也可以用於其他類似的堆棧式信息,比如Linux的內核日誌。

配置示例如下:

input {

    stdin {

        codec => multiline {

            pattern =>"^\["

            negate => true

            what =>"previous"

        }

    }

}

運行Logstash進程,然後在等待輸入的終端中輸入如下幾行數據:

[Aug/08/08 14:54:03] hello world

[Aug/08/09 14:54:04] hello logstash

    hello best practice

    hello raochenlin

[Aug/08/10 14:54:05] the end

你會發現Logstash輸出下麵這樣的返回:

{

"@timestamp" =>"2014-08-09T13:32:03.368Z",

"message" =>"[Aug/08/08 14:54:03] hello world\n",

"@version" =>"1",

"host" =>"raochenlindeMacBook-Air.local"

}

{

"@timestamp" =>"2014-08-09T13:32:24.359Z",

"message" =>"[Aug/08/09 14:54:04] hello logstash\n\n    hello best practice\n\n

            hello raochenlin\n",

"@version" =>"1",

    "tags" => [

        [0] "multiline"

    ],

"host" =>"raochenlindeMacBook-Air.local"

}

你看,後麵這個事件,在“message”字段裏存儲了三行數據!

輸出的事件中沒有最後一行的"the end"字符串,這是因為你最後輸入的回車符\n並不匹配設定的^\[正則表達式,Logstash還得等下一行數據直到匹配成功後才會輸出這個事件。

其實這個插件的原理很簡單,就是把當前行的數據添加到前麵一行後麵,直到新進的當前行匹配^\[正則為止。這個正則還可以用grok表達式,稍後你就會學習這方麵的內容。具體的Java日誌正則見:https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/java

說到應用程序日誌,Log4j肯定是第一個被大家想到的,使用codec/multiline也確實是一個辦法。

不過,如果你本身就是開發人員,或者可以推動程序修改變更的話,Logstash還提供了另一種處理Log4j的方式:input/log4j。與codec/multiline不同,這個插件是直接調用了org.apache.log4j.spi.LoggingEvent處理TCP端口接收的數據。後麵3.6節會詳細講述Log4j的用法。

2.2.3 網絡流編碼

NetFlow是Cisco發明的一種數據交換方式。NetFlow提供網絡流量的會話級視圖,記錄下每個TCP/IP事務的信息。它的目的不是像tcpdump那樣提供網絡流量的完整記錄,而是匯集起來形成更易於管理和易讀的流向和容量的分析監控。

Cisco上配置NetFlow的方法,請參照具體的設備說明,主要是設定采集服務器的地址和端口,為運行Logstash服務的主機地址和端口(示例中為9995)。

采集NetFlow數據的Logstash服務配置示例如下:

input {

    udp {

        port => 9995

        codec => netflow {

            definitions =>"/opt/logstash-1.4.2/lib/logstash/codecs/netflow/netflow.yaml"

            versions => [5]

        }

    }

}

output {

    elasticsearch {

        index =>"logstash_netflow5-%{+YYYY.MM.dd}"

        host =>"localhost"

    }

}

由於該插件生成的字段較多,所以建議對應的Elasticsesarch索引模板也需要單獨提交:

# curl -XPUT localhost:9200/_template/logstash_netflow5 -d '{

"template" : "logstash_netflow5-*",

"settings": {

        "index.refresh_interval": "5s"

},

"mappings" : {

        "_default_" : {

        "_all" : {"enabled" : false},

        "properties" : {

            "@version": { "index": "analyzed", "type": "integer" },

            "@timestamp": { "index": "analyzed", "type": "date" },

            "netflow": {

            "dynamic": true,

            "type": "object",

            "properties": {

                "version": { "index": "analyzed", "type": "integer" },

                "flow_seq_num": { "index": "not_analyzed", "type": "long" },

                "engine_type": { "index": "not_analyzed", "type": "integer" },

                "engine_id": { "index": "not_analyzed", "type": "integer" },

                "sampling_algorithm": { "index": "not_analyzed", "type": "integer" },

                "sampling_interval": { "index": "not_analyzed", "type": "integer" },

                "flow_records": { "index": "not_analyzed", "type": "integer" },

                "ipv4_src_addr": { "index": "analyzed", "type": "ip" },

                "ipv4_dst_addr": { "index": "analyzed", "type": "ip" },

                "ipv4_next_hop": { "index": "analyzed", "type": "ip" },

                "input_snmp": { "index": "not_analyzed", "type": "long" },

                "output_snmp": { "index": "not_analyzed", "type": "long" },

                "in_pkts": { "index": "analyzed", "type": "long" },

                "in_bytes": { "index": "analyzed", "type": "long" },

                "first_switched": { "index": "not_analyzed", "type": "date" },

                "last_switched": { "index": "not_analyzed", "type": "date" },

                "l4_src_port": { "index": "analyzed", "type": "long" },

                "l4_dst_port": { "index": "analyzed", "type": "long" },

                "tcp_flags": { "index": "analyzed", "type": "integer" },

                "protocol": { "index": "analyzed", "type": "integer" },

                "src_tos": { "index": "analyzed", "type": "integer" },

                "src_as": { "index": "analyzed", "type": "integer" },

                "dst_as": { "index": "analyzed", "type": "integer" },

                "src_mask": { "index": "analyzed", "type": "integer" },

                "dst_mask": { "index": "analyzed", "type": "integer" }

              }

          }

       }

    }

  }

}'

Elasticsearch索引模板的功能,本書稍後12.6節會有詳細介紹。

2.2.4 collectd輸入

collectd是一個守護(daemon)進程,用來收集係統性能和提供各種存儲方式來存儲不同值的機製。它會在係統運行和存儲信息時周期性的統計係統的相關統計信息。利用這些信息有助於查找當前係統性能瓶頸(如作為性能分析performance analysis)和預測係統未來的load(如能力部署capacity planning)等

下麵簡單介紹一下:collectd的部署以及與Logstash對接的相關配置實例。

1. collectd的安裝

collectd的安裝同樣有兩種方式,使用官方軟件倉庫安裝或源代碼安裝。

使用官方軟件倉庫安裝(推薦)

collectd官方有一個隱藏的軟件倉庫:https://pkg.ci.collectd.org,構建有RHEL/CentOS (rpm),Debian/Ubuntu(deb)的軟件包,如果你使用的操作係統屬於上述的,那麼推薦使用軟件倉庫安裝。

目前collectd官方維護3個版本:5.4、5.5、5.6。根據需要選擇合適的版本倉庫。下麵示例安裝5.5版本的方法。

Debian/Ubuntu倉庫安裝:

echo "deb https://pkg.ci.collectd.org/deb $(lsb_release -sc) collectd-5.5" | sudo tee /etc/apt/sources.list.d/collectd.list

curl -s https://pkg.ci.collectd.org/pubkey.asc | sudo apt-key add -

sudo apt-get update && sudo apt-get install -y collectd

注意,Debian/Ubuntu軟件倉庫自帶有collectd軟件包,如果軟件倉庫自帶的版本足夠你使用,那麼可以不用添加倉庫,直接通過apt-get install collectd即可。

RHEL/CentOS倉庫安裝:

cat > /etc/yum.repos.d/collectd.repo <<EOF

[collectd-5.5]

name=collectd-5.5

baseurl=https://pkg.ci.collectd.org/rpm/collectd-5.5/epel-\$releasever-\$basearch/

gpgcheck=1

gpgkey=https://pkg.ci.collectd.org/pubkey.asc

EOF

yum install -y collectd

你如果需要使用其他collectd插件,此時也可一並安裝對應的collectd-xxxx軟件包。

源碼安裝collectd

collectd目前維護3個版本:5.4、5.5、5.6。源代碼編譯安裝時同樣可以根據自己需要選擇對應版本的源碼下載:

wget https://collectd.org/files/collectd-5.4.1.tar.gz

tar zxvf collectd-5.4.1.tar.gz

cd collectd-5.4.1

./configure --prefix=/usr --sysconfdir=/etc --localstatedir=/var --libdir=/usr/lib --mandir=/usr/share/man --enable-all-plugins

make && make install

源代碼編譯安裝需要預先解決好各種環境依賴,在RedHat平台上要提前安裝如下軟件包:

rpm -ivh "https://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm"

yum -y install libcurl libcurl-devel rrdtool rrdtool-devel perl-rrdtool rrdtool-prel libgcrypt-devel gcc make gcc-c++ liboping liboping-devel perl-CPAN net-snmp net-snmp-devel

安裝啟動腳本如下:

cp contrib/redhat/init.d-collectd /etc/init.d/collectd

chmod +x /etc/init.d/collectd

啟動collectd如下:

service collectd start

2. collectd的配置

以下配置可以實現對服務器基本的CPU、內存、網卡流量、磁盤I/O以及磁盤空間占用情況的監控:

Hostname "host.example.com"

LoadPlugin interface

LoadPlugin cpu

LoadPlugin memory

LoadPlugin network

LoadPlugin df

LoadPlugin disk

<Plugin interface>

    Interface "eth0"

    IgnoreSelected false

</Plugin>

<Plugin network>

<Server "10.0.0.1""25826"> ## Logstash 的 IP 地址和 collectd 的數據接收端口號

</Server>

</Plugin>

3. Logstash的配置

以下配置實現通過Logstash監聽25826端口,接收從collectd發送過來的各項檢測數據。

注意,logstash-filter-collectd插件本身需要單獨安裝,logstash插件安裝說明之前已經講過:

bin/logstash-plugin install logstash-filter-collectd

Logstash默認自帶有collectd的codec編碼插件。

推薦配置示例如下:

udp {

    port => 25826

    buffer_size => 1452

    workers => 3          # Default is 2

    queue_size => 30000   # Default is 2000

    codec => collectd { }

    type =>"collectd"

}

下麵是簡單的一個輸出結果:

{

"_index": "logstash-2014.12.11",

"_type": "collectd",

"_id": "dS6vVz4aRtK5xS86kwjZnw",

"_score": null,

"_source": {

"host": "host.example.com",

"@timestamp": "2014-12-11T06:28:52.118Z",

"plugin": "interface",

"plugin_instance": "eth0",

"collectd_type": "if_packets",

"rx": 19147144,

"tx": 3608629,

"@version": "1",

"type": "collectd",

"tags": [

"_grokparsefailure"

        ]

    },

"sort": [

        1418279332118

    ]

}

參考資料

collectd支持收集的數據類型:https://git.verplant.org/?p=collectd.git;a=blob;hb=master; f=README

collectd收集各數據類型的配置參考資料:https://collectd.org/documentation/manpages/collectd.conf.5.shtml

collectd簡單配置文件示例:https://gist.github.com/untergeek/ab85cb86a9bf39f1fc6d

2.3 過濾器配置

有豐富的過濾器插件,是Logstash威力如此強大的重要因素。名為過濾器,其實提供的不單單是過濾的功能。下麵我們就會重點介紹幾個插件,它們擴展了進入過濾器的原始數據,進行複雜的邏輯處理,甚至可以無中生有地添加新的Logstash事件到後續的流程中去!

2.3.1 date時間處理

之前章節已經提過,logstash-filter-date插件可以用來轉換你的日誌記錄中的時間字符串,變成LogStash::Timestamp對象,然後轉存到@timestamp字段裏。

因為在稍後的logstash-output-elasticsearch中常用的%{+YYYY.MM.dd}這種寫法必須讀取@timestamp數據,所以一定不要直接刪掉這個字段保留自己的字段,而是應該用logstash-filter-date轉換後刪除自己的字段!

這在導入舊數據的時候固然非常有用,而在實時數據處理的時候同樣有效,因為一般情況下數據流程中我們都會有緩衝區,導致最終的實際處理時間跟事件產生時間略有偏差。

強烈建議打開Nginx的access_log配置項的buffer參數,對極限響應性能有極大提升!

1.配置示例

logstash-filter-date插件支持五種時間格式:

ISO8601:類似“2011-04-19T03:44:01.103Z”這樣的格式。具體Z後麵可以有“08:00”也可以沒有,“.103”這個也可以沒有。常用場景裏來說,Nginx的log_format配置裏就可以使用$time_iso8601變量來記錄請求時間成這種格式。

UNIX:UNIX時間戳格式,記錄的是從1970年起始至今的總秒數。Squid默認日誌格式中就使用了這種格式。

UNIX_MS:這個時間戳則是從1970年起始至今的總毫秒數。據我所知,JavaScript裏經常使用這個時間格式。

TAI64N:TAI64N格式比較少見,是這個樣子的:@4000000052f88ea32489532c。我目前隻知道常見應用中,qmail會用這個格式。

Joda-Time庫:Logstash內部使用了Java的Joda時間庫來作時間處理。所以我們可以使用Joda庫所支持的時間格式來作具體定義。Joda時間格式定義見表2-1。

表2-1 Joda時間庫格式

格式符  含  義    描  述    示  例

G   era text    AD

C   century of era (>=0)    number  20

Y   year of era (>=0)   year    1996

x   weekyear    year    1996

w   week of weekyear    number  27

e   day of week number  2

E   day of week text    Tuesday; Tue

y   year    year    1996

D   day of year number  189

M   month of year   month   July; Jul; 07

d   day of month    number  10

a   halfday of day  text    PM

K   hour of halfday (0~11)  number  0

h   clockhour of halfday (1~12) number  12

H   hour of day (0~23)  number  0

k   clockhour of day (1~24) number  24

m   minute of hour  number  30

s   second of minute    number  55

S   fraction of second  number  978

z   time zone   text    Pacific Standard Time; PST

Z   time zone offset/id zone    -0800; -08:00; America/Los_Angeles

'   escape for text delimiter  

''  single quote    literal '

 

下麵我們寫一個Joda時間格式的配置作為示例:

filter {

    grok {

        match => ["message", "%{HTTPDATE:logdate}"]

    }

    date {

        match => ["logdate", "dd/MMM/yyyy:HH:mm:ss Z"]

    }

}

注意,時區偏移量隻需要用一個字母Z即可。

2.時區問題的解釋

很多中國用戶經常提一個問題:為什麼@timestamp比我們晚了8個小時?怎麼修改成北京時間?

其實,Elasticsearch內部,對時間類型字段,是統一采用UTC時間,存成long長整形數據的!對日誌統一采用UTC時間存儲,是國際安全/運維界的一個通識—歐美公司的服務器普遍廣泛分布在多個時區裏—不像中國,地域橫跨五個時區卻隻用北京時間。

對於頁麵查看,ELK的解決方案是在Kibana上,讀取瀏覽器的當前時區,然後在頁麵上轉換時間內容的顯示。

所以,建議大家接受這種設定。否則,即便你用.getLocalTime修改,也還要麵臨在Kibana過去修改,以及Elasticsearch原有的["now-1h" TO "now"]這種方便的搜索語句無法正常使用的尷尬。

以上,請讀者自行斟酌。

2.3.2 grok正則捕獲

grok是Logstash最重要的插件。你可以在grok裏預定義好命名正則表達式,在稍後(grok參數或者其他正則表達式裏)引用它。

1.正則表達式語法

運維工程師多多少少都會一點正則。你可以在grok裏寫標準的正則,像下麵這樣:

\s+(?<request_time>\d+(?:\.\d+)?)\s+

這個正則表達式寫法對於Perl或者Ruby程序員應該很熟悉了,Python程序員可能更習慣寫(?P<name>pattern),沒辦法,適應一下吧。

現在給我們的配置文件添加第一個過濾器區段配置。配置要添加在輸入和輸出區段之間(Logstash執行區段的時候並不依賴於次序,不過為了自己看得方便,還是按次序書寫吧):

input {stdin{}}

filter {

    grok {

        match => {

        "message" =>"\s+(?<request_time>\d+(?:\.\d+)?)\s+"

        }

    }

}

output {stdout{codec=>rubydebug}}

運行Logstash進程然後輸入“begin 123.456 end”,你會看到類似下麵這樣的輸出:

{

"message" =>"begin 123.456 end",

"@version" =>"1",

"@timestamp" =>"2014-08-09T11:55:38.186Z",

"host" =>"raochenlindeMacBook-Air.local",

"request_time" =>"123.456"

}

漂亮!不過數據類型好像不太滿意……request_time應該是數值而不是字符串。

我們已經提過稍後會學習用LogStash::Filters::Mutate來轉換字段值類型,不過在grok 裏,其實有自己的魔法來實現這個功能!

2. grok表達式語法

grok支持把預定義的grok表達式寫入到文件中,官方提供的預定義grok表達式見:https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns。

下麵是從官方文件中摘抄的最簡單但是足夠說明用法的示例:

USERNAME [a-zA-Z0-9._-]+

USER %{USERNAME}

第一行,用普通的正則表達式來定義一個grok表達式;第二行,通過打印賦值格式(sprintf format),用前麵定義好的grok表達式來定義另一個grok表達式。

grok表達式的打印複製格式的完整語法見下行示例。其中data_type目前隻支持兩個值:int和float。

%{PATTERN_NAME:capture_name:data_type}

所以我們可以改進我們的配置成下麵這樣:

filter {

    grok {

        match => {

        "message" =>"%{WORD} %{NUMBER:request_time:float} %{WORD}"

        }

    }

}

重新運行進程然後可以得到如下結果:

{

"message" =>"begin 123.456 end",

"@version" =>"1",

"@timestamp" =>"2014-08-09T12:23:36.634Z",

"host" =>"raochenlindeMacBook-Air.local",

"request_time" => 123.456

}

這次request_time變成數值類型了。

3.最佳實踐

實際運用中,我們需要處理各種各樣的日誌文件,如果你都是在配置文件裏各自寫一行自己的表達式,就完全不可管理了。所以,我們建議是把所有的grok表達式統一寫入到一個地方。然後用filter/grok的patterns_dir選項來指明。

如果你把“message”裏所有的信息都grok到不同的字段了,數據實質上就相當於是重複存儲了兩份。所以你可以用remove_field參數來刪除掉message字段,或者用overwrite參數來重寫默認的message字段,隻保留最重要的部分。

重寫參數的示例如下:

filter {

    grok {

        patterns_dir =>["/path/to/your/own/patterns"]

        match => {

        "message" =>"%{SYSLOGBASE} %{DATA:message}"

        }

        overwrite => ["message"]

    }

}

更多有關grok正則性能的最佳實踐(比如timeout_millis等配置參數),請參考:https://www.elastic.co/blog/do-you-grok-grok。

4.高級用法

多行匹配 在和codec/multiline搭配使用的時候,需要注意一個問題,grok正則和普通正則一樣,默認是不支持匹配回車換行的。就像你需要=~ // m一樣也需要單獨指定,具體寫法是在表達式開始位置加(?m)標記。如下所示:

match => {

"message" =>"(?m)\s+(?<request_time>\d+(?:\.\d+)?)\s+"

}

多項選擇 有時候我們會碰上一個日誌有多種可能格式的情況。這時候要寫成單一正則就比較困難,或者全用|隔開又比較醜陋。這時候,Logstash的語法提供給我們一個有趣的解決方式。

文檔中,都說明logstash-filters-grok插件的match參數應該接受的是一個Hash值。但是因為早期的Logstash語法中Hash值也是用[]這種方式書寫的,所以其實現在傳遞Array值給match參數也完全沒問題。所以,我們這裏其實可以傳遞多個正則來匹配同一個字段:

match => [

"message", "(?<request_time>\d+(?:\.\d+)?)",

"message", "%{SYSLOGBASE} %{DATA:message}",

"message", "(?m)%{WORD}"

]

Logstash會按照這個定義次序依次嚐試匹配,到匹配成功為止。雖說效果跟用|分割寫個大大的正則是一樣的,但是可閱讀性好了很多。

我強烈建議每個人都要使用Grok Debugger (https://grokdebug.herokuapp.com/)來調試自己的grok表達式。

2.3.3 dissect解析

grok作為Logstash最廣為人知的插件,在性能和資源損耗方麵同樣也廣為詬病。為了應對這個情況,同時也考慮到大多數情況下日誌格式並沒有那麼複雜,Logstash開發團隊在5.0版新添加了另一個解析字段的插件:dissect。當日誌格式有比較簡明的分隔標誌位且重複性較大的時候,可以使用dissect插件更快地完成解析工作。下麵是解析syslog的示例。

filter {

    dissect {

        mapping => {

            "message" => "%{ts} %{+ts} %{+ts} %{src} %{} %{prog}[%{pid}]: %{msg}"

        }

        convert_datatype => {

            pid => "int"

        }

    }

}

我們看到上麵使用了和Grok很類似的%{}語法來表示字段,這顯然是基於習慣延續的考慮。不過示例中%{+ts}的加號就不一般了。dissect除了字段外麵的字符串定位功能以外,還通過幾個特殊符號來處理字段提取的規則:

%{+key} 這個+表示前麵已經捕獲到一個key字段了,而這次捕獲的內容自動添補到之前key字段內容的後麵。

%{+key/2} 這個/2表示在有多次捕獲內容都填到key字段裏的時候,拚接字符串的順序誰前誰後。/2表示排第2位。

%{?string} 這個?表示這塊隻是一個占位,並不會實際生成捕獲字段存到Event裏麵。

%{?string} %{&string} 當同樣捕獲名稱都是string,但是一個?和一個&在一起的時候,表示這是一個鍵值對。

比如對https://rizhiyi.com/index.do?id=123寫這麼一段配置:

https://%{domain}/%{?url}?%{?arg1}=%{&arg1}

則最終生成的 Event 內容是這樣的:

{

    domain => "rizhiyi.com",

    id => "123"

}

2.3.4 GeoIP地址查詢

GeoIP是最常見的免費IP地址歸類查詢庫,同時也有收費版可以采購。GeoIP庫可以根據IP地址提供對應的地域信息,包括國別、省市、經緯度等,對於可視化地圖和區域統計非常有用。

配置示例如下:

filter {

    geoip {

        source =>"message"

    }

}

運行結果如下:

{

"message" =>"183.60.92.253",

"@version" =>"1",

"@timestamp" =>"2014-08-07T10:32:55.610Z",

"host" =>"raochenlindeMacBook-Air.local",

"geoip" => {

"ip" =>"183.60.92.253",

"country_code2" =>"CN",

"country_code3" =>"CHN",

"country_name" =>"China",

"continent_code" =>"AS",

"region_name" =>"30",

"city_name" =>"Guangzhou",

"latitude" =>23.11670000000001,

"longitude" =>113.25,

"timezone" =>"Asia/Chongqing",

"real_region_name" =>"Guangdong",

"location" => [

            [0] 113.25,

            [1] 23.11670000000001

        ]

    }

}

GeoIP庫數據較多,如果你不需要這麼多內容,可以通過fields選項指定自己所需要的。下例為全部可選內容:

filter {

    geoip {

        fields => ["city_name", "continent_code", "country_code2",

            "country_code3", "country_name", "dma_code", "ip", "latitude",

            "longitude", "postal_code", "region_name", "timezone"]

    }

}

需要注意的是:geoip.location是Logstash通過latitude和longitude額外生成的數據。所以,如果你是想要經緯度又不想重複數據的話,應該像下麵這樣做:

filter {

    geoip {

        fields => ["city_name", "country_code2", "country_name", "latitude",

            "longitude", "region_name"]

        remove_field => ["[geoip][latitude]", "[geoip][longitude]"]

    }

}

還要注意:geoip插件的“source”字段可以是任一處理後的字段,比如“client_ip”,但是字段內容卻需要小心!GeoIp庫內隻存有公共網絡上的IP信息,查詢不到結果的,會直接返回null,而Logstash的GeoIp插件對null結果的處理是:“不生成對應的geoip.字段”。所以讀者在測試時,如果使用了諸如127.0.0.1、172.16.0.1、182.168.0.1、10.0.0.1等內網地址,會發現沒有對應輸出!

2.3.5 JSON編解碼

在上一章,已經講過在Codec中使用JSON編碼。但是,有些日誌可能是一種複合的數據結構,其中隻有一部分記錄是JSON格式的。這時候,我們依然需要在filter階段,單獨啟用JSON解碼插件。

配置示例如下:

filter {

    json {

        source =>"message"

        target =>"jsoncontent"

    }

}

運行結果如下:

{

"@version": "1",

"@timestamp": "2014-11-18T08:11:33.000Z",

"host": "web121.mweibo.tc.sinanode.com",

"message": "{\"uid\":3081609001,\"type\":\"signal\"}",

"jsoncontent": {

"uid": 3081609001,

"type": "signal"

}

}

如果不打算使用多層結構的話,刪掉target配置即可。單層結構新的結果如下:

{

"@version": "1",

"@timestamp": "2014-11-18T08:11:33.000Z",

"host": "web121.mweibo.tc.sinanode.com",

"message": "{\"uid\":3081609001,\"type\":\"signal\"}",

"uid": 3081609001,

"type": "signal"

}

2.3.6 key-value切分

在很多情況下,日誌內容本身都是一個類似於key-value的格式,但是格式具體的樣式卻是多種多樣的。Logstash提供logstash-filter-kv插件,幫助處理不同樣式的key-value日誌,變成實際的LogStash::Event數據。

配置示例如下:

filter {

    ruby {

        init =>"@kname = ['method','uri','verb']"

    ruby {

        init => "@kname = ['method','uri','verb']"

        code => "

            new_event = LogStash::Event.new(Hash[@kname.zip(event.get('request').split('|'))])

            new_event.remove('@timestamp')

            event.append(new_event)""

        "

    }

    if [uri] {

        ruby {

            init => "@kname = ['url_path','url_args']"

            code => "

                new_event = LogStash::Event.new(Hash[@kname.zip(event.get('uri').split('?'))])

                new_event.remove('@timestamp')

                event.append(new_event)""

            "

        }

        kv {

            prefix =>"url_"

            source =>"url_args"

            field_split =>"&"

            remove_field => [ "url_args", "uri", "request" ]

        }

    }

}

Nginx訪問日誌中的$request,通過這段配置,可以詳細切分成method、url_path、verb、url_a、url_b...

進一步,如果url_args中有過多字段,可能導致Elasticsearch集群因為頻繁update mapping或者消耗太多內存在cluster state上而宕機。所以,更優的選擇是隻保留明確有用的url_args內容,其他部分舍去,如下所示:

kv {

    prefix =>"url_"

    source =>"url_args"

    field_split =>"&"

    include_keys => [ "uid", "cip" ]

    remove_field => [ "url_args", "uri", "request" ]

}

上例即表示,除了url_uid和url_cip兩個字段以外,其他的url_*都不保留。

2.3.7 metrics數值統計

logstash-filter-metrics插件是使用Ruby的Metriks模塊來實現在內存裏實時地計數和采樣分析。該模塊支持兩個類型的數值分析:meter和timer。下麵分別舉例說明。

1. Meter示例(速率閾值檢測)

Web訪問日誌的異常狀態碼頻率是運維人員會非常關心的一個數據。通常我們的做法是通過Logstash或者其他日誌分析腳本,把計數發送到rrdtool或者graphite裏麵,然後再通過check_graphite腳本之類的東西來檢查異常並報警。

事實上這個事情可以直接在Logstash內部就完成。比如如果最近一分鍾504請求的個數超過100個就報警,如下所示:

filter {

    metrics {

        meter => "error_%{status}"

            add_tag => "metric"

            ignore_older_than => 10

    }

    if "metric" in [tags] {

        ruby {

            code => "event.cancel if event.get('[error_504][rate_1m]') * 60 > 100"

        }

    }

}

output {

    if "metric" in [tags] {

        exec {

            command => "echo \"Out of threshold: %{[error_504][rate_1m]}\""

        }

}

這裏需要注意*60的含義。metrics模塊生成的rate_1m/5m/15m意思是:最近1、5、15分鍾的每秒速率!

2. Timer示例(box and whisker異常檢測)

官版的logstash-filter-metrics插件隻適用於metric事件的檢查。由插件生成的新事件內部不存有來自input區段的實際數據信息。所以,要完成我們的百分比分布箱體檢測,需要首先對代碼稍微做幾行變動,即在metric的timer事件裏加一個屬性,存儲最近一個實際事件的數值:

def register

    …

    @last = {}

end

def filter(event)

    …

    @last[event.sprintf(name)] = event.sprintf(value).to_f

end

def flush(options = {})

    …

    @metric_timer.each_pair do |name, metric|

event.set(“[#{name}][last]”, @last[name])

metric.clear if should_clear?

    end

    …

end

有了這個Last值,然後我們就可以用如下配置來探測異常數據了:

filter {

    metrics {

        timer => {"rt" =>"%{request_time}"}

        percentiles => [25, 75]

        add_tag =>"percentile"

    }

        ruby {

            code => "l=event.get('[rt][p75]')-event.get('[rt][p25]');event.set('[rt][low]', event.get('[rt][p25]')-l);event.set('[rt][high]',event.get('[rt][p75]')+l)"

        }

    }

}

output {

    if "percentile" in [tags] and ([rt][last] > [rt][high] or [rt][last] < [rt][low]) {

       exec {

           command => "echo \"Anomaly: %{[rt][last]}\""

       }

}

有關box and shisker plot內容和重要性,參見《數據之魅》一書。

2.3.8 mutate數據修改

logstash-filter-mutate插件是Logstash另一個重要插件,它提供了豐富的基礎類型數據處理能力,包括類型轉換、字符串處理和字段處理等。

1.類型轉換

類型轉換是logstash-filter-mutate插件最初誕生時的唯一功能,其應用場景在之前的2.3.5節“JSON編解碼”中已經提到。

可以設置的轉換類型包括:“integer”、“float”和“string”。示例如下:

filter {

    mutate {

        convert => ["request_time", "float"]

    }

}

mutate除了轉換簡單的字符值,還支持對數組類型的字段進行轉換,即將[“1”,“2”]轉換成[1,2]。但不支持對哈希類型的字段做類似處理。有這方麵需求的可以采用稍後講述的logstash-filter-ruby插件完成。

2. 字符串處理

有如下字符串處理的插件:

gsub:僅對字符串類型字段有效。

gsub => ["urlparams", "[\\?#]", "_"]

split:分割字符串。

filter {

    mutate {

        split => ["message", "|"]

    }

}

隨意輸入一串以|分割的字符,比如“123|321|adfd|dfjld*=123”,可以看到如下輸出:

{

"message" => [

    [0] "123",

    [1] "321",

    [2] "adfd",

    [3] "dfjld*=123"

],

"@version" =>"1",

"@timestamp" =>"2014-08-20T15:58:23.120Z",

"host" =>"raochenlindeMacBook-Air.local"

}

join:僅對數組類型字段有效。

我們在之前已經用split割切的基礎上再join回去。配置改成:

filter {

    mutate {

        split => ["message", "|"]

    }

    mutate {

        join => ["message", ","]

    }

}

filter區段之內,是順序執行的。所以我們最後看到的輸出結果是:

{

"message" =>"123,321,adfd,dfjld*=123",

"@version" =>"1",

"@timestamp" =>"2014-08-20T16:01:33.972Z",

"host" =>"raochenlindeMacBook-Air.local"

}

merge:合並兩個數組或者哈希字段。依然在之前split的基礎上繼續:

filter {

    mutate {

        split => ["message", "|"]

    }

    mutate {

        merge => ["message", "message"]

    }

}

我們會看到輸出:

{

"message" => [

    [0] "123",

    [1] "321",

    [2] "adfd",

    [3] "dfjld*=123",

    [4] "123",

    [5] "321",

    [6] "adfd",

    [7] "dfjld*=123"

],

"@version" =>"1",

"@timestamp" =>"2014-08-20T16:05:53.711Z",

"host" =>"raochenlindeMacBook-Air.local"

}

如果src字段是字符串,會自動先轉換成一個單元素的數組再合並。把上一示例中的來源字段改成“host”:

filter {

    mutate {

        split => ["message", "|"]

    }

    mutate {

        merge => ["message", "host"]

    }

}

結果變成:

{

"message" => [

    [0] "123",

    [1] "321",

    [2] "adfd",

    [3] "dfjld*=123",

    [4] "raochenlindeMacBook-Air.local"

],

"@version" =>"1",

"@timestamp" =>"2014-08-20T16:07:53.533Z",

"host" => [

    [0] "raochenlindeMacBook-Air.local"

    ]

}

看,目的字段“message”確實多了一個元素,但是來源字段“host”本身也由字符串類型變成數組類型了!

同樣,如果目的字段不是數組,也會被強製轉換。即使來源字段並不存在:

filter {

    mutate {

        merge => ["message", "not_exist_field"]

    }

}

結果會變成:

{

"message" => [

        [0] "123|321|adfd|dfjld*=123"

    ],

"@version" =>"1",

"@timestamp" =>"2014-08-20T15:58:23.120Z",

"host" =>"raochenlindeMacBook-Air.local"

}

strip:去除字段內容前後的空格。可以接受數組參數:

filter {

    mutate {

        strip => ["syslog_message", "syslog_datetime"]

    }

}

lowercase:將字段內容全部轉換成小寫字母。同樣可以接受數組。在ELK stack場景中,將內容轉換成小寫會是一個比較常見的需求。因為Elasticsearch默認是統一按照小寫字母來搜索的。為了確保檢索準確率,在不影響使用的情況下,建議對常用檢索字段啟用lowercase配置。

uppercase:將字段內容全部轉換成大寫字母。同樣可以接受數組。

3.字段處理

字段處理的插件有:

rename:重命名某個字段,如果目的字段已經存在,會被覆蓋掉,如下所示:

filter {

    mutate {

        rename => ["syslog_host", "host"]

    }

}

update:更新某個字段的內容。如果字段不存在,不會新建。

replace:作用和update類似,但是當字段不存在的時候,它會起到add_field參數一樣的效果,自動添加新的字段。

4.執行次序

需要注意的是,filter/mutate內部是有執行次序的。其次序如下:

rename(event) if @rename

update(event) if @update

replace(event) if @replace

convert(event) if @convert

gsub(event) if @gsub

uppercase(event) if @uppercase

lowercase(event) if @lowercase

strip(event) if @strip

remove(event) if @remove

split(event) if @split

join(event) if @join

merge(event) if @merge

filter_matched(event)

而filter_matched這個filters/base.rb裏繼承的方法也是有次序的:

@add_field.each do |field, value|

end

@remove_field.each do |field|

end

@add_tag.each do |tag|

end

@remove_tag.each do |tag|

end

2.3.9 隨心所欲的Ruby處理

如果你稍微懂那麼一點點Ruby語法的話,logstash-filter-ruby插件將會是一個非常有用的工具。比如你需要稍微修改一下LogStash::Event對象,但是又不打算為此寫一個完整的插件,用logstash-filter-ruby插件絕對感覺良好。

配置示例如下:

filter {

    ruby {

        init =>"@kname = ['client','servername','url','status','time','size','upstream',

            'upstreamstatus','upstreamtime','referer','xff','useragent']"

        code => "

            ew_event = LogStash::Event.new(Hash[@kname.zip(event.get('message').split('|'))])

            new_event.remove('@timestamp')

        event.append(new_event)"

    }

}

官網示例是一個比較有趣但是沒啥大用的做法—隨機取消90%的事件。

所以上麵我們給出了一個有用而且強大的實例。

通常我們都是用logstash-filter-grok插件來捕獲字段的,但是正則耗費大量的CPU資源,很容易成為Logstash進程的瓶頸。

而實際上,很多流經Logstash的數據都是有自己預定義的特殊分隔符的,我們可以很簡單的直接切割成多個字段。

從Logstash2.3開始,LogStash::event.append不再直接接受Hash對象,而必須是LogStash:: Event對象。所以示例變成要先初始化一個新的event,再把無用的@timestamp移除,再append進去。否則會把@timestamp變成有兩個時間的數組了!

  從Logstash 5.0開始,LogStash::Event改為Java實現,直接使用event["parent"]["child"]形式獲取的,不是原事件的引用而是複製品,需要改用event.get('[parent][child]')和event.set('[parent][child]', 'value')的方法。

logstash-filter-mutate插件裏的“split”選項隻能切成數組,後續很不方便使用和識別。而在logstash-filter-ruby裏,我們可以通過“init”參數預定義好由每個新字段的名字組成的數組,然後在“code”參數指定的Ruby語句裏通過兩個數組的zip操作生成一個哈希並添加進數組裏。短短一行Ruby代碼,可以減少50%以上的CPU使用率。

logstash-filter-ruby插件用途遠不止這一點,下一節你還會繼續見到它的身影。

更多實例如下:

filter{

    date {

        match => ["datetime" , "UNIX"]

    }

    ruby {

        code =>"event.cancel if 5 * 24 * 3600 < (event.get( '@timestamp')-::Time.now).abs"

    }

}

在實際運用中,我們幾乎肯定會碰到出乎意料的輸入數據。這都有可能導致Elasticsearch集群出現問題。

當數據格式發生變化,比如UNIX時間格式變成UNIX_MS時間格式,會導致Logstash瘋狂創建新索引,集群崩潰。

或者誤輸入過老的數據時,因為一般我們會close幾天之前的索引以節省內存,必要時再打開。而直接嚐試把數據寫入被關閉的索引會導致內存問題。

這時候我們就需要提前校驗數據的合法性。上麵配置,就是用於過濾掉時間範圍與當前時間差距太大的非法數據的。

2.3.10 split拆分事件

上一章我們通過multiline插件將多行數據合並進一個事件裏,那麼反過來,也可以把一行數據,拆分成多個事件。這就是split插件。

配置示例如下:

filter {

    split {

        field =>"message"

        terminator =>"#"

    }

}

這個測試中,我們在intputs/stdin的終端中輸入一行數據:“test1#test2”,結果看到輸出兩個事件:

{

"@version": "1",

"@timestamp": "2014-11-18T08:11:33.000Z",

"host": "web121.mweibo.tc.sinanode.com",

"message": "test1"

}

{

"@version": "1",

"@timestamp": "2014-11-18T08:11:33.000Z",

"host": "web121.mweibo.tc.sinanode.com",

"message": "test2"

}最後更新:2017-05-19 15:02:37

  上一篇:go  事務必會必知
  下一篇:go  InterruptedException 和 interrupting threads 的一些說明