Filebeat + Kafka + Logstash

By | 2020년 3월 14일
Table of Contents

Filebeat + Kafka + Logstash

어떠한 이유로 ELK 스택이 다운되었을 때, 저장하지 못한 로그를 보관하기 위해 Apache Kafka 를 이용합니다.

JAVA 8 이상이 필요합니다. 설치되어 있지 않으면 여기 를 참조해서 설치합니다.

ELK 스택이 필요합니다. 설치되어 있지 않다면 여기 를 참조해서 설치합니다.

Filebeat 가 설치되어 있지 않다면 여기 를 참조해서 설치합니다.

Kafka 가 설치되어 있지 않으면 여기 를 참조하여 설치합니다.

Kafka 로 로그 전송하기

기존에 filebeat 에서 logstash 로 직접 로그를 전송하던 방식에서, Kafka 를 거쳐 전송되도록 수정합니다.

우선 filebeat 가 Kafka 로 로그를 전송하도록 수정합니다.

sudo vi /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
  paths:
    - /var/log/nginx/*.log
  exclude_files: ['\.gz$']
  # scan_frequency: 10s

output.kafka:
  hosts: ["<카프카서버아이피>:9092"]
  codec.format:
    string: '%{[message]}'
  topic: 'skyer9.pe.kr'
  partition.round_robin:
    reachable_only: false

# output.logstash:
#   hosts: ["172.31.31.196:5400"]
#   # ssl.certificate_authorities: ["/etc/elk-certs/elk-ssl.crt"]
#   # ssl.certificate: "/etc/elk-certs/elk-ssl.crt"
#   # ssl.key: "/etc/elk-certs/elk-ssl.key"

서비스를 재시작 합니다.

sudo systemctl restart filebeat

Logstash 에서 로그 수신하기

sudo vi /etc/logstash/conf.d/logstash-nginx-es.conf
input {
    kafka {
        bootstrap_servers => "<카프카서버아이피>:9092"
            topics => ["skyer9.pe.kr"]
            consumer_threads => 2
            # group_id => "Your group"
    }
}

filter {
    grok {
        match => [ "message" , "%{COMBINEDAPACHELOG}+%{GREEDYDATA:extra_fields}"]
        overwrite => [ "message" ]
    }
    mutate {
        convert => ["response", "integer"]
        convert => ["bytes", "integer"]
        convert => ["responsetime", "float"]
    }
    # geoip {
    #     source => "clientip"
    #     target => "geoip"
    #     add_tag => [ "nginx-geoip" ]
    # }
    date {
        match => [ "timestamp" , "dd/MMM/YYYY:HH:mm:ss Z" ]
        remove_field => [ "timestamp" ]
    }
    useragent {
        source => "agent"
    }
}

output {
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "weblogs-%{+YYYY.MM.dd}"
        document_type => "nginx_logs"
    }
    stdout { codec => rubydebug }
}

서비스를 재시작합니다.

sudo systemctl restart logstash

인덱스에 입력이 잘 안될 때는 인덱스를 삭제하고 다시 테스트 합니다.

답글 남기기