diff --git a/api.py b/api.py index 463a765e47788b1a6eb0e97d1b38806b39ffb7c8..29dc418d7823b0807e50a0377395572687b20f41 100644 --- a/api.py +++ b/api.py @@ -27,7 +27,7 @@ except ImportError: from lib.read_status_logs import ReadLogger from lib.get_most_recent_status_logs import GetLatestLog from lib.read_and_filter_logs import ReadFilterLogger -from lib.mongo_session import MongoSession +from lib.mongo_helper import MongoSession # read 'n' parse the configuration with open("config.yaml", 'r') as yamlfile: @@ -108,33 +108,36 @@ def _main(the_uuid): @api.route('/status/<session_id>') def taskstatus(session_id): - my_process_logger = ReadLogger(session_id=session_id, cfg=cfg) + my_process_logger = ReadLogger(session_id=session_id, cfg=cfg['mongo']) body = my_process_logger.main() - return Response(json.dumps(body, indent=4, sort_keys=True, default=str), mimetype='application/json') + return Response(json.dumps(body, sort_keys=True, default=str), mimetype='application/json'), 200 + #return jsonify(body), 200 @api.route('/status/<session_id>/latest') def lateststatus(session_id): - my_latest_log = GetLatestLog(session_id=session_id, cfg=cfg) + my_latest_log = GetLatestLog(session_id=session_id, cfg=cfg['mongo']) body = my_latest_log.main() - return Response(json.dumps(body, indent=4, sort_keys=True, default=str), mimetype='application/json') + return Response(json.dumps(body, sort_keys=True, default=str), mimetype='application/json'), 200 + #return jsonify(body), 200 @api.route('/status/<session_id>/<query_key>/<query_value>') def taskfilter(session_id, query_key, query_value): - my_task_filter = ReadFilterLogger(session_id=session_id, cfg=cfg, query_key=query_key, query_value=query_value) + my_task_filter = ReadFilterLogger(session_id=session_id, cfg=cfg['mongo'], query_key=query_key, query_value=query_value) body = my_task_filter.main() - return Response(json.dumps(body, indent=4, sort_keys=True, default=str), mimetype='application/json') + return Response(json.dumps(body, sort_keys=True, default=str), mimetype='application/json'), 200 + #return jsonify(body), 200 @api.route('/plot/<session_id>/<uuid>/<step>') def build_plot(session_id, uuid, step): img = io.BytesIO() - x, y = MongoSession(cfg=cfg).get_array_from_step(this_session_id=session_id, step_name=step, this_uuid=uuid) + x, y = MongoSession(cfg=cfg['mongo']).get_array_from_step(this_session_id=session_id, step_name=step, this_uuid=uuid) plt.clf() plt.plot(x, y, label=step) plt.savefig(img, format='png') @@ -142,18 +145,18 @@ def build_plot(session_id, uuid, step): plot_url = base64.b64encode(img.getvalue()).decode() - return '<img src="data:image/png;base64,{}">'.format(plot_url) + return '<img src="data:image/png;base64,{}">'.format(plot_url), 200 @api.route('/plot/<session_id>') def build_all_plot(session_id): img = io.BytesIO() plt.clf() - my_mongo_session = MongoSession(cfg=cfg) + my_mongo_session = MongoSession(cfg=cfg['mongo']) uuid_list = my_mongo_session.get_uuid_list_from_session_id(this_session_id=session_id) nb_uuids = len(uuid_list) - print('nb_uuid: ', nb_uuids) - print('uuid_list: ', uuid_list) + logging.debug('nb_uuid: ', nb_uuids) + logging.debug('uuid_list: ', uuid_list) if nb_uuids == 1: for step in ['main', 'metadata-processor', 'doc-enricher', 'doc-processor', 'doc-indexer', 'pre-reindexer-checker', 'reindexer', 'sampler']: @@ -163,7 +166,8 @@ def build_all_plot(session_id): step_name=step) plt.plot(x, y, label=step) except Exception as _: - print('cannot get data from :' + step) + logging.error('cannot get data from :' + step) + return None, 404 plt.legend() plt.xlabel('time') plt.ylabel('progress ratio') @@ -182,14 +186,15 @@ def build_all_plot(session_id): response['info'] = 'please use <url/plot/session_id/uuid> end-point!' response['nb uuids'] = nb_uuids response['uuids list'] = uuid_list - return Response(json.dumps(response, indent=4, sort_keys=True, default=str), mimetype='application/json') + return Response(json.dumps(response, sort_keys=True, default=str), mimetype='application/json'), 400 + #return jsonify(response), 400 @api.route('/plot/<session_id>/<uuid>') def build_all_plot_using_uuid(session_id, uuid): img = io.BytesIO() plt.clf() - my_mongo_session = MongoSession(cfg=cfg) + my_mongo_session = MongoSession(cfg=cfg['mongo']) for step in ['main', 'metadata-processor', 'doc-enricher', 'doc-processor', 'doc-indexer', 'pre-reindexer-checker', 'reindexer', 'sampler']: @@ -199,7 +204,9 @@ def build_all_plot_using_uuid(session_id, uuid): step_name=step) plt.plot(x, y, label=step) except Exception as _: - print('cannot get data from :' + step) + logging.error('Cannot get data from :' + step) + return None, 404 + plt.legend() plt.xlabel('time') plt.ylabel('progress ratio') @@ -211,13 +218,13 @@ def build_all_plot_using_uuid(session_id, uuid): plot_url = base64.b64encode(img.getvalue()).decode() - return '<img src="data:image/png;base64,{}">'.format(plot_url) + return '<img src="data:image/png;base64,{}">'.format(plot_url), 200 @api.route('/json/<session_id>') def build_full_session_json(session_id): data = dict() - my_mongo_session = MongoSession(cfg=cfg) + my_mongo_session = MongoSession(cfg=cfg['mongo']) uuid_list = my_mongo_session.get_uuid_list_from_session_id(this_session_id=session_id) nb_uuids = len(uuid_list) print('nb_uuid: ', nb_uuids) @@ -233,22 +240,29 @@ def build_full_session_json(session_id): step_name=step, this_uuid=uuid_list[0]) + return Response(json.dumps(data, sort_keys=True, default=str), mimetype='application/json'), 200 + #return jsonify(data), 200 + + except Exception as _: - print('cannot get data from :' + step) + logging.error('cannot get data from :' + step) + return None, 404 - return Response(json.dumps(data, indent=4, sort_keys=True, default=str), mimetype='application/json') + return Response(json.dumps(data, sort_keys=True, default=str), mimetype='application/json'), 200 + #return jsonify(data), 200 else: response = dict() response['info'] = 'please use <url/plot/session_id/uuid> end-point!' response['nb uuids'] = nb_uuids response['uuids list'] = uuid_list - return Response(json.dumps(response, indent=4, sort_keys=True, default=str), mimetype='application/json') + return Response(json.dumps(response, sort_keys=True, default=str), mimetype='application/json'), 400 + #return jsonify(response), 400 @api.route('/json/<session_id>/<uuid>') def build_full_session_json_using_one_uuid(session_id, uuid): data = dict() - my_mongo_session = MongoSession(cfg=cfg) + my_mongo_session = MongoSession(cfg=cfg['mongo']) for step in ['main', 'metadata-processor', 'doc-enricher', 'doc-processor', 'doc-indexer', 'pre-reindexer-checker', 'reindexer', 'sampler']: @@ -258,11 +272,15 @@ def build_full_session_json_using_one_uuid(session_id, uuid): my_mongo_session.get_array_from_step(this_session_id=session_id, step_name=step, this_uuid=uuid) + #return jsonify(data), 200 + return Response(json.dumps(data, sort_keys=True, default=str), mimetype='application/json'), 200 except Exception as _: - print('cannot get data from :' + step) + logging.error('Cannot get data from :' + step) + return None, 404 + + #return Response(json.dumps(data, indent=4, sort_keys=True, default=str), mimetype='application/json') - return Response(json.dumps(data, indent=4, sort_keys=True, default=str), mimetype='application/json') if __name__ == '__main__': diff --git a/docker-compose-workers.yml b/docker-compose-workers.yml index 8bac4a1878c284c14c990502b1f483a480f92e3b..a4a5e41d7d3c32d4cc6464de73d758265574507a 100644 --- a/docker-compose-workers.yml +++ b/docker-compose-workers.yml @@ -104,28 +104,28 @@ services: # volumes: # - ./config.yaml:/app/config.yaml:ro - metabase: - image: metabase/metabase -# restart: unless-stopped - ports: - - 3001:3000 - volumes: - # declare your mount volume /host/dir:/container/dir - - /home/app/metabase-data:/metabase-data -# environment: -# MB_DB_TYPE: mongo -# MB_DB_DBNAME: indexerdb -# MB_DB_PORT: 27017 -# MB_DB_USER: ${MONGO_USER} -# MB_DB_PASS: ${MONGO_PASSWORD} -# MB_DB_HOST: mongo - depends_on: - - mongo - links: - - mongo +# metabase: +# image: metabase/metabase +# # restart: unless-stopped +# ports: +# - 3001:3000 +# volumes: +# # declare your mount volume /host/dir:/container/dir +# - metabase:/metabase-data +# # environment: +# # MB_DB_TYPE: mongo +# # MB_DB_DBNAME: indexerdb +# # MB_DB_PORT: 27017 +# # MB_DB_USER: ${MONGO_USER} +# # MB_DB_PASS: ${MONGO_PASSWORD} +# # MB_DB_HOST: mongo +# depends_on: +# - mongo +# links: +# - mongo volumes: rabbitmq: working-directory: mongo: - metabase: + #metabase: diff --git a/lib/get_most_recent_status_logs.py b/lib/get_most_recent_status_logs.py index 69256e2ae47e1df42a00cad9d528227dde1521b2..8dc19434a46fe80bf2c7890fbcc4007f85859424 100644 --- a/lib/get_most_recent_status_logs.py +++ b/lib/get_most_recent_status_logs.py @@ -14,9 +14,9 @@ except ImportError: # Python 2.x fallback from argparse import Namespace try: - from lib.mongo_session import MongoSession + from lib.mongo_helper import MongoSession except ImportError: - from mongo_session import MongoSession + from mongo_helper import MongoSession fileDir = os.path.dirname(os.path.abspath(__file__)) parentDir = os.path.dirname(fileDir) diff --git a/lib/log_message.py b/lib/log_message.py index 7c920a5f81ea3701823d7130d9bc979dd004c595..602421b81b329fa106c5c1621a4643f44bd0b7fa 100644 --- a/lib/log_message.py +++ b/lib/log_message.py @@ -4,36 +4,32 @@ import uuid as uuid_lib class LogMessage: - def __init__(self, session_id, uuid, step, status, uuid_prefix, info, loglevel, progress_ratio): - if(session_id.lower()) == 'new': - self.session_id = str(uuid_lib.uuid4()) - else: - self.session_id = session_id + def __init__(self, session_id, uuid, step, status, uuid_suffix, info, loglevel, progress_ratio): + self.session_id = session_id self.uuid = uuid self.step = step self.status = status self.progress_ratio = progress_ratio - self.uuid_prefix = uuid_prefix + self.uuid_suffix = uuid_suffix self.info = info self.loglevel = loglevel self.timestamp = self.generate_timestamp() @staticmethod def generate_timestamp(): - print('timestamp generated') - datetime_without_timezone = datetime.datetime.now() - print("Original:", datetime_without_timezone) - timestamp_as_str = datetime_without_timezone.strftime("%Y-%m-%dT%H:%M:%S.%fZ") - return timestamp_as_str + return datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ") if __name__ == '__main__': - my_message = LogMessage(session_id='new', + + my_message = LogMessage(session_id='1234', uuid='7c31bbee-9155-4a2d-964f-b13a9df5c542', step='doc-enricher', status='pending', - uuid_prefix='meta', - info='no info' + uuid_suffix='meta', + info='no info', + loglevel='DEBUG', + progress_ratio='0.12' ) print('my message.session_id :', my_message.session_id) diff --git a/lib/mongo_session.py b/lib/mongo_helper.py similarity index 86% rename from lib/mongo_session.py rename to lib/mongo_helper.py index 0a8972cb2844a2c43848c4a362ed95ee6ec39803..4958e0ae0d95b5f09111bc164b7626d8c0ef7ff3 100644 --- a/lib/mongo_session.py +++ b/lib/mongo_helper.py @@ -2,13 +2,13 @@ from pymongo import MongoClient, errors, ASCENDING, DESCENDING import datetime import json import time - +from lib.my_logging import logging class MongoSession: def __init__(self, cfg): self.cfg = cfg - self.collection_name = cfg['mongo']['collection'] + self.collection_name = cfg['collection'] self.mongo_client = self.init_client_mongo() self.mongo_data_collection = self.set_mongo_collection(mongo_client=self.mongo_client, mongo_collection=self.collection_name) @@ -19,11 +19,11 @@ class MongoSession: def init_client_mongo(self): # init Mongo - mongo_hostname = self.cfg['mongo']['host'] - mongo_port = self.cfg['mongo']['port'] - mongo_database = self.cfg['mongo']['database'] - mongo_user = self.cfg['mongo']['user'] - mongo_pass = self.cfg['mongo']['password'] + mongo_hostname = self.cfg['host'] + mongo_port = self.cfg['port'] + mongo_database = self.cfg['database'] + mongo_user = self.cfg['user'] + mongo_pass = self.cfg['password'] while True: try: @@ -32,7 +32,7 @@ class MongoSession: db.authenticate(mongo_user, mongo_pass) return db except errors.ServerSelectionTimeoutError: - print('Waiting for mongodb to be reachable...') + logging.info('Waiting for MongoDB to be reachable...') time.sleep(5) @staticmethod @@ -41,11 +41,11 @@ class MongoSession: return mongo_client[mongo_collection] def save_log_in_mongo(self, ch, method, body): - print('body: ', body) + logging.debug('body: ', body) try: body_object = json.loads(body.decode('utf-8')) - print('body session_id: ', body_object['session_id']) + logging.debug('body session_id: ', body_object['session_id']) timestamp = datetime.datetime.strptime(body_object["timestamp"], "%Y-%m-%dT%H:%M:%S.%fZ") float_progress_ratio = float(body_object["progress_ratio"]) @@ -55,12 +55,12 @@ class MongoSession: "uuid": body_object["uuid"], "step": body_object["step"], "status": body_object["status"], - "uuid_prefix": body_object["uuid_prefix"], + "uuid_suffix": body_object["uuid_suffix"], "info": body_object["info"], "progress_ratio": float_progress_ratio, "loglevel": body_object["loglevel"]}) except Exception as exc: - print('[ERROR saving log]:', exc) + logging.error('[ERROR saving log]:', exc) ch.basic_nack(delivery_tag=method.delivery_tag, requeue=1) ch.basic_ack(delivery_tag=method.delivery_tag) @@ -69,8 +69,7 @@ class MongoSession: try: data = [] request_result = self.mongo_data_collection.find({"session_id": this_session_id}) - for res in request_result: - data.append(res) + data = list(request_result) return data except Exception as exc: print('[ERROR reading log]:', exc) @@ -96,7 +95,7 @@ class MongoSession: data['value'].append(res) all_data.append(data) except Exception as exc: - print('[ERROR reading log]:', exc) + logging.error('[ERROR reading log]:', exc) return all_data def get_most_recent_log(self, this_session_id): @@ -132,7 +131,7 @@ class MongoSession: return data_time, data_value except Exception as exc: - print('[ERROR reading log]:', exc) + logging.error('[ERROR reading log]:', exc) if __name__ == '__main__': @@ -148,8 +147,6 @@ if __name__ == '__main__': cfg['mongo']['password'] = 'example' cfg['mongo']['collection'] = 'indexer_logs' this_session_id = 'bb0764fe-324e-4719-9214-dc4abc59fe50' - data_time, data_value = MongoSession(cfg=cfg).get_array_from_step(this_session_id, "doc-enricher") + data_time, data_value = MongoSession(cfg=cfg['mongo']).get_array_from_step(this_session_id, "doc-enricher") print('data_time: ', data_time) print('data_value: ', data_value) - - diff --git a/lib/read_and_filter_logs.py b/lib/read_and_filter_logs.py index 5687bcd32c0e9c21e16d3b50f59b30d309081cb0..35482d58c5530b15982320b80cc7b8bbb6a42b28 100644 --- a/lib/read_and_filter_logs.py +++ b/lib/read_and_filter_logs.py @@ -14,9 +14,9 @@ except ImportError: # Python 2.x fallback from argparse import Namespace try: - from lib.mongo_session import MongoSession + from lib.mongo_helper import MongoSession except ImportError: - from mongo_session import MongoSession + from mongo_helper import MongoSession fileDir = os.path.dirname(os.path.abspath(__file__)) parentDir = os.path.dirname(fileDir) diff --git a/lib/read_status_logs.py b/lib/read_status_logs.py index a7c6063cd985d1aa497bd4a8bf5c120b04f4b1ce..e953b7932182dfde4d71566171ee2364a348e6e3 100644 --- a/lib/read_status_logs.py +++ b/lib/read_status_logs.py @@ -14,9 +14,9 @@ except ImportError: # Python 2.x fallback from argparse import Namespace try: - from lib.mongo_session import MongoSession + from lib.mongo_helper import MongoSession except ImportError: - from mongo_session import MongoSession + from mongo_helper import MongoSession fileDir = os.path.dirname(os.path.abspath(__file__)) parentDir = os.path.dirname(fileDir) diff --git a/main.py b/main.py index e376f9f20e6de8a3bbcab2eb247c737aafd40819..23a4f1d0324b607a746f51c2e43eb8a405d8d271 100644 --- a/main.py +++ b/main.py @@ -184,7 +184,7 @@ def delete_dataset_from_dest_index(the_rabbit, the_cfg, the_uuid): uuid=the_uuid, step='main', status='delete_dataset_from_dest_index', - uuid_prefix='meta', + uuid_suffix='meta', info='no info', loglevel='INFO', progress_ratio=0.1 @@ -281,7 +281,7 @@ def main(cfg): uuid=uuid_to_log, step='main', status='Starting...', - uuid_prefix='meta', + uuid_suffix='meta', info='no info', loglevel='INFO', progress_ratio=0 @@ -305,7 +305,7 @@ def main(cfg): uuid=uuid_to_log, step='main', status='Terminated', - uuid_prefix='meta', + uuid_suffix='meta', info='no info', loglevel='INFO', progress_ratio=1 diff --git a/workers/doc-enricher.py b/workers/doc-enricher.py index 6e91d0d2441607ee01e54fe358f095450e13484c..1d1f6b6cec1924cc79253b4fe8eb4f06bd30b7c4 100644 --- a/workers/doc-enricher.py +++ b/workers/doc-enricher.py @@ -246,7 +246,7 @@ def enrich_docs( channel, method, properties, body ): uuid=cfg['session']['current_uuid'], step='doc-enricher', status=status_message, - uuid_prefix='meta', + uuid_suffix='meta', info='no info', loglevel='INFO', progress_ratio=progress_rounded @@ -287,7 +287,7 @@ def enrich_docs( channel, method, properties, body ): uuid=cfg['session']['current_uuid'], step='doc-enricher', status='done', - uuid_prefix='meta', + uuid_suffix='meta', info='no info', loglevel='INFO', progress_ratio=progress_rounded diff --git a/workers/doc-indexer.py b/workers/doc-indexer.py index 5d039187d2dd97a59acda4206607721f25a38bfa..0d10899340244a2ec124cfa9588e0862683c0cb8 100644 --- a/workers/doc-indexer.py +++ b/workers/doc-indexer.py @@ -132,7 +132,7 @@ def index_docs(channel, method, properties, body): uuid=cfg['session']['current_uuid'], step='doc-indexer', status=status_message, - uuid_prefix='meta', + uuid_suffix='meta', info='no info', loglevel='INFO', progress_ratio=progress_rounded @@ -204,7 +204,7 @@ def index_docs(channel, method, properties, body): uuid=cfg['session']['current_uuid'], step='doc-indexer', status=rep, - uuid_prefix='meta', + uuid_suffix='meta', info='no info', loglevel='ERROR', progress_ratio=progress_rounded @@ -231,7 +231,7 @@ def index_docs(channel, method, properties, body): uuid=cfg['session']['current_uuid'], step='doc-indexer', status='done', - uuid_prefix='meta', + uuid_suffix='meta', info='no info', loglevel='INFO', progress_ratio=progress_rounded diff --git a/workers/doc-processor.py b/workers/doc-processor.py index d1e952742c8ff015591a2440ef09abb53514b894..6b68703f2e11b2a6e763a6394bb7d691461e26c9 100644 --- a/workers/doc-processor.py +++ b/workers/doc-processor.py @@ -121,7 +121,7 @@ def process_docs( channel, method, properties, body ): uuid=cfg['session']['current_uuid'], step='doc-processor', status=status_message, - uuid_prefix='meta', + uuid_suffix='meta', info='no info', loglevel='INFO', progress_ratio=progress_rounded @@ -165,7 +165,7 @@ def process_docs( channel, method, properties, body ): uuid=cfg['session']['current_uuid'], step='doc-processor', status='done', - uuid_prefix='meta', + uuid_suffix='meta', info='no info', loglevel='INFO', progress_ratio=progress_rounded diff --git a/workers/metadata-processor.py b/workers/metadata-processor.py index c772f9e7aba2e2f603a276231256bd85dae6431e..12d7eb55a434363d9aac05225cf34af71cab71bb 100644 --- a/workers/metadata-processor.py +++ b/workers/metadata-processor.py @@ -317,7 +317,7 @@ def callback( channel, method, properties, body ): uuid=cfg['session']['current_uuid'], step='metadata-processor', status='Starting...', - uuid_prefix='meta', + uuid_suffix='meta', info='no info', loglevel='INFO', progress_ratio=0 @@ -325,7 +325,7 @@ def callback( channel, method, properties, body ): json_body = json.dumps(log_message.__dict__) - print(" [x] json body : ", json_body) + logging.debug(" [x] json body : %s" % json_body) channel.basic_publish(exchange=exchange_logs_name, routing_key=routing_key_logs, @@ -391,7 +391,7 @@ def callback( channel, method, properties, body ): uuid=cfg['session']['current_uuid'], step='metadata-processor', status='sent task doc to enrich', - uuid_prefix='meta', + uuid_suffix='meta', info='no info', loglevel='INFO', progress_ratio=1 @@ -399,7 +399,7 @@ def callback( channel, method, properties, body ): json_body = json.dumps(log_message.__dict__) - print(" [x] json body : ", json_body) + logging.debug(" [x] json body: %s" % json_body) channel.basic_publish(exchange=exchange_logs_name, routing_key=routing_key_logs, @@ -441,7 +441,7 @@ def callback( channel, method, properties, body ): uuid=cfg['session']['current_uuid'], step='metadata-processor', status='sent task doc to index', - uuid_prefix='meta', + uuid_suffix='meta', info='no info', loglevel='INFO', progress_ratio=0.5 @@ -449,7 +449,7 @@ def callback( channel, method, properties, body ): json_body = json.dumps(log_message.__dict__) - print(" [x] json body : ", json_body) + logging.debug(" [x] json body: %s" % json_body) channel.basic_publish(exchange=exchange_logs_name, routing_key=routing_key_logs, @@ -464,7 +464,7 @@ def callback( channel, method, properties, body ): uuid=cfg['session']['current_uuid'], step='metadata-processor', status='terminated', - uuid_prefix='meta', + uuid_suffix='meta', info='no info', loglevel='INFO', progress_ratio=1 @@ -472,7 +472,7 @@ def callback( channel, method, properties, body ): json_body = json.dumps(log_message.__dict__) - print(" [x] json body : ", json_body) + logging.debug(" [x] json body: %s" % json_body) channel.basic_publish(exchange=exchange_logs_name, routing_key=routing_key_logs, diff --git a/workers/process-logger.py b/workers/process-logger.py index 5eeea5d2943c0d587cb1fb213d33d437383f2b5e..e571b599992fc72472e8313f685e00c47066a5e5 100644 --- a/workers/process-logger.py +++ b/workers/process-logger.py @@ -7,7 +7,7 @@ fileDir = os.path.dirname(os.path.abspath(__file__)) parentDir = os.path.dirname(fileDir) newPath = os.path.join(parentDir) sys.path.append(newPath) -from lib.mongo_session import MongoSession +from lib.mongo_helper import MongoSession try: @@ -37,7 +37,7 @@ def main(cfg): channel.basic_qos(prefetch_count=1) channel.basic_consume( - on_message_callback=lambda ch, method, properties, body: MongoSession(cfg).save_log_in_mongo(ch, + on_message_callback=lambda ch, method, properties, body: MongoSession(cfg['mongo']).save_log_in_mongo(ch, method, body), diff --git a/workers/reindexer.py b/workers/reindexer.py index 75fb6aae51dc24fa896af1441fda5b7cc336d678..43f9909ef96079c33e9e009b1023da407182b6c4 100644 --- a/workers/reindexer.py +++ b/workers/reindexer.py @@ -39,7 +39,7 @@ def create_sampling_task(cfg, channel, uuid): uuid=cfg['session']['current_uuid'], step='reindexer', status='create sampling task', - uuid_prefix='full', + uuid_suffix='full', info=uuid, loglevel='INFO', progress_ratio=0.8 @@ -98,7 +98,7 @@ def on_msg_callback(channel, method, properties, body): uuid=cfg['session']['current_uuid'], step='reindexer', status='starting', - uuid_prefix='full', + uuid_suffix='full', info=uuid, loglevel='INFO', progress_ratio=0 @@ -175,7 +175,7 @@ def on_msg_callback(channel, method, properties, body): uuid=cfg['session']['current_uuid'], step='pre-reindexer-checker', status='Documents are still being pushed to the source index', - uuid_prefix='full', + uuid_suffix='full', info=uuid, loglevel='INFO', progress_ratio=ratio @@ -204,7 +204,7 @@ def on_msg_callback(channel, method, properties, body): uuid=cfg['session']['current_uuid'], step='reindexer', status=message, - uuid_prefix='full', + uuid_suffix='full', info=uuid, loglevel='INFO', progress_ratio=0.2 @@ -236,7 +236,7 @@ def on_msg_callback(channel, method, properties, body): uuid=cfg['session']['current_uuid'], step='reindexer', status=message, - uuid_prefix='full', + uuid_suffix='full', info=uuid, loglevel='ERROR', progress_ratio=None @@ -270,7 +270,7 @@ def on_msg_callback(channel, method, properties, body): uuid=cfg['session']['current_uuid'], step='reindexer', status='refreshed indices', - uuid_prefix='meta', + uuid_suffix='meta', info=uuid, loglevel='INFO', progress_ratio=0.6 @@ -299,7 +299,7 @@ def on_msg_callback(channel, method, properties, body): uuid=cfg['session']['current_uuid'], step='reindexer', status=exc, - uuid_prefix='full', + uuid_suffix='full', info=uuid, loglevel='ERROR', progress_ratio=None @@ -372,7 +372,7 @@ def on_msg_callback(channel, method, properties, body): uuid=cfg['session']['current_uuid'], step='reindexer', status=message, - uuid_prefix='full', + uuid_suffix='full', info=uuid, loglevel='INFO', progress_ratio=0.9 @@ -400,7 +400,7 @@ def on_msg_callback(channel, method, properties, body): uuid=cfg['session']['current_uuid'], step='reindexer', status=message, - uuid_prefix='meta', + uuid_suffix='meta', info=uuid, loglevel='INFO', progress_ratio=1 @@ -432,7 +432,7 @@ def on_msg_callback(channel, method, properties, body): uuid=cfg['session']['current_uuid'], step='reindexer', status=message, - uuid_prefix='', + uuid_suffix='', info=uuid, loglevel='INFO', progress_ratio=1 diff --git a/workers/sample-generator.py b/workers/sample-generator.py index 511aaa47eb18ae6709910590c41b863dbbe287e7..37c8d74aa3ceb265eae29e4ea8ba17197072c0ed 100644 --- a/workers/sample-generator.py +++ b/workers/sample-generator.py @@ -187,7 +187,7 @@ def callback(channel, method, properties, body): uuid=cfg['session']['current_uuid'], step='sampler', status=message, - uuid_prefix='meta', + uuid_suffix='meta', info=uuid, loglevel='INFO', progress_ratio=0 @@ -221,7 +221,7 @@ def callback(channel, method, properties, body): uuid=cfg['session']['current_uuid'], step='sampler', status=exc, - uuid_prefix='full', + uuid_suffix='full', info=uuid, loglevel='ERROR', progress_ratio=0 @@ -262,7 +262,7 @@ def callback(channel, method, properties, body): uuid=cfg['session']['current_uuid'], step='sampler', status=message, - uuid_prefix='meta', + uuid_suffix='meta', info=uuid, loglevel='INFO', progress_ratio=0.9 @@ -296,7 +296,7 @@ def callback(channel, method, properties, body): uuid=cfg['session']['current_uuid'], step='sampler', status='Removing lock for dataset', - uuid_prefix='full', + uuid_suffix='full', info=uuid, loglevel='INFO', progress_ratio=1 @@ -322,7 +322,7 @@ def callback(channel, method, properties, body): uuid=cfg['session']['current_uuid'], step='sampler', status=rep, - uuid_prefix='full', + uuid_suffix='full', info=uuid, loglevel='ERROR', progress_ratio=0 @@ -350,7 +350,7 @@ def callback(channel, method, properties, body): # uuid=cfg['session']['current_uuid'], # step='sampler', # status=message, - # uuid_prefix='meta', + # uuid_suffix='meta', # info=uuid, # loglevel='INFO', # progress_ratio=1