Skip to content
Snippets Groups Projects
1-metadata-getter.py 8.57 KiB
Newer Older
  • Learn to ignore specific revisions
  • Alessandro Cerioni's avatar
    Alessandro Cerioni committed
    import pika
    import msgpack
    import requests
    
    Alessandro Cerioni's avatar
    Alessandro Cerioni committed
    from elasticsearch import Elasticsearch, NotFoundError
    
    from copy import deepcopy
    from pymongo import MongoClient
    from collections import OrderedDict
    
    Alessandro Cerioni's avatar
    Alessandro Cerioni committed
    from utils.exit_gracefully import exit_gracefully
    from utils.my_logging import logging
    
    Alessandro Cerioni's avatar
    Alessandro Cerioni committed
    
    
    class RecordNotFound(Exception):
        """Error signalling that a requested metadata record could not be retrieved."""
    
    
    def filter_function(x, the_uuids_to_filter_out):
        """
        Tell whether a metadata record should be kept.

        x is a record exposing its identifier as x['geonet:info']['uuid'].
        Returns True when that uuid is NOT contained in the_uuids_to_filter_out,
        False otherwise.
        """
        record_uuid = x['geonet:info']['uuid']
        is_excluded = record_uuid in the_uuids_to_filter_out

        return not is_excluded
    
    Alessandro Cerioni's avatar
    Alessandro Cerioni committed
    
    # GEONETWORK METADATA GETTER
    
    Alessandro Cerioni's avatar
    Alessandro Cerioni committed
    def get_metadata_records( root_url, no_records_per_page, uuid=None, the_filter=None, username=None, password=None ):
        """
        Generator yielding the metadata records returned by a GeoSource / GeoNetwork
        "q" service, fetched page by page.

        Parameters:
            root_url            -- URL of the q service; also the base of each record's 'href'
            no_records_per_page -- number of records requested per HTTP call
            uuid                -- if not None, sent as the 'uuid' query parameter
            the_filter          -- if not None, a collection of uuids: records whose
                                   ['geonet:info']['uuid'] is in it are not yielded
            username, password  -- if both are not None, passed to requests.get as
                                   auth=(username, password)

        Yields:
            the records found under the 'metadata' key of each JSON response, each one
            enriched with an 'href' entry pointing to the q service query for that record.

        Raises:
            RecordNotFound -- if a JSON response has no 'metadata' key.
        """

        # local import, so that this function does not depend on a module-level "import urllib"
        from urllib.parse import urlencode

        # GeoSource's q service
        params = OrderedDict()
        params['_content_type'] = 'json'
        params['resultType'] = 'details'
        params['sortBy'] = 'source' # N.B.: sort by title yields duplicates !
        #params['sortOrder'] = 'reverse'
        params['fast'] = 'index'
        params['buildSummary'] = 'false'

        if uuid is not None:
            params['uuid'] = uuid

        # credentials are used only if both are provided (auth=None is requests' default)
        auth = None
        if username is not None and password is not None:
            auth = (username, password)

        cnt = 0

        while True:
            params['from'] = 1 + cnt*no_records_per_page
            params['to']   = params['from'] + no_records_per_page - 1

            logging.debug("Get metadata pages, from record no. %s to record no. %s." % (params['from'],params['to']))

            res = requests.get(root_url, params=params, auth=auth)
            logging.debug(res.url)

            # the response body is parsed only once per page
            payload = res.json()

            # NOTE(review): if the previous page held exactly no_records_per_page records and
            # was the last one, this request presumably comes back without 'metadata' and
            # RecordNotFound is raised even though nothing is missing -- to be confirmed
            # against the service before changing the behaviour.
            try:
                metadata = payload['metadata']
            except KeyError as e:
                raise RecordNotFound('The record with uuid=%s was not found! Are you sure that it actually exists and that you have the proper access rights?' % uuid) from e

            # a single hit is not wrapped in a list by the service: normalize
            records = metadata if isinstance(metadata, list) else [metadata]

            logging.debug("Got %s records." % len(records))

            # adding hrefs: same query, without paging and sorting, restricted to the record's uuid
            href_params = OrderedDict((key, value) for key, value in params.items() if key not in ('from', 'to', 'sortBy'))

            for record in records:
                href_params['uuid'] = record['geonet:info']['uuid']
                record['href'] = root_url + "?" + urlencode(href_params)

            filtered_records = records

            if the_filter is not None:
                logging.debug("Filtering out unwanted records, if present.")
                filtered_records = [x for x in records if x['geonet:info']['uuid'] not in the_filter]

            yield from filtered_records

            # the page size is checked on the unfiltered records
            if len(records) < no_records_per_page:
                break # it means that we have reached the last page

            cnt += 1
    
    
    
    # def send_page( the_session_id, the_geonetwork_root_url, the_dest_index, the_page, the_channel, the_exchange, the_routing_key ):
    
    Alessandro Cerioni's avatar
    Alessandro Cerioni committed
    def send_record_to_the_metadata_processor( the_cfg, the_record ):
        """
        This function sends a GeoNetwork metadata record to a RabbitMQ queue.

        Parameters:
            the_cfg    -- configuration dict; this function reads the_cfg['rabbitmq']
                          ('host', 'port', 'exchange', 'queue_name_1_suffix',
                          'routing_key_1_suffix') and the_cfg['session']['id'].
                          It is deep-copied first, so the caller's dict is left untouched.
            the_record -- the metadata record, used as the body of the message

        The published message is a msgpack-encoded dict:
            ['header']['cfg'] -- the copy of the configuration, without the two suffix entries
            ['body']          -- the record
        """

        the_cfg = deepcopy(the_cfg)

        connection = pika.BlockingConnection(pika.ConnectionParameters(host=the_cfg['rabbitmq']['host'], port=the_cfg['rabbitmq']['port']))

        try:
            channel = connection.channel()
            exchange = the_cfg['rabbitmq']['exchange']
            queue_name  = the_cfg['session']['id'] + '_' + the_cfg['rabbitmq']['queue_name_1_suffix']
            routing_key = the_cfg['session']['id'] + '_' + the_cfg['rabbitmq']['routing_key_1_suffix']

            # the suffixes are only needed here: they are not forwarded within the message
            del the_cfg['rabbitmq']['queue_name_1_suffix']
            del the_cfg['rabbitmq']['routing_key_1_suffix']

            channel.exchange_declare(exchange=exchange, exchange_type='direct')
            channel.queue_declare(queue=queue_name, durable=True)
            channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=routing_key)

            msg = dict()

            msg['header'] = dict()
            msg['header']['cfg'] = the_cfg
            msg['body'] = the_record

            the_body = msgpack.packb(msg, use_bin_type=False)

            channel.basic_publish( exchange=exchange,
                                   routing_key=routing_key,
                                   body=the_body,
                                   properties=pika.BasicProperties(delivery_mode = 2) # 2 = persistent message
                                 )
        finally:
            # one connection is opened per call: always release it, even if publishing fails
            connection.close()

        return
    
    Alessandro Cerioni's avatar
    Alessandro Cerioni committed
    # def delete_dataset_from_indices( the_cfg, the_uuid ):
    #
    #     the_cfg = deepcopy(the_cfg)
    #
    #     connection = pika.BlockingConnection(pika.ConnectionParameters(host=the_cfg['rabbitmq']['host'], port=the_cfg['rabbitmq']['port']))
    #     channel = connection.channel()
    #     exchange = the_cfg['rabbitmq']['exchange']
    #     queue_name  = the_cfg['session']['id'] + '_' + the_cfg['rabbitmq']['queue_name_4_suffix']
    #     routing_key = the_cfg['session']['id'] + '_' + the_cfg['rabbitmq']['routing_key_4_suffix']
    #
    #     del the_cfg['rabbitmq']['queue_name_4_suffix']
    #     del the_cfg['rabbitmq']['routing_key_4_suffix']
    #
    #     channel.exchange_declare(exchange=exchange, exchange_type='direct')
    #     channel.queue_declare(queue=queue_name, durable=True)
    #     channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=routing_key)
    #
    #     msg = dict()
    #
    #     msg['header'] = dict()
    #     msg['header']['cfg'] = the_cfg
    #     msg['body'] = the_uuid
    #
    #     the_body = msgpack.packb(msg, use_bin_type=False)
    #
    #     channel.basic_publish( exchange=exchange,
    #                            routing_key=routing_key,
    #                            body=the_body,
    #                            properties=pika.BasicProperties(delivery_mode = 2)
    #                          )
    #
    #     connection.close()
    #
    #     return
    
    def delete_dataset_from_dest_index( the_cfg, the_uuid ):
        """
        Delete from the destination Elasticsearch index the documents of a dataset.

        Parameters:
            the_cfg  -- configuration dict; the_cfg['indexer']['url'] and
                        the_cfg['indexer']['index'] are read
            the_uuid -- uuid of the dataset to remove

        A delete-by-query is issued for "<the_uuid>.meta" and for "<the_uuid>.full",
        matched against the 'uuid.keyword' field. NotFoundError is ignored; any other
        exception is logged as an error and does not interrupt the function.
        """

        logging.info("Removing dataset with uuid = %s from the destination index..." % the_uuid)

        client = Elasticsearch([the_cfg['indexer']['url']])
        logging.getLogger('elasticsearch').setLevel(logging.INFO)

        target_index = the_cfg['indexer']['index']

        # only one version should be present, either "meta" or "full"
        # we try removing them both as we do not know which one is present for a given dataset
        for suffix in ('meta', 'full'):

            versioned_uuid = '{0}.{1}'.format(the_uuid, suffix)
            term_query = {'query': {'term': {'uuid.keyword': versioned_uuid}}}

            try:
                outcome = client.delete_by_query(target_index, doc_type='_doc', body=term_query)
                logging.debug(outcome)
            except NotFoundError:
                continue
            except Exception as err:
                logging.error(err)
    
    
    
    
    
        logging.info('Starting indexing session: %s.' % cfg['session']['id'])
    
        uuids_to_get        = cfg['metadata_getter']['uuids_to_get']
        uuids_to_filter_out = cfg['metadata_getter']['uuids_to_filter_out']
    
    
        del cfg['metadata_getter'] # <- as this info is no longer needed
    
    
        username = cfg['geonetwork']['username']
    
        del cfg['geonetwork']['username'] # <- as this info is no longer needed
    
        password = cfg['geonetwork']['password']
    
        del cfg['geonetwork']['password'] # <- as this info is no longer needed
    
    Alessandro Cerioni's avatar
    Alessandro Cerioni committed
    
        if 'all' not in uuids_to_get:
    
    
    Alessandro Cerioni's avatar
    Alessandro Cerioni committed
                for record in get_metadata_records( cfg['geonetwork']['url'],
                                                    cfg['geonetwork']['records_per_page'],
                                                    uuid=uuid_to_get,
                                                    the_filter=uuids_to_filter_out,
                                                    username=username, password=password ):
    
    Alessandro Cerioni's avatar
    Alessandro Cerioni committed
                    # delete_dataset_from_indices(cfg, record['geonet:info']['uuid'])
                    delete_dataset_from_dest_index(cfg, record['geonet:info']['uuid'])
                    send_record_to_the_metadata_processor(cfg, record)
    
    Alessandro Cerioni's avatar
    Alessandro Cerioni committed
            for record in get_metadata_records( cfg['geonetwork']['url'],
                                                cfg['geonetwork']['records_per_page'],
                                                uuid=None,
                                                the_filter=uuids_to_filter_out,
                                                username=username, password=password ):
    
                # delete_dataset_from_indices(cfg, record['geonet:info']['uuid'])
                delete_dataset_from_dest_index(cfg, record['geonet:info']['uuid'])
                send_record_to_the_metadata_processor(cfg, record)
    
    Alessandro Cerioni's avatar
    Alessandro Cerioni committed
    
    
    if __name__ == '__main__':

        import yaml
        import signal

        # let Ctrl-C be handled by the project's exit_gracefully handler
        signal.signal(signal.SIGINT, exit_gracefully)

        # read 'n' parse the configuration
        # N.B.: yaml.safe_load is used instead of yaml.load, since the latter, without an explicit
        #       Loader, can instantiate arbitrary Python objects and is rejected by PyYAML >= 6
        with open("config.yaml", 'r') as yamlfile:
            cfg = yaml.safe_load(yamlfile)

        # top-level boundary: any failure of the session is logged
        try:
            main(cfg)
        except Exception as e:
            logging.error(e)