-
Alessandro Cerioni authored
- table whitelist - analyzing tables and views
Alessandro Cerioni authored- table whitelist - analyzing tables and views
8-reindexer.py 3.47 KiB
import copy
import time
from urllib.parse import quote_plus

import msgpack
import pika
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import NotFoundError
from pymongo import MongoClient

from utils.exit_gracefully import exit_gracefully
from utils.my_logging import logging
class NotEmptyQueueException(Exception):
    """Signal that the source index is still receiving documents.

    ``main`` raises this when two document counts of the source index, taken
    a few seconds apart, differ, meaning the reindex should wait.
    """
def _build_template(cfg):
    """Return a per-run copy of the shared index template, customised from cfg."""
    from es_template import template as base_template

    reindexer_cfg = cfg['reindexer']
    # Deep-copy: the imported template is a module-level object; mutating it
    # in place would leak this run's settings into every later user of it.
    template = copy.deepcopy(base_template)
    template['index_patterns'] = [reindexer_cfg['destination_index']]
    settings = template.setdefault('settings', {})
    # Index settings are scalar values, not lists.
    settings['number_of_shards'] = reindexer_cfg['number_of_shards']
    settings['number_of_replicas'] = reindexer_cfg['number_of_replicas']
    return template


def _wait_for_source_to_settle(cfg, delay=5):
    """Raise NotEmptyQueueException if the source index is still growing.

    The document count of the source index is read twice, ``delay`` seconds
    apart. The index is refreshed before each count so that both counts see
    the same (fully refreshed) view of the data.
    """
    reindexer_cfg = cfg['reindexer']
    # The source index lives on a different cluster only when 'source_url' is set.
    if 'source_url' in reindexer_cfg:
        source_url = reindexer_cfg['source_url']
    else:
        source_url = reindexer_cfg['destination_url']
    source_index = reindexer_cfg['source_index']
    es_source = Elasticsearch([source_url], timeout=60)

    # Without a refresh before the first count, documents indexed earlier but
    # not yet refreshed would only appear in the second count and be mistaken
    # for documents still being pushed.
    es_source.indices.refresh(index=source_index)
    count = es_source.count(index=source_index).get('count')
    time.sleep(delay)
    es_source.indices.refresh(index=source_index)
    if es_source.count(index=source_index).get('count') != count:
        raise NotEmptyQueueException('Documents are still being pushed to the source index. Waiting...')


def _install_template(es, cfg, template):
    """Replace the index template named in cfg with ``template`` on cluster ``es``."""
    name = cfg['reindexer']['template_name']
    try:
        rep = es.indices.delete_template(name=name)
        logging.debug(rep)
    except NotFoundError:
        # First run: there is no previous template to delete.
        logging.debug("Template %s does not exist yet; nothing to delete." % name)
    rep = es.indices.put_template(name=name, body=template)
    logging.debug(rep)


def _build_reindex_body(cfg):
    """Return the request body for the Elasticsearch reindex API."""
    reindexer_cfg = cfg['reindexer']
    body = {
        'source': {
            'index': reindexer_cfg['source_index'],
            'type': '_doc',
            'size': 1000,
        },
        'dest': {
            'index': reindexer_cfg['destination_index'],
            'type': '_doc',
        },
    }
    if 'source_url' in reindexer_cfg:
        # Reindex-from-remote expects an object with a 'host' key, not a bare URL.
        body['source']['remote'] = {'host': reindexer_cfg['source_url']}
    return body


def _record_reindex_task(cfg, task_id):
    """Store the URL of the reindex task in the indexing-sessions Mongo collection."""
    mongo_cfg = cfg['mongo']
    # Credentials must be percent-escaped inside a MongoDB URI; characters such
    # as '@', ':' or '/' in a password would otherwise corrupt the URI. str()
    # keeps numeric values (e.g. parsed from YAML) working as they did with %s.
    uri = 'mongodb://%s:%s@%s:%s/' % (quote_plus(str(mongo_cfg['username'])),
                                      quote_plus(str(mongo_cfg['password'])),
                                      mongo_cfg['host'],
                                      mongo_cfg['port'])
    mongo_client = MongoClient(uri)
    try:
        mongo_collection = mongo_client[mongo_cfg['report-db']]['indexing-sessions']
        reindex_task_url = "%s/_tasks/%s" % (cfg['reindexer']['destination_url'], task_id)
        mongo_collection.find_one_and_update({'_id': cfg['session']['id']},
                                             {'$set': {'reindex_task': reindex_task_url}},
                                             upsert=True)
    finally:
        mongo_client.close()


def main(cfg, wait=True):
    """Launch an asynchronous reindex from the source index to the destination index.

    Steps: build the index template from ``es_template``; optionally check
    that the source index has stopped growing; replace the template on the
    destination cluster; start the reindex without waiting for completion;
    and, if a task id is returned, record the task URL in MongoDB.

    :param cfg: configuration mapping. Reads ``cfg['reindexer']`` (source_index,
        destination_index, destination_url, template_name, number_of_shards,
        number_of_replicas, optional source_url), ``cfg['mongo']`` (username,
        password, host, port, report-db) and ``cfg['session']['id']``.
    :param wait: when True, first verify that the source index is no longer
        receiving documents.
    :raises NotEmptyQueueException: if ``wait`` is True and the source index
        document count changed during the check.
    """
    template = _build_template(cfg)

    if wait:
        _wait_for_source_to_settle(cfg)

    es = Elasticsearch([cfg['reindexer']['destination_url']], timeout=60)
    _install_template(es, cfg, template)

    body = _build_reindex_body(cfg)
    logging.debug(body)
    # wait_for_completion=False: the cluster runs the reindex as a background
    # task, and the response carries its id under 'task'.
    rep = es.reindex(body=body, wait_for_completion=False)
    logging.info(rep)

    if 'task' in rep:
        # Log the indexing session into MongoDB.
        _record_reindex_task(cfg, rep['task'])
if __name__ == '__main__':
    # Script entry point: load config.yaml and launch a single reindex run.
    import signal

    import yaml

    signal.signal(signal.SIGINT, exit_gracefully)

    with open("config.yaml", 'r') as yamlfile:
        # safe_load: yaml.load without an explicit Loader can construct
        # arbitrary Python objects and is a TypeError on PyYAML >= 6.
        cfg = yaml.safe_load(yamlfile)

    try:
        main(cfg, wait=True)
    except pika.exceptions.ChannelClosed:
        logging.info("Waiting for tasks...")
    except Exception as e:
        # Top-level boundary: report the failure (including
        # NotEmptyQueueException) instead of crashing with a traceback.
        logging.error(e)