Skip to content
Snippets Groups Projects
metadata-getter.py 9.59 KiB
Newer Older
  • Learn to ignore specific revisions
  • Alessandro Cerioni's avatar
    Alessandro Cerioni committed
    import pika
    import msgpack
    import requests
    
    Alessandro Cerioni's avatar
    Alessandro Cerioni committed
    from elasticsearch import Elasticsearch, NotFoundError
    
    from copy import deepcopy
    from pymongo import MongoClient
    from collections import OrderedDict
    
    from lib.exit_gracefully import exit_gracefully
    from lib.my_logging import logging
    
    Alessandro Cerioni's avatar
    Alessandro Cerioni committed
    
    
    class RecordNotFound(Exception):
        """Raised by get_metadata_records when a GeoNetwork response has no ``metadata`` entry."""
    
    
    def filter_function( x, the_uuids_to_filter_out ):
        """Tell whether a GeoNetwork metadata record should be kept.

        :param x: a metadata record; its uuid is read from ``x['geonet:info']['uuid']``.
        :param the_uuids_to_filter_out: container of the uuids to exclude.
        :return: True when the record's uuid is NOT in ``the_uuids_to_filter_out``.
        """
        record_uuid = x['geonet:info']['uuid']
        is_excluded = record_uuid in the_uuids_to_filter_out
        return not is_excluded
    
    Alessandro Cerioni's avatar
    Alessandro Cerioni committed
    
    # GEONETWORK METADATA GETTER
    
    Alessandro Cerioni's avatar
    Alessandro Cerioni committed
    def get_metadata_records( root_url, no_records_per_page, uuid=None, the_filter=None, username=None, password=None ):
        """Yield GeoNetwork metadata records fetched page by page from the ``q`` search service.

        :param root_url: URL of the GeoNetwork/GeoSource ``q`` service.
        :param no_records_per_page: number of records requested per page (must be >= 1).
        :param uuid: if given, only the record with this uuid is requested.
        :param the_filter: optional container of uuids whose records must not be yielded.
        :param username: user name for HTTP authentication (used only together with ``password``).
        :param password: password for HTTP authentication (used only together with ``username``).
        :yields: record dicts, each enriched with an ``href`` key holding the ``q`` service
                 URL that returns that single record.
        :raises ValueError: if ``no_records_per_page`` is smaller than 1 (it would loop forever).
        :raises RecordNotFound: if a response carries no ``metadata`` key.
        """
        import urllib.parse  # needed to build the per-record href

        if no_records_per_page < 1:
            raise ValueError('no_records_per_page must be >= 1, got %r' % (no_records_per_page,))

        # GeoSource's q service
        params = OrderedDict()
        params['_content_type'] = 'json'
        params['resultType'] = 'details'
        params['sortBy'] = 'source' # N.B.: sort by title yields duplicates !
        params['fast'] = 'index'
        params['buildSummary'] = 'false'

        if uuid is not None:
            params['uuid'] = uuid

        # Credentials are sent only when both of them are provided (auth=None means no auth).
        auth = (username, password) if username is not None and password is not None else None

        # Parameters of the per-record href: same query, minus paging and sorting.
        # They do not change from page to page, so they are computed once.
        href_params = OrderedDict((key, value) for key, value in params.items() if key != 'sortBy')

        page = 0

        while True:
            params['from'] = 1 + page * no_records_per_page
            params['to']   = params['from'] + no_records_per_page - 1

            logging.debug("Get metadata pages, from record no. %s to record no. %s." % (params['from'],params['to']))

            res = requests.get(root_url, params=params, auth=auth)
            logging.debug(res.url)

            payload = res.json() # parse the response body only once

            try:
                metadata = payload['metadata']
            except KeyError as e:
                # NOTE(review): when the total number of records is an exact multiple of
                # no_records_per_page, one extra (empty) page is requested; if the service
                # answers it without a 'metadata' key, this is raised after all records were
                # already yielded -- confirm the q service's empty-result response.
                raise RecordNotFound('The record with uuid=%s was not found! Are you sure that it actually exists and that you have the proper access rights?' % uuid) from e

            # The service may return a single record as a bare object instead of a list.
            records = metadata if isinstance(metadata, list) else [metadata]

            logging.debug("Got %s records." % len(records))

            # adding hrefs...
            for record in records:
                href_params['uuid'] = record['geonet:info']['uuid']
                record['href'] = root_url + "?" + urllib.parse.urlencode(href_params)

            filtered_records = records

            if the_filter is not None:
                logging.debug("Filtering out unwanted records, if present.")
                filtered_records = [x for x in records if filter_function(x, the_filter)]

            yield from filtered_records

            if len(records) < no_records_per_page:
                break # it means that we have reached the last page

            page += 1
    
    
    
    # def send_page( the_session_id, the_geonetwork_root_url, the_dest_index, the_page, the_channel, the_exchange, the_routing_key ):
    
    Alessandro Cerioni's avatar
    Alessandro Cerioni committed
    def send_record_to_the_metadata_processor( the_cfg, the_record ):
        """
        This function sends a GeoNetwork metadata record to a RabbitMQ queue.

        A durable queue (``the_cfg['rabbitmq']['queue_name_1']``) is declared and bound,
        with routing key ``the_cfg['rabbitmq']['routing_key_1']``, to the direct exchange
        ``the_cfg['rabbitmq']['exchange']``. Then a message is published with
        ``delivery_mode=2``. The message is the msgpack encoding of
        ``{'header': {'cfg': <config>}, 'body': the_record}``. Here ``<config>`` is a deep
        copy of ``the_cfg`` without ``queue_name_1`` and ``routing_key_1``.

        :param the_cfg: configuration dict. It is not modified, because a deep copy is used.
        :param the_record: the metadata record to publish as the message body.
        """
        the_cfg = deepcopy(the_cfg)
        rabbitmq_cfg = the_cfg['rabbitmq']

        exchange = rabbitmq_cfg['exchange']

        # Queue name and routing key are consumed here. They are stripped from the copy of
        # the configuration that travels inside the message header.
        queue_name  = rabbitmq_cfg.pop('queue_name_1')
        routing_key = rabbitmq_cfg.pop('routing_key_1')

        msg = dict()
        msg['header'] = dict()
        msg['header']['cfg'] = the_cfg
        msg['body'] = the_record

        the_body = msgpack.packb(msg, use_bin_type=False)

        connection = pika.BlockingConnection(pika.ConnectionParameters(host=rabbitmq_cfg['host'], port=rabbitmq_cfg['port']))
        try:
            channel = connection.channel()

            channel.exchange_declare(exchange=exchange, exchange_type='direct')
            channel.queue_declare(queue=queue_name, durable=True)
            channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=routing_key)

            channel.basic_publish( exchange=exchange,
                                   routing_key=routing_key,
                                   body=the_body,
                                   properties=pika.BasicProperties(delivery_mode = 2) # persistent message
                                 )
        finally:
            # One connection is opened per call, so it must be released every time,
            # even when declaring or publishing fails.
            connection.close()
    
    Alessandro Cerioni's avatar
    Alessandro Cerioni committed
    # def delete_dataset_from_indices( the_cfg, the_uuid ):
    #
    #     the_cfg = deepcopy(the_cfg)
    #
    #     connection = pika.BlockingConnection(pika.ConnectionParameters(host=the_cfg['rabbitmq']['host'], port=the_cfg['rabbitmq']['port']))
    #     channel = connection.channel()
    #     exchange = the_cfg['rabbitmq']['exchange']
    #     queue_name  = the_cfg['session']['id'] + '_' + the_cfg['rabbitmq']['queue_name_4_suffix']
    #     routing_key = the_cfg['session']['id'] + '_' + the_cfg['rabbitmq']['routing_key_4_suffix']
    #
    #     del the_cfg['rabbitmq']['queue_name_4_suffix']
    #     del the_cfg['rabbitmq']['routing_key_4_suffix']
    #
    #     channel.exchange_declare(exchange=exchange, exchange_type='direct')
    #     channel.queue_declare(queue=queue_name, durable=True)
    #     channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=routing_key)
    #
    #     msg = dict()
    #
    #     msg['header'] = dict()
    #     msg['header']['cfg'] = the_cfg
    #     msg['body'] = the_uuid
    #
    #     the_body = msgpack.packb(msg, use_bin_type=False)
    #
    #     channel.basic_publish( exchange=exchange,
    #                            routing_key=routing_key,
    #                            body=the_body,
    #                            properties=pika.BasicProperties(delivery_mode = 2)
    #                          )
    #
    #     connection.close()
    #
    #     return
    
    def delete_dataset_from_dest_index( the_cfg, the_uuid ):
        """Remove every indexed version of a dataset from the destination Elasticsearch index.

        Documents whose ``uuid.keyword`` equals ``<the_uuid>.meta`` or ``<the_uuid>.full``
        are deleted with a delete-by-query. Errors are logged, not raised.

        :param the_cfg: configuration dict. ``the_cfg['indexer']['url']`` and
                        ``the_cfg['indexer']['index']`` are read.
        :param the_uuid: uuid of the GeoNetwork metadata record.
        """
        logging.info("Removing dataset with uuid = %s from the destination index..." % the_uuid)

        es = Elasticsearch([the_cfg['indexer']['url']], timeout=60)

        es_logger = logging.getLogger('elasticsearch')
        es_logger.setLevel(logging.INFO)

        index = the_cfg['indexer']['index']

        # only one version should be present, either "meta" or "full";
        # we try removing them both as we do not know which one is present for a given dataset
        for suffix in ['meta', 'full']:

            the_query = {'query': {'term': {'uuid.keyword': '{0}.{1}'.format(the_uuid, suffix)}}}

            try:
                res = es.delete_by_query(index, doc_type='_doc', body=the_query)
                logging.debug(res)
            except NotFoundError:
                pass # HTTP 404, presumably the index does not exist yet: nothing to delete
            except Exception as e:
                logging.error(e)

        return


    def main(cfg):
        """Harvest GeoNetwork metadata records and queue them for processing.

        Each harvested record is first removed from the destination index. It is then sent
        to the RabbitMQ metadata processor.

        ``cfg['metadata_getter']['uuids_to_get']`` selects the records. If it contains
        ``'all'``, every record of the catalogue is harvested; otherwise only the listed
        uuids are. Records whose uuid is in ``cfg['metadata_getter']['uuids_to_filter_out']``
        are skipped.

        N.B.: ``cfg`` is modified in place. The ``metadata_getter`` section and the GeoNetwork
        credentials are removed before ``cfg`` is embedded in the outgoing messages.
        """
        uuids_to_get        = cfg['metadata_getter']['uuids_to_get']
        uuids_to_filter_out = cfg['metadata_getter']['uuids_to_filter_out']

        del cfg['metadata_getter'] # <- as this info is no longer needed

        # pop: the credentials are needed here but must not be kept in cfg
        username = cfg['geonetwork'].pop('username')
        password = cfg['geonetwork'].pop('password')

        # uuid=None asks get_metadata_records for the whole catalogue, in a single harvest.
        uuids_to_harvest = [None] if 'all' in uuids_to_get else uuids_to_get

        for uuid_to_get in uuids_to_harvest:

            for record in get_metadata_records( cfg['geonetwork']['url'],
                                                cfg['geonetwork']['records_per_page'],
                                                uuid=uuid_to_get,
                                                the_filter=uuids_to_filter_out,
                                                username=username, password=password ):

                delete_dataset_from_dest_index(cfg, record['geonet:info']['uuid'])
                send_record_to_the_metadata_processor(cfg, record)
    
    Alessandro Cerioni's avatar
    Alessandro Cerioni committed
    
    
    if __name__ == '__main__':

        import argparse
        import signal

        from yaml import load
        try:
            from yaml import CLoader as Loader
        except ImportError:
            from yaml import Loader

        signal.signal(signal.SIGINT, exit_gracefully)

        parser = argparse.ArgumentParser(description='Metadata getter')
        parser.add_argument('--host', dest='host', help='the RabbitMQ host', type=str, required=True)
        parser.add_argument('--port', dest='port', help='the RabbitMQ port', type=int, default=5672)
        parser.add_argument('--exchange', dest='exchange', help='the RabbitMQ exchange', type=str, required=True)
        parser.add_argument('--loglevel', dest='loglevel', help='the log level', default="INFO", type=str, choices=['INFO', 'DEBUG', 'WARN', 'CRITICAL', 'ERROR'])

        args = parser.parse_args()

        logging.getLogger().setLevel(args.loglevel)
        logging.info('Starting...')

        # read 'n' parse the configuration
        # NOTE(review): the full (C)Loader can instantiate arbitrary Python objects from YAML
        # tags. That is acceptable only because config.yaml is a trusted local file; prefer
        # yaml.safe_load / CSafeLoader if the file can ever come from elsewhere.
        with open("config.yaml", 'r', encoding='utf-8') as yamlfile:
            cfg = load(yamlfile, Loader=Loader)

        # The command-line values override those of the configuration file.
        cfg['rabbitmq']['host'] = args.host
        cfg['rabbitmq']['port'] = args.port
        cfg['rabbitmq']['exchange'] = args.exchange

        try:
            main(cfg)

        except pika.exceptions.AMQPConnectionError:
            logging.info('RabbitMQ is not reachable: exiting.')

        except Exception as e:
            logging.error(e)