Skip to content
Snippets Groups Projects
Commit 04eb73a7 authored by Alessandro Cerioni's avatar Alessandro Cerioni
Browse files

Preventing the doc-indexer from entering kind of an infinite loop in case...

Preventing the doc-indexer from entering kind of an infinite loop in case documents cannot be pushed to Elasticsearch.
parent a5da4a5a
Branches
No related tags found
No related merge requests found
...@@ -113,10 +113,8 @@ def index_docs(channel, method, properties, body): ...@@ -113,10 +113,8 @@ def index_docs(channel, method, properties, body):
# header['index']['_id'] = hashlib.md5( json.dumps(tmp, sort_keys=True).encode("utf-8") ).hexdigest() # header['index']['_id'] = hashlib.md5( json.dumps(tmp, sort_keys=True).encode("utf-8") ).hexdigest()
es_body += '{0}\n{1}\n'.format(json.dumps(header), json.dumps(doc)) es_body += '{0}\n{1}\n'.format(json.dumps(header), json.dumps(doc))
rep = es.bulk(body=es_body) rep = es.bulk(body=es_body)
#print(rep)
t2 = time.time() t2 = time.time()
if rep['errors'] == False: if rep['errors'] == False:
...@@ -157,23 +155,17 @@ def index_docs(channel, method, properties, body): ...@@ -157,23 +155,17 @@ def index_docs(channel, method, properties, body):
properties=pika.BasicProperties(delivery_mode = 2)) properties=pika.BasicProperties(delivery_mode = 2))
else: else:
channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1)
#print("") #print("")
logging.error(json.dumps(rep, indent=4)) #logging.error(json.dumps(rep, indent=4))
logging.error("Failed") raise Exception('Failed to push documents to Elasticsearch.')
#time.sleep(5)
return return
def main(cfg): def main(cfg):
#from lib.close_connection import on_timeout
connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq_host'], port=cfg['rabbitmq_port'])) connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq_host'], port=cfg['rabbitmq_port']))
# timeout = 5 # timeout = 5
# connection.add_timeout(timeout, on_timeout(connection)) # connection.add_timeout(timeout, on_timeout(connection))
...@@ -232,3 +224,8 @@ if __name__ == '__main__': ...@@ -232,3 +224,8 @@ if __name__ == '__main__':
logging.error(e) logging.error(e)
time.sleep(5) time.sleep(5)
exit(1) exit(1)
except Exception as e:
logging.error(e)
logging.info('Sleeping for 5 seconds before shutting down.')
time.sleep(5)
exit(1)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment