From 1f12dc5acde3507b9924f6484f40ac53676ca876 Mon Sep 17 00:00:00 2001 From: ddamiron <ddamiron@sii.fr> Date: Wed, 3 Jul 2019 11:25:59 +0200 Subject: [PATCH] add get json for all progress_ratio and timestamp end-point --- api.py | 10 ++++++---- lib/mongo_session.py | 4 +++- workers/reindexer.py | 4 ++-- workers/sample-generator.py | 31 +++++++++++++++++++++++++++---- 4 files changed, 38 insertions(+), 11 deletions(-) diff --git a/api.py b/api.py index 0b86b01..466f4b4 100644 --- a/api.py +++ b/api.py @@ -149,7 +149,8 @@ def build_plot(session_id, step): def build_all_plot(session_id): img = io.BytesIO() plt.clf() - for step in ['main', 'metadata-processor', 'doc-enricher', 'doc-processor', 'doc-indexer', 'reindexer', 'sampler']: + for step in ['main', 'metadata-processor', 'doc-enricher', 'doc-processor', 'doc-indexer', + 'pre-reindexer-checker', 'reindexer', 'sampler']: try: x, y = MongoSession(cfg=cfg).get_array_from_step(this_session_id=session_id, step_name=step) plt.plot(x, y, label=step) @@ -158,8 +159,8 @@ def build_all_plot(session_id): plt.legend() plt.xlabel('time') plt.ylabel('progress ratio') - plt.title('indexer session scripts progress report.') - plt.grid(True) + plt.title('indexer-session-scripts: progress-report.') + plt.grid(False) plt.savefig(img, format='png') img.seek(0) @@ -172,7 +173,8 @@ def build_all_plot(session_id): def build_full_session_json(session_id): data = dict() - for step in ['main', 'metadata-processor', 'doc-enricher', 'doc-processor', 'doc-indexer', 'reindexer', 'sampler']: + for step in ['main', 'metadata-processor', 'doc-enricher', 'doc-processor', 'doc-indexer', + 'pre-reindexer-checker', 'reindexer', 'sampler']: try: data[step] = dict() data[step]['timestamp'], data[step]['progress_ratio'] =\ diff --git a/lib/mongo_session.py b/lib/mongo_session.py index 41715c5..2a4b326 100644 --- a/lib/mongo_session.py +++ b/lib/mongo_session.py @@ -100,7 +100,9 @@ class MongoSession: data_time = [] data_value = [] - request_result = self.mongo_data_collection.find({"session_id": this_session_id, "step": step_name}, + request_result = self.mongo_data_collection.find({"session_id": this_session_id, + "step": step_name, + 'loglevel': 'INFO'}, {'timestamp': 1, 'progress_ratio': 1}) for res in request_result: diff --git a/workers/reindexer.py b/workers/reindexer.py index f29ff16..90f3ccb 100644 --- a/workers/reindexer.py +++ b/workers/reindexer.py @@ -173,7 +173,7 @@ def on_msg_callback(channel, method, properties, body): log_message = LogMessage(session_id=cfg['session']['id'], # session_id=cfg['session']['id'], uuid=cfg['session']['current_uuid'], - step='reindexer', + step='pre-reindexer-checker', status='Documents are still being pushed to the source index', uuid_prefix='full', info=uuid, @@ -302,7 +302,7 @@ def on_msg_callback(channel, method, properties, body): uuid_prefix='full', info=uuid, loglevel='ERROR', - progress_ratio=0.5 + progress_ratio=None ) json_body = json.dumps(log_message.__dict__) diff --git a/workers/sample-generator.py b/workers/sample-generator.py index c530f19..a83c493 100644 --- a/workers/sample-generator.py +++ b/workers/sample-generator.py @@ -182,12 +182,12 @@ def callback(channel, method, properties, body): logging.info("Deleting already existing samples for dataset with slug = %s" % docs_to_index[0]['slug']) # ---------------------- send log ---------------------------- - message = "Deleting already existing samples for dataset with slug = {:s} ".format(docs_to_index[0]['slug']) + message = "Deleting already existing samples for dataset " log_message = LogMessage(session_id=cfg['session']['id'], uuid=cfg['session']['current_uuid'], step='sampler', status=message, - uuid_prefix='', + uuid_prefix='meta', info=uuid, loglevel='INFO', progress_ratio=0 @@ -224,7 +224,7 @@ def callback(channel, method, properties, body): uuid_prefix='full', info=uuid, loglevel='ERROR', - progress_ratio=None + progress_ratio=0 ) json_body = json.dumps(log_message.__dict__) @@ -265,7 +265,7 @@ def callback(channel, method, properties, body): uuid_prefix='', info=uuid, loglevel='INFO', - progress_ratio=1 + progress_ratio=0.9 ) json_body = json.dumps(log_message.__dict__) @@ -289,6 +289,29 @@ def callback(channel, method, properties, body): destin_es.indices.refresh(index=cfg['reindexer']['destination_index']) logging.info("Removing lock for dataset with uuid = %s." % uuid.replace('.full', '')) unlock( cfg['session']['working_directory'], uuid.replace('.full', '') ) + # ---------------------- send log ---------------------------- + + log_message = LogMessage(session_id=cfg['session']['id'], + # session_id=cfg['session']['id'], + uuid=cfg['session']['current_uuid'], + step='sampler', + status='Removing lock for dataset', + uuid_prefix='full', + info=uuid, + loglevel='INFO', + progress_ratio=1 + ) + + json_body = json.dumps(log_message.__dict__) + + print(" [x] json body : ", json_body) + + channel.basic_publish(exchange=cfg['rabbitmq']['exchange_logs_name'], + routing_key=cfg['rabbitmq']['routing_key_logs'], + body=json_body, + properties=pika.BasicProperties(delivery_mode=2) + ) + # ------------------------------------------------------------ else: channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) logging.error(json.dumps(rep, indent=4)) -- GitLab