    # 1-metadata-getter.py
    # (Recovered from a GitLab blame view; author: Alessandro Cerioni.
    #  Web-UI chrome and per-line commit annotations were export residue.)
    import pika
    import msgpack
    import requests
    
    import urllib
    import json
    from copy import deepcopy
    from pymongo import MongoClient
    from collections import OrderedDict
    
    from utils.exit_gracefully import exit_gracefully
    from utils.my_logging import logging
    
    
    
    class RecordNotFound(Exception):
        """
        Raised when a GeoNetwork q-service response contains no 'metadata'
        entry, typically because the requested uuid does not exist or the
        caller lacks the access rights to see it.
        """
    
    
    def filter_function(x, the_uuids_to_filter_out):
        """
        Tell whether a GeoNetwork record should be kept.

        :param x: record dict carrying its uuid under ``x['geonet:info']['uuid']``.
        :param the_uuids_to_filter_out: container of uuids to discard.
        :returns: True if the record's uuid is NOT in ``the_uuids_to_filter_out``.
        """
        record_uuid = x['geonet:info']['uuid']
        return not (record_uuid in the_uuids_to_filter_out)
    
    
    # GEONETWORK METADATA GETTER
    
    def get_pages( root_url, no_records_per_page, uuid=None, the_filter=None, username=None, password=None ):
        """
        Page through the GeoNetwork/GeoSource "q" search service and yield the
        metadata records one page at a time.

        :param root_url: URL of the q service to query.
        :param no_records_per_page: number of records requested per page (>= 1).
        :param uuid: if given, restrict the query to the record with this uuid.
        :param the_filter: optional container of uuids; records whose
            ``geonet:info.uuid`` is in it are dropped from the yielded pages.
        :param username: HTTP basic-auth user (used only together with password).
        :param password: HTTP basic-auth password (used only together with username).
        :yields: one list of record dicts per page (possibly empty after
            filtering); each record gets an ``href`` key holding the q-service
            URL that fetches that single record.
        :raises RecordNotFound: if the first response has no 'metadata' entry.
        :raises ValueError: if ``no_records_per_page`` is smaller than 1.
        """
        # `import urllib` alone does not guarantee that urllib.parse is loaded.
        from urllib.parse import urlencode

        if no_records_per_page < 1:
            # Otherwise the end-of-results test below can never succeed.
            raise ValueError("no_records_per_page must be >= 1, got %r" % no_records_per_page)

        # GeoSource's q service
        params = OrderedDict()
        params['_content_type'] = 'json'
        params['resultType'] = 'details'
        params['sortBy'] = 'source' # N.B.: sort by title yields duplicates !
        params['fast'] = 'index'
        params['buildSummary'] = 'false'

        if uuid is not None:
            params['uuid'] = uuid

        # auth=None is requests' default, i.e. an unauthenticated request.
        auth = (username, password) if username is not None and password is not None else None

        page_no = 0

        while True:
            params['from'] = 1 + page_no * no_records_per_page
            params['to']   = params['from'] + no_records_per_page - 1

            logging.debug("Get metadata pages, from record no. %s to record no. %s." % (params['from'], params['to']))

            res = requests.get(root_url, params=params, auth=auth)
            logging.debug(res.url)

            payload = res.json()  # decode once per page

            if 'metadata' not in payload:
                if page_no > 0:
                    # The previous page was exactly full; an empty follow-up
                    # page just marks the end of the result set.
                    break
                raise RecordNotFound('The record with uuid=%s was not found! Are you sure that it actually exists and that you have the proper access rights?' % uuid)

            metadata = payload['metadata']
            # The service may return a bare object instead of a list
            # (presumably when there is a single hit); normalise to a list.
            records = metadata if isinstance(metadata, list) else [metadata]

            logging.debug("Got %s records." % len(records))

            # adding hrefs: same query minus paging/sorting, pinned to one uuid
            href_params = params.copy()
            for key in ('from', 'to', 'sortBy'):
                del href_params[key]

            for record in records:
                href_params['uuid'] = record['geonet:info']['uuid']
                record['href'] = root_url + "?" + urlencode(href_params)

            filtered_records = records

            if the_filter is not None:
                logging.debug("Filtering out unwanted records, if present.")
                filtered_records = [x for x in records if x['geonet:info']['uuid'] not in the_filter]

            yield filtered_records

            # Use the unfiltered count: filtering must not end pagination early.
            if len(records) < no_records_per_page:
                break # it means that we have reached the last page

            page_no += 1
    
    
    
    def send_page( the_cfg, the_page ):
        """
        This function sends a page of GeoNetwork results to a RabbitMQ queue.

        The message is the msgpack encoding of
        ``{'header': {'cfg': <config copy>}, 'body': the_page}``. It is
        published with delivery_mode=2 to a direct exchange. The durable queue
        name and the routing key are both prefixed with the session id.

        :param the_cfg: configuration dict. It is deep-copied, so the caller's
            dict is never modified. The ``queue_name_1_suffix`` and
            ``routing_key_1_suffix`` entries are removed from the copy that is
            embedded in the message header.
        :param the_page: list of metadata records, as yielded by ``get_pages``.
        """
        the_cfg = deepcopy(the_cfg)

        exchange = the_cfg['rabbitmq']['exchange']
        queue_name  = the_cfg['session']['id'] + '_' + the_cfg['rabbitmq']['queue_name_1_suffix']
        routing_key = the_cfg['session']['id'] + '_' + the_cfg['rabbitmq']['routing_key_1_suffix']

        del the_cfg['rabbitmq']['queue_name_1_suffix']
        del the_cfg['rabbitmq']['routing_key_1_suffix']

        msg = dict()
        msg['header'] = dict()
        msg['header']['cfg'] = the_cfg
        msg['body'] = the_page

        the_body = msgpack.packb(msg, use_bin_type=False)

        connection = pika.BlockingConnection(pika.ConnectionParameters(host=the_cfg['rabbitmq']['host'], port=the_cfg['rabbitmq']['port']))
        try:
            channel = connection.channel()

            channel.exchange_declare(exchange=exchange, exchange_type='direct')
            channel.queue_declare(queue=queue_name, durable=True)
            channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=routing_key)

            channel.basic_publish( exchange=exchange,
                                   routing_key=routing_key,
                                   body=the_body,
                                   properties=pika.BasicProperties(delivery_mode = 2)
                                 )
        finally:
            # Without this, every call (one per page) leaves its connection open.
            connection.close()


    def main(cfg):
        """
        Fetch GeoNetwork metadata pages and push each one to RabbitMQ.

        If ``cfg['metadata_getter']['uuids_to_get']`` contains ``'all'``, the
        whole catalogue is paged through. Otherwise each listed uuid is fetched
        on its own. Records whose uuid is listed in
        ``cfg['metadata_getter']['uuids_to_filter_out']`` are dropped.

        NOTE: ``cfg`` is modified in place. The 'metadata_getter' section and
        the GeoNetwork username/password are removed before any page is sent.
        """
        logging.info('Starting indexing session: %s.' % cfg['session']['id'])

        uuids_to_get        = cfg['metadata_getter']['uuids_to_get']
        uuids_to_filter_out = cfg['metadata_getter']['uuids_to_filter_out']

        del cfg['metadata_getter'] # <- as this info is no longer needed

        # Popped rather than kept: send_page embeds cfg in every message
        # header, so the credentials must not stay in it.
        username = cfg['geonetwork'].pop('username')
        password = cfg['geonetwork'].pop('password')

        if 'all' not in uuids_to_get:
            # get only the requested datasets, one uuid at a time
            for uuid_to_get in uuids_to_get:
                for page in get_pages( cfg['geonetwork']['url'],
                                       cfg['geonetwork']['records_per_page'],
                                       uuid=uuid_to_get,
                                       the_filter=uuids_to_filter_out,
                                       username=username, password=password ):

                    send_page(cfg, page)
        else:
            # get all the datasets
            for page in get_pages( cfg['geonetwork']['url'],
                                   cfg['geonetwork']['records_per_page'],
                                   uuid=None,
                                   the_filter=uuids_to_filter_out,
                                   username=username, password=password ):

                send_page(cfg, page)
    
    
    if __name__ == '__main__':

        import sys
        import yaml
        import signal

        signal.signal(signal.SIGINT, exit_gracefully)

        # read 'n' parse the configuration
        # safe_load: a config file only needs plain YAML types, and yaml.load()
        # without an explicit Loader is unsafe (and a TypeError on PyYAML >= 6).
        with open("config.yaml", 'r', encoding='utf-8') as yamlfile:
            cfg = yaml.safe_load(yamlfile)

        try:
            main(cfg)
        except Exception as e:
            # Keep the traceback in the log and report failure through the
            # exit status instead of exiting with 0.
            logging.exception(e)
            sys.exit(1)