diff --git a/api.py b/api.py index b0b57d6a480e97c52959fe5ea6e6350caed7a45f..e590480beeef2a0790b2e7471b7a1c98f5237a78 100644 --- a/api.py +++ b/api.py @@ -116,6 +116,13 @@ def lateststatus(session_id): return Response(json.dumps(body, indent=4, sort_keys=True, default=str), mimetype='application/json') +@api.route('/status/<session_id>/error') +def taskerror(session_id): + my_task_errors = GetLatestLog(session_id=session_id, cfg=cfg) + body = my_task_errors.main() + + return Response(json.dumps(body, indent=4, sort_keys=True, default=str), mimetype='application/json') + if __name__ == '__main__': api.run(host='0.0.0.0', port=8000, debug=True) diff --git a/lib/mongo_session.py b/lib/mongo_session.py index a61c1ae00f4e7f359d8fec55882a98fab31f365f..8bc0be9f15bebb021055cd96e17698465d249b88 100644 --- a/lib/mongo_session.py +++ b/lib/mongo_session.py @@ -67,6 +67,17 @@ class MongoSession: except Exception as exc: print('[ERROR reading log]:', exc) + def read_mongo_errors(self, this_session_id): + try: + data = [] + request_result = self.mongo_data_collection.find({"session_id": this_session_id, + "loglevel": "ERROR"}) + for res in request_result: + data.append(res) + return data + except Exception as exc: + print('[ERROR reading log]:', exc) + def get_most_recent_log(self, this_session_id): try: data = [] diff --git a/lib/read_errors_logs.py b/lib/read_errors_logs.py new file mode 100644 index 0000000000000000000000000000000000000000..7e9d977da33bf7364f87b1624853c95eddab6820 --- /dev/null +++ b/lib/read_errors_logs.py @@ -0,0 +1,62 @@ +import pika +import os, sys +from elasticsearch.exceptions import AuthorizationException + +from yaml import load +try: + from yaml import CLoader as Loader, CDumper as Dumper +except ImportError: + from yaml import Loader, Dumper + +try: + from types import SimpleNamespace as Namespace +except ImportError: + # Python 2.x fallback + from argparse import Namespace +try: + from lib.mongo_session import MongoSession +except ImportError: + from mongo_session import MongoSession + +fileDir = os.path.dirname(os.path.abspath(__file__)) +parentDir = os.path.dirname(fileDir) +newPath = os.path.join(parentDir) +sys.path.append(newPath) + + +class ReadErrorLogger: + def __init__(self, session_id, cfg): + self.session_id = session_id + self.cfg = cfg + + def main(self): + + response_body = MongoSession(self.cfg).read_mongo_errors(self.session_id) + return response_body + + +if __name__ == '__main__': + + import time + + # read 'n' parse the configuration + with open("../config.yaml", 'r') as yamlfile: + cfg = load(yamlfile, Loader=Loader) + try: + session_id = '17-06-14:57-indexation' + my_process_logger = ReadErrorLogger(session_id=session_id, cfg=cfg) + data = my_process_logger.main() + print('data : ', data) + except pika.exceptions.ChannelClosed: + print('Waiting for tasks...') + # logging.info("Waiting for tasks...") + time.sleep(5) + except pika.exceptions.AMQPConnectionError: + # logging.info('Waiting for RabbitMQ to be reachable...') + print('Waiting for RabbitMQ to be reachable...') + time.sleep(5) + except AuthorizationException as e: + print('e') + # logging.error(e) + time.sleep(5) + exit(1) diff --git a/main.py b/main.py index 565a9002c06a18a93f71d7d701211181ed902e7e..f60ffe02330b9a8747efa94744a819ee7b8d3197 100644 --- a/main.py +++ b/main.py @@ -307,6 +307,19 @@ def main(cfg): cfg['session']['current_uuid'] = uuid_to_get rabbit.publish_log(log_message=log_message2.__dict__) # ------------------------------------------------------------ + + # ---------------------- send log ---------------------------- + log_message2 = LogMessage(session_id=cfg['session']['id'], + uuid=uuid_to_get, + step='main', + status='this message is for testing only', + uuid_prefix='meta', + info='no info', + loglevel='ERROR' + ) + cfg['session']['current_uuid'] = uuid_to_get + rabbit.publish_log(log_message=log_message2.__dict__) + # ------------------------------------------------------------ # <-- the rabbit connexion is automatically closed here # if 'all' not in uuids_to_get: diff --git a/workers/doc-indexer.py b/workers/doc-indexer.py index 4c9189c782e9c98ee5bfdf13082ad4c0a962466a..41c61040884f7e9cc485dc6bef0dadadbf58eff3 100644 --- a/workers/doc-indexer.py +++ b/workers/doc-indexer.py @@ -197,6 +197,28 @@ def index_docs(channel, method, properties, body): logging.error(json.dumps(rep, indent=4)) logging.error("Failed") + # ---------------------- send log ---------------------------- + log_message = LogMessage(session_id=cfg['session']['id'], + # session_id=cfg['session']['id'], + uuid=cfg['session']['current_uuid'], + step='doc-indexer', + status=rep, + uuid_prefix='meta', + info='no info', + loglevel='ERROR' + ) + + json_body = json.dumps(log_message.__dict__) + + print(" [x] json body : ", json_body) + + channel.basic_publish(exchange=exchange_logs_name, + routing_key=routing_key_logs, + body=json_body, + properties=pika.BasicProperties(delivery_mode=2) + ) + # ------------------------------------------------------------ + #time.sleep(5) if progress_rounded == 100: diff --git a/workers/reindexer.py b/workers/reindexer.py index 3dbf746d15a52db580e0d5f0ee82fe1653428069..a2681e210f5055bf7e020eb5ccab999695c28845 100644 --- a/workers/reindexer.py +++ b/workers/reindexer.py @@ -181,6 +181,29 @@ def on_msg_callback(channel, method, properties, body): es.indices.refresh(index=index) except NotFoundError: # the destination index may not be already present + # ---------------------- send log ---------------------------- + message = 'the destination index may not be already present' + + log_message = LogMessage(session_id=cfg['session']['id'], + # session_id=cfg['session']['id'], + uuid=cfg['session']['current_uuid'], + step='reindexer', + status=message, + uuid_prefix='full', + info=uuid, + loglevel='ERROR' + ) + + json_body = json.dumps(log_message.__dict__) + + print(" [x] json body : ", json_body) + + channel.basic_publish(exchange=cfg['rabbitmq']['exchange_logs_name'], + routing_key=cfg['rabbitmq']['routing_key_logs'], + body=json_body, + properties=pika.BasicProperties(delivery_mode=2) + ) + # ------------------------------------------------------------ pass the_query = dict() @@ -194,9 +217,32 @@ def on_msg_callback(channel, method, properties, body): logging.debug(res) except NotFoundError: pass - except Exception as e: - logging.error(e) + except Exception as exc: + logging.error(exc) channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) + # ---------------------- send log ---------------------------- + message = 'the destination index may not be already present' + + log_message = LogMessage(session_id=cfg['session']['id'], + # session_id=cfg['session']['id'], + uuid=cfg['session']['current_uuid'], + step='reindexer', + status=exc, + uuid_prefix='full', + info=uuid, + loglevel='ERROR' + ) + + json_body = json.dumps(log_message.__dict__) + + print(" [x] json body : ", json_body) + + channel.basic_publish(exchange=cfg['rabbitmq']['exchange_logs_name'], + routing_key=cfg['rabbitmq']['routing_key_logs'], + body=json_body, + properties=pika.BasicProperties(delivery_mode=2) + ) + # ------------------------------------------------------------ return # # 2. setup template diff --git a/workers/sample-generator.py b/workers/sample-generator.py index d1089e7bdc0109afd3f8cc2e61ae06af4f6fc4ca..34d086b1035f92d3cb2a878ee7796d6d1ecb4256 100644 --- a/workers/sample-generator.py +++ b/workers/sample-generator.py @@ -211,8 +211,30 @@ def callback(channel, method, properties, body): time.sleep(5) channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) return - except Exception as e: + except Exception as exc: logging.error("Exception:") + # ---------------------- send log ---------------------------- + + log_message = LogMessage(session_id=cfg['session']['id'], + # session_id=cfg['session']['id'], + uuid=cfg['session']['current_uuid'], + step='sampler', + status=exc, + uuid_prefix='full', + info=uuid, + loglevel='ERROR' + ) + + json_body = json.dumps(log_message.__dict__) + + print(" [x] json body : ", json_body) + + channel.basic_publish(exchange=cfg['rabbitmq']['exchange_logs_name'], + routing_key=cfg['rabbitmq']['routing_key_logs'], + body=json_body, + properties=pika.BasicProperties(delivery_mode=2) + ) + # ------------------------------------------------------------ logging.error(e) logging.error("Exiting.") exit(1) @@ -267,6 +289,28 @@ def callback(channel, method, properties, body): else: channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) logging.error(json.dumps(rep, indent=4)) + # ---------------------- send log ---------------------------- + + log_message = LogMessage(session_id=cfg['session']['id'], + # session_id=cfg['session']['id'], + uuid=cfg['session']['current_uuid'], + step='sampler', + status=rep, + uuid_prefix='full', + info=uuid, + loglevel='ERROR' + ) + + json_body = json.dumps(log_message.__dict__) + + print(" [x] json body : ", json_body) + + channel.basic_publish(exchange=cfg['rabbitmq']['exchange_logs_name'], + routing_key=cfg['rabbitmq']['routing_key_logs'], + body=json_body, + properties=pika.BasicProperties(delivery_mode=2) + ) + # ------------------------------------------------------------ logging.error("Failed") # else: