    import pika
    import msgpack
    import requests
    
    from elasticsearch import Elasticsearch, NotFoundError, AuthorizationException
    
    from copy import deepcopy
    from pymongo import MongoClient
    from collections import OrderedDict
    
    from lib.geonetwork_helper import get_metadata_records
    
    from lib.exit_gracefully import exit_gracefully
    from lib.my_logging import logging
    
    def setup_indices(cfg):
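        """Create the ingest (source) index and install the template applied to
        the digest (destination) index."""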
    
        source_es = Elasticsearch([cfg['indexer']['url']], timeout=60)
        source_index = cfg['indexer']['index']
    
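        # mapping strategy: only the "uuid" field is made searchable (through its
        # "keyword" subfield); the catch-all template below leaves every other
        # field unindexed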
        es_body = { "settings" : {
                        "number_of_shards" : 1,
                        "number_of_replicas" : 0,
                        "index.mapping.total_fields.limit": 10000,
                        "refresh_interval": "30s"
                        },
                    "mappings": {
                        "_doc": {
                            "dynamic_templates": [ # priority is given by order!
                                {
                                    "uuid" : {
                                        "path_match": "uuid",
                                        "mapping": {
                                                "type": "text",
                                                "index": False,
                                                "fields": {
                                                    "keyword": {
                                                        "type": "keyword"
                                                    }
                                                }
                                            }
                                        }
                                },
                                {
                                    "default" : {
                                        "path_match": "*",
                                        "mapping": {
                                            "enabled": "false"
                                        }
                                    }
                                }
                            ]
                        }
                    }
                }
    
        try:
            rep = source_es.indices.create(source_index, es_body)  # , wait_for_active_shards=0
        except Exception as e:
            logging.error(e)
    
    
        destin_es = Elasticsearch([cfg['indexer']['url']], timeout=60)
        destin_index = cfg['indexer']['index']
    
        from lib.elasticsearch_template import template
        template['index_patterns'] = [ cfg['reindexer']['destination_index'] ]
        template['settings']['number_of_shards'] = cfg['reindexer']['number_of_shards']
        template['settings']['number_of_replicas'] = cfg['reindexer']['number_of_replicas']
    
        try:
            rep = destin_es.indices.delete_template(cfg['reindexer']['template_name'])
            logging.debug(rep)
        except Exception as e:
            logging.error(e)
    
        rep = destin_es.indices.put_template(cfg['reindexer']['template_name'], template)
        logging.debug(rep)
    
        return
    
    # def send_page( the_session_id, the_geonetwork_root_url, the_dest_index, the_page, the_channel, the_exchange, the_routing_key ):
    
    def send_record_to_the_metadata_processor( the_cfg, the_record ):
    
        """Send a GeoNetwork metadata record to a RabbitMQ queue."""
    
        the_cfg = deepcopy(the_cfg)
    
        connection = pika.BlockingConnection(pika.ConnectionParameters(host=the_cfg['rabbitmq']['host'], port=the_cfg['rabbitmq']['port']))
        channel = connection.channel()
        exchange = the_cfg['rabbitmq']['exchange']
    
        queue_name  = the_cfg['rabbitmq']['queue_name_1']
        routing_key = the_cfg['rabbitmq']['routing_key_1']
    
        del the_cfg['rabbitmq']['queue_name_1']
        del the_cfg['rabbitmq']['routing_key_1']
    
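        # these declarations are idempotent: the exchange and queue are created only if missing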
        channel.exchange_declare(exchange=exchange, exchange_type='direct')
        channel.queue_declare(queue=queue_name, durable=True)
        channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=routing_key)
    
        msg = dict()

        msg['header'] = dict()
        msg['header']['cfg'] = the_cfg
    
        msg['body'] = the_record
    
        the_body = msgpack.packb(msg, use_bin_type=False)
    
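        # delivery_mode=2 makes the message persistent, so it survives a broker restart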
        channel.basic_publish( exchange=exchange,
                               routing_key=routing_key,
                               body=the_body,
                               properties=pika.BasicProperties(delivery_mode = 2)
                             )
    
        connection.close()

        return

    # def delete_dataset_from_indices( the_cfg, the_uuid ):
    #
    #     the_cfg = deepcopy(the_cfg)
    #
    #     connection = pika.BlockingConnection(pika.ConnectionParameters(host=the_cfg['rabbitmq']['host'], port=the_cfg['rabbitmq']['port']))
    #     channel = connection.channel()
    #     exchange = the_cfg['rabbitmq']['exchange']
    #     queue_name  = the_cfg['session']['id'] + '_' + the_cfg['rabbitmq']['queue_name_4_suffix']
    #     routing_key = the_cfg['session']['id'] + '_' + the_cfg['rabbitmq']['routing_key_4_suffix']
    #
    #     del the_cfg['rabbitmq']['queue_name_4_suffix']
    #     del the_cfg['rabbitmq']['routing_key_4_suffix']
    #
    #     channel.exchange_declare(exchange=exchange, exchange_type='direct')
    #     channel.queue_declare(queue=queue_name, durable=True)
    #     channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=routing_key)
    #
    #     msg = dict()
    #
    #     msg['header'] = dict()
    #     msg['header']['cfg'] = the_cfg
    #     msg['body'] = the_uuid
    #
    #     the_body = msgpack.packb(msg, use_bin_type=False)
    #
    #     channel.basic_publish( exchange=exchange,
    #                            routing_key=routing_key,
    #                            body=the_body,
    #                            properties=pika.BasicProperties(delivery_mode = 2)
    #                          )
    #
    #     connection.close()
    #
    #     return
    
    def delete_dataset_from_dest_index( the_cfg, the_uuid ):
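        """Remove the dataset with the given uuid from the destination index."""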
    
        logging.info("Removing dataset with uuid = %s from the destination index..." % the_uuid)
    
    
        es = Elasticsearch([the_cfg['indexer']['url']], timeout=60)
    
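        # raise the elasticsearch logger to INFO to mute its per-request DEBUG output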
        es_logger = logging.getLogger('elasticsearch')
        es_logger.setLevel(logging.INFO)
    
        index = the_cfg['indexer']['index']
    
        # only one version should be present, either "meta" or "full"
        # we try removing them both as we do not know which one is present for a given dataset
        for suffix in ['meta', 'full']:
    
            the_query = dict()
            the_query['query'] = dict()
            the_query['query']['term'] = {'uuid.keyword': '{0}.{1}'.format(the_uuid, suffix)}
    
            try:
                res = es.delete_by_query(index, doc_type='_doc', body=the_query)
    
                #logging.debug(res)
            except AuthorizationException as e:
                logging.critical(e)
                exit(1)
            except NotFoundError as e:
                logging.error(e)
    
            except Exception as e:
                logging.error(e)
    
    
        return


        logging.info("Setting up ingest and digest ")
        setup_indices(cfg)
    
    
        uuids_to_get        = cfg['metadata_getter']['uuids_to_get']
        uuids_to_filter_out = cfg['metadata_getter']['uuids_to_filter_out']
    
    
        del cfg['metadata_getter'] # <- as this info is no longer needed
    
    
        username = cfg['geonetwork']['username']
    
        del cfg['geonetwork']['username'] # <- as this info is no longer needed
    
        password = cfg['geonetwork']['password']
    
        del cfg['geonetwork']['password'] # <- as this info is no longer needed
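        # note: cfg is forwarded verbatim inside each message header, so the
        # credentials must not remain in it beyond this point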
    
    
        if 'all' not in uuids_to_get:

            for uuid_to_get in uuids_to_get:

                for record in get_metadata_records( cfg['geonetwork']['url'],
                                                    cfg['geonetwork']['records_per_page'],
                                                    uuid=uuid_to_get,
                                                    the_filter=uuids_to_filter_out,
                                                    username=username, password=password ):
    
                    # delete_dataset_from_indices(cfg, record['geonet:info']['uuid'])
                    delete_dataset_from_dest_index(cfg, record['geonet:info']['uuid'])
                    send_record_to_the_metadata_processor(cfg, record)
    
        else:

            for record in get_metadata_records( cfg['geonetwork']['url'],
                                                cfg['geonetwork']['records_per_page'],
                                                uuid=None,
                                                the_filter=uuids_to_filter_out,
                                                username=username, password=password ):
    
                # delete_dataset_from_indices(cfg, record['geonet:info']['uuid'])
                delete_dataset_from_dest_index(cfg, record['geonet:info']['uuid'])
                send_record_to_the_metadata_processor(cfg, record)
    
    
    
    if __name__ == '__main__':
    
        import argparse
        import signal
    
        from yaml import load, dump
        try:
            from yaml import CLoader as Loader, CDumper as Dumper
        except ImportError:
            from yaml import Loader, Dumper
    
    
        signal.signal(signal.SIGINT, exit_gracefully)
    
    
        parser = argparse.ArgumentParser(description='Metadata getter')
        parser.add_argument('--host', dest='host', help='the RabbitMQ host', type=str, required=True)
        parser.add_argument('--port', dest='port', help='the RabbitMQ port', type=int, default=5672)
        parser.add_argument('--exchange', dest='exchange', help='the RabbitMQ exchange', type=str, required=True)
        # parser.add_argument('--queue', dest='queue', help='the RabbitMQ queue', type=str, required=True)
        parser.add_argument('--loglevel', dest='loglevel', help='the log level', default="INFO", type=str, choices=['INFO', 'DEBUG', 'WARN', 'CRITICAL', 'ERROR'])
    
        args = parser.parse_args()
    
        logging.getLogger().setLevel(args.loglevel)
        logging.info('Starting...')
    
    
        # read 'n' parse the configuration
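        # A minimal config.yaml sketch (values are illustrative; the keys are the
        # ones looked up throughout this file):
        #
        #   indexer:
        #     url: http://localhost:9200
        #     index: ingest-index
        #   reindexer:
        #     destination_index: digest-index
        #     template_name: digest-template
        #     number_of_shards: 1
        #     number_of_replicas: 0
        #   rabbitmq:
        #     exchange: ...          # host, port and exchange are overridden below
        #     queue_name_1: ...
        #     routing_key_1: ...
        #   metadata_getter:
        #     uuids_to_get: ['all']
        #     uuids_to_filter_out: []
        #   geonetwork:
        #     url: https://example.org/geonetwork
        #     records_per_page: 20
        #     username: ...
        #     password: ...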
        with open("config.yaml", 'r') as yamlfile:
    
            cfg = load(yamlfile, Loader=Loader)
    
    
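        # command-line arguments take precedence over the values in config.yaml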
        cfg['rabbitmq']['host'] = args.host
        cfg['rabbitmq']['port'] = args.port
        cfg['rabbitmq']['exchange'] = args.exchange
    
    
        try:
            main(cfg)
    
        except pika.exceptions.AMQPConnectionError:
            logging.info('RabbitMQ is not reachable: exiting.')
            #time.sleep(5)
    
        except Exception as e:
            logging.error(e)