Skip to content
Snippets Groups Projects
8-incremental-reindexer.py 7.91 KiB
Newer Older
  • Learn to ignore specific revisions
  • Alessandro Cerioni's avatar
    Alessandro Cerioni committed
    import time
    import json
    import msgpack
    import pika
    from elasticsearch import Elasticsearch, NotFoundError
    from utils.my_logging import logging
    from utils.exit_gracefully import exit_gracefully
    
    class NotEmptyQueueException(Exception):
        pass
    
    
    def create_sampling_task(cfg, uuid, reindex_task_url):
    
        # here-below we generate a task for the sample generator (full -> meta)
        msg = dict()
        msg['header'] = dict()
        msg['header']['cfg'] = cfg
        msg['header']['reindex_task_url'] = reindex_task_url
        msg['body'] = uuid
    
        the_body = msgpack.packb(msg, use_bin_type=True)
    
        connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq']['host']))
        channel = connection.channel()
        exchange = cfg['rabbitmq']['exchange']
    
        queue_name  = cfg['session']['id'] + '_' + cfg['rabbitmq']['queue_name_6_suffix']
        routing_key = cfg['session']['id'] + '_' + cfg['rabbitmq']['routing_key_6_suffix']
    
        channel.exchange_declare(exchange=cfg['rabbitmq']['exchange'], exchange_type='direct')
        channel.queue_declare(queue=queue_name, durable=True)
        channel.queue_bind(exchange=cfg['rabbitmq']['exchange'], queue=queue_name, routing_key=routing_key)
    
        channel.basic_publish( exchange=exchange,
                               routing_key=routing_key,
                               body=the_body,
                               properties=pika.BasicProperties(delivery_mode = 2)
                             )
    
        connection.close()
    
    
        return
    
    def on_msg_callback(channel, method, properties, body):
    
    
        decoded_body = msgpack.unpackb(body, raw=False)
        cfg = decoded_body['header']['cfg']
        uuid = decoded_body['body']
    
        from es_template import template
        template['index_patterns'] = [ cfg['reindexer']['destination_index'] ]
        template['settings']['number_of_shards'] = cfg['reindexer']['number_of_shards']
        template['settings']['number_of_replicas'] = cfg['reindexer']['number_of_replicas']
    
        if 'source_url' in cfg['reindexer'].keys():
            es_source = Elasticsearch([cfg['reindexer']['source_url']], timeout=60)
        else:
            es_source = Elasticsearch([cfg['reindexer']['destination_url']], timeout=60)
    
        es_logger = logging.getLogger('elasticsearch')
        es_logger.setLevel(logging.INFO)
    
        the_query = dict()
        the_query['query'] = dict()
        the_query['query']['term'] = {'uuid.keyword': '{0}'.format(uuid)}
    
        es_source.indices.refresh(index=cfg['reindexer']['source_index'])
        count1 = es_source.count(cfg['reindexer']['source_index'], body=the_query).get('count')
    
        logging.debug("%i document(s) found in the source index with uuid = %s" % (count1, uuid))
    
            logging.debug("Waiting for 30 seconds before counting again...")
            time.sleep(30)
    
            es_source.indices.refresh(index=cfg['reindexer']['source_index'])
            count2 = es_source.count(cfg['reindexer']['source_index'], body=the_query).get('count')
    
            logging.debug("%i document(s) found in the source index with uuid = %s" % (count2, uuid))
    
    
            if count1 != count2 or count2 == 0:
    
                logging.warning('Documents are still being pushed to the source index. Waiting...')
                time.sleep(5)
                channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1)
                return
                #raise NotEmptyQueueException('Documents are still being pushed to the source index. Waiting...')
    
        elif uuid.endswith('.meta'):
    
            if count1 != 1:
    
                logging.warning('Documents are still being pushed to the source index. Waiting...')
                time.sleep(5)
                channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1)
                return
    
        else:
    
    Alessandro Cerioni's avatar
    Alessandro Cerioni committed
            channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1)
    
            logging.error("The uuid ends neither with .full nor with .meta. What shall I do?")
    
    Alessandro Cerioni's avatar
    Alessandro Cerioni committed
            return
    
        # 1. remove already existing docs from destination index
        logging.info("Removing dataset with uuid = %s from the destination index..." % uuid)
    
        es = Elasticsearch([cfg['reindexer']['destination_url']], timeout=60)
        index = cfg['reindexer']['destination_index']
    
    
        try:
            es.indices.refresh(index=index)
        except NotFoundError:
            # the destination index may not be already present
            pass
    
    Alessandro Cerioni's avatar
    Alessandro Cerioni committed
    
        the_query = dict()
        the_query['query'] = dict()
        the_query['query']['term'] = {'uuid.keyword': '{0}'.format(uuid)}
    
        try:
            res = es.delete_by_query(index, doc_type='_doc', body=the_query)
            logging.debug(res)
        except NotFoundError:
            pass
        except Exception as e:
            logging.error(e)
            channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1)
            return
    
        # 2. trigger reindexation
        try:
          rep = es.indices.delete_template(cfg['reindexer']['template_name'])
          logging.debug(rep)
        except:
          pass
    
        rep = es.indices.put_template(cfg['reindexer']['template_name'], template)
        # rep = es.indices.get_template("template_1")
        logging.debug(rep)
    
    
    
        body = {
            "source": {
                "index": cfg['reindexer']['source_index'],
                "query": {
                    "term": {"uuid.keyword": '{0}'.format(uuid)}
                },
                "type": "_doc",
                "size": 1000
            },
            "dest": {
                "index": cfg['reindexer']['destination_index'],
                "type": "_doc"
            }
        }
    
        if 'source_url' in cfg['reindexer'].keys():
            body['source']['remote'] = {'host': cfg['reindexer']['source_url']}
    
        rep = es.reindex(body, wait_for_completion=False)
    
        logging.debug(rep)
    
        if 'task' in rep:
            channel.basic_ack(delivery_tag = method.delivery_tag)
            #print("")
            reindex_task_url = "{0}/_tasks/{1}".format(cfg['reindexer']['destination_url'], rep['task'])
            logging.info("Created reindex task: {0}".format(reindex_task_url))
    
            # 3. create sampling task (full -> meta)
            if '.full' in uuid:
                create_sampling_task(cfg, uuid, reindex_task_url)
    
    
    
        else:
            channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1)
            #print("")
            logging.error(json.dumps(rep, indent=4))
            logging.error("Failed")
    
    
        return
    
    
    def main(cfg):
    
        from utils.close_connection import on_timeout
    
        connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq_host'], port=cfg['rabbitmq_port']))
        timeout = 5
        connection.add_timeout(timeout, on_timeout(connection))
        channel = connection.channel()
        exchange    = cfg['rabbitmq_exchange']
        # the queue this program will consume messages from:
        reindex_tasks_to_create_qn = cfg['rabbitmq_queue']
    
        channel.basic_qos(prefetch_count=1)
        channel.basic_consume(lambda ch, method, properties, body: on_msg_callback(ch, method, properties, body), queue=reindex_tasks_to_create_qn)
    
        channel.start_consuming()
    
        connection.close()
    
        return
    
    
    if __name__ == '__main__':
    
        import yaml
        import time
        import signal
        import argparse
    
        signal.signal(signal.SIGINT, exit_gracefully)
    
        parser = argparse.ArgumentParser(description='Incremental reindexer')
    
        parser.add_argument('--host', dest='host', help='the RabbitMQ host', type=str, required=True)
    
    Alessandro Cerioni's avatar
    Alessandro Cerioni committed
        parser.add_argument('--port', dest='port', help='the RabbitMQ port', type=int, default=5672)
    
        parser.add_argument('--exchange', dest='exchange', help='the RabbitMQ exchange', type=str, required=True)
        parser.add_argument('--queue', dest='queue', help='the RabbitMQ queue', type=str, required=True)
    
    Alessandro Cerioni's avatar
    Alessandro Cerioni committed
    
        args = parser.parse_args()
    
        cfg = dict()
        cfg['rabbitmq_host'] = args.host
        cfg['rabbitmq_port'] = args.port
        cfg['rabbitmq_exchange'] = args.exchange
        cfg['rabbitmq_queue'] = args.queue
    
        logging.info('Starting incremental reindexer...')
    
        while True:
            try:
                main(cfg)
            except pika.exceptions.ChannelClosed:
                logging.info("Waiting for tasks...")
                time.sleep(5)
            except Exception as e:
                logging.error(e)
                time.sleep(5)