"""Consume msgpack-encoded document pages from RabbitMQ and upsert them into MongoDB.

Each message carries a header (session id, French metadata with uuid/title)
and a body holding a list of documents. The documents are written to the
``indexing-session-<session_id>`` collection, each keyed by a content hash.
"""
import hashlib
import json
from urllib.parse import quote_plus

import msgpack
import pika
from pymongo import MongoClient, ReplaceOne
from pymongo.errors import ServerSelectionTimeoutError

from utils.exit_gracefully import exit_gracefully
from utils.my_logging import logging


def _doc_id(doc):
    """Return the hex MD5 of *doc* serialized as key-sorted JSON (deterministic id)."""
    return hashlib.md5(json.dumps(doc, sort_keys=True).encode("utf-8")).hexdigest()


def _mongo_uri(mongo_cfg):
    """Build a ``mongodb://user:pass@host:port/`` URI from the ``mongo`` config section.

    Username and password are percent-escaped, as MongoDB connection strings
    require, so credentials containing ``@``, ``:``, ``/`` or ``%`` work.
    """
    return 'mongodb://%s:%s@%s:%s/' % (
        quote_plus(str(mongo_cfg['username'])),
        quote_plus(str(mongo_cfg['password'])),
        mongo_cfg['host'],
        mongo_cfg['port'],
    )


def process_doc_pages(channel, method, properties, body, mongodb):
    """Upsert one page of documents into MongoDB, then ack/nack the message.

    Args:
        channel: the pika channel the message was delivered on.
        method: delivery metadata; its ``delivery_tag`` is used for ack/nack.
        properties: message properties (unused).
        body: msgpack payload with ``header`` and ``body`` keys.
        mongodb: the pymongo database to write into.

    Each document gets an ``_id`` equal to the MD5 of its JSON form (computed
    before ``_id`` is added). Re-delivering the same page therefore replaces
    the stored documents instead of duplicating them.
    """
    decoded_body = msgpack.unpackb(body, raw=False)

    header = decoded_body['header']
    metadata_fr = header['metadata']['metadata-fr']
    the_uuid = metadata_fr['geonet:info']['uuid']
    the_title = metadata_fr['title']
    the_session_id = header['session_id']
    doc_page = decoded_body['body']

    logging.info('Pushing %i docs to MongoDB; uuid = %s; title = %s',
                 len(doc_page), the_uuid, the_title)

    if not doc_page:
        # bulk_write() rejects an empty operation list (InvalidOperation); an
        # empty page has nothing to store, so acknowledge it instead of letting
        # it be redelivered forever.
        channel.basic_ack(delivery_tag=method.delivery_tag)
        return

    collection = mongodb['indexing-session-' + the_session_id]

    operations = []
    for doc in doc_page:
        doc['_id'] = _doc_id(doc)  # hash is computed before '_id' is set
        operations.append(ReplaceOne({'_id': doc['_id']}, doc, upsert=True))

    # Unordered: one failing document does not stop the others from being written.
    res = collection.bulk_write(operations, ordered=False)

    if res.acknowledged:
        channel.basic_ack(delivery_tag=method.delivery_tag)
    else:
        logging.error("Unable to push documents to MongoDB :-(")
        channel.basic_nack(delivery_tag=method.delivery_tag)


def main(cfg):
    """Consume the session's doc-page queue until consumption stops or fails.

    Both the RabbitMQ connection and the MongoDB client are released on every
    exit path, so the caller's retry loop does not leak connections.
    """
    from utils.close_connection import on_timeout

    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host=cfg['rabbitmq']['host']))
    try:
        timeout = 5
        # NOTE(review): on_timeout(connection) is evaluated here and its result
        # registered as the callback — presumably it returns a closure; confirm.
        connection.add_timeout(timeout, on_timeout(connection))
        channel = connection.channel()

        # the queue this program will be consuming messages from
        doc_pages_to_store_in_mongo_qn = (
            cfg['session']['id'] + '_' + cfg['rabbitmq']['queue_name_4_suffix'])

        mongo_client = MongoClient(_mongo_uri(cfg['mongo']))
        try:
            mongo_db = mongo_client[cfg['mongo']['data-db']]

            # One unacked message at a time per consumer.
            channel.basic_qos(prefetch_count=1)
            # pika 0.x signature: basic_consume(consumer_callback, queue=...).
            channel.basic_consume(
                lambda ch, method, properties, body:
                    process_doc_pages(ch, method, properties, body, mongo_db),
                queue=doc_pages_to_store_in_mongo_qn)
            channel.start_consuming()
        finally:
            mongo_client.close()
    finally:
        # The connection may already be closed (e.g. by the timeout callback
        # or a broker error); closing it twice raises in pika.
        if connection.is_open:
            connection.close()


if __name__ == '__main__':
    import signal
    import time

    import yaml

    signal.signal(signal.SIGINT, exit_gracefully)

    with open("config.yaml", 'r', encoding='utf-8') as yamlfile:
        # safe_load: yaml.load without an explicit Loader can construct
        # arbitrary Python objects and is rejected outright by PyYAML >= 6.
        cfg = yaml.safe_load(yamlfile)

    # Keep retrying forever: the queue or MongoDB may not be available yet.
    while True:
        try:
            main(cfg)
        except pika.exceptions.ChannelClosed:
            logging.info("Waiting for tasks...")
            time.sleep(5)
        except ServerSelectionTimeoutError:
            logging.error('Waiting for MongoDB to be reachable...')
            time.sleep(5)
        except Exception as e:
            logging.error(e)
            time.sleep(5)