ELK Stack權威指南 (The ELK Stack Definitive Guide), Chapter 3: Scenario Examples
Although the previous chapters introduced the common configuration options of dozens of Logstash plugins, with so many choices available, how to combine these plugins is still a problem for some users. This chapter walks through a number of the most common logging scenarios and demonstrates targeted plugin combinations, in the hope of giving readers some inspiration.
The scenarios covered in this chapter are: Nginx access logs, Nginx error logs, Postfix logs, Ossec logs, Windows system logs, Java logs, MySQL slow query logs, and Docker container logs.
3.1 Nginx Access Logs
Processing and analyzing access logs is without doubt the most common requirement when using the ELK stack. With the default approach, neither performance nor precision is good enough. This section presents several different ways to handle Nginx access logs and explains their pros and cons.
3.1.1 The grok Approach
Logstash ships with grok patterns for the standard Apache log format:
COMMONAPACHELOG %{IPORHOST:clientip} %{USER:ident} %{NOTSPACE:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-)
COMBINEDAPACHELOG %{COMMONAPACHELOG} %{QS:referrer} %{QS:agent}
The standard Nginx log format, as you can see, merely adds one variable, $http_x_forwarded_for, at the end. So the grok pattern definition for the standard Nginx log is:
MAINNGINXLOG %{COMBINEDAPACHELOG} %{QS:x_forwarded_for}
For custom log formats, adapt the pattern in the same way.
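As a minimal sketch of how the pattern might be wired in (the patterns file location and filter layout here are assumptions, not taken from the original configuration), save the MAINNGINXLOG definition into a file under a patterns directory and reference it from a grok filter:
# /etc/logstash/patterns/nginx (assumed file; contains the MAINNGINXLOG line above)
filter {
    grok {
        patterns_dir => ["/etc/logstash/patterns"]
        match        => { "message" => "%{MAINNGINXLOG}" }
    }
}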
3.1.2 The split Approach
Because some Nginx log variables contain embedded spaces, in many cases you can only fall back on the %{QS} pattern to separate fields, which is poor both in performance and in granularity. If you can define a relatively uncommon character as the delimiter, processing becomes much simpler. Suppose the log format is defined as follows:
log_format main "$http_x_forwarded_for | $time_local | $request | $status | $body_bytes_sent | "
                "$request_body | $content_length | $http_referer | $http_user_agent | $nuid | "
                "$http_cookie | $remote_addr | $hostname | $upstream_addr | $upstream_response_time | $request_time";
Actual log entries look like this:
117.136.9.248 | 08/Apr/2015:16:00:01 +0800 | POST /notice/newmessage?sign=cba4f614e05db285850cadc696fcdad0&token=JAGQ92Mjs3--gik_b_DsPIQHcyMKYGpD&did=b749736ac70f12df700b18cd6d051d5&osn=android&osv=4.0.4&appv=3.0.1&net=460-02-2g&longitude=120.393006&latitude=36.178329&ch=360&lp=1&ver=1&ts=1428479998151&im=869736012353958&sw=0&sh=0&la=zh-CN&lm=weixin&dt=vivoS11tHTTP/1.1| 200 | 132 | abcd-sign-v1://dd03c57f8cb6f1cef919ab5df66f2903f:d51asq5yslwnyz5t/{\x22type\x22:4,\x22uid\x22:7567306} | 89 | - | abcd/3.0.1, Android/4.0.4, vivo S11t | nuid=0C0A0A0A01E02455EA7CF47E02FD072C1428480001.157| - | 10.10.10.13 | bnx02.abcdprivate.com | 10.10.10.22:9999 | 0.022 | 0.022
59.50.44.53 | 08/Apr/2015:16:00:01 +0800 | POST /feed/pubList?appv=3.0.3&did=89da72550de488328e2aba5d97850e9f&dt=iPhone6%2C2&im=B48C21F3-487E-4071-9742-DC6D61710888&la=cn&latitude=0.000000&lm=weixin&longitude=0.000000&lp=-1.000000&net=0-0-wifi&osn=iOS&osv=8.1.3&sh=568.000000&sw=320.000000&token=7NobA7asg3Jb6n9o4ETdPXyNNiHwMs4J&ts=1428480001275 HTTP/1.1 | 200 | 983 | abcd-sign-v1://b398870a0b25b29aae65cd553addc43d:72214ee85d7cca22/{\x22nextkey\x22:\x22\x22,\x22uid\x22:\x2213062545\x22,\x22token\x22:\x227NobA7asg3Jb6n9o4ETdPXyNNiHwMs4J\x22}| 139 | - | Shopping/3.0.3 (iPhone; iOS 8.1.3; Scale/2.00) | nuid=0C0A0A0A81-DF2455017D548502E48E2E1428480001.154 | nuid=CgoKDFUk34GFVH0BLo7kAg== | 10.10.10.11 | bnx02.abcdprivate.com | 10.10.10.35:9999 | 0.025 | 0.026
The request field can then be split further, for example the URL parameter part. The order of the fields in the URL parameters is obviously not fixed: on the first line the first field after the question mark is sign, while on the second line it is appv. So the parameters have to be split apart and the value of each field extracted. The grok patterns that ship with Logstash cannot meet this requirement; the Logstash configuration finally adopted is as follows:
filter {
    ruby {
        init => "@kname = ['http_x_forwarded_for','time_local','request','status','body_bytes_sent','request_body','content_length','http_referer','http_user_agent','nuid','http_cookie','remote_addr','hostname','upstream_addr','upstream_response_time','request_time']"
        code => "
            new_event = LogStash::Event.new(Hash[@kname.zip(event.get('message').split('|'))])
            new_event.remove('@timestamp')
            event.append(new_event)
        "
    }
    if [request] {
        ruby {
            init => "@kname = ['method','uri','verb']"
            code => "
                new_event = LogStash::Event.new(Hash[@kname.zip(event.get('request').split(' '))])
                new_event.remove('@timestamp')
                event.append(new_event)
            "
        }
        if [uri] {
            ruby {
                init => "@kname = ['url_path','url_args']"
                code => "
                    new_event = LogStash::Event.new(Hash[@kname.zip(event.get('uri').split('?'))])
                    new_event.remove('@timestamp')
                    event.append(new_event)
                "
            }
            kv {
                prefix       => "url_"
                source       => "url_args"
                field_split  => "&"
                remove_field => [ "url_args", "uri", "request" ]
            }
        }
    }
    mutate {
        convert => [
            "body_bytes_sent", "integer",
            "content_length", "integer",
            "upstream_response_time", "float",
            "request_time", "float"
        ]
    }
    date {
        match  => [ "time_local", "dd/MMM/yyyy:HH:mm:ss Z" ]
        locale => "en"
    }
}
The final result is as follows:
{
"message" =>"1.43.3.188 | 08/Apr/2015:16:00:01 +0800 | POST /search/sug
gest?appv=3.0.3&did=dfd5629d705d400795f698055806f01d&dt=iPhone7%2C2&im=
AC926907-27AA-4A10-9916-C5DC75F29399&la=cn&latitude=-33.903867&lm=
sina&longitude=151.208137&lp=-1.000000&net=0-0-wifi&osn=iOS&osv=8.1.3&sh=66
7.000000&sw=375.000000&token=_ovaPz6Ue68ybBuhXustPbG-xf1WbsPO&ts=
1428480001567 HTTP/1.1 | 200 | 353 | abcd-sign-v1://a24b478486d3bb92ed89a-
901541b60a5:b23e9d2c14fe6755/{\\x22key\\x22:\\x22last\\x22,\\x22offset\\x22:
\\x220\\x22,\\x22token\\x22:\\x22_ovaPz6Ue68ybBuhXustPbG-xf1WbsPO\\x22,
\\x22limit\\x22:\\x2220\\x22} | 148 | - | abcdShopping/3.0.3 (iPhone; iOS
8.1.3; Scale/2.00) | nuid=0B0A0A0A9A64AF54F97634640230944E1428480001.113
| nuid=CgoKC1SvZJpkNHb5TpQwAg== | 10.10.10.11 | bnx02.abcdprivate.com |
10.10.10.26:9999 | 0.070 | 0.071",
"@version" =>"1",
"@timestamp" =>"2015-04-08T08:00:01.000Z",
"type" =>"nginxapiaccess",
"host" =>"blog05.abcdprivate.com",
"path" =>"/home/nginx/logs/api.access.log",
"http_x_forwarded_for" =>"1.43.3.188",
"time_local" =>" 08/Apr/2015:16:00:01 +0800",
"status" =>"200",
"body_bytes_sent" => 353,
"request_body" =>"abcd-sign-v1://a24b478486d3bb92ed89a901541b60a5:b23e9d2c1
4fe6755/{\\x22key\\x22:\\x22last\\x22,\\x22offset\\x22:\\x220\\x22,\\x22token
\\x22:\\x22_ovaPz6Ue68ybBuhXustPbG-xf1WbsPO\\x22,\\x22limit\\x22:\\x2220\\x22}",
"content_length" => 148,
"http_referer" =>"-",
"http_user_agent" =>"abcdShopping/3.0.3 (iPhone; iOS 8.1.3; Scale/2.00)",
"nuid" =>"nuid=0B0A0A0A9A64AF54F97634640230944E1428480001.113",
"http_cookie" =>"nuid=CgoKC1SvZJpkNHb5TpQwAg==",
"remote_addr" =>"10.10.10.11",
"hostname" =>"bnx02.abcdprivate.com",
"upstream_addr" =>"10.10.10.26:9999",
"upstream_response_time" => 0.070,
"request_time" => 0.071,
"method" =>"POST",
"verb" =>"HTTP/1.1",
"url_path" =>"/search/suggest",
"url_appv" =>"3.0.3",
"url_did" =>"dfd5629d705d400795f698055806f01d",
"url_dt" =>"iPhone7%2C2",
"url_im" =>"AC926907-27AA-4A10-9916-C5DC75F29399",
"url_la" =>"cn",
"url_latitude" =>"-33.903867",
"url_lm" =>"sina",
"url_longitude" =>"151.208137",
"url_lp" =>"-1.000000",
"url_net" =>"0-0-wifi",
"url_osn" =>"iOS",
"url_osv" =>"8.1.3",
"url_sh" =>"667.000000",
"url_sw" =>"375.000000",
"url_token" =>"_ovaPz6Ue68ybBuhXustPbG-xf1WbsPO",
"url_ts" =>"1428480001567"
}
If there are too many URL parameters, you can skip the kv split, or predefine a nested object in the mapping and store the parameters as an array instead:
if [uri] {
    ruby {
        init => "@kname = ['url_path','url_args']"
        code => "
            new_event = LogStash::Event.new(Hash[@kname.zip(event.get('uri').split('?'))])
            new_event.remove('@timestamp')
            event.append(new_event)
        "
    }
    if [url_args] {
        ruby {
            init => "@kname = ['key','value']"
            code => "event.set('nested_args', event.get('url_args').split('&').collect {|i| Hash[@kname.zip(i.split('='))]})"
            remove_field => [ "url_args", "uri", "request" ]
        }
    }
}
For the rationale behind the nested object optimization and how nested objects are used, see the Elasticsearch tuning material in Chapter 11.
3.1.3 JSON Format
Custom delimiters are nice, but the configuration is still quite a bit more complex to write. For Logstash there is actually an even simpler way to handle Nginx logs: when defining a custom log format, hand-assemble the format string so that Nginx outputs JSON directly:
log_format json '{"@timestamp":"$time_iso8601",'
'"host":"$server_addr",'
'"clientip":"$remote_addr",'
'"size":$body_bytes_sent,'
'"responsetime":$request_time,'
'"upstreamtime":"$upstream_response_time",'
'"upstreamhost":"$upstream_addr",'
'"http_host":"$host",'
'"url":"$uri",'
'"xff":"$http_x_forwarded_for",'
'"referer":"$http_referer",'
'"agent":"$http_user_agent",'
'"status":"$status"}';
Then the following Logstash configuration is all that is needed:
input {
file {
path =>"/var/log/nginx/access.log"
codec => json
}
}
filter {
mutate {
split => [ "upstreamtime", "," ]
}
mutate {
convert => [ "upstreamtime", "float" ]
}
}
Two mutate plugins are used here because upstreamtime may contain several values, so it is first split into an array and then each element is converted to a float. Inside a single mutate, the convert function runs before the split function, which is why the work has to be done in two steps. The ordering of the functions within mutate was explained in detail in section 2.3.8, which readers can refer back to.
3.1.4 Sending via syslog
Nginx added syslog support in version 1.7, and Tengine even earlier. This lets us ship logs directly over syslog. The configuration on the Nginx side is:
access_log syslog:server=unix:/data0/rsyslog/nginx.sock locallog;
Or send directly to a remote Logstash machine:
access_log syslog:server=192.168.0.2:5140,facility=local6,tag=nginx-access,severity=info logstashlog;
By default, Nginx sends the data at the local7.info level with the tag nginx. Note that the buffer=16k option cannot be used when shipping logs via syslog.
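On the receiving side, a minimal Logstash sketch matching the remote example above could look like this (the type name is an assumption):
input {
    syslog {
        port => 5140
        type => "nginx-syslog"
    }
}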
3.2 Nginx Error Logs
The Nginx error log is one of the log types operations engineers encounter most often yet most easily overlook. This section describes how to process Nginx error logs, and readers are encouraged to pay more attention to them during performance tuning. The Nginx error log has neither a single well-defined delimiter nor a particularly convenient regular-expression pattern, but by combining different Logstash plugins the data can still be processed easily enough.
One thing worth noting: one class of entries in the Nginx error log reports that an overly large request body was received, and by default the message records the exact byte count of the request body. Since that byte count changes with almost every request, common aggregations such as topN have little effect on this field, so it needs special treatment.
The resulting Logstash configuration is shown below:
filter {
    grok {
        match => { "message" => "(?<datetime>\d\d\d\d/\d\d/\d\d \d\d:\d\d:\d\d) \[(?<errtype>\w+)\] \S+: \*\d+ (?<errmsg>[^,]+), (?<errinfo>.*)$" }
    }
    mutate {
        rename => [ "host", "fromhost" ]
        gsub   => [ "errmsg", "too large body: \d+ bytes", "too large body" ]
    }
    if [errinfo] {
        ruby {
            code => "
                new_event = LogStash::Event.new(Hash[event.get('errinfo').split(', ').map{|l| l.split(': ')}])
                new_event.remove('@timestamp')
                event.append(new_event)
            "
        }
    }
    grok {
        match => { "request" => '"%{WORD:verb} %{URIPATH:urlpath}(?:\?%{NGX_URIPARAM:urlparam})?(?: HTTP/%{NUMBER:httpversion})"' }
        patterns_dir => ["/etc/logstash/patterns"]
        remove_field => [ "message", "errinfo", "request" ]
    }
}
An Nginx error log entry processed through the configuration above produces an event like this:
{
"@version": "1",
"@timestamp": "2015-07-02T01:26:40.000Z",
"type": "nginx-error",
"errtype": "error",
"errmsg": "client intended to send too large body",
"fromhost": "web033.mweibo.yf.sinanode.com",
"client": "36.16.7.17",
"server": "api.v5.weibo.cn",
"host": "\"api.weibo.cn\"",
"verb": "POST",
"urlpath": "/2/client/addlog_batch",
"urlparam": "gsid=_2A254UNaSDeTxGeRI7FMX9CrEyj2IHXVZRG1arDV6PUJbrdANLROskWp9b
XakjUZM5792FW9A5S9EU4jxqQ..&wm=3333_2001&i=0c6f156&b=1&from=1053093010&c=
iphone&v_p=21&skin=default&v_f=1&s=8f14e573&lang=zh_CN&ua=iPhone7,1__weibo__
5.3.0__iphone__os8.3",
"httpversion": "1.1"
}
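Note that the second grok above references a custom NGX_URIPARAM pattern loaded from /etc/logstash/patterns; its definition is not reproduced in this section. A plausible sketch (an assumption, not necessarily the original pattern) that simply matches everything up to the next space or double quote in the request line would be:
# /etc/logstash/patterns/nginx (assumed file name)
NGX_URIPARAM [^\s"]*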
3.3 Postfix Logs
Postfix is the most commonly used mail server software on Linux. Operating a mail service has always been fairly complex, so a parsing solution for Postfix logs is provided here. The solution comes from: https://github.com/whyscream/postfix-grok-patterns.
Because Postfix outputs its logs through syslog by default, you can either have rsyslog forward them directly to Logstash, or have Logstash read the files that rsyslog writes.
Postfix sets a different syslog tag depending on the actual log, with some 20 different suffixes: anvil, bounce, cleanup, dnsblog, local, master, pickup, pipe, postdrop, postscreen, qmgr, scache, sendmail, smtp, lmtp, smtpd, tlsmgr, tlsproxy, trivial-rewrite, discard, and so on. In Logstash, the syslog tag is usually parsed into the program field. This section uses the configuration for the first kind, the anvil logs, as an example:
input {
syslog { }
}
filter {
if [program] =~ /^postfix.*\/anvil$/ {
grok {
patterns_dir =>["/etc/logstash/patterns.d"]
match => [ "message", "%{POSTFIX_ANVIL}" ]
tag_on_failure => [ "_grok_postfix_anvil_nomatch" ]
add_tag => [ "_grok_postfix_success" ]
}
}
mutate {
convert => [
"postfix_anvil_cache_size", "integer",
"postfix_anvil_conn_count", "integer",
"postfix_anvil_conn_rate", "integer",
]
}
}
The configuration uses a custom grok pattern named POSTFIX_ANVIL; this pattern and the patterns it depends on are listed below. Save them as a text file in the /etc/logstash/patterns.d/ directory and they are ready to use.
POSTFIX_TIME_UNIT %{NUMBER}[smhd]
POSTFIX_ANVIL_CONN_RATE statistics: max connection rate %{NUMBER:postfix_anvil_conn_rate}/%{POSTFIX_TIME_UNIT:postfix_anvil_conn_period} for \(%{DATA:postfix_service}:%{IP:postfix_client_ip}\) at %{SYSLOGTIMESTAMP:postfix_anvil_timestamp}
POSTFIX_ANVIL_CONN_CACHE statistics: max cache size %{NUMBER:postfix_anvil_cache_size} at %{SYSLOGTIMESTAMP:postfix_anvil_timestamp}
POSTFIX_ANVIL_CONN_COUNT statistics: max connection count %{NUMBER:postfix_anvil_conn_count} for \(%{DATA:postfix_service}:%{IP:postfix_client_ip}\) at %{SYSLOGTIMESTAMP:postfix_anvil_timestamp}
POSTFIX_ANVIL %{POSTFIX_ANVIL_CONN_RATE}|%{POSTFIX_ANVIL_CONN_CACHE}|%{POSTFIX_ANVIL_CONN_COUNT}
The complete grok patterns and Logstash filter configurations for the other 19 kinds of Postfix logs can be obtained from https://github.com/whyscream/postfix-grok-patterns.
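As mentioned at the beginning of this section, instead of the syslog input you can also let Logstash read the files that rsyslog writes. A minimal sketch of that variant (the log path and the grok used to strip the syslog header are assumptions):
input {
    file {
        path => ["/var/log/maillog"]
        type => "postfix"
    }
}
filter {
    # Peel off the rsyslog prefix so that program and message look like what the syslog input would produce.
    grok {
        match     => { "message" => "%{SYSLOGTIMESTAMP:timestamp} %{SYSLOGHOST:logsource} %{SYSLOGPROG}: %{GREEDYDATA:message}" }
        overwrite => [ "message" ]
    }
}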
3.4 Ossec Logs
Ossec is an open-source, multi-platform intrusion detection system. Forwarding Ossec's detection alerts into ELK can obviously be a great help in visualizing security events quickly. This section describes how to combine Ossec with Logstash.
3.4.1 Configuring All Ossec Agents for syslog Output
The configuration steps are as follows:
1) Edit the ossec.conf file (by default /var/ossec/etc/ossec.conf).
2) Add the following to ossec.conf (10.0.0.1 is the server receiving the syslog output):
<syslog_output>
<server>10.0.0.1</server>
<port>9000</port>
<format>default</format>
</syslog_output>
3) Enable Ossec's syslog output capability:
/var/ossec/bin/ossec-control enable client-syslog
4) Restart the Ossec service:
/var/ossec/bin/ossec-control start
3.4.2 Configuring Logstash
Add the following to the Logstash configuration file (or create a new one), assuming 10.0.0.1 is the Elasticsearch server:
input {
udp {
port => 9000
type =>"syslog"
}
}
filter {
if [type] == "syslog" {
grok {
match => { "message" =>"%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:
syslog_host} %{DATA:syslog_program}: Alert Level: %{BASE10NUM:
Alert_Level}; Rule: %{BASE10NUM:Rule} - %{GREEDYDATA:Description};
Location: %{GREEDYDATA:Details}" }
add_field => [ "ossec_server", "%{host}" ]
}
mutate {
remove_field => [ "syslog_hostname", "syslog_message", "syslog_pid",
"message", "@version", "type", "host" ]
}
}
}
output {
elasticsearch {
}
}
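If Elasticsearch is not running on the same machine as Logstash (the text above assumes 10.0.0.1 is the Elasticsearch server), the output usually also needs the hosts option set; a minimal sketch for Logstash 2.x and later:
output {
    elasticsearch {
        hosts => ["10.0.0.1:9200"]
    }
}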
3.4.3 A Recommended Kibana Dashboard
The community has already built a dashboard for the most common Ossec needs, which can be loaded directly from the Kibana 3 page; an example is shown in Figure 3-1.
The dashboard's JSON file is at: https://github.com/magenx/Logstash/raw/master/kibana/kibana_dash-board.json.
For how to load it, see the Kibana material in Part III of this book.
3.5 Windows System Logs
The Logstash community has a large number of Windows users, so this section looks specifically at collecting and processing system logs on the Windows platform. Syslog handling on Linux was covered earlier; Windows has a similar design, called the eventlog. This section describes how to process the Windows eventlog.
3.5.1 Collector-Side Configuration
Because the author of Logstash comes from a Linux operations background, early versions had quite a few bugs unique to the Windows platform. So for logs on Windows, while trying out Logstash, you are also encouraged to evaluate the more stable nxlog. A more detailed introduction to nxlog can be found in section 5.5 later in this book.
Here are the configurations for handling the Windows eventlog with Logstash and with nxlog.
The Logstash configuration is as follows:
[Figure 3-1: Ossec dashboard]
input {
eventlog {
#logfile => ["Application", "Security", "System"]
logfile => ["Security"]
type =>"winevent"
tags => [ "caen" ]
}
}
A few points about the nxlog configuration:
1) ROOT must be set to the actual nxlog installation path.
2) The input module is called im_mseventlog rather than im_msvistalog on Windows 2003 and earlier versions.
Below is a complete nxlog configuration example:
define ROOT C:\Program Files (x86)\nxlog
Moduledir %ROOT%\modules
CacheDir %ROOT%\data
Pidfile %ROOT%\data\nxlog.pid
SpoolDir %ROOT%\data
LogFile %ROOT%\data\nxlog.log
<Extension json>
Module xm_json
</Extension>
<Input in>
Module im_msvistalog
Exec to_json();
</Input>
<Output out>
Module om_tcp
Host 10.66.66.66
Port 5140
</Output>
<Route 1>
Path in => out
</Route>
3.5.2 Receiver-Side Configuration
On the central receiving side, Logstash is used across the board to parse the events and index them. If the collector side is also Logstash, the main fields are already in place and there is nothing special about the receiving configuration. If the collector is nxlog, we still need to convert some of the fields nxlog generates into a style more conventional for Logstash.
As mentioned in the earlier plugin chapters, Elasticsearch searches in lowercase by default, so the data should be lowercased as far as possible. Unfortunately, in nxlog not only the values but also the field names mix upper and lower case, so we can only use the rename feature of logstash-filter-mutate to rename the fields to lowercase versions.
An example configuration follows:
input {
tcp {
codec =>"json"
port => 5140
tags => ["windows","nxlog"]
type =>"nxlog-json"
}
} # end input
filter {
if [type] == "nxlog-json" {
date {
match => ["[EventTime]", "YYYY-MM-dd HH:mm:ss"]
timezone =>"Europe/London"
}
mutate {
rename => [ "AccountName", "user" ]
rename => [ "AccountType", "[eventlog][account_type]" ]
rename => [ "ActivityId", "[eventlog][activity_id]" ]
rename => [ "Address", "ip6" ]
rename => [ "ApplicationPath", "[eventlog][application_path]" ]
rename => [ "AuthenticationPackageName", "[eventlog][authentication_package_name]" ]
rename => [ "Category", "[eventlog][category]" ]
rename => [ "Channel", "[eventlog][channel]" ]
rename => [ "Domain", "domain" ]
rename => [ "EventID", "[eventlog][event_id]" ]
rename => [ "EventType", "[eventlog][event_type]" ]
rename => [ "File", "[eventlog][file_path]" ]
rename => [ "Guid", "[eventlog][guid]" ]
rename => [ "Hostname", "hostname" ]
rename => [ "Interface", "[eventlog][interface]" ]
rename => [ "InterfaceGuid", "[eventlog][interface_guid]" ]
rename => [ "InterfaceName", "[eventlog][interface_name]" ]
rename => [ "IpAddress", "ip" ]
rename => [ "IpPort", "port" ]
rename => [ "Key", "[eventlog][key]" ]
rename => [ "LogonGuid", "[eventlog][logon_guid]" ]
rename => [ "Message", "message" ]
rename => [ "ModifyingUser", "[eventlog][modifying_user]" ]
rename => [ "NewProfile", "[eventlog][new_profile]" ]
rename => [ "OldProfile", "[eventlog][old_profile]" ]
rename => [ "Port", "port" ]
rename => [ "PrivilegeList", "[eventlog][privilege_list]" ]
rename => [ "ProcessID", "pid" ]
rename => [ "ProcessName", "[eventlog][process_name]" ]
rename => [ "ProviderGuid", "[eventlog][provider_guid]" ]
rename => [ "ReasonCode", "[eventlog][reason_code]" ]
rename => [ "RecordNumber", "[eventlog][record_number]" ]
rename => [ "ScenarioId", "[eventlog][scenario_id]" ]
rename => [ "Severity", "level" ]
rename => [ "SeverityValue", "[eventlog][severity_code]" ]
rename => [ "SourceModuleName", "nxlog_input" ]
rename => [ "SourceName", "[eventlog][program]" ]
rename => [ "SubjectDomainName", "[eventlog][subject_domain_name]" ]
rename => [ "SubjectLogonId", "[eventlog][subject_logonid]" ]
rename => [ "SubjectUserName", "[eventlog][subject_user_name]" ]
rename => [ "SubjectUserSid", "[eventlog][subject_user_sid]" ]
rename => [ "System", "[eventlog][system]" ]
rename => [ "TargetDomainName", "[eventlog][target_domain_name]" ]
rename => [ "TargetLogonId", "[eventlog][target_logonid]" ]
rename => [ "TargetUserName", "[eventlog][target_user_name]" ]
rename => [ "TargetUserSid", "[eventlog][target_user_sid]" ]
rename => [ "ThreadID", "thread" ]
}
    mutate {
        remove_field => [
            "CurrentOrNextState", "Description", "EventReceivedTime", "EventTime", "EventTimeWritten",
            "IPVersion", "KeyLength", "Keywords", "LmPackageName", "LogonProcessName", "LogonType",
            "Name", "Opcode", "OpcodeValue", "PolicyProcessingMode", "Protocol", "ProtocolType",
            "SourceModuleType", "State", "Task", "TransmittedServices", "Type", "UserID", "Version"
        ]
    }
}
}
3.6 Java Logs
As mentioned in the codec discussion in section 2.2, besides merging multi-line Java logs with multiline, you can also write them into Logstash directly through Log4J. This section describes how to do that in a Java application environment.
3.6.1 Log4J Configuration
First, configure the Java application's Log4J settings to start the built-in SocketAppender. Edit the application's log4j.xml configuration file and add the following section:
<appender name="LOGSTASH" >
<param name="RemoteHost" value="logstash_hostname" />
<param name="ReconnectionDelay" value="60000" />
<param name="LocationInfo" value="true" />
<param name="Threshold" value="DEBUG" />
</appender>
Then add this newly defined appender to the root logger; it can coexist with other existing appenders:
<root>
<level value="INFO"/>
<appender-ref ref="OTHERPLACE"/>
<appender-ref ref="LOGSTASH"/>
</root>
If you use a log4j.properties configuration file instead, the corresponding configuration is:
log4j.rootLogger=DEBUG, logstash
###SocketAppender###
log4j.appender.logstash=org.apache.log4j.net.SocketAppender
log4j.appender.logstash.Port=4560
log4j.appender.logstash.RemoteHost=logstash_hostname
log4j.appender.logstash.ReconnectionDelay=60000
log4j.appender.logstash.LocationInfo=true
Log4J will keep trying to connect to the logstash_hostname address you configured, and once the connection is established it begins sending log data.
3.6.2 Logstash Configuration
With the Java application side done, set up the Logstash receiving end. The configuration is shown below; port 4560 is the default peer port of the Log4J SocketAppender:
input {
log4j {
type =>"log4j-json"
port => 4560
}
}
3.6.3 Verifying with an Exception Stack Trace
With Logstash running, write a simple Log4J program:
import org.apache.log4j.Logger;
public class HelloExample {
    final static Logger logger = Logger.getLogger(HelloExample.class);
    public static void main(String[] args) {
        HelloExample obj = new HelloExample();
        try {
            obj.divide();
        } catch (ArithmeticException ex) {
            logger.error("Sorry, something wrong!", ex);
        }
    }
    private void divide() {
        int i = 10 / 0;
    }
}
Compile and run it:
# javac -cp ./logstash-1.5.0.rc2/vendor/bundle/jruby/1.9/gems/logstash-input-log4j-0.1.3-java/lib/log4j/log4j/1.2.17/log4j-1.2.17.jar HelloExample.java
# java -cp .:./logstash-1.5.0.rc2/vendor/bundle/jruby/1.9/gems/logstash-input-log4j-0.1.3-java/lib/log4j/log4j/1.2.17/log4j-1.2.17.jar HelloExample
You should then see an event like the following in Logstash's terminal output:
{
"message" =>"Sorry, something wrong!",
"@version" =>"1",
"@timestamp" =>"2015-07-02T13:24:45.727Z",
"type" =>"log4j-json",
"host" =>"127.0.0.1:52420",
"path" =>"HelloExample",
"priority" =>"ERROR",
"logger_name" =>"HelloExample",
"thread" =>"main",
"class" =>"HelloExample",
"file" =>"HelloExample.java:9",
"method" =>"main",
"stack_trace" =>"java.lang.ArithmeticException: / by zero\n\tat HelloExample.
divide(HelloExample.java:13)\n\tat HelloExample.main(HelloExample.java:7)"
}
As you can see, the exception stack trace is recorded directly in a single line.
3.6.4 JSON Event Layout
If you cannot use the SocketAppender and must go through files, Log4J has a layout feature for controlling the output format. Much like having Nginx assemble its own JSON output, the layout mechanism can be used to record logs in JSON format.
Logstash officially provides an extension package for this, which can be found and downloaded via mvnrepository.com:
# wget https://central.maven.org/maven2/net/logstash/log4j/jsonevent-layout/1.7/jsonevent-layout-1.7.jar
Or add the dependency directly to your project's pom.xml:
<dependency>
<groupId>net.logstash.log4j</groupId>
<artifactId>jsonevent-layout</artifactId>
<version>1.7</version>
</dependency>
Then modify the project's log4j.properties as follows:
log4j.rootCategory=WARN, RollingLog
log4j.appender.RollingLog=org.apache.log4j.DailyRollingFileAppender
log4j.appender.RollingLog.Threshold=TRACE
log4j.appender.RollingLog.File=api.log
log4j.appender.RollingLog.DatePattern=.yyyy-MM-dd
log4j.appender.RollingLog.layout=net.logstash.log4j.JSONEventLayoutV1
If you use log4j.xml, modify it as follows:
<appender name="Console" >
<param name="Threshold" value="TRACE" />
<layout />
</appender>
The generated file is now JSON in the standard Logstash format, which Logstash can read with the following configuration:
input {
file {
codec => json
path => ["/path/to/log4j.log"]
}
}
The resulting Logstash event looks like this:
{
"mdc":{},
"line_number":"29",
"class":"org.eclipse.jetty.examples.logging.EchoFormServlet",
"@version":1,
"source_host":"jvstratusmbp.local",
"thread_name":"qtp513694835-14",
"message":"Got request from 0:0:0:0:0:0:0:1%0 using Mozilla\/5.0 (Macintosh;Intel Mac OS X 10_9_1) AppleWebKit\/537.36 (KHTML, like Gecko) Chrome\/32.0.1700.77 Safari\/537.36",
"@timestamp":"2014-01-27T19:52:35.738Z",
"level":"INFO",
"file":"EchoFormServlet.java",
"method":"doPost",
"logger_name":"org.eclipse.jetty.examples.logging.EchoFormServlet"
}
As you can see, the same effect is achieved.
If you record your Java logs with the logback project rather than Log4J, Logstash officially has a similar extension package; just change the definition in pom.xml to the following:
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>4.4</version>
</dependency>
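The encoder then needs to be referenced from a logback appender in logback.xml; a minimal sketch (the appender name and file path are assumptions):
<appender name="JSONFILE" class="ch.qos.logback.core.FileAppender">
    <file>api.log</file>
    <!-- write each event as one JSON line in the Logstash format -->
    <encoder class="net.logstash.logback.encoder.LogstashEncoder" />
</appender>
<root level="INFO">
    <appender-ref ref="JSONFILE" />
</root>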
3.7 MySQL Slow Query Logs
MySQL can record several kinds of logs; the common ones are the error log, slow log, general log, and binlog. Among them the slow log, as the starting point for performance monitoring and optimization, matters most. This section discusses how to process the slow log with Logstash. The general log is handled in basically the same way format-wise, but since its volume is far greater than the slow log's, the more efficient recommendation for it is protocol decoding with packetbeat; see section 8.3 later in this book.
An example Logstash configuration for the MySQL slow log is as follows:
input {
file {
type =>"mysql-slow"
path =>"/var/log/mysql/mysql-slow.log"
codec => multiline {
pattern =>"^# User@Host:"
negate => true
what =>"previous"
}
}
}
filter {
# drop sleep events
grok {
match => { "message" =>"SELECT SLEEP" }
add_tag => [ "sleep_drop" ]
tag_on_failure => [] # prevent default _grokparsefailure tag on real records
}
if "sleep_drop" in [tags] {
drop {}
}
grok {
match => [ "message", "(?m)^# User@Host: %{USER:user}\[[^\]]+\] @ (?:(?<clien-thost>\S*) )?\[(?:%{IP:clientip})?\]\s*# Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\s*(?:use %{DATA:database};\s*)?SET timestamp=%{NUMBER:timestamp};\s*(?<query>(?<action>\w+)\s+.*)\n# Time:.*$" ]
}
date {
match => [ "timestamp", "UNIX" ]
remove_field => [ "timestamp" ]
}
}
The configuration exploits the complementary behavior of grok's add_tag option, which is applied only on a successful match, and the tag_on_failure option, which is applied only on failure, to neatly filter out and drop the useless sleep statements from the log.
Given a multi-line MySQL slow log entry like the following:
# User@Host: logstash[logstash] @ localhost [127.0.0.1]
# Query_time: 5.310431 Lock_time: 0.029219 Rows_sent: 1 Rows_examined: 24575727
SET timestamp=1393963146;
select count(*) from node join variable order by rand();
# Time: 140304 19:59:14
running the configuration above lets Logstash process it into a single event like this:
{
"@timestamp" =>"2014-03-04T19:59:06.000Z",
"message" =>"# User@Host: logstash[logstash] @ localhost [127.0.0.1]\n# Query_
time: 5.310431 Lock_time: 0.029219 Rows_sent: 1 Rows_examined: 24575727\nSET
timestamp=1393963146;\nselect count(*) from node join variable order by rand();
\n# Time: 140304 19:59:14",
"@version" =>"1",
"tags" => [
[0] "multiline"
],
"type" =>"mysql-slow",
"host" =>"raochenlindeMacBook-Air.local",
"path" =>"/var/log/mysql/mysql-slow.log",
"user" =>"logstash",
"clienthost" =>"localhost",
"clientip" =>"127.0.0.1",
"query_time" => 5.310431,
"lock_time" => 0.029219,
"rows_sent" => 1,
"rows_examined" => 24575727,
"query" =>"select count(*) from node join variable order by rand();",
"action" =>"select"
}
You can then build monitoring alerts and Kibana visualizations on the action, query_time, lock_time, and rows_examined fields.
3.8 Docker Logs
Docker is currently the hottest technology in large-scale Internet infrastructure solutions. It gives operations engineers an entirely different perspective and way of working.
From the logging point of view, Docker's biggest impact is this: its best practice calls for a container to hold only a single service process whose life cycle may end at any moment. That means a traditional log system that writes to disk and collects from fixed locations can no longer do its job. So in containerized services, logs have to be recorded in other ways. This section covers the two most common ones: writing to the host's disk, and collecting with logspout.
3.8.1 Writing to the Host's Disk
By default, Docker saves each container's standard output and standard error under the host's /var/lib/docker/containers/ directory. So when the deployment is reasonably stable, writing straight to the host's disk and then collecting the logs with Logstash on the host is also a decent option.
Taking Nginx as an example, the configuration that sends the Nginx access log and error log to standard output is:
daemon off;
error_log /dev/stdout info;
http {
access_log /dev/stdout;
...
}
However, the peculiarity of containers shows up once again: there is actually no /dev/stdout device inside a container. So we have to handle this ourselves by adding one line to the Dockerfile:
RUN ln -sf /proc/self/fd /dev/
This keeps nginx.conf a configuration usable both on the host and in the container, while still achieving the goal.
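Putting it together, a minimal Dockerfile sketch (the base image tag and file names are assumptions):
FROM nginx:1.9
# symlink /proc/self/fd into /dev, per the approach described above
RUN ln -sf /proc/self/fd /dev/
COPY nginx.conf /etc/nginx/nginx.conf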
Then collect the logs with a Logstash configuration like the following:
input {
file {
path => ["/var/lib/docker/containers/*/*-json.log"]
codec => json
}
}
filter {
grok {
match => ["path"。"/(?<container_id>\w+)-json.log" ]
remove_field => ["path"]
}
date {
match => ["time", "ISO8601"]
}
}
3.8.2 Collecting with logspout
logspout is the best-known log collection approach in the Docker ecosystem. Its design is: each host runs a single container with the logspout service, which forwards the logs of the other containers on that host to different receivers according to the route settings.
Basic usage of logspout is as follows:
$ docker pull gliderlabs/logspout:latest
$ docker run --name="logspout" \
    --volume=/var/run/docker.sock:/tmp/docker.sock \
    --publish=127.0.0.1:8000:80 \
    gliderlabs/logspout \
    syslog://remoteaddr:514
In addition, logspout provides a way to change routes dynamically, as shown below:
# curl $(docker port `docker ps -lq` 8000)/routes \
    -X POST \
    -d '{"source": {"filter_name": "*_db", "types": ["stderr"]}, "target": {"type": "syslog", "addr": "remoteaddr2:5140"}}'
This configuration means: take the logs collected from the standard-error output of containers whose names match *_db (that is, names carrying db), and send them via the syslog protocol to port 5140 on the host remoteaddr2.
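The same HTTP API can also be used to inspect the currently configured routes, for example (assuming the same port mapping as above):
# curl $(docker port `docker ps -lq` 8000)/routes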
Note that logspout uses the RFC 5424 version of the syslog protocol, so if your receiver parses the RFC 3164 version of syslog, you need to adjust things yourself. logstash-input-syslog, for example, implements RFC 3164, so the parsing has to be done separately:
input {
tcp {
port => 5140
}
}
filter {
grok {
match => [ "message", "%{SYSLOG5424PRI:syslog_pri} %{SYSLOG5424LINE:message}" ]
}
}
In addition, logspout supports modular extensions, so we can also process the data inside logspout directly into a format friendlier to Logstash. To extend logspout with Logstash format support:
1) Edit the Dockerfile so that it contains:
FROM gliderlabs/logspout:master
ENV ROUTE_URIS=logstash://host:port
2) Edit modules.go so that it contains:
package main
import (
_ "github.com/looplab/logspout-logstash"
_ "github.com/gliderlabs/logspout/transports/udp"
)
3) Build the image:
docker build
After that, Logstash only needs to do JSON parsing on what it receives.
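On the Logstash side, a minimal receiving sketch could be as follows (the port is an assumption; with the udp transport imported above, logspout-logstash ships each event as a JSON datagram):
input {
    udp {
        port  => 5000
        codec => json
    }
}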