Skip to content
Snippets Groups Projects
1-metadata-getter.py 5.38 KiB
Newer Older
  • Learn to ignore specific revisions
  • Alessandro Cerioni's avatar
    Alessandro Cerioni committed
    import pika
    import msgpack
    import requests
    from utils.exit_gracefully import exit_gracefully
    from utils.my_logging import logging
    from pymongo import MongoClient
    
    
    class RecordNotFound(Exception):
        """Signal that the metadata service answered without any 'metadata' entry."""
    
    
    def filter_function( x, the_uuids_to_filter_out ):
        """
        Tell whether the record ``x`` should be kept.

        The record's UUID is read from ``x['geonet:info']['uuid']``; the function
        returns True when that UUID is absent from ``the_uuids_to_filter_out``
        and False otherwise.
        """

        record_uuid = x['geonet:info']['uuid']
        is_excluded = record_uuid in the_uuids_to_filter_out

        return not is_excluded
    
    Alessandro Cerioni's avatar
    Alessandro Cerioni committed
    
    # GEONETWORK METADATA GETTER
    
    def get_pages( root_url, no_records_per_page, uuid=None, the_filter=None ):
        """
        Generate, page by page, the metadata records served by a GeoSource/GeoNetwork
        "q" search service.

        Parameters:
            root_url: URL of the search service; it is queried with HTTP GET.
            no_records_per_page: number of records requested per page.
            uuid: when not None, it is sent as the 'uuid' query parameter, so that
                only the matching record is requested.
            the_filter: when not None, a container of UUIDs (presumably a list of
                strings -- confirm against the configuration); records whose
                ``['geonet:info']['uuid']`` is in it are dropped from the page.

        Yields:
            One list of records per page, after filtering. A page may be empty if
            all of its records were filtered out.

        Raises:
            RecordNotFound: if the very first response carries no 'metadata' key.
        """

        # GeoSource's q service
        params = {
            '_content_type': 'json',
            'resultType': 'details',
            'sortBy': 'source',  # N.B.: sort by title yields duplicates !
            'fast': 'index',
            'buildSummary': 'false',
        }

        if uuid is not None:
            params['uuid'] = uuid

        cnt = 0

        while True:
            params['from'] = 1 + cnt*no_records_per_page
            params['to']   = params['from'] + no_records_per_page - 1

            logging.debug("Get metadata pages, from record no. %s to record no. %s." % (params['from'],params['to']))

            res = requests.get(root_url, params=params)
            logging.debug(res.url)

            # decode the response body only once per page
            payload = res.json()

            try:
                metadata = payload['metadata']
            except KeyError as e:
                if cnt > 0:
                    # At least one page was already delivered: a response without
                    # 'metadata' here means that the previous page was exactly full
                    # and that we have just stepped past the last record.
                    break
                raise RecordNotFound('The record with uuid=%s was not found! Are you sure that it actually exists?' % uuid) from e

            # a single record is not wrapped into a list by the service
            records = metadata if isinstance(metadata, list) else [metadata]

            logging.debug("Got %s records." % len(records))

            filtered_records = records

            # apply filter
            if the_filter is not None:
                logging.debug("Filtering out unwanted records, if present.")
                filtered_records = [x for x in records if x['geonet:info']['uuid'] not in the_filter]

            # hand the page over to the caller: without this statement the function
            # would not be a generator and callers could not iterate over it
            yield filtered_records

            # the unfiltered count tells whether the service has more records
            if len(records) < no_records_per_page:
                break # it means that we have reached the last page

            cnt += 1
    
    
    def send_page( the_session_id, the_geonetwork_root_url, the_dest_index, the_page, the_channel, the_exchange, the_routing_key ):
        """
        Publish one page of GeoNetwork results to RabbitMQ.

        The page becomes the 'body' of a message whose 'header' carries the
        GeoNetwork root URL, the session id and the destination index. The message
        is serialized with msgpack and published through ``the_channel`` on
        ``the_exchange`` with ``the_routing_key``. Nothing is returned.
        """

        header = {
            'geonetwork_root_url': the_geonetwork_root_url,
            'session_id': the_session_id,
            'dest_index': the_dest_index,
        }
        envelope = {'header': header, 'body': the_page}

        payload = msgpack.packb(envelope, use_bin_type=True)

        # delivery_mode=2 asks the broker to treat the message as persistent
        persistent = pika.BasicProperties(delivery_mode = 2)

        the_channel.basic_publish(
            exchange=the_exchange,
            routing_key=the_routing_key,
            body=payload,
            properties=persistent,
        )
    
    
    def main(cfg):
        """
        Run one metadata-getting session.

        Steps:
          1. record the session (id, start time, configuration) in the
             'indexing-sessions' collection of the MongoDB report database;
          2. declare the RabbitMQ exchange and the session's queue, and bind them;
          3. fetch the GeoNetwork metadata page by page with get_pages() and
             publish each page with send_page().

        Parameters:
            cfg: the configuration dictionary; the keys read here are 'session',
                'mongo', 'rabbitmq', 'metadata_getter', 'geonetwork' and 'indexer'.

        Raises:
            SystemExit: with status 1 if MongoDB does not acknowledge the write.
        """

        import datetime

        session_id = cfg['session']['id']
        logging.info('Starting indexing session: %s.' % session_id)

        # logging the indexing session into MongoDB
        # NOTE(review): username and password are inserted verbatim into the URI;
        # they presumably need to be percent-escaped in the configuration if they
        # contain reserved characters -- confirm.
        mongo_client = MongoClient('mongodb://%s:%s@%s:%s/' % (cfg['mongo']['username'],
                                                               cfg['mongo']['password'],
                                                               cfg['mongo']['host'],
                                                               cfg['mongo']['port']))

        try:
            collection = mongo_client[cfg['mongo']['report-db']]['indexing-sessions']

            # NOTE(review): the whole configuration, credentials included, is stored
            # in the report database.
            doc = {
                '_id': session_id,
                'started_at': datetime.datetime.utcnow(),
                'configuration': cfg,
            }

            res = collection.replace_one({'_id': doc['_id']}, doc, upsert=True)
            if not res.acknowledged:
                logging.error("Unable to push the following doc to MongoDB:")
                logging.error(doc)
                # same effect as exit(1), without depending on the site module
                raise SystemExit(1)
        finally:
            # the client is not needed past this point
            mongo_client.close()

        # ------------------------------------------------------------------

        connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq']['host']))

        try:
            channel = connection.channel()
            exchange = cfg['rabbitmq']['exchange']
            queue_name  = session_id + '_' + cfg['rabbitmq']['queue_name_1_suffix']
            routing_key = session_id + '_' + cfg['rabbitmq']['routing_key_1_suffix']

            channel.exchange_declare(exchange=exchange, exchange_type='direct')
            channel.queue_declare(queue=queue_name, durable=True)
            channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=routing_key)

            uuids_to_get        = cfg['metadata_getter']['uuids_to_get']
            uuids_to_filter_out = cfg['metadata_getter']['uuids_to_filter_out']

            # 'all' means one unrestricted query (uuid=None); otherwise one query per uuid
            uuids = [None] if 'all' in uuids_to_get else uuids_to_get

            for uuid_to_get in uuids:
                for page in get_pages( cfg['geonetwork']['url'], cfg['geonetwork']['records_per_page'], uuid=uuid_to_get, the_filter=uuids_to_filter_out ):
                    # Publish with the key the queue was bound with: on a direct
                    # exchange, a message published with any other key (such as
                    # the queue name) does not reach the queue.
                    send_page(session_id, cfg['geonetwork']['url'], cfg['indexer']['index'], page, channel, exchange, routing_key)
        finally:
            # release the broker connection even if fetching or publishing failed
            connection.close()
    
    
    if __name__ == '__main__':

        import yaml
        import signal

        # let Ctrl-C go through the project's exit handler
        signal.signal(signal.SIGINT, exit_gracefully)

        # read 'n' parse the configuration
        # yaml.safe_load is used rather than yaml.load: calling yaml.load without a
        # Loader is rejected by recent PyYAML releases, and the default loader of
        # older releases can build arbitrary Python objects from the file.
        with open("config.yaml", 'r') as yamlfile:
            cfg = yaml.safe_load(yamlfile)

        try:
            main(cfg)
        except Exception as e:
            # top-level boundary: log the error together with its traceback
            logging.exception(e)