Skip to content
Snippets Groups Projects
8-reindexer.py 3.47 KiB
Newer Older
  • Learn to ignore specific revisions
  • from elasticsearch import Elasticsearch
    from pymongo import MongoClient
    import time
    import msgpack
    import pika
    from utils.my_logging import logging
    from utils.exit_gracefully import exit_gracefully
    
    class NotEmptyQueueException(Exception):
        """Raised when the source index is still receiving documents."""
    
    def _wait_for_stable_source(cfg, settle_seconds=5):
        """Raise NotEmptyQueueException if the source index is still growing.

        The document count of the source index is sampled twice,
        ``settle_seconds`` apart (with an index refresh before the second
        sample); differing counts mean documents are still being pushed.
        """
        reindexer_cfg = cfg['reindexer']

        # Without a dedicated source cluster, the source index lives on the
        # destination cluster.
        source_url = reindexer_cfg.get('source_url', reindexer_cfg['destination_url'])
        es_source = Elasticsearch([source_url], timeout=60)
        source_index = reindexer_cfg['source_index']

        # `index` is passed by keyword: its positional slot in `count()`
        # differs between major versions of the Elasticsearch client.
        count_before = es_source.count(index=source_index).get('count')

        time.sleep(settle_seconds)
        es_source.indices.refresh(index=source_index)

        count_after = es_source.count(index=source_index).get('count')
        if count_after != count_before:
            raise NotEmptyQueueException('Documents are still being pushed to the source index. Waiting...')


    def _build_reindex_body(cfg):
        """Return the request body for the Elasticsearch ``_reindex`` call."""
        reindexer_cfg = cfg['reindexer']

        body = {
            "source": {
                "index": reindexer_cfg['source_index'],
                "type": "_doc",
                "size": 1000,
            },
            "dest": {
                "index": reindexer_cfg['destination_index'],
                "type": "_doc",
            },
        }

        if 'source_url' in reindexer_cfg:
            # The Reindex API expects `remote` to be an object carrying the
            # host, not a bare URL string.
            body['source']['remote'] = {'host': reindexer_cfg['source_url']}

        return body


    def _record_reindex_task(cfg, task_id):
        """Store the URL of the reindex task in the MongoDB session report."""
        from urllib.parse import quote_plus

        mongo_cfg = cfg['mongo']

        # Credentials must be percent-escaped, otherwise characters such as
        # '@', ':' or '/' in the password corrupt the connection URI.
        mongo_uri = 'mongodb://%s:%s@%s:%s/' % (quote_plus(str(mongo_cfg['username'])),
                                                quote_plus(str(mongo_cfg['password'])),
                                                mongo_cfg['host'],
                                                mongo_cfg['port'])

        mongo_client = MongoClient(mongo_uri)
        try:
            mongo_collection = mongo_client[mongo_cfg['report-db']]['indexing-sessions']

            reindex_task_url = "%s/_tasks/%s" % (cfg['reindexer']['destination_url'], task_id)

            mongo_collection.find_one_and_update({'_id': cfg['session']['id']},
                                                 {'$set': {'reindex_task': reindex_task_url}},
                                                 upsert=True)
        finally:
            # Release the connection pool even if the update fails.
            mongo_client.close()


    def main(cfg, wait=True):
        """Install the index template and start an asynchronous reindex task.

        Steps, in order:
          1. customise the index template from ``es_template`` with the
             destination index pattern and shard/replica settings;
          2. when ``wait`` is true, check that the source index has stopped
             receiving documents;
          3. replace the template on the destination cluster;
          4. submit the reindex request with ``wait_for_completion=False``;
          5. record the resulting task URL in the MongoDB
             ``indexing-sessions`` collection, keyed by ``cfg['session']['id']``.

        Args:
            cfg: configuration mapping with ``reindexer``, ``mongo`` and
                ``session`` sections.
            wait: whether to perform the source-index stability check.

        Raises:
            NotEmptyQueueException: if the document count of the source index
                changed during the stability check.
        """
        from es_template import template

        reindexer_cfg = cfg['reindexer']

        template['index_patterns'] = [reindexer_cfg['destination_index']]
        template['settings']['number_of_shards'] = reindexer_cfg['number_of_shards']
        template['settings']['number_of_replicas'] = reindexer_cfg['number_of_replicas']

        # NOTE(review): the statement guarding the stability check was missing
        # from the retrieved source (the section was indented one level deeper
        # with no header); `if wait:` is reconstructed from that indentation
        # and the otherwise unused `wait` parameter -- confirm against upstream.
        if wait:
            _wait_for_stable_source(cfg)

        es = Elasticsearch([reindexer_cfg['destination_url']], timeout=60)

        # Best effort: the template may not exist yet. Catching Exception
        # (rather than a bare except) lets SystemExit/KeyboardInterrupt through.
        try:
            rep = es.indices.delete_template(reindexer_cfg['template_name'])
            logging.debug(rep)
        except Exception as e:
            logging.debug('Could not delete template %s: %s' % (reindexer_cfg['template_name'], e))

        rep = es.indices.put_template(reindexer_cfg['template_name'], template)
        logging.debug(rep)

        body = _build_reindex_body(cfg)
        logging.debug(body)

        rep = es.reindex(body, wait_for_completion=False)

        if 'task' in rep:
            # logging the indexing session into MongoDB
            _record_reindex_task(cfg, rep['task'])
        else:
            logging.error('The reindex request did not return a task id: %s' % (rep,))
    
    
    if __name__ == '__main__':
        # Script entry point: load config.yaml from the working directory and
        # run a single reindexing pass.

        import signal

        import yaml

        signal.signal(signal.SIGINT, exit_gracefully)

        # safe_load only builds plain Python objects; yaml.load() without an
        # explicit Loader can construct arbitrary objects and is rejected by
        # recent PyYAML releases.
        with open("config.yaml", 'r') as yamlfile:
            cfg = yaml.safe_load(yamlfile)

        try:
            main(cfg, wait=True)
        except pika.exceptions.ChannelClosed:
            logging.info("Waiting for tasks...")
        except Exception as e:
            # Top-level boundary: report the failure instead of a traceback.
            logging.error(e)