    import time

    import pika
    import msgpack
    import requests
    
    from elasticsearch import Elasticsearch, NotFoundError, AuthorizationException
    
    from copy import deepcopy
    
    from collections import OrderedDict
    
    from lib.geonetwork_helper import get_metadata_records
    
    from lib.exit_gracefully import exit_gracefully
    from lib.my_logging import logging
    
    from lib.locker import lock, unlock, is_locked
    
    def setup_indices(cfg):
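        """
        Create the source index with a minimal mapping, then (re)install the
        index template applied to the destination index.
        """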
    
        source_es = Elasticsearch([cfg['indexer']['url']], timeout=60)
        source_index = cfg['indexer']['index']
    
        es_body = { "settings" : {
                        "number_of_shards" : 1,
                        "number_of_replicas" : 0,
                        "index.mapping.total_fields.limit": 10000,
                        "refresh_interval": "30s"
                        },
                    "mappings": {
                        "_doc": {
                            "dynamic_templates": [ # priority is given by order!
                                {
                                    "uuid" : {
                                        "path_match": "uuid",
                                        "mapping": {
                                                "type": "text",
                                                "index": False,
                                                "fields": {
                                                    "keyword": {
                                                        "type": "keyword"
                                                    }
                                                }
                                            }
                                        }
                                },
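                                # 2. any other field is kept in _source but is neither
                                #    indexed nor searchable ("enabled": false)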
                                {
                                    "default" : {
                                        "path_match": "*",
                                        "mapping": {
                                            "enabled": "false"
                                        }
                                    }
                                }
                            ]
                        }
                    }
                }
    
        try:
    
            es_body['settings']['number_of_replicas'] = cfg['reindexer']['number_of_replicas']
    
            rep = source_es.indices.create(source_index, es_body)#, wait_for_active_shards=0)
        except Exception as e:
            logging.error(e)
    
    
        destin_es = Elasticsearch([cfg['indexer']['url']], timeout=60)
        destin_index = cfg['indexer']['index']
    
        from lib.elasticsearch_template import template
        template['index_patterns'] = [ cfg['reindexer']['destination_index'] ]
        template['settings']['number_of_shards'] = cfg['reindexer']['number_of_shards']
        template['settings']['number_of_replicas'] = cfg['reindexer']['number_of_replicas']
    
        try:
            rep = destin_es.indices.delete_template(cfg['reindexer']['template_name'])
            logging.debug(rep)
        except Exception as e:
            logging.error(e)
    
    
        try:
            # put_template (rather than indices.create) is needed here, since
            # we are installing an index template, not creating an index
            rep = destin_es.indices.put_template(cfg['reindexer']['template_name'], template)
            logging.debug(rep)
        except Exception as e:
            logging.warning(e)
    
        return
    
    # def send_page( the_session_id, the_geonetwork_root_url, the_dest_index, the_page, the_channel, the_exchange, the_routing_key ):
    
    def send_record_to_the_metadata_processor( the_cfg, the_record ):
    
        """
        This function sends a GeoNetwork metadata record to a RabbitMQ queue.
        """
    
        the_cfg = deepcopy(the_cfg)
    
        connection = pika.BlockingConnection(pika.ConnectionParameters(host=the_cfg['rabbitmq']['host'], port=the_cfg['rabbitmq']['port']))
        channel = connection.channel()
        exchange = the_cfg['rabbitmq']['exchange']
    
        queue_name  = the_cfg['rabbitmq']['queue_name_1']
        routing_key = the_cfg['rabbitmq']['routing_key_1']
    
        del the_cfg['rabbitmq']['queue_name_1']
        del the_cfg['rabbitmq']['routing_key_1']
    
        channel.exchange_declare(exchange=exchange, exchange_type='direct')
    
        channel.queue_declare(queue=queue_name, durable=True, arguments={'x-message-ttl' : the_cfg['rabbitmq']['ttl']})
    
        channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=routing_key)
    
        msg = dict()

        msg['header'] = dict()
        msg['header']['cfg'] = the_cfg
    
        msg['body'] = the_record
    
        the_body = msgpack.packb(msg, use_bin_type=False)
    
        # delivery_mode=2 marks the message as persistent
        channel.basic_publish( exchange=exchange,
                               routing_key=routing_key,
                               body=the_body,
                               properties=pika.BasicProperties(delivery_mode = 2)
                             )

        connection.close()

        return
    
    # def delete_dataset_from_indices( the_cfg, the_uuid ):
    #
    #     the_cfg = deepcopy(the_cfg)
    #
    #     connection = pika.BlockingConnection(pika.ConnectionParameters(host=the_cfg['rabbitmq']['host'], port=the_cfg['rabbitmq']['port']))
    #     channel = connection.channel()
    #     exchange = the_cfg['rabbitmq']['exchange']
    #     queue_name  = the_cfg['session']['id'] + '_' + the_cfg['rabbitmq']['queue_name_4_suffix']
    #     routing_key = the_cfg['session']['id'] + '_' + the_cfg['rabbitmq']['routing_key_4_suffix']
    #
    #     del the_cfg['rabbitmq']['queue_name_4_suffix']
    #     del the_cfg['rabbitmq']['routing_key_4_suffix']
    #
    #     channel.exchange_declare(exchange=exchange, exchange_type='direct')
    #     channel.queue_declare(queue=queue_name, durable=True)
    #     channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=routing_key)
    #
    #     msg = dict()
    #
    #     msg['header'] = dict()
    #     msg['header']['cfg'] = the_cfg
    #     msg['body'] = the_uuid
    #
    #     the_body = msgpack.packb(msg, use_bin_type=False)
    #
    #     channel.basic_publish( exchange=exchange,
    #                            routing_key=routing_key,
    #                            body=the_body,
    #                            properties=pika.BasicProperties(delivery_mode = 2)
    #                          )
    #
    #     connection.close()
    #
    #     return
    
    def delete_dataset_from_dest_index( the_cfg, the_uuid ):
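        """
        Remove both the "meta" and "full" variants of the given dataset from
        the destination index, polling the Elasticsearch Tasks API until the
        delete-by-query task completes.
        """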
    
        logging.info("Removing dataset with uuid = %s from the destination index..." % the_uuid)
    
    
        es = Elasticsearch([the_cfg['indexer']['url']], timeout=60)
    
        es_logger = logging.getLogger('elasticsearch')
        es_logger.setLevel(logging.INFO)
    
        index = the_cfg['indexer']['index']
    
        # only one version should be present, either "meta" or "full"
        # we try removing them both as we do not know which one is present for a given dataset
        for suffix in ['meta', 'full']:
    
            the_query = dict()
            the_query['query'] = dict()
            the_query['query']['term'] = {'uuid.keyword': '{0}.{1}'.format(the_uuid, suffix)}
    
            try:

                res = es.delete_by_query(index, doc_type='_doc', body=the_query, conflicts='proceed', refresh=True, wait_for_completion=False)

                task_id = res['task']

                # wait until ES is done, backing off by one more second at each attempt
                seconds_to_sleep_for = 1

                while True:
                    res = es.tasks.get(task_id=task_id)
                    logging.debug(res)
                    if res['completed']:
                        break
                    logging.info('Waiting for delete_by_query to complete: sleeping for %i seconds...' % seconds_to_sleep_for)
                    time.sleep(seconds_to_sleep_for)
                    seconds_to_sleep_for += 1
    
            except AuthorizationException as e:
                logging.critical(e)
                exit(1)
            except NotFoundError as e:
                logging.error(e)
    
            except Exception as e:
                logging.error(e)
    
    
        return
    
    
    
    def get_metadata_records_processor( the_cfg,
                                        the_root_url,
                                        the_no_records_per_page,
                                        the_uuid_to_get,
                                        the_uuids_to_filter_out=None,
                                        the_username=None,
                                        the_password=None):
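        """
        Iterate over the metadata records served by GeoNetwork: skip datasets
        that are already locked, otherwise lock them, remove any stale copy
        from the destination index and hand each record over to the metadata
        processor.
        """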
    
        for record in get_metadata_records( root_url=the_root_url,
                                            no_records_per_page=the_no_records_per_page,
                                            uuid=the_uuid_to_get,
                                            the_filter=the_uuids_to_filter_out,
                                            username=the_username, password=the_password ):
    
            the_uuid = record['geonet:info']['uuid']

            if is_locked(the_cfg['session']['working_directory'], the_uuid):
                logging.info("Dataset with uuid = %s is already being indexed: skipping." % the_uuid)
                continue
            else:
                logging.info("Setting lock for dataset with uuid = %s" % the_uuid)
                lock(the_cfg['session']['working_directory'], the_uuid)

            # delete_dataset_from_indices(cfg, record['geonet:info']['uuid'])

            delete_dataset_from_dest_index(the_cfg, the_uuid)
            send_record_to_the_metadata_processor(the_cfg, record)
    
        logging.info("Setting up ingest and digest ")
        setup_indices(cfg)
    
    
        uuids_to_get        = cfg['metadata_getter']['uuids_to_get']
        uuids_to_filter_out = cfg['metadata_getter']['uuids_to_filter_out']
    
    
        del cfg['metadata_getter'] # <- as this info is no longer needed
    
    
        username = cfg['geonetwork']['username']
    
        del cfg['geonetwork']['username'] # <- as this info is no longer needed
    
        password = cfg['geonetwork']['password']
    
        del cfg['geonetwork']['password'] # <- as this info is no longer needed
    
        if 'all' in uuids_to_get:
            uuids_to_get = [None]  # a None uuid makes get_metadata_records return all the records
    
        for uuid_to_get in uuids_to_get:
    
            get_metadata_records_processor(
                the_cfg = cfg,
                the_root_url = cfg['geonetwork']['url'],
                the_no_records_per_page = cfg['geonetwork']['records_per_page'],
                the_uuid_to_get = uuid_to_get,
                the_uuids_to_filter_out = uuids_to_filter_out,
                the_username = username,
                the_password = password)
    
    
    
        # if 'all' not in uuids_to_get:
        #
        #     # get some datasets only
        #     for uuid_to_get in uuids_to_get:
        #         for record in get_metadata_records( cfg['geonetwork']['url'],
        #                                             cfg['geonetwork']['records_per_page'],
        #                                             uuid=uuid_to_get,
        #                                             the_filter=uuids_to_filter_out,
        #                                             username=username, password=password ):
        #
        #             the_uuid = record['geonet:info']['uuid']
        #
        #             if is_locked( cfg['session']['working_directory'], the_uuid ):
        #                 logging.info("Dataset with uuid = %s is already being indexed: skipping." % the_uuid)
        #                 continue
        #             else:
        #                 lock(cfg['session']['working_directory'], the_uuid)
        #
        #             # delete_dataset_from_indices(cfg, record['geonet:info']['uuid'])
        #             delete_dataset_from_dest_index(cfg, the_uuid)
        #             send_record_to_the_metadata_processor(cfg, record)
        #
        # else:
        #
        #     # get all the datasets
        #     for record in get_metadata_records( cfg['geonetwork']['url'],
        #                                         cfg['geonetwork']['records_per_page'],
        #                                         uuid=None,
        #                                         the_filter=uuids_to_filter_out,
        #                                         username=username, password=password ):
        #
        #         the_uuid = record['geonet:info']['uuid']
        #
        #         if is_locked( cfg['session']['working_directory'], the_uuid ):
        #             logging.info("Dataset with uuid = %s is already being indexed: skipping." % the_uuid)
        #             continue
        #         else:
        #             lock(cfg['session']['working_directory'], the_uuid)
        #
        #         # delete_dataset_from_indices(cfg, record['geonet:info']['uuid'])
        #         delete_dataset_from_dest_index(cfg, record['geonet:info']['uuid'])
        #         send_record_to_the_metadata_processor(cfg, record)
    
    
    
    if __name__ == '__main__':
    
        import argparse
        import signal
    
        from yaml import load, dump
        try:
            from yaml import CLoader as Loader, CDumper as Dumper
        except ImportError:
            from yaml import Loader, Dumper
    
    
        signal.signal(signal.SIGINT, exit_gracefully)
    
    
        parser = argparse.ArgumentParser(description='Metadata getter')
        parser.add_argument('--host', dest='host', help='the RabbitMQ host', type=str, required=True)
        parser.add_argument('--port', dest='port', help='the RabbitMQ port', type=int, default=5672)
        parser.add_argument('--exchange', dest='exchange', help='the RabbitMQ exchange', type=str, required=True)
        # parser.add_argument('--queue', dest='queue', help='the RabbitMQ queue', type=str, required=True)
        parser.add_argument('--loglevel', dest='loglevel', help='the log level', default="INFO", type=str, choices=['INFO', 'DEBUG', 'WARN', 'CRITICAL', 'ERROR'])
    
        args = parser.parse_args()
    
        logging.getLogger().setLevel(args.loglevel)
        logging.info('Starting...')
    
    
        # read 'n' parse the configuration
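        # Illustrative sketch of the config.yaml keys read by this script
        # (inferred from the code in this file; an actual configuration file
        # may carry more settings). host, port and exchange under rabbitmq
        # are overridden by the command-line arguments below:
        #
        # rabbitmq:
        #   queue_name_1: ...
        #   routing_key_1: ...
        #   ttl: ...
        # indexer:
        #   url: ...
        #   index: ...
        # reindexer:
        #   destination_index: ...
        #   template_name: ...
        #   number_of_shards: ...
        #   number_of_replicas: ...
        # metadata_getter:
        #   uuids_to_get: [...]
        #   uuids_to_filter_out: [...]
        # geonetwork:
        #   url: ...
        #   records_per_page: ...
        #   username: ...
        #   password: ...
        # session:
        #   working_directory: ...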
        with open("config.yaml", 'r') as yamlfile:
    
            cfg = load(yamlfile, Loader=Loader)
    
    
        cfg['rabbitmq']['host'] = args.host
        cfg['rabbitmq']['port'] = args.port
        cfg['rabbitmq']['exchange'] = args.exchange
    
    
        try:
            main(cfg)
    
        except pika.exceptions.AMQPConnectionError:
            logging.info('RabbitMQ is not reachable: exiting.')
            #time.sleep(5)
    
        except Exception as e:
            logging.error(e)
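
    # Example invocation (hypothetical host and exchange values):
    #
    #   python main.py --host rabbitmq.example.org --exchange my_exchange --loglevel DEBUG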