# Committed by Alessandro Cerioni.
from elasticsearch import Elasticsearch
from pymongo import MongoClient
import time
import msgpack
import pika
from utils.my_logging import logging
from utils.exit_gracefully import exit_gracefully
class NotEmptyQueueException(Exception):
    """Signal that the source index is still receiving documents.

    Raised when the document count of the source index changes during the
    idle check, meaning reindexing should not start yet.
    """
def main(cfg, wait=True):
    """Reindex ``source_index`` into ``destination_index`` and record the task.

    1. If ``wait`` is true, check that the source index has stopped growing.
    2. (Re)install the index template on the destination cluster.
    3. Start an asynchronous ``_reindex`` (``wait_for_completion=False``).
    4. If the response carries a task id, store the task URL in MongoDB
       (``indexing-sessions`` collection, document ``_id = cfg['session']['id']``).

    :param cfg: configuration dict; its ``reindexer``, ``mongo`` and
        ``session`` sections are read.
    :param wait: perform the "source index is idle" check before reindexing.
    :raises NotEmptyQueueException: if documents are still being pushed to
        the source index.
    """
    import copy

    from es_template import template as base_template

    reindexer_cfg = cfg['reindexer']

    if wait:
        _ensure_source_is_idle(reindexer_cfg)

    es = Elasticsearch([reindexer_cfg['destination_url']], timeout=60)

    # Work on a copy so the module-level template in es_template is never mutated.
    _install_template(es, reindexer_cfg, copy.deepcopy(base_template))

    rep = es.reindex(body=_build_reindex_body(reindexer_cfg), wait_for_completion=False)

    if 'task' in rep:
        _record_reindex_task(cfg, rep['task'])


def _ensure_source_is_idle(reindexer_cfg, delay=5):
    """Raise NotEmptyQueueException if the source index count changes within ``delay`` s."""
    # The source lives on the remote cluster when 'source_url' is configured,
    # otherwise on the destination cluster.
    if 'source_url' in reindexer_cfg:
        source_url = reindexer_cfg['source_url']
    else:
        source_url = reindexer_cfg['destination_url']
    es_source = Elasticsearch([source_url], timeout=60)
    source_index = reindexer_cfg['source_index']

    # Refresh before *both* counts so they are taken on the same (refreshed)
    # view; otherwise a pending refresh alone could look like new documents.
    # `index=` is passed by keyword: in elasticsearch-py 7.x the first
    # positional parameter of count() is `body`.
    es_source.indices.refresh(index=source_index)
    count_before = es_source.count(index=source_index).get('count')
    time.sleep(delay)
    es_source.indices.refresh(index=source_index)
    count_after = es_source.count(index=source_index).get('count')

    if count_after != count_before:
        raise NotEmptyQueueException('Documents are still being pushed to the source index. Waiting...')


def _install_template(es, reindexer_cfg, template):
    """Fill in ``template`` from the config and (re)install it on ``es``."""
    template['index_patterns'] = [reindexer_cfg['destination_index']]
    template['settings']['number_of_shards'] = reindexer_cfg['number_of_shards']
    template['settings']['number_of_replicas'] = reindexer_cfg['number_of_replicas']

    template_name = reindexer_cfg['template_name']
    # Best effort: the template may not exist yet (e.g. on the first run).
    # Log instead of silently swallowing, and never trap KeyboardInterrupt.
    try:
        rep = es.indices.delete_template(name=template_name)
        logging.debug(rep)
    except Exception as exc:
        logging.debug('Could not delete template %s: %s' % (template_name, exc))

    rep = es.indices.put_template(name=template_name, body=template)
    logging.debug(rep)


def _build_reindex_body(reindexer_cfg):
    """Return the ``_reindex`` request body for the configured indices."""
    body = {
        'source': {
            'index': reindexer_cfg['source_index'],
            'type': '_doc',
        },
        'dest': {
            'index': reindexer_cfg['destination_index'],
            'type': '_doc',
        },
    }
    if 'source_url' in reindexer_cfg:
        # Reindex-from-remote expects an object with a "host" key, not a bare
        # URL string. The remote host must also be allowed by the destination
        # cluster's `reindex.remote.whitelist` setting.
        body['source']['remote'] = {'host': reindexer_cfg['source_url']}
    return body


def _record_reindex_task(cfg, task_id):
    """Store the URL of reindex task ``task_id`` in the MongoDB session report."""
    from urllib.parse import quote_plus

    mongo_cfg = cfg['mongo']
    # Credentials must be percent-escaped, otherwise characters such as
    # '@', ':' or '/' in the password break the connection URI.
    mongo_uri = 'mongodb://%s:%s@%s:%s/' % (quote_plus(str(mongo_cfg['username'])),
                                            quote_plus(str(mongo_cfg['password'])),
                                            mongo_cfg['host'],
                                            mongo_cfg['port'])
    mongo_client = MongoClient(mongo_uri)
    try:
        mongo_collection = mongo_client[mongo_cfg['report-db']]['indexing-sessions']
        reindex_task_url = "%s/_tasks/%s" % (cfg['reindexer']['destination_url'], task_id)
        mongo_collection.find_one_and_update({'_id': cfg['session']['id']},
                                             {'$set': {'reindex_task': reindex_task_url}},
                                             upsert=True)
    finally:
        # Release the connection pool; the original never closed the client.
        mongo_client.close()
if __name__ == '__main__':
    # Script entry point: load config.yaml and run a single reindexing pass.
    import signal

    import yaml

    signal.signal(signal.SIGINT, exit_gracefully)

    with open("config.yaml", 'r') as yamlfile:
        # yaml.load() without an explicit Loader can construct arbitrary Python
        # objects and raises TypeError on PyYAML >= 6; a config file only
        # needs plain YAML types.
        cfg = yaml.safe_load(yamlfile)

    try:
        main(cfg, wait=True)
    except pika.exceptions.ChannelClosed:
        logging.info("Waiting for tasks...")
    except Exception as e:
        # Top-level boundary: report the failure (including
        # NotEmptyQueueException) instead of crashing with a traceback.
        logging.error(e)