Skip to content
Snippets Groups Projects
4-docs-to-mongodb.py 3.37 KiB
Newer Older
  • Learn to ignore specific revisions
  • Alessandro Cerioni's avatar
    Alessandro Cerioni committed
    import pika
    import msgpack
    import json
    import hashlib
    
    from pymongo import MongoClient, ReplaceOne
    
    Alessandro Cerioni's avatar
    Alessandro Cerioni committed
    from pymongo.errors import ServerSelectionTimeoutError
    from utils.exit_gracefully import exit_gracefully
    from utils.my_logging import logging
    
    def process_doc_pages(channel, method, properties, body, mongodb):
        """Upsert one page of documents into MongoDB, then ack/nack the message.

        The message body is msgpack-encoded and unpacks to a mapping with a
        'header' (session id plus the French metadata used for logging) and a
        'body' (the collection of documents to store).  Documents are written
        to the collection named 'indexing-session-<session_id>'.

        Each document receives a deterministic ``_id``: the MD5 hex digest of
        its JSON dump with sorted keys.  Combined with the upserting
        ``ReplaceOne``, storing the same document twice replaces it instead of
        creating a duplicate.

        Args:
            channel: channel the message was delivered on; used to ack/nack.
            method: delivery information; only ``delivery_tag`` is read.
            properties: message properties (unused).
            body: msgpack-encoded payload.
            mongodb: database object, indexed by collection name.
        """
        decoded_body = msgpack.unpackb(body, raw=False)
        header = decoded_body['header']
        metadata_fr = header['metadata']['metadata-fr']
        the_uuid = metadata_fr['geonet:info']['uuid']
        the_title = metadata_fr['title']
        the_session_id = header['session_id']

        doc_page = decoded_body['body']

        logging.info('Pushing %i docs to MongoDB; uuid = %s; title = %s',
                     len(doc_page), the_uuid, the_title)

        collection = mongodb['indexing-session-' + the_session_id]

        bulk_write_operations = []
        for doc in doc_page:
            # The digest is computed before '_id' is assigned, so it depends
            # only on the content the document arrived with.
            digest = hashlib.md5(json.dumps(doc, sort_keys=True).encode("utf-8")).hexdigest()
            doc['_id'] = digest
            bulk_write_operations.append(ReplaceOne({'_id': digest}, doc, upsert=True))

        if not bulk_write_operations:
            # bulk_write() raises InvalidOperation on an empty request list,
            # which would leave the message unacknowledged and redelivered
            # forever.  There is nothing to store, so the message is done.
            channel.basic_ack(delivery_tag=method.delivery_tag)
            return

        # Unordered: one failing operation does not stop the remaining ones.
        res = collection.bulk_write(bulk_write_operations, ordered=False)

        if res.acknowledged:
            channel.basic_ack(delivery_tag=method.delivery_tag)
        else:
            logging.error("Unable to push documents to MongoDB :-(")
            channel.basic_nack(delivery_tag=method.delivery_tag)
    
    
    def main(cfg):
        """Consume document pages from RabbitMQ and store them in MongoDB.

        Opens a blocking RabbitMQ connection and a MongoDB client, then
        consumes messages from the session queue, passing each one to
        ``process_doc_pages``.  Both the MongoDB client and the RabbitMQ
        connection are released when consuming stops, whether it returns
        normally or raises; exceptions still propagate to the caller.

        Args:
            cfg: configuration mapping.  Keys read here:
                ``rabbitmq.host``, ``rabbitmq.queue_name_4_suffix``,
                ``session.id``, ``mongo.username``, ``mongo.password``,
                ``mongo.host``, ``mongo.port`` and ``mongo.data-db``.
        """
        from utils.close_connection import on_timeout

        connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq']['host']))
        try:
            # on_timeout(connection) is evaluated right away; whatever it
            # returns is registered as the callback to run after `timeout`
            # seconds.  NOTE(review): presumably that callback closes the
            # connection -- confirm in utils.close_connection.
            timeout = 5
            connection.add_timeout(timeout, on_timeout(connection))

            channel = connection.channel()

            # the queue this program will be consuming messages from
            doc_pages_to_store_in_mongo_qn = cfg['session']['id'] + '_' + cfg['rabbitmq']['queue_name_4_suffix']

            mongo_client = MongoClient('mongodb://%s:%s@%s:%s/' % (cfg['mongo']['username'],
                                                                   cfg['mongo']['password'],
                                                                   cfg['mongo']['host'],
                                                                   cfg['mongo']['port']))
            try:
                mongo_db = mongo_client[cfg['mongo']['data-db']]

                def on_message(ch, method, properties, body):
                    # Bind the target database to the consumer callback.
                    process_doc_pages(ch, method, properties, body, mongo_db)

                # Hand out one unacknowledged message at a time.
                channel.basic_qos(prefetch_count=1)
                channel.basic_consume(on_message, queue=doc_pages_to_store_in_mongo_qn)
                channel.start_consuming()
            finally:
                # The caller invokes main() again after every failure; without
                # this each retry would leave another MongoClient behind.
                mongo_client.close()
        finally:
            # The connection may already be closed (e.g. by the timeout
            # callback or a broker-side close); only close it if still open.
            if connection.is_open:
                connection.close()
    
    
    if __name__ == '__main__':

        import yaml
        import time
        import signal

        # Let Ctrl-C go through the project's shutdown handler.
        signal.signal(signal.SIGINT, exit_gracefully)

        # safe_load builds plain Python objects only.  yaml.load() without an
        # explicit Loader can construct arbitrary objects and is rejected
        # outright by recent PyYAML releases.
        with open("config.yaml", 'r') as yamlfile:
            cfg = yaml.safe_load(yamlfile)

        # Run forever: whenever main() fails, report it, wait a little and
        # start over.
        while True:

            try:
                main(cfg)
            except pika.exceptions.ChannelClosed:
                # NOTE(review): presumably the queue does not exist yet, so
                # there is nothing to consume -- confirm.
                logging.info("Waiting for tasks...")
                time.sleep(5)
            except ServerSelectionTimeoutError:
                logging.error('Waiting for MongoDB to be reachable...')
                time.sleep(5)
            except Exception as e:
                # Top-level boundary: log whatever went wrong and retry.
                logging.error(e)
                time.sleep(5)