B i M A P

ELK|Integrate Kafka with Logstash and Beats to Elasticsearch

kafka integrate beats and logstash



在服務企業的過程中,遇到一天 TB、PB 級的資料是常態,且可能在短時間內會有大量的 Log 資料寫入到 Elasticsearch 中,因此透過 Kafka 分擔工作量是很常使用的技巧之一,底下筆者將標準用法整理出來。


安裝


安裝 Kafka

wget https://downloads.apache.org/kafka/2.8.0/kafka_2.13-2.8.0.tgz
tar -xzf kafka_2.13-2.8.0.tgz
cd kafka_2.13-2.8.0

啟動 Kafka 服務

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties


設定 Filebeat

Filebeat 做為一個 producer 的角色,指定一個 topic 推播工作消息。

需要調整的參數有:Log 路徑、Kafka Hostname、Kafka Topic、帳號、密碼。

    filebeat.inputs:
    - type: log
      enabled: true
      paths:
        - /your/log/path
    
    output.kafka:
      topic: elk-lab
      required_acks: 1
      version: '1.0.0'
      partition.round_robin:
        reachable_only: false
      hosts:
        - "your-kafka-domain:9092"
      #username: ""
      #password: ""
      #ssl.enabled: true
      compression: none
      
    logging:
      level: debug
      to_files: true
      to_syslog: false
      files:
        path: /opt/filebeat/log
        name: eventhub.log
        keepfiles: 7
        rotateeverybytes: 10485760 # 10 MB    



設定 Logstash

Logstash 做為一個 consumer 的角色,可以處理多個 topics,並轉送至 elasticsearch。

    input {
        kafka {
          bootstrap_servers => "localhost:9092"
          topics => ["elk-lab"]
        }
      }
      
      output {
        elasticsearch {
          hosts => ["http://localhost:9200"]
          index => "logstash-kafka-%{+YYYY.MM.dd}"
        }
      } 

可以進一步參考 grok filter plugin 處理 Log 資料




建立 Index Pattern

選單中選取 Stack Management > Index Pattern > Create Index Pattern

kafka index pattern


Discover it

建立完 Index Pattern 後,即可以在 Discover 中檢視資料。

kafka discover



有任何問題,或是想看新主題? 聯絡我們

延伸閱讀
winstonlu的大頭照
ELK 達人

我們致力於 ELK 的各種應用,協助企業建置相關服務。我們也提供基於 ELK 的各種解決方案,有任何問題,歡迎加入我們的官方 Line,或來信詢問,期待與您面對面的機會。