main.py
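
    """
    Metadata getter: sets up the Elasticsearch indices, then fetches the
    metadata records from GeoNetwork and hands each of them over to the
    metadata processor through RabbitMQ.
    """
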
    import pika
    import msgpack
    import requests
    
    from elasticsearch import Elasticsearch, NotFoundError, AuthorizationException
    
    from copy import deepcopy
    
    from collections import OrderedDict
    
    from lib.geonetwork_helper import get_metadata_records
    
    from lib.exit_gracefully import exit_gracefully
    from lib.my_logging import logging
    
    def setup_indices(cfg):
    
        source_es = Elasticsearch([cfg['indexer']['url']], timeout=60)
        source_index = cfg['indexer']['index']
    
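        # mapping for the source index: only "uuid" gets a searchable "keyword"
        # sub-field (used below by delete_dataset_from_dest_index); the catch-all
        # "default" template disables every other field, which then only
        # survives in _source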
        es_body = { "settings" : {
                        "number_of_shards" : 1,
                        "number_of_replicas" : 0,
                        "index.mapping.total_fields.limit": 10000,
                        "refresh_interval": "30s"
                        },
                    "mappings": {
                        "_doc": {
                            "dynamic_templates": [ # priority is given by order!
                                {
                                    "uuid" : {
                                        "path_match": "uuid",
                                        "mapping": {
                                                "type": "text",
                                                "index": False,
                                                "fields": {
                                                    "keyword": {
                                                        "type": "keyword"
                                                    }
                                                }
                                            }
                                        }
                                },
                                {
                                    "default" : {
                                        "path_match": "*",
                                        "mapping": {
                                            "enabled": "false"
                                        }
                                    }
                                }
                            ]
                        }
                    }
                }
    
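        # creation fails if the index already exists; the error is merely
        # logged, so that re-runs are harmless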
        try:
            rep = source_es.indices.create(source_index, es_body)#, wait_for_active_shards=0)
        except Exception as e:
            logging.error(e)
    
    
        destin_es = Elasticsearch([cfg['indexer']['url']], timeout=60)
        destin_index = cfg['indexer']['index']
    
        from lib.elasticsearch_template import template
        template['index_patterns'] = [ cfg['reindexer']['destination_index'] ]
        template['settings']['number_of_shards'] = cfg['reindexer']['number_of_shards']
        template['settings']['number_of_replicas'] = cfg['reindexer']['number_of_replicas']
    
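        # drop a possibly stale version of the template before installing the
        # current one; failures (e.g. no such template on the first run) are
        # logged and ignored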
        try:
            rep = destin_es.indices.delete_template(cfg['reindexer']['template_name'])
            logging.debug(rep)
        except Exception as e:
            logging.error(e)
    
        rep = destin_es.indices.put_template(cfg['reindexer']['template_name'], template)
        logging.debug(rep)
    
        return
    
    # def send_page( the_session_id, the_geonetwork_root_url, the_dest_index, the_page, the_channel, the_exchange, the_routing_key ):
    
    def send_record_to_the_metadata_processor( the_cfg, the_record ):
    
        """
    
        This function sends a GeoNetwork metadata record to a RabbitMQ queue.
        """
    
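        # work on a copy: queue and routing-key entries are deleted from the
        # cfg below, and the caller's configuration must stay intact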
        the_cfg = deepcopy(the_cfg)
    
        connection = pika.BlockingConnection(pika.ConnectionParameters(host=the_cfg['rabbitmq']['host'], port=the_cfg['rabbitmq']['port']))
        channel = connection.channel()
        exchange = the_cfg['rabbitmq']['exchange']
    
        queue_name  = the_cfg['rabbitmq']['queue_name_1']
        routing_key = the_cfg['rabbitmq']['routing_key_1']
    
        del the_cfg['rabbitmq']['queue_name_1']
        del the_cfg['rabbitmq']['routing_key_1']
    
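        # declare a direct exchange and a durable queue bound to it, so that
        # the queue itself survives a broker restart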
        channel.exchange_declare(exchange=exchange, exchange_type='direct')
        channel.queue_declare(queue=queue_name, durable=True)
        channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=routing_key)
    
        msg = dict()

        msg['header'] = dict()
        msg['header']['cfg'] = the_cfg
    
        msg['body'] = the_record
    
        the_body = msgpack.packb(msg, use_bin_type=False)
    
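        # delivery_mode=2 marks the message as persistent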
        channel.basic_publish( exchange=exchange,
                               routing_key=routing_key,
                               body=the_body,
                               properties=pika.BasicProperties(delivery_mode = 2)
                             )
    
        connection.close()

        return

    # def delete_dataset_from_indices( the_cfg, the_uuid ):
    #
    #     the_cfg = deepcopy(the_cfg)
    #
    #     connection = pika.BlockingConnection(pika.ConnectionParameters(host=the_cfg['rabbitmq']['host'], port=the_cfg['rabbitmq']['port']))
    #     channel = connection.channel()
    #     exchange = the_cfg['rabbitmq']['exchange']
    #     queue_name  = the_cfg['session']['id'] + '_' + the_cfg['rabbitmq']['queue_name_4_suffix']
    #     routing_key = the_cfg['session']['id'] + '_' + the_cfg['rabbitmq']['routing_key_4_suffix']
    #
    #     del the_cfg['rabbitmq']['queue_name_4_suffix']
    #     del the_cfg['rabbitmq']['routing_key_4_suffix']
    #
    #     channel.exchange_declare(exchange=exchange, exchange_type='direct')
    #     channel.queue_declare(queue=queue_name, durable=True)
    #     channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=routing_key)
    #
    #     msg = dict()
    #
    #     msg['header'] = dict()
    #     msg['header']['cfg'] = the_cfg
    #     msg['body'] = the_uuid
    #
    #     the_body = msgpack.packb(msg, use_bin_type=False)
    #
    #     channel.basic_publish( exchange=exchange,
    #                            routing_key=routing_key,
    #                            body=the_body,
    #                            properties=pika.BasicProperties(delivery_mode = 2)
    #                          )
    #
    #     connection.close()
    #
    #     return
    
    def delete_dataset_from_dest_index( the_cfg, the_uuid ):
    
        logging.info("Removing dataset with uuid = %s from the destination index..." % the_uuid)
    
    
        es = Elasticsearch([the_cfg['indexer']['url']], timeout=60)
    
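        # keep the elasticsearch client's verbose DEBUG output out of the logs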
        es_logger = logging.getLogger('elasticsearch')
        es_logger.setLevel(logging.INFO)
    
        index = the_cfg['indexer']['index']
    
        # only one version should be present, either "meta" or "full"
        # we try removing them both as we do not know which one is present for a given dataset
        for suffix in ['meta', 'full']:
    
            the_query = dict()
            the_query['query'] = dict()
            the_query['query']['term'] = {'uuid.keyword': '{0}.{1}'.format(the_uuid, suffix)}
    
            try:
                res = es.delete_by_query(index, doc_type='_doc', body=the_query)
    
            except AuthorizationException as e:
                logging.critical(e)
                exit(1)
            except NotFoundError as e:
                logging.error(e)
    
            except Exception as e:
                logging.error(e)
    
    
        return


    def main(cfg):

        logging.info("Setting up the ingest and digest indices...")
        setup_indices(cfg)
    
    
        uuids_to_get        = cfg['metadata_getter']['uuids_to_get']
        uuids_to_filter_out = cfg['metadata_getter']['uuids_to_filter_out']
    
    
        del cfg['metadata_getter'] # <- as this info is no longer needed
    
    
        username = cfg['geonetwork']['username']
    
        del cfg['geonetwork']['username'] # <- as this info is no longer needed
    
        password = cfg['geonetwork']['password']
    
        del cfg['geonetwork']['password'] # <- as this info is no longer needed
    
    
        if 'all' not in uuids_to_get:
    
    
            for uuid_to_get in uuids_to_get:

                for record in get_metadata_records( cfg['geonetwork']['url'],
                                                    cfg['geonetwork']['records_per_page'],
                                                    uuid=uuid_to_get,
                                                    the_filter=uuids_to_filter_out,
                                                    username=username, password=password ):
    
                    # delete_dataset_from_indices(cfg, record['geonet:info']['uuid'])
                    delete_dataset_from_dest_index(cfg, record['geonet:info']['uuid'])
                    send_record_to_the_metadata_processor(cfg, record)
    
        else:

            for record in get_metadata_records( cfg['geonetwork']['url'],
                                                cfg['geonetwork']['records_per_page'],
                                                uuid=None,
                                                the_filter=uuids_to_filter_out,
                                                username=username, password=password ):
    
                # delete_dataset_from_indices(cfg, record['geonet:info']['uuid'])
                delete_dataset_from_dest_index(cfg, record['geonet:info']['uuid'])
                send_record_to_the_metadata_processor(cfg, record)
    
    
    
    if __name__ == '__main__':
    
        import argparse
        import signal
    
        from yaml import load, dump
        try:
            from yaml import CLoader as Loader, CDumper as Dumper
        except ImportError:
            from yaml import Loader, Dumper
    
    
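        # trap SIGINT (Ctrl-C), so that the process can shut down cleanly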
        signal.signal(signal.SIGINT, exit_gracefully)
    
    
        parser = argparse.ArgumentParser(description='Metadata getter')
        parser.add_argument('--host', dest='host', help='the RabbitMQ host', type=str, required=True)
        parser.add_argument('--port', dest='port', help='the RabbitMQ port', type=int, default=5672)
        parser.add_argument('--exchange', dest='exchange', help='the RabbitMQ exchange', type=str, required=True)
        # parser.add_argument('--queue', dest='queue', help='the RabbitMQ queue', type=str, required=True)
        parser.add_argument('--loglevel', dest='loglevel', help='the log level', default="INFO", type=str, choices=['INFO', 'DEBUG', 'WARN', 'CRITICAL', 'ERROR'])
    
        args = parser.parse_args()
    
        logging.getLogger().setLevel(args.loglevel)
        logging.info('Starting...')
    
    
        # read 'n' parse the configuration
        with open("config.yaml", 'r') as yamlfile:
    
            cfg = load(yamlfile, Loader=Loader)
    
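        # expected shape of config.yaml, a sketch inferred from the keys this
        # script accesses (not an authoritative schema):
        #
        #   rabbitmq:
        #     exchange: ...            # host, port and exchange are overridden
        #     queue_name_1: ...        # by the command-line arguments below
        #     routing_key_1: ...
        #   geonetwork:
        #     url: ...
        #     username: ...
        #     password: ...
        #     records_per_page: ...
        #   indexer:
        #     url: ...
        #     index: ...
        #   reindexer:
        #     destination_index: ...
        #     template_name: ...
        #     number_of_shards: ...
        #     number_of_replicas: ...
        #   metadata_getter:
        #     uuids_to_get: [...]
        #     uuids_to_filter_out: [...]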
    
        cfg['rabbitmq']['host'] = args.host
        cfg['rabbitmq']['port'] = args.port
        cfg['rabbitmq']['exchange'] = args.exchange
    
    
        try:
            main(cfg)
    
        except pika.exceptions.AMQPConnectionError:
            logging.info('RabbitMQ is not reachable: exiting.')
            #time.sleep(5)
    
        except Exception as e:
            logging.error(e)