Elasticsearch zero downtime reindex

By | 2020년 6월 28일
Table of Contents

Elasticsearch zero downtime reindex

JDBC 를 이용한 RDBMS 연동환경 에서의 무중단 리인덱싱입니다.

무중단 리인덱싱 전략

  1. 최초에 testindex_1 인덱스와 testindex 라는 alias 가 있습니다.

  2. testindex_타임스탬프 라는 인덱스를 생성합니다.

  3. reindex 합니다.

  4. alias 가 새로 생긴 testindex_타임스탬프 를 향하도록 변경합니다.

  5. 원래의 인덱스는 삭제합니다.

  6. reindex 도중에 발생하는 update 가 누락될 수 있으므로 디비를 업데이트 해서 변경분을 새로 생긴 인덱스에 재전송하도록 합니다.

python 확인

python3 -V
Python 3.6.9

sudo apt install python3-pip

pip3 install elasticsearch
pip3 install pymysql

pip3 install --upgrade requests

mappings.json 생성

cat mappings.json
{
  "settings" : {
    "number_of_shards": "5",
    "number_of_replicas": "1",
    "index":{
      "analysis":{
        "analyzer":{
          "korean":{
            "type":"custom",
            "tokenizer":"seunjeon_default_tokenizer",
            "filter" : ["lowercase", "synonym", "stopword"]
          },
          "whitespace": {
            "type": "custom",
            "tokenizer": "whitespace",
            "char_filter": [
              "sepcial_char_filter"
            ],
            "filter": [
              "lowercase",
              "stop",
              "snowball"
            ]
          },
          "standard": {
            "type": "custom",
            "tokenizer": "standard",
            "char_filter": [
              "sepcial_char_filter"
            ],
            "filter": [
              "lowercase",
              "stop",
              "snowball"
            ]
          }
        },
        "char_filter": {
          "sepcial_char_filter": {
            "type": "mapping",
            "mappings": [ "+ => _plus_", "- => _minus_", "# => _sharp_" ]
          }
        },
        "filter" : {
          "synonym" : {
            "type" : "synonym",
            "synonyms_path" : "synonyms.txt"
          },
          "stopword" : {
            "type" : "stop",
            "stopwords_path" : "stopwords.txt"
          }
        },
        "tokenizer": {
          "seunjeon_default_tokenizer": {
            "index_eojeol": "true",
            "user_dict_path": "user_dict.csv",
            "index_poses": [
                "UNK", "EP", "I", "J", "M",
                "N", "SL", "SH", "SN", "VCP",
                "XP", "XS", "XR"
            ],
            "decompound": "true",
            "type": "seunjeon_tokenizer"
          }
        }
      }
    }
  },
  "mappings" : {
    "_doc" : {
      "properties" : {
        "@timestamp" : {
          "type" : "date"
        },
        "itemid" : {
          "type" : "integer"
        },
        "itemname" : {
          "type" : "text",
          "fields": {
            "en": {
              "type": "text",
              "analyzer": "standard"
            },
            "ko": {
              "type": "text",
              "analyzer": "korean"
            }
          }
        },
        "category" : {
          "type" : "text",
          "analyzer": "korean"
        },
        "price" : {
          "type": "scaled_float",
          "scaling_factor": 10000
        },
        "lastupdate" : {
          "type" : "date"
        },
        "regdate" : {
          "type" : "date"
        }
      }
    }
  }
}

python 파일 생성

cat reindex.py
from elasticsearch import Elasticsearch
import pymysql
import json
from datetime import datetime

def _suffix():
    return datetime.now().strftime("%Y%m%d%H%M%S%f")

es = Elasticsearch('localhost:9200')
index = 'items_1'
alias = 'items'

with open('mappings.json', 'r') as f:
    mappings = json.load(f)

# if es.indices.exists(index=index):
#     pass
# else:
#     es.indices.create(index=index, body=mappings)
#     es.indices.update_aliases({
#         "actions": [
#             {"add": {"index": index, "alias": alias}}
#         ]
#     })

old_index, new_index = '', ''
for idx in es.indices.get('items*'):
    if old_index == '':
        old_index = idx
    elif new_index == '':
        new_index = idx

    if old_index != '' and new_index != '':
        print('reindexing in process...')
        quit()

new_index = '%s_%s' % (alias, _suffix())
es.indices.create(index=new_index, body=mappings)

now = datetime.now()

result = es.reindex({
        "source": { "index": old_index },
        "dest": { "index": new_index }
    }, wait_for_completion=True, request_timeout=600)

print('reindex finished.')

es.indices.update_aliases({
    "actions": [
        {"remove": { "index": old_index, "alias": alias }},
        {"add": { "index": new_index, "alias": alias }}
    ]
})

es.indices.delete(index=old_index, ignore=[400, 404])

conn = pymysql.connect(host='localhost', user='testuser', password='test1234',
                       db='db_test', charset='utf8')

sql = "update items set lastupdate = now() where lastupdate >= %s"

curs = conn.cursor()
curs.execute(sql, (now))
conn.commit()

conn.close()

답글 남기기