"""Reindex an Elasticsearch source index into a destination index.

The destination index template is (re)installed from ``es_template``, an
asynchronous ``_reindex`` task is started on the destination cluster, and the
URL of that task is stored in the MongoDB ``indexing-sessions`` collection
under the current session id.
"""
from elasticsearch import Elasticsearch
from pymongo import MongoClient
import time
from urllib.parse import quote_plus
import msgpack
import pika
from utils.my_logging import logging
from utils.exit_gracefully import exit_gracefully


class NotEmptyQueueException(Exception):
    """Raised when documents are still being added to the source index."""


def _wait_for_source_index(cfg):
    """Raise NotEmptyQueueException if the source index is still growing.

    The document count is sampled, then sampled again 5 seconds later after a
    refresh of the index; any difference is treated as writers still active.
    """
    reindexer_cfg = cfg['reindexer']
    # The source lives on its own cluster when 'source_url' is configured,
    # otherwise on the destination cluster.
    if 'source_url' in reindexer_cfg:
        source_url = reindexer_cfg['source_url']
    else:
        source_url = reindexer_cfg['destination_url']
    source_index = reindexer_cfg['source_index']

    es_source = Elasticsearch([source_url], timeout=60)
    # Pass the index by keyword: the positional parameter order of count()
    # differs between elasticsearch-py major versions.
    count = es_source.count(index=source_index).get('count')
    time.sleep(5)
    es_source.indices.refresh(index=source_index)
    if es_source.count(index=source_index).get('count') != count:
        raise NotEmptyQueueException(
            'Documents are still being pushed to the source index. Waiting...')


def _install_template(es, cfg, template):
    """Replace the index template named in cfg['reindexer'] with *template*."""
    name = cfg['reindexer']['template_name']
    try:
        rep = es.indices.delete_template(name)
        logging.debug(rep)
    except Exception as exc:
        # Best effort: the template may not exist yet (e.g. on a first run).
        logging.debug('Could not delete template %s: %s', name, exc)
    rep = es.indices.put_template(name, template)
    logging.debug(rep)


def _build_reindex_body(cfg):
    """Return the ``_reindex`` request body for the configured indices."""
    reindexer_cfg = cfg['reindexer']
    body = {
        "source": {
            "index": reindexer_cfg['source_index'],
            "type": "_doc",
            "size": 1000,
        },
        "dest": {
            "index": reindexer_cfg['destination_index'],
            "type": "_doc",
        },
    }
    if 'source_url' in reindexer_cfg:
        # Reindex-from-remote expects an object with a "host" key,
        # not a bare URL string.
        body['source']['remote'] = {"host": reindexer_cfg['source_url']}
    return body


def _record_reindex_task(cfg, task_id):
    """Store the reindex task URL for the current session in MongoDB."""
    mongo_cfg = cfg['mongo']
    # Credentials must be percent-escaped to be valid inside a MongoDB URI;
    # str() keeps numeric values (as YAML may parse them) working as before.
    mongo_uri = 'mongodb://%s:%s@%s:%s/' % (
        quote_plus(str(mongo_cfg['username'])),
        quote_plus(str(mongo_cfg['password'])),
        mongo_cfg['host'],
        mongo_cfg['port'],
    )
    mongo_client = MongoClient(mongo_uri)
    try:
        mongo_collection = mongo_client[mongo_cfg['report-db']]['indexing-sessions']
        reindex_task_url = "%s/_tasks/%s" % (
            cfg['reindexer']['destination_url'], task_id)
        mongo_collection.find_one_and_update(
            {'_id': cfg['session']['id']},
            {'$set': {'reindex_task': reindex_task_url}},
            upsert=True,
        )
    finally:
        # A new client is created on every call, so release it here.
        mongo_client.close()


def main(cfg, wait=True):
    """Start an asynchronous reindex from the source to the destination index.

    :param cfg: parsed configuration; the ``reindexer``, ``mongo`` and
        ``session`` sections are read.
    :param wait: when true, first check that the source index has stopped
        receiving documents.
    :raises NotEmptyQueueException: if *wait* is true and the source index
        document count changed during the check.
    """
    from es_template import template

    reindexer_cfg = cfg['reindexer']
    template['index_patterns'] = [reindexer_cfg['destination_index']]
    template['settings']['number_of_shards'] = reindexer_cfg['number_of_shards']
    template['settings']['number_of_replicas'] = reindexer_cfg['number_of_replicas']

    if wait:
        _wait_for_source_index(cfg)

    es = Elasticsearch([reindexer_cfg['destination_url']], timeout=60)
    _install_template(es, cfg, template)

    body = _build_reindex_body(cfg)
    logging.debug(body)
    # Run as a background task; its progress is exposed under the _tasks API.
    rep = es.reindex(body, wait_for_completion=False)
    logging.info(rep)

    if 'task' in rep:
        # Log the indexing session into MongoDB.
        _record_reindex_task(cfg, rep['task'])


if __name__ == '__main__':
    import yaml
    import signal

    signal.signal(signal.SIGINT, exit_gracefully)

    with open("config.yaml", 'r') as yamlfile:
        # safe_load: yaml.load without an explicit Loader is unsafe and is
        # rejected by PyYAML >= 6.
        cfg = yaml.safe_load(yamlfile)

    try:
        main(cfg, wait=True)
    except pika.exceptions.ChannelClosed:
        logging.info("Waiting for tasks...")
    except Exception as e:
        # logging.exception keeps the message and adds the traceback.
        logging.exception(e)