Table of Content
Elasticsearch zero downtime reindex
JDBC 를 이용한 RDBMS 연동환경 에서의 무중단 리인덱싱입니다.
무중단 리인덱싱 전략
-
최초에
testindex_1
인덱스와testindex
라는 alias 가 있습니다. -
testindex_타임스탬프
라는 인덱스를 생성합니다. -
reindex
합니다. -
alias 가 새로 생긴
testindex_타임스탬프
를 향하도록 변경합니다. -
원래의 인덱스는 삭제합니다.
-
reindex 도중에 발생하는 update 가 누락될 수 있으므로 디비를 업데이트 해서 변경분을 새로 생긴 인덱스에 재전송하도록 합니다.
python 확인
python3 -V
Python 3.6.9
sudo apt install python3-pip
pip3 install elasticsearch
pip3 install pymysql
pip3 install --upgrade requests
mappings.json 생성
cat mappings.json
{
"settings" : {
"number_of_shards": "5",
"number_of_replicas": "1",
"index":{
"analysis":{
"analyzer":{
"korean":{
"type":"custom",
"tokenizer":"seunjeon_default_tokenizer",
"filter" : ["lowercase", "synonym", "stopword"]
},
"whitespace": {
"type": "custom",
"tokenizer": "whitespace",
"char_filter": [
"sepcial_char_filter"
],
"filter": [
"lowercase",
"stop",
"snowball"
]
},
"standard": {
"type": "custom",
"tokenizer": "standard",
"char_filter": [
"sepcial_char_filter"
],
"filter": [
"lowercase",
"stop",
"snowball"
]
}
},
"char_filter": {
"sepcial_char_filter": {
"type": "mapping",
"mappings": [ "+ => _plus_", "- => _minus_", "# => _sharp_" ]
}
},
"filter" : {
"synonym" : {
"type" : "synonym",
"synonyms_path" : "synonyms.txt"
},
"stopword" : {
"type" : "stop",
"stopwords_path" : "stopwords.txt"
}
},
"tokenizer": {
"seunjeon_default_tokenizer": {
"index_eojeol": "true",
"user_dict_path": "user_dict.csv",
"index_poses": [
"UNK", "EP", "I", "J", "M",
"N", "SL", "SH", "SN", "VCP",
"XP", "XS", "XR"
],
"decompound": "true",
"type": "seunjeon_tokenizer"
}
}
}
}
},
"mappings" : {
"_doc" : {
"properties" : {
"@timestamp" : {
"type" : "date"
},
"itemid" : {
"type" : "integer"
},
"itemname" : {
"type" : "text",
"fields": {
"en": {
"type": "text",
"analyzer": "standard"
},
"ko": {
"type": "text",
"analyzer": "korean"
}
}
},
"category" : {
"type" : "text",
"analyzer": "korean"
},
"price" : {
"type": "scaled_float",
"scaling_factor": 10000
},
"lastupdate" : {
"type" : "date"
},
"regdate" : {
"type" : "date"
}
}
}
}
}
python 파일 생성
cat reindex.py
from elasticsearch import Elasticsearch
import pymysql
import json
from datetime import datetime
def _suffix():
return datetime.now().strftime("%Y%m%d%H%M%S%f")
es = Elasticsearch('localhost:9200')
index = 'items_1'
alias = 'items'
with open('mappings.json', 'r') as f:
mappings = json.load(f)
# if es.indices.exists(index=index):
# pass
# else:
# es.indices.create(index=index, body=mappings)
# es.indices.update_aliases({
# "actions": [
# {"add": {"index": index, "alias": alias}}
# ]
# })
old_index, new_index = '', ''
for idx in es.indices.get('items*'):
if old_index == '':
old_index = idx
elif new_index == '':
new_index = idx
if old_index != '' and new_index != '':
print('reindexing in process...')
quit()
new_index = '%s_%s' % (alias, _suffix())
es.indices.create(index=new_index, body=mappings)
now = datetime.now()
result = es.reindex({
"source": { "index": old_index },
"dest": { "index": new_index }
}, wait_for_completion=True, request_timeout=600)
print('reindex finished.')
es.indices.update_aliases({
"actions": [
{"remove": { "index": old_index, "alias": alias }},
{"add": { "index": new_index, "alias": alias }}
]
})
es.indices.delete(index=old_index, ignore=[400, 404])
conn = pymysql.connect(host='localhost', user='testuser', password='test1234',
db='db_test', charset='utf8')
sql = "update items set lastupdate = now() where lastupdate >= %s"
curs = conn.cursor()
curs.execute(sql, (now))
conn.commit()
conn.close()