Logstash|如何自己寫 Ruby,讓 CIDR 網段更具可讀性

瀏覽人次: 1176
2022-11-18 更新

Logstash Logo

通常 Log
以最原始的方式呈現時,往往會缺少一些可讀性,因為通常透過設備,或系統級 Log
所拋出的訊息中,不會夾帶讓用戶更好識別的額外資訊,所以在業界的實務上,都必需要額外針對
Log 進行加工處理,讓 Log 更具可讀性。

一般我們可以透過 Logstash Plugin
進行處理,例如:mutate、csv、kv、grok、gsub 等等,但這一次我們以 CIDR
的網段為例子,帶大家了解如果透過內建 的 plugin 不好處理時,可以怎麼自行撰寫
ruby,並透過 ruby plugin 讓 logstash 來幫我們進行邏輯處理。

新增 Logstash 的 Ruby 腳本

以下為 ruby 的基本 template:

def register(params) 
end
def filter(event)
    return [event]
end

定義 input 參數

params 讓 logstash 可以將用戶的參數帶給我們後續使用:

def register(params) 
    @cidr_file_paths = params["cidr_file_paths"]
    @source = params["source"]
    @target = params["target"]
end

logstash 即可以這樣寫:

ruby { 
    path => "/etc/logstash/cidrRuby/cidrmapping.rb"
    script_params => {
        "cidr_file_paths" => ["/etc/logstash/cidrRuby/cidr.csv"]
        "source" => "destinationAddress"
        "target" => "dest_ip_cidr_desc"
        }
}

Ruby 加入必需套件

我們會需要用到 ipaddr、csv 兩個套件,幫助我們進行 ip 判斷

require "ipaddr" 
require "csv"

Ruby 重點說明

透過 csv 套件將 mapping
文件載入,我們這次是以陣列的方式來處理,所以可以讀取多份 csv
文件,但需要注意太多份 csv 檔,會降低 logstash 的處理速度,並讓 cpu
高度滿載

CSV.parse(File.read(cidr_file_path), headers: true)

透過 IPAddr 來進行 IP 類型的轉換,讓判斷上比較方便

net = IPAddr.new(cidr[2].strip)

透過 include 的 method,將 csv 文件的 cidr 網段與 source 欄位的
ip,進行判斷

可以透過 event.get、event.set 對資料進行取得或設置

net.include?(IPAddr.new(event.get('['+@source+']').strip))

CSV 範例

集先鋒科技-台北,10.99.1.1/24
集先鋒科技-台中,10.99.2.1/24
集先鋒科技-高雄,10.99.3.1/24

完整程式碼

require "ipaddr"
require 'csv'
def register(params)
@cidr_file_paths = params["cidr_file_paths"]
    @source = params["source"]
    @target = params["target"]
end
# the filter method receives an event and must return a list of events.
# Dropping an event means not including it in the return array,
# while creating new ones only requires you to add a new instance of
# LogStash::Event to the returned array
def filter(event)
    match = false
    for cidr_file_path in @cidr_file_paths do
        cidr_table = CSV.parse(File.read(cidr_file_path), headers: true)
        cidr_desc = ""
        for cidr in cidr_table do
            begin
                if event.include?(@source)
                    net = IPAddr.new(cidr[2].strip)
                    if net.include?(IPAddr.new(event.get('['+@source+']').strip))
                        match = true
                        cidr_desc = cidr[0].strip
                        event.set(@target, cidr_desc)
                        break
                    end
                end
            rescue => error
                # p error.message
            end
        end
        if match
            break
        end
    end
    if !match
        if event.include?(@source)
            event.set(@target, "外網")
        end
    end
    return [event]
end

快速跳轉目錄

✦ 集先鋒 Bimap – 企業建置高速穩定的海量日誌分析平台✦

集中不同的結構化資料和非結構化日誌,並進行關聯性的大數據整合,客製化儀表版、自訂事件告警、機器學習等等,以滿足各種大數據的應用場景和解決方案。