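# NOTE: the words "Newer" / "Older" that stood here were page-navigation
# residue from the repository web view, not part of the program.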
import pika
import msgpack
import json
import hashlib
from pymongo import MongoClient, ReplaceOne
from pymongo.errors import ServerSelectionTimeoutError
from utils.exit_gracefully import exit_gracefully
from utils.my_logging import logging
def process_doc_pages( channel, method, properties, body, mongodb ):
    """Store one page of documents in MongoDB, then ack or nack the message.

    The payload is unpacked with msgpack. Its ``header`` supplies the session
    id (which selects the target collection) and the uuid/title used for
    logging; its ``body`` is the collection of documents to store. Every
    document gets an ``_id`` equal to the MD5 hex digest of its key-sorted
    JSON dump and is upserted in a single unordered bulk write.

    Args:
        channel: channel the message arrived on; used for basic_ack/basic_nack.
        method: delivery information; only ``delivery_tag`` is read.
        properties: message properties (unused).
        body: msgpack-encoded message payload.
        mongodb: database handle, indexed by collection name.

    Raises:
        KeyError: if an expected header/body key is missing from the payload.
        Any exception raised by ``bulk_write`` propagates to the caller; the
        message is then neither acked nor nacked here.
    """
    decoded_body = msgpack.unpackb(body, raw=False)

    the_uuid = decoded_body['header']['metadata']['metadata-fr']['geonet:info']['uuid']
    the_title = decoded_body['header']['metadata']['metadata-fr']['title']
    the_session_id = decoded_body['header']['session_id']
    doc_page = decoded_body['body']

    logging.info('Pushing %i docs to MongoDB; uuid = %s; title = %s' % (len(doc_page), the_uuid, the_title))

    # bulk_write refuses an empty operation list, so an empty page is simply
    # acknowledged: there is nothing to store and redelivering it cannot help.
    if not doc_page:
        channel.basic_ack(delivery_tag = method.delivery_tag)
        return

    collection = mongodb[ 'indexing-session-' + the_session_id ]

    # Derive a content-based _id so that storing the same document twice
    # replaces the existing copy instead of creating a duplicate.
    bulk_write_operations = []
    for doc in doc_page:
        doc['_id'] = hashlib.md5( json.dumps(doc, sort_keys=True).encode("utf-8") ).hexdigest()
        bulk_write_operations.append(ReplaceOne({'_id': doc['_id']}, doc, upsert=True))

    # Unordered: one failing replacement does not stop the remaining ones.
    res = collection.bulk_write( bulk_write_operations, ordered=False )

    if res.acknowledged:
        channel.basic_ack(delivery_tag = method.delivery_tag)
    else:
        logging.error("Unable to push documents to MongoDB :-(")
        channel.basic_nack(delivery_tag = method.delivery_tag)
def main(cfg):
    """Consume document pages from RabbitMQ and store them in MongoDB.

    Opens a blocking RabbitMQ connection and a MongoDB client from ``cfg``,
    then consumes the session's queue, handing every message to
    ``process_doc_pages``. Both connections are released when consuming stops,
    whether it stops normally or by an exception, so that a caller retrying in
    a loop does not accumulate open connections.

    Args:
        cfg: configuration mapping. Keys read here:
            ``rabbitmq.host``, ``rabbitmq.queue_name_4_suffix``,
            ``session.id``, ``mongo.username``, ``mongo.password``,
            ``mongo.host``, ``mongo.port`` and ``mongo.data-db``.

    Raises:
        Whatever pika or pymongo raise while connecting or consuming; nothing
        is caught here.
    """
    from utils.close_connection import on_timeout

    connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq']['host']))
    mongo_client = None

    try:
        # on_timeout(connection) is evaluated right away; its return value is
        # what gets scheduled, so it is presumably a callable built around the
        # connection -- confirm against utils.close_connection.
        timeout = 5
        connection.add_timeout(timeout, on_timeout(connection))

        channel = connection.channel()

        # the queue this program will be consuming messages from
        doc_pages_to_store_in_mongo_qn = cfg['session']['id'] + '_' + cfg['rabbitmq']['queue_name_4_suffix']

        # NOTE(review): username/password are interpolated verbatim; values
        # containing characters such as '@', ':' or '/' would need
        # percent-escaping to form a valid MongoDB URI.
        mongo_client = MongoClient('mongodb://%s:%s@%s:%s/' % (cfg['mongo']['username'],
                                                               cfg['mongo']['password'],
                                                               cfg['mongo']['host'],
                                                               cfg['mongo']['port']))
        mongo_db = mongo_client[cfg['mongo']['data-db']]

        def on_message(ch, method, properties, body):
            # Bind the database handle the consumer callback cannot receive itself.
            process_doc_pages(ch, method, properties, body, mongo_db)

        # One unacknowledged message at a time.
        channel.basic_qos(prefetch_count=1)
        channel.basic_consume(on_message, queue=doc_pages_to_store_in_mongo_qn)
        channel.start_consuming()
    finally:
        if mongo_client is not None:
            mongo_client.close()
        # The connection may already be closed (e.g. by the broker or by the
        # scheduled callback); only close it when it is still open.
        if connection.is_open:
            connection.close()
if __name__ == '__main__':
    # Script entry point: load config.yaml and run main() forever, pausing
    # five seconds after each failure before trying again.
    import yaml
    import time
    import signal

    signal.signal(signal.SIGINT, exit_gracefully)

    # safe_load only builds plain YAML types; yaml.load without an explicit
    # Loader can construct arbitrary objects and is rejected by recent PyYAML.
    with open("config.yaml", 'r') as yamlfile:
        cfg = yaml.safe_load(yamlfile)

    while True:
        try:
            main(cfg)
        except pika.exceptions.ChannelClosed:
            # Presumably raised when the session queue does not exist yet.
            logging.info("Waiting for tasks...")
            time.sleep(5)
        except ServerSelectionTimeoutError:
            logging.error('Waiting for MongoDB to be reachable...')
            time.sleep(5)
        except Exception as e:
            # Top-level boundary: log and retry rather than letting the worker die.
            logging.error(e)
            time.sleep(5)