Skip to content
Snippets Groups Projects
4-docs-to-mongodb.py 3.09 KiB
Newer Older
  • Learn to ignore specific revisions
  • Alessandro Cerioni's avatar
    Alessandro Cerioni committed
    import pika
    import msgpack
    import json
    import hashlib
    from pymongo import MongoClient
    from pymongo.errors import ServerSelectionTimeoutError
    from utils.exit_gracefully import exit_gracefully
    from utils.my_logging import logging
    
    def process_doc_pages(channel, method, properties, body, mongodb):
        """Store one page of documents in MongoDB, then acknowledge the message.

        The message body is msgpack-encoded. Once decoded it must provide:

        * ``header.metadata.metadata-fr.geonet:info.uuid`` and
          ``header.metadata.metadata-fr.title`` (used for logging only),
        * ``header.session_id`` (selects the target collection),
        * ``body``: the documents to store.

        Each document gets an ``_id`` equal to the MD5 hex digest of its JSON
        serialization (keys sorted), computed before ``_id`` is assigned, and is
        upserted into the collection ``indexing-session-<session_id>``.

        :param channel: channel the message was delivered on; used to ack it.
        :param method: delivery information; ``delivery_tag`` is read from it.
        :param properties: message properties (unused).
        :param body: msgpack-encoded message payload.
        :param mongodb: database object, indexed by collection name.
        :raises SystemExit: with status 1 if MongoDB does not acknowledge a write.
        """
        # Function-scope import: `exit()` is a helper injected by the `site`
        # module for interactive sessions and is not guaranteed to exist
        # (e.g. `python -S`, frozen apps). `sys.exit` always is.
        import sys

        decoded_body = msgpack.unpackb(body, raw=False)
        header = decoded_body['header']
        metadata_fr = header['metadata']['metadata-fr']
        the_uuid = metadata_fr['geonet:info']['uuid']
        the_title = metadata_fr['title']
        the_session_id = header['session_id']

        doc_page = decoded_body['body']

        logging.info('Pushing %i docs to MongoDB; uuid = %s; title = %s' % (len(doc_page), the_uuid, the_title))

        collection = mongodb['indexing-session-' + the_session_id]

        for doc in doc_page:
            # Content-derived _id: pushing the same document again replaces
            # the stored copy instead of creating a duplicate.
            digest = hashlib.md5(json.dumps(doc, sort_keys=True).encode("utf-8")).hexdigest()
            doc['_id'] = digest
            res = collection.replace_one({'_id': digest}, doc, upsert=True)
            if not res.acknowledged:
                logging.error("Unable to push the following doc to MongoDB:")
                logging.error(doc)
                # Exit without acking the message.
                sys.exit(1)

        # if we arrive here, it means that all docs have been pushed to MongoDB
        channel.basic_ack(delivery_tag=method.delivery_tag)
    
    
    def main(cfg):
        """Consume document pages from RabbitMQ and store them in MongoDB.

        Opens a blocking RabbitMQ connection and a MongoDB client, then consumes
        messages from the queue ``<session id>_<queue_name_4_suffix>``, handing
        each one to ``process_doc_pages`` together with the configured database.
        Blocks inside ``start_consuming()``. Both connections are closed when
        this function exits, whether normally or through an exception.

        :param cfg: configuration mapping. Keys read here:
            ``rabbitmq.host``, ``rabbitmq.queue_name_4_suffix``, ``session.id``,
            ``mongo.username``, ``mongo.password``, ``mongo.host``,
            ``mongo.port`` and ``mongo.data-db``.
        """
        # Function-scope import so this block does not depend on file-level changes.
        from urllib.parse import quote_plus

        # the queue this program will be consuming messages from
        doc_pages_to_store_in_mongo_qn = cfg['session']['id'] + '_' + cfg['rabbitmq']['queue_name_4_suffix']

        connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq']['host']))
        try:
            channel = connection.channel()

            # Credentials are percent-escaped, as MongoDB URIs require: a raw
            # '@', ':', '/', '%' or '+' in the password would otherwise break
            # or silently alter the URI. str() keeps YAML values that parsed
            # as numbers working, as the original '%s' formatting did.
            # NOTE(review): assumes config.yaml stores the credentials
            # unescaped - confirm.
            mongo_client = MongoClient('mongodb://%s:%s@%s:%s/' % (
                quote_plus(str(cfg['mongo']['username'])),
                quote_plus(str(cfg['mongo']['password'])),
                cfg['mongo']['host'],
                cfg['mongo']['port'],
            ))
            try:
                mongo_db = mongo_client[cfg['mongo']['data-db']]

                def on_message(ch, method, properties, body):
                    # Bind the target database to the consumer callback.
                    process_doc_pages(ch, method, properties, body, mongo_db)

                # Take one unacknowledged message at a time.
                channel.basic_qos(prefetch_count=1)
                channel.basic_consume(on_message, queue=doc_pages_to_store_in_mongo_qn)
                channel.start_consuming()
            finally:
                mongo_client.close()
        finally:
            # The caller retries main() in a loop; without this, every failed
            # attempt would leave a RabbitMQ connection behind.
            if connection.is_open:
                connection.close()
    
    
    if __name__ == '__main__':
        # Script entry point: load config.yaml, then run main() forever,
        # logging and retrying after a short pause whenever it raises.

        import yaml
        import time
        import signal

        # Seconds to wait before calling main() again after a failure.
        RETRY_DELAY_S = 5

        signal.signal(signal.SIGINT, exit_gracefully)

        with open("config.yaml", 'r') as yamlfile:
            # safe_load builds only plain YAML types. A bare yaml.load() can
            # construct arbitrary Python objects, is deprecated since
            # PyYAML 5.1 and raises TypeError (missing Loader) on PyYAML 6.
            cfg = yaml.safe_load(yamlfile)

        while True:

            try:
                main(cfg)
            except pika.exceptions.ChannelClosed:
                # presumably raised while the queue does not exist yet - TODO confirm
                logging.info("Waiting for tasks...")
                time.sleep(RETRY_DELAY_S)
            except ServerSelectionTimeoutError:
                logging.error('Waiting for MongoDB to be reachable...')
                time.sleep(RETRY_DELAY_S)
            except Exception as e:
                # Top-level boundary: log whatever went wrong and try again.
                logging.error(e)
                time.sleep(RETRY_DELAY_S)