diff --git a/workers/doc_processor.py b/workers/doc_processor.py index 0a0ea1f4ea1bf29f8b2e45da50b7cef7dc61c754..ffd51d42d1f41f8501cc7b1b46add9b7e1f2f586 100644 --- a/workers/doc_processor.py +++ b/workers/doc_processor.py @@ -140,7 +140,7 @@ def process_docs( channel, method, properties, body ): del cfg['rabbitmq']['routing_key_2'] channel.exchange_declare(exchange=exchange, exchange_type='direct') - channel.queue_declare(queue=docs_to_index_qn, durable=True, arguments={'x-message-ttl' : cfg['rabbitmq']['ttl']}) + dest_queue = channel.queue_declare(queue=docs_to_index_qn, durable=True, arguments={'x-message-ttl' : cfg['rabbitmq']['ttl']}) channel.queue_bind(exchange=exchange, queue=docs_to_index_qn, routing_key=docs_to_index_rk) logging.info('[%6.2f%%] Processing %i docs for dataset %s...' % (progress_ratio*100, len(docs), docs[0]['slug'])) @@ -167,7 +167,8 @@ def process_docs( channel, method, properties, body ): the_body = msgpack.packb(msg, use_bin_type=True) - + q_len = dest_queue.method.message_count + logging.info('Queue Size is %i for %s...' % ( q_len, docs_to_index_qn)) channel.basic_publish( exchange=exchange, routing_key=docs_to_index_rk, body=the_body,