Table of Contents
Filebeat + Kafka + Logstash
어떠한 이유로 ELK 스택이 다운되었을 때, 저장하지 못한 로그를 보관하기 위해 Apache Kafka
를 이용합니다.
JAVA 8 이상이 필요합니다. 설치되어 있지 않으면 여기 를 참조해서 설치합니다.
ELK 스택이 필요합니다. 설치되어 있지 않다면 여기 를 참조해서 설치합니다.
Filebeat 가 설치되어 있지 않다면 여기 를 참조해서 설치합니다.
Kafka 가 설치되어 있지 않으면 여기 를 참조하여 설치합니다.
Kafka 로 로그 전송하기
기존에 filebeat 에서 logstash 로 직접 로그를 전송하던 방식에서, Kafka 를 거쳐 전송되도록 수정합니다.
우선 filebeat 가 Kafka 로 로그를 전송하도록 수정합니다.
sudo vi /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
paths:
- /var/log/nginx/*.log
exclude_files: ['\.gz$']
# scan_frequency: 10s
output.kafka:
hosts: ["<카프카서버아이피>:9092"]
codec.format:
string: '%{[message]}'
topic: 'skyer9.pe.kr'
partition.round_robin:
reachable_only: false
# output.logstash:
# hosts: ["172.31.31.196:5400"]
# # ssl.certificate_authorities: ["/etc/elk-certs/elk-ssl.crt"]
# # ssl.certificate: "/etc/elk-certs/elk-ssl.crt"
# # ssl.key: "/etc/elk-certs/elk-ssl.key"
서비스를 재시작 합니다.
sudo systemctl restart filebeat
Logstash 에서 로그 수신하기
sudo vi /etc/logstash/conf.d/logstash-nginx-es.conf
input {
kafka {
bootstrap_servers => "<카프카서버아이피>:9092"
topics => ["skyer9.pe.kr"]
consumer_threads => 2
# group_id => "Your group"
}
}
filter {
grok {
match => [ "message" , "%{COMBINEDAPACHELOG}+%{GREEDYDATA:extra_fields}"]
overwrite => [ "message" ]
}
mutate {
convert => ["response", "integer"]
convert => ["bytes", "integer"]
convert => ["responsetime", "float"]
}
# geoip {
# source => "clientip"
# target => "geoip"
# add_tag => [ "nginx-geoip" ]
# }
date {
match => [ "timestamp" , "dd/MMM/YYYY:HH:mm:ss Z" ]
remove_field => [ "timestamp" ]
}
useragent {
source => "agent"
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "weblogs-%{+YYYY.MM.dd}"
document_type => "nginx_logs"
}
stdout { codec => rubydebug }
}
서비스를 재시작합니다.
sudo systemctl restart logstash
인덱스에 입력이 잘 안될 때는 인덱스를 삭제하고 다시 테스트 합니다.