B i M A P

Logstash|如何自己寫 Ruby,讓 CIDR 網段更具可讀性

Logstash Logo


通常 Log 以最原始的方式呈現時,往往會缺少一些可讀性,因為通常透過設備,或系統級 Log 所拋出的訊息中,不會夾帶讓用戶更好識別的額外資訊,所以在業界的實務上,都必需要額外針對 Log 進行加工處理,讓 Log 更具可讀性。

一般我們可以透過 Logstash Plugin 進行處理,例如:mutate、csv、kv、grok、gsub 等等,但這一次我們以 CIDR 的網段為例子,帶大家了解如果透過內建 的 plugin 不好處理時,可以怎麼自行撰寫 ruby,並透過 ruby plugin 讓 logstash 來幫我們進行邏輯處理。


新增 Logstash 的 Ruby 腳本

以下為 ruby 的基本 template:

def register(params) 
end
def filter(event)
    return [event]
end

定義 input 參數

params 讓 logstash 可以將用戶的參數帶給我們後續使用:

def register(params) 
    @cidr_file_paths = params["cidr_file_paths"]
    @source = params["source"]
    @target = params["target"]
end

logstash 即可以這樣寫:

ruby { 
    path => "/etc/logstash/cidrRuby/cidrmapping.rb"
    script_params => {
        "cidr_file_paths" => ["/etc/logstash/cidrRuby/cidr.csv"]
        "source" => "destinationAddress"
        "target" => "dest_ip_cidr_desc"
        }
}


Ruby 加入必需套件

我們會需要用到 ipaddr、csv 兩個套件,幫助我們進行 ip 判斷

require "ipaddr" 
require "csv"


Ruby 重點說明

透過 csv 套件將 mapping 文件載入,我們這次是以陣列的方式來處理,所以可以讀取多份 csv 文件,但需要注意太多份 csv 檔,會降低 logstash 的處理速度,並讓 cpu 高度滿載

CSV.parse(File.read(cidr_file_path), headers: true)


透過 IPAddr 來進行 IP 類型的轉換,讓判斷上比較方便

net = IPAddr.new(cidr[2].strip)


透過 include 的 method,將 csv 文件的 cidr 網段與 source 欄位的 ip,進行判斷

可以透過 event.get、event.set 對資料進行取得或設置

net.include?(IPAddr.new(event.get('['+@source+']').strip))


CSV 範例

集先鋒科技-台北,10.99.1.1/24
集先鋒科技-台中,10.99.2.1/24
集先鋒科技-高雄,10.99.3.1/24


完整程式碼

require "ipaddr"
require 'csv'
def register(params)
@cidr_file_paths = params["cidr_file_paths"]
    @source = params["source"]
    @target = params["target"]
end
# the filter method receives an event and must return a list of events.
# Dropping an event means not including it in the return array,
# while creating new ones only requires you to add a new instance of
# LogStash::Event to the returned array
def filter(event)
    match = false
    for cidr_file_path in @cidr_file_paths do
        cidr_table = CSV.parse(File.read(cidr_file_path), headers: true)
        cidr_desc = ""
        for cidr in cidr_table do
            begin
                if event.include?(@source)
                    net = IPAddr.new(cidr[2].strip)
                    if net.include?(IPAddr.new(event.get('['+@source+']').strip))
                        match = true
                        cidr_desc = cidr[0].strip
                        event.set(@target, cidr_desc)
                        break
                    end
                end
            rescue => error
                # p error.message
            end
        end
        if match
            break
        end
    end
    if !match
        if event.include?(@source)
            event.set(@target, "外網")
        end
    end
    return [event]
end




延伸閱讀
winstonlu的大頭照
ELK 達人

我們致力於 ELK 的各種應用,協助企業建置相關服務。我們也提供基於 ELK 的各種解決方案,有任何問題,歡迎加入我們的官方 Line,或來信詢問,期待與您面對面的機會。