Logstash|Beginning to Grok filter plugin

瀏覽人次: 2389
2021-08-04 更新

Logstash Logo

大家在使用 Logstash 時,grok 基本上一定是必用,透過 grok 將非結構化的 log
資料轉為結構化,旦凡 syslog、apache log、mysql log、高度人為的 AP
log,都會需要使用這個解析神器。你可以在
gork pattern
中找到約 120 種定義好的 pattern,加速 log 解析的開發時間,或是使用
grok
debugger
 定義屬於自己的解析格式。

Grok Basics

安裝方式參考:Logstash Install

我們先來了解一下基本的 grok 格式

%{NUMBER:duration} %{IP:client}

NUMBER 為解析的格式類型,有 120 種已定義的 pattern 可以使用,duration 是解析後存放的 key 值,依此類推。建議每個解析類型都要給予一個 key 值。

預設我們解析出來的格式都是 string,如果希望直接轉成 int、float 的話,可以用以下方式:

%{NUMBER:num:int}

接著看一個簡單的用例:

55.3.244.1 GET /index.html 15824 0.043

所以 gork 的格式為:

%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}

完整的 logstash 腳本為:

    input {
        file {
          path => "/var/log/http.log"
        }
      }
      filter {
        grok {
          match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
        }
      }

假如找不到適合的 grok pattern,我們也能自己寫:

(?<queue_id>[0-9A-F]{10,11})

如果太多自己寫的 pattern,建議抽一個 file 出來維護,否則 grok pattern 會變得很雜亂

# touch ./patterns/postfix
POSTFIX_QUEUEID [0-9A-F]{10,11}

在 logstash 中加入路徑來源

    filter {
        grok {
          patterns_dir => ["./patterns"]
          match => { "message" => "%{SYSLOGBASE} %{POSTFIX_QUEUEID:queue_id}: %{GREEDYDATA:syslog_message}" }
        }
      }

可以試著加入 stdout 來檢視解析的 log 是否如預期,完整的 log 如下:

    input {
        file {
          path => "/var/log/http.log"
      }
    }
    filter {
        grok {
          patterns_dir => ["./patterns"]
          match => { "message" => "%{SYSLOGBASE} %{POSTFIX_QUEUEID:queue_id}: %{GREEDYDATA:syslog_message}" }
        }
    }
    output {
        stdout { codec => rubydebug }
    }

Multiline Codec Plugin

有時後同一份 log 不見得只有一種格式,我們以一般的 ap log 為例子,一定內含很多種格式的 log,甚至是同一條 log,但跨了多行,種種複雜的情形,我們來看下面的例子:

    [2021-07-08T00:11:15,598][ERROR][o.e.x.i.IndexLifecycleRunner] [elkplus01] policy [apm-rollover-30-days] for index [apm-7.9.0-metric] failed on step [{"phase":"hot","action":"rollover","name":"check-rollover-ready"}]. Moving to ERROR step
    java.lang.IllegalArgumentException: index.lifecycle.rollover_alias [apm-7.9.0-metric] does not point to index [apm-7.9.0-metric]
        at org.elasticsearch.xpack.core.ilm.WaitForRolloverReadyStep.evaluateCondition(WaitForRolloverReadyStep.java:114) [x-pack-core-7.10.1.jar:7.10.1]
        at org.elasticsearch.xpack.ilm.IndexLifecycleRunner.runPeriodicStep(IndexLifecycleRunner.java:174) [x-pack-ilm-7.10.1.jar:7.10.1]
        at org.elasticsearch.xpack.ilm.IndexLifecycleService.triggerPolicies(IndexLifecycleService.java:327) [x-pack-ilm-7.10.1.jar:7.10.1]
        at org.elasticsearch.xpack.ilm.IndexLifecycleService.triggered(IndexLifecycleService.java:265) [x-pack-ilm-7.10.1.jar:7.10.1]
        at org.elasticsearch.xpack.core.scheduler.SchedulerEngine.notifyListeners(SchedulerEngine.java:183) [x-pack-core-7.10.1.jar:7.10.1]
        at org.elasticsearch.xpack.core.scheduler.SchedulerEngine$ActiveSchedule.run(SchedulerEngine.java:216) [x-pack-core-7.10.1.jar:7.10.1]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
        at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) [?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) [?:?]
        at java.lang.Thread.run(Thread.java:832) [?:?]
    [2021-07-08T00:11:15,600][ERROR][o.e.x.i.IndexLifecycleRunner] [elkplus01] policy [apm-rollover-30-days] for index [apm-7.9.0-transaction] failed on step [{"phase":"hot","action":"rollover","name":"check-rollover-ready"}]. Moving to ERROR step

我們可以發現格式不太一樣,且中間的 bug trace log 跨很多行,所以針對這況狀況需要做一些定義:

    input {
        file {
            path => "./logstash-pipeline/eslog/**/*.log"
            start_position => "beginning"
            sincedb_path => "./logstash-pipeline/es.log"
            codec => multiline {
              pattern => "^[%{TIMESTAMP_ISO8601}]"
              negate => true
              what => "previous"
            }
        }
    }

在 input 中,記得定義 sincedb_path (紀錄最後一次執行的位置),並在 codec 中額外定義 multiline 的規則。

  • pattern 起始格式
  • what 定義 multiline 的關聯性,可設定 previous、next
  • negate 默認為 false,如果為 true,則會參考 what 的設定,將不符合 pattern 的 log 組合起來

所以用例中的意義為:起始不符合 timestamp pattern 的 log,會往上一條 log 不斷組合,直到看到下一個符合 pattern 的 log 出現,所以上述的 sample log 的第一條 log 會與下面的 java trace code 合為一條 log,加上最後一行的 log,共 2 條 log。

Grok Multi Pattern

資料來源定義好了 multiline 結構後,grok 也需要針對多種的 log pattern 進行多種定義,如下:

    filter {
        grok {
            match => {"message" => [
                "[%{TIMESTAMP_ISO8601:timestamp}][%{LOGLEVEL:log_level}%{SPACE}][%{DATA:service}]%{SPACE}[%{DATA:hostname}]%{SPACE}%{GREEDYDATA:reason}",
                "[%{TIMESTAMP_ISO8601:timestamp}][%{LOGLEVEL:log_level}%{SPACE}][%{DATA:service}]%{SPACE}[%{DATA:hostname}]%{SPACE}[%{DATA:action}][%{INT:pid}]%{SPACE}%{DATA:state},%{SPACE}%{GREEDYDATA:reason}",
                "[%{TIMESTAMP_ISO8601:timestamp}][%{LOGLEVEL:log_level}%{SPACE}][%{DATA:service}]%{SPACE}[%{DATA:hostname}]%{SPACE}%{DATA:action}[%{DATA:target}]%{SPACE}%{GREEDYDATA:reason}",
                "[%{TIMESTAMP_ISO8601:timestamp}][%{LOGLEVEL:log_level}%{SPACE}][%{DATA:service}]%{SPACE}[%{DATA:hostname}]%{SPACE}[%{DATA:action}][%{DATA:status}][%{INT:pid}][%{INT:sub_pid}]%{SPACE}%{DATA}%{SPACE}[%{BASE10NUM:duration}%{DATA}],%{SPACE}%{DATA}%{SPACE}[%{BASE10NUM:collections_count}%{DATA}]/[%{BASE10NUM:collections_time}%{DATA}],%{SPACE}%{DATA}%{SPACE}[%{BASE10NUM:total_seconds}%{DATA}]/[%{BASE10NUM:total_hours}%{DATA}],%{SPACE}%{GREEDYDATA:memory},%{SPACE}%{GREEDYDATA:all_pools}"
            ]}
        }
    }

上面的用例中,配合 multiline 的配置,針對每一個格式進行定義,再將多組 grok pattern 設定於此。

Grok Common Options

最後我們整理出其他 grok 簡單且常常使用的設定

tag_on_failure

解析失敗會添加 tag,你可以留空則不會額外加入 tag

    filter {
        grok {
            match => ["context", ""tags":[%{DATA:apptags}]"]
            tag_on_failure => [ ]
        }
      }

或是透過 if 判斷是否要 drop 掉這一次的解析

    filter {
        grok {
            match => [ "message", "something"]
        }
        if "_grokparsefailure" in [tags] {
            drop { }
        }
    }

name_captures_only:boolean

設為 true 只保留 grok 解析出來的欄位,其他則捨棄,可以即省儲存空間,但無法看到完整 log。

add_field:hash

可使用 %{field} 來動態組合字串

    filter {
      grok {
        add_field => {
          "foo_%{somefield}" => "Hello world, from %{host}"
          "new_field" => "new_static_value"
        }
      }
    }

remove_field:array

可使用 %{field} 來動態組合字串

    filter {
        grok {
          remove_field => [ "foo_%{somefield}", "my_extraneous_field" ]
        }
      }



有任何問題,或是想看新主題? 

聯絡我們

快速跳轉目錄

✦ 集先鋒 Bimap – 企業建置高速穩定的海量日誌分析平台✦

集中不同的結構化資料和非結構化日誌,並進行關聯性的大數據整合,客製化儀表版、自訂事件告警、機器學習等等,以滿足各種大數據的應用場景和解決方案。