Commit 6ca39ffc authored by DESPRES Damien's avatar DESPRES Damien
Browse files

add tempo when next queue is full

parent 5ceca742
Pipeline #16666 passed with stages
in 16 seconds
......@@ -138,7 +138,7 @@ def enrich_docs( channel, method, properties, body ):
channel.exchange_declare(exchange=exchange, exchange_type='direct')
channel.queue_declare(queue=doc_pages_to_process_qn, durable=True, arguments={'x-message-ttl' : cfg['rabbitmq']['ttl']})
dest_queue = channel.queue_declare(queue=doc_pages_to_process_qn, durable=True, arguments={'x-message-ttl' : cfg['rabbitmq']['ttl']})
channel.queue_bind(exchange=exchange, queue=doc_pages_to_process_qn, routing_key=doc_pages_to_process_rk)
......@@ -178,7 +178,13 @@ def enrich_docs( channel, method, properties, body ):
msg['body'] = doc_page
the_body = msgpack.packb(msg, use_bin_type=True, default=encode_datetime)
q_len = dest_queue.method.message_count
logging.info('Queue Size is %i for %s...' % ( q_len, doc_pages_to_process_qn))
while q_len > 50:
logging.info('Waiting for next queue to process elements...')
time.sleep(5)
q_len = dest_queue.method.message_count
channel.basic_publish( exchange=exchange,
routing_key=doc_pages_to_process_rk,
body=the_body,
......
......@@ -169,6 +169,11 @@ def process_docs( channel, method, properties, body ):
q_len = dest_queue.method.message_count
logging.info('Queue Size is %i for %s...' % ( q_len, docs_to_index_qn))
while q_len > 50:
logging.info('Waiting for next queue to process elements...')
time.sleep(5)
q_len = dest_queue.method.message_count
channel.basic_publish( exchange=exchange,
routing_key=docs_to_index_rk,
body=the_body,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment