大家在使用 Logstash 時,grok 基本上一定是必用,透過 grok 將非結構化的 log
資料轉為結構化,旦凡 syslog、apache log、mysql log、高度人為的 AP
log,都會需要使用這個解析神器。你可以在
gork pattern
中找到約 120 種定義好的 pattern,加速 log 解析的開發時間,或是使用
grok
debugger 定義屬於自己的解析格式。
Grok Basics
安裝方式參考:Logstash Install
我們先來了解一下基本的 grok 格式
%{NUMBER:duration} %{IP:client}
NUMBER 為解析的格式類型,有 120 種已定義的 pattern 可以使用,duration 是解析後存放的 key 值,依此類推。建議每個解析類型都要給予一個 key 值。
預設我們解析出來的格式都是 string,如果希望直接轉成 int、float 的話,可以用以下方式:
%{NUMBER:num:int}
接著看一個簡單的用例:
55.3.244.1 GET /index.html 15824 0.043
所以 gork 的格式為:
%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}
完整的 logstash 腳本為:
input { file { path => "/var/log/http.log" } } filter { grok { match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" } } }
假如找不到適合的 grok pattern,我們也能自己寫:
(?<queue_id>[0-9A-F]{10,11})
如果太多自己寫的 pattern,建議抽一個 file 出來維護,否則 grok pattern 會變得很雜亂
# touch ./patterns/postfix POSTFIX_QUEUEID [0-9A-F]{10,11}
在 logstash 中加入路徑來源
filter { grok { patterns_dir => ["./patterns"] match => { "message" => "%{SYSLOGBASE} %{POSTFIX_QUEUEID:queue_id}: %{GREEDYDATA:syslog_message}" } } }
可以試著加入 stdout 來檢視解析的 log 是否如預期,完整的 log 如下:
input { file { path => "/var/log/http.log" } } filter { grok { patterns_dir => ["./patterns"] match => { "message" => "%{SYSLOGBASE} %{POSTFIX_QUEUEID:queue_id}: %{GREEDYDATA:syslog_message}" } } } output { stdout { codec => rubydebug } }
Multiline Codec Plugin
有時後同一份 log 不見得只有一種格式,我們以一般的 ap log 為例子,一定內含很多種格式的 log,甚至是同一條 log,但跨了多行,種種複雜的情形,我們來看下面的例子:
[2021-07-08T00:11:15,598][ERROR][o.e.x.i.IndexLifecycleRunner] [elkplus01] policy [apm-rollover-30-days] for index [apm-7.9.0-metric] failed on step [{"phase":"hot","action":"rollover","name":"check-rollover-ready"}]. Moving to ERROR step java.lang.IllegalArgumentException: index.lifecycle.rollover_alias [apm-7.9.0-metric] does not point to index [apm-7.9.0-metric] at org.elasticsearch.xpack.core.ilm.WaitForRolloverReadyStep.evaluateCondition(WaitForRolloverReadyStep.java:114) [x-pack-core-7.10.1.jar:7.10.1] at org.elasticsearch.xpack.ilm.IndexLifecycleRunner.runPeriodicStep(IndexLifecycleRunner.java:174) [x-pack-ilm-7.10.1.jar:7.10.1] at org.elasticsearch.xpack.ilm.IndexLifecycleService.triggerPolicies(IndexLifecycleService.java:327) [x-pack-ilm-7.10.1.jar:7.10.1] at org.elasticsearch.xpack.ilm.IndexLifecycleService.triggered(IndexLifecycleService.java:265) [x-pack-ilm-7.10.1.jar:7.10.1] at org.elasticsearch.xpack.core.scheduler.SchedulerEngine.notifyListeners(SchedulerEngine.java:183) [x-pack-core-7.10.1.jar:7.10.1] at org.elasticsearch.xpack.core.scheduler.SchedulerEngine$ActiveSchedule.run(SchedulerEngine.java:216) [x-pack-core-7.10.1.jar:7.10.1] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?] at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) [?:?] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) [?:?] at java.lang.Thread.run(Thread.java:832) [?:?] [2021-07-08T00:11:15,600][ERROR][o.e.x.i.IndexLifecycleRunner] [elkplus01] policy [apm-rollover-30-days] for index [apm-7.9.0-transaction] failed on step [{"phase":"hot","action":"rollover","name":"check-rollover-ready"}]. Moving to ERROR step
我們可以發現格式不太一樣,且中間的 bug trace log 跨很多行,所以針對這況狀況需要做一些定義:
input { file { path => "./logstash-pipeline/eslog/**/*.log" start_position => "beginning" sincedb_path => "./logstash-pipeline/es.log" codec => multiline { pattern => "^[%{TIMESTAMP_ISO8601}]" negate => true what => "previous" } } }
在 input 中,記得定義 sincedb_path (紀錄最後一次執行的位置),並在 codec 中額外定義 multiline 的規則。
- pattern 起始格式
- what 定義 multiline 的關聯性,可設定 previous、next
- negate 默認為 false,如果為 true,則會參考 what 的設定,將不符合 pattern 的 log 組合起來
所以用例中的意義為:起始不符合 timestamp pattern 的 log,會往上一條 log 不斷組合,直到看到下一個符合 pattern 的 log 出現,所以上述的 sample log 的第一條 log 會與下面的 java trace code 合為一條 log,加上最後一行的 log,共 2 條 log。
Grok Multi Pattern
資料來源定義好了 multiline 結構後,grok 也需要針對多種的 log pattern 進行多種定義,如下:
filter { grok { match => {"message" => [ "[%{TIMESTAMP_ISO8601:timestamp}][%{LOGLEVEL:log_level}%{SPACE}][%{DATA:service}]%{SPACE}[%{DATA:hostname}]%{SPACE}%{GREEDYDATA:reason}", "[%{TIMESTAMP_ISO8601:timestamp}][%{LOGLEVEL:log_level}%{SPACE}][%{DATA:service}]%{SPACE}[%{DATA:hostname}]%{SPACE}[%{DATA:action}][%{INT:pid}]%{SPACE}%{DATA:state},%{SPACE}%{GREEDYDATA:reason}", "[%{TIMESTAMP_ISO8601:timestamp}][%{LOGLEVEL:log_level}%{SPACE}][%{DATA:service}]%{SPACE}[%{DATA:hostname}]%{SPACE}%{DATA:action}[%{DATA:target}]%{SPACE}%{GREEDYDATA:reason}", "[%{TIMESTAMP_ISO8601:timestamp}][%{LOGLEVEL:log_level}%{SPACE}][%{DATA:service}]%{SPACE}[%{DATA:hostname}]%{SPACE}[%{DATA:action}][%{DATA:status}][%{INT:pid}][%{INT:sub_pid}]%{SPACE}%{DATA}%{SPACE}[%{BASE10NUM:duration}%{DATA}],%{SPACE}%{DATA}%{SPACE}[%{BASE10NUM:collections_count}%{DATA}]/[%{BASE10NUM:collections_time}%{DATA}],%{SPACE}%{DATA}%{SPACE}[%{BASE10NUM:total_seconds}%{DATA}]/[%{BASE10NUM:total_hours}%{DATA}],%{SPACE}%{GREEDYDATA:memory},%{SPACE}%{GREEDYDATA:all_pools}" ]} } }
上面的用例中,配合 multiline 的配置,針對每一個格式進行定義,再將多組 grok pattern 設定於此。
Grok Common Options
最後我們整理出其他 grok 簡單且常常使用的設定
tag_on_failure
解析失敗會添加 tag,你可以留空則不會額外加入 tag
filter { grok { match => ["context", ""tags":[%{DATA:apptags}]"] tag_on_failure => [ ] } }
或是透過 if 判斷是否要 drop 掉這一次的解析
filter { grok { match => [ "message", "something"] } if "_grokparsefailure" in [tags] { drop { } } }
name_captures_only:boolean
設為 true 只保留 grok 解析出來的欄位,其他則捨棄,可以即省儲存空間,但無法看到完整 log。
add_field:hash
可使用 %{field} 來動態組合字串
filter { grok { add_field => { "foo_%{somefield}" => "Hello world, from %{host}" "new_field" => "new_static_value" } } }
remove_field:array
可使用 %{field} 來動態組合字串
filter { grok { remove_field => [ "foo_%{somefield}", "my_extraneous_field" ] } }
有任何問題,或是想看新主題?
聯絡我們