B i M A P

Logstash|Beginning to Grok filter plugin

Logstash Logo


大家在使用 Logstash 時,grok 基本上一定是必用,透過 grok 將非結構化的 log 資料轉為結構化,旦凡 syslog、apache log、mysql log、高度人為的 AP log,都會需要使用這個解析神器。你可以在 gork pattern 中找到約 120 種定義好的 pattern,加速 log 解析的開發時間,或是使用 grok debugger 定義屬於自己的解析格式。


Grok Basics

安裝方式參考:Logstash Install


我們先來了解一下基本的 grok 格式

%{NUMBER:duration} %{IP:client}

NUMBER 為解析的格式類型,有 120 種已定義的 pattern 可以使用,duration 是解析後存放的 key 值,依此類推。建議每個解析類型都要給予一個 key 值。

預設我們解析出來的格式都是 string,如果希望直接轉成 int、float 的話,可以用以下方式:

%{NUMBER:num:int}



接著看一個簡單的用例:

55.3.244.1 GET /index.html 15824 0.043

所以 gork 的格式為:

%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}

完整的 logstash 腳本為:

    input {
        file {
          path => "/var/log/http.log"
        }
      }
      filter {
        grok {
          match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
        }
      }



假如找不到適合的 grok pattern,我們也能自己寫:

(?<queue_id>[0-9A-F]{10,11})

如果太多自己寫的 pattern,建議抽一個 file 出來維護,否則 grok pattern 會變得很雜亂

# touch ./patterns/postfix
POSTFIX_QUEUEID [0-9A-F]{10,11}

在 logstash 中加入路徑來源

    filter {
        grok {
          patterns_dir => ["./patterns"]
          match => { "message" => "%{SYSLOGBASE} %{POSTFIX_QUEUEID:queue_id}: %{GREEDYDATA:syslog_message}" }
        }
      }

可以試著加入 stdout 來檢視解析的 log 是否如預期,完整的 log 如下:

    input {
        file {
          path => "/var/log/http.log"
      }
    }
    filter {
        grok {
          patterns_dir => ["./patterns"]
          match => { "message" => "%{SYSLOGBASE} %{POSTFIX_QUEUEID:queue_id}: %{GREEDYDATA:syslog_message}" }
        }
    }
    output {
        stdout { codec => rubydebug }
    }


Multiline Codec Plugin

有時後同一份 log 不見得只有一種格式,我們以一般的 ap log 為例子,一定內含很多種格式的 log,甚至是同一條 log,但跨了多行,種種複雜的情形,我們來看下面的例子:

    [2021-07-08T00:11:15,598][ERROR][o.e.x.i.IndexLifecycleRunner] [elkplus01] policy [apm-rollover-30-days] for index [apm-7.9.0-metric] failed on step [{"phase":"hot","action":"rollover","name":"check-rollover-ready"}]. Moving to ERROR step
    java.lang.IllegalArgumentException: index.lifecycle.rollover_alias [apm-7.9.0-metric] does not point to index [apm-7.9.0-metric]
        at org.elasticsearch.xpack.core.ilm.WaitForRolloverReadyStep.evaluateCondition(WaitForRolloverReadyStep.java:114) [x-pack-core-7.10.1.jar:7.10.1]
        at org.elasticsearch.xpack.ilm.IndexLifecycleRunner.runPeriodicStep(IndexLifecycleRunner.java:174) [x-pack-ilm-7.10.1.jar:7.10.1]
        at org.elasticsearch.xpack.ilm.IndexLifecycleService.triggerPolicies(IndexLifecycleService.java:327) [x-pack-ilm-7.10.1.jar:7.10.1]
        at org.elasticsearch.xpack.ilm.IndexLifecycleService.triggered(IndexLifecycleService.java:265) [x-pack-ilm-7.10.1.jar:7.10.1]
        at org.elasticsearch.xpack.core.scheduler.SchedulerEngine.notifyListeners(SchedulerEngine.java:183) [x-pack-core-7.10.1.jar:7.10.1]
        at org.elasticsearch.xpack.core.scheduler.SchedulerEngine$ActiveSchedule.run(SchedulerEngine.java:216) [x-pack-core-7.10.1.jar:7.10.1]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
        at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) [?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) [?:?]
        at java.lang.Thread.run(Thread.java:832) [?:?]
    [2021-07-08T00:11:15,600][ERROR][o.e.x.i.IndexLifecycleRunner] [elkplus01] policy [apm-rollover-30-days] for index [apm-7.9.0-transaction] failed on step [{"phase":"hot","action":"rollover","name":"check-rollover-ready"}]. Moving to ERROR step


我們可以發現格式不太一樣,且中間的 bug trace log 跨很多行,所以針對這況狀況需要做一些定義:

    input {
        file {
            path => "./logstash-pipeline/eslog/**/*.log"
            start_position => "beginning"
            sincedb_path => "./logstash-pipeline/es.log"
            codec => multiline {
              pattern => "^\[%{TIMESTAMP_ISO8601}\]"
              negate => true
              what => "previous"
            }
        }
    }

在 input 中,記得定義 sincedb_path (紀錄最後一次執行的位置),並在 codec 中額外定義 multiline 的規則。

  • pattern 起始格式
  • what 定義 multiline 的關聯性,可設定 previous、next
  • negate 默認為 false,如果為 true,則會參考 what 的設定,將不符合 pattern 的 log 組合起來

所以用例中的意義為:起始不符合 timestamp pattern 的 log,會往上一條 log 不斷組合,直到看到下一個符合 pattern 的 log 出現,所以上述的 sample log 的第一條 log 會與下面的 java trace code 合為一條 log,加上最後一行的 log,共 2 條 log。


Grok Multi Pattern

資料來源定義好了 multiline 結構後,grok 也需要針對多種的 log pattern 進行多種定義,如下:

    filter {
        grok {
            match => {"message" => [
                "\[%{TIMESTAMP_ISO8601:timestamp}\]\[%{LOGLEVEL:log_level}%{SPACE}\]\[%{DATA:service}\]%{SPACE}\[%{DATA:hostname}\]%{SPACE}%{GREEDYDATA:reason}",
                "\[%{TIMESTAMP_ISO8601:timestamp}\]\[%{LOGLEVEL:log_level}%{SPACE}\]\[%{DATA:service}\]%{SPACE}\[%{DATA:hostname}\]%{SPACE}\[%{DATA:action}\]\[%{INT:pid}\]%{SPACE}%{DATA:state},%{SPACE}%{GREEDYDATA:reason}",
                "\[%{TIMESTAMP_ISO8601:timestamp}\]\[%{LOGLEVEL:log_level}%{SPACE}\]\[%{DATA:service}\]%{SPACE}\[%{DATA:hostname}\]%{SPACE}%{DATA:action}\[%{DATA:target}\]%{SPACE}%{GREEDYDATA:reason}",
                "\[%{TIMESTAMP_ISO8601:timestamp}\]\[%{LOGLEVEL:log_level}%{SPACE}\]\[%{DATA:service}\]%{SPACE}\[%{DATA:hostname}\]%{SPACE}\[%{DATA:action}\]\[%{DATA:status}\]\[%{INT:pid}\]\[%{INT:sub_pid}\]%{SPACE}%{DATA}%{SPACE}\[%{BASE10NUM:duration}%{DATA}\],%{SPACE}%{DATA}%{SPACE}\[%{BASE10NUM:collections_count}%{DATA}\]/\[%{BASE10NUM:collections_time}%{DATA}\],%{SPACE}%{DATA}%{SPACE}\[%{BASE10NUM:total_seconds}%{DATA}\]/\[%{BASE10NUM:total_hours}%{DATA}\],%{SPACE}%{GREEDYDATA:memory},%{SPACE}%{GREEDYDATA:all_pools}"
            ]}
        }
    }

上面的用例中,配合 multiline 的配置,針對每一個格式進行定義,再將多組 grok pattern 設定於此。


Grok Common Options

最後我們整理出其他 grok 簡單且常常使用的設定


tag_on_failure

解析失敗會添加 tag,你可以留空則不會額外加入 tag

    filter {
        grok {
            match => ["context", "\"tags\":\[%{DATA:apptags}\]"]
            tag_on_failure => [ ]
        }
      }

或是透過 if 判斷是否要 drop 掉這一次的解析

    filter {
        grok {
            match => [ "message", "something"]
        }
        if "_grokparsefailure" in [tags] {
            drop { }
        }
    }


name_captures_only:boolean

設為 true 只保留 grok 解析出來的欄位,其他則捨棄,可以即省儲存空間,但無法看到完整 log。


add_field:hash

可使用 %{field} 來動態組合字串

    filter {
      grok {
        add_field => {
          "foo_%{somefield}" => "Hello world, from %{host}"
          "new_field" => "new_static_value"
        }
      }
    }


remove_field:array

可使用 %{field} 來動態組合字串

    filter {
        grok {
          remove_field => [ "foo_%{somefield}", "my_extraneous_field" ]
        }
      }




有任何問題,或是想看新主題?  聯絡我們

延伸閱讀
winstonlu的大頭照
ELK 達人

我們致力於 ELK 的各種應用,協助企業建置相關服務。我們也提供基於 ELK 的各種解決方案,有任何問題,歡迎加入我們的官方 Line,或來信詢問,期待與您面對面的機會。