Logstash中的Grok正則捕獲
來自專欄 Ghost Stories
Table of Contents
- 概述
- 示例
- %{COMBINEDAPACHELOG}
- 官方文檔示例
- 自定義匹配
- 常用內置方法
- add_field
- add_tag
概述
Grok 是 Logstash 最重要的插件。你可以在 grok 里預定義好命名正則表達式
Grok 支持把預定義的 grok 表達式 寫入到文件中,官方提供的預定義 grok 表達式見:https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns。
grok的語法格式為 %{SYNTAX:SEMANTIC}
SYNTAX是文本要匹配的模式
SEMANTIC 是匹配到的文本片段的標識
例如:
%{NUMBER:duration}
%{IP:client}
默認情況下,所有的SEMANTIC是以字元串的方式保存,如果想要轉換一個SEMANTIC的數據類型,例如轉換一個字元串為整形,可以寫成如下的方式:
%{NUMBER:num:int}
例如日誌
55.3.244.1 GET /index.html 15824 0.043
可以寫成如下的grok過濾表達式
%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}
示例
%{COMBINEDAPACHELOG}
%{COMBINEDAPACHELOG} 是logstash自帶的匹配模式
它的grok表達式是:
COMMONAPACHELOG %{IPORHOST:clientip} %{USER:ident} %{USER:auth} [%{HTTPDATE:timestamp}] "(?:%{WORD:verb} %{NOTSPACE:req
uest}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-)
COMBINEDAPACHELOG %{COMMONAPACHELOG} %{QS:referrer} %{QS:agent}
輸入常規的Apache日誌:
127.0.0.1 - - [13/Apr/2015:17:22:03 +0800] "GET /router.php HTTP/1.1" 404 285 "-" "curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.15.3 zlib/1.2.3 libidn/1.18 libssh2/1.4.2"
127.0.0.1 - - [13/Apr/2015:17:22:03 +0800] "GET /router.php HTTP/1.1" 404 285 "-" "curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.15.3 zlib/1.2.3 libidn/1.18 libssh2/1.4.2"
配置filter:
filter {
if [type] == "apache" {
grok {
match => ["message", "%{COMBINEDAPACHELOG}"]
}
}
}
輸出:
{
"message" => "127.0.0.1 - - [14/Apr/2015:09:53:40 +0800] "GET /router.php HTTP/1.1" 404 285 "-" "curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.15.3 zlib/1.2.3 libidn/1.18 libssh2/1.4.2"",
"@version" => "1",
"@timestamp" => "2015-04-14T01:53:57.182Z",
"type" => "apache",
"host" => "xxxxxxxx",
"path" => "/var/log/httpd/access_log",
"clientip" => "127.0.0.1",
"ident" => "-",
"auth" => "-",
"timestamp" => "14/Apr/2015:09:53:40 +0800",
"verb" => "GET",
"request" => "/router.php",
"httpversion" => "1.1",
"response" => "404",
"bytes" => "285",
"referrer" => ""-"",
"agent" => ""curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.15.3 zlib/1.2.3 libidn/1.18 libssh2/1.4.2""
}
{
"message" => "127.0.0.1 - - [14/Apr/2015:09:53:40 +0800] "GET /router.php HTTP/1.1" 404 285 "-" "curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.15.3 zlib/1.2.3 libidn/1.18 libssh2/1.4.2"",
"@version" => "1",
"@timestamp" => "2015-04-14T01:53:57.187Z",
"type" => "apache",
"host" => "xxxxxxx",
"path" => "/var/log/httpd/access_log",
"clientip" => "127.0.0.1",
"ident" => "-",
"auth" => "-",
"timestamp" => "14/Apr/2015:09:53:40 +0800",
"verb" => "GET",
"request" => "/router.php",
"httpversion" => "1.1",
"response" => "404",
"bytes" => "285",
"referrer" => ""-"",
"agent" => ""curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.15.3 zlib/1.2.3 libidn/1.18 libssh2/1.4.2""
}
官方文檔示例
下面是從官方文件中摘抄的最簡單但是足夠說明用法的示例:
USERNAME [a-zA-Z0-9._-]+
USER %{USERNAME}
第一行,用普通的正則表達式來定義一個 grok 表達式
第二行,通過列印賦值格式(sprintf format),用前面定義好的 grok 表達式來定義另一個 grok 表達式
grok 表達式的列印賦值格式的完整語法是下面這樣的:
%{PATTERN_NAME:capture_name:data_type}
我們的配置filter成下面這樣:
filter {
grok {
match => {
"message" => "%{WORD} %{NUMBER:request_time:float}%{WORD}"
}
}
}
運行 logstash 進程然後輸入 「begin 123.456 end」
會看到類似下面這樣的輸出:
{
"message" => "begin 123.456 end",
"@version" => "1",
"@timestamp" => "2014-08-09T12:23:36.634Z",
"host" => "raochenlindeMacBook-Air.local",
"request_time" => 123.456
}
實際運用中,我們需要處理各種各樣的日誌文件,如果你都是在配置文件里各自寫一行自己的表達式,就完全不可管理了。所以,我們建議是把所有的 grok 表達式統一寫入到一個地方。然後用 filter/grok 的 patterns_dir 選項來指明。
如果你把 「message」 里所有的信息都 grok 到不同的欄位了,數據實質上就相當於是重複存儲了兩份。所以你可以用 remove_field 參數來刪除掉 message 欄位,或者用 overwrite 參數來重寫默認的 message 欄位,只保留最重要的部分。
filter {
grok {
patterns_dir => ["/path/to/your/own/patterns"]
match => {
"message" => "%{SYSLOGBASE} %{DATA:message}"
}
overwrite => ["message"]
}
}
建議每個人都要使用 Grok Debugger 來調試自己的 grok 表達式。
https://grokdebug.herokuapp.com/自定義匹配
在有些情況下自帶的匹配模式無法滿足需求,可以自定義一些匹配模式
首先可以根據正則表達式匹配文本片段
(?<field_name>the pattern here)
例如,postfix日誌有一個欄位表示 queue id,可以使用以下表達式進行匹配:
(?<queue_id>[0-9A-F]{10,11}
可以手動創建一個匹配文件,內容:
# contents of ./patterns/postfix:
POSTFIX_QUEUEID [0-9A-F]{10,11}
filter配置:
filter {
grok {
patterns_dir => "./patterns"
match => [ "message", "%{SYSLOGBASE} %{POSTFIX_QUEUEID:queue_id}: %{GREEDYDATA:syslog_message}" ]
}
}
patterns_dir指定了文件的目錄,match中使用了自定義的:POSTFIX_QUEUEID
輸入:
55.3.244.1 GET /index.html 15824 0.043 ABC24C98567
輸出:
client_id_address: 55.3.244.1
method: GET
request: /index.html
bytes: 15824
http_response_time: 0.043
queue_id: ABC24C98567
發現queue_id 被匹配出來了
常用內置方法
add_field
當pattern匹配切分成功之後,可以動態的對某些欄位進行特定的修改或者添加新的欄位,使用%{fieldName}來獲取欄位的值
filter:
filter {
grok{
add_field => { "foo_%{somefield}" => "Hello world, %{somefield}" }
}
}
如果somefield=dad,logstash會將foo_dad新欄位加入elasticsearch,並將值Hello world, dad賦予該欄位
add_tag
為經過filter或者匹配成功的event添加標籤
filter {
grok {
add_tag => [ "foo_%{somefield}" ]
}
}
推薦閱讀:
