diff --git a/lib/log_message.py b/lib/log_message.py index 8ad373cf5c17d9f6c87daf87f0cab3cb31c5de30..7c920a5f81ea3701823d7130d9bc979dd004c595 100644 --- a/lib/log_message.py +++ b/lib/log_message.py @@ -4,7 +4,7 @@ import uuid as uuid_lib class LogMessage: - def __init__(self, session_id, uuid, step, status, uuid_prefix, info, loglevel): + def __init__(self, session_id, uuid, step, status, uuid_prefix, info, loglevel, progress_ratio): if(session_id.lower()) == 'new': self.session_id = str(uuid_lib.uuid4()) else: @@ -12,6 +12,7 @@ class LogMessage: self.uuid = uuid self.step = step self.status = status + self.progress_ratio = progress_ratio self.uuid_prefix = uuid_prefix self.info = info self.loglevel = loglevel diff --git a/lib/mongo_session.py b/lib/mongo_session.py index 8bc0be9f15bebb021055cd96e17698465d249b88..4c273323693839937e8f12553652736cfb5179cf 100644 --- a/lib/mongo_session.py +++ b/lib/mongo_session.py @@ -52,6 +52,7 @@ class MongoSession: "status": body_object["status"], "uuid_prefix": body_object["uuid_prefix"], "info": body_object["info"], + "progress_ratio": body_object["progress_ratio"], "loglevel": body_object["loglevel"]}) except Exception as exc: print('[ERROR saving log]:', exc) diff --git a/workers/doc-enricher.py b/workers/doc-enricher.py index 8af1bafbe43c929a2c437596f2381ba2c765326d..2017586d1080e35d79a31ccb3f0d107aa9086b81 100644 --- a/workers/doc-enricher.py +++ b/workers/doc-enricher.py @@ -151,7 +151,7 @@ def old_enrich_docs( channel, method, properties, body, **kwargs ): channel.basic_publish( exchange=kwargs['exchange'], routing_key=kwargs['docs_to_enrich_rk'], body=the_body, - properties=pika.BasicProperties(delivery_mode = 2) + properties=pika.BasicProperties(delivery_mode=2) ) @@ -170,14 +170,14 @@ def old_enrich_docs( channel, method, properties, body, **kwargs ): channel.basic_publish( exchange=kwargs['exchange'], routing_key=kwargs['doc_pages_to_store_in_mongo_rk'], body=the_body, - properties=pika.BasicProperties(delivery_mode = 2) + properties=pika.BasicProperties(delivery_mode=2) ) channel.basic_publish( exchange=kwargs['exchange'], routing_key=kwargs['doc_pages_to_process_rk'], body=the_body, - properties=pika.BasicProperties(delivery_mode = 2) + properties=pika.BasicProperties(delivery_mode=2) ) logging.info('...done!') @@ -239,7 +239,7 @@ def enrich_docs( channel, method, properties, body ): doc_page = [{**decoded_body['body'], 'last_update': last_update, 'data-fr': feature} for feature in feature_page] logging.info('[%6.2f%%] Sending %i docs to RabbitMQ for dataset %s...' % (progress_ratio*100, len(doc_page), doc_page[0]['slug'])) - progress_rounded = round(progress_ratio * 100) + progress_rounded = round(progress_ratio, 2) # status_message = '[' + str(progress_ratio*100) + '%] Sending ' + str( # len(doc_page)) + 'docs to RabbitMQ for dataset ' + str(doc_page[0]['slug']) + '...' status_message = '[{:.2f} %] Sending {:d} docs to RabbitMQ for dataset {:s}...'.format(progress_ratio*100, @@ -253,7 +253,8 @@ def enrich_docs( channel, method, properties, body ): status=status_message, uuid_prefix='meta', info='no info', - loglevel='INFO' + loglevel='INFO', + progress_ratio=progress_rounded ) json_body = json.dumps(log_message.__dict__) @@ -283,7 +284,7 @@ def enrich_docs( channel, method, properties, body ): ) #logging.info('...done!') - if progress_rounded == 100: + if progress_rounded == 1 or progress_rounded == 1.00: # ---------------------- send log ---------------------------- log_message = LogMessage(session_id=cfg['session']['id'], @@ -293,7 +294,8 @@ def enrich_docs( channel, method, properties, body ): status='done', uuid_prefix='meta', info='no info', - loglevel='INFO' + loglevel='INFO', + progress_ratio=progress_rounded ) json_body = json.dumps(log_message.__dict__) diff --git a/workers/doc-indexer.py b/workers/doc-indexer.py index 41c61040884f7e9cc485dc6bef0dadadbf58eff3..5d039187d2dd97a59acda4206607721f25a38bfa 100644 --- a/workers/doc-indexer.py +++ b/workers/doc-indexer.py @@ -122,7 +122,7 @@ def index_docs(channel, method, properties, body): logging.info("[%6.2f%%] Pushing %i documents to Elasticsearch for dataset %s..." % (progress_ratio*100, len(docs_to_index), docs_to_index[0]['slug'])) - progress_rounded = round(progress_ratio * 100) + progress_rounded = round(progress_ratio, 2) status_message = '[{:.2f} %] Sending {:d} docs to RabbitMQ for dataset {:s}...'.format(progress_ratio * 100, len(docs_to_index), docs_to_index[0]['slug']) @@ -134,7 +134,8 @@ def index_docs(channel, method, properties, body): status=status_message, uuid_prefix='meta', info='no info', - loglevel='INFO' + loglevel='INFO', + progress_ratio=progress_rounded ) json_body = json.dumps(log_message.__dict__) @@ -205,7 +206,8 @@ def index_docs(channel, method, properties, body): status=rep, uuid_prefix='meta', info='no info', - loglevel='ERROR' + loglevel='ERROR', + progress_ratio=progress_rounded ) json_body = json.dumps(log_message.__dict__) @@ -221,7 +223,7 @@ def index_docs(channel, method, properties, body): #time.sleep(5) - if progress_rounded == 100: + if progress_rounded == 1 or progress_rounded == 1.00: # ---------------------- send log ---------------------------- log_message = LogMessage(session_id=cfg['session']['id'], @@ -231,7 +233,8 @@ def index_docs(channel, method, properties, body): status='done', uuid_prefix='meta', info='no info', - loglevel='INFO' + loglevel='INFO', + progress_ratio=progress_rounded ) json_body = json.dumps(log_message.__dict__) diff --git a/workers/doc-processor.py b/workers/doc-processor.py index 5ce324644cf96f55297e55649333dea8c3a583f6..b1b9f3d4724b96b3ce52713f428d5043e1cc7961 100644 --- a/workers/doc-processor.py +++ b/workers/doc-processor.py @@ -111,7 +111,7 @@ def process_docs( channel, method, properties, body ): logging.info('[%6.2f%%] Processing %i docs for dataset %s...' % (progress_ratio*100, len(docs), docs[0]['slug'])) - progress_rounded = round(progress_ratio * 100) + progress_rounded = round(progress_ratio, 2) status_message = '[{:.2f} %] Sending {:d} docs to RabbitMQ for dataset {:s}...'.format(progress_ratio * 100, len(docs), docs[0]['slug']) @@ -123,7 +123,8 @@ def process_docs( channel, method, properties, body ): status=status_message, uuid_prefix='meta', info='no info', - loglevel='INFO' + loglevel='INFO', + progress_ratio=progress_rounded ) json_body = json.dumps(log_message.__dict__) @@ -157,7 +158,7 @@ def process_docs( channel, method, properties, body ): ) logging.info('...done!') - if progress_rounded == 100: + if progress_rounded == 1 or progress_rounded == 1.00: # ---------------------- send log ---------------------------- log_message = LogMessage(session_id=cfg['session']['id'], # session_id=cfg['session']['id'], @@ -166,7 +167,8 @@ def process_docs( channel, method, properties, body ): status='done', uuid_prefix='meta', info='no info', - loglevel='INFO' + loglevel='INFO', + progress_ratio=progress_rounded ) json_body = json.dumps(log_message.__dict__) diff --git a/workers/metadata-processor.py b/workers/metadata-processor.py index b48e21de4d42da48c67531acca34803b56d1f51e..03cc81f99e10feb6b795636c9e42c619048a4207 100644 --- a/workers/metadata-processor.py +++ b/workers/metadata-processor.py @@ -316,7 +316,8 @@ def callback( channel, method, properties, body ): status='Starting...', uuid_prefix='meta', info='no info', - loglevel='INFO' + loglevel='INFO', + progress_ratio=0 ) json_body = json.dumps(log_message.__dict__) @@ -389,7 +390,8 @@ def callback( channel, method, properties, body ): status='sent task doc to enrich', uuid_prefix='meta', info='no info', - loglevel='INFO' + loglevel='INFO', + progress_ratio=0.5 ) json_body = json.dumps(log_message.__dict__) @@ -438,7 +440,8 @@ def callback( channel, method, properties, body ): status='sent task doc to index', uuid_prefix='meta', info='no info', - loglevel='INFO' + loglevel='INFO', + progress_ratio=50 ) json_body = json.dumps(log_message.__dict__) @@ -460,7 +463,8 @@ def callback( channel, method, properties, body ): status='terminated', uuid_prefix='meta', info='no info', - loglevel='INFO' + loglevel='INFO', + progress_ratio=1 ) json_body = json.dumps(log_message.__dict__)