From 6ca39ffc1efdd91f2631e8381f77265454b4d14f Mon Sep 17 00:00:00 2001
From: DESPRES Damien <ddespres@neogeo.fr>
Date: Thu, 14 Oct 2021 16:41:43 +0200
Subject: [PATCH] add tempo when next queue is full

---
 workers/doc_enricher.py  | 10 ++++++++--
 workers/doc_processor.py |  5 +++++
 2 files changed, 13 insertions(+), 2 deletions(-)

diff --git a/workers/doc_enricher.py b/workers/doc_enricher.py
index 1d36785..ab2000d 100644
--- a/workers/doc_enricher.py
+++ b/workers/doc_enricher.py
@@ -138,7 +138,7 @@ def enrich_docs( channel, method, properties, body ):
 
     channel.exchange_declare(exchange=exchange, exchange_type='direct')
 
-    channel.queue_declare(queue=doc_pages_to_process_qn, durable=True, arguments={'x-message-ttl' : cfg['rabbitmq']['ttl']})
+    dest_queue = channel.queue_declare(queue=doc_pages_to_process_qn, durable=True, arguments={'x-message-ttl' : cfg['rabbitmq']['ttl']})
     channel.queue_bind(exchange=exchange, queue=doc_pages_to_process_qn, routing_key=doc_pages_to_process_rk)
 
 
@@ -178,7 +178,13 @@ def enrich_docs( channel, method, properties, body ):
         msg['body'] = doc_page
 
         the_body = msgpack.packb(msg, use_bin_type=True, default=encode_datetime)
-
+        q_len = dest_queue.method.message_count
+        logging.info('Queue Size is %i for %s...' % ( q_len, doc_pages_to_process_qn))
+        while q_len > 50:
+            logging.info('Waiting for next queue to process elements...')
+            time.sleep(5)
+            q_len = dest_queue.method.message_count
+        
         channel.basic_publish( exchange=exchange,
                                routing_key=doc_pages_to_process_rk,
                                body=the_body,
diff --git a/workers/doc_processor.py b/workers/doc_processor.py
index ffd51d4..dec3f76 100644
--- a/workers/doc_processor.py
+++ b/workers/doc_processor.py
@@ -169,6 +169,11 @@ def process_docs( channel, method, properties, body ):
 
     q_len = dest_queue.method.message_count
     logging.info('Queue Size is %i for %s...' % ( q_len, docs_to_index_qn))
+    while q_len > 50:
+        logging.info('Waiting for next queue to process elements...')
+        time.sleep(5)
+        q_len = dest_queue.method.message_count
+    
     channel.basic_publish( exchange=exchange,
                            routing_key=docs_to_index_rk,
                            body=the_body,
-- 
GitLab