Table of Content
JDBC 를 이용한 Elasticsearch RDBMS 연동
Logstash 와 jdbc 를 이용해 Elasticsearch RDBMS 사이의 데이타를 동기화 합니다.
운영환경에서 사용하기 위해서는 Bulk Insert
를 병행해서 사용해야 하지만 문서의 양을 줄이기 위해 생략합니다.
또, JDK, ES, Logstash, MySQL 의 설치는 생략합니다.
참조 사이트
- https://peung.tistory.com/m/13
- https://www.elastic.co/kr/blog/how-to-keep-elasticsearch-synchronized-with-a-relational-database-using-logstash
MySQL 설정
CREATE DATABASE db_test DEFAULT CHARACTER SET UTF8 COLLATE UTF8_GENERAL_CI;
USE db_test;
DROP TABLE IF EXISTS items;
CREATE TABLE items (
itemid BIGINT(20) UNSIGNED NOT NULL,
PRIMARY KEY (itemid),
UNIQUE KEY unique_id (itemid),
itemname VARCHAR(512) NOT NULL,
category VARCHAR(128) NULL,
price DECIMAL(13, 4) NULL,
lastupdate TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
regdate TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
INSERT INTO items (itemid, itemname) VALUES (1, '블랙 아이폰 케이스');
itemid
가 PK 로 ES 의 document id
와 매칭됩니다. lastupdate 가 변경된 내역만 동기화됩니다.
Elasticsearch 설정
인덱스는 생성해 주지 않아도, Logstash 가 자동생성하지만 수기로 생성해주는 것이 안전합니다.
curl -XDELETE http://localhost:9200/items?pretty
curl -XPUT http://localhost:9200/items?pretty -H 'Content-Type: application/json' -d '{
"settings" : {
"number_of_shards": "5",
"number_of_replicas": "1",
"index":{
"analysis":{
"analyzer":{
"korean":{
"type":"custom",
"tokenizer":"seunjeon_default_tokenizer"
}
},
"tokenizer": {
"seunjeon_default_tokenizer": {
"index_eojeol": "true",
"user_words": [
"텐바이텐", "엠디", "에어팟", "러브플라보", "씰스티커",
"1+1"
],
"index_poses": [
"UNK", "EP", "I", "J", "M",
"N", "SL", "SH", "SN", "VCP",
"XP", "XS", "XR"
],
"decompound": "true",
"type": "seunjeon_tokenizer"
}
}
}
}
},
"mappings" : {
"_doc" : {
"properties" : {
"@timestamp" : {
"type" : "date"
},
"itemid" : {
"type" : "integer"
},
"itemname" : {
"type" : "text",
"analyzer": "korean"
},
"category" : {
"type" : "text",
"analyzer": "korean"
},
"price" : {
"type": "scaled_float",
"scaling_factor": 10000
},
"lastupdate" : {
"type" : "date"
},
"regdate" : {
"type" : "date"
}
}
}
}
}'
Logstash 설정
로그스태시 데몬이 스케줄에 맞춰 jdbc 를 호출합니다.
sudo mkdir /last_run
sudo chown logstash:logstash /last_run
sudo vi /etc/logstash/conf.d/logstash-jdbc-es.conf
동기화 해야할 인덱스가 여러 개일 경우 last_run_metadata_path
를 지정해 스케줄마다 각각 sql_last_value
를 저장할 수 있게 해주어야 합니다.
input {
jdbc {
jdbc_driver_library => "/home/ubuntu/mysql-connector-java-8.0.20.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/db_test?zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Seoul"
jdbc_user => "testuser"
jdbc_password => "test1234"
jdbc_paging_enabled => true
jdbc_page_size => 100000
tracking_column => "unix_ts_in_secs"
use_column_value => true
tracking_column_type => "numeric"
schedule => "*/5 * * * * *"
statement => "SELECT *, UNIX_TIMESTAMP(lastupdate) AS unix_ts_in_secs FROM items WHERE (UNIX_TIMESTAMP(lastupdate) > :sql_last_value AND lastupdate < NOW()) ORDER BY lastupdate ASC"
last_run_metadata_path => "/last_run/logstash-jdbc-es"
}
}
filter {
mutate {
copy => { "itemid" => "[@metadata][_id]"}
remove_field => ["@version", "unix_ts_in_secs"]
}
}
output {
# stdout { codec => "rubydebug"}
elasticsearch {
hosts => ["localhost:9200"]
index => "items"
document_id => "%{[@metadata][_id]}"
document_type => "_doc"
}
}
테스트하기
use db_test;
update items set itemname = '블랙 아이폰 케이스1' where itemid = 1;
curl -XGET http://localhost:9200/items/_search?pretty
curl -XGET http://localhost:9200/items/doc/1?pretty
curl -XGET http://localhost:9200/items/_search?pretty -H 'Content-Type: application/json' -d '{
"query": {
"match": {
"itemname": "아이폰"
}
}
}'