ELK|Integrate Kafka with Logstash and Beats to Elasticsearch

瀏覽人次: 2428
2021-08-11 更新

kafka integrate beats and logstash



在服務企業的過程中,遇到一天 TB、PB
級的資料是常態,且可能在短時間內會有大量的 Log 資料寫入到 Elasticsearch
中,因此透過 Kafka
分擔工作量是很常使用的技巧之一,底下筆者將標準用法整理出來。

安裝

安裝 Kafka

wget https://downloads.apache.org/kafka/2.8.0/kafka_2.13-2.8.0.tgz
tar -xzf kafka_2.13-2.8.0.tgz
cd kafka_2.13-2.8.0

啟動 Kafka 服務

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

設定 Filebeat

Filebeat 做為一個 producer 的角色,指定一個 topic 推播工作消息。

需要調整的參數有:Log 路徑、Kafka Hostname、Kafka Topic、帳號、密碼。

    filebeat.inputs:
    - type: log
      enabled: true
      paths:
        - /your/log/path
    
    output.kafka:
      topic: elk-lab
      required_acks: 1
      version: '1.0.0'
      partition.round_robin:
        reachable_only: false
      hosts:
        - "your-kafka-domain:9092"
      #username: ""
      #password: ""
      #ssl.enabled: true
      compression: none
      
    logging:
      level: debug
      to_files: true
      to_syslog: false
      files:
        path: /opt/filebeat/log
        name: eventhub.log
        keepfiles: 7
        rotateeverybytes: 10485760 # 10 MB    

設定 Logstash

Logstash 做為一個 consumer 的角色,可以處理多個 topics,並轉送至
elasticsearch。

    input {
        kafka {
          bootstrap_servers => "localhost:9092"
          topics => ["elk-lab"]
        }
      }
      
      output {
        elasticsearch {
          hosts => ["http://localhost:9200"]
          index => "logstash-kafka-%{+YYYY.MM.dd}"
        }
      } 

可以進一步參考 grok filter plugin 處理 Log 資料


建立 Index Pattern

選單中選取 Stack Management > Index Pattern > Create Index
Pattern

kafka index pattern

Discover it

建立完 Index Pattern 後,即可以在 Discover 中檢視資料。

kafka discover



有任何問題,或是想看新主題?

聯絡我們

快速跳轉目錄

✦ 集先鋒 Bimap – 企業建置高速穩定的海量日誌分析平台✦

集中不同的結構化資料和非結構化日誌,並進行關聯性的大數據整合,客製化儀表版、自訂事件告警、機器學習等等,以滿足各種大數據的應用場景和解決方案。