《ELK Stack权威指南》 (The ELK Stack Definitive Guide), Chapter 3
Scenario Examples
The previous chapters covered the common configuration options of dozens of Logstash plugins, but with so many choices available, how to combine these plugins remains a real problem for some users. This chapter walks through some of the most common logging scenarios and demonstrates targeted plugin combinations for each, hopefully giving readers some inspiration.
The scenarios covered in this chapter are: Nginx access logs, Nginx error logs, Postfix logs, Ossec logs, Windows system logs, Java logs, MySQL slow query logs, and Docker container logs.
3.1 Nginx Access Logs
Access log processing and analysis is by far the most common requirement when using the ELK stack. The default approach falls short in both performance and precision. This section walks through several different ways of processing Nginx access logs and explains their trade-offs.
3.1.1 The grok Approach
Logstash ships with grok patterns for the standard Apache log format:
COMMONAPACHELOG %{IPORHOST:clientip} %{USER:ident} %{NOTSPACE:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-)
COMBINEDAPACHELOG %{COMMONAPACHELOG} %{QS:referrer} %{QS:agent}
The standard Nginx log format differs only in one extra $http_x_forwarded_for variable at the end. So the grok pattern definition for the standard Nginx log is:
MAINNGINXLOG %{COMBINEDAPACHELOG} %{QS:x_forwarded_for}
A custom log format can be handled by modifying the pattern along the same lines.
3.1.2 The split Approach
Because some Nginx log variables contain embedded spaces, you are often forced to fall back on the %{QS} pattern as a delimiter, which is poor in both performance and granularity. If instead you define a relatively rare character as a custom separator, processing becomes much simpler. Suppose the log format is defined as follows:
log_format main "$http_x_forwarded_for | $time_local | $request | $status | $body_bytes_sent | "
                "$request_body | $content_length | $http_referer | $http_user_agent | $nuid | "
                "$http_cookie | $remote_addr | $hostname | $upstream_addr | $upstream_response_time | $request_time";
Actual log lines then look like this:
117.136.9.248 | 08/Apr/2015:16:00:01 +0800 | POST /notice/newmessage?sign=cba4f614e05db285850cadc696fcdad0&token=JAGQ92Mjs3--gik_b_DsPIQHcyMKYGpD&did=b749736ac70f12df700b18cd6d051d5&osn=android&osv=4.0.4&appv=3.0.1&net=460-02-2g&longitude=120.393006&latitude=36.178329&ch=360&lp=1&ver=1&ts=1428479998151&im=869736012353958&sw=0&sh=0&la=zh-CN&lm=weixin&dt=vivoS11t HTTP/1.1 | 200 | 132 | abcd-sign-v1://dd03c57f8cb6f1cef919ab5df66f2903f:d51asq5yslwnyz5t/{\x22type\x22:4,\x22uid\x22:7567306} | 89 | - | abcd/3.0.1, Android/4.0.4, vivo S11t | nuid=0C0A0A0A01E02455EA7CF47E02FD072C1428480001.157 | - | 10.10.10.13 | bnx02.abcdprivate.com | 10.10.10.22:9999 | 0.022 | 0.022
59.50.44.53 | 08/Apr/2015:16:00:01 +0800 | POST /feed/pubList?appv=3.0.3&did=89da72550de488328e2aba5d97850e9f&dt=iPhone6%2C2&im=B48C21F3-487E-4071-9742-DC6D61710888&la=cn&latitude=0.000000&lm=weixin&longitude=0.000000&lp=-1.000000&net=0-0-wifi&osn=iOS&osv=8.1.3&sh=568.000000&sw=320.000000&token=7NobA7asg3Jb6n9o4ETdPXyNNiHwMs4J&ts=1428480001275 HTTP/1.1 | 200 | 983 | abcd-sign-v1://b398870a0b25b29aae65cd553addc43d:72214ee85d7cca22/{\x22nextkey\x22:\x22\x22,\x22uid\x22:\x2213062545\x22,\x22token\x22:\x227NobA7asg3Jb6n9o4ETdPXyNNiHwMs4J\x22} | 139 | - | Shopping/3.0.3 (iPhone; iOS 8.1.3; Scale/2.00) | nuid=0C0A0A0A81DF2455017D548502E48E2E1428480001.154 | nuid=CgoKDFUk34GFVH0BLo7kAg== | 10.10.10.11 | bnx02.abcdprivate.com | 10.10.10.35:9999 | 0.025 | 0.026
The request field can then be split further, for example its URL parameter part. Clearly, the order of fields in the URL parameters varies from request to request: the first field after the question mark is sign in the first line but appv in the second. So the parameters must be split apart and each value extracted by name. The bundled grok patterns cannot do this, so the final Logstash configuration looks like this:
filter {
    ruby {
        init => "@kname = ['http_x_forwarded_for','time_local','request','status','body_bytes_sent','request_body','content_length','http_referer','http_user_agent','nuid','http_cookie','remote_addr','hostname','upstream_addr','upstream_response_time','request_time']"
        code => "
            new_event = LogStash::Event.new(Hash[@kname.zip(event.get('message').split('|'))])
            new_event.remove('@timestamp')
            event.append(new_event)
        "
    }
    if [request] {
        ruby {
            init => "@kname = ['method','uri','verb']"
            code => "
                new_event = LogStash::Event.new(Hash[@kname.zip(event.get('request').split(' '))])
                new_event.remove('@timestamp')
                event.append(new_event)
            "
        }
    }
    if [uri] {
        ruby {
            init => "@kname = ['url_path','url_args']"
            code => "
                new_event = LogStash::Event.new(Hash[@kname.zip(event.get('uri').split('?'))])
                new_event.remove('@timestamp')
                event.append(new_event)
            "
        }
        kv {
            prefix => "url_"
            source => "url_args"
            field_split => "&"
            remove_field => [ "url_args", "uri", "request" ]
        }
    }
    mutate {
        convert => [
            "body_bytes_sent", "integer",
            "content_length", "integer",
            "upstream_response_time", "float",
            "request_time", "float"
        ]
    }
    date {
        match  => [ "time_local", "dd/MMM/yyyy:HH:mm:ss Z" ]
        locale => "en"
    }
}
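The heart of the ruby filter bodies above is plain Ruby: Array#zip pairs the field-name list with the split values, and Hash[] turns those pairs into a hash. A quick sketch outside Logstash, using a shortened, made-up log line:

```ruby
kname = ['http_x_forwarded_for', 'time_local', 'request', 'status']
line  = '117.136.9.248 | 08/Apr/2015:16:00:01 +0800 | POST /notice/newmessage HTTP/1.1 | 200'

fields = Hash[kname.zip(line.split('|'))]
# split('|') keeps the padding spaces around each value, which is why the
# resulting events show e.g. time_local with a leading blank.
p fields['status']      # => " 200"
p fields['time_local']  # => " 08/Apr/2015:16:00:01 +0800 "
```

If the stray spaces bother you, mapping `.strip` over the split values before zipping would remove them, at a small extra cost per event.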
The resulting event looks like this:
{
    "message" => "1.43.3.188 | 08/Apr/2015:16:00:01 +0800 | POST /search/suggest?appv=3.0.3&did=dfd5629d705d400795f698055806f01d&dt=iPhone7%2C2&im=AC926907-27AA-4A10-9916-C5DC75F29399&la=cn&latitude=-33.903867&lm=sina&longitude=151.208137&lp=-1.000000&net=0-0-wifi&osn=iOS&osv=8.1.3&sh=667.000000&sw=375.000000&token=_ovaPz6Ue68ybBuhXustPbG-xf1WbsPO&ts=1428480001567 HTTP/1.1 | 200 | 353 | abcd-sign-v1://a24b478486d3bb92ed89a901541b60a5:b23e9d2c14fe6755/{\\x22key\\x22:\\x22last\\x22,\\x22offset\\x22:\\x220\\x22,\\x22token\\x22:\\x22_ovaPz6Ue68ybBuhXustPbG-xf1WbsPO\\x22,\\x22limit\\x22:\\x2220\\x22} | 148 | - | abcdShopping/3.0.3 (iPhone; iOS 8.1.3; Scale/2.00) | nuid=0B0A0A0A9A64AF54F97634640230944E1428480001.113 | nuid=CgoKC1SvZJpkNHb5TpQwAg== | 10.10.10.11 | bnx02.abcdprivate.com | 10.10.10.26:9999 | 0.070 | 0.071",
    "@version" => "1",
    "@timestamp" => "2015-04-08T08:00:01.000Z",
    "type" => "nginxapiaccess",
    "host" => "blog05.abcdprivate.com",
    "path" => "/home/nginx/logs/api.access.log",
    "http_x_forwarded_for" => "1.43.3.188",
    "time_local" => " 08/Apr/2015:16:00:01 +0800",
    "status" => "200",
    "body_bytes_sent" => 353,
    "request_body" => "abcd-sign-v1://a24b478486d3bb92ed89a901541b60a5:b23e9d2c14fe6755/{\\x22key\\x22:\\x22last\\x22,\\x22offset\\x22:\\x220\\x22,\\x22token\\x22:\\x22_ovaPz6Ue68ybBuhXustPbG-xf1WbsPO\\x22,\\x22limit\\x22:\\x2220\\x22}",
    "content_length" => 148,
    "http_referer" => "-",
    "http_user_agent" => "abcdShopping/3.0.3 (iPhone; iOS 8.1.3; Scale/2.00)",
    "nuid" => "nuid=0B0A0A0A9A64AF54F97634640230944E1428480001.113",
    "http_cookie" => "nuid=CgoKC1SvZJpkNHb5TpQwAg==",
    "remote_addr" => "10.10.10.11",
    "hostname" => "bnx02.abcdprivate.com",
    "upstream_addr" => "10.10.10.26:9999",
    "upstream_response_time" => 0.070,
    "request_time" => 0.071,
    "method" => "POST",
    "verb" => "HTTP/1.1",
    "url_path" => "/search/suggest",
    "url_appv" => "3.0.3",
    "url_did" => "dfd5629d705d400795f698055806f01d",
    "url_dt" => "iPhone7%2C2",
    "url_im" => "AC926907-27AA-4A10-9916-C5DC75F29399",
    "url_la" => "cn",
    "url_latitude" => "-33.903867",
    "url_lm" => "sina",
    "url_longitude" => "151.208137",
    "url_lp" => "-1.000000",
    "url_net" => "0-0-wifi",
    "url_osn" => "iOS",
    "url_osv" => "8.1.3",
    "url_sh" => "667.000000",
    "url_sw" => "375.000000",
    "url_token" => "_ovaPz6Ue68ybBuhXustPbG-xf1WbsPO",
    "url_ts" => "1428480001567"
}
If there are too many URL parameters, you can skip the kv split, or predefine a nested object mapping and turn the parameters into an array form instead:
if [uri] {
    ruby {
        init => "@kname = ['url_path','url_args']"
        code => "
            new_event = LogStash::Event.new(Hash[@kname.zip(event.get('uri').split('?'))])
            new_event.remove('@timestamp')
            event.append(new_event)
        "
    }
    if [url_args] {
        ruby {
            init => "@kname = ['key','value']"
            code => "event.set('nested_args', event.get('url_args').split('&').collect {|i| Hash[@kname.zip(i.split('='))]})"
            remove_field => [ "url_args", "uri", "request" ]
        }
    }
}
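What the nested_args transformation produces is easy to see in plain Ruby; the query string here is a shortened, made-up example:

```ruby
kname = ['key', 'value']
url_args = 'appv=3.0.3&did=dfd5629d&osn=iOS'

# Each k=v pair becomes its own {key, value} hash, suitable for a nested mapping.
nested_args = url_args.split('&').collect { |i| Hash[kname.zip(i.split('='))] }
p nested_args
# => [{"key"=>"appv", "value"=>"3.0.3"},
#     {"key"=>"did", "value"=>"dfd5629d"},
#     {"key"=>"osn", "value"=>"iOS"}]
```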
For the rationale behind the nested object optimization and how to use nested objects, see the Elasticsearch tuning material in Chapter 11.
3.1.3 JSON Format
A custom separator is nice, but the configuration is still fairly complex to write. For Logstash there is an even simpler way to handle Nginx logs: define a custom log format that hand-assembles the output directly as JSON:
log_format json '{"@timestamp":"$time_iso8601",'
'"host":"$server_addr",'
'"clientip":"$remote_addr",'
'"size":$body_bytes_sent,'
'"responsetime":$request_time,'
'"upstreamtime":"$upstream_response_time",'
'"upstreamhost":"$upstream_addr",'
'"http_host":"$host",'
'"url":"$uri",'
'"xff":"$http_x_forwarded_for",'
'"referer":"$http_referer",'
'"agent":"$http_user_agent",'
'"status":"$status"}';
Then the following Logstash configuration is all you need:
input {
file {
path =>"/var/log/nginx/access.log"
codec => json
}
}
filter {
mutate {
split => [ "upstreamtime", "," ]
}
mutate {
convert => [ "upstreamtime", "float" ]
}
}
Two separate mutate plugins are used here because upstreamtime may contain several values: the field is first split into an array, then each element is converted to a float. Inside a single mutate, the convert function runs before the split function, so the two steps must be written separately. The execution order of the functions inside mutate was explained in detail in section 2.3.8, which readers can refer back to.
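The two-step transformation is easy to check in plain Ruby; the sample value assumes a request that passed through two upstreams:

```ruby
upstreamtime = '0.007, 0.022'      # $upstream_response_time with two upstreams

parts  = upstreamtime.split(',')   # step 1: the mutate split
floats = parts.map(&:to_f)         # step 2: the mutate convert to float
p floats                           # => [0.007, 0.022]
```

Doing the convert first would fail, since `"0.007, 0.022".to_f` only yields 0.007 and silently drops the second value.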
3.1.4 Sending via syslog
Nginx added syslog support starting from version 1.7 (Tengine had it even earlier). With it, we can send logs directly over syslog. The configuration on the Nginx side looks like this:
access_log syslog:server=unix:/data0/rsyslog/nginx.sock locallog;
Or send directly to a remote Logstash machine:
access_log syslog:server=192.168.0.2:5140,facility=local6,tag=nginx-access,severity=info logstashlog;
By default, Nginx sends data at the local7.info level with the tag nginx. Note that when sending logs via syslog, the buffer=16k option cannot be configured.
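For reference, the PRI value at the front of each syslog packet is computed as facility × 8 + severity; the codes below follow the standard RFC 3164 facility and severity tables:

```ruby
FACILITY = { 'local6' => 22, 'local7' => 23 }  # RFC 3164 facility codes
SEVERITY = { 'info' => 6 }                     # RFC 3164 severity codes

# Nginx's default local7.info therefore produces lines starting with <190>:
pri = FACILITY['local7'] * 8 + SEVERITY['info']
puts "<#{pri}>"  # => <190>
```

The facility=local6 example above would instead yield PRI 182.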
3.2 Nginx Error Logs
The Nginx error log is one of the log types ops engineers see most often yet most easily overlook. This section describes how to process Nginx error logs, and readers are encouraged to pay more attention to them during performance tuning. The Nginx error log has neither a clear, uniform separator nor a particularly convenient regex pattern, but with a combination of different Logstash plugins the data can still be processed without much trouble.
One thing worth noting: one class of entries in the Nginx error log is produced when an oversized request body is received, and by default the message records the exact byte count of the request body. Since that byte count changes with almost every request, common aggregations such as topN have no useful effect on the field, so it needs special treatment.
The resulting Logstash configuration is shown below:
filter {
    grok {
        match => { "message" => "(?<datetime>\d\d\d\d/\d\d/\d\d \d\d:\d\d:\d\d) \[(?<errtype>\w+)\] \S+: \*\d+ (?<errmsg>[^,]+), (?<errinfo>.*)$" }
    }
    mutate {
        rename => [ "host", "fromhost" ]
        gsub   => [ "errmsg", "too large body: \d+ bytes", "too large body" ]
    }
    if [errinfo] {
        ruby {
            code => "
                new_event = LogStash::Event.new(Hash[event.get('errinfo').split(', ').map{|l| l.split(': ')}])
                new_event.remove('@timestamp')
                event.append(new_event)
            "
        }
    }
    grok {
        match => { "request" => '"%{WORD:verb} %{URIPATH:urlpath}(?:\?%{NGX_URIPARAM:urlparam})?(?: HTTP/%{NUMBER:httpversion})"' }
        patterns_dir => ["/etc/logstash/patterns"]
        remove_field => [ "message", "errinfo", "request" ]
    }
}
An Nginx error log line run through the above Logstash configuration produces an event like this:
{
"@version": "1",
"@timestamp": "2015-07-02T01:26:40.000Z",
"type": "nginx-error",
"errtype": "error",
"errmsg": "client intended to send too large body",
"fromhost": "web033.mweibo.yf.sinanode.com",
"client": "36.16.7.17",
"server": "api.v5.weibo.cn",
"host": "\"api.weibo.cn\"",
"verb": "POST",
"urlpath": "/2/client/addlog_batch",
"urlparam": "gsid=_2A254UNaSDeTxGeRI7FMX9CrEyj2IHXVZRG1arDV6PUJbrdANLROskWp9bXakjUZM5792FW9A5S9EU4jxqQ..&wm=3333_2001&i=0c6f156&b=1&from=1053093010&c=iphone&v_p=21&skin=default&v_f=1&s=8f14e573&lang=zh_CN&ua=iPhone7,1__weibo__5.3.0__iphone__os8.3",
"httpversion": "1.1"
}
3.3 Postfix Logs
Postfix is the most widely used mail server software on Linux. Running a mail service has always been operationally complex, so this section offers a parsing solution for Postfix logs. The solution comes from: https://github.com/whyscream/postfix-grok-patterns.
Because Postfix outputs its logs via syslog by default, you can either have rsyslog forward them directly to Logstash, or have Logstash read the files written by rsyslog.
Postfix sets a different syslog tag depending on the actual log source. There are some 20 different suffixes: anvil, bounce, cleanup, dnsblog, local, master, pickup, pipe, postdrop, postscreen, qmgr, scache, sendmail, smtp, lmtp, smtpd, tlsmgr, tlsproxy, trivial-rewrite, and discard. In Logstash, the syslog tag is normally parsed into the program field. This section uses the configuration for the first type, the anvil logs, as an example:
input {
syslog { }
}
filter {
if [program] =~ /^postfix.*\/anvil$/ {
grok {
patterns_dir =>["/etc/logstash/patterns.d"]
match => [ "message", "%{POSTFIX_ANVIL}" ]
tag_on_failure => [ "_grok_postfix_anvil_nomatch" ]
add_tag => [ "_grok_postfix_success" ]
}
}
mutate {
    convert => [
        "postfix_anvil_cache_size", "integer",
        "postfix_anvil_conn_count", "integer",
        "postfix_anvil_conn_rate", "integer"
    ]
}
}
The configuration uses a custom grok pattern named POSTFIX_ANVIL. This pattern and the patterns it builds on are shown below; save them to a text file and place it in the /etc/logstash/patterns.d/ directory to use them.
POSTFIX_TIME_UNIT %{NUMBER}[smhd]
POSTFIX_ANVIL_CONN_RATE statistics: max connection rate %{NUMBER:postfix_anvil_conn_rate}/%{POSTFIX_TIME_UNIT:postfix_anvil_conn_period} for \(%{DATA:postfix_service}:%{IP:postfix_client_ip}\) at %{SYSLOGTIMESTAMP:postfix_anvil_timestamp}
POSTFIX_ANVIL_CONN_CACHE statistics: max cache size %{NUMBER:postfix_anvil_cache_size} at %{SYSLOGTIMESTAMP:postfix_anvil_timestamp}
POSTFIX_ANVIL_CONN_COUNT statistics: max connection count %{NUMBER:postfix_anvil_conn_count} for \(%{DATA:postfix_service}:%{IP:postfix_client_ip}\) at %{SYSLOGTIMESTAMP:postfix_anvil_timestamp}
POSTFIX_ANVIL %{POSTFIX_ANVIL_CONN_RATE}|%{POSTFIX_ANVIL_CONN_CACHE}|%{POSTFIX_ANVIL_CONN_COUNT}
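To see what POSTFIX_ANVIL_CONN_RATE captures, here is a rough hand-translation of that grok pattern into a plain Ruby regex, run against a made-up anvil line; both the regex and the sample are illustrative sketches, not part of the upstream pattern set:

```ruby
line = 'statistics: max connection rate 1/60s for (smtp:5.6.7.8) at Dec  5 12:13:14'

# Simplified stand-ins for %{NUMBER}, %{POSTFIX_TIME_UNIT}, %{DATA}, %{IP}
# and %{SYSLOGTIMESTAMP}:
re = /statistics: max connection rate (?<conn_rate>\d+)\/(?<conn_period>\d+[smhd]) for \((?<service>[^:]+):(?<client_ip>[\d.]+)\) at (?<timestamp>.+)/

m = re.match(line)
p m[:conn_rate]   # => "1"
p m[:conn_period] # => "60s"
p m[:client_ip]   # => "5.6.7.8"
```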
The complete grok patterns and Logstash filter configurations for the other 19 kinds of Postfix logs are available at https://github.com/whyscream/postfix-grok-patterns.
3.4 Ossec Logs
Ossec is an open source, multi-platform intrusion detection system. Forwarding Ossec's detection alerts into ELK goes a long way toward quickly visualizing security events. This section describes how to combine Ossec with Logstash.
3.4.1 Configuring All Ossec Agents for syslog Output
The configuration steps are as follows:
1) Edit the ossec.conf file (/var/ossec/etc/ossec.conf by default).
2) Add the following to ossec.conf (10.0.0.1 is the server receiving syslog):
<syslog_output>
<server>10.0.0.1</server>
<port>9000</port>
<format>default</format>
</syslog_output>
3) Enable Ossec's syslog output feature:
/var/ossec/bin/ossec-control enable client-syslog
4) Restart the Ossec service:
/var/ossec/bin/ossec-control start
3.4.2 Configuring Logstash
Add the following to (or create) the Logstash configuration file (assuming 10.0.0.1 is the Elasticsearch server):
input {
udp {
port => 9000
type =>"syslog"
}
}
filter {
if [type] == "syslog" {
grok {
    match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_host} %{DATA:syslog_program}: Alert Level: %{BASE10NUM:Alert_Level}; Rule: %{BASE10NUM:Rule} - %{GREEDYDATA:Description}; Location: %{GREEDYDATA:Details}" }
    add_field => [ "ossec_server", "%{host}" ]
}
mutate {
remove_field => [ "syslog_hostname", "syslog_message", "syslog_pid",
"message", "@version", "type", "host" ]
}
}
}
output {
elasticsearch {
}
}
3.4.3 Recommended Kibana Dashboard
The community has already built a dashboard for common Ossec needs that can be loaded directly from the Kibana 3 page; an example is shown in Figure 3-1.
The dashboard's JSON file is at: https://github.com/magenx/Logstash/raw/master/kibana/kibana_dashboard.json.
For how to load it, see the Kibana material in Part III of this book.
3.5 Windows System Logs
The Logstash community has many Windows users, so this section is devoted to collecting and processing system logs on the Windows platform. Earlier we covered the Linux system log, i.e. syslog. Windows in fact has a similar design, called the eventlog. This section describes how to process the Windows eventlog.
3.5.1 Collector Configuration
Because the author of Logstash comes from a Linux ops background, early versions shipped with quite a few Windows-only bugs. So for logs on Windows, while trying out Logstash, it is also worth trying the more stable nxlog; a more detailed introduction to nxlog appears in section 5.5 later in this book.
Here are the configuration methods for both Logstash and nxlog when handling the Windows eventlog.
The Logstash configuration is as follows:
Figure 3-1 The Ossec dashboard
input {
eventlog {
#logfile => ["Application", "Security", "System"]
logfile => ["Security"]
type =>"winevent"
tags => [ "caen" ]
}
}
There are a few key points in the nxlog configuration:
1) ROOT must be set to nxlog's actual installation path.
2) For the input module on Windows 2003 and earlier, use im_mseventlog instead of im_msvistalog.
Below is a complete nxlog configuration example:
define ROOT C:\Program Files (x86)\nxlog
Moduledir %ROOT%\modules
CacheDir %ROOT%\data
Pidfile %ROOT%\data\nxlog.pid
SpoolDir %ROOT%\data
LogFile %ROOT%\data\nxlog.log
<Extension json>
Module xm_json
</Extension>
<Input in>
Module im_msvistalog
Exec to_json();
</Input>
<Output out>
Module om_tcp
Host 10.66.66.66
Port 5140
</Output>
<Route 1>
Path in => out
</Route>
3.5.2 Receiver and Parser Configuration
On the central receiving side, Logstash handles parsing and indexing uniformly. If the collector is also Logstash, the main fields are already in place and there is nothing special about the receiving configuration. If the collector is nxlog, we still need to convert some nxlog-generated fields to a style more conventional for Logstash.
As mentioned in the earlier plugin chapters, Elasticsearch searches against lowercased terms by default, so data should be lowercased as much as possible. Unfortunately, in nxlog not only the data but also the field names mix upper and lower case, so we can only use logstash-filter-mutate's rename feature to rename the field names to lowercase.
A configuration example follows:
input {
tcp {
codec =>"json"
port => 5140
tags => ["windows","nxlog"]
type =>"nxlog-json"
}
} # end input
filter {
if [type] == "nxlog-json" {
date {
match => ["[EventTime]", "YYYY-MM-dd HH:mm:ss"]
timezone =>"Europe/London"
}
mutate {
rename => [ "AccountName", "user" ]
rename => [ "AccountType", "[eventlog][account_type]" ]
rename => [ "ActivityId", "[eventlog][activity_id]" ]
rename => [ "Address", "ip6" ]
rename => [ "ApplicationPath", "[eventlog][application_path]" ]
rename => [ "AuthenticationPackageName", "[eventlog][authentication_package_name]" ]
rename => [ "Category", "[eventlog][category]" ]
rename => [ "Channel", "[eventlog][channel]" ]
rename => [ "Domain", "domain" ]
rename => [ "EventID", "[eventlog][event_id]" ]
rename => [ "EventType", "[eventlog][event_type]" ]
rename => [ "File", "[eventlog][file_path]" ]
rename => [ "Guid", "[eventlog][guid]" ]
rename => [ "Hostname", "hostname" ]
rename => [ "Interface", "[eventlog][interface]" ]
rename => [ "InterfaceGuid", "[eventlog][interface_guid]" ]
rename => [ "InterfaceName", "[eventlog][interface_name]" ]
rename => [ "IpAddress", "ip" ]
rename => [ "IpPort", "port" ]
rename => [ "Key", "[eventlog][key]" ]
rename => [ "LogonGuid", "[eventlog][logon_guid]" ]
rename => [ "Message", "message" ]
rename => [ "ModifyingUser", "[eventlog][modifying_user]" ]
rename => [ "NewProfile", "[eventlog][new_profile]" ]
rename => [ "OldProfile", "[eventlog][old_profile]" ]
rename => [ "Port", "port" ]
rename => [ "PrivilegeList", "[eventlog][privilege_list]" ]
rename => [ "ProcessID", "pid" ]
rename => [ "ProcessName", "[eventlog][process_name]" ]
rename => [ "ProviderGuid", "[eventlog][provider_guid]" ]
rename => [ "ReasonCode", "[eventlog][reason_code]" ]
rename => [ "RecordNumber", "[eventlog][record_number]" ]
rename => [ "ScenarioId", "[eventlog][scenario_id]" ]
rename => [ "Severity", "level" ]
rename => [ "SeverityValue", "[eventlog][severity_code]" ]
rename => [ "SourceModuleName", "nxlog_input" ]
rename => [ "SourceName", "[eventlog][program]" ]
rename => [ "SubjectDomainName", "[eventlog][subject_domain_name]" ]
rename => [ "SubjectLogonId", "[eventlog][subject_logonid]" ]
rename => [ "SubjectUserName", "[eventlog][subject_user_name]" ]
rename => [ "SubjectUserSid", "[eventlog][subject_user_sid]" ]
rename => [ "System", "[eventlog][system]" ]
rename => [ "TargetDomainName", "[eventlog][target_domain_name]" ]
rename => [ "TargetLogonId", "[eventlog][target_logonid]" ]
rename => [ "TargetUserName", "[eventlog][target_user_name]" ]
rename => [ "TargetUserSid", "[eventlog][target_user_sid]" ]
rename => [ "ThreadID", "thread" ]
}
mutate {
remove_field => [
    "CurrentOrNextState", "Description", "EventReceivedTime", "EventTime",
    "EventTimeWritten", "IPVersion", "KeyLength", "Keywords", "LmPackageName",
    "LogonProcessName", "LogonType", "Name", "Opcode", "OpcodeValue",
    "PolicyProcessingMode", "Protocol", "ProtocolType", "SourceModuleType",
    "State", "Task", "TransmittedServices", "Type", "UserID", "Version"
]
}
}
}
3.6 Java Logs
As mentioned in the codec discussion in section 2.2, besides merging multi-line Java logs with multiline, you can also write them into Logstash directly via Log4J. This section describes how to do that in a Java application environment.
3.6.1 Log4J Configuration
First, configure the Java application's Log4J settings to start the built-in SocketAppender. Edit the application's log4j.xml configuration file and add the following section:
<appender name="LOGSTASH" class="org.apache.log4j.net.SocketAppender">
<param name="RemoteHost" value="logstash_hostname" />
<param name="ReconnectionDelay" value="60000" />
<param name="LocationInfo" value="true" />
<param name="Threshold" value="DEBUG" />
</appender>
Then add this newly defined appender to the root logger; it can coexist with other existing appenders:
<root>
<level value="INFO"/>
<appender-ref ref="OTHERPLACE"/>
<appender-ref ref="LOGSTASH"/>
</root>
If you use a log4j.properties configuration file instead, the equivalent configuration is:
log4j.rootLogger=DEBUG, logstash
###SocketAppender###
log4j.appender.logstash=org.apache.log4j.net.SocketAppender
log4j.appender.logstash.Port=4560
log4j.appender.logstash.RemoteHost=logstash_hostname
log4j.appender.logstash.ReconnectionDelay=60000
log4j.appender.logstash.LocationInfo=true
Log4J will keep trying to connect to the configured logstash_hostname address and starts sending log data as soon as the connection is established.
3.6.2 Logstash Configuration
With the Java application side done, set up the Logstash receiving end. The configuration is shown below; port 4560 is the default peer port of Log4J's SocketAppender:
input {
log4j {
type =>"log4j-json"
port => 4560
}
}
3.6.3 Verifying with an Exception Stack Trace
With Logstash running, write a simple Log4J program:
import org.apache.log4j.Logger;

public class HelloExample {
    final static Logger logger = Logger.getLogger(HelloExample.class);

    public static void main(String[] args) {
        HelloExample obj = new HelloExample();
        try {
            obj.divide();
        } catch (ArithmeticException ex) {
            logger.error("Sorry, something wrong!", ex);
        }
    }

    private void divide() {
        int i = 10 / 0;
    }
}
Compile and run it:
# javac -cp ./logstash-1.5.0.rc2/vendor/bundle/jruby/1.9/gems/logstash-input-log4j-0.1.3-java/lib/log4j/log4j/1.2.17/log4j-1.2.17.jar HelloExample.java
# java -cp .:./logstash-1.5.0.rc2/vendor/bundle/jruby/1.9/gems/logstash-input-log4j-0.1.3-java/lib/log4j/log4j/1.2.17/log4j-1.2.17.jar HelloExample
You should then see an event like the following in Logstash's terminal output:
{
"message" =>"Sorry, something wrong!",
"@version" =>"1",
"@timestamp" =>"2015-07-02T13:24:45.727Z",
"type" =>"log4j-json",
"host" =>"127.0.0.1:52420",
"path" =>"HelloExample",
"priority" =>"ERROR",
"logger_name" =>"HelloExample",
"thread" =>"main",
"class" =>"HelloExample",
"file" =>"HelloExample.java:9",
"method" =>"main",
"stack_trace" =>"java.lang.ArithmeticException: / by zero\n\tat HelloExample.divide(HelloExample.java:13)\n\tat HelloExample.main(HelloExample.java:7)"
}
As you can see, the exception stack trace is recorded in a single line.
3.6.4 JSON Event Layout
If you cannot use SocketAppender and must log to files, Log4J has a layout feature for controlling the output format. Much like hand-assembling JSON output for Nginx logs, the layout feature can be used to record logs in JSON format.
Logstash officially provides an extension package, which you can find and download via mvnrepository.com:
# wget https://central.maven.org/maven2/net/logstash/log4j/jsonevent-layout/1.7/jsonevent-layout-1.7.jar
Or add the dependency directly to your project's pom.xml:
<dependency>
<groupId>net.logstash.log4j</groupId>
<artifactId>jsonevent-layout</artifactId>
<version>1.7</version>
</dependency>
Then modify the project's log4j.properties file as follows:
log4j.rootCategory=WARN, RollingLog
log4j.appender.RollingLog=org.apache.log4j.DailyRollingFileAppender
log4j.appender.RollingLog.Threshold=TRACE
log4j.appender.RollingLog.File=api.log
log4j.appender.RollingLog.DatePattern=.yyyy-MM-dd
log4j.appender.RollingLog.layout=net.logstash.log4j.JSONEventLayoutV1
For log4j.xml, the change is:
<appender name="Console" class="org.apache.log4j.ConsoleAppender">
    <param name="Threshold" value="TRACE" />
    <layout class="net.logstash.log4j.JSONEventLayoutV1" />
</appender>
The generated file is now JSON in the standard Logstash format, which Logstash can read with the following configuration:
input {
file {
codec => json
path => ["/path/to/log4j.log"]
}
}
The resulting Logstash event looks like this:
{
"mdc":{},
"line_number":"29",
"class":"org.eclipse.jetty.examples.logging.EchoFormServlet",
"@version":1,
"source_host":"jvstratusmbp.local",
"thread_name":"qtp513694835-14",
"message":"Got request from 0:0:0:0:0:0:0:1%0 using Mozilla\/5.0 (Macintosh;Intel Mac OS X 10_9_1) AppleWebKit\/537.36 (KHTML, like Gecko) Chrome\/32.0.1700.77 Safari\/537.36",
"@timestamp":"2014-01-27T19:52:35.738Z",
"level":"INFO",
"file":"EchoFormServlet.java",
"method":"doPost",
"logger_name":"org.eclipse.jetty.examples.logging.EchoFormServlet"
}
As you can see, this achieves the same result.
If you use logback rather than Log4J for Java logging, Logstash also has an official extension package for it; just change the pom.xml dependency to:
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>4.4</version>
</dependency>
3.7 MySQL Slow Query Logs
MySQL can record several kinds of logs; common ones include the error log, slow log, general log, and binlog. Of these, the slow log is the most important entry point for performance monitoring and tuning, so this section discusses how to process the slow log with Logstash. The general log is handled in basically the same way, but since the general log is far larger in volume than the slow log, it is recommended to use packetbeat protocol parsing to do the job more efficiently; see section 8.3 later in this book.
An example Logstash configuration for processing the MySQL slow log:
input {
file {
type =>"mysql-slow"
path =>"/var/log/mysql/mysql-slow.log"
codec => multiline {
pattern =>"^# User@Host:"
negate => true
what =>"previous"
}
}
}
filter {
# drop sleep events
grok {
match => { "message" =>"SELECT SLEEP" }
add_tag => [ "sleep_drop" ]
tag_on_failure => [] # prevent default _grokparsefailure tag on real records
}
if "sleep_drop" in [tags] {
drop {}
}
grok {
    match => [ "message", "(?m)^# User@Host: %{USER:user}\[[^\]]+\] @ (?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\s*# Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\s*(?:use %{DATA:database};\s*)?SET timestamp=%{NUMBER:timestamp};\s*(?<query>(?<action>\w+)\s+.*)\n# Time:.*$" ]
}
date {
match => [ "timestamp", "UNIX" ]
remove_field => [ "timestamp" ]
}
}
This configuration exploits a mutually exclusive pair of grok options: add_tag is applied only on a successful match, while tag_on_failure is applied only on failure. That neatly tags the useless sleep statements in the log so they can be dropped.
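The interplay of the two options can be mimicked in a few lines of plain Ruby; this is only a toy model of the control flow, not how grok is implemented:

```ruby
# Toy model of grok's tagging behavior in the sleep-drop trick.
def grok_tags(message)
  tags = []
  if message =~ /SELECT SLEEP/
    tags << 'sleep_drop'  # add_tag fires only when the match succeeds
  end
  # On failure nothing is added, because tag_on_failure => [] replaces
  # the default ['_grokparsefailure'].
  tags
end

p grok_tags('SELECT SLEEP(2);')            # => ["sleep_drop"]  (event then dropped)
p grok_tags('select count(*) from node;')  # => []              (event kept, untagged)
```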
Given a multi-line chunk of MySQL slow log such as:
# User@Host: logstash[logstash] @ localhost [127.0.0.1]
# Query_time: 5.310431 Lock_time: 0.029219 Rows_sent: 1 Rows_examined: 24575727
SET timestamp=1393963146;
select count(*) from node join variable order by rand();
# Time: 140304 19:59:14
running the above configuration, Logstash processes it into the following single event:
{
"@timestamp" =>"2014-03-04T19:59:06.000Z",
"message" =>"# User@Host: logstash[logstash] @ localhost [127.0.0.1]\n# Query_time: 5.310431 Lock_time: 0.029219 Rows_sent: 1 Rows_examined: 24575727\nSET timestamp=1393963146;\nselect count(*) from node join variable order by rand();\n# Time: 140304 19:59:14",
"@version" =>"1",
"tags" => [
[0] "multiline"
],
"type" =>"mysql-slow",
"host" =>"raochenlindeMacBook-Air.local",
"path" =>"/var/log/mysql/mysql-slow.log",
"user" =>"logstash",
"clienthost" =>"localhost",
"clientip" =>"127.0.0.1",
"query_time" => 5.310431,
"lock_time" => 0.029219,
"rows_sent" => 1,
"rows_examined" => 24575727,
"query" =>"select count(*) from node join variable order by rand();",
"action" =>"select"
}
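As a sanity check, the @timestamp above really is the SET timestamp value from the log entry, converted by the date filter's UNIX pattern; in plain Ruby:

```ruby
require 'time'

# SET timestamp=1393963146 from the sample slow log entry:
t = Time.at(1393963146).utc
p t.iso8601  # => "2014-03-04T19:59:06Z"
```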
You can then build monitoring alerts and Kibana visualizations on the action, query_time, lock_time, and rows_examined fields.
3.8 Docker Logs
Docker is currently the hottest technology in large-scale Internet infrastructure solutions, and it gives ops engineers an entirely different perspective and way of working.
From the logging standpoint, Docker's biggest impact is this: its best practices call for each container to run a single service process whose lifecycle may end at any moment. That means traditional logging systems, which write to disk and collect from fixed locations, cannot work properly. So in containerized services, logs have to be recorded differently. This section covers the two most common approaches: writing to the host's disk, and collecting via logspout.
3.8.1 Writing to the Host's Disk
By default, Docker saves each container's standard output and standard error under the host's /var/lib/docker/containers/ directory. So when the deployment is relatively stable, writing logs straight to the host's disk and collecting them with a Logstash instance on the host is also a decent solution.
Taking Nginx as an example, the configuration to send Nginx access and error logs to standard output is:
daemon off;
error_log /dev/stdout info;
http {
access_log /dev/stdout;
...
}
However, the peculiarity of containers shows up here once again: there is actually no /dev/stdout device inside the container. So we need to handle this ourselves by adding one line to the Dockerfile:
RUN ln -sf /proc/self/fd /dev/
This way nginx.conf remains a configuration that works both on the host and in a container, while still achieving the goal.
Then collect the logs with the following Logstash configuration:
input {
file {
path => ["/var/lib/docker/containers/*/*-json.log"]
codec => json
}
}
filter {
grok {
match => [ "path", "/(?<container_id>\w+)-json.log" ]
remove_field => ["path"]
}
date {
match => ["time", "ISO8601"]
}
}
3.8.2 Collecting via logspout
logspout is the best-known log collection approach in the Docker ecosystem. Its design: each host runs one dedicated container with the logspout service, which forwards the logs of the other containers on the same host to different receivers according to its route settings.
Basic logspout usage looks like this:
$ docker pull gliderlabs/logspout:latest
$ docker run --name="logspout" \
    --volume=/var/run/docker.sock:/tmp/docker.sock \
    --publish=127.0.0.1:8000:80 \
    gliderlabs/logspout \
    syslog://remoteaddr:514
In addition, logspout provides a way to change routes dynamically, as shown below:
# curl $(docker port `docker ps -lq` 8000)/routes \
-X POST \
-d '{"source": {"filter_name": "*_db", "types": ["stderr"]}, "target":
{"type": "syslog", "addr": "remoteaddr2:5140"}}'
This configuration means: take the stderr output collected from containers whose names match *_db, and send those logs over the syslog protocol to port 5140 on the host remoteaddr2.
Note that logspout speaks the RFC 5424 version of the syslog protocol, so if your receiver parses the RFC 3164 version, you have to adjust accordingly. For example, logstash-input-syslog implements RFC 3164, so the parsing must be done separately:
input {
tcp {
port => 5140
}
}
filter {
    grok {
        match => [ "message", "%{SYSLOG5424PRI:syslog_pri} %{SYSLOG5424LINE:message}" ]
    }
}
logspout also supports modular extension, so we can process logs into a more Logstash-friendly format directly inside logspout. To extend logspout with Logstash format support:
1) Edit the Dockerfile to read:
FROM gliderlabs/logspout:master
ENV ROUTE_URIS=logstash://host:port
2) Edit modules.go to read:
package main
import (
_ "github.com/looplab/logspout-logstash"
_ "github.com/gliderlabs/logspout/transports/udp"
)
3) Build the image:
docker build
With that, downstream Logstash can simply parse the data as JSON.
Last updated: 2017-05-19 15:02:48