diff --git a/api.py b/api.py
index 0b86b01423400aaf5c5557ab73f239b1e6993975..466f4b4b286a5fa791ab9bc8591fe81823782126 100644
--- a/api.py
+++ b/api.py
@@ -149,7 +149,8 @@ def build_plot(session_id, step):
 def build_all_plot(session_id):
     img = io.BytesIO()
     plt.clf()
-    for step in ['main', 'metadata-processor', 'doc-enricher', 'doc-processor', 'doc-indexer', 'reindexer', 'sampler']:
+    for step in ['main', 'metadata-processor', 'doc-enricher', 'doc-processor', 'doc-indexer',
+                 'pre-reindexer-checker', 'reindexer', 'sampler']:
         try:
             x, y = MongoSession(cfg=cfg).get_array_from_step(this_session_id=session_id, step_name=step)
             plt.plot(x, y, label=step)
@@ -158,8 +159,8 @@
     plt.legend()
     plt.xlabel('time')
     plt.ylabel('progress ratio')
-    plt.title('indexer session scripts progress report.')
-    plt.grid(True)
+    plt.title('indexer-session-scripts: progress-report.')
+    plt.grid(False)
     plt.savefig(img, format='png')
     img.seek(0)
 
@@ -172,7 +173,8 @@
 
 def build_full_session_json(session_id):
     data = dict()
-    for step in ['main', 'metadata-processor', 'doc-enricher', 'doc-processor', 'doc-indexer', 'reindexer', 'sampler']:
+    for step in ['main', 'metadata-processor', 'doc-enricher', 'doc-processor', 'doc-indexer',
+                 'pre-reindexer-checker', 'reindexer', 'sampler']:
         try:
             data[step] = dict()
             data[step]['timestamp'], data[step]['progress_ratio'] =\
diff --git a/lib/mongo_session.py b/lib/mongo_session.py
index 41715c5a4b3299eb27de058476d90114c194f91c..2a4b32622929558311b0a0f5e0af3e5513297530 100644
--- a/lib/mongo_session.py
+++ b/lib/mongo_session.py
@@ -100,7 +100,9 @@ class MongoSession:
 
         data_time = []
         data_value = []
-        request_result = self.mongo_data_collection.find({"session_id": this_session_id, "step": step_name},
+        request_result = self.mongo_data_collection.find({"session_id": this_session_id,
+                                                          "step": step_name,
+                                                          'loglevel': 'INFO'},
                                                          {'timestamp': 1, 'progress_ratio': 1})
 
         for res in request_result:
diff --git a/workers/reindexer.py b/workers/reindexer.py
index f29ff16180bda1f00b9b49463d94a56d85879b72..90f3ccb78b114517164e997ffff5a2b1cf6a5ba2 100644
--- a/workers/reindexer.py
+++ b/workers/reindexer.py
@@ -173,7 +173,7 @@ def on_msg_callback(channel, method, properties, body):
         log_message = LogMessage(session_id=cfg['session']['id'],
                                  # session_id=cfg['session']['id'],
                                  uuid=cfg['session']['current_uuid'],
-                                 step='reindexer',
+                                 step='pre-reindexer-checker',
                                  status='Documents are still being pushed to the source index',
                                  uuid_prefix='full',
                                  info=uuid,
@@ -302,7 +302,7 @@
                                  uuid_prefix='full',
                                  info=uuid,
                                  loglevel='ERROR',
-                                 progress_ratio=0.5
+                                 progress_ratio=None
                                  )
 
         json_body = json.dumps(log_message.__dict__)
diff --git a/workers/sample-generator.py b/workers/sample-generator.py
index c530f193fe69674aaf011a6ed685cbd4ff065197..a83c4939e5ab5469b80ec4607cd6e7dc6a823c49 100644
--- a/workers/sample-generator.py
+++ b/workers/sample-generator.py
@@ -182,12 +182,12 @@ def callback(channel, method, properties, body):
         logging.info("Deleting already existing samples for dataset with slug = %s" % docs_to_index[0]['slug'])
 
         # ---------------------- send log ----------------------------
-        message = "Deleting already existing samples for dataset with slug = {:s} ".format(docs_to_index[0]['slug'])
+        message = "Deleting already existing samples for dataset "
         log_message = LogMessage(session_id=cfg['session']['id'],
                                  uuid=cfg['session']['current_uuid'],
                                  step='sampler',
                                  status=message,
-                                 uuid_prefix='',
+                                 uuid_prefix='meta',
                                  info=uuid,
                                  loglevel='INFO',
                                  progress_ratio=0
@@ -224,7 +224,7 @@
                                  uuid_prefix='full',
                                  info=uuid,
                                  loglevel='ERROR',
-                                 progress_ratio=None
+                                 progress_ratio=0
                                  )
 
         json_body = json.dumps(log_message.__dict__)
@@ -265,7 +265,7 @@
                                  uuid_prefix='',
                                  info=uuid,
                                  loglevel='INFO',
-                                 progress_ratio=1
+                                 progress_ratio=0.9
                                  )
 
         json_body = json.dumps(log_message.__dict__)
@@ -289,6 +289,29 @@
             destin_es.indices.refresh(index=cfg['reindexer']['destination_index'])
             logging.info("Removing lock for dataset with uuid = %s." % uuid.replace('.full', ''))
             unlock( cfg['session']['working_directory'], uuid.replace('.full', '') )
+            # ---------------------- send log ----------------------------
+
+            log_message = LogMessage(session_id=cfg['session']['id'],
+                                     # session_id=cfg['session']['id'],
+                                     uuid=cfg['session']['current_uuid'],
+                                     step='sampler',
+                                     status='Removing lock for dataset',
+                                     uuid_prefix='full',
+                                     info=uuid,
+                                     loglevel='INFO',
+                                     progress_ratio=1
+                                     )
+
+            json_body = json.dumps(log_message.__dict__)
+
+            print(" [x] json body : ", json_body)
+
+            channel.basic_publish(exchange=cfg['rabbitmq']['exchange_logs_name'],
+                                  routing_key=cfg['rabbitmq']['routing_key_logs'],
+                                  body=json_body,
+                                  properties=pika.BasicProperties(delivery_mode=2)
+                                  )
+            # ------------------------------------------------------------
         else:
             channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1)
             logging.error(json.dumps(rep, indent=4))
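
For readers following the diff: every worker hunk repeats the same producer pattern — build a `LogMessage`, serialise its `__dict__` to JSON, and publish it persistently on the logs exchange, where `MongoSession.get_array_from_step` (now filtered to `loglevel: 'INFO'`) later reads back `timestamp`/`progress_ratio` pairs for the progress plots. The sketch below is a minimal illustration of that pattern, not code from the repository: the `LogMessage` constructor is assumed from the call sites above, and `publish_log` is a hypothetical helper.

```python
import json

import pika


class LogMessage:
    """Assumed shape of LogMessage, reconstructed from the call sites above:
    a plain value object whose __dict__ becomes the JSON log payload."""

    def __init__(self, session_id, uuid, step, status, uuid_prefix, info,
                 loglevel, progress_ratio):
        self.session_id = session_id
        self.uuid = uuid
        self.step = step
        self.status = status
        self.uuid_prefix = uuid_prefix
        self.info = info
        self.loglevel = loglevel
        self.progress_ratio = progress_ratio


def publish_log(channel, cfg, log_message):
    """Hypothetical helper mirroring the repeated publish blocks above."""
    json_body = json.dumps(log_message.__dict__)
    # delivery_mode=2 marks the message persistent, as in the workers above.
    channel.basic_publish(exchange=cfg['rabbitmq']['exchange_logs_name'],
                          routing_key=cfg['rabbitmq']['routing_key_logs'],
                          body=json_body,
                          properties=pika.BasicProperties(delivery_mode=2))
```

Note the interplay of two changes in this diff: ERROR messages now carry `progress_ratio=None` (reindexer) and the Mongo query only selects `loglevel: 'INFO'` documents, so error logs no longer distort the plotted progress curves.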