diff --git a/lib/log_message.py b/lib/log_message.py index 9a4d84b59e469e856bbf7fb13809acaa21940181..8ad373cf5c17d9f6c87daf87f0cab3cb31c5de30 100644 --- a/lib/log_message.py +++ b/lib/log_message.py @@ -4,7 +4,7 @@ import uuid as uuid_lib class LogMessage: - def __init__(self, session_id, uuid, step, status, uuid_prefix, info): + def __init__(self, session_id, uuid, step, status, uuid_prefix, info, loglevel): if(session_id.lower()) == 'new': self.session_id = str(uuid_lib.uuid4()) else: @@ -14,6 +14,7 @@ class LogMessage: self.status = status self.uuid_prefix = uuid_prefix self.info = info + self.loglevel = loglevel self.timestamp = self.generate_timestamp() @staticmethod diff --git a/lib/mongo_session.py b/lib/mongo_session.py index 27aa8bcd4d3258e1cf79c7d2af8767f6914bb41a..a61c1ae00f4e7f359d8fec55882a98fab31f365f 100644 --- a/lib/mongo_session.py +++ b/lib/mongo_session.py @@ -52,7 +52,7 @@ class MongoSession: "status": body_object["status"], "uuid_prefix": body_object["uuid_prefix"], "info": body_object["info"], - }) + "loglevel": body_object["loglevel"]}) except Exception as exc: print('[ERROR saving log]:', exc) ch.basic_ack(delivery_tag=method.delivery_tag) diff --git a/main.py b/main.py index 03cad9c152fe63e7a099a33b3a919b8cc1e81506..a10cb202d2da4f34a295cde387d2e7b3161efbd3 100644 --- a/main.py +++ b/main.py @@ -185,7 +185,8 @@ def delete_dataset_from_dest_index(the_rabbit, the_cfg, the_uuid): step='main', status='delete_dataset_from_dest_index', uuid_prefix='meta', - info='no info' + info='no info', + loglevel='INFO' ) the_rabbit.publish_log(log_message=log_message.__dict__) # ------------------------------------------------------------ @@ -233,12 +234,12 @@ def get_metadata_records_processor(the_rabbit, the_uuid = record['geonet:info']['uuid'] - if is_locked(the_cfg['session']['working_directory'], the_uuid ): - logging.info("Dataset with uuid = %s is already being indexed: skipping." % the_uuid) - continue - else: - logging.info("Setting lock for dataset with uuid = %s" % the_uuid) - lock(the_cfg['session']['working_directory'], the_uuid) + # if is_locked(the_cfg['session']['working_directory'], the_uuid ): + # logging.info("Dataset with uuid = %s is already being indexed: skipping." % the_uuid) + # continue + # else: + # logging.info("Setting lock for dataset with uuid = %s" % the_uuid) + # lock(the_cfg['session']['working_directory'], the_uuid) # delete_dataset_from_indices(cfg, record['geonet:info']['uuid']) delete_dataset_from_dest_index(the_rabbit=the_rabbit, @@ -277,7 +278,8 @@ def main(cfg): step='main', status='Starting...', uuid_prefix='meta', - info='no info' + info='no info', + loglevel='INFO' ) cfg['session']['current_uuid'] = uuid_to_get with RabbitSession(cfg) as rabbit: @@ -299,7 +301,8 @@ def main(cfg): step='main', status='Terminated', uuid_prefix='meta', - info='no info' + info='no info', + loglevel='INFO' ) cfg['session']['current_uuid'] = uuid_to_get rabbit.publish_log(log_message=log_message2.__dict__) diff --git a/workers/doc-enricher.py b/workers/doc-enricher.py index f183ac6afd124f233f51f474dd06d7d116536177..9ec2d39c83bbe64427a37e6dbd97b046aa9203d9 100644 --- a/workers/doc-enricher.py +++ b/workers/doc-enricher.py @@ -249,7 +249,8 @@ def enrich_docs( channel, method, properties, body ): step='doc-enricher', status=status_message, uuid_prefix='meta', - info='no info' + info='no info', + loglevel='INFO' ) json_body = json.dumps(log_message.__dict__) @@ -288,7 +289,8 @@ def enrich_docs( channel, method, properties, body ): step='doc-enricher', status='done', uuid_prefix='meta', - info='no info' + info='no info', + loglevel='INFO' ) json_body = json.dumps(log_message.__dict__) diff --git a/workers/doc-indexer.py b/workers/doc-indexer.py index 193aa74882ee0adaf09c40c15bec5e836b41d8bd..6e9e5e78346cd2940c6dc172ff0f9f7670777e7e 100644 --- a/workers/doc-indexer.py +++ b/workers/doc-indexer.py @@ -132,7 +132,8 @@ def index_docs(channel, method, properties, body): step='doc-indexer', status=status_message, uuid_prefix='meta', - info='no info' + info='no info', + loglevel='INFO' ) json_body = json.dumps(log_message.__dict__) @@ -206,7 +207,8 @@ def index_docs(channel, method, properties, body): step='doc-indexer', status='done', uuid_prefix='meta', - info='no info' + info='no info', + loglevel='INFO' ) json_body = json.dumps(log_message.__dict__) @@ -262,7 +264,7 @@ if __name__ == '__main__': parser.add_argument('--exchange', dest='exchange', help='the RabbitMQ exchange', type=str, required=True) parser.add_argument('--queue', dest='queue', help='the RabbitMQ queue', type=str, required=True) parser.add_argument('--loglevel', dest='loglevel', help='the log level', default="INFO", type=str, - choices=['INFO', 'DEBUG', 'WARN', 'CRITICAL', 'ERROR']) + argu=['INFO', 'DEBUG', 'WARN', 'CRITICAL', 'ERROR']) parser.add_argument('--user', dest='user', help='the RabbitMQ user login', type=str, required=True) parser.add_argument('--password', dest='password', help='the RabbitMQ user password', type=str, required=True) diff --git a/workers/doc-processor.py b/workers/doc-processor.py index 5be41ba9afd1aad058ef6286856113f53afb4f77..b121f9e2141f1f449f50f8f28434a52d54e4c718 100644 --- a/workers/doc-processor.py +++ b/workers/doc-processor.py @@ -121,7 +121,8 @@ def process_docs( channel, method, properties, body ): step='doc-processor', status=status_message, uuid_prefix='meta', - info='no info' + info='no info', + loglevel='INFO' ) json_body = json.dumps(log_message.__dict__) @@ -163,7 +164,8 @@ def process_docs( channel, method, properties, body ): step='doc-processor', status='done', uuid_prefix='meta', - info='no info' + info='no info', + loglevel='INFO' ) json_body = json.dumps(log_message.__dict__) diff --git a/workers/metadata-processor.py b/workers/metadata-processor.py index e9c39fcc699dfcada2777ed3d07b8b28a792c223..b48e21de4d42da48c67531acca34803b56d1f51e 100644 --- a/workers/metadata-processor.py +++ b/workers/metadata-processor.py @@ -315,7 +315,8 @@ def callback( channel, method, properties, body ): step='metadata-processor', status='Starting...', uuid_prefix='meta', - info='no info' + info='no info', + loglevel='INFO' ) json_body = json.dumps(log_message.__dict__) @@ -387,7 +388,8 @@ def callback( channel, method, properties, body ): step='metadata-processor', status='sent task doc to enrich', uuid_prefix='meta', - info='no info' + info='no info', + loglevel='INFO' ) json_body = json.dumps(log_message.__dict__) @@ -435,7 +437,8 @@ def callback( channel, method, properties, body ): step='metadata-processor', status='sent task doc to index', uuid_prefix='meta', - info='no info' + info='no info', + loglevel='INFO' ) json_body = json.dumps(log_message.__dict__) @@ -456,7 +459,8 @@ def callback( channel, method, properties, body ): step='metadata-processor', status='terminated', uuid_prefix='meta', - info='no info' + info='no info', + loglevel='INFO' ) json_body = json.dumps(log_message.__dict__) diff --git a/workers/reindexer.py b/workers/reindexer.py index f7f35efc6f7a86b39a5f95053ae8b5a0bdfa1847..f695990eb186278a127f2cb32a127a2ff9b3db00 100644 --- a/workers/reindexer.py +++ b/workers/reindexer.py @@ -40,7 +40,8 @@ def create_sampling_task(cfg, channel, uuid): step='reindexer', status='create sampling task', uuid_prefix='full', - info=uuid + info=uuid, + loglevel='INFO' ) json_body = json.dumps(log_message.__dict__) @@ -58,7 +59,7 @@ def create_sampling_task(cfg, channel, uuid): routing_key=cfg['rabbitmq']['routing_key_6'], body=the_body, properties=pika.BasicProperties(delivery_mode = 2) - ) + ) #connection.close() @@ -135,7 +136,7 @@ def on_msg_callback(channel, method, properties, body): # else: # channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) # logging.error("The uuid ends neither with .full nor with .meta. What shall I do?") - # return + # return if count_es != count_ref: @@ -157,7 +158,8 @@ def on_msg_callback(channel, method, properties, body): step='reindexer', status=message, uuid_prefix='full', - info=uuid + info=uuid, + loglevel='INFO' ) json_body = json.dumps(log_message.__dict__) @@ -252,7 +254,8 @@ def on_msg_callback(channel, method, properties, body): step='reindexer', status=message, uuid_prefix='full', - info=uuid + info=uuid, + loglevel='INFO' ) json_body = json.dumps(log_message.__dict__) @@ -278,7 +281,8 @@ def on_msg_callback(channel, method, properties, body): step='reindexer', status=message, uuid_prefix='meta', - info=uuid + info=uuid, + loglevel='INFO' ) json_body = json.dumps(log_message.__dict__) @@ -308,7 +312,8 @@ def on_msg_callback(channel, method, properties, body): step='reindexer', status=message, uuid_prefix='', - info=uuid + info=uuid, + loglevel='INFO' ) json_body = json.dumps(log_message.__dict__) @@ -344,7 +349,7 @@ def main(cfg): channel.basic_qos(prefetch_count=1) channel.basic_consume(on_message_callback=lambda ch, method, properties, body: on_msg_callback(ch, method, properties, body), - queue=reindex_tasks_to_create_qn) + queue=reindex_tasks_to_create_qn) channel.start_consuming() diff --git a/workers/sample-generator.py b/workers/sample-generator.py index 1ff6a03022ead69e78fd9960642bc626f4b042c1..9e5f2a8660ee556bee65a8cde0136465ac0ce11f 100644 --- a/workers/sample-generator.py +++ b/workers/sample-generator.py @@ -188,7 +188,8 @@ def callback(channel, method, properties, body): step='sampler', status=message, uuid_prefix='', - info=uuid + info=uuid, + loglevel='INFO' ) json_body = json.dumps(log_message.__dict__) @@ -238,7 +239,8 @@ def callback(channel, method, properties, body): step='sampler', status=message, uuid_prefix='', - info=uuid + info=uuid, + loglevel='INFO' ) json_body = json.dumps(log_message.__dict__) @@ -278,7 +280,8 @@ def callback(channel, method, properties, body): step='sampler', status=message, uuid_prefix='', - info=uuid + info=uuid, + loglevel='INFO' ) json_body = json.dumps(log_message.__dict__)