From 4d8be09a1349967574e150e040eac95a1e34ac3b Mon Sep 17 00:00:00 2001 From: ddamiron <ddamiron@sii.fr> Date: Tue, 2 Jul 2019 11:45:34 +0200 Subject: [PATCH] add status/<session_id>/error filtering endpoint --- lib/log_message.py | 3 ++- lib/mongo_session.py | 1 + workers/doc-enricher.py | 16 +++++++++------- workers/doc-indexer.py | 13 ++++++++----- workers/doc-processor.py | 10 ++++++---- workers/metadata-processor.py | 12 ++++++++---- 6 files changed, 34 insertions(+), 21 deletions(-) diff --git a/lib/log_message.py b/lib/log_message.py index 8ad373c..7c920a5 100644 --- a/lib/log_message.py +++ b/lib/log_message.py @@ -4,7 +4,7 @@ import uuid as uuid_lib class LogMessage: - def __init__(self, session_id, uuid, step, status, uuid_prefix, info, loglevel): + def __init__(self, session_id, uuid, step, status, uuid_prefix, info, loglevel, progress_ratio): if(session_id.lower()) == 'new': self.session_id = str(uuid_lib.uuid4()) else: @@ -12,6 +12,7 @@ class LogMessage: self.uuid = uuid self.step = step self.status = status + self.progress_ratio = progress_ratio self.uuid_prefix = uuid_prefix self.info = info self.loglevel = loglevel diff --git a/lib/mongo_session.py b/lib/mongo_session.py index 8bc0be9..4c27332 100644 --- a/lib/mongo_session.py +++ b/lib/mongo_session.py @@ -52,6 +52,7 @@ class MongoSession: "status": body_object["status"], "uuid_prefix": body_object["uuid_prefix"], "info": body_object["info"], + "progress_ratio": body_object["progress_ratio"], "loglevel": body_object["loglevel"]}) except Exception as exc: print('[ERROR saving log]:', exc) diff --git a/workers/doc-enricher.py b/workers/doc-enricher.py index 8af1baf..2017586 100644 --- a/workers/doc-enricher.py +++ b/workers/doc-enricher.py @@ -151,7 +151,7 @@ def old_enrich_docs( channel, method, properties, body, **kwargs ): channel.basic_publish( exchange=kwargs['exchange'], routing_key=kwargs['docs_to_enrich_rk'], body=the_body, - properties=pika.BasicProperties(delivery_mode = 2) + properties=pika.BasicProperties(delivery_mode=2) ) @@ -170,14 +170,14 @@ def old_enrich_docs( channel, method, properties, body, **kwargs ): channel.basic_publish( exchange=kwargs['exchange'], routing_key=kwargs['doc_pages_to_store_in_mongo_rk'], body=the_body, - properties=pika.BasicProperties(delivery_mode = 2) + properties=pika.BasicProperties(delivery_mode=2) ) channel.basic_publish( exchange=kwargs['exchange'], routing_key=kwargs['doc_pages_to_process_rk'], body=the_body, - properties=pika.BasicProperties(delivery_mode = 2) + properties=pika.BasicProperties(delivery_mode=2) ) logging.info('...done!') @@ -239,7 +239,7 @@ def enrich_docs( channel, method, properties, body ): doc_page = [{**decoded_body['body'], 'last_update': last_update, 'data-fr': feature} for feature in feature_page] logging.info('[%6.2f%%] Sending %i docs to RabbitMQ for dataset %s...' % (progress_ratio*100, len(doc_page), doc_page[0]['slug'])) - progress_rounded = round(progress_ratio * 100) + progress_rounded = round(progress_ratio, 2) # status_message = '[' + str(progress_ratio*100) + '%] Sending ' + str( # len(doc_page)) + 'docs to RabbitMQ for dataset ' + str(doc_page[0]['slug']) + '...' status_message = '[{:.2f} %] Sending {:d} docs to RabbitMQ for dataset {:s}...'.format(progress_ratio*100, @@ -253,7 +253,8 @@ def enrich_docs( channel, method, properties, body ): status=status_message, uuid_prefix='meta', info='no info', - loglevel='INFO' + loglevel='INFO', + progress_ratio=progress_rounded ) json_body = json.dumps(log_message.__dict__) @@ -283,7 +284,7 @@ def enrich_docs( channel, method, properties, body ): ) #logging.info('...done!') - if progress_rounded == 100: + if progress_rounded == 1 or progress_rounded == 1.00: # ---------------------- send log ---------------------------- log_message = LogMessage(session_id=cfg['session']['id'], @@ -293,7 +294,8 @@ def enrich_docs( channel, method, properties, body ): status='done', uuid_prefix='meta', info='no info', - loglevel='INFO' + loglevel='INFO', + progress_ratio=progress_rounded ) json_body = json.dumps(log_message.__dict__) diff --git a/workers/doc-indexer.py b/workers/doc-indexer.py index 41c6104..5d03918 100644 --- a/workers/doc-indexer.py +++ b/workers/doc-indexer.py @@ -122,7 +122,7 @@ def index_docs(channel, method, properties, body): logging.info("[%6.2f%%] Pushing %i documents to Elasticsearch for dataset %s..." % (progress_ratio*100, len(docs_to_index), docs_to_index[0]['slug'])) - progress_rounded = round(progress_ratio * 100) + progress_rounded = round(progress_ratio, 2) status_message = '[{:.2f} %] Sending {:d} docs to RabbitMQ for dataset {:s}...'.format(progress_ratio * 100, len(docs_to_index), docs_to_index[0]['slug']) @@ -134,7 +134,8 @@ def index_docs(channel, method, properties, body): status=status_message, uuid_prefix='meta', info='no info', - loglevel='INFO' + loglevel='INFO', + progress_ratio=progress_rounded ) json_body = json.dumps(log_message.__dict__) @@ -205,7 +206,8 @@ def index_docs(channel, method, properties, body): status=rep, uuid_prefix='meta', info='no info', - loglevel='ERROR' + loglevel='ERROR', + progress_ratio=progress_rounded ) json_body = json.dumps(log_message.__dict__) @@ -221,7 +223,7 @@ def index_docs(channel, method, properties, body): #time.sleep(5) - if progress_rounded == 100: + if progress_rounded == 1 or progress_rounded == 1.00: # ---------------------- send log ---------------------------- log_message = LogMessage(session_id=cfg['session']['id'], @@ -231,7 +233,8 @@ def index_docs(channel, method, properties, body): status='done', uuid_prefix='meta', info='no info', - loglevel='INFO' + loglevel='INFO', + progress_ratio=progress_rounded ) json_body = json.dumps(log_message.__dict__) diff --git a/workers/doc-processor.py b/workers/doc-processor.py index 5ce3246..b1b9f3d 100644 --- a/workers/doc-processor.py +++ b/workers/doc-processor.py @@ -111,7 +111,7 @@ def process_docs( channel, method, properties, body ): logging.info('[%6.2f%%] Processing %i docs for dataset %s...' % (progress_ratio*100, len(docs), docs[0]['slug'])) - progress_rounded = round(progress_ratio * 100) + progress_rounded = round(progress_ratio, 2) status_message = '[{:.2f} %] Sending {:d} docs to RabbitMQ for dataset {:s}...'.format(progress_ratio * 100, len(docs), docs[0]['slug']) @@ -123,7 +123,8 @@ def process_docs( channel, method, properties, body ): status=status_message, uuid_prefix='meta', info='no info', - loglevel='INFO' + loglevel='INFO', + progress_ratio=progress_rounded ) json_body = json.dumps(log_message.__dict__) @@ -157,7 +158,7 @@ def process_docs( channel, method, properties, body ): ) logging.info('...done!') - if progress_rounded == 100: + if progress_rounded == 1 or progress_rounded == 1.00: # ---------------------- send log ---------------------------- log_message = LogMessage(session_id=cfg['session']['id'], # session_id=cfg['session']['id'], @@ -166,7 +167,8 @@ def process_docs( channel, method, properties, body ): status='done', uuid_prefix='meta', info='no info', - loglevel='INFO' + loglevel='INFO', + progress_ratio=progress_rounded ) json_body = json.dumps(log_message.__dict__) diff --git a/workers/metadata-processor.py b/workers/metadata-processor.py index b48e21d..03cc81f 100644 --- a/workers/metadata-processor.py +++ b/workers/metadata-processor.py @@ -316,7 +316,8 @@ def callback( channel, method, properties, body ): status='Starting...', uuid_prefix='meta', info='no info', - loglevel='INFO' + loglevel='INFO', + progress_ratio=0 ) json_body = json.dumps(log_message.__dict__) @@ -389,7 +390,8 @@ def callback( channel, method, properties, body ): status='sent task doc to enrich', uuid_prefix='meta', info='no info', - loglevel='INFO' + loglevel='INFO', + progress_ratio=0.5 ) json_body = json.dumps(log_message.__dict__) @@ -438,7 +440,8 @@ def callback( channel, method, properties, body ): status='sent task doc to index', uuid_prefix='meta', info='no info', - loglevel='INFO' + loglevel='INFO', + progress_ratio=50 ) json_body = json.dumps(log_message.__dict__) @@ -460,7 +463,8 @@ def callback( channel, method, properties, body ): status='terminated', uuid_prefix='meta', info='no info', - loglevel='INFO' + loglevel='INFO', + progress_ratio=1 ) json_body = json.dumps(log_message.__dict__) -- GitLab