通常 Log
以最原始的方式呈現時,往往會缺少一些可讀性,因為通常透過設備,或系統級 Log
所拋出的訊息中,不會夾帶讓用戶更好識別的額外資訊,所以在業界的實務上,都必需要額外針對
Log 進行加工處理,讓 Log 更具可讀性。
一般我們可以透過 Logstash Plugin
進行處理,例如:mutate、csv、kv、grok、gsub 等等,但這一次我們以 CIDR
的網段為例子,帶大家了解如果透過內建 的 plugin 不好處理時,可以怎麼自行撰寫
ruby,並透過 ruby plugin 讓 logstash 來幫我們進行邏輯處理。
新增 Logstash 的 Ruby 腳本
以下為 ruby 的基本 template:
def register(params)
end
def filter(event)
return [event]
end
定義 input 參數
params 讓 logstash 可以將用戶的參數帶給我們後續使用:
def register(params)
@cidr_file_paths = params["cidr_file_paths"]
@source = params["source"]
@target = params["target"]
end
logstash 即可以這樣寫:
ruby {
path => "/etc/logstash/cidrRuby/cidrmapping.rb"
script_params => {
"cidr_file_paths" => ["/etc/logstash/cidrRuby/cidr.csv"]
"source" => "destinationAddress"
"target" => "dest_ip_cidr_desc"
}
}
Ruby 加入必需套件
我們會需要用到 ipaddr、csv 兩個套件,幫助我們進行 ip 判斷
require "ipaddr"
require "csv"
Ruby 重點說明
透過 csv 套件將 mapping
文件載入,我們這次是以陣列的方式來處理,所以可以讀取多份 csv
文件,但需要注意太多份 csv 檔,會降低 logstash 的處理速度,並讓 cpu
高度滿載
CSV.parse(File.read(cidr_file_path), headers: true)
透過 IPAddr 來進行 IP 類型的轉換,讓判斷上比較方便
net = IPAddr.new(cidr[2].strip)
透過 include 的 method,將 csv 文件的 cidr 網段與 source 欄位的
ip,進行判斷
可以透過 event.get、event.set 對資料進行取得或設置
net.include?(IPAddr.new(event.get('['+@source+']').strip))
CSV 範例
集先鋒科技-台北,10.99.1.1/24
集先鋒科技-台中,10.99.2.1/24
集先鋒科技-高雄,10.99.3.1/24
完整程式碼
require "ipaddr"
require 'csv'
def register(params)
@cidr_file_paths = params["cidr_file_paths"]
@source = params["source"]
@target = params["target"]
end
# the filter method receives an event and must return a list of events.
# Dropping an event means not including it in the return array,
# while creating new ones only requires you to add a new instance of
# LogStash::Event to the returned array
def filter(event)
match = false
for cidr_file_path in @cidr_file_paths do
cidr_table = CSV.parse(File.read(cidr_file_path), headers: true)
cidr_desc = ""
for cidr in cidr_table do
begin
if event.include?(@source)
net = IPAddr.new(cidr[2].strip)
if net.include?(IPAddr.new(event.get('['+@source+']').strip))
match = true
cidr_desc = cidr[0].strip
event.set(@target, cidr_desc)
break
end
end
rescue => error
# p error.message
end
end
if match
break
end
end
if !match
if event.include?(@source)
event.set(@target, "外網")
end
end
return [event]
end