diff --git a/api.py b/api.py index 466f4b4b286a5fa791ab9bc8591fe81823782126..463a765e47788b1a6eb0e97d1b38806b39ffb7c8 100644 --- a/api.py +++ b/api.py @@ -7,7 +7,7 @@ import matplotlib.pyplot as plt import io import base64 -from flask import Flask, jsonify, Response +from flask import Flask, jsonify, Response, render_template, url_for from flask import request, send_file from flask_executor import Executor @@ -130,11 +130,11 @@ def taskfilter(session_id, query_key, query_value): return Response(json.dumps(body, indent=4, sort_keys=True, default=str), mimetype='application/json') -@api.route('/plot/<session_id>/<step>') -def build_plot(session_id, step): +@api.route('/plot/<session_id>/<uuid>/<step>') +def build_plot(session_id, uuid, step): img = io.BytesIO() - x, y = MongoSession(cfg=cfg).get_array_from_step(this_session_id=session_id, step_name=step) + x, y = MongoSession(cfg=cfg).get_array_from_step(this_session_id=session_id, step_name=step, this_uuid=uuid) plt.clf() plt.plot(x, y, label=step) plt.savefig(img, format='png') @@ -149,10 +149,54 @@ def build_plot(session_id, step): def build_all_plot(session_id): img = io.BytesIO() plt.clf() + my_mongo_session = MongoSession(cfg=cfg) + uuid_list = my_mongo_session.get_uuid_list_from_session_id(this_session_id=session_id) + nb_uuids = len(uuid_list) + print('nb_uuid: ', nb_uuids) + print('uuid_list: ', uuid_list) + if nb_uuids == 1: + for step in ['main', 'metadata-processor', 'doc-enricher', 'doc-processor', 'doc-indexer', + 'pre-reindexer-checker', 'reindexer', 'sampler']: + try: + x, y = my_mongo_session.get_array_from_step(this_session_id=session_id, + this_uuid=uuid_list[0], + step_name=step) + plt.plot(x, y, label=step) + except Exception as _: + print('cannot get data from :' + step) + plt.legend() + plt.xlabel('time') + plt.ylabel('progress ratio') + plt.title('indexer-session-scripts: progress-report.') + plt.grid(False) + + plt.savefig(img, format='png') + img.seek(0) + + plot_url = base64.b64encode(img.getvalue()).decode() + + return '<img src="data:image/png;base64,{}">'.format(plot_url) + + else: + response = dict() + response['info'] = 'please use <url/plot/session_id/uuid> end-point!' + response['nb uuids'] = nb_uuids + response['uuids list'] = uuid_list + return Response(json.dumps(response, indent=4, sort_keys=True, default=str), mimetype='application/json') + + +@api.route('/plot/<session_id>/<uuid>') +def build_all_plot_using_uuid(session_id, uuid): + img = io.BytesIO() + plt.clf() + my_mongo_session = MongoSession(cfg=cfg) + for step in ['main', 'metadata-processor', 'doc-enricher', 'doc-processor', 'doc-indexer', 'pre-reindexer-checker', 'reindexer', 'sampler']: try: - x, y = MongoSession(cfg=cfg).get_array_from_step(this_session_id=session_id, step_name=step) + x, y = my_mongo_session.get_array_from_step(this_session_id=session_id, + this_uuid=uuid, + step_name=step) plt.plot(x, y, label=step) except Exception as _: print('cannot get data from :' + step) @@ -161,6 +205,7 @@ def build_all_plot(session_id): plt.ylabel('progress ratio') plt.title('indexer-session-scripts: progress-report.') plt.grid(False) + plt.savefig(img, format='png') img.seek(0) @@ -172,13 +217,47 @@ def build_all_plot(session_id): @api.route('/json/<session_id>') def build_full_session_json(session_id): data = dict() + my_mongo_session = MongoSession(cfg=cfg) + uuid_list = my_mongo_session.get_uuid_list_from_session_id(this_session_id=session_id) + nb_uuids = len(uuid_list) + print('nb_uuid: ', nb_uuids) + print('uuid_list: ', uuid_list) + if nb_uuids == 1: + + for step in ['main', 'metadata-processor', 'doc-enricher', 'doc-processor', 'doc-indexer', + 'pre-reindexer-checker', 'reindexer', 'sampler']: + try: + data[step] = dict() + data[step]['timestamp'], data[step]['progress_ratio'] = \ + my_mongo_session.get_array_from_step(this_session_id=session_id, + step_name=step, + this_uuid=uuid_list[0]) + + except Exception as _: + print('cannot get data from :' + step) + + return Response(json.dumps(data, indent=4, sort_keys=True, default=str), mimetype='application/json') + else: + response = dict() + response['info'] = 'please use <url/plot/session_id/uuid> end-point!' + response['nb uuids'] = nb_uuids + response['uuids list'] = uuid_list + return Response(json.dumps(response, indent=4, sort_keys=True, default=str), mimetype='application/json') + + +@api.route('/json/<session_id>/<uuid>') +def build_full_session_json_using_one_uuid(session_id, uuid): + data = dict() + my_mongo_session = MongoSession(cfg=cfg) for step in ['main', 'metadata-processor', 'doc-enricher', 'doc-processor', 'doc-indexer', 'pre-reindexer-checker', 'reindexer', 'sampler']: try: data[step] = dict() - data[step]['timestamp'], data[step]['progress_ratio'] =\ - MongoSession(cfg=cfg).get_array_from_step(this_session_id=session_id, step_name=step) + data[step]['timestamp'], data[step]['progress_ratio'] = \ + my_mongo_session.get_array_from_step(this_session_id=session_id, + step_name=step, + this_uuid=uuid) except Exception as _: print('cannot get data from :' + step) diff --git a/lib/mongo_session.py b/lib/mongo_session.py index 765194cbe293c24aec53435a5c93305917251734..0a8972cb2844a2c43848c4a362ed95ee6ec39803 100644 --- a/lib/mongo_session.py +++ b/lib/mongo_session.py @@ -1,4 +1,4 @@ -from pymongo import MongoClient, errors +from pymongo import MongoClient, errors, ASCENDING, DESCENDING import datetime import json import time @@ -12,6 +12,10 @@ class MongoSession: self.mongo_client = self.init_client_mongo() self.mongo_data_collection = self.set_mongo_collection(mongo_client=self.mongo_client, mongo_collection=self.collection_name) + self.mongo_data_collection.create_index( + [("session_id", ASCENDING), ("uuid", ASCENDING), ("step", ASCENDING)], + unique=False + ) def init_client_mongo(self): # init Mongo @@ -33,6 +37,7 @@ class MongoSession: @staticmethod def set_mongo_collection(mongo_client, mongo_collection): + return mongo_client[mongo_collection] def save_log_in_mongo(self, ch, method, body): @@ -71,20 +76,28 @@ class MongoSession: print('[ERROR reading log]:', exc) def read_and_filter_mongo(self, this_session_id, query_key, query_value): - try: - data = [] - if query_key == 'progress_ratio': - number = float(query_value) - request_result = self.mongo_data_collection.find({"session_id": this_session_id, - query_key: number}) - else: - request_result = self.mongo_data_collection.find({"session_id": this_session_id, - query_key: query_value}) - for res in request_result: - data.append(res) - return data - except Exception as exc: - print('[ERROR reading log]:', exc) + uuid_list = self.mongo_data_collection.find({"session_id": this_session_id}).distinct("uuid") + all_data = [] + for uuid in uuid_list: + try: + data = dict() + data['value'] = [] + data['uuid'] = uuid + if query_key == 'progress_ratio': + number = float(query_value) + request_result = self.mongo_data_collection.find({"session_id": this_session_id, + 'uuid': uuid, + query_key: number}) + else: + request_result = self.mongo_data_collection.find({"session_id": this_session_id, + 'uuid': uuid, + query_key: query_value}) + for res in request_result: + data['value'].append(res) + all_data.append(data) + except Exception as exc: + print('[ERROR reading log]:', exc) + return all_data def get_most_recent_log(self, this_session_id): try: @@ -97,12 +110,18 @@ class MongoSession: except Exception as exc: print('[ERROR reading log]:', exc) - def get_array_from_step(self, this_session_id, step_name): + def get_uuid_list_from_session_id(self, this_session_id): + + uuid_list = self.mongo_data_collection.find({"session_id": this_session_id}).distinct("uuid") + return uuid_list + + def get_array_from_step(self, this_session_id, this_uuid, step_name): try: data_time = [] data_value = [] request_result = self.mongo_data_collection.find({"session_id": this_session_id, + "uuid": this_uuid, "step": step_name, 'loglevel': 'INFO'}, {'timestamp': 1, 'progress_ratio': 1}) diff --git a/main.py b/main.py index a12a90638e00555c3e3b8b2e5e30a440d3dce2b3..e376f9f20e6de8a3bbcab2eb247c737aafd40819 100644 --- a/main.py +++ b/main.py @@ -272,10 +272,13 @@ def main(cfg): uuids_to_get = [None] for uuid_to_get in uuids_to_get: - + if uuid_to_get is None: + uuid_to_log = 'all' + else: + uuid_to_log = uuid_to_get # ---------------------- send log ---------------------------- log_message = LogMessage(session_id=cfg['session']['id'], - uuid=uuid_to_get, + uuid=uuid_to_log, step='main', status='Starting...', uuid_prefix='meta', @@ -283,7 +286,7 @@ def main(cfg): loglevel='INFO', progress_ratio=0 ) - cfg['session']['current_uuid'] = uuid_to_get + cfg['session']['current_uuid'] = uuid_to_log with RabbitSession(cfg) as rabbit: rabbit.publish_log(log_message=log_message.__dict__) # ------------------------------------------------------------ @@ -293,13 +296,13 @@ def main(cfg): the_cfg=cfg, the_root_url=cfg['geonetwork']['url'], the_no_records_per_page=cfg['geonetwork']['records_per_page'], - the_uuid_to_get=cfg['session']['current_uuid'], + the_uuid_to_get=uuid_to_get, the_uuids_to_filter_out=uuids_to_filter_out, the_username=username, the_password=password) # ---------------------- send log ---------------------------- log_message2 = LogMessage(session_id=cfg['session']['id'], - uuid=uuid_to_get, + uuid=uuid_to_log, step='main', status='Terminated', uuid_prefix='meta', @@ -307,7 +310,6 @@ def main(cfg): loglevel='INFO', progress_ratio=1 ) - cfg['session']['current_uuid'] = uuid_to_get rabbit.publish_log(log_message=log_message2.__dict__) # ------------------------------------------------------------ diff --git a/workers/metadata-processor.py b/workers/metadata-processor.py index 97bf4a995389fe21e46e2869c2ae9397025d699a..c772f9e7aba2e2f603a276231256bd85dae6431e 100644 --- a/workers/metadata-processor.py +++ b/workers/metadata-processor.py @@ -308,6 +308,9 @@ def callback( channel, method, properties, body ): channel.queue_declare(queue=docs_to_enrich_qn, durable=True) channel.queue_bind(exchange=exchange, queue=docs_to_enrich_qn, routing_key=docs_to_enrich_rk) + if cfg['session']['current_uuid'] == 'all': + cfg['session']['current_uuid'] = in_record['geonet:info']['uuid'] + # ---------------------- send log ---------------------------- log_message = LogMessage(session_id=cfg['session']['id'], # session_id=cfg['session']['id'],