JDBC 를 이용한 Elasticsearch RDBMS 연동

By | 2020년 6월 20일
Table of Contents

JDBC 를 이용한 Elasticsearch RDBMS 연동

Logstash 와 jdbc 를 이용해 Elasticsearch RDBMS 사이의 데이타를 동기화 합니다.

운영환경에서 사용하기 위해서는 Bulk Insert 를 병행해서 사용해야 하지만 문서의 양을 줄이기 위해 생략합니다.

또, JDK, ES, Logstash, MySQL 의 설치는 생략합니다.

참조 사이트

MySQL 설정

CREATE DATABASE db_test DEFAULT CHARACTER SET UTF8 COLLATE UTF8_GENERAL_CI;

USE db_test;

DROP TABLE IF EXISTS items;

CREATE TABLE items (
    itemid BIGINT(20) UNSIGNED NOT NULL,
    PRIMARY KEY (itemid),
    UNIQUE KEY unique_id (itemid),
    itemname VARCHAR(512) NOT NULL,
    category VARCHAR(128) NULL,
    price DECIMAL(13, 4) NULL,
    lastupdate TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    regdate TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

INSERT INTO items (itemid, itemname) VALUES (1, '블랙 아이폰 케이스');

itemid 가 PK 로 ES 의 document id 와 매칭됩니다. lastupdate 가 변경된 내역만 동기화됩니다.

Elasticsearch 설정

인덱스는 생성해 주지 않아도, Logstash 가 자동생성하지만 수기로 생성해주는 것이 안전합니다.

curl -XDELETE http://localhost:9200/items?pretty

curl -XPUT http://localhost:9200/items?pretty -H 'Content-Type: application/json' -d '{
  "settings" : {
    "number_of_shards": "5",
    "number_of_replicas": "1",
    "index":{
      "analysis":{
        "analyzer":{
          "korean":{
            "type":"custom",
            "tokenizer":"seunjeon_default_tokenizer"
          }
        },
        "tokenizer": {
          "seunjeon_default_tokenizer": {
            "index_eojeol": "true",
            "user_words": [
              "텐바이텐", "엠디", "에어팟", "러브플라보", "씰스티커",
              "1+1"
            ],
            "index_poses": [
                "UNK", "EP", "I", "J", "M",
                "N", "SL", "SH", "SN", "VCP",
                "XP", "XS", "XR"
            ],
            "decompound": "true",
            "type": "seunjeon_tokenizer"
          }
        }
      }
    }
  },
  "mappings" : {
    "_doc" : {
      "properties" : {
        "@timestamp" : {
          "type" : "date"
        },
        "itemid" : {
          "type" : "integer"
        },
        "itemname" : {
          "type" : "text",
          "analyzer": "korean"
        },
        "category" : {
          "type" : "text",
          "analyzer": "korean"
        },
        "price" : {
          "type": "scaled_float",
          "scaling_factor": 10000
        },
        "lastupdate" : {
          "type" : "date"
        },
        "regdate" : {
          "type" : "date"
        }
      }
    }
  }
}'

Logstash 설정

로그스태시 데몬이 스케줄에 맞춰 jdbc 를 호출합니다.

sudo mkdir /last_run
sudo chown logstash:logstash /last_run
sudo vi /etc/logstash/conf.d/logstash-jdbc-es.conf

동기화 해야할 인덱스가 여러 개일 경우 last_run_metadata_path 를 지정해 스케줄마다 각각 sql_last_value 를 저장할 수 있게 해주어야 합니다.

input {
    jdbc {
        jdbc_driver_library => "/home/ubuntu/mysql-connector-java-8.0.20.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://localhost:3306/db_test?zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Seoul"
        jdbc_user => "testuser"
        jdbc_password => "test1234"
        jdbc_paging_enabled => true
        jdbc_page_size => 100000
        tracking_column => "unix_ts_in_secs"
        use_column_value => true
        tracking_column_type => "numeric"
        schedule => "*/5 * * * * *"
        statement => "SELECT *, UNIX_TIMESTAMP(lastupdate) AS unix_ts_in_secs FROM items WHERE (UNIX_TIMESTAMP(lastupdate) > :sql_last_value AND lastupdate < NOW()) ORDER BY lastupdate ASC"
        last_run_metadata_path => "/last_run/logstash-jdbc-es"
    }
}

filter {
    mutate {
        copy => { "itemid" => "[@metadata][_id]"}
        remove_field => ["@version", "unix_ts_in_secs"]
    }
}

output {
    # stdout { codec =>  "rubydebug"}
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "items"
        document_id => "%{[@metadata][_id]}"
        document_type => "_doc"
    }
}

테스트하기

use db_test;
update items set itemname = '블랙 아이폰 케이스1' where itemid = 1;
curl -XGET http://localhost:9200/items/_search?pretty
curl -XGET http://localhost:9200/items/doc/1?pretty

curl -XGET http://localhost:9200/items/_search?pretty -H 'Content-Type: application/json' -d '{
  "query": {
    "match": {
      "itemname": "아이폰"
    }
  }
}'

답글 남기기